这个方案是错误的,因为我之前并没有正确了解waterdrop,waterdrop并不支持实时数据处理,所有针对日志文件,是不可行的, 虽然我这边已经处理,并且从日志中清洗得到了需要的数据。
这个方案是可行的,其中在使用clickhouse de 外部表引擎kafka 的时候,遇到了很多问题,最后查看官方的提问,以及作者的回复,得以解决。
未进行测试验证。但是waterdrop 群里有人做过。
- flume 的 avro 需要注意相关。
-
flume 发送方的slink 需要填写的是接受方的ip,以及端口。我这边本地发送方是 104 ,接受方式118。 相关配置
# 定义三大组件的名称 agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1 # 配置 source 组件 agent1.sources.source1.type = exec agent1.sources.source1.command = tail -F /home/logs/gps.log # 将数据流复制给多个channel #agent1.sources.source1.selector.type = replicating # 配置 sink 组件 agent1.sinks.sink1.type = avro agent1.sinks.sink1.hostname = 192.168.108.118 agent1.sinks.sink1.port = 4141 agent1.sinks.sink1.request-timeout = 500000 # 配置 channel 组件 agent1.channels.channel1.type = memory agent1.channels.channel1.capacity = 100000 agent1.channels.channel1.transactionCapacity = 10000 agent1.channels.channel1.keep.alive = 80 # 给 source 和 sink 绑定 channel agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1 agent1.sources.source1.interceptors = filter1 search-replace1 agent1.sources.source1.interceptors.filter1.type = REGEX_FILTER agent1.sources.source1.interceptors.filter1.regex = (接收到的数据为:) agent1.sources.source1.interceptors.filter1.excludeEvents = false #agent1.sources.source1.interceptors.filter2.type = REGEX_FILTER #agent1.sources.source1.interceptors.filter2.regex = (接收到的数据为:) #agent1.sources.source1.interceptors.filter2.excludeEvents = false #agent1.sources.source1.interceptors = search-replace1 agent1.sources.source1.interceptors.search-replace1.type = search_replace agent1.sources.source1.interceptors.search-replace1.searchPattern = [^{]*(?=\{) agent1.sources.source1.interceptors.search-replace1.replaceString =
-
这里的source 就是读取的日志文件地址
-
channel 配置的规则是 capacity > transactionCapacity > batchSize
-
接收方的配置如下
#定义三大组件的名称 agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1 # 配置 source 组件 agent1.sources.source1.type = avro # 要监听的 hostname 或者IP地址 agent1.sources.source1.bind = 192.168.108.118 agent1.sources.source1.port = 4141 # 配置 sink 组件 agent1.sinks.sink1.type = hdfs agent1.sinks.sink1.hdfs.path = hdfs://cluster1/user/oracle/%Y-%m-%d #Flume在HDFS文件夹下创建新文件的固定前缀 agent1.sinks.sink1.hdfs.filePrefix = access_log #允许打开的最大文件数,如果超过这个数量,最先打开的文件会被关闭 #agent1.sinks.sink1.hdfs.maxOpenFiles = 5000 #向 HDFS 写入内容时每次批量操作的 Event 数量 agent1.sinks.sink1.hdfs.batchSize= 5000 #文件格式,目前支持: SequenceFile 、 DataStream 、 CompressedStream 。 1. DataStream 不会压缩文件,不需要设置hdfs.codeC 2. CompressedStream 必须设置hdfs.codeC参数 agent1.sinks.sink1.hdfs.fileType = DataStream #文件写入格式 agent1.sinks.sink1.hdfs.writeFormat = Text #替换转义序列时是否使用本地时间戳 agent1.sinks.sink1.hdfs.useLocalTimeStamp = true #Flume在HDFS文件夹下创建新文件的后缀 #agent1.sinks.sink1.hdfs.fileSuffix = .txt #当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件) agent1.sinks.sink1.hdfs.rollSize = 0 #当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒 agent1.sinks.sink1.hdfs.rollInterval = 0 #当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件) agent1.sinks.sink1.hdfs.rollCount = 0 #允许HDFS操作的毫秒数,例如:open,write, flush, close。如果很多HFDS操作超时,这个配置应该增大。 agent1.sinks.sink1.hdfs.callTimeout = 480000 #每个HDFS sink的HDFS的IO操作线程数(例如:open,write) agent1.sinks.sink1.hdfs.threadsPoolSize = 10 #在发起一个关闭命令后,HDFS sink必须尝试重命名文件的次数 agent1.sinks.sink1.hdfs.closeTries = 0 #在几秒钟之间连续尝试关闭文件 agent1.sinks.sink1.hdfs.retryInterval = 60 # 配置 channel 组件 agent1.channels.channel1.type = memory agent1.channels.channel1.transactionCapacity = 10000 agent1.channels.channel1.keep.alive = 80 agent1.channels.channel1.capacity = 100000 # 给 source 和 sink 绑定 channel agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1
-
接收方的source 这里是118自己,因为它需要把自己的端口发布出去,114 才可以连接上,进行传输。这里建议填写自己的ip,不要填写localhost以及127.0.0.0
-
发送读取日志文件的时候,需要注意,是大写的F,注意这里与 linux 的区分。
-
- kafka 相关问题以及集群的搭建。
-
这里的同事之前都搭建好了,直接使用就可以了。相关写入的flume 配置文件如下(这里只是展示 sink1 的配置)
# 配置 sink 组件 agent1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink #agent1.sinks.sink1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092 agent1.sinks.sink1.kafka.bootstrap.servers = 192.168.1.8.111:9092,192.168.108.112:9092,192.168.108.113:9092,192.168.108.118:9092,192.168.108.119:9092 # agent1.sinks.sink1.brokerList = master:9092,slave1:9092,slave2:9092 agent1.sinks.sink1.topic = s agent1.sinks.sink1.kafka.flumeBatchSize = 20 agent1.sinks.sink1.kafka.producer.acks = 1 agent1.sinks.sink1.kafka.producer.linger.ms = 1 #agent1.sinks.sink1.kafka.compression.type = snappy
-
kafka 相关命令以及原理,参考上面的文档地址
-
- clickhouse 相关
-
kafka 的外部表引擎
CREATE TABLE kafka_structure_test (encrypt String,date DateTime,lon String,lat String,vec1 String,vec2 String,vec3 String,direction String,altitude String,state String,alarm String,vehicleNo String,vehicleColor String,id String,createBy String,createDt DateTime) ENGINE = Kafka SETTINGS kafka_broker_list = "192.168.108.118:9092,192.168.108.119:9092", kafka_topic_list = "wl_vehicle_data_clean", kafka_group_name = "wl_vehicle_data_up", kafka_format = "JSONEachRow", kafka_row_delimiter = " ", kafka_num_consumers = 1,kafka_max_block_size = 500;
-
kafka 引擎相关文档参考 文档地址
-
物化视图建立
CREATE MATERIALIZED VIEW consumer TO t_plt_vehicle_location_test AS select id,encrypt,date as up_date,lon,createBy as create_by,createDt as create_dt,lat,vec1,vec2,vec3,direction,altitude,state,alarm, vehicleNo as vehicleno,vehicleColor as vehiclecolor from kafka_structure_test ;
-
clickhouse 的测试表建立
create table t_plt_vehicle_location_test ( id String default "MSG0", encrypt String default "0", up_date DateTime default "1970-01-01 00:00:01", lon String default -1, create_by String default "UP_EXG_MSG_REAL_LOCATION", create_dt DateTime default now() , lat String default -1, vec1 String default -1, vec2 String default -1, vec3 String default -1, direction String default -1, altitude String default -1, state String default -1, alarm String default -1, vehicleno String default "-1", vehiclecolor String default "-1", alarm_code String default "-1" ) ENGINE = MergeTree() partition by toYYYYMM(up_date) ORDER BY (vehicleno,up_date) SETTINGS index_granularity = 8192
-
我们这边在使用clickhouse 的kafka 引擎的时候,遇到了一个问题,kafka 引擎连接上kafka 以后,隔一段时间就自动掉线,连接中断无法消费,后面查看github 的相关提问,发现这是clickhouse 的相关bug, 修复好是在 ClickHouse 19.13.2.19 版本。相关链接
-
后续其他问题,也可以在上面进行查看。报错查看clickhouse 的日志。或者在kafka 查看消费者消费的offset 即可看到 kafka 引擎是否还在连接kafka 进行消费。
-
- clickhouse 的问题,clickhouse 相关对于kafka 的支持相关还不是很稳定,我们这边查看升级的版本也是19年年底才修复的问题。
- 遇到小的问题,别人都没遇到的问题,请在三仔细查看,因为那多半是你犯了基础的低级错误,而别人没犯。所以你问别人,别人多半都不知道。
- 细心,在细心。遇到错误,不要相信之前的逻辑判断。从新进行分析梳理。一点点的验证。