Installing Kafka
Via Docker
Install ZooKeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
Install Kafka
In the command below, 192.168.50.131 is the IP address of the Docker host in the author's environment; replace it with your own host's address.
docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_BROKER_ID=0 \
  -e KAFKA_MESSAGE_MAX_BYTES=20000000 \
  -e KAFKA_ZOOKEEPER_CONNECT=192.168.50.131:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.50.131:9092 \
  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
  -e KAFKA_LOG_RETENTION_HOURS=1 \
  -e KAFKA_LOG_RETENTION_BYTES=10737418240 \
  -e KAFKA_MAX_REQUEST_SIZE=20582912 \
  -e KAFKA_REPLICA_FETCH_MAX_BYTES=20582912 \
  -e KAFKA_FETCH_MESSAGE_MAX_BYTES=20485760 \
  -t wurstmeister/kafka
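The size settings in the Kafka command are raw byte counts, which are easy to mistype. The following is a minimal sketch that sanity-checks a few of them before starting the container. The variable names are illustrative only, and the rule that the replica fetch size should be at least the maximum message size is a commonly cited guideline rather than something the original post states.

```shell
#!/bin/bash
# Values copied from the docker run command above.
MESSAGE_MAX_BYTES=20000000
REPLICA_FETCH_MAX_BYTES=20582912
LOG_RETENTION_BYTES=10737418240

# Convert the retention cap to GiB (10737418240 = 10 * 1024^3).
RETENTION_GIB=$((LOG_RETENTION_BYTES / 1024 / 1024 / 1024))
echo "log retention cap: ${RETENTION_GIB} GiB"

# Assumed guideline: followers should be able to fetch the largest
# message the broker accepts.
if [ "$REPLICA_FETCH_MAX_BYTES" -ge "$MESSAGE_MAX_BYTES" ]; then
  SIZE_CHECK=ok
else
  SIZE_CHECK=too-small
fi
echo "replica fetch size check: ${SIZE_CHECK}"
```

With the values above, the retention cap works out to 10 GiB and the replica fetch size passes the check.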
Via Kubernetes
Add the Bitnami chart repository
helm repo add bitnami https://charts.bitnami.com/bitnami
Install ZooKeeper
helm install zookeeper bitnami/zookeeper \
  --set replicaCount=1 \
  --set auth.enabled=false \
  --set allowAnonymousLogin=true \
  -n rubyruby
The command prints output similar to the following:
** Please be patient while the chart is being deployed **
ZooKeeper can be accessed via port 2181 on the following DNS name from within your cluster:
zookeeper.rubyruby.svc.cluster.local
To connect to your ZooKeeper server run the following commands:
export POD_NAME=$(kubectl get pods --namespace rubyruby -l "app.kubernetes.io/name=zookeeper,app.kubernetes.io/instance=zookeeper,app.kubernetes.io/component=zookeeper" -o jsonpath="{.items[0].metadata.name}")
kubectl exec -it $POD_NAME -- zkCli.sh
To connect to your ZooKeeper server from outside the cluster execute the following commands:
kubectl port-forward --namespace rubyruby svc/zookeeper 2181:2181 &
zkCli.sh 127.0.0.1:2181
Install Kafka
helm install kafka bitnami/kafka \
  --set zookeeper.enabled=false \
  --set replicaCount=1 \
  --set externalZookeeper.servers=zookeeper \
  -n rubyruby
The output is as follows:
** Please be patient while the chart is being deployed **
Kafka can be accessed by consumers via port 9092 on the following DNS name from within your cluster:
kafka.rubyruby.svc.cluster.local
Each Kafka broker can be accessed by producers via port 9092 on the following DNS name(s) from within your cluster:
kafka-0.kafka-headless.rubyruby.svc.cluster.local:9092
To create a pod that you can use as a Kafka client run the following commands:
kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:3.3.1-debian-11-r1 --namespace rubyruby --command -- sleep infinity
kubectl exec --tty -i kafka-client --namespace rubyruby -- bash
PRODUCER:
kafka-console-producer.sh \
  --broker-list kafka-0.kafka-headless.rubyruby.svc.cluster.local:9092 \
  --topic test
CONSUMER:
kafka-console-consumer.sh \
  --bootstrap-server kafka.rubyruby.svc.cluster.local:9092 \
  --topic test \
  --from-beginning
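The host names in the two chart outputs follow the usual Kubernetes pattern of service, namespace, then cluster domain. As a rough sketch, they can be derived from the release and namespace names instead of being copied by hand. The helper `svc_dns` is hypothetical, and the cluster domain is assumed to be the default `cluster.local`.

```shell
#!/bin/bash
# Hypothetical helper: build the in-cluster DNS name of a Service.
# Assumes the default cluster domain "cluster.local".
svc_dns() {
  printf '%s.%s.svc.cluster.local\n' "$1" "$2"
}

NAMESPACE=rubyruby
ZK_HOST=$(svc_dns zookeeper "$NAMESPACE")
KAFKA_BOOTSTRAP="$(svc_dns kafka "$NAMESPACE"):9092"
# Per-broker name: <pod>.<headless service>.<namespace>.svc.cluster.local
BROKER_0="kafka-0.$(svc_dns kafka-headless "$NAMESPACE"):9092"

echo "$ZK_HOST"
echo "$KAFKA_BOOTSTRAP"
echo "$BROKER_0"
```

The three printed names match the ones shown in the Helm output for this namespace.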
PHP producer and consumer example code
<?php
// Producer configuration: broker address inside the cluster
$conf = new \RdKafka\Conf();
$conf->set("metadata.broker.list", "kafka.rubyruby.svc.cluster.local:9092");

// Holder object for the message to send
$rdKafkaMessage = new \RdKafka\Message();
// Topic the message is sent to
$rdKafkaMessage->topic_name = 'ruby1';
$headers = [
    'event' => 'live.broadcast.send.msg',
    'message_id' => md5(microtime(true) . uniqid('msg', true)),
];
$payloads = [
    'uid' => "10000001",
    'roomid' => "10000002",
    'app' => 'rubyruby',
    'msg' => "hello world",
    'happened_at' => time(),
];
$payload = [
    'headers' => $headers,
    'payload' => $payloads,
];
// Message body
$rdKafkaMessage->payload = json_encode($payload);
$rdKafkaMessage->timestamp = time();
// Message key
$rdKafkaMessage->key = md5(microtime(true) . uniqid('key', true));

// Producer
$producer = new \RdKafka\Producer($conf);
// Get a handle on the topic
$topic = $producer->newTopic($rdKafkaMessage->topic_name);
// Produce the message; RD_KAFKA_PARTITION_UA lets librdkafka pick the partition
$topic->producev(RD_KAFKA_PARTITION_UA, 0, $rdKafkaMessage->payload, $rdKafkaMessage->key, $headers);
// Wait up to 1000 ms for delivery and read the result
$result = $producer->flush(1000);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
    echo "send success";
}

// Consumer configuration (kept separate from the producer configuration)
$consumerConf = new \RdKafka\Conf();
$consumerConf->set("metadata.broker.list", "kafka.rubyruby.svc.cluster.local:9092");
// Consumer group
$consumerConf->set("group.id", "ruby-live");
// Disable auto commit; for KafkaConsumer the effective property is enable.auto.commit
$consumerConf->set('enable.auto.commit', 'false');
// Start from the latest offset: a new group only sees messages produced after it joins
$consumerConf->set('auto.offset.reset', 'latest');
// Rebalance callback: assign or revoke partitions
$consumerConf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, ?array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            $kafka->assign($partitions);
            break;
        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            $kafka->assign(null);
            break;
        default:
            throw new \Exception(rd_kafka_err2str($err), $err);
    }
});
// Consumer object
$consumer = new \RdKafka\KafkaConsumer($consumerConf);
// Subscribe and let the group assign partitions automatically
$consumer->subscribe([$rdKafkaMessage->topic_name]);
// Or assign partitions manually
// $consumer->assign([
//     new \RdKafka\TopicPartition("zzy8", 0),
//     new \RdKafka\TopicPartition("zzy8", 1),
// ]);
// Fetch one message, waiting up to 1000 ms (a real consumer would call this in a loop)
$kafkaMessage = $consumer->consume(1000);
echo $kafkaMessage->err . PHP_EOL;
if ($kafkaMessage->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
    print_r(json_decode($kafkaMessage->payload, true));
    // Acknowledge the message by committing its offset
    try {
        $consumer->commitAsync($kafkaMessage);
        // $consumer->commit($kafkaMessage);
    } catch (\Exception $e) {
        throw new \RuntimeException('commit kafka message failed', 0, $e);
    }
} elseif ($kafkaMessage->err === RD_KAFKA_RESP_ERR__TIMED_OUT) {
    echo "timed out waiting for a message";
}
// Stop listening
$consumer->unsubscribe();
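To exercise the PHP consumer without running the PHP producer, a message with the same envelope (a `headers` object plus a `payload` object) can be built in the shell. This is only a sketch: the field values are the sample values from the PHP code, `message_id` is left out for brevity, and the commented send command assumes you are inside the `kafka-client` pod created earlier.

```shell
#!/bin/bash
# Build a JSON envelope shaped like the PHP $payload array (headers + payload).
HAPPENED_AT=$(date +%s)
MESSAGE=$(printf '{"headers":{"event":"%s"},"payload":{"uid":"%s","roomid":"%s","app":"%s","msg":"%s","happened_at":%s}}' \
  "live.broadcast.send.msg" "10000001" "10000002" "rubyruby" "hello world" "$HAPPENED_AT")
echo "$MESSAGE"

# From the kafka-client pod, the line could then be sent to topic ruby1, for example:
# echo "$MESSAGE" | kafka-console-producer.sh \
#   --broker-list kafka-0.kafka-headless.rubyruby.svc.cluster.local:9092 \
#   --topic ruby1
```

Because the consumer uses `auto.offset.reset=latest`, it should be subscribed before the message is sent.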
Source: https://blog.csdn.net/fanghailiang2016/article/details/127275062