文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

spark 与flume 1.6.0的示例代码

2023-06-02 19:47

关注

小编给大家分享一下spark 与flume 1.6.0的示例代码,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!

package hgs.spark.streamingimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.streaming.StreamingContextimport org.apache.spark.streaming.Secondsimport org.apache.spark.streaming.flume.FlumeUtilsimport org.apache.spark.storage.StorageLevelimport org.apache.spark.HashPartitionerobject SparkStreamingFlumePush {  def main(args: Array[String]): Unit = {    val conf = new SparkConf().setAppName("flume-push").setMaster("local[2]");    val sc = new SparkContext(conf)    val ssc = new StreamingContext(sc,Seconds(5))    ssc.checkpoint("d:\\checkpoint")    val updateFunc=(iter:Iterator[(String,Seq[Int],Option[Int])])=>{    //iter.flatMap(it=>Some(it._2.sum+it._3.getOrElse(0)).map((it._1,_)))//方式一    //iter.flatMap{case(x,y,z)=>{Some(y.sum+z.getOrElse(0)).map((x,_))}}//方式二    iter.flatMap(it=>Some(it._1,(it._2.sum.toInt+it._3.getOrElse(0))))//方式三    }    //总共有两种获取数据的方式,push和poll,这种是push即flume将数据推送给spark 该出的ip、port是spark的ip地址和port    val rds = FlumeUtils.createStream(ssc, "192.168.1.9", 8888, StorageLevel.MEMORY_ONLY)    val result = rds.flatMap(x=>(new String(x.event.getBody.array())).split(" "))    .map(x=>(x,1))    .updateStateByKey(updateFunc, new HashPartitioner(sc.defaultMinPartitions), true)        result.print()        ssc.start()    ssc.awaitTermination()          }}
package hgs.spark.streamingimport org.apache.spark.streaming.StreamingContextimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.streaming.Secondsimport org.apache.spark.streaming.flume.FlumeUtilsimport java.net.InetAddressimport java.net.InetSocketAddressimport org.apache.spark.storage.StorageLevelimport org.apache.spark.HashPartitioner//spark支持1.6.0的flume版本//同时需要如下三个包 将三个包放到flume的classpath下面 object SparkStreamingFlumePoll {  def main(args: Array[String]): Unit = {   val conf = new SparkConf().setAppName("flume-push").setMaster("local[2]");   val sc = new SparkContext(conf)   val ssc = new StreamingContext(sc,Seconds(5))   ssc.checkpoint("d:\\checkpoint")   val ipSeq =  Seq(new InetSocketAddress("192.168.6.129",8888))   //这种方式通过spark从flume拉取数据   val rds = FlumeUtils.createPollingStream(ssc, ipSeq, StorageLevel.MEMORY_AND_DISK)      val updateFunc=(iter:Iterator[(String,Seq[Int],Option[Int])])=>{    //iter.flatMap(it=>Some(it._2.sum+it._3.getOrElse(0)).map((it._1,_)))//方式一    //iter.flatMap{case(x,y,z)=>{Some(y.sum+z.getOrElse(0)).map((x,_))}}//方式二    iter.flatMap(it=>Some(it._1,(it._2.sum.toInt+it._3.getOrElse(0))))//方式三    }   val result = rds.flatMap(x=>(new String(x.event.getBody.array())).split(" "))    .map(x=>(x,1))    .updateStateByKey(updateFunc, new HashPartitioner(sc.defaultMinPartitions), true)        result.print()        ssc.start()    ssc.awaitTermination()  }}//遇到的错误 scala-library包在flume 的lib下面本来就有,包重复导致的冲突,删除一个

看完了这篇文章,相信你对“spark 与flume 1.6.0的示例代码”有了一定的了解,如果想了解更多相关知识,欢迎关注编程网行业资讯频道,感谢各位的阅读!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯