【51CTO.com快译】Kafka Connect是一种特别强大的开源数据流工具;有了它,将Kafka与其他数据技术结合使用非常轻松。作为一种分布式技术,Kafka Connect提供了特别高的可用性和独立于Kafka集群的弹性扩展。Kafka Connect使用源或sink连接件发送进出Kafka主题的数据,无需代码即可与多种非Kafka技术实现整合。
图1
可靠的开源Kafka连接件可供许多流行的数据技术使用,您还有机会编写自己的连接件。本文介绍了一个真实的实际数据用例,即如何使用Kafka Connect将来自Kafka的实时流数据与Elasticsearch(以启用索引Kafka记录的可扩展搜索)和Kibana(以便可视化那些结果)整合起来。
图2
针对表明Kafka和Kafka Connect优点的一个用例,我受到CDC新冠疫情数据跟踪器的启发。基于Kafka的跟踪器从多个位置、以多种格式并使用多种协议收集实时新冠病毒检测数据,并将这些事件处理成易于使用的可视化结果。跟踪器还有必要的数据治理机制,以确保结果快速到达,并值得信任。
我开始寻找一个同样复杂且引人注目的用例——但理想情况下,不像新冠疫情那样令人担忧。最终,我发现了一个有趣的领域:月潮,包括公开可用的流REST API和采用简单JSON格式的丰富数据。
月潮数据
潮汐遵循太阴日,这是一个24小时50分钟的周期;在此期间,地球完全自转到轨道卫星下方的同一点。每个太阴日有月球引力引起的两个高潮和两个低潮:
图3. 来自美国国家海洋和大气管理局
美国国家海洋和大气管理局(NOAA)提供了一个REST API,可以从全球潮汐站轻松获取详细的传感器数据。
图4
比如说,下列REST调用指定了潮汐站ID、数据类型(我选择了海平面)和数据(平均海平面),并请求一个采用公制单位的最近结果:
https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?date=latest&station=8724580&product=water_level&datum=msl&units=metric&time_zone=gmt&application=instaclustr&format=json
该调用返回JSON结果,含有潮汐站的经纬度、时间和水位值。请注意,您必须记住您调用的是什么,以便了解所返回结果的数据类型、数据和单位!
- {"metadata": {
- "id":"8724580",
- "name":"Key West",
- "lat":"24.5508”,
- "lon":"-81.8081"},
- "data":[{
- "t":"2020-09-24 04:18",
- "v":"0.597",
- "s":"0.005", "f":"1,0,0,0", "q":"p"}]}
启动数据管道(使用REST源连接件)
要开始创建Kafka Connect流数据管道,我们必须先准备Kafka集群和Kafka Connect集群。
图5
接下来,我们引入一个REST连接件,比如这个可用的开源连接件。我们会将其部署到AWS S3存储桶(如果需要,参照这些说明)。 然后我们将要求Kafka Connect集群使用S3存储桶,对它同步以便在集群中可见,配置连接件,最后让它运行起来。这种“BYOC”(自带连接件)方法确保您有无数的方法来寻找满足特定要求的连接件。
图6
下列示例演示使用“curl”命令将完全开源的Kafka Connect部署环境配置成可使用REST API。请注意,您需要更改URL、名称和密码以匹配您自己的部署:
- curl https://connectorClusterIP:8083/connectors -k -u name:password -X POST -H 'Content-Type: application/json' -d '
- {
- "name": "source_rest_tide_1",
- "config": {
- "key.converter":"org.apache.kafka.connect.storage.StringConverter",
- "value.converter":"org.apache.kafka.connect.storage.StringConverter",
- "connector.class": "com.tm.kafka.connect.rest.RestSourceConnector",
- "tasks.max": "1",
- "rest.source.poll.interval.ms": "600000",
- "rest.source.method": "GET",
- "rest.source.url": "https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?date=latest&station=8454000&product=water_level&datum=msl&units=metric&time_zone=gmt&application=instaclustr&format=json",
- "rest.source.headers": "Content-Type:application/json,Accept:application/json",
- "rest.source.topic.selector": "com.tm.kafka.connect.rest.selector.SimpleTopicSelector",
- "rest.source.destination.topics": "tides-topic"
- }
- }
该代码创建的连接件任务以10分钟为间隔轮询REST API,并将结果写入到“tides-topic”Kafka主题。通过随机选择五个潮汐传感器以这种方式收集数据,潮汐数据现在通过五个配置和五个连接件填充了潮汐主题。
图7
结束管道(使用Elasticsearch sink连接件)
为了将该潮汐数据放在某个地方,我们将在管道末端引入Elasticsearch集群和Kibana。 我们将配置一个开源Elasticsearch sink连接件,以便向Elasticsearch发送数据。
图8
以下示例配置使用sink名称、类、Elasticsearch索引和我们的Kafka主题。如果索引尚未存在,会创建一个有默认映射的索引。
- curl https://connectorClusterIP:8083/connectors -k -u name:password -X POST -H 'Content-Type: application/json' -d '
- {
- "name" : "elastic-sink-tides",
- "config" :
- {
- "connector.class" : "com.datamountaineer.streamreactor.connect.elastic7.ElasticSinkConnector",
- "tasks.max" : 3,
- "topics" : "tides",
- "connect.elastic.hosts" : ”ip",
- "connect.elastic.port" : 9201,
- "connect.elastic.kcql" : "INSERT INTO tides-index SELECT * FROM tides-topic",
- "connect.elastic.use.http.username" : ”elasticName",
- "connect.elastic.use.http.password" : ”elasticPassword"
- }
- }'
该管道现在可运作起来。然而,由于默认索引映射,进入到Tides索引的所有潮汐数据是字符串。
图9
需要自定义映射以准确地绘制我们的时间序列数据。我们将为下面的潮汐索引创建这个自定义映射,使用JSON“t”字段用于自定义日期,“v”作为两倍数,“name”作为代表聚合的关键字。
- curl -u elasticName:elasticPassword ”elasticURL:9201/tides-index" -X PUT -H 'Content-Type: application/json' -d'
- {
- "mappings" : {
- "properties" : {
- "data" : {
- "properties" : {
- "t" : { "type" : "date",
- "format" : "yyyy-MM-dd HH:mm"
- },
- "v" : { "type" : "double" },
- "f" : { "type" : "text" },
- "q" : { "type" : "text" },
- "s" : { "type" : "text" }
- }
- },
- "metadata" : {
- "properties" : {
- "id" : { "type" : "text" },
- "lat" : { "type" : "text" },
- "long" : { "type" : "text" },
- "name" : { "type" : ”keyword" } }}}} }'
每次更改Elasticsearch索引映射时,通常都需要Elasticsearch“重新索引”(删除索引并重新索引所有数据)。数据既可以从现有的Kafka sink连接件重放,就像我们在这个用例中所做的那样,也可以使用Elasticsearch重新索引操作来获取。
使用Kibana可视化数据
为了可视化潮汐数据,我们先用Kibana创建一个索引模式,将“t”配置为时间过滤器字段。然后,我们将创建一个可视化,选择线图类型。最后,我们将配置图设置,以便y轴显示30分钟内的平均潮位,x 轴显示随时间变化的该数据。
结果是下图显示了五个样本潮汐站的潮汐变化,管道从这些潮汐站收集数据:
图10
结果
我们可以从可视化中清楚地看到潮汐的周期性,每个太阴日出现两次高潮。
图11
更令人惊讶的是,每个全球潮汐站的高潮和低潮之间的间隔不一样。这不仅受月球的影响,还受太阳、当地地理、天气和气候变化的影响。这个示例Kafka Connect管道利用Kafka、Elasticsearch和Kibana帮助演示可视化的优点:它们通常可以揭示原始数据无法揭示的信息!
原文How to Use Kafka Connect to Create an Open Source Data Pipeline for Processing Real-Time Data,作者:Paul Brebner
【51CTO译稿,合作站点转载请注明原文译者和出处为51CTO.com】