這篇文章主要介紹“Hadoo是怎么將作業(yè)提交給集群的”,在日常操作中,相信很多人在Hadoo是怎么將作業(yè)提交給集群的問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”Hadoo是怎么將作業(yè)提交給集群的”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!
專注于為中小企業(yè)提供做網(wǎng)站、網(wǎng)站設(shè)計(jì)服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)壽縣免費(fèi)做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動(dòng)了近1000家企業(yè)的穩(wěn)健成長(zhǎng),幫助中小企業(yè)通過(guò)網(wǎng)站建設(shè)實(shí)現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。
通過(guò)圖可知主要有三個(gè)部分,即: 1) JobClient:作業(yè)客戶端。 2) JobTracker:作業(yè)的跟蹤器。 3) TaskTracker:任務(wù)的跟蹤器。
MapReduce將作業(yè)提交給JobClient,然后JobClient與JobTracker交互,JobTracker再去監(jiān)控與分配TaskTracker,完成具體作業(yè)的處理。
以下分析的是Hadoop2.6.4的源碼。請(qǐng)注意: 源碼與之前Hadoop版本的略有差別,所以有些概念還是與上圖有點(diǎn)差別。
**job.waitForCompletion(true)**
跟蹤waitForCompletion, 注意其中的submit(),如下:
/** * Submit the job to the cluster and wait for it to finish. */ public boolean waitForCompletion(boolean verbose ) throws IOException, InterruptedException, ClassNotFoundException { if (state == JobState.DEFINE) { submit(); } if (verbose) { monitorAndPrintJob(); } else { // get the completion poll interval from the client. int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf()); while (!isComplete()) { try { Thread.sleep(completionPollIntervalMillis); } catch (InterruptedException ie) { } } } return isSuccessful(); }
參數(shù) verbose ,如果想在控制臺(tái)打印當(dāng)前的任務(wù)執(zhí)行進(jìn)度,則設(shè)為true
**
** 在submit 方法中會(huì)把Job提交給對(duì)應(yīng)的Cluster,然后不等待Job執(zhí)行結(jié)束就立刻返回
同時(shí)會(huì)把Job實(shí)例的狀態(tài)設(shè)置為JobState.RUNNING,從而來(lái)表示Job正在進(jìn)行中
然后在Job運(yùn)行過(guò)程中,可以調(diào)用getJobState()來(lái)獲取Job的運(yùn)行狀態(tài)
/** * Submit the job to the cluster and return immediately. */ public void submit() throws IOException, InterruptedException, ClassNotFoundException { ensureState(JobState.DEFINE); setUseNewAPI(); connect(); final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient()); status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() { public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException { return submitter.submitJobInternal(Job.this, cluster); } }); state = JobState.RUNNING; LOG.info("The url to track the job: " + getTrackingURL()); }
而在任務(wù)提交前,會(huì)先通過(guò)connect()方法鏈接集群(Cluster):
private synchronized void connect() throws IOException, InterruptedException, ClassNotFoundException { if (cluster == null) { cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() { public Cluster run() throws IOException, InterruptedException, ClassNotFoundException { return new Cluster(getConfiguration()); } }); } }
這是一個(gè)線程保護(hù)方法。這個(gè)方法中根據(jù)配置信息初始化了一個(gè)Cluster對(duì)象,即代表集群
public Cluster(Configuration conf) throws IOException { this(null, conf); } public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException { this.conf = conf; this.ugi = UserGroupInformation.getCurrentUser(); initialize(jobTrackAddr, conf); } private void initialize(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException { synchronized (frameworkLoader) { for (ClientProtocolProvider provider : frameworkLoader) { LOG.debug("Trying ClientProtocolProvider : " + provider.getClass().getName()); ClientProtocol clientProtocol = null; try { if (jobTrackAddr == null) { clientProtocol = provider.create(conf); } else { clientProtocol = provider.create(jobTrackAddr, conf); } if (clientProtocol != null) { clientProtocolProvider = provider; client = clientProtocol; LOG.debug("Picked " + provider.getClass().getName() + " as the ClientProtocolProvider"); break; } else { LOG.debug("Cannot pick " + provider.getClass().getName() + " as the ClientProtocolProvider - returned null protocol"); } } catch (Exception e) { LOG.info("Failed to use " + provider.getClass().getName() + " due to error: " + e.getMessage()); } } } if (null == clientProtocolProvider || null == client) { throw new IOException( "Cannot initialize Cluster. Please check your configuration for " + MRConfig.FRAMEWORK_NAME + " and the correspond server addresses."); } }
而在上段代碼之前,
private static ServiceLoader<ClientProtocolProvider> frameworkLoader = ServiceLoader.load(ClientProtocolProvider.class);
可以看出創(chuàng)建客戶端代理階段使用了java.util.ServiceLoader,包含LocalClientProtocolProvider(本地作業(yè))和YarnClientProtocolProvider(yarn作業(yè))(hadoop有一個(gè)Yarn參數(shù)mapreduce.framework.name用來(lái)控制你選擇的應(yīng)用框架。在MRv2里,mapreduce.framework.name有兩個(gè)值:local和yarn),此處會(huì)根據(jù)mapreduce.framework.name的配置創(chuàng)建相應(yīng)的客戶端
mapred-site.xml:
<configuration> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> </configuration>
submitter.submitJobInternal(Job.this, cluster);
JobStatus submitJobInternal(Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException { //validate the jobs output specs checkSpecs(job); Configuration conf = job.getConfiguration(); addMRFrameworkToDistributedCache(conf); Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); //configure the command line options correctly on the submitting dfs InetAddress ip = InetAddress.getLocalHost(); if (ip != null) { submitHostAddress = ip.getHostAddress(); submitHostName = ip.getHostName(); conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName); conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress); } JobID jobId = submitClient.getNewJobID(); job.setJobID(jobId); Path submitJobDir = new Path(jobStagingArea, jobId.toString()); JobStatus status = null; try { conf.set(MRJobConfig.USER_NAME, UserGroupInformation.getCurrentUser().getShortUserName()); conf.set("hadoop.http.filter.initializers", "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer"); conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString()); LOG.debug("Configuring job " + jobId + " with " + submitJobDir + " as the submit dir"); // get delegation token for the dir TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { submitJobDir }, conf); populateTokenCache(conf, job.getCredentials()); // generate a secret to authenticate shuffle transfers if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) { KeyGenerator keyGen; try { int keyLen = CryptoUtils.isShuffleEncrypted(conf) ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS, MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS) : SHUFFLE_KEY_LENGTH; keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM); keyGen.init(keyLen); } catch (NoSuchAlgorithmException e) { throw new IOException("Error generating shuffle secret key", e); } SecretKey shuffleKey = keyGen.generateKey(); TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(), job.getCredentials()); } copyAndConfigureFiles(job, submitJobDir); Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir); // Create the splits for the job LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir)); int maps = writeSplits(job, submitJobDir); conf.setInt(MRJobConfig.NUM_MAPS, maps); LOG.info("number of splits:" + maps); // write "queue admins of the queue to which job is being submitted" // to job file. String queue = conf.get(MRJobConfig.QUEUE_NAME, JobConf.DEFAULT_QUEUE_NAME); AccessControlList acl = submitClient.getQueueAdmins(queue); conf.set(toFullPropertyName(queue, QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString()); // removing jobtoken referrals before copying the jobconf to HDFS // as the tasks don't need this setting, actually they may break // because of it if present as the referral will point to a // different job. TokenCache.cleanUpTokenReferral(conf); if (conf.getBoolean( MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED, MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) { // Add HDFS tracking ids ArrayList<String> trackingIds = new ArrayList<String>(); for (Token<? extends TokenIdentifier> t : job.getCredentials().getAllTokens()) { trackingIds.add(t.decodeIdentifier().getTrackingId()); } conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS, trackingIds.toArray(new String[trackingIds.size()])); } // Set reservation info if it exists ReservationId reservationId = job.getReservationId(); if (reservationId != null) { conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString()); } // Write job file to submit dir writeConf(conf, submitJobFile); // // Now, actually submit the job (using the submit name) // printTokens(jobId, job.getCredentials()); status = submitClient.submitJob( jobId, submitJobDir.toString(), job.getCredentials()); if (status != null) { return status; } else { throw new IOException("Could not launch job"); } } finally { if (status == null) { LOG.info("Cleaning up the staging area " + submitJobDir); if (jtFs != null && submitJobDir != null) jtFs.delete(submitJobDir, true); } } }
通過(guò)如下代碼正式提交Job到Y(jié)arn:
status = submitClient.submitJob( jobId, submitJobDir.toString(), job.getCredentials());
到最后,通過(guò)RPC的調(diào)用,最終會(huì)返回一個(gè)JobStatus對(duì)象,它的toString方法可以在JobClient端打印運(yùn)行的相關(guān)日志信息。
if (status != null) { return status; }
public String toString() { StringBuffer buffer = new StringBuffer(); buffer.append("job-id : " + jobid); buffer.append("uber-mode : " + isUber); buffer.append("map-progress : " + mapProgress); buffer.append("reduce-progress : " + reduceProgress); buffer.append("cleanup-progress : " + cleanupProgress); buffer.append("setup-progress : " + setupProgress); buffer.append("runstate : " + runState); buffer.append("start-time : " + startTime); buffer.append("user-name : " + user); buffer.append("priority : " + priority); buffer.append("scheduling-info : " + schedulingInfo); buffer.append("num-used-slots" + numUsedSlots); buffer.append("num-reserved-slots" + numReservedSlots); buffer.append("used-mem" + usedMem); buffer.append("reserved-mem" + reservedMem); buffer.append("needed-mem" + neededMem); return buffer.toString(); }
(到這里任務(wù)都給yarn了,這里就只剩下監(jiān)控(如果設(shè)置為true的話)),即:
if (verbose) { monitorAndPrintJob(); }
這只是完成了作業(yè)Job的提交。
到此,關(guān)于“Hadoo是怎么將作業(yè)提交給集群的”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!
文章名稱:Hadoo是怎么將作業(yè)提交給集群的
轉(zhuǎn)載注明:http://vcdvsql.cn/article38/peshpp.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站維護(hù)、網(wǎng)站導(dǎo)航、響應(yīng)式網(wǎng)站、微信公眾號(hào)、App開(kāi)發(fā)、網(wǎng)站內(nèi)鏈
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)