文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

PHP小白搭建Kafka环境以及初步使用rdkafka

2023-09-02 22:41

关注

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录


前言

提示:windows环境安装失败,Linux环境安装成功(以下并没有windows安装示例)

一、安装java(Kafka必须安装java,因为kafka依赖java核心)

下载地址:链接: https://www.oracle.com/java/technologies/downloads/
在这里插入图片描述
将文件放在Linux目录中后进行解压:

假设我把[jdk-20_linux-x64_bin.tar.gz]包放在了/root/src/uap/web/third 目录下1、tar -zxvf jdk-20_linux-x64_bin.tar.gz2、mv jdk.0.20 ./jdk3、vim /etc/profile  JAVA_HOME=/root/src/uap/web/third/jdk PATH=/root/src/uap/web/third/jdk/bin:$PATH export JAVA_HOME4、source /ect/profile5、java -version (出现下图极为成功)

在这里插入图片描述

二、安装以及配置Kafka、zookeeper

1.下载Kafka(无需下载zookeeper,使用kafka自带的即可)

下载地址:https://kafka.apache.org/downloads
提示:不要下载带src的那个,具体我也不知道,因为我也是个小白
在这里插入图片描述

假设我把[kafka_2.12-3.5.1.tgz]包放在了/root/src/uap/web/third 目录下1、tar -zxvf kafka_2.12-3.5.1.tgz2、mv kafka.2.12 ./kafka3、创建kafka日志文件 mkdir -p ./kafka_data/log/kafka mkdir -p ./kafka_data/log/zookeeper mkdir -p ./kafka_data/zookeeper4、cd ./kafka/configvim server.properties listeners=PLAINTEXT://localhost:9092 (34行左右,添加对应的host、port) broker.id=0 port=9092 host.name=192.168.1.241 log.dirs=/root/src/uap/web/third/kafka_data/log/kafka zookeeper.connect=localhost:2181wdvim zookeeper.properties dataDir=/root/src/uap/web/third/kafka_data/zookeeper dataLogDir=/root/src/uap/web/third/kafka_data/log/zookeeper clientPort=2181 maxClientCnxns=100 tickTimes=2000 initLimit=10 syncLimit=5wd5、cd ../ 进入kafka目录下#启动zookeeper./bin/zookeeper-server-start.sh ./config/zookeeper.properties &//如果其中报错,大部分应该是报JAVA_HOME 这个说明你没有配置 /etc/profile 上面有./bin/kafka-server-start.sh -daemon ./config/server.properties &

2.配置topid

代码如下(示例):

kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic myt返回值:Created topic myt.  创建成功/否则失败

3.安装PHP的rdkafka,这个网上教程很多,基本上都是正确的

例如:阿里云开发者社区,php安装rdkafka教程
剩下逻辑就直接贴代码了

生产者:public function producer(){        $conf = new RdKafka\Conf();        $conf->set('metadata.broker.list', 'localhost:9092');        $producer = new RdKafka\Producer($conf);        $topic = $producer->newTopic("mytest");        //获取数据库数据,存入kafka中        $wanchk = $this->db->query("SELECT * FROM hf_alarm_wanchk");        foreach ($wanchk as $k => $v){            $topic->produce(RD_KAFKA_PARTITION_UA, 0, array2json($v));            $producer->poll(0);        }                $result = $producer->flush(10000);        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {            throw new \RuntimeException('Was unable to flush, messages might be lost!');        }        $producer->purge(RD_KAFKA_PURGE_F_QUEUE);        $producer->flush(10000);    }消费者://这个代码需要使用终端运行:// /bin/php -c /etc/php.ini  -f  /入口文件目录/index.php (类)consumer (方法)consumer public function consumer()    {        $conf = new \RdKafka\Conf();        $conf->set('group.id', 'mytest');        $rk = new \RdKafka\Consumer($conf);        $rk->addBrokers("127.0.0.1");        $topicConf = new \RdKafka\TopicConf();        $topicConf->set('auto.commit.interval.ms', 100);        $topicConf->set('offset.store.method', 'broker');        $topicConf->set('auto.offset.reset', 'smallest');        $topic = $rk->newTopic('mytest', $topicConf);        $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);        while (true) {            $message = $topic->consume(0, 120 * 10000);            switch ($message->err) {                case RD_KAFKA_RESP_ERR_NO_ERROR:                    var_dump($message);                    break;                case RD_KAFKA_RESP_ERR__PARTITION_EOF:                    echo "No more messages; will wait for more\n";                    break;                case RD_KAFKA_RESP_ERR__TIMED_OUT:                    echo "Timed out\n";                    break;                default:                    throw new \Exception($message->errstr(), $message->err);                    break;            }        }    } 

来源地址:https://blog.csdn.net/a836236241/article/details/132495663

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯