版权声明:欢迎转载,但是看在我辛勤劳动的份上,请注明来源:http://blog.csdn.net/yinwenjie(未经允许严禁用于商业用途!)
https://blog.csdn.net/yinwenjie/article/details/51336165
目录(?)[+]
接上文:《架构设计:系统间通信(29)——Kafka及场景应用(中2)》
4-5、Kafka原理:消费者
作为Apache Kafka消息队列,它的性能指标相当一部分取决于消费者们的性能——只要消息能被快速消费掉不在Broker端形成拥堵,整个Apache Kafka就不会出现性能瓶颈问题。
4-5-1、基本使用
我们首先使用Kafka Client For JAVA API为各位读者演示一下最简单的Kafka消费者端的使用。以下示例代码可以和上文中所给出的生产者代码相对应,形成一个完整的消息创建——接收——发送过程:
package kafkaTQ;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
public class KafkaConsumer_GroupOne {
public static void main(String[] args) throws RuntimeException {
// ==============首先各种连接属性
// Kafka消费者的完整连接属性在Apache Kafka官网http://kafka.apache.org/documentation.html#consumerconfigs
// 有详细介绍(请参看Old Consumer Configs。New Consumer Configs是给Kafka V0.9.0.0+使用的)
// 这里我们设置几个关键属性
Properties props = new Properties();
// zookeeper相关的,如果有多个zk节点,这里以“,”进行分割
props.put("zookeeper.connect", "192.168.61.140:2181");
props.put("zookeeper.connection.timeout.ms", "10000");
// 还记得上文的说明吗:对于一个topic而言,同一用户组内的所有用户只被允许访问一个分区。
// 所以要让多个Consumer实现对一个topic的负载均衡,每个groupid的名称都要一样
String groupname = "group2";
props.put("group.id", groupname);
//==============
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
// 我们只创建一个消费者
HashMap<String, Integer> map = new HashMap<String, Integer>();
map.put("my_topic2", 1);
Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumerConnector.createMessageStreams(map);
// 获取并启动消费线程,注意看关键就在这里,一个消费线程可以负责消费一个topic中的多个partition
// 但是一个partition只能分配到一个消费线程去
KafkaStream<byte[], byte[]> stream = topicMessageStreams.get("my_topic2").get(0);
new Thread(new ConsumerThread(stream)).start();
// 接着锁住主线程,让其不退出
synchronized (KafkaConsumer_GroupTwo.class) {
try {
KafkaConsumer_GroupTwo.class.wait();
} catch (InterruptedException e) {
e.printStackTrace(System.out);
}
}
}
private static class ConsumerThread implements Runnable {
private KafkaStream<byte[], byte[]> stream;
public ConsumerThread(KafkaStream<byte[], byte[]> stream) {
this.stream = stream;
}
@Override
public void run() {
ConsumerIterator<byte[], byte[]> iterator = this.stream.iterator();
//============这个消费者获取的数据在这里
while(iterator.hasNext()){
MessageAndMetadata<byte[], byte[]> message = iterator.next();
int partition = message.partition();
String topic = message.topic();
String messageT = new String(message.message());
System.out.println("接收到: " + messageT + "来自于topic:[" + topic + "] + 第partition[" + partition + "]");
}
}
}
}
以上代码片段有几个关键点需要进行一下说明:
“map.put(“my_topic2”, 1);” 这句代码表示将会为名叫“my_topic2”的队列创建数量为1的消费者。在一个进程的连接中,您可以指定创建多个topic的消费者数量。例如:
......
# 为my_topic2的队列创建数量为1的消费者
# 并且为my_topic3的队列创建数量为4的消费者
map.put("my_topic2", 1);
map.put("my_topic3", 4);
......
每一个消费者都需要一个独立的线程进行工作。您可以将工作任务放入已经创建好的线程池(推荐这样做),也可以像以上代码示例中那样创建一个线程并运行任务。
......
# 使用线程池
# 这里的参数就是消费者的总数量
ExecutorService threadPool = Executors.newFixedThreadPool(1);
threadPool.execute(new ConsumerThread(stream));
......
在开发过程中,消费者端无需知道任何一个Broker的位置。但是必须至少知道一个zookeeper服务节点的位置。通过这个位置,消费者端首先会去zookeeper服务上查找指定的topic的分区情况和已有的消费者情况。
4-5-2、分区与消费者负载
Apache Kafka集群中的消费者以线程为单位,如在上一小节代码段所示:我们在一个进程中,为Topic为“my_topic2”的队列创建了一个线程,这个线程就是一个消费者——属于名为“group2”的用户组。这时,Topic中所有分区的消息都会交给这个消费线程进行消费。如下图所示:
虽然一个消费者可以同时消费Topic中多个分区(Partition)的消息,但在生产环境下为了获得更优的消费性能并不建议这样做。由于单个消费者线程的处理能力是有限的,一旦出现数据洪峰,消息就会堆积在Broker端无法被处理(如果消费者端使用了线程池,则可能堆积在消费者端,这要看您怎么编写代码)。所以上一个小节那样的消费者编码方式,最多就是用来做做“Hello World”那样的示例,没有更多的使用价值了。
4-5-3、优化 一:
第一种改进方法,就是让一个消费者只消费一个分区(Partition)中的消息,且整个系统中的消费者大于等于Topic中的分区数量。设计方案如下:
如上图所示,这个Topic下一共有四个分区(Partition),对应的消费者数量也有四个,但是这四个消费者同属于一个进程,存在于同一个物理节点上。我们根据这个设计方案,更改之前消费者端的代码,如下(为了节约篇幅,只给出主要的更改位置):
......
// 后续创建的所有消费者线程,都是属于group2的消费组
String groupname = "group2";
props.put("group.id", groupname);
......
// 在这个进程中,为topic名为my_topic2的队列创建了四个消费者
HashMap<String, Integer> map = new HashMap<String, Integer>();
map.put("my_topic2", 4);
Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumerConnector.createMessageStreams(map);
......
// 为这四个消费者分配四个不同的线程
// 消费者线程1
KafkaStream<byte[], byte[]> stream = topicMessageStreams.get("my_topic2").get(0);
new Thread(new ConsumerThread(stream)).start();
// 消费者线程2
stream = topicMessageStreams.get("my_topic2").get(1);
new Thread(new ConsumerThread(stream)).start();
// 消费者线程3
stream = topicMessageStreams.get("my_topic2").get(2);
new Thread(new ConsumerThread(stream)).start();
// 消费者线程4
stream = topicMessageStreams.get("my_topic2").get(3);
new Thread(new ConsumerThread(stream)).start();
......
// 接着锁住主线程,让其不退出
synchronized (KafkaConsumer_GroupTwo.class) {
try {
KafkaConsumer_GroupTwo.class.wait();
} catch (InterruptedException e) {
e.printStackTrace(System.out);
}
}
......
4-5-4、优化 二:
显然“优化方案一”中的做法虽然实现了4消费者分别对应4个分区的负载均衡方案,但是受限于单个物理节点的处理性能,所以这种方案的处理性能还有进一步优化的可能。我们可以在多个节点物理节点上均匀散步这些消费者,对Topic分区中的消息进行一一对应的消费,如下图所示:
上图所示的设计思路中,我们使用了2个物理节点完成消息的消费任务,每个服务节点上开启的消费进程上有两个消费者线程。这样Topic中4个分区的消息就会被均匀分布到2个物理节点中,且每一个物理节点处理两个分区中的消息。
注意:可能您在分别启动这些消费进程的时候,由于时间上存在差异,某一台服务节点上的消费进程将暂时被分配多个分区进行消息接收。但没有关系,当这个消费者性能到达瓶颈,分区中的消息出现拥堵的时候,这个分区就会被新的消费者所代替,直到10个消费者线程分别和10个分区建立一一对应关系为止