执行start-dfs.sh脚本后,集群是如何启动的?
本文阅读并注释了start-dfs脚本,以及namenode和datanode的启动主要流程流程源码。
源码获取
-
拉取Apache Hadoop官方源码
https://github.com/apache/hadoop
-
用idea打开...
-
切换到想看的版本...
这里用的最新版本3.3.1
阅读目标
本篇的阅读目标是搞明白hadoop中的start-dfs.sh启动脚本执行后都做了什么,hadoop中的NameNode,DataNode启动过程等大致流程,不会细追细节。
hdfs集群的启动命令为:start-dfs.sh
, 脚本的位置在下图中:
![image-
脚本中大致分位两块内容,第一部分是调用hdfs-config.sh脚本配置hdfs以及hadoop的参数以及环境等,第二部分是启动datanode、namenode以及secondary namenode等等。我们的重点是看第二部分的启动流程。
hdfs-config 简述
start-dfs.sh中启动hdfs-config.sh的代码如下:
# let"s locate libexec...
if [[ -n "${HADOOP_HOME}" ]]; then
HADOOP_DEFAULT_LIBEXEC_DIR="${HADOOP_HOME}/libexec"
else
HADOOP_DEFAULT_LIBEXEC_DIR="${bin}/../libexec"
fi
HADOOP_LIBEXEC_DIR="${HADOOP_LIBEXEC_DIR:-$HADOOP_DEFAULT_LIBEXEC_DIR}"
# shellcheck disable=SC2034
HADOOP_NEW_CONFIG=true
if [[ -f "${HADOOP_LIBEXEC_DIR}/hdfs-config.sh" ]]; then
. "${HADOOP_LIBEXEC_DIR}/hdfs-config.sh"
else
echo "ERROR: Cannot execute ${HADOOP_LIBEXEC_DIR}/hdfs-config.sh." 2>&1
exit 1
fi
在hdfs-config.sh脚本中会尝试启动hdfs-evn.sh脚本(如果存在)
之后会检查以及设置HDFS的各种参数,例如:
# turn on the defaults
export HDFS_AUDIT_LOGGER=${HDFS_AUDIT_LOGGER:-INFO,NullAppender}
export HDFS_NAMENODE_OPTS=${HDFS_NAMENODE_OPTS:-"-Dhadoop.security.logger=INFO,RFAS"}
export HDFS_SECONDARYNAMENODE_OPTS=${HDFS_SECONDARYNAMENODE_OPTS:-"-Dhadoop.security.logger=INFO,RFAS"}
export HDFS_DATANODE_OPTS=${HDFS_DATANODE_OPTS:-"-Dhadoop.security.logger=ERROR,RFAS"}
export HDFS_PORTMAP_OPTS=${HDFS_PORTMAP_OPTS:-"-Xmx512m"}
# depending upon what is being used to start Java, these may need to be
# set empty. (thus no colon)
export HDFS_DATANODE_SECURE_EXTRA_OPTS=${HDFS_DATANODE_SECURE_EXTRA_OPTS-"-jvm server"}
export HDFS_NFS3_SECURE_EXTRA_OPTS=${HDFS_NFS3_SECURE_EXTRA_OPTS-"-jvm server"}
再之后会启动hadoop-config.sh脚本:
# shellcheck source=./hadoop-common-project/hadoop-common/src/main/bin/hadoop-config.sh
if [[ -n "${HADOOP_COMMON_HOME}" ]] &&
[[ -e "${HADOOP_COMMON_HOME}/libexec/hadoop-config.sh" ]]; then
. "${HADOOP_COMMON_HOME}/libexec/hadoop-config.sh"
elif [[ -e "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh" ]]; then
. "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh"
elif [ -e "${HADOOP_HOME}/libexec/hadoop-config.sh" ]; then
. "${HADOOP_HOME}/libexec/hadoop-config.sh"
else
echo "ERROR: Hadoop common not found." 2>&1
exit 1
fi
hadoop-config.sh
是最基本的、公用的环境变量配置脚本,会再调用etc/hadoop/hadoop-env.sh
脚本。主要是配置java启动项相关参数,比如java执行环境的classpath等。
hdfs-config.sh
一系列脚本的整体功能就是保证启动hdfs集群前对hdfs和hadoop的各种环境变量进行配置。
start-dfs.sh
后续就是逐步启动各个节点(namenodes,datanodes,secondary namenodes,quorumjournal nodes,quorumjournal nodes),如果是ha集群还会启动ZK Failover controllers
脚本代码
start-dfs.sh
中启动namenode的代码:
#---------------------------------------------------------
# namenodes
# 找到配置中(如果没有配置取当前节点为)的namenode节点
NAMENODES=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -namenodes 2>/dev/null)
if [[ -z "${NAMENODES}" ]]; then
NAMENODES=$(hostname)
fi
# 执行启动命令
echo "Starting namenodes on [${NAMENODES}]"
hadoop_uservar_su hdfs namenode "${HADOOP_HDFS_HOME}/bin/hdfs"
--workers
--config "${HADOOP_CONF_DIR}"
--hostnames "${NAMENODES}"
--daemon start
namenode ${nameStartOpt}
HADOOP_JUMBO_RETCOUNTER=$?
去hadoop-hdfs > src > mian > bin > hdfs
中查看namenode
命令:
# 命令描述:用于启动namenode
hadoop_add_subcommand "namenode" daemon "run the DFS namenode"
# 命令处理程序
namenode)
HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
HADOOP_CLASSNAME="org.apache.hadoop.hdfs.server.namenode.NameNode"
hadoop_add_param HADOOP_OPTS hdfs.audit.logger "-Dhdfs.audit.logger=${HDFS_AUDIT_LOGGER}"
;;
# 执行函数命令
# everything is in globals at this point, so call the generic handler
hadoop_generic_java_subcmd_handler
这里就定位到了具体的处理类org.apache.hadoop.hdfs.server.namenode.NameNode
。继续跟进到hadoop_generic_java_subcmd_handler
函数定义的地方-脚本hdfs
:
## @description Handle subcommands from main program entries
## @audience private
## @stability evolving
## @replaceable yes
function hadoop_generic_java_subcmd_handler
{
# ...... 省略
# do the hard work of launching a daemon or just executing our interactive
# java class
if [[ "${HADOOP_SUBCMD_SUPPORTDAEMONIZATION}" = true ]]; then
if [[ "${HADOOP_SUBCMD_SECURESERVICE}" = true ]]; then
hadoop_secure_daemon_handler
"${HADOOP_DAEMON_MODE}"
"${HADOOP_SUBCMD}"
"${HADOOP_SECURE_CLASSNAME}"
"${daemon_pidfile}"
"${daemon_outfile}"
"${priv_pidfile}"
"${priv_outfile}"
"${priv_errfile}"
"${HADOOP_SUBCMD_ARGS[@]}"
else
hadoop_daemon_handler
"${HADOOP_DAEMON_MODE}"
"${HADOOP_SUBCMD}"
"${HADOOP_CLASSNAME}"
"${daemon_pidfile}"
"${daemon_outfile}"
"${HADOOP_SUBCMD_ARGS[@]}"
fi
exit $?
else
hadoop_java_exec "${HADOOP_SUBCMD}" "${HADOOP_CLASSNAME}" "${HADOOP_SUBCMD_ARGS[@]}"
fi
}
function hadoop_java_exec
{
# run a java command. this is used for
# non-daemons
local command=$1
local class=$2
shift 2
hadoop_debug "Final CLASSPATH: ${CLASSPATH}"
hadoop_debug "Final HADOOP_OPTS: ${HADOOP_OPTS}"
hadoop_debug "Final JAVA_HOME: ${JAVA_HOME}"
hadoop_debug "java: ${JAVA}"
hadoop_debug "Class name: ${class}"
hadoop_debug "Command line options: $*"
export CLASSPATH
#shellcheck disable=SC2086
exec "${JAVA}" "-Dproc_${command}" ${HADOOP_OPTS} "${class}" "$@"
}
可以看到最终还是利用java命令来执行该类。
NameNode 源码
在源码中定位到org.apache.hadoop.hdfs.server.namenode.NameNode
类。按照类的加载顺序来看NameNode启动流程:
静态代码块
static{
HdfsConfiguration.init();
}
// 继续跟进代码,进入HdfsConfiguration类中:
@InterfaceAudience.Private
public class HdfsConfiguration extends Configuration {
static {
addDeprecatedKeys();
// 加载默认的配置文件
Configuration.addDefaultResource("hdfs-default.xml");
Configuration.addDefaultResource("hdfs-rbf-default.xml");
Configuration.addDefaultResource("hdfs-site.xml");
Configuration.addDefaultResource("hdfs-rbf-site.xml");
}
public static void init() {
}
mian方法
public static void main(String argv[]) throws Exception {
//分析传入的参数,是否是帮助参数
if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
System.exit(0);
}
try {
//打印一些启动日志信息
StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
//创建namenode
NameNode namenode = createNameNode(argv, null);
if (namenode != null) {
//加入集群,HA,联邦集群都是有多个NameNode
namenode.join();
}
} catch (Throwable e) {
LOG.error("Failed to start namenode.", e);
terminate(1, e);
}
}
需要关注的是NameNode namenode = createNameNode(argv, null);
public static NameNode createNameNode(String argv[], Configuration conf)
throws IOException {
// 日志信息
LOG.info("createNameNode " + Arrays.asList(argv));
if (conf == null)
// 准备配置文件对象
conf = new HdfsConfiguration();
// 将一些通用参数解析到配置中。
GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
argv = hParser.getRemainingArgs();
// 解析其余的NameNode特定参数,放到配置中。
StartupOption startOpt = parseArguments(argv);
if (startOpt == null) {
printUsage(System.err);
return null;
}
setStartupOption(conf, startOpt);
boolean aborted = false;
// 针对startup对象进行switch case 选择
switch (startOpt) {
case FORMAT:
// 格式化
// 安装hadoop后第一次启动之前要执行的格式化命令 hadoop namenode -format
aborted = format(conf, startOpt.getForceFormat(),
startOpt.getInteractiveFormat());
terminate(aborted ? 1 : 0);
return null; // avoid javac warning
case GENCLUSTERID:
String clusterID = NNStorage.newClusterID();
LOG.info("Generated new cluster id: {}", clusterID);
terminate(0);
return null;
case ROLLBACK:
aborted = doRollback(conf, true);
terminate(aborted ? 1 : 0);
return null; // avoid warning
case BOOTSTRAPSTANDBY:
String[] toolArgs = Arrays.copyOfRange(argv, 1, argv.length);
int rc = BootstrapStandby.run(toolArgs, conf);
terminate(rc);
return null; // avoid warning
case INITIALIZESHAREDEDITS:
aborted = initializeSharedEdits(conf,
startOpt.getForceFormat(),
startOpt.getInteractiveFormat());
terminate(aborted ? 1 : 0);
return null; // avoid warning
case BACKUP:
case CHECKPOINT:
NamenodeRole role = startOpt.toNodeRole();
DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
return new BackupNode(conf, role);
case RECOVER:
NameNode.doRecovery(startOpt, conf);
return null;
case METADATAVERSION:
printMetadataVersion(conf);
terminate(0);
return null; // avoid javac warning
case UPGRADEONLY:
DefaultMetricsSystem.initialize("NameNode");
new NameNode(conf);
terminate(0);
return null;
default:
// 正常启动的时候就会走到这里
// metrics:度量系统记录详细运行信息
DefaultMetricsSystem.initialize("NameNode");
// 初始化NameNode
return new NameNode(conf);
}
}
NameNode构造方法
public NameNode(Configuration conf) throws IOException {
// 默认为正常的NameNode
this(conf, NamenodeRole.NAMENODE);
}
protected NameNode(Configuration conf, NamenodeRole role)
throws IOException {
// 将配置文件赋值到父类的静态变量中
super(conf);
// 初始化Tracer
// 在“进程”中使用一个Tracer实例来收集和分发它的跟踪范围。Tracer实例是所有跟踪的一站式商店。
this.tracer = new Tracer.Builder("NameNode").
conf(TraceUtils.wrapHadoopConf(NAMENODE_HTRACE_PREFIX, conf)).
build();
// TracerConfigurationManager类提供了通过RPC协议在运行时管理跟踪器配置的函数。
this.tracerConfigurationManager =
new TracerConfigurationManager(NAMENODE_HTRACE_PREFIX, conf);
this.role = role;
// clientNamenodeAddress : 客户端将用来访问这个namenode或名称服务的namenode地址。对于使用逻辑URI的HA配置,它将是逻辑地址。
String nsId = getNameServiceId(conf);
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
clientNamenodeAddress = NameNodeUtils.getClientNamenodeAddress(
conf, nsId);
// 虚拟机中搭建集群启动日志打印为:
// 2021-07-07 17:45:46,560 INFO org.apache.hadoop.hdfs.server.namenode.NameNode: Clients are to use wanglj01:9000 to access this namenode/service.
if (clientNamenodeAddress != null) {
LOG.info("Clients should use {} to access"
+ " this namenode/service.", clientNamenodeAddress);
}
// ha集群相关
this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
state = createHAState(getStartupOption(conf));
this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
this.haContext = createHAContext();
try {
// 给联邦模式下准备的,主要是设置联邦模式下namenode的地址和RPC地址
initializeGenericKeys(conf, nsId, namenodeId);
// 初始化namenode的核心方法
initialize(getConf());
// HA相关内容
state.prepareToEnterState(haContext);
try {
haContext.writeLock();
state.enterState(haContext);
} finally {
haContext.writeUnlock();
}
} catch (IOException e) {
this.stopAtException(e);
throw e;
} catch (HadoopIllegalArgumentException e) {
this.stopAtException(e);
throw e;
}
// 启动成功
notBecomeActiveInSafemode = conf.getBoolean(
DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE,
DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE_DEFAULT);
this.started.set(true);
}
NameNode#initialize
protected void initialize(Configuration conf) throws IOException {
if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
if (intervals != null) {
conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
intervals);
}
}
// UserGroupInformation类作用:
// Hadoop的用户和组信息。该类封装了一个JAAS Subject,并提供了确定用户用户名和组的方法。它同时支持Windows、Unix和Kerberos登录模块。
// 方法简介:设置UGI的静态配置。特别是设置安全身份验证机制和组查找服务。
UserGroupInformation.setConfiguration(conf);
// 以NameNode配置的用户登录。
loginAsNameNodeUser(conf);
// 初始化namemode的度量系统
NameNode.initMetrics(conf, this.getRole());
StartupProgressMetrics.register(startupProgress);
// 初始化jvm监听的度量系统
// JvmPauseMonitor类的作用:
// 此类建立一个简单的线程。在此线程中,在循环中运行sleep一段时间方法,如果sleep花费的时间比传递给sleep方法的时间长,
// 就意味着JVM或者宿主机已经出现了停顿处理现象,可能会导致其它问题,如果这种停顿被监测出来,线程会打印一个消息。
pauseMonitor = new JvmPauseMonitor();
pauseMonitor.init(conf);
pauseMonitor.start();
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
if (conf.getBoolean(DFS_NAMENODE_GC_TIME_MONITOR_ENABLE,
DFS_NAMENODE_GC_TIME_MONITOR_ENABLE_DEFAULT)) {
long observationWindow = conf.getTimeDuration(
DFS_NAMENODE_GC_TIME_MONITOR_OBSERVATION_WINDOW_MS,
DFS_NAMENODE_GC_TIME_MONITOR_OBSERVATION_WINDOW_MS_DEFAULT,
TimeUnit.MILLISECONDS);
long sleepInterval = conf.getTimeDuration(
DFS_NAMENODE_GC_TIME_MONITOR_SLEEP_INTERVAL_MS,
DFS_NAMENODE_GC_TIME_MONITOR_SLEEP_INTERVAL_MS_DEFAULT,
TimeUnit.MILLISECONDS);
gcTimeMonitor = new Builder().observationWindowMs(observationWindow)
.sleepIntervalMs(sleepInterval).build();
gcTimeMonitor.start();
metrics.getJvmMetrics().setGcTimeMonitor(gcTimeMonitor);
}
// 启动httpserver
if (NamenodeRole.NAMENODE == role) {
startHttpServer(conf);
}
// 启动nameNode时从磁盘加载fsimage以及edits文件,初始化FsNamesystem、FsDirectory、 LeaseManager等
loadNamesystem(conf);
startAliasMapServerIfNecessary(conf);
// 创建rpcserver,支持namenode与datanode,client进行通信的协议
// 封装了NameNodeRpcServer clientRpcServer,支持 ClientNamenodeProtocol、DatanodeProtocolPB等协议
// 啥是rpc看这里:https://www.zhihu.com/question/25536695
rpcServer = createRpcServer(conf);
initReconfigurableBackoffKey();
if (clientNamenodeAddress == null) {
// This is expected for MiniDFSCluster. Set it now using
// the RPC server"s bind address.
clientNamenodeAddress =
NetUtils.getHostPortString(getNameNodeAddress());
LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
+ " this namenode/service.");
}
if (NamenodeRole.NAMENODE == role) {
httpServer.setNameNodeAddress(getNameNodeAddress());
httpServer.setFSImage(getFSImage());
if (levelDBAliasMapServer != null) {
httpServer.setAliasMap(levelDBAliasMapServer.getAliasMap());
}
}
// 启动常用到主备状态的服务
startCommonServices(conf);
// 启动一个计时器,定期将NameNode度量写入日志文件。可以通过配置禁用此行为。
startMetricsLogger(conf);
}
NameNode#startCommonServices
private void startCommonServices(Configuration conf) throws IOException {
// 创建NameNodeResourceChecker、激活BlockManager等
namesystem.startCommonServices(conf, haContext);
registerNNSMXBean();
// 非NamenodeRole.NAMENODE的角色在此处启动HttpServer
if (NamenodeRole.NAMENODE != role) {
startHttpServer(conf);
httpServer.setNameNodeAddress(getNameNodeAddress());
httpServer.setFSImage(getFSImage());
if (levelDBAliasMapServer != null) {
httpServer.setAliasMap(levelDBAliasMapServer.getAliasMap());
}
}
// 启动RPCServer
rpcServer.start();
// 启动各插件
try {
plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,
ServicePlugin.class);
} catch (RuntimeException e) {
String pluginsValue = conf.get(DFS_NAMENODE_PLUGINS_KEY);
LOG.error("Unable to load NameNode plugins. Specified list of plugins: " +
pluginsValue, e);
throw e;
}
for (ServicePlugin p: plugins) {
try {
p.start(this);
} catch (Throwable t) {
LOG.warn("ServicePlugin " + p + " could not be started", t);
}
}
LOG.info(getRole() + " RPC up at: " + getNameNodeAddress());
if (rpcServer.getServiceRpcAddress() != null) {
LOG.info(getRole() + " service RPC up at: "
+ rpcServer.getServiceRpcAddress());
}
}
FSNamesystem#startCommonServices
方法用于启动对主备状态都通用的服务:
void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
this.registerMBean(); // register the MBean for the FSNamesystemState
writeLock();
this.haContext = haContext;
try {
// 创建NameNodeResourceChecker(资源检查线程),并立即检查一次
nnResourceChecker = new NameNodeResourceChecker(conf);
checkAvailableResources();
assert !blockManager.isPopulatingReplQueues();
// 设置一些启动过程中的信息
StartupProgress prog = NameNode.getStartupProgress();
prog.beginPhase(Phase.SAFEMODE);
long completeBlocksTotal = getCompleteBlocksTotal();
prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
completeBlocksTotal);
// 激活blockManager: 保存与存储在Hadoop集群中的块相关的信息。
blockManager.activate(conf, completeBlocksTotal);
} finally {
writeUnlock("startCommonServices");
}
registerMXBean();
DefaultMetricsSystem.instance().register(this);
if (inodeAttributeProvider != null) {
inodeAttributeProvider.start();
dir.setINodeAttributeProvider(inodeAttributeProvider);
}
snapshotManager.registerMXBean();
InetSocketAddress serviceAddress = NameNode.getServiceAddress(conf, true);
this.nameNodeHostName = (serviceAddress != null) ?
serviceAddress.getHostName() : "";
}
BlockManager#activate
public void activate(Configuration conf, long blockTotal) {
// 启动PendingReplicationBlocks,这个类主要是对数据块进行一些记账工作。类似于Block可能存放在那个Datanode上这种。
pendingReconstruction.start();
// 激活DatanodeManager:启动DecommissionManager--Monitor、HeartbeatManager-- Monitor
datanodeManager.activate(conf);
// 启动redundancyThread, 大致作用是:
// 计算可以在数据节点上调度的块复制和块失效工作。datanode将在下一个心跳时被告知这项工作
// 如果有任何重构请求超时,获取它们并将它们放回需要的重构队列中
// 重新扫描之前推迟的块列表
this.redundancyThread.setName("RedundancyMonitor");
this.redundancyThread.start();
// 启动storageInfoDefragmenterThread
// 它监视StorageInfo TreeSet的碎片,并在它低于某个阈值时压缩它。
storageInfoDefragmenterThread.setName("StorageInfoMonitor");
storageInfoDefragmenterThread.start();
//块汇报线程穹顶(心跳检测机制)
this.blockReportThread.start();
mxBeanName = MBeans.register("NameNode", "BlockStats", this);
// 初始化安全模式
bmSafeMode.activate(blockTotal);
}
namenode的主要责任是文件元信息与数据块映射的管理。相应的,namenode的启动流程需要关注
与客户端、datanode通信的工作线程,文件元信息的管理机制,数据块的管理机制等。其中,
RpcServer主要负责与客户端、datanode通信,FSDirectory主要负责管理文件元信息。
posted @ 2021-07-09 14:49 坐井 阅读(0) 评论(0) 编辑 收藏 举报 刷新评论刷新页面返回顶部 Copyright © 2021 坐井Powered by .NET 5.0 on Kubernetes