【Hadoop】【JHS】5-JHS服务之HistoryClientService源码解析

举报
沙漠里的果果酱 发表于 2023/08/09 17:33:17 2023/08/09
【摘要】 【Hadoop】【JHS】5-JHS服务之HistoryClientService源码解析

HistoryClientService
该Service主要用于和JobClient交互。JobClient做为JHS提供给客户端的RPC接口,用于和JHS进行交互。
既然是一个service,那么先看下该service的serviceStart进而serviceInit()方法(该Service没有serviceInit方法)

protected void serviceStart() throws Exception {
  Configuration conf = getConfig();
  YarnRPC rpc = YarnRPC.create(conf);
  //初始化Web Application
  initializeWebApp(conf);
  InetSocketAddress address = conf.getSocketAddr(
      JHAdminConfig.MR_HISTORY_BIND_HOST,
      JHAdminConfig.MR_HISTORY_ADDRESS,
      JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS,
      JHAdminConfig.DEFAULT_MR_HISTORY_PORT);
  //创建一个jhs rpc服务端,并且要监听的端口以及线程数量。
  //1-参数1为protocol表示协议类型
  //2-参数2表示该中类型的protocol的处理器。此处为内部类:HSClientProtocolHandler
  server = rpc.getServer(HSClientProtocol.class, protocolHandler, address,
          conf, jhsDTSecretManager,
          //mapreduce.jobhistory.client.thread-count
          conf.getInt(JHAdminConfig.MR_HISTORY_CLIENT_THREAD_COUNT,
              JHAdminConfig.DEFAULT_MR_HISTORY_CLIENT_THREAD_COUNT));

  // Enable service authorization
  if (conf.getBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
    //重新读取hadoop-policy.xml,更新最新的服务级别权限控制。
    server.refreshServiceAcl(conf, new ClientHSPolicyProvider());
  }
  
  server.start();
  this.bindAddress = conf.updateConnectAddr(JHAdminConfig.MR_HISTORY_BIND_HOST,
                                            JHAdminConfig.MR_HISTORY_ADDRESS,
                                            JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS,
                                            server.getListenerAddress());
  LOG.info("Instantiated HistoryClientService at " + this.bindAddress);

  super.serviceStart();
}

从上述代码可知HistoryClientService的start方法中主要:
1-创建了JHS RPC服务端。并且重新加载了hadoop-policy到内存;
2-初始化initializeWebApp();
创建jhs的web服务器,并且启动;

protected void initializeWebApp(Configuration conf) {
  webApp = new HsWebApp(history);
  
  setupFilters(conf);
  
  InetSocketAddress bindAddress = MRWebAppUtil.getJHSWebBindAddress(conf);
  // NOTE: there should be a .at(InetSocketAddress)
  WebApps
      .$for("jobhistory", HistoryClientService.class, this, "ws")
      .with(conf)
      .withHttpSpnegoKeytabKey(
          JHAdminConfig.MR_WEBAPP_SPNEGO_KEYTAB_FILE_KEY)
      .withHttpSpnegoPrincipalKey(
          JHAdminConfig.MR_WEBAPP_SPNEGO_USER_NAME_KEY)
      .withCSRFProtection(JHAdminConfig.MR_HISTORY_CSRF_PREFIX)
      .withXFSProtection(JHAdminConfig.MR_HISTORY_XFS_PREFIX)
      .at(NetUtils.getHostPortString(bindAddress)).start(webApp);
  
  String connectHost = MRWebAppUtil.getJHSWebappURLWithoutScheme(conf);

  MRWebAppUtil.setJHSWebappURLWithoutScheme(conf,
      HostAndPort.fromParts(HostAndPort.fromString(connectHost).getHostText(),
          webApp.getListenerAddress().getPort()).toString());
}

内部类:HistoryClientService.HSClientProtocolHandler为HSClientProtocol类型的RPC请求的处理器。
也就是说所有的JobClient过来的RPC请求都可以经过这个处理进行处理的。
下面简要介绍下这个protocol的RPC处理器有哪些方法:
//获取Job的Counter
getCounters(GetCountersRequest request)
//获取Job的运行状态,例如是否完成。
getJobReport(GetJobReportRequest request)
//获取任务的运行状态
getTaskReport(GetTaskReportRequest request)
//获取任务完成事件信息,例如任务是接收到了哪种事件然后完成。可能为正常完成或者异常结束。
getTaskAttemptCompletionEvents(GetTaskAttemptCompletionEventsRequest request)
killJob(KillJobRequest request)
killTask(KillTaskRequest request)
killTaskAttempt(KillTaskAttemptRequest request)
getDiagnostics(GetDiagnosticsRequest request)
getTaskReports(GetTaskReportsRequest request)


【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。