文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

php rdkafka操作kafka消息队列——k8s从入门到高并发系列教程(十七)

2023-10-01 22:35

关注

安装kafka

通过docker

安装zookeeper

docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

安装kafka

docker run  -d --name kafka -p 9092:9092 \-e KAFKA_BROKER_ID=0 \-e KAFKA_MESSAGE_MAX_BYTES=20000000 \-e KAFKA_ZOOKEEPER_CONNECT=192.168.50.131:2181 \-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.50.131:9092 \-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \-e KAFKA_LOG_RETENTION_HOURS=1 \-e KAFKA_LOG_RETENTION_BYTES=10737418240 \-e KAFKA_MAX_REQUEST_SIZE=20582912 \-e KAFKA_REPLICA_FETCH_MAX_BYTES=20582912 \-e KAFKA_FETCH_MESSAGE_MAX_BYTES=20485760 \-t wurstmeister/kafka

通过k8s

增加bitnami repo仓库

helm repo add bitnami https://charts.bitnami.com/bitnami

安装zookeeper

helm install zookeeper bitnami/zookeeper \  --set replicaCount=1 \  --set auth.enabled=false \  --set allowAnonymousLogin=true \  -n rubyruby

可以看到如下的输出信息

** Please be patient while the chart is being deployed **

ZooKeeper can be accessed via port 2181 on the following DNS name from within your cluster:

zookeeper.rubyruby.svc.cluster.local

To connect to your ZooKeeper server run the following commands:

export POD_NAME=$(kubectl get pods --namespace rubyruby -l "app.kubernetes.io/name=zookeeper,app.kubernetes.io/instance=zookeeper,app.kubernetes.io/component=zookeeper" -o jsonpath="{.items[0].metadata.name}")
kubectl exec -it $POD_NAME -- zkCli.sh

To connect to your ZooKeeper server from outside the cluster execute the following commands:

kubectl port-forward --namespace rubyruby svc/zookeeper 2181:2181 &
zkCli.sh 127.0.0.1:2181

安装kafka

helm install kafka bitnami/kafka \  --set zookeeper.enabled=false \  --set replicaCount=1 \  --set externalZookeeper.servers=zookeeper \  -n rubyruby

输出信息如下 

** Please be patient while the chart is being deployed **

Kafka can be accessed by consumers via port 9092 on the following DNS name from within your cluster:

    kafka.rubyruby.svc.cluster.local

Each Kafka broker can be accessed by producers via port 9092 on the following DNS name(s) from within your cluster:

    kafka-0.kafka-headless.rubyruby.svc.cluster.local:9092

To create a pod that you can use as a Kafka client run the following commands:

    kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:3.3.1-debian-11-r1 --namespace rubyruby --command -- sleep infinity
    kubectl exec --tty -i kafka-client --namespace rubyruby -- bash

    PRODUCER:
        kafka-console-producer.sh \

            --broker-list kafka-0.kafka-headless.rubyruby.svc.cluster.local:9092 \
            --topic test

    CONSUMER:
        kafka-console-consumer.sh \

            --bootstrap-server kafka.rubyruby.svc.cluster.local:9092 \
            --topic test \
            --from-beginning

 php生产者消费者使用代码

set("metadata.broker.list", "kafka.rubyruby.svc.cluster.local:9092");$topicConf = new \RdKafka\TopicConf();//topic关闭自动提交$topicConf->set('auto.commit.enable', 0);$topicConf->set('auto.commit.interval.ms', 1000);//从最近消息开始消费$topicConf->set('auto.offset.reset', 'latest');//kafka topic$conf->setDefaultTopicConf($topicConf);//rdkafka 消息$rdKafkaMessage = new \RdKafka\Message();//设置rdkafka消息topic$rdKafkaMessage->topic_name = 'ruby1';$headers = [  'event' => 'live.broadcast.send.msg',  'message_id' => md5(microtime(true) . uniqid('msg', true))  ];$payloads = [  'uid' => "10000001",  'roomid' => "10000002",  'app' => 'rubyruby',  'msg' => "hello world",  'happened_at' => time(),  ];$payload['headers'] = $headers;$payload['payload'] = $payloads;//rdkafka消息负载内容$rdKafkaMessage->payload = json_encode($payload);$rdKafkaMessage->timestamp = time();//消息key$rdKafkaMessage->key = md5(microtime(true) . uniqid('key', true));//生产者$producer = new \RdKafka\Producer($conf);//拿到topic$topic = $producer->newTopic($rdKafkaMessage->topic_name);//topic产生消息$topic->producev(RD_KAFKA_PARTITION_UA, 0, json_encode($payload), $rdKafkaMessage->key, $headers);//拿产生消息的结果$result = $producer->flush(1000);if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {  echo "send success";}//rdkafka的消费组$conf->set("group.id", "ruby-live");//消费者重定分区$conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {    switch ($err) {        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:            $kafka->assign($partitions);            break;        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:            $kafka->assign(null);            break;        default:            throw new Exception($err);    }});//kafka消费者对象$consumer = new \RdKafka\KafkaConsumer($conf);//自动安排分区$consumer->subscribe([$rdKafkaMessage->topic_name]);//主动设置分区// $consumer->assign([// new \RdKafka\TopicPartition("zzy8", 0),// new \RdKafka\TopicPartition("zzy8", 1),// ]);//拿消息$kafkaMessage = $consumer->consume(1000);echo $kafkaMessage->err . PHP_EOL;if($kafkaMessage->err == RD_KAFKA_RESP_ERR_NO_ERROR){  print_r(json_decode($rdKafkaMessage->payload, true));}else if($kafkaMessage->err == RD_KAFKA_RESP_ERR__TIMED_OUT){  echo "连接超时";}//提交确认消息try {    $consumer->commitAsync($kafkaMessage);    // $consumer->commit($kafkaMessage);} catch (Exception $e) {    throw new MessageAcknowledgeException($message, 'commit kafka message failed', $e);}//结束消息监听$consumer->unsubscribe();

来源地址:https://blog.csdn.net/fanghailiang2016/article/details/127275062

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯