文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

PHP Kafka 使用详解

2023-08-31 10:50

关注

一 安装PHP扩展
1 rdkafka 安装需要依赖 librdkafka , 所以先安装 librdkafka 自定义目录 下载扩展并且安装

git clone https://github.com/edenhill/librdkafka.gitcd librdkafka ## 找到自己的安装路径./configuremake && make install

2 安装 php-rdkafka 扩展

git clone https://github.com/arnaud-lb/php-rdkafka.gitcd php-rdkafka ## 找到自己的安装路径/www/server/php/72/bin/phpize ## 这里根据自己安装PHP的路径执行./configure --with-php-config=/www/server/php/72/bin/php-config  ## 这里根据自己安装的PHP版本情况填写路径make && make install

3 在PHP配置文件 php-ini 加上

extension = /www/server/php/72/lib/php/extensions/no-debug-non-zts-20170718/rdkafka.so ##找到自己使用的PHP路径 如果安装多个版本的PHP注意要找到项目使用的PHP版本

重启 php-fpm,查看phpinfo() 或者再命令行执行 php -m 查看扩展是否安装成功

在这里插入图片描述
在这里插入图片描述如果php-m找不到扩展 则需在安装的PHP下找到 /www/server/php/72/etc php-cli.ini (此路径为我的PHP安装路径 需要找到你自己的PHP安装路径) 在此配置文件最下端添加extension = /www/server/php/72/lib/php/extensions/no-debug-non-zts-20170718/rdkafka.so

三 安装 Kafka 服务 开放端口 kafka默认9092端口

1 直接到 kafka官网 , 下载最新的

下载后放在服务器项目的自定目录下 解压  可放在/www/server/ 目录下(Linux服务器)

2 使用命令下载

命令 如没有wget命令则需yum安装wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz解压,进入目录tar -zxvf kafka_2.13-2.5.0.tgz    cd kafka_2.13-2.5.0

四 启动Kafka服务(在放置kafka文件的根目录下执行命令)

1 使用安装包中的脚本启动单节点 Zookeeper 实例   (新版本的kafka已无需执行此命令)
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
2 使用 kafka-server-start.sh 启动 kafka 服务
 bin/kafka-server-start.sh config/server.properties //普通启动  bin/kafka-server-start.sh -daemon ./config/server.properties //项目上线后需要以守护进程的方式启动

3 创建topic:

bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic test发消息: bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test>hello>world监听消息:bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning

五 PHP 使用Kafka

1创建一个生产者类

class KafkaProducer{    public static $brokerList = '127.0.0.1:9092';    public static function send($message, $topic)    {        self::producer($message, $topic);    }    public static function producer($message, $topic = 'test')    {        $conf = new \RdKafka\Conf();        $conf->set('metadata.broker.list', self::$brokerList);        $producer = new \RdKafka\Producer($conf);        $topic = $producer->newTopic($topic);        $topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($message));        $producer->poll(0);        $result = $producer->flush(10000);        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {            throw new \RuntimeException('Was unable to flush, messages might be lost!');        }    }}

2 创建一个消费类

class KafkaConsumer{    public static $brokerList = '127.0.0.1:9092';      public static function consumer()    {        $conf = new \RdKafka\Conf();        $conf->set('group.id', 'test');        $rk = new \RdKafka\Consumer($conf);        $rk->addBrokers("127.0.0.1");        $topicConf = new \RdKafka\TopicConf();        $topicConf->set('auto.commit.interval.ms', 100);        $topicConf->set('offset.store.method', 'broker');        $topicConf->set('auto.offset.reset', 'smallest');        $topic = $rk->newTopic('test', $topicConf);        $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);        while (true) {            $message = $topic->consume(0, 120*10000);            switch ($message->err) {                case RD_KAFKA_RESP_ERR_NO_ERROR:                    var_dump($message);                    break;                case RD_KAFKA_RESP_ERR__PARTITION_EOF:                    echo "No more messages; will wait for more\n";                    break;                case RD_KAFKA_RESP_ERR__TIMED_OUT:                    echo "Timed out\n";                    break;                default:                    throw new \Exception($message->errstr(), $message->err);                    break;            }        }    }}

问题汇总

No Java runtime present, requesting install

因为 kafka 需要 java 环境支持,所以安装 java 环境。可以到 javase-jdk14-downloads 选择自己的版本进行下载安装

创建 topic 出现:Replication factor: 1 larger than available brokers: 0

意思是至少有一个 brokers. 也就是说并没有有效的 brokers 可以用。你要确保你的 kafka 已经启动了

如果安装多个PHP版本 配置信息需要配置到项目使用的PHP版本

来源地址:https://blog.csdn.net/weixin_39104010/article/details/125313440

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯