【Hadoop】【JHS】5-JHS服务之HistoryClientService源码解析
HistoryClientService
该Service主要用于和JobClient交互。JobClient做为JHS提供给客户端的RPC接口,用于和JHS进行交互。
既然是一个service,那么先看下该service的serviceStart进而serviceInit()方法(该Service没有serviceInit方法)
protected void serviceStart() throws Exception {
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
//初始化Web Application
initializeWebApp(conf);
InetSocketAddress address = conf.getSocketAddr(
JHAdminConfig.MR_HISTORY_BIND_HOST,
JHAdminConfig.MR_HISTORY_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_PORT);
//创建一个jhs rpc服务端,并且要监听的端口以及线程数量。
//1-参数1为protocol表示协议类型
//2-参数2表示该中类型的protocol的处理器。此处为内部类:HSClientProtocolHandler
server = rpc.getServer(HSClientProtocol.class, protocolHandler, address,
conf, jhsDTSecretManager,
//mapreduce.jobhistory.client.thread-count
conf.getInt(JHAdminConfig.MR_HISTORY_CLIENT_THREAD_COUNT,
JHAdminConfig.DEFAULT_MR_HISTORY_CLIENT_THREAD_COUNT));
// Enable service authorization
if (conf.getBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
//重新读取hadoop-policy.xml,更新最新的服务级别权限控制。
server.refreshServiceAcl(conf, new ClientHSPolicyProvider());
}
server.start();
this.bindAddress = conf.updateConnectAddr(JHAdminConfig.MR_HISTORY_BIND_HOST,
JHAdminConfig.MR_HISTORY_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS,
server.getListenerAddress());
LOG.info("Instantiated HistoryClientService at " + this.bindAddress);
super.serviceStart();
}
从上述代码可知HistoryClientService的start方法中主要:
1-创建了JHS RPC服务端。并且重新加载了hadoop-policy到内存;
2-初始化initializeWebApp();
创建jhs的web服务器,并且启动;
protected void initializeWebApp(Configuration conf) {
webApp = new HsWebApp(history);
setupFilters(conf);
InetSocketAddress bindAddress = MRWebAppUtil.getJHSWebBindAddress(conf);
// NOTE: there should be a .at(InetSocketAddress)
WebApps
.$for("jobhistory", HistoryClientService.class, this, "ws")
.with(conf)
.withHttpSpnegoKeytabKey(
JHAdminConfig.MR_WEBAPP_SPNEGO_KEYTAB_FILE_KEY)
.withHttpSpnegoPrincipalKey(
JHAdminConfig.MR_WEBAPP_SPNEGO_USER_NAME_KEY)
.withCSRFProtection(JHAdminConfig.MR_HISTORY_CSRF_PREFIX)
.withXFSProtection(JHAdminConfig.MR_HISTORY_XFS_PREFIX)
.at(NetUtils.getHostPortString(bindAddress)).start(webApp);
String connectHost = MRWebAppUtil.getJHSWebappURLWithoutScheme(conf);
MRWebAppUtil.setJHSWebappURLWithoutScheme(conf,
HostAndPort.fromParts(HostAndPort.fromString(connectHost).getHostText(),
webApp.getListenerAddress().getPort()).toString());
}
内部类:HistoryClientService.HSClientProtocolHandler为HSClientProtocol类型的RPC请求的处理器。
也就是说所有的JobClient过来的RPC请求都可以经过这个处理进行处理的。
下面简要介绍下这个protocol的RPC处理器有哪些方法:
//获取Job的Counter
getCounters(GetCountersRequest request)
//获取Job的运行状态,例如是否完成。
getJobReport(GetJobReportRequest request)
//获取任务的运行状态
getTaskReport(GetTaskReportRequest request)
//获取任务完成事件信息,例如任务是接收到了哪种事件然后完成。可能为正常完成或者异常结束。
getTaskAttemptCompletionEvents(GetTaskAttemptCompletionEventsRequest request)
killJob(KillJobRequest request)
killTask(KillTaskRequest request)
killTaskAttempt(KillTaskAttemptRequest request)
getDiagnostics(GetDiagnosticsRequest request)
getTaskReports(GetTaskReportsRequest request)
- 点赞
- 收藏
- 关注作者
评论(0)