文章详情

短信预约信息系统项目管理师 报名、考试、查分时间动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

SparkStreaming两种方式连接Flume

2020-08-06 04:59

关注

SparkStreaming两种方式连接Flume

SparkStreaming 连接Flume的两种方式分别为:Push(推)和Pull(拉)的方式实现,以Spark Streaming的角度来看,Push方式属于推送(由Flume向Spark推送数据);而Pull属于拉取(Spark 拉取 Flume的输出数据);

 Flume向SparkStreaming推送数据没有研究明白,有大佬指点一下吗?

万分感谢!

1.Spark拉取Flume数据:

导入两个jar包到flume/lib下

 否则抛出这两个异常:

org.apache.flume.FlumeException: Unable to load sink type: org.apache.spark.streaming.flume.sink.SparkSink, class: org.apache.spark.streaming.flume.sink.SparkSink

java.lang.IllegalStateException: begin() called when transaction is OPEN!

2.编写flume 工作文件:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/home/zhuzhu/apps/flumeSpooding
a1.sources.r1.fileHeader=true

# Describe the sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
# 当前主机端口
a1.sinks.k1.hostname = 192.168.137.88
a1.sinks.k1.port = 9999

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.编写SparkStreaming程序:

package day02

import java.net.InetSocketAddress

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.{SparkConf, SparkContext}


object StreamingFlume {

  def main(args: Array[String]): Unit = {
    //1.创建SparkConf对象
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingFlume")
    //2.创建SparkContext对象
    val sc = new SparkContext(conf)
    //设置日志输出格式,只打印异常日志,在这里设置没有用
    //sc.setLogLevel("WARN")
    //3.创建StreamingContext,Seconds(5):轮询机制,多久执行一次
    val ssc = new StreamingContext(sc, Seconds(5))
    //4.定义一个flume集合,可以接受多个flume数据,多个用,隔开需要new
    val addresses = Seq(new InetSocketAddress("127.0.0.1", 5555))
    //5.获取flume中的数据,
    val stream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK_2)
    // 6.截取flume数据:{"header":xxxxx   "body":xxxxxx}
    val lineDstream: DStream[String] = stream.map(x => new String(x.event.getBody.array()))
    lineDstream.flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey(_+_).print()
    ssc.start()
    ssc.awaitTermination()
  }
}

 4。开启flume监控文件,开启SparkStreaming程序:

向指定目录上传文件

 

 

 

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-数据库
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯