文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

如何用Apache Kafka搭建可扩展的数据架构?

2024-11-30 15:34

关注

消息传递系统使多个应用程序可以相互收发数据,不用担心数据的传输和共享。点到点和发布者-订阅者是两种广泛使用的消息传递系统。在点到点模型中,发送方将数据推送到队列,接收方从队列中弹出数据,就像遵循先进先出(FIFO)原则的标准队列系统一样。此外,一旦读取数据,数据就被删除,并且每次只允许一个接收方。接收方读取消息时不存在时间依赖关系。

图1. 点对点消息系统

在发布者-订阅者模型中,发送方称为发布者,接收方称为订阅者。在这种模型中,多个发送方和接收方可以同时读取或写入数据。但是它有时间依赖关系。消费者必须在一定的时间之前消费消息,因为此后消息被删除,即使它没有被读取。这个时间限制可能是一天、一周或一个月,视用户的配置而定。

图2. 发布者-订阅者消息系统

一、Kafka的架构

Kafka架构由几个关键组件组成:

1. 主题

2. 分区

3. 代理

4. 生产者

5. 消费者

6. Kafka集群

7. Zookeeper

图3. Kafka的架构

不妨简单了解一下每个组件。

Kafka将消息存储在不同的主题中。主题是一个组,含有特定类别的消息。它类似数据库中的表。主题由名称作为唯一标识符。不能创建名称相同的两个主题。

主题进一步划分为分区。这些分区的每个记录都与一个名为Offset的唯一标识符相关联,该标识符表示了记录在该分区中的位置。

除此之外,系统中还有生产者和消费者。生产者使用Producing API编写或发布主题中的数据。这些生产者可以在主题或分区层面写入数据。

消费者使用Consumer API从主题中读取或消费数据。它们还可以在主题或分区层面读取数据。执行类似任务的消费者将组成一个组,名为消费者组。

还有其他系统,比如代理(Broker)和Zookeeper,它们在Kafka服务器的后台运行。代理是维护和保存已发布消息记录的软件。它还负责使用offset以正确的顺序将正确的消息传递给正确的使用者。相互之间进行集体通信的代理集可以称为Kafka集群。代理可以动态添加到Kafka集群中或从集群中动态删除,系统不会遇到任何停机。Kafka集群中的其中一个代理名为控制器。它负责管理集群内的状态和副本,并执行管理任务。

另一方面,Zookeeper负责维护Kafka集群的健康状态,并与该集群的每个代理进行协调。它以键值对的形式维护每个集群的元数据。

本教程主要介绍实际实现Apache Kafka的例子。

二、出租车预订应用程序:实际用例

以优步之类的出租车预订服务这一用例为例。这个应用程序使用Apache Kafka通过各种服务(比如事务、电子邮件、分析等)发送和接收消息。

图4出租车应用程序架构图

架构由几个服务组成。Rides服务接收来自客户的打车请求,并将打车详细信息写入到Kafka消息系统上。

然后,Transaction服务读取这些订单详细信息,确认订单和支付状态。在确认这趟打车之后,该Transaction服务将再次在消息系统中写入确认的打车信息,并添加一些额外的详细信息。最后,电子邮件或数据分析等其他服务读取已确认的打车细节,并向客户发送确认邮件,并对其进行一些分析。

我们可以以非常高的吞吐量和极小的延迟实时执行所有这些进程。此外,由于Apache Kafka能够横向扩展,我们可以扩展这个应用程序以处理数百万用户。

三、上述用例的实际实现

本节包含在我们的应用程序中实现Kafka消息系统的快速教程。它包括下载和配置Kafka、创建生产者-消费者函数的步骤。

注意:本教程基于Python编程语言,使用Windows机器。

1.Apache Kafka下载步骤

1)从这个链接(https://kafka.apache.org/downloads)下载最新版本的Apache Kafka。Kafka基于JVM语言,所以必须在系统中安装Java 7或更高版本。

2) 从计算机的C:驱动器解压已下载的zip文件,并将文件夹重命名为/apache-kafka。

3)父目录包含两个子目录:/bin和/config,分别含有Zookeeper和Kafka服务器的可执行文件和配置文件。

2.配置步骤

首先,我们需要为Kafka和Zookeeper服务器创建日志目录。这些目录将存储这些集群的所有元数据以及主题和分区的消息。

注意:默认情况下,这些日志目录创建在/tmp目录中,这是一个易变目录:当系统关闭或重启时,该目录中的所有数据都会消失。我们需要为日志目录设置永久路径来解决这个问题。不妨看看怎么做。

导航到apache-kafka >> config,打开server.properties文件。在这里您可以配置Kafka的许多属性,比如日志目录路径、日志保留时间和分区数量等。

在server.properties文件中,我们必须将日志目录文件的路径从临时/tmp目录改为永久目录。日志目录含有Kafka Server中的生成或写入的数据。若要更改路径,将log.dirs变量由/tmp/kafka-logs改为c:/apache-kafka/kafka-logs。这将使您的日志永久存储。

log.dirs = c: / apache-kafka / kafka-logs

Zookeeper服务器还包含一些日志文件,用于存储Kafka服务器的元数据。若要更改路径,重复上面的步骤,即打开zookeeper.properties文件,并按如下方式替换路径。

dataDir = c: / apache-kafka / zookeeper-logs

该Zookeeper服务器将充当Kafka服务器的资源管理器。

四、运行Kafka和Zookeeper服务器

若要运行Zookeeper服务器,在父目录中打开一个新的cmd提示符,并运行以下命令。

$ .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

图5

保持Zookeeper实例运行。

若要运行Kafka服务器,打开一个单独的cmd提示符,并执行以下代码:

$ .\bin\windows\kafka-server-start.bat .\config\server.properties

保持Kafka和Zookeeper服务器运行;在下一节中,我们将创建生产者和消费者函数,它们用于读取数据并将数据写入到Kafka服务器。

五、创建生产者和消费者函数

为了创建生产者和消费者函数,我们将以前面讨论的电子商务应用程序为例。“订单”服务将充当生产者,将订单细节写入到Kafka服务器,而电子邮件和分析服务将充当消费者,从服务器读取该数据。交易服务将充当消费者和生产者。它读取订单详细信息,并在交易确认后再次将它们写回来。

但首先我们需要安装Kafka Python库,该库含有生产者和消费者的内置函数。

$ pip install kafka-python

现在,创建一个名为kafka-tutorial的新目录。我们将在该目录中创建含有所需函数的Python文件。

$ mkdir kafka-tutorial
$ CD .\kafka-tutorial\

生产者函数:

现在,创建一个名为' rides.py '的Python文件,并将以下代码粘贴到其中。

rides.py

import kafka
import json
import time
import random

topicName = "ride_details"
producer = kafka.KafkaProducer(bootstrap_servers="localhost:9092")

for i in range(1, 10):
ride = {
"id": i,
"customer_id": f"user_{i}",
"location": f"Lat: {random.randint(-90, 90)}, Long: {random.randint(-90, 90)}",
}
producer.send(topicName, json.dumps(ride).encode("utf-8"))
print(f"Ride Details Send Succesfully!")
time.sleep(5)

解释:

首先,我们导入了所有必要的库,包括Kafka。然后,定义主题名称和各项目的列表。记住,主题是一个含有类似类型消息的组。在本例中,该主题将包含所有订单。

然后,我们创建一个KafkaProducer函数的实例,并将其连接到在localhost:9092上运行的Kafka服务器。如果您的Kafka服务器在不同的地址和端口上运行,那么您必须在那里提及服务器的IP和端口号。

之后,我们将生成一些JSON格式的订单,并根据定义的话题名称将它们写入到Kafka服务器。睡眠函数用于生成后续订单之间的间隔。

消费者函数:

transaction.py

import json
import kafka
import random

RIDE_DETAILS_KAFKA_TOPIC = "ride_details"
RIDES_CONFIRMED_KAFKA_TOPIC = "ride_confirmed"

consumer = kafka.KafkaConsumer(
RIDE_DETAILS_KAFKA_TOPIC, bootstrap_servers="localhost:9092"
)
producer = kafka.KafkaProducer(bootstrap_servers="localhost:9092")

print("Listening Ride Details")
while True:
for data in consumer:
print("Loading Transaction..")
message = json.loads(data.value.decode())
customer_id = message["customer_id"]
location = message["location"]
confirmed_ride = {
"customer_id": customer_id,
"customer_email": f"{customer_id}@xyz.com",
"location": location,
"alloted_driver": f"driver_{customer_id}",
"pickup_time": f"{random.randint(1, 20)}mins",
}
print(f"Transaction Completed..({customer_id})")
producer.send(
RIDES_CONFIRMED_KAFKA_TOPIC, json.dumps(confirmed_ride).encode("utf-8")
)

解释:

transaction.py文件用于确认用户所做的交易,并为他们分配司机和估计的载客时间。它从Kafka服务器读取打车细节,并在确认打车后将其再次写入到Kafka服务器。

现在,创建两个名为email.py和analysis .py的Python文件,分别用于向客户发送电子邮件以确认打车和执行一些分析。创建这些文件只是为了表明甚至多个消费者都可以同时从Kafka服务器读取数据。

email.py

import kafka
import json

RIDES_CONFIRMED_KAFKA_TOPIC = "ride_confirmed"
consumer = kafka.KafkaConsumer(
RIDES_CONFIRMED_KAFKA_TOPIC, bootstrap_servers="localhost:9092"
)

print("Listening Confirmed Rides!")
while True:
for data in consumer:
message = json.loads(data.value.decode())
email = message["customer_email"]
print(f"Email sent to {email}!")


analysis.py

import kafka
import json

RIDES_CONFIRMED_KAFKA_TOPIC = "ride_confirmed"
consumer = kafka.KafkaConsumer(
RIDES_CONFIRMED_KAFKA_TOPIC, bootstrap_servers="localhost:9092"
)

print("Listening Confirmed Rides!")
while True:
for data in consumer:
message = json.loads(data.value.decode())
id = message["customer_id"]
driver_details = message["alloted_driver"]
pickup_time = message["pickup_time"]
print(f"Data sent to ML Model for analysis ({id})!")

现在,我们已完成了应用程序。在下一节中,我们将同时运行所有服务并检查性能。

六、测试应用程序

在四个单独的命令提示符中逐一运行每个文件。

$ python transaction.py

$ python email.py

$ python analysis.py

$ python ride.py

图6

当打车详细信息被推送到服务器时,您可以同时接收来自所有文件的输出。您还可以通过删除rides.py文件中的延迟函数来提高处理速度。'rides.py'文件将数据推送到Kafka服务器,另外三个文件同时从Kafka服务器读取数据,并发挥相应的作用。

但愿您对Apache Kafka以及如何实现它已有了基本的了解。

七、结语

我们在本文中了解了Apache Kafka工作原理及实际实现该架构的出租车预订应用程序用例。使用Kafka设计一条可扩展的管道需要认真计划和实施。您可以增加代理和分区的数量,提高这些应用程序的可扩展性。每个分区都独立处理,这样负载可以在它们之间予以分配。此外,您还可以通过设置缓存大小、缓冲区大小或线程数量来优化Kafka配置。

本文中使用的完整代码的GitHub链接如下:https://github.com/aryan0141/apache-kafka-tutorial/tree/master。

原文链接:https://www.kdnuggets.com/2023/04/build-scalable-data-architecture-apache-kafka.html

来源:51CTO技术栈内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯