这篇文章主要介绍了Kafka集群优化的方法是什么的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Kafka集群优化的方法是什么文章都会有所收获,下面我们一起来看看吧。
背景
个推作为专业的数据智能服务商,已经成功服务了数十万APP,每日的消息下发量达百亿级别,由此产生了海量日志数据。为了应对业务上的各种需求,我们需要采集并集中化日志进行计算,为此个推选用了高可用的、高可靠的、分布式的Flume系统以对海量日志进行采集、聚合和传输。此外,个推也不断对Flume进行迭代升级,以实现自己对日志的特定需求。
原有的异地机房日志汇聚方式,整个流程相对来说比较简单,A机房业务产生的日志通过多种方式写入该机房Kafka集群,然后B机房的Flume通过网络专线实时消费A机房Kafka的日志数据后写入本机房的Kafka集群,所有机房的数据就是通过相同方式在B机房Kakfa集群中集中化管理。如图一所示:
图一:原有异地日志传输模式
但是随着业务量的不断增加,日志数据在逐渐增多的过程中对带宽要求变高,带宽的瓶颈问题日益凸显。按照1G的专线带宽成本2~3w/月来计算,一个异地机房一年仅专线带宽扩容成本就高达30w以上。对此,如何找到一种成本更加低廉且符合当前业务预期的传输方案呢?Avro有快速压缩的二进制数据形式,并能有效节约数据存储空间和网络传输带宽,从而成为优选方案。
优化思路
Avro简介
Avro是一个数据序列化系统。它是Hadoop的一个子项目,也是Apache的一个独立的项目,其主要特点如下:
● 丰富的数据结构;
● 可压缩、快速的二进制数据类型;
● 可持久化存储的文件类型;
● 远程过程调用(RPC);
● 提供的机制使动态语言可以方便地处理数据。
具体可参考官方网站:http://avro.apache.org/
Flume Avro方案
Flume的RPC Source是Avro Source,它被设计为高扩展的RPC服务端,能从其他Flume Agent 的Avro Sink或者Flume SDK客户端,接收数据到Flume Agent中,具体流程如图二所示:
图二:Avro Source流程
针对该模式,我们的日志传输方案计划变更为A机房部署Avro Sink用以消费该机房Kafka集群的日志数据,压缩后发送到B机房的Avro Source,然后解压写入B机房的Kafka集群,具体的传输模式如图三所示:
图三:Flume Avro传输模式
可能存在的问题
我们预估可能存在的问题主要有以下三点:
● 当专线故障的时候,数据是否能保证完整性;
● 该模式下CPU和内存等硬件的消耗评估;
● 传输性能问题。
验证情况
针对以上的几个问题,我们做了几项对比实验。
环境准备情况说明:
两台服务器192.168.10.81和192.168.10.82,以及每台服务器上对应一个Kakfa集群,模拟A机房和B机房;
两个Kafka集群中对应topicA(源端)和topicB(目标端)。在topicA中写入合计大小11G的日志数据用来模拟原始端日志数据。
192.168.10.82上部署一个Flume,模拟原有传输方式。
192.168.10.81服务器部署Avro Sink,192.168.10.82部署Avro Source,模拟Flume Avro传输模式。
原有Flume模式验证(非Avro)
监控Kafka消费情况:
81流量统计:
82流量统计:
消费全部消息耗时:20min
消费总日志条数统计:129,748,260
总流量:13.5G
Avro模式验证
配置说明:
Avro Sink配置:
#kafkasink 是kafkatokafka的sinks的名字,可配多个,空格分开kafkatokafka.sources = kafka_dmc_bulletkafkatokafka.channels = channel_dmc_bulletkafkatokafka.sinks = kafkasink_dmc_bulletkafkatokafka.sources.kafka_dmc_bullet.type = org.apache.flume.source.kafka.KafkaSourcekafkatokafka.sources.kafka_dmc_bullet.channels = channel_dmc_bulletkafkatokafka.sources.kafka_dmc_bullet.zookeeperConnect =192.168.10.81:2181kafkatokafka.sources.kafka_dmc_bullet.topic = topicAkafkatokafka.sources.kafka_dmc_bullet.kafka.zookeeper.connection.timeout.ms =150000kafkatokafka.sources.kafka_dmc_bullet.kafka.consumer.timeout.ms =10000kafkatokafka.sources.kafka_dmc_bullet.kafka.group.id = flumeavrokafkatokafka.sources.kafka_dmc_bullet.batchSize =5000#source kafkasink_dmc_bullet的配置,可配置多个sink提高压缩传输效率kafkatokafka.sinks.kafkasink_dmc_bullet.type = org.apache.flume.sink.AvroSinkkafkatokafka.sinks.kafkasink_dmc_bullet.hostname =192.168.10.82kafkatokafka.sinks.kafkasink_dmc_bullet.port =55555//与source的rpc端口一一对应kafkatokafka.sinks.kafkasink_dmc_bullet.compression-type = deflate//压缩模式kafkatokafka.sinks.kafkasink_dmc_bullet.compression-level =6//压缩率1~9kafkatokafka.sinks.kafkasink_dmc_bullet.channel = channel_dmc_bulletkafkatokafka.sinks.kafkasink_dmc_bullet.channel = channel_dmc_bulletkafkatokafka.sinks.kafkasink_dmc_bullet.requiredAcks =1kafkatokafka.sinks.kafkasink_dmc_bullet.batchSize =5000#source kafkasink_dmc_bullet配的channel,只配一个kafkatokafka.channels.channel_dmc_bullet.type = memorykafkatokafka.channels.channel_dmc_bullet.capacity =100000#kafkatokafka.channels.channel_dmc_bullet.byteCapacity = 10000#kafkatokafka.channels.channel_dmc_bullet.byteCapacityBufferPercentage = 10kafkatokafka.channels.channel_dmc_bullet.transactionCapacity =5000kafkatokafka.channels.channel_dmc_bullet.keep-alive =60
Avro Source配置:
#kafkasink 是kafkatokafka的sinks的名字,可配多个,空格分开kafkatokafka.sources = kafka_dmc_bulletkafkatokafka.channels = channel_dmc_bulletkafkatokafka.sinks = kafkasink_dmc_bulletkafkatokafka.sources.kafka_dmc_bullet.type= avrokafkatokafka.sources.kafka_dmc_bullet.channels = channel_dmc_bulletkafkatokafka.sources.kafka_dmc_bullet.bind =0.0.0.0kafkatokafka.sources.kafka_dmc_bullet.port =55555//rpc端口绑定kafkatokafka.sources.kafka_dmc_bullet.compression-type= deflate//压缩模式kafkatokafka.sources.kafka_dmc_bullet.batchSize =100#source kafkasink_dmc_bullet的配置kafkatokafka.sinks.kafkasink_dmc_bullet.type= org.apache.flume.sink.kafka.KafkaSinkkafkatokafka.sinks.kafkasink_dmc_bullet.kafka.partitioner.class = com.gexin.rp.base.kafka.SimplePartitionerkafkatokafka.sinks.kafkasink_dmc_bullet.channel = channel_dmc_bulletkafkatokafka.sinks.kafkasink_dmc_bullet.topic = topicBkafkatokafka.sinks.kafkasink_dmc_bullet.brokerList =192.168.10.82:9091,192.168.10.82:9092,192.168.10.82:9093kafkatokafka.sinks.kafkasink_dmc_bullet.requiredAcks =1kafkatokafka.sinks.kafkasink_dmc_bullet.batchSize =500kafkatokafka.channels.channel_dmc_bullet.type= memorykafkatokafka.channels.channel_dmc_bullet.capacity =100000kafkatokafka.channels.channel_dmc_bullet.transactionCapacity =1000
监控Kafka消费情况
81流量统计:
82流量统计:
消费全部消息耗时:26min
消费总日志条数统计:129,748,260
总流量:1.69G
故障模拟
模拟专线故障,在A、B两机房不通的情况下,Avro Sink报错如下:
监控Kafka消费情况,发现消费者已停止消费:
故障处理恢复后继续消费剩余日志,经统计,总日志条数为:129,747,255。
结论
当专线发生故障时,正在网络传输中的通道外数据可能会有少部分丢失,其丢失原因为网络原因,与Avro模式无关;故障后停止消费的数据不会有任何的丢失问题,由于网络原因丢失的数据需要评估其重要性以及是否需要补传。
流量压缩率达80%以上,同时我们也测试了等级为1~9的压缩率,6跟9非常接近,CPU和内存的使用率与原有传输模式相差不大,带宽的优化效果比较明显。
传输性能由于压缩的原因适当变弱,单Sink由原先20分钟延长至26分钟,可适当增加Sink的个数来提高传输速率。
生产环境实施结果
实施结果如下:
由于还有其它业务的带宽占用,总带宽使用率节省了50%以上,现阶段高峰期带宽速率不超过400Mbps;
每个Sink传输速率的极限大概是3000条每秒,压缩传输速率问题通过增加Sink的方式解决,但会适当增加CPU和内存的损耗。
关于“Kafka集群优化的方法是什么”这篇文章的内容就介绍到这里,感谢各位的阅读!相信大家对“Kafka集群优化的方法是什么”知识都有一定的了解,大家如果还想学习更多知识,欢迎关注编程网行业资讯频道。