【Hadoop】【Yarn】NodeManager Startup Source Code Analysis
NodeManager.main()
---NodeManager.initAndStartNodeManager()
------NodeManager.init()
------NodeManager.start()
As a Service, NodeManager goes through the two lifecycle phases that every Service has:
1-AbstractService.init()---> NodeManager.serviceInit()
2-AbstractService.start()--->NodeManager.serviceStart()
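The init()/serviceInit() and start()/serviceStart() split is the template-method pattern: the public, final entry point checks and advances the lifecycle state, and the subclass only fills in the hook. The sketch below illustrates that idea in simplified form and is not Hadoop's code. MiniService and MiniNodeManager are hypothetical names, and the real AbstractService also takes a Configuration, notifies listeners, records failures and guards state changes with locks.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleDemo {
  // Hypothetical, heavily simplified model of the AbstractService lifecycle.
  abstract static class MiniService {
    enum State { NOTINITED, INITED, STARTED }

    private State state = State.NOTINITED;

    State getState() { return state; }

    // Public entry point: check the current state, run the hook, advance.
    final void init() {
      if (state != State.NOTINITED) {
        throw new IllegalStateException("init() called in state " + state);
      }
      serviceInit();
      state = State.INITED;
    }

    final void start() {
      if (state != State.INITED) {
        throw new IllegalStateException("start() called in state " + state);
      }
      serviceStart();
      state = State.STARTED;
    }

    // Hooks that concrete services (such as a NodeManager) override.
    protected void serviceInit() { }
    protected void serviceStart() { }
  }

  // Hypothetical stand-in for NodeManager that records which hooks ran.
  static class MiniNodeManager extends MiniService {
    final List<String> calls = new ArrayList<>();

    @Override
    protected void serviceInit() { calls.add("serviceInit"); }

    @Override
    protected void serviceStart() { calls.add("serviceStart"); }
  }

  public static void main(String[] args) {
    MiniNodeManager nm = new MiniNodeManager();
    nm.init();   // analogous to AbstractService.init()  -> serviceInit()
    nm.start();  // analogous to AbstractService.start() -> serviceStart()
    System.out.println(nm.calls + " " + nm.getState()); // prints [serviceInit, serviceStart] STARTED
  }
}
```

Because init() and start() are final, a subclass cannot skip the state checks. Calling start() before init() fails fast instead of starting a half-configured service.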
Both phases are analyzed in detail below.
【Initialization】
protected void serviceInit(Configuration conf) throws Exception {
UserGroupInformation.setConfiguration(conf);
// 1 - Whether RM work-preserving restart (recovery) is enabled
rmWorkPreservingRestartEnabled = conf.getBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
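// Initialize and start the NM state store used for recovery: when yarn.nodemanager.recovery.enabled is true this mainly opens a LevelDB store, otherwise a no-op store is used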
initAndStartRecoveryStore(conf);
// Secret managers for container tokens and NM tokens. Every TokenSecretManager is responsible for generating and verifying its tokens
NMContainerTokenSecretManager containerTokenSecretManager = new NMContainerTokenSecretManager(conf, nmStore);
NMTokenSecretManagerInNM nmTokenSecretManager = new NMTokenSecretManagerInNM(nmStore);
// Try to recover the tokens from LevelDB (only needed when the recovery feature is enabled)
recoverTokens(nmTokenSecretManager, containerTokenSecretManager);
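// Application ACL checks, plus the service that monitors the health of the local and log directories
this.aclsManager = new ApplicationACLsManager(conf);
this.dirsHandler = new LocalDirsHandlerService(metrics);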
// Distributed scheduling (the foundation for opportunistic container scheduling)
boolean isDistSchedulingEnabled = conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, YarnConfiguration.DEFAULT_DIST_SCHEDULING_ENABLED);
this.context = createNMContext(containerTokenSecretManager, nmTokenSecretManager, nmStore, isDistSchedulingEnabled, conf);
// Resource plugins (e.g. GPU, FPGA) for extended resource types
ResourcePluginManager pluginManager = createResourcePluginManager();
pluginManager.initialize(context);
((NMContext)context).setResourcePluginManager(pluginManager);
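// Create and initialize the configured ContainerExecutor (yarn.nodemanager.container-executor.class, e.g. DefaultContainerExecutor or LinuxContainerExecutor), which launches containers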
ContainerExecutor exec = createContainerExecutor(conf);
exec.init(context);
// All deletion work is wrapped as tasks and handed to DeletionService, which schedules and manages them
DeletionService del = createDeletionService(exec);
addService(del);
// NodeManager event dispatcher
this.dispatcher = createNMDispatcher();
// Node health-check service
this.nodeHealthChecker = new NodeHealthCheckerService(dirsHandler);
addService(nodeHealthChecker);
((NMContext)context).setContainerExecutor(exec);
((NMContext)context).setDeletionService(del);
// Node status updater: the thread that registers the node with the RM and sends heartbeats
nodeStatusUpdater = createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
// Keep in sync the blacklist information received from the RM in heartbeats
if (YarnConfiguration.isNodeBlacklistingEnabled(conf)) {
BlacklistManager blacklistManager = new BlacklistManager(conf);
((NMContext)context).setBlacklistManager(blacklistManager);
addService(blacklistManager);
}
// Dynamically refresh node labels and report them to the RM; there are two providers: 1 - script-based, 2 - configuration-based
nodeLabelsProvider = createNodeLabelsProvider(conf);
if (nodeLabelsProvider != null) {
addIfService(nodeLabelsProvider);
nodeStatusUpdater.setNodeLabelsProvider(nodeLabelsProvider);
}
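// Dynamically refresh node attributes and report them to the RM; again there are two providers (script-based and configuration-based)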
nodeAttributesProvider = createNodeAttributesProvider(conf);
if (nodeAttributesProvider != null) {
addIfService(nodeAttributesProvider);
nodeStatusUpdater.setNodeAttributesProvider(nodeAttributesProvider);
}
// Periodically monitor the node's resource utilization and report it to the RM
nodeResourceMonitor = createNodeResourceMonitor();
addService(nodeResourceMonitor);
((NMContext) context).setNodeResourceMonitor(nodeResourceMonitor);
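// The implementation is ContainerManagerImpl, which manages the lifecycle of containers on the NodeManager.
// It is not itself a state machine: it routes events to the per-application and per-container
// state machines (ApplicationImpl, ContainerImpl), which, like other YARN state machines, are built on StateMachineFactory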
containerManager = createContainerManager(context, exec, del, nodeStatusUpdater,
this.aclsManager, dirsHandler);
addService(containerManager);
((NMContext) context).setContainerManager(containerManager);
// Cache the log-aggregation status of each application
this.nmLogAggregationStatusTracker = createNMLogAggregationStatusTracker(
context);
addService(nmLogAggregationStatusTracker);
((NMContext)context).setNMLogAggregationStatusTracker(
this.nmLogAggregationStatusTracker);
// NodeManager web server
WebServer webServer = createWebServer(context, containerManager
.getContainersMonitor(), this.aclsManager, dirsHandler);
addService(webServer);
((NMContext) context).setWebServer(webServer);
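// Maximum number of opportunistic containers allocated per AM heartbeat by the distributed allocator
int maxAllocationsPerAMHeartbeat = conf.getInt(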
YarnConfiguration.OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT,
YarnConfiguration.DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT);
((NMContext) context).setQueueableContainerAllocator(new DistributedOpportunisticContainerAllocator(
context.getContainerTokenSecretManager(),maxAllocationsPerAMHeartbeat));
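// Register two event handlers with the central dispatcher: ContainerManagerEventType events go to containerManager, NodeManagerEventType events go to the NodeManager itself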
dispatcher.register(ContainerManagerEventType.class, containerManager);
dispatcher.register(NodeManagerEventType.class, this);
addService(dispatcher);
// Watches for JVM pauses (for example long GC pauses) and reports them through the JVM metrics
pauseMonitor = new JvmPauseMonitor();
addService(pauseMonitor);
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
DefaultMetricsSystem.initialize("NodeManager");
// With Timeline Service v2 enabled, also run the NM collector service
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
this.nmCollectorService = createNMCollectorService(context);
addService(nmCollectorService);
}
// StatusUpdater should be added last so that it get started last
// so that we make sure everything is up before registering with RM.
addService(nodeStatusUpdater);
((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater);
nmStore.setNodeStatusUpdater(nodeStatusUpdater);
// Do secure login before calling init for added services.
doSecureLogin();
registerMXBean();
context.getContainerExecutor().start();
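// Everything above only registered the child services; this call performs the actual initialization by calling init() (and therefore serviceInit()) on each of them in order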
super.serviceInit(conf);
}
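The two dispatcher.register(...) calls in serviceInit() route events by their enum type: each event carries an enum constant, and the dispatcher looks up the handler registered for that constant's enum class. The following is a minimal synchronous sketch of that idea. All names in it (MiniDispatcher, ContainerMgrEventType, NodeMgrEventType, drain) are hypothetical. Hadoop's real AsyncDispatcher instead queues events and processes them on a dedicated thread, and its register() can chain several handlers for one type.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

public class DispatcherDemo {
  // Hypothetical event types, loosely modeled on ContainerManagerEventType
  // and NodeManagerEventType.
  enum ContainerMgrEventType { FINISH_APPS, FINISH_CONTAINERS }
  enum NodeMgrEventType { SHUTDOWN, RESYNC }

  // An event only carries its type here; real YARN events carry more data.
  static final class Event {
    final Enum<?> type;
    Event(Enum<?> type) { this.type = type; }
  }

  // Simplified central dispatcher: one handler per event-type enum class.
  static final class MiniDispatcher {
    private final Map<Class<?>, Consumer<Event>> handlers = new HashMap<>();
    private final Queue<Event> queue = new ArrayDeque<>();

    void register(Class<? extends Enum<?>> eventType, Consumer<Event> handler) {
      handlers.put(eventType, handler);
    }

    // Producers only enqueue; they never call a handler directly.
    void handle(Event event) { queue.add(event); }

    // Processes queued events on the caller's thread to stay deterministic.
    void drain() {
      Event event;
      while ((event = queue.poll()) != null) {
        // getDeclaringClass() maps an enum constant back to its enum class.
        Consumer<Event> handler = handlers.get(event.type.getDeclaringClass());
        if (handler == null) {
          throw new IllegalStateException("No handler for " + event.type);
        }
        handler.accept(event);
      }
    }
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();
    MiniDispatcher dispatcher = new MiniDispatcher();
    // Analogous to dispatcher.register(ContainerManagerEventType.class, containerManager)
    dispatcher.register(ContainerMgrEventType.class, e -> log.add("containerManager:" + e.type));
    // Analogous to dispatcher.register(NodeManagerEventType.class, this)
    dispatcher.register(NodeMgrEventType.class, e -> log.add("nodeManager:" + e.type));
    dispatcher.handle(new Event(ContainerMgrEventType.FINISH_APPS));
    dispatcher.handle(new Event(NodeMgrEventType.RESYNC));
    dispatcher.drain();
    System.out.println(log); // prints [containerManager:FINISH_APPS, nodeManager:RESYNC]
  }
}
```

Keying on the enum class rather than on individual constants keeps registration coarse-grained: one registration covers every event type in an enum, and the handler switches on the concrete constant itself.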
【Startup】
AbstractService.start()
---CompositeService.serviceStart()
protected void serviceStart() throws Exception {
List<Service> services = getServices();
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": starting services, size=" + services.size());
}
for (Service service : services) {
// start the service. If this fails that service
// will be stopped and an exception raised
service.start();
}
super.serviceStart();
}
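All services added during initialization are started one by one, in the order they were added. This is why nodeStatusUpdater is added last in serviceInit(): it starts only after every other service is up, and only then registers the node with the RM.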
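A small sketch makes the ordering guarantee concrete. It is a simplification under stated assumptions. Child and MiniComposite are hypothetical classes, and the service names only mirror the addService order in serviceInit(). In Hadoop, cleanup after a failed start happens roughly through AbstractService.start() stopping the composite, whose serviceStop() stops the already-started children in reverse order; here that is folded into a single catch block.

```java
import java.util.ArrayList;
import java.util.List;

public class CompositeStartDemo {
  // Hypothetical child service that records start/stop into a shared log.
  static class Child {
    final String name;
    final boolean failOnStart;
    final List<String> log;
    boolean started;

    Child(String name, boolean failOnStart, List<String> log) {
      this.name = name;
      this.failOnStart = failOnStart;
      this.log = log;
    }

    void start() {
      if (failOnStart) {
        throw new IllegalStateException(name + " failed to start");
      }
      started = true;
      log.add("start " + name);
    }

    void stop() {
      if (started) {
        started = false;
        log.add("stop " + name);
      }
    }
  }

  // Simplified composite: children start in the order they were added;
  // on failure, the children already started are stopped in reverse order.
  static class MiniComposite {
    private final List<Child> children = new ArrayList<>();

    void addService(Child child) { children.add(child); }

    void start() {
      try {
        for (Child child : children) {
          child.start();
        }
      } catch (RuntimeException e) {
        for (int i = children.size() - 1; i >= 0; i--) {
          children.get(i).stop(); // no-op for children that never started
        }
        throw e;
      }
    }
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();
    MiniComposite nm = new MiniComposite();
    nm.addService(new Child("deletionService", false, log));
    nm.addService(new Child("containerManager", false, log));
    nm.addService(new Child("dispatcher", false, log));
    nm.addService(new Child("nodeStatusUpdater", false, log)); // added last
    nm.start();
    // prints [start deletionService, start containerManager, start dispatcher, start nodeStatusUpdater]
    System.out.println(log);
  }
}
```

Ordering by insertion is simple and predictable. The trade-off is that dependencies are expressed only implicitly, through the order of addService calls, which is why the source comments on the nodeStatusUpdater placement.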
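【Copyright Notice】This article is original content by a Huawei Cloud Community user. It may not be reproduced without permission; to republish it, contact the original author for authorization. If you find content in this community that you suspect is plagiarized, please report it by email with supporting evidence. Once verified, the community will promptly remove the infringing content. Report email:
cloudbbs@huaweicloud.com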