消息传递系统使多个应用程序可以相互收发数据,不用担心数据的传输和共享。点到点和发布者-订阅者是两种广泛使用的消息传递系统。在点到点模型中,发送方将数据推送到队列,接收方从队列中弹出数据,就像遵循先进先出(FIFO)原则的标准队列系统一样。此外,一旦读取数据,数据就被删除,并且每次只允许一个接收方。接收方读取消息时不存在时间依赖关系。
图1. 点对点消息系统
在发布者-订阅者模型中,发送方称为发布者,接收方称为订阅者。在这种模型中,多个发送方和接收方可以同时读取或写入数据。但是它有时间依赖关系。消费者必须在一定的时间之前消费消息,因为此后消息被删除,即使它没有被读取。这个时间限制可能是一天、一周或一个月,视用户的配置而定。
图2. 发布者-订阅者消息系统
一、Kafka的架构
Kafka架构由几个关键组件组成:
1. 主题
2. 分区
3. 代理
4. 生产者
5. 消费者
6. Kafka集群
7. Zookeeper
图3. Kafka的架构
不妨简单了解一下每个组件。
Kafka将消息存储在不同的主题中。主题是一个组,含有特定类别的消息。它类似数据库中的表。主题由名称作为唯一标识符。不能创建名称相同的两个主题。
主题进一步划分为分区。这些分区的每个记录都与一个名为Offset的唯一标识符相关联,该标识符表示了记录在该分区中的位置。
除此之外,系统中还有生产者和消费者。生产者使用Producing API编写或发布主题中的数据。这些生产者可以在主题或分区层面写入数据。
消费者使用Consumer API从主题中读取或消费数据。它们还可以在主题或分区层面读取数据。执行类似任务的消费者将组成一个组,名为消费者组。
还有其他系统,比如代理(Broker)和Zookeeper,它们在Kafka服务器的后台运行。代理是维护和保存已发布消息记录的软件。它还负责使用offset以正确的顺序将正确的消息传递给正确的使用者。相互之间进行集体通信的代理集可以称为Kafka集群。代理可以动态添加到Kafka集群中或从集群中动态删除,系统不会遇到任何停机。Kafka集群中的其中一个代理名为控制器。它负责管理集群内的状态和副本,并执行管理任务。
另一方面,Zookeeper负责维护Kafka集群的健康状态,并与该集群的每个代理进行协调。它以键值对的形式维护每个集群的元数据。
本教程主要介绍实际实现Apache Kafka的例子。
二、出租车预订应用程序:实际用例
以优步之类的出租车预订服务这一用例为例。这个应用程序使用Apache Kafka通过各种服务(比如事务、电子邮件、分析等)发送和接收消息。
图4出租车应用程序架构图
架构由几个服务组成。Rides服务接收来自客户的打车请求,并将打车详细信息写入到Kafka消息系统上。
然后,Transaction服务读取这些订单详细信息,确认订单和支付状态。在确认这趟打车之后,该Transaction服务将再次在消息系统中写入确认的打车信息,并添加一些额外的详细信息。最后,电子邮件或数据分析等其他服务读取已确认的打车细节,并向客户发送确认邮件,并对其进行一些分析。
我们可以以非常高的吞吐量和极小的延迟实时执行所有这些进程。此外,由于Apache Kafka能够横向扩展,我们可以扩展这个应用程序以处理数百万用户。
三、上述用例的实际实现
本节包含在我们的应用程序中实现Kafka消息系统的快速教程。它包括下载和配置Kafka、创建生产者-消费者函数的步骤。
注意:本教程基于Python编程语言,使用Windows机器。
1.Apache Kafka下载步骤
1)从这个链接(https://kafka.apache.org/downloads)下载最新版本的Apache Kafka。Kafka基于JVM语言,所以必须在系统中安装Java 7或更高版本。
2) 从计算机的C:驱动器解压已下载的zip文件,并将文件夹重命名为/apache-kafka。
3)父目录包含两个子目录:/bin和/config,分别含有Zookeeper和Kafka服务器的可执行文件和配置文件。
2.配置步骤
首先,我们需要为Kafka和Zookeeper服务器创建日志目录。这些目录将存储这些集群的所有元数据以及主题和分区的消息。
注意:默认情况下,这些日志目录创建在/tmp目录中,这是一个易变目录:当系统关闭或重启时,该目录中的所有数据都会消失。我们需要为日志目录设置永久路径来解决这个问题。不妨看看怎么做。
导航到apache-kafka >> config,打开server.properties文件。在这里您可以配置Kafka的许多属性,比如日志目录路径、日志保留时间和分区数量等。
在server.properties文件中,我们必须将日志目录文件的路径从临时/tmp目录改为永久目录。日志目录含有Kafka Server中的生成或写入的数据。若要更改路径,将log.dirs变量由/tmp/kafka-logs改为c:/apache-kafka/kafka-logs。这将使您的日志永久存储。
Zookeeper服务器还包含一些日志文件,用于存储Kafka服务器的元数据。若要更改路径,重复上面的步骤,即打开zookeeper.properties文件,并按如下方式替换路径。
该Zookeeper服务器将充当Kafka服务器的资源管理器。
四、运行Kafka和Zookeeper服务器
若要运行Zookeeper服务器,在父目录中打开一个新的cmd提示符,并运行以下命令。
图5
保持Zookeeper实例运行。
若要运行Kafka服务器,打开一个单独的cmd提示符,并执行以下代码:
保持Kafka和Zookeeper服务器运行;在下一节中,我们将创建生产者和消费者函数,它们用于读取数据并将数据写入到Kafka服务器。
五、创建生产者和消费者函数
为了创建生产者和消费者函数,我们将以前面讨论的电子商务应用程序为例。“订单”服务将充当生产者,将订单细节写入到Kafka服务器,而电子邮件和分析服务将充当消费者,从服务器读取该数据。交易服务将充当消费者和生产者。它读取订单详细信息,并在交易确认后再次将它们写回来。
但首先我们需要安装Kafka Python库,该库含有生产者和消费者的内置函数。
现在,创建一个名为kafka-tutorial的新目录。我们将在该目录中创建含有所需函数的Python文件。
生产者函数:
现在,创建一个名为' rides.py '的Python文件,并将以下代码粘贴到其中。
解释:
首先,我们导入了所有必要的库,包括Kafka。然后,定义主题名称和各项目的列表。记住,主题是一个含有类似类型消息的组。在本例中,该主题将包含所有订单。
然后,我们创建一个KafkaProducer函数的实例,并将其连接到在localhost:9092上运行的Kafka服务器。如果您的Kafka服务器在不同的地址和端口上运行,那么您必须在那里提及服务器的IP和端口号。
之后,我们将生成一些JSON格式的订单,并根据定义的话题名称将它们写入到Kafka服务器。睡眠函数用于生成后续订单之间的间隔。
消费者函数:
解释:
transaction.py文件用于确认用户所做的交易,并为他们分配司机和估计的载客时间。它从Kafka服务器读取打车细节,并在确认打车后将其再次写入到Kafka服务器。
现在,创建两个名为email.py和analysis .py的Python文件,分别用于向客户发送电子邮件以确认打车和执行一些分析。创建这些文件只是为了表明甚至多个消费者都可以同时从Kafka服务器读取数据。
现在,我们已完成了应用程序。在下一节中,我们将同时运行所有服务并检查性能。
六、测试应用程序
在四个单独的命令提示符中逐一运行每个文件。
图6
当打车详细信息被推送到服务器时,您可以同时接收来自所有文件的输出。您还可以通过删除rides.py文件中的延迟函数来提高处理速度。'rides.py'文件将数据推送到Kafka服务器,另外三个文件同时从Kafka服务器读取数据,并发挥相应的作用。
但愿您对Apache Kafka以及如何实现它已有了基本的了解。
七、结语
我们在本文中了解了Apache Kafka工作原理及实际实现该架构的出租车预订应用程序用例。使用Kafka设计一条可扩展的管道需要认真计划和实施。您可以增加代理和分区的数量,提高这些应用程序的可扩展性。每个分区都独立处理,这样负载可以在它们之间予以分配。此外,您还可以通过设置缓存大小、缓冲区大小或线程数量来优化Kafka配置。
本文中使用的完整代码的GitHub链接如下:https://github.com/aryan0141/apache-kafka-tutorial/tree/master。
原文链接:https://www.kdnuggets.com/2023/04/build-scalable-data-architecture-apache-kafka.html