小编给大家分享一下Hadoop如何实现job提交,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
从如下地方开始,就要进行job的提交了
boolean isSuccess = job.waitForCompletion(true);
之后,进入Job类的waitForCompletion方法。
public boolean waitForCompletion(boolean verbose ) throws IOException, InterruptedException, ClassNotFoundException { if (state == JobState.DEFINE) { submit(); } //---- return isSuccessful(); }> > 这里输入引用文本
public void submit() throws IOException, InterruptedException, ClassNotFoundException { connect(); final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient()); status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() { public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException { return submitter.submitJobInternal(Job.this, cluster); } }); }
connect方法负责初始化集群信息:
private synchronized void connect() throws IOException, InterruptedException, ClassNotFoundException { if (cluster == null) { cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() { public Cluster run() throws IOException, InterruptedException, ClassNotFoundException { return new Cluster(getConfiguration()); } }); } }
集群信息cluster,包括什么,应该很清晰:
private ClientProtocolProvider clientProtocolProvider; private ClientProtocol client;
private UserGroupinformation ugi; private Configuration conf; private FileSystem fs = null;
private Path sysDir = null; private Path stagingAreaDir = null; private Path jobHistoryDir = null;
略微分析下, ClientProtocolProvider是客户端协议的生产者,对应的客户端是ClientProtocol。
ClientProtocolProvider规定了2个方法:
create
close 分别也用来创建和关闭客户端ClientProtocol。
而,ClientProtocolProvider的具体实现类有2个。
可以看到,有两个协议生产者,分别是yarn和local的。
那么,对应的客户端ClientProtocol,也会有两个。
ClientProtocol是个接口,里面规定了如下几个方法:
那么,不同的客户端yarn或者local,实现其中的方法即可。 因为,我们是本地Eclipse运行,直接看local即可,yarn的原理差不多,
OK,经过connect方法之后,cluster中这几个就有啦,即使没有的话,get的时候,也会初始化的。
之后, 使用集群的,FileSystem和client创建一个submiter。
final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
然后,调用submitter 的submitJobInternal方法提交作业,OK,进入submitJobInternal方法。
JobSubmiter类的submitJobInternal方法大致过程如下:
//获得staging路径,注意:集群cluster中有这个路径的名称,只不过这里需要创建路径。 Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//在staging路径下创建一个以jobid为标示的文件夹 JobID jobId = submitClient.getNewJobID(); Path submitJobDir = new Path(jobStagingArea, jobId.toString());
//job需要的一些文件和jar包之类的,都放到刚才的那个submitJobDir路径下 copyAndConfigureFiles(job, submitJobDir);
具体的东西包括:
String files = conf.get("tmpfiles"); String libjars = conf.get("tmpjars"); String archives = conf.get("tmparchives");
//写入job输入的分片信息 int maps = writeSplits(job, submitJobDir);
split信息包括两个部分。 首先调用Inputformat获得分片的个数,具体如何获得,后续讲。 将返回的分片数组逐个遍历并持久化到一个文件。
SplitMetaInfo[] info = writeNewSplits(conf, splits, out); 而writeNewSplits代码主要就是写分片信息到文件中。
之后,将split的分片信息持久化一个元数据文件。 writeJobSplitMetaInfo方法。
//将job的描述信息,写到一个job.xml放到相应的staging目录下的jobid目录。 Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir); writeConf(conf, submitJobFile);
FSDataOutputStream out = FileSystem.create(jtFs, jobFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION)); try { conf.writeXml(out); } finally { out.close(); }
//提交作业 status = submitClient.submitJob( jobId, submitJobDir.toString(), job.getCredentials());
OK,提交作业部分的代码就到这,后续写写,app master运行的过程。
总结,提交作业的主要功能。
这些东西都放到hdfs,作为所有节点共享访问的地方。之后,app master会访问这个目录,copy job的配置文件到本地并创建job对象,并根据split的信息,创建对应的maptaskrunable。运行。
但是,总的job信息依然在hdfs上。
以上是“Hadoop如何实现job提交”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注编程之家行业资讯频道!