文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

如何理解Spark Streaming的数据可靠性和一致性

2023-06-19 11:13

关注

如何理解Spark Streaming的数据可靠性和一致性,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

眼下大数据领域最热门的词汇之一便是流计算了,其中最耀眼的项目无疑是来自Spark社区的Spark Streaming项目,其从一诞生就受到广泛关注并迅速发展,目前已有追赶并超越Storm的架势。

对于流计算而言,毫无疑问最核心的特点是它的低时延能力,这主要是来自对数据不落磁盘就进行计算的内部机制,但这也带来了数据可靠性的问题,即有节点失效或者网络异常时,如何在节点间进行合适的协商来进行重传。更进一步的,若发生计划外的数据重传,怎么能保证没有产生重复的数据,所有数据都是精确一次的(Exact Once)?如果不解决这些问题,大数据的流计算将无法满足大多数企业级可靠性要求而流于徒有虚名。

下面将重点分析Spark Streaming是如何设计可靠性机制并实现数据一致性的。

Driver HA

由于流计算系统是长期运行、数据不断流入的,因此其Spark守护进程(Driver)的可靠性是至关重要的,它决定了Streaming程序能否一直正确地运行下去。

如何理解Spark Streaming的数据可靠性和一致性

图一 Driver数据持久化

Driver实现HA的解决方案就是将元数据持久化,以便重启后的状态恢复。如图一所示,Driver持久化的元数据包括:

如何理解Spark Streaming的数据可靠性和一致性

图二 Driver故障恢复

Driver失败重启后:

通过如上的数据备份和恢复机制,Driver实现了故障后重启、依然能恢复Streaming任务而不丢失数据,因此提供了系统级的数据高可靠。

可靠的上下游IO系统

流计算主要通过网络socket通信来实现与外部IO系统的数据交互。由于网络通信的不可靠特点,发送端与接收端需要通过一定的协议来保证数据包的接收确认、和失败重发机制。

不是所有的IO系统都支持重发,这至少需要实现数据流的持久化,同时还要实现高吞吐和低时延。在Spark Streaming官方支持的data source里面,能同时满足这些要求的只有Kafka,因此在最近的Spark Streaming release里面,也是把Kafka当成推荐的外部数据系统。

除了把Kafka当成输入数据源(inbound data source)之外,通常也将其作为输出数据源(outbound data source)。所有的实时系统都通过Kafka这个MQ来做数据的订阅和分发,从而实现流数据生产者和消费者的解耦。

一个典型的企业大数据中心数据流向视图如下所示:

如何理解Spark Streaming的数据可靠性和一致性

图三 企业大数据中心数据流向视图

除了从源头保证数据可重发之外,Kafka更是流数据Exact Once语义的重要保障。Kafka提供了一套低级API,使得client可以访问topic数据流的同时也能访问其元数据。Spark Streaming的每个接收任务可以从指定的Kafka topic、partition和offset去获取数据流,各个任务的数据边界很清晰,任务失败后可以重新去接收这部分数据而不会产生“重叠的”数据,因而保证了流数据“有且仅处理一次”。

可靠的接收器

在Spark 1.3版本之前,Spark Streaming是通过启动专用的Receiver任务来完成从Kafka集群的数据流拉取。

Receiver任务启动后,会使用Kafka的高级API来创建topicMessageStreams对象,并逐条读取数据流缓存,每个batchInerval时刻到来时由JobGenerator提交生成一个spark计算任务。

由于Receiver任务存在宕机风险,因此Spark提供了一个高级的可靠接收器-ReliableKafkaReceiver类型来实现可靠的数据收取,它利用了Spark 1.2提供的WAL(Write Ahead Log)功能,把接收到的每一批数据持久化到磁盘后,更新topic-partition的offset信息,再去接收下一批Kafka数据。万一Receiver失败,重启后还能从WAL里面恢复出已接收的数据,从而避免了Receiver节点宕机造成的数据丢失(以下代码删除了细枝末节的逻辑):

class ReliableKafkaReceiver{  private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null  private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null  override def onStart(): Unit = {    // Initialize the topic-partition / offset hash map.    topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]    // Initialize the block generator for storing Kafka message.    blockGenerator = new BlockGenerator(new GeneratedBlockHandler, streamId, conf)    messageHandlerThreadPool = Utils.newDaemonFixedThreadPool(      topics.values.sum, "KafkaMessageHandler")    blockGenerator.start()    val topicMessageStreams = consumerConnector.createMessageStreams(      topics, keyDecoder, valueDecoder)    topicMessageStreams.values.foreach { streams =>      streams.foreach { stream =>        messageHandlerThreadPool.submit(new MessageHandler(stream))      }    }  }

启用WAL后虽然Receiver的数据可靠性风险降低了,但却由于磁盘持久化带来的开销,系统整体吞吐率会有明显的下降。因此,在最新发布的Spark 1.3版本里,Spark Streaming增加了使用Direct API的方式来实现Kafka数据源的访问。

引入了Direct API后,Spark Streaming不再启动常驻的Receiver接收任务,而是直接分配给每个Batch及RDD最新的topic partition offset。job启动运行后Executor使用Kafka的simple consumer API去获取那一段offset的数据。

这样做的好处不仅避免了Receiver宕机带来的数据可靠性风险,同时也由于避免使用ZooKeeper做offset跟踪,而实现了数据的精确一次性(以下代码删除了细枝末节的逻辑):

class DirectKafkaInputDStream{  protected val kc = new KafkaCluster(kafkaParams)  protected var currentOffsets = fromOffsets  override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))    val rdd = KafkaRDD[K, V, U, T, R](      context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)    Some(rdd)  }

预写日志 Write Ahead Log

Spark 1.2开始提供了预写日志能力,用于Receiver数据及Driver元数据的持久化和故障恢复。WAL之所以能提供持久化能力,是因为它利用了可靠的HDFS做数据存储。

Spark Streaming预写日志机制的核心API包括:

以上核心API在数据接收和恢复阶段的交互示意图如图四所示。

如何理解Spark Streaming的数据可靠性和一致性

图四 基于WAL的数据接收和恢复示意图

从WriteAheadLogWriter的源码里可以清楚地看到,每次写入一块数据buffer到HDFS后都会调用flush方法去强制刷入磁盘,然后才去取下一块数据。因此receiver接收的数据是可以保证持久化到磁盘了,因而做到了较好的数据可靠性。

private[streaming] class WriteAheadLogWriter{  private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)  def write(data: ByteBuffer): WriteAheadLogFileSegment = synchronized {    data.rewind() // Rewind to ensure all data in the buffer is retrieved    val lengthToWrite = data.remaining()    val segment = new WriteAheadLogFileSegment(path, nextOffset, lengthToWrite)    stream.writeInt(lengthToWrite)    if (data.hasArray) {      stream.write(data.array())    } else {      while (data.hasRemaining) {        val array = new Array[Byte](data.remaining)        data.get(array)        stream.write(array)      }    }    flush()    nextOffset = stream.getPos()    segment  }

看完上述内容,你们掌握如何理解Spark Streaming的数据可靠性和一致性的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注编程网行业资讯频道,感谢各位的阅读!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯