這篇文章主要介紹Spark-submit腳本有什么用,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!
讓客戶滿意是我們工作的目標,不斷超越客戶的期望值來自于我們對這個行業(yè)的熱愛。我們立志把好的技術(shù)通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領(lǐng)域值得信任、有價值的長期合作伙伴,公司提供的服務項目有:主機域名、網(wǎng)站空間、營銷軟件、網(wǎng)站建設(shè)、鹿寨網(wǎng)站維護、網(wǎng)站推廣。
spark程序的提交是通過spark-submit腳本實現(xiàn)的,我們從它開始一步步解開spark提交集群的步驟。
spark-submit的主要命令行:exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
是執(zhí)行spark-class腳本,并將spark.deploy.SparkSubmit類作為第一個參數(shù)。
1、 spark-class
最關(guān)鍵的就是下面這句了:
CMD=() while IFS= read -d '' -r ARG; do CMD+=("$ARG") done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@") exec "${CMD[@]}"
首先循環(huán)讀取ARG參數(shù),加入到CMD中。然后執(zhí)行了"$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@
這個是真正執(zhí)行的第一個spark的類。
該類在launcher模塊下,簡單的瀏覽下代碼:
public static void main(String[] argsArray) throws Exception { ... List<String> args = new ArrayList<String>(Arrays.asList(argsArray)); String className = args.remove(0); ... //創(chuàng)建命令解析器 AbstractCommandBuilder builder; if (className.equals("org.apache.spark.deploy.SparkSubmit")) { try { builder = new SparkSubmitCommandBuilder(args); } catch (IllegalArgumentException e) { ... } } else { builder = new SparkClassCommandBuilder(className, args); } List<String> cmd = builder.buildCommand(env);//解析器解析參數(shù) ... //返回有效的參數(shù) if (isWindows()) { System.out.println(prepareWindowsCommand(cmd, env)); } else { List<String> bashCmd = prepareBashCommand(cmd, env); for (String c : bashCmd) { System.out.print(c); System.out.print('\0'); } } }
launcher.Main
返回的數(shù)據(jù)存儲到CMD中。
然后執(zhí)行命令:
exec "${CMD[@]}"
這里開始真正執(zhí)行某個Spark的類。
2、 deploy.SparkSubmit類
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = { def doRunMain(): Unit = { if (args.proxyUser != null) { val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser, UserGroupInformation.getCurrentUser()) try { proxyUser.doAs(new PrivilegedExceptionAction[Unit]() { override def run(): Unit = { runMain(args, uninitLog) } }) } catch { 。。。 } } else { runMain(args, uninitLog) } } doRunMain() }
主要是通過runMain(args,unititLog)方法來提價spark jar包。
所以必須先看看runMain方法是干什么的:
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = { val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args) val loader = getSubmitClassLoader(sparkConf) for (jar <- childClasspath) { addJarToClasspath(jar, loader) } var mainClass: Class[_] = null try { mainClass = Utils.classForName(childMainClass) } catch { ... } val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) { mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication] } else { new JavaMainApplication(mainClass) } try { app.start(childArgs.toArray, sparkConf) } catch { ... } }
這就很清楚了,要做的事情有以下這些:獲取類加載器,添加jar包依賴。創(chuàng)建SparkApplication類的可執(zhí)行程序或者是JavaMainApplication,創(chuàng)建出來的類叫app。最后執(zhí)行app.start方法。
SparkApplication是一個抽象類,我們就看看默認的JavaMainApplication就可以了,代碼非常簡單:
private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication { override def start(args: Array[String], conf: SparkConf): Unit = { val mainMethod = klass.getMethod("main", new Array[String](0).getClass) if (!Modifier.isStatic(mainMethod.getModifiers)) { throw new IllegalStateException("The main method in the given main class must be static") } val sysProps = conf.getAll.toMap sysProps.foreach { case (k, v) => sys.props(k) = v } mainMethod.invoke(null, args) } }
就是一個kclass的封裝器,用來執(zhí)行入?yún)⒌膋class的main方法。這里的kclass就是我們編寫的spark程序了,里面總有個main方法的。
以上是“Spark-submit腳本有什么用”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對大家有幫助,更多相關(guān)知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!
當前題目:Spark-submit腳本有什么用
當前URL:http://vcdvsql.cn/article24/pccdce.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站收錄、定制開發(fā)、服務器托管、標簽優(yōu)化、外貿(mào)建站、Google
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)