这篇文章主要介绍Flume如何采集到HDFS,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
一、需求:
采集指定文件的内容到HDFS
技术选型:exec - memory - hdfs
a1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = execa1.sources.r1.command = tail -F /home/hadoop/data/data.log# Describe the sinka1.sinks.k1.type = hdfsa1.sinks.k1.hdfs.path = hdfs://192.168.0.129:9000/user/hadoop/flumea1.sinks.k1.hdfs.batchSize = 10 #10行产生新文件a1.sinks.k1.hdfs.fileType = DataStream #压缩格式a1.sinks.k1.hdfs.writeFormat = Text #格式类型# Use a channel which buffers events in memorya1.channels.c1.type = memory# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1
启动:
./flume-ng agent \--name a1 \--conf $FLUME_HOME/conf \--conf-file /home/hadoop/script/flume/exec-memory-hdfs.conf \-Dflume.root.logger=INFO,console \-Dflume.monitoring.type=http \-Dflume.monitoring.port=34343
添加测试数据:
[hadoop@hadoop001 data]$ touch data.log[hadoop@hadoop001 data]$ echo test >> data.log[hadoop@hadoop001 data]$ echo test >> data.log[hadoop@hadoop001 data]$ echo test >> data.log[hadoop@hadoop001 data]$ echo test >> data.log[hadoop@hadoop001 data]$ echo test >> data.log
检查HDFS:
[hadoop@hadoop001 flume]$ hdfs dfs -text hdfs://192.168.0.129:9000/user/hadoop/flume/*18/08/09 20:59:02 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicabletesttesttesttesttest
二、需求:
采集指定文件夹的内容到(HDFS或者控制台)
==》文件夹下文件不能修改切不能重名
==》处理完当前文件添加.COMPLETED标识
a1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = spooldira1.sources.r1.spoolDir = /home/hadoop/data/a1.sources.r1.fileHeader = true# Describe the sinka1.sinks.k1.type = logger# Use a channel which buffers events in memorya1.channels.c1.type = memory# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1
三、需求:(生产使用,记录偏移量)
采集指定文件夹和文件内容到(控制台或者HDFS)
a1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = TAILDIRa1.sources.r1.channels = c1#记录偏移量,重启续传a1.sources.r1.positionFile = /home/hadoop/script/flume/taildir_position.jsona1.sources.r1.filegroups = f1 f2#监控指定log文件a1.sources.r1.filegroups.f1 =/home/hadoop/data/example.loga1.sources.r1.headers.f1.headerKey1 = value1#监控文加下的所有log*文件夹和内容a1.sources.r1.filegroups.f2 = /home/hadoop/data/test/.*log.*a1.sources.r1.headers.f2.headerKey1 = value2a1.sources.r1.headers.f2.headerKey2 = value2-2# 控制台输出a1.sinks.k1.type = logger# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1
启动:
./flume-ng agent \--name a1 \--conf $FLUME_HOME/conf \--conf-file /home/hadoop/script/flume/taildir-memory-logger.conf \-Dflume.root.logger=INFO,console
记录偏移量:
[hadoop@hadoop001 flume]$ cat taildir_position.json
[{"inode":679982,"pos":14,"file":"/home/hadoop/data/example.log"}
{"inode":679984,"pos":0,"file":"/home/hadoop/data/test/log1.log"}]
以上是“Flume如何采集到HDFS”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注编程网行业资讯频道!