一、Hadoop是什么?
Hadoop是一个开源的分布式计算框架,用于处理和存储大规模数据集,它是由 Apache Software Foundation维护,能够帮助用户在商用硬件集群上以可靠、高效、容错的方式处理和分析海量数据。为了更好地理解 Hadoop是什么,我们列举了Hadoop一些里程碑:
- 2002年: Nutch项目启动,目标是实现全面的网页抓取、索引和查询功能。
- 2003年: Google发布了三篇具有影响力的论文(Google File System(GFS)、MapReduce和Bigtable),为 Hadoop的文件存储架构奠定了基础。
- 2004年: Cutting在 Nutch中实现了类似 GFS的功能,形成了后来的 Hadoop分布式文件系统(HDFS)。
- 2005年: Nutch项目中实现了 MapReduce的初步版本,随后 Hadoop从 Nutch中分离出来,成为一个独立的开源项目。
- 2006年: Yahoo!雇佣 Doug Cutting,并为Hadoop的发展提供支持,同年,Apache Hadoop项目正式启动。
- 2008年: Hadoop成为 Apache顶级项目,并迎来了快速发展。同年,Cloudera公司成立,推动 Hadoop商业化进程。
- 从此,Hadoop迅猛发展,成为全球最流行的大数据处理框架之一。
二、Hadoop的核心组件
Hadoop的核心组件包括以下 4个:
- HDFS:HDFS是 Hadoop的数据存储层,它负责将大量数据分块存储到集群中的不同节点上,从而实现分布式保存和冗余备份。数据被切分成小块,并复制到多个节点,以防硬件故障。
- MapReduce:MapReduce是 Hadoop的分布式计算模型,它将处理大规模数据集的任务分发到多个节点,允许并行处理。MapReduce由两个阶段组成:Map阶段负责将任务分解为多个小任务;Reduce阶段负责对小任务的结果进行汇总。
- YARN:YARN 是 Hadoop的资源管理层,它负责管理和调度集群中的计算资源。YARN允许多个作业在同一 Hadoop集群上并行执行,这大大提高了 Hadoop集群的利用率和扩展能力。
- Hadoop Common:Hadoop Common是 Hadoop的核心库,提供必要的工具和实用程序,用于支持其他 Hadoop模块。
它们的关系如下:
接下来我们将对各个组件进行详细的分析。
1. HDFS
HDFS,全称 Hadoop Distributed File System(分布式文件系统),它是 Hadoop的核心组件之一,旨在解决海量数据的存储问题。
(1) HDFS 架构概述
HDFS是主从结构的分布式文件系统,由两类节点组成:
- NameNode :NameNode(主节点是 HDFS 的中心控制节点,负责管理文件系统的元数据(比如文件和目录的树状结构,文件块的位置、用户权限等)。它不直接存储数据,而是记录数据存储在哪些 DataNode 上。
- DataNode :DataNode(数据节点)是实际存储数据的节点,它们接收数据块,并定期向 NameNode汇报自己存储的块信息和健康状态。
此外,还有一个可选的组件Secondary NameNode,它用于辅助 NameNode 的元数据备份和日志合并,帮助维持文件系统的高可用性。
HDFS的架构如下图:
(2) 数据存储机制
HDFS的文件是分块存储的,大文件被按照固定大小(默认是 128MB,早期版本是 64MB)划分为多个数据块(Block),每个文件块被存储在集群中的不同 DataNode 上。
为了防止数据因节点故障而丢失,HDFS做了数据冗余与容错机制,它会对每个数据块进行复制,默认情况下每个数据块有 3 副本:
- 一个副本存储在与客户端最近的节点上。
- 第二个副本存储在不同机架的节点上(防止机架故障)。
- 第三个副本存储在第二个副本所在机架的其他节点上。
基于上述的副本机制,HDFS可以确保即使部分 DataNode 失效,数据依然可以通过其他存有副本的节点恢复。
(3) 读写操作流程
HDFS 文件写入过程:
- 客户端与 NameNode 交互:客户端首先将写请求发给 NameNode,然后 NameNode 返回存储该文件每个块的若干 DataNode 节点位置。
- 数据块写入 DataNode:客户端将数据块发送至其中一个 DataNode,这个 DataNode 会将数据块传递给下一个 DataNode,依次类推,直到所有节点都保存该数据块的副本。
- 状态更新:上传完成后,所有涉及的 DataNode 会将其存储状态通知 NameNode,并提交流程结束。
HDFS 文件读取过程:
- 获取元数据:客户端向 NameNode 请求文件位置信息,NameNode 返回相关文件块及其所在 DataNode 的位置。
- 从 DataNode 读取数据块:客户端根据 NameNode 提供的位置从相关的 DataNode 直接读取文件的不同数据块并组装回文件。
- 容错处理:如果某个 DataNode 失效,客户端无法从该NameNode获取块信息,它会尝试从存储副本的其他 DataNode 读取。
2. MapReduce
MapReduce是 Hadoop的分布式计算框架,通过将复杂的任务分解成多个独立的简单任务来实现并行计算,它的核心思想是“Map”和“Reduce”两个阶段:
- Map阶段:将原始数据映射(map)为键值对(key-value pairs)。
- Reduce阶段:将具有相同键的数值进行聚合(reduce)。
(1) MapReduce 执行流程
MapReduce 执行流程包含以下5个步骤:
① Job划分
一个完整的 MapReduce任务称为一个Job,Job是由多个Task构成的,分为Map Task和Reduce Task。
② Input Splitting(输入分片)
MapReduce处理输入数据时,首先将大文件切分成较小的Splits,Map Task的数量通常与输入分片数量一致,每一个Map Task处理一个分片的数据。
③ Map阶段
- 每个Map Task拿到一份Input Split的数据,通过RecordReader将数据转化为一对对的
形式,这里的键值对((K1, V1))根据业务的需求构造。 - Map函数逐条处理这些键值对,输出
形式的新的键值对。 - 这些中间键值对
在写入本地磁盘之前会进行Sort(排序) 和 Partition(分区) 操作,Partition的作用是将具有相同键(K2)的键值对分发到相同的 Reducer中执行。
④ Shuffle and Sort(分发与排序)
Shuffle发生在 Map阶段结束和 Reduce阶段之间,具体过程如下:
- 排序:每个Map Task输出的
对会按键(K2)进行排序,确保同一键的所有值(V2)聚集在一起。 - 分区:Map Task的输出会根据 Partition函数的哈希值发送到不同的Reduce任务中。
- 拉取数据:Reducer从每个 Map输出中拉取需要的分区文件,经过网络传输将其聚合。
⑤ Reduce阶段
- 每个Reduce Task接收到的内容是经过 Shuffle过程后所有键值对(
>)的集合。 - Reduce函数(用户自定义)会对每个K2执行聚合计算,输出为新的键值对
。 - 最终输出结果会通过 RecordWriter写入 HDFS或者其他存储系统。
整个流程可以用下图解释:
2. YARN
YARN(Yet Another Resource Negotiator,另一种资源调度器)是Hadoop 2.x版本中引入的一个集群资源管理框架,它的设计初衷是解决 Hadoop 1.x中 MapReduce计算框架的资源调度和管理局限性,可以支持各种应用程序调度的需求。
(1) 核心组件
YARN包含以下 5个核心组件:
① ResourceManager
ResourceManager(RM,资源管理器)负责全局集群资源的管理和调度,它是YARN的中央控制器,协调集群中的所有应用和计算资源。ResourceManager有两个重要的子组件:
- Scheduler(调度器):负责为应用程序按需分配资源,但不负责任务的执行和重新启动。调度器的排期是决定如何把资源多租户化、多应用程序化的关键,它可以实现不同的调度策略(如公平调度器,容量调度器等)。
- Applications Manager(应用程序管理器):负责各种应用程序的生命周期管理,包括应用程序的启动、检查、资源监控和故障恢复。
② NodeManager
NodeManager(NM,节点管理器)是YARN架构中的分布式代理,负责管理每个计算节点上的资源,具体负责:
- 资源报告:将本节点的CPU、内存等资源使用情况汇报给ResourceManager。
- 容器管理:协调和管理每一个容器(Container)的生命周期,包括启动、监控和停止容器。
- 任务监控和报告:监控执行的任务,并向ResourceManager报告其状态和进度。
③ ApplicationMaster
ApplicationMaster(AM,应用程序管理器)是为每个具体应用程序(如MapReduce Job、Spark Job)启动的专用进程,它负责协调整个应用程序生命周期的调度和执行,协调 ResourceManager与 NodeManager,动态申请和释放资源。每一个应用程序在提交时都会启动一个对应的ApplicationMaster实例。 AM的职责包括:
- 申请资源:向ResourceManager请求所需的资源,定义CPU和内存需求。
- 任意调度:根据资源信息及负载情况,决定将任务分配到哪个节点/容器执行。
- 容器监控:监控启动的任务并处理故障。
④ Container
Container(容器)是YARN中的资源分配单位,它将逻辑运行环境(如CPU、内存等涉及硬件维度的资源)与应用程序任务绑定在一起。ApplicationMaster可以向ResourceManager申请多个容器,并在这些容器中分配任务进行具体的计算。
⑤ Client
客户端负责与YARN进行交互,提交应用程序请求,并向YARN查询任务的执行进度和结果。客户端将资源需求信息传递给ResourceManager,RM会为该任务分配资源,然后将其控制权交给对应的ApplicationMaster。
核心组件模型如下图:
(2) 工作流程
YARN工作流程包含以下4个步骤:
① 应用程序启动流程
- 启动应用程序:客户端通过API或命令行向YARN集群提交应用程序。此时,客户端给RM发送请求,描述任务的资源需求及执行规范。
- 生成ApplicationMaster:ResourceManager根据集群的整体资源利用情况,为应用程序分配第一个容器(Container),并启动相应的ApplicationMaster。
- ApplicationMaster初始化:ApplicationMaster会在启动后向ResourceManager注册自己,并根据初始任务和资源需求向RM申请更多的资源。
- 分配资源:ResourceManager根据集群的实时负载情况和调度策略,将余下的容器分配给ApplicationMaster,AC根据任务需求启动容器,并将计算任务分配给这些容器去执行。
② 资源调度流程
- 请求资源:ApplicationMaster向ResourceManager提交资源申请。请求中指定了计算任务所需的资源(如CPU、内存)以及在何处优先执行(一定节点上或任意节点)。
- 资源心跳与分配:NodeManager通过定期心跳将节点的可用资源(包括剩余内存、CPU等情况)汇报给ResourceManager。ResourceManager根据集群整体资源情况,通过调度器(Scheduler)为机器或容器分配任务。
- 任务分配与启动:ApplicationMaster得到资源分配信息后,再与NodeManager通信,为任务启动容器并分配计算任务。
③ 任务运行与监控
一旦任务开始执行,NodeManager会为Container提供隔离的运行环境(如JVM),ApplicationMaster监视任务的运行状态,并通过心跳与NodeManager通信,确保任务成功完成或在出现故障时重新调度任务。
④ 应用完成与资源回收
当ApplicationMaster检测到所有任务均已成功完成,它会向ResourceManager发送一个"完成"信号,表示应用程序已经完成。随后,ResourceManager会通知NodeManager释放任务所占用的资源容器,集群整体资源状态更新。
三、代码实战
在代码实战环节,我们将通过一个完整的示例来展示如何在 Java中实现一个 MapReduce任务,并将处理结果存储回 HDFS。
任务描述:计算给定文件中每个单词出现的次数文件格式:CVS或者JSON项目结构:项目结构如下:
wordcount/
├── src/
│ ├── main/
│ │ ├── java/
│ │ │ ├── WordsCounterMapper.java
│ │ │ ├── WordsCounterReducer.java
│ │ │ └── WordsCounterDriver.java
│ └── resources/
├── input/
│ └── input.cvs
│ input.cvs
└── output/
1.安装Hadoop
我自己Mac电脑安装的是 Hadoop-3.4.1,查看版本指令:hadoop version,关于安装 Hadoop,可以参考这篇文章
2.处理文件
(1) 处理 CSV文件
假设我们有一个超大的 CSV文件:input.csv,如下内容只是展示前几行数据:
id,name,address
1,yuanjava,hangzhou
2,juejin,beijin
3,didi,beijing
...
我们可以使用开源的 Apache Commons CSV工具类来处理该文件,对应的依赖如下:
// maven依赖
org.apache.commons
commons-csv
1.12.0
// gradle 依赖
implementation 'org.apache.commons:commons-csv:1.12.0'
(2) 处理 Json文件
假设我们有一个超大的 Json文件:input.json,如下内容只是展示前几行数据:
[
{"id": 1, "name": "yuanjava", "address": "hangzhou"},
{"id": 2, "name": "juejin", "address": "beijing"},
{"id": 3, "name": "didi", "address": "beijing"},
...
]
我们可以使用开源的 Jackson库来处理文件,对应的依赖如下:
// maven依赖
com.fasterxml.jackson.core
jackson-databind
2.18.1
// gradle 依赖
implementation 'com.fasterxml.jackson.core:jackson-databind:2.18.1'
3.增加 Hadoop依赖
// maven依赖
org.apache.hadoop
hadoop-common
3.4.1
org.apache.hadoop
hadoop-mapreduce-client-core
3.4.1
// gradle依赖
implementation 'org.apache.hadoop:hadoop-common:3.4.1'
implementation 'org.apache.hadoop:hadoop-mapreduce-client-core:3.4.1'
4.编写 Mapper类
Mapper类的作用是处理输入数据,并为每个输入记录生成键值对,在词频统计任务中,Mapper 的任务是将每个单词映射为一个中间键值对 (word, 1)。
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVRecord;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.io.StringReader;
public class WordsCounterMapper extends Mapper {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private ObjectMapper objectMapper = new ObjectMapper();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString().trim();
if (line.isEmpty()) {
return; // Skip empty lines
}
// json以'{'开头, CVS以'['开头
if (line.startsWith("{") || line.startsWith("[")) {
processJson(line, context);
} else {
processCsv(line, context);
}
}
private void processJson(String line, Context context) throws IOException, InterruptedException{
JsonNode rootNode = objectMapper.readTree(line);
if (rootNode.isArray()) {
for (JsonNode node : rootNode) {
String name = node.get("name").asText();
word.set(name);
context.write(word, one);
}
} else if (rootNode.isObject()) {
String name = rootNode.get("name").asText();
word.set(name);
context.write(word, one);
}
}
private void processCsv(String line, Context context) throws IOException, InterruptedException{
StringReader reader = new StringReader(line);
Iterable records = CSVFormat.DEFAULT.parse(reader);
for (CSVRecord record : records) {
// Assuming the CSV has a header row and "name" is one of the columns
String name = record.get("name");
word.set(name);
context.write(word, one);
}
}
}
5.编写 Reducer类
Reducer类的作用是对来自 Mapper的中间键值对进行汇总,在词频统计任务中,Reduce 的任务是对相同单词的计数进行累加。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordsCounterDriver {
public static void main(String[] args){
if (args.length != 2) {
System.err.println("Please enter input path and output path.");
System.exit(-1);
}
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "wordCounter");
job.setJarByClass(WordsCounterDriver.class);
job.setMapperClass(WordsCounterMapper.class);
job.setReducerClass(WordsCounterReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
6.编写 Driver 类
Driver 类用于配置 MapReduce 作业并启动作业。它指定了 Mapper 和 Reducer 的实现类,以及输入和输出路径等。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordsCounterDriver {
public static void main(String[] args){
if (args.length != 2) {
System.err.println("Please enter input path and output path.");
System.exit(-1);
}
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "wordCounter");
job.setJarByClass(WordsCounterDriver.class);
job.setMapperClass(WordsCounterMapper.class);
job.setReducerClass(WordsCounterReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
7.运行和查看结果
(1) 运行步骤:
- 编译和打包:将上述代码编译并打包成一个 JAR 文件。
- 上传数据到 HDFS:将待处理的 CSV 和 JSON 数据上传到 HDFS 的一个目录中。
- 执行 MapReduce作业:在 Hadoop集群上运行该 JAR文件,并指定输入和输出路径,指令如下:
hadoop jar wordcounter.jar WordCounterDriver input/input.cvs output/
hadoop jar wordcounter.jar WordCounterDriver input/input.json output/
- /input/input.cvs(json) 是 HDFS上包含 CSV和 JSON文件的目录。
- /output 是用于存储结果的 HDFS目录。注意:输出目录不能预先存在,否则作业将失败。
(2) 查看输出结果
任务完成后,输出结果将会保存在指定的输出目录中,我们可以使用以下命令查看结果:
hadoop fs -cat output/part-r-00000
输出结果可能如下:
yuanjava 100
juejin 3000
didi 100
...
8.代码解释与优化
(1) Mapper详解
继承与泛型:Mapper
map 方法:对每一行文本进行分割,然后对每个单词输出一个键值对。
(2) Reducer详解
继承与泛型:Reducer
reduce 方法:对每个单词的所有计数进行累加输出。
(3) Driver详解
Job 配置:设置 Mapper和 Reducer类,指定输入输出格式。
路径设置:通过命令行参数指定输入输出路径。
(4) 优化建议
- Combiner使用:在 Map端进行部分汇总,减少传输到 Reduce端的数据量。
- 数据压缩:启用中间数据压缩减少网络传输开销。
- 分区与排序:根据数据特性自定义分区器和排序规则。
通过这个简单的例子,我们展示了如何在 Java中实现一个基本的 MapReduce程序,通过定义 Mapper和 Reducer 再结合 Driver,能够实现对大规模数据集的分布式处理。如果要处理更复杂的任务,可以通过自定义分区器、排序规则、Combiner 等方式进行优化。
通过此示例,我们可以更好地理解 Hadoop MapReduce 的工作原理和编程模型以及它对于大数据处理的重要性。
四、总结
本文,我们分析了 Hadoop的核心组件及其工作原理,让我们对 Hadoop有了一定的认识。本人有几年 Hadoop的使用经验,从整体上看,Hadoop的使用属于中等难度,Hadoop的生态比完善,学习难度比较大,但是,不得不说 Hadoop的设计思维很优秀,值得我们花时间去学习。
2003年,Google发布 Google File System(GFS)、MapReduce和 Bigtable 三篇论文后,Doug Cutting和 Michael J. Cafarella抓住了机会,共同创造了 Hadoop。Google的这三篇经典论文是大数据领域的经典之作,但它的影响力远不止大数据领域,因此,如果想成为一名优秀的工程师,阅读原滋原味的优秀论文绝对是受益无穷的一种方式。
Hadoop展示了大数据领域一个优秀的架构模式:集中管理,分布式存储与计算。这种优秀的架构模式同样还运用在 Spark、Kafka、Flink、HBase、Elasticsearch、Cassandra等这些优秀的框架上,它在大数据领域展示了显著的优势。
最近一年,我从事的项目有幸和 MIT,Standford这样顶尖学府出来的工程师合作,他们强悍的数学建模能力以及对同一个问题思考的深度确实让我望尘莫及,在互联网大厂卷了这么多年,每天都有写完的需求开不完的会,绝大多数程序员都被业务裹挟着,导致很多优秀的人无法从业务中抽离出来去研究更深层领域的东西,陷入无尽的内卷。
如何在这个内卷的环境中让自己立于不败之地?基本功绝对是重中之重。
最后,因为 Hadoop的内容太多,很难仅凭本文把 Hadoop讲透,希望在分享我个人对 Hadoop理解的同时也能抛砖引玉,激发同行写出更多优秀的文章,对于技术,对于行业产生共多思考的共鸣。