文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

spark rdd转dataframe 写入mysql的实例讲解

2024-04-02 19:55

关注

dataframe是在spark1.3.0中推出的新的api,这让spark具备了处理大规模结构化数据的能力,在比原有的RDD转化方式易用的前提下,据说计算性能更还快了两倍。spark在离线批处理或者实时计算中都可以将rdd转成dataframe进而通过简单的sql命令对数据进行操作,对于熟悉sql的人来说在转换和过滤过程很方便,甚至可以有更高层次的应用,比如在实时这一块,传入kafka的topic名称和sql语句,后台读取自己配置好的内容字段反射成一个class并利用出入的sql对实时数据进行计算,这种情况下不会spark streaming的人也都可以方便的享受到实时计算带来的好处。    

下面的示例为读取本地文件成rdd并隐式转换成dataframe对数据进行查询,最后以追加的形式写入mysql表的过程,scala代码示例如下


import java.sql.Timestamp
import org.apache.spark.sql.{SaveMode, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}
object DataFrameSql {
 case class memberbase(data_date:Long,memberid:String,createtime:Timestamp,sp:Int)extends Serializable{
 override def toString: String="%d\t%s\t%s\t%d".format(data_date,memberid,createtime,sp)
 }
 def main(args:Array[String]): Unit ={
 val conf = new SparkConf()
 conf.setMaster("local[2]")
// ----------------------
 //参数 spark.sql.autoBroadcastJoinThreshold 设置某个表是否应该做broadcast,默认10M,设置为-1表示禁用
 //spark.sql.codegen 是否预编译sql成java字节码,长时间或频繁的sql有优化效果
 // spark.sql.inMemoryColumnarStorage.batchSize 一次处理的row数量,小心oom
 //spark.sql.inMemoryColumnarStorage.compressed 设置内存中的列存储是否需要压缩
// ----------------------
 conf.set("spark.sql.shuffle.partitions","20") //默认partition是200个
 conf.setAppName("dataframe test")
 val sc = new SparkContext(conf)
 val sqc = new SQLContext(sc)
 val ac = sc.accumulator(0,"fail nums")
 val file = sc.textFile("src\\main\\resources\\000000_0")
 val log = file.map(lines => lines.split(" ")).filter(line =>
  if (line.length != 4) { //做一个简单的过滤
  ac.add(1)
  false
  } else true)
  .map(line => memberbase(line(0).toLong, line(1),Timestamp.valueOf(line(2)), line(3).toInt))
 // 方法一、利用隐式转换
 import sqc.implicits._
 val dftemp = log.toDF() // 转换
 
 val df = dftemp.registerTempTable("memberbaseinfo")
 
 val sqlcommand="select * from memberbaseinfo"
 val sel = sqc.sql(sqlcommand)
 val prop = new java.util.Properties
 prop.setProperty("user","etl")
 prop.setProperty("password","xxx")
 // 调用DataFrameWriter将数据写入mysql
 val dataResult = sqc.sql(sqlcommand).write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/test","t_spark_dataframe_test",prop) // 表可以不存在
 println(ac.name.get+" "+ac.value)
 sc.stop()
 }
}

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 资料下载
  • 历年真题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯