一 安装PHP扩展
1 rdkafka 安装需要依赖 librdkafka , 所以先安装 librdkafka 自定义目录 下载扩展并且安装
git clone https://github.com/edenhill/librdkafka.gitcd librdkafka ## 找到自己的安装路径./configuremake && make install
2 安装 php-rdkafka 扩展
git clone https://github.com/arnaud-lb/php-rdkafka.gitcd php-rdkafka ## 找到自己的安装路径/www/server/php/72/bin/phpize ## 这里根据自己安装PHP的路径执行./configure --with-php-config=/www/server/php/72/bin/php-config ## 这里根据自己安装的PHP版本情况填写路径make && make install
3 在PHP配置文件 php-ini 加上
extension = /www/server/php/72/lib/php/extensions/no-debug-non-zts-20170718/rdkafka.so ##找到自己使用的PHP路径 如果安装多个版本的PHP注意要找到项目使用的PHP版本
重启 php-fpm,查看phpinfo() 或者再命令行执行 php -m 查看扩展是否安装成功
如果php-m找不到扩展 则需在安装的PHP下找到 /www/server/php/72/etc php-cli.ini (此路径为我的PHP安装路径 需要找到你自己的PHP安装路径) 在此配置文件最下端添加extension = /www/server/php/72/lib/php/extensions/no-debug-non-zts-20170718/rdkafka.so
三 安装 Kafka 服务 开放端口 kafka默认9092端口
1 直接到 kafka官网 , 下载最新的
下载后放在服务器项目的自定目录下 解压 可放在/www/server/ 目录下(Linux服务器)
2 使用命令下载
命令 如没有wget命令则需yum安装wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz解压,进入目录tar -zxvf kafka_2.13-2.5.0.tgz cd kafka_2.13-2.5.0
四 启动Kafka服务(在放置kafka文件的根目录下执行命令)
1 使用安装包中的脚本启动单节点 Zookeeper 实例 (新版本的kafka已无需执行此命令)
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
2 使用 kafka-server-start.sh 启动 kafka 服务
bin/kafka-server-start.sh config/server.properties //普通启动 bin/kafka-server-start.sh -daemon ./config/server.properties //项目上线后需要以守护进程的方式启动
3 创建topic:
bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic test发消息: bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test>hello>world监听消息:bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
五 PHP 使用Kafka
1创建一个生产者类
class KafkaProducer{ public static $brokerList = '127.0.0.1:9092'; public static function send($message, $topic) { self::producer($message, $topic); } public static function producer($message, $topic = 'test') { $conf = new \RdKafka\Conf(); $conf->set('metadata.broker.list', self::$brokerList); $producer = new \RdKafka\Producer($conf); $topic = $producer->newTopic($topic); $topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($message)); $producer->poll(0); $result = $producer->flush(10000); if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) { throw new \RuntimeException('Was unable to flush, messages might be lost!'); } }}
2 创建一个消费类
class KafkaConsumer{ public static $brokerList = '127.0.0.1:9092'; public static function consumer() { $conf = new \RdKafka\Conf(); $conf->set('group.id', 'test'); $rk = new \RdKafka\Consumer($conf); $rk->addBrokers("127.0.0.1"); $topicConf = new \RdKafka\TopicConf(); $topicConf->set('auto.commit.interval.ms', 100); $topicConf->set('offset.store.method', 'broker'); $topicConf->set('auto.offset.reset', 'smallest'); $topic = $rk->newTopic('test', $topicConf); $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) { $message = $topic->consume(0, 120*10000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: var_dump($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } } }}
问题汇总
No Java runtime present, requesting install
因为 kafka 需要 java 环境支持,所以安装 java 环境。可以到 javase-jdk14-downloads 选择自己的版本进行下载安装
创建 topic 出现:Replication factor: 1 larger than available brokers: 0
意思是至少有一个 brokers. 也就是说并没有有效的 brokers 可以用。你要确保你的 kafka 已经启动了
如果安装多个PHP版本 配置信息需要配置到项目使用的PHP版本
来源地址:https://blog.csdn.net/weixin_39104010/article/details/125313440