文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

HDFS 底层交互原理,看这篇就够了

2024-12-13 23:59

关注

HDFS全称是 Hadoop Distribute File System,是 Hadoop最重要的组件之一,也被称为分步式存储之王。本文主要从 HDFS 高可用架构组成、HDFS 读写流程、如何保证可用性以及高频面试题出发,提高大家对 HDFS 的认识,掌握一些高频的 HDFS 面试题。本篇文章概览如下图:

本篇文章概览

1.HA 架构组成

1.1HA架构模型

在 HDFS 1.X 时,NameNode 是 HDFS 集群中可能发生单点故障的节点,集群中只有一个 NameNode,一旦 NameNode 宕机,整个集群将处于不可用的状态。

在 HDFS 2.X 时,HDFS 提出了高可用(High Availability, HA)的方案,解决了 HDFS 1.X 时的单点问题。在一个 HA 集群中,会配置两个 NameNode ,一个是 Active NameNode(主),一个是 Stadby NameNode(备)。主节点负责执行所有修改命名空间的操作,备节点则执行同步操作,以保证与主节点命名空间的一致性。HA 架构模型如下图所示:

HA 架构组成2

HA 集群中所包含的进程的职责各不相同。为了使得主节点和备用节点的状态一致,采用了 Quorum Journal Manger (QJM)方案解决了主备节点共享存储问题,如图 JournalNode 进程,下面依次介绍各个进程在架构中所起的作用:

注意:QJM 方案是基于 Paxos 算法实现的,集群由 2N + 1 JouranlNode 进程组成,最多可以容忍 N 台 JournalNode 宕机,宕机数大于 N 台,这个算法就失效了!

DataNode 向 NameNode 汇报当前块信息的时间间隔,默认 6 小时,其配置参数名如下:

  1.  
  2.  <name>dfs.blockreport.intervalMsecname
  3.  21600000 
  4.  Determines block reporting interval in  
  5. milliseconds. 
  6.  

 

1.2HA主备故障切换流程

HA 集群刚启动时,两个 NameNode 节点状态均为 Standby,之后两个 NameNode 节点启动 ZKFC 进程后会去 ZooKeeper 集群抢占分步式锁,成功获取分步式锁,ZooKeeper 会创建一个临时节点,成功抢占分步式锁的 NameNode 会成为 Active NameNode,ZKFC 便会实时监控自己的 NameNode。

HDFS 提供了两种 HA 状态切换方式:一种是管理员手动通过DFSHAAdmin -faieover执行状态切换;另一种则是自动切换。下面分别从两种情况分析故障的切换流程:

主 NameNdoe 宕机后,备用 NameNode 如何升级为主节点?

当主 NameNode 宕机后,对应的 ZKFC 进程检测到 NameNode 状态,便向 ZooKeeper 发生删除锁的命令,锁删除后,则触发一个事件回调备用 NameNode 上的 ZKFC

ZKFC 得到消息后先去 ZooKeeper 争夺创建锁,锁创建完成后会检测原先的主 NameNode 是否真的挂掉(有可能由于网络延迟,心跳延迟),挂掉则升级备用 NameNode 为主节点,没挂掉则将原先的主节点降级为备用节点,将自己对应的 NameNode 升级为主节点。

主 NameNode 上的 ZKFC 进程挂掉,主 NameNode 没挂,如何切换?

ZKFC 挂掉后,ZKFC 和 ZooKeeper 之间 TCP 链接会随之断开,session 也会随之消失,锁被删除,触发一个事件回调备用 NameNode ZKFC,ZKFC 得到消息后会先去 ZooKeeper 争夺创建锁,锁创建完成后也会检测原先的主 NameNode 是否真的挂掉,挂掉则升级 备用 NameNode 为主节点,没挂掉则将主节点降级为备用节点,将自己对应的 NameNode 升级为主节点。

1.3Block、packet及chunk 概念

在 HDFS 中,文件存储是按照数据块(Block)为单位进行存储的,在读写数据时,DFSOutputStream使用 Packet 类来封装一个数据包。每个 Packet 包含了若干个 chunk 和对应的 checksum。

2.源码级读写流程

2.1HDFS 读流程

HDFS读流程

我们以从 HDFS 读取一个 information.txt 文件为例,其读取流程如上图所示,分为以下几个步骤:

打开 information.txt 文件:首先客户端调用 DistributedFileSystem.open() 方法打开文件,这个方法在底层会调用DFSclient.open() 方法,该方法会返回一个 HdfsDataInputStream 对象用于读取数据块。但实际上真正读取数据的是 DFSInputStream ,而 HdfsDataInputStream 是 DFSInputStream 的装饰类(new HdfsDataInputStream(DFSInputStream))。

从 NameNode 获取存储 information.txt 文件数据块的 DataNode 地址:即获取组成 information.txt block 块信息。在构造输出流 DFSInputStream 时,会通过调用 getBlockLocations() 方法向 NameNode 节点获取组成 information.txt 的 block 的位置信息,并且 block 的位置信息是按照与客户端的距离远近排好序。

连接 DataNode 读取数据块: 客户端通过调用 DFSInputStream.read() 方法,连接到离客户端最近的一个 DataNode 读取 Block 块,数据会以数据包(packet)为单位从 DataNode 通过流式接口传到客户端,直到一个数据块读取完成;DFSInputStream会再次调用 getBlockLocations() 方法,获取下一个最优节点上的数据块位置。

直到所有文件读取完成,调用 close() 方法,关闭输入流,释放资源。

从上述流程可知,整个过程最主要涉及到 open()、read()两个方法(其它方法都是在这两个方法的调用链中调用,如getBlockLocations()),下面依次介绍这2个方法的实现。

注:本文是以 hadoop-3.1.3 源码为基础!

事实上,在调用 DistributedFileSystem.open()方法时,底层调用的是 DFSClient.open()方法打开文件,并构造 DFSInputStream 输入流对象。

  1. public DFSInputStream open(String src, int buffersize, boolean verifyChecksum) 
  2.       throws IOException { 
  3.     //检查DFSClicent 的运行状况 
  4.     checkOpen(); 
  5.     // 从 namenode 获取 block 位置信息,并存到 LocatedBlocks 对象中,最终传给 DFSInputStream 的构造方法 
  6.     try (TraceScope ignored = newPathTraceScope("newDFSInputStream", src)) { 
  7.       LocatedBlocks locatedBlocks = getLocatedBlocks(src, 0); 
  8.       //调用 openInternal 方法,获取输入流 
  9.       return openInternal(locatedBlocks, src, verifyChecksum); 
  10.     } 
  11.   } 

整个 open()方法分为两部分:

第一部分是,调用 checkOpen()方法检查 DFSClient 的运行状况,调用getLocateBlocks()方法,获取 block 的位置消息

第二部分是,调用openInternal()方法,获取输入流。

  1. private DFSInputStream openInternal(LocatedBlocks locatedBlocks, String src, 
  2.       boolean verifyChecksum) throws IOException { 
  3.     if (locatedBlocks != null) { 
  4.         //获取纠删码策略,纠删码是 Hadoop 3.x 的新特性,默认不启用纠删码策略 
  5.       ErasureCodingPolicy ecPolicy = locatedBlocks.getErasureCodingPolicy(); 
  6.       if (ecPolicy != null) { 
  7.           //如果用户指定了纠删码策略,将返回一个 DFSStripedInputStream 对象 
  8.           //DFSStripedInputStream 会将数据逻辑字节范围的请求转换为存储在 DataNode 上的内部块 
  9.         return new DFSStripedInputStream(this, src, verifyChecksum, ecPolicy, 
  10.             locatedBlocks); 
  11.       } 
  12.      //如果未指定纠删码策略,调用 DFSInputStream 的构造方法,并且返回该 DFSInputStream 的对象 
  13.       return new DFSInputStream(this, src, verifyChecksum, locatedBlocks); 
  14.     } else { 
  15.       throw new IOException("Cannot open filename " + src); 
  16.     } 
  17.   } 
  1. DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum, 
  2.       LocatedBlocks locatedBlocks) throws IOException { 
  3.     this.dfsClient = dfsClient; 
  4.     this.verifyChecksum = verifyChecksum; 
  5.     this.src = src; 
  6.     synchronized (infoLock) { 
  7.       this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy(); 
  8.     } 
  9.     this.locatedBlocks = locatedBlocks; 
  10.     //调用 openInfo 方法,参数:refreshLocatedBlocks,是否要更新 locateBlocks 属性。 
  11.     openInfo(false); 
  12.   } 

构造方法做了2件事:

第一部分是初始化 DFSInputStream 属性,其中 verifyChecksum 含义是:读取数据时是否进行校验,cachingStrategy,指的是缓存策略。

第二部分,调用 openInfo()方法。

思考:为甚么要更新最后一个数据块长度?

因为可能会有这种情况出现,当客户端在读取文件时,最后一个文件块可能还在构建的状态(正在被写入),Datanode 还未上报最后一个文件块,那么 namenode 所保存的数据块长度有可能小于 Datanode实际存储的数据块长度,所以需要与 Datanode 通信以确认最后一个数据块的真实长度。

获取到 DFSInputStream 流对象后,并且得到了文件的所有 Block 块的位置信息,接下来调用read()方法,从 DataNode 读取数据块。

注:在openInfo() 方法

在openInfp()中,会从 namenode 获取当前正在读取文件的最后一个数据块的长度 lastBlockBeingWrittenLength,如果返回的最后一个数据块的长度为 -1 ,这是一种特殊情况:即集群刚重启,DataNode 可能还没有向 NN 进行完整的数据块汇报,这时部分数据块位置信息还获取不到,也获取不到这些块的长度,则默认会重试 3 次,默认每次等待 4 秒,重新去获取文件对应的数据块的位置信息以及最后数据块长度;如果最后一个数据块的长度不为 -1,则表明,最后一个数据块已经是完整状态。

  1. public synchronized int read(@Nonnull final byte buf[], int offint len) 
  2.       throws IOException { 
  3.     //验证输入的参数是否可用 
  4.     validatePositionedReadArgs(pos, buf, off, len); 
  5.     if (len == 0) { 
  6.       return 0; 
  7.     } 
  8.     //构造字节数组作为容器  
  9.     ReaderStrategy byteArrayReader = 
  10.         new ByteArrayStrategy(buf, off, len, readStatistics, dfsClient); 
  11.     //调用 readWithStrategy()方法读取数据 
  12.     return readWithStrategy(byteArrayReader); 
  13.   } 

当用户代码调用read()方法时,其底层调用的是 DFSInputStream.read()方法。该方法从输入流的 off 位置开始读取,读取 len 个字节,然后存入 buf 字节数组中。源码中构造了一个 ByteArrayStrategy 对象,该对象封装了 5 个属性,分别是:字节数组 buf,读取到的字节存入该字节数组;off,读取的偏移量;len,将要读取的目标长度;readStatistics,统计计数器,客户端。最后通过调用 readWithStrategy()方法去读取文件数据块的数据。

总结:HDFS 读取一个文件,调用流程如下:(中间涉及到的部分方法未列出)

usercode 调用 open() ---> DistributedFileSystem.open() ---> DFSClient.open() ---> 返回一个 DFSInputStream 对象给 DistributedFileSystem ---> new hdfsDataInputStream(DFSInputStream) 并返回给用户;

usercode 调用 read() ---> 底层DFSIputStream.read() ---> readWithStrategy(bytArrayReader)

2.2HDFS 写流程

介绍完 HDFS 读的流程,接下来看看一个文件的写操作的实现。从下图中可以看出,HDFS 写流程涉及的方法比较多,过程也比较复杂。

在 namenode 创建文件: 当 client 写一个新文件时,首先会调用 DistributeedFileSytem.creat() 方法,DistributeFileSystem 是客户端创建的一个对象,在收到 creat 命令之后,DistributeFileSystem 通过 RPC 与 NameNode 通信,让它在文件系统的 namespace 创建一个独立的新文件;namenode 会先确认文件是否存在以及客户端是否有权限,确认成功后,会返回一个 HdfsDataOutputStream 对象,与读流程类似,这个对象底层包装了一个 DFSOutputStream 对象,它才是写数据的真正执行者。

建立数据流 pipeline 管道: 客户端得到一个输出流对象,还需要通过调用 ClientProtocol.addBlock()向 namenode 申请新的空数据块,addBlock( ) 会返回一个 LocateBlock 对象,该对象保存了可写入的 DataNode 的信息,并构成一个 pipeline,默认是有三个 DataNode 组成。

通过数据流管道写数据: 当 DFSOutputStream调用 write()方法把数据写入时,数据会先被缓存在一个缓冲区中,写入的数据会被切分成多个数据包,每当达到一个数据包长度(默认65536字节)时,

DFSOutputStream会构造一个 Packet 对象保存这个要发送的数据包;新构造的 Packet 对象会被放到 DFSOutputStream维护的 dataQueue 队列中,DataStreamer 线程会从 dataQueue 队列中取出 Packet 对象,通过底层 IO 流发送到 pipeline 中的第一个 DataNode,然后继续将所有的包转到第二个 DataNode 中,以此类推。发送完毕后,

这个 Packet 会被移出 dataQueue,放入 DFSOutputStream 维护的确认队列 ackQueue 中,该队列等待下游 DataNode 的写入确认。当一个包已经被 pipeline 中所有的 DataNode 确认了写入磁盘成功,这个数据包才会从确认队列中移除。

关闭输入流并提交文件: 当客户端完成了整个文件中所有的数据块的写操作之后,会调用 close() 方法关闭输出流,客户端还会调用 ClientProtoclo.complete( ) 方法通知 NameNode 提交这个文件中的所有数据块,

NameNode 还会确认该文件的备份数是否满足要求。对于 DataNode 而言,它会调用 blockReceivedAndDelete() 方法向 NameNode 汇报,NameNode 会更新内存中的数据块与数据节点的对应关系。

从上述流程来看,整个写流程主要涉及到了 creat()、write()这些方法,下面着重介绍下这两个方法的实现。当调用 DistributeedFileSytem.creat() 方法时,其底层调用的其实是 DFSClient.create()方法,其源码如下:

  1. public DFSOutputStream create(String src, FsPermission permission, 
  2.           EnumSet flag, boolean createParent,  
  3.                     short replication,long blockSize, 
  4.                     Progressable progress, int buffersize, 
  5.                     ChecksumOpt checksumOpt,  
  6.                     InetSocketAddress[] favoredNodes, 
  7.                     String ecPolicyName) throws IOException { 
  8.     //检查客户端是否已经打开 
  9.     checkOpen(); 
  10.     final FsPermission masked = applyUMask(permission); 
  11.     LOG.debug("{}: masked={}", src, masked); 
  12.     //调用 DFSOutputStream.newStreamForCreate()创建输出流对象 
  13.     final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, 
  14.         src, masked, flag, createParent, replication, blockSize, progress, 
  15.         dfsClientConf.createChecksum(checksumOpt), 
  16.         getFavoredNodesStr(favoredNodes), ecPolicyName); 
  17.     //获取 HDFS 文件的租约 
  18.     beginFileLease(result.getFileId(), result); 
  19.     return result; 
  20.   } 

DistributeFileSystem.create()在底层会调用 DFSClient.create()方法。该方法主要完成三件事:

租约:指的是租约持有者在规定时间内获得该文件权限(写文件权限)的合同

第一,检查客户端是否已经打开

第二,调用静态的 newStreamForCreate() 方法,通过 RPC 与 NameNode 通信创建新文件,并构建出 DFSOutputStream流

第三,执行 beginFileLease() 方法,获取新J建文件的租约

  1. static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, 
  2.       FsPermission masked, EnumSet flag, boolean createParent, 
  3.       short replication, long blockSize, Progressable progress, 
  4.       DataChecksum checksum, String[] favoredNodes, String ecPolicyName) 
  5.       throws IOException { 
  6.   
  7.     try (TraceScope ignored = 
  8.              dfsClient.newPathTraceScope("newStreamForCreate", src)) { 
  9.       HdfsFileStatus stat = null
  10.  
  11.       // 如果发生异常,并且异常为 RetryStartFileException ,便重新调用create()方法,重试次数为 10 
  12.       boolean shouldRetry = true
  13.       //重试次数为 10 
  14.       int retryCount = CREATE_RETRY_COUNT; 
  15.       while (shouldRetry) { 
  16.         shouldRetry = false
  17.         try { 
  18.            //调用 ClientProtocol.create() 方法,在命名空间中创建 HDFS 文件 
  19.           stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, 
  20.               new EnumSetWritable<>(flag), createParent, replication, 
  21.               blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName); 
  22.           break; 
  23.         } catch (RemoteException re) { 
  24.           IOException e = re.unwrapRemoteException(AccessControlException.class, 
  25.               //....此处省略了部分异常类型 
  26.               UnknownCryptoProtocolVersionException.class); 
  27.           if (e instanceof RetryStartFileException) {//如果发生异常,判断异常是否为 RetryStartFileException 
  28.             if (retryCount > 0) { 
  29.               shouldRetry = true
  30.               retryCount--; 
  31.             } else { 
  32.               throw new IOException("Too many retries because of encryption" + 
  33.                   " zone operations", e); 
  34.             } 
  35.           } else { 
  36.             throw e; 
  37.           } 
  38.         } 
  39.       } 
  40.       Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!"); 
  41.       final DFSOutputStream out
  42.       if(stat.getErasureCodingPolicy() != null) { 
  43.         //如果用户指定了纠删码策略,将创建一个 DFSStripedOutputStream 对象 
  44.         out = new DFSStripedOutputStream(dfsClient, src, stat, 
  45.             flag, progress, checksum, favoredNodes); 
  46.       } else { 
  47.         //如果没指定纠删码策略,调用构造方法创建一个 DFSOutputStream 对象 
  48.         out = new DFSOutputStream(dfsClient, src, stat, 
  49.             flag, progress, checksum, favoredNodes, true); 
  50.       } 
  51.       //启动输出流对象的 Datastreamer 线程 
  52.       out.start(); 
  53.       return out
  54.     } 
  55.   } 

newStreamForCreate()方法总共涉及三个部分:

当构建完 DFSOutputStream 输出流时,客户端调用 write() 方法把数据包写入 dataQueue 队列,在将数据包发送到 DataNode 之前,DataStreamer会向 NameNode 申请分配一个新的数据块

然后建立写这个数据块的数据流管道(pipeline),之后DataStreamer 会从 dataQueue 队列取出数据包,通过 pipeline 依次发送给各个 DataNode。每个数据包(packet)都有对应的序列号,当一个数据块中所有的数据包都发送完毕,

并且都得到了 ack 消息确认后,Datastreamer会将当前数据块的 pipeline 关闭。通过不断循环上述过程,直到该文件(一个文件会被切分为多个 Block)的所有数据块都写完成。

调用 ClientProtocol.create()方法,创建文件,如果发生异常为 RetryStartFileException ,则默认重试10次

调用 DFSStripedOutputStream 或 DFSOutputStream 构造方法,构造输出流对象

启动 Datastreamer 线程,Datastreamer 是 DFSOutputStream 中的一个内部类,负责构建 pipeline 管道,并将数据包发送到 pipeline 中的第一个 DataNode

  1. protected synchronized void writeChunk(ByteBuffer buffer, int len, 
  2.       byte[] checksum, int ckoff, int cklen) throws IOException { 
  3.     writeChunkPrepare(len, ckoff, cklen); 
  4.   
  5.     //将当前校验数据、校验块写入数据包中 
  6.     currentPacket.writeChecksum(checksum, ckoff, cklen); 
  7.     currentPacket.writeData(buffer, len); 
  8.     currentPacket.incNumChunks(); 
  9.     getStreamer().incBytesCurBlock(len); 
  10.  
  11.     // 如果当前数据包已经满了,或者写满了一个数据块,则将当前数据包放入发送队列中 
  12.     if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() || 
  13.             getStreamer().getBytesCurBlock() == blockSize) { 
  14.       enqueueCurrentPacketFull(); 
  15.     } 
  16.   } 

最终写数据调用都的是 writeChunk ()方法,其会首先调用 checkChunkPrepare()构造一个 Packet 对象保存数据包,

然后调用writeCheckSum()和writeData()方法,将校验块数据和校验和写入 Packet 对象中。

当 Packet 对象写满时(每个数据包都可以写入 maxChunks 个校验块),则调用 enqueueCurrentPacketFull()方法,将当前的 Packet 对象放入 dataQueue 队列中,等待 DataStreamer 线程的处理。

如果当前数据块中的所有数据都已经发送完毕,则发送一个空数据包标识所有数据已经发送完毕。

3.HDFS 如何保证可用性?

在 1.1 节中已经阐述了 HDFS 的高可用的架构,分别涉及到 NameNode,DataNode,Journalnode,ZKFC等组件。所以,在谈及 HDFS 如何保证可用性,要从多个方面去回答。

4.HDFS 高频面试题

本文转载自微信公众号「小林玩大数据」,可以通过以下二维码关注。转载本文请联系小林玩大数据公众号。

 

来源:小林玩大数据内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯