Good Programmer (好程序员) Big Data: Spark Task Submission and Cluster Startup Flows

Spark cluster startup flow
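1. Run the start-all.sh script, which starts the Master first.
2. Once the Master has started, its preStart method schedules a timer that periodically checks for Workers whose heartbeats have timed out and removes them.
3. The startup script parses the slaves configuration file to find the nodes that should run a Worker, then starts a Worker on each of them.
4. When a Worker starts, its preStart method registers the Worker with every Master.
5. The Master receives the Worker's registration message, stores the registration information, and replies with its own URL.
6. The Worker receives the Master's URL and updates its record. It then starts a timer that periodically sends heartbeats to the Master.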
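The Master-side bookkeeping in steps 2, 5 and 6 can be modeled without Akka. The following is a minimal sketch that assumes a single Master and a caller-supplied clock in milliseconds. `MasterModel`, `register`, `heartbeat` and `removeTimedOut` are hypothetical names used only for illustration. They are not Spark or Akka APIs. The sketch shows the central rule: a Worker is dropped once the time since its last heartbeat exceeds the timeout.

```scala
import scala.collection.mutable

// Hypothetical, Akka-free model of the Master's Worker bookkeeping.
// Time is passed in explicitly (milliseconds) so behavior is deterministic.
final class MasterModel(timeoutMs: Long) {
  // Worker ID -> time of its last heartbeat (registration counts as the first one)
  private val lastSeen = mutable.HashMap[String, Long]()

  // Step 5: store a new registration; returns false for a duplicate ID
  def register(workerId: String, now: Long): Boolean =
    if (lastSeen.contains(workerId)) false
    else {
      lastSeen(workerId) = now
      true
    }

  // Step 6: a heartbeat refreshes the timestamp of a known Worker
  def heartbeat(workerId: String, now: Long): Unit =
    if (lastSeen.contains(workerId)) lastSeen(workerId) = now

  // Step 2: the periodic check removes Workers silent for longer than timeoutMs
  def removeTimedOut(now: Long): Set[String] = {
    val dead = lastSeen.collect { case (id, seen) if now - seen > timeoutMs => id }.toSet
    lastSeen --= dead
    dead
  }

  def aliveWorkers: Set[String] = lastSeen.keySet.toSet
}

object MasterModelDemo {
  def main(args: Array[String]): Unit = {
    val master = new MasterModel(timeoutMs = 15000)
    master.register("w1", now = 0)
    master.register("w2", now = 0)
    master.heartbeat("w1", now = 10000)          // w1 keeps beating, w2 goes silent
    println(master.removeTimedOut(now = 20000))  // Set(w2)
    println(master.aliveWorkers)                 // Set(w1)
  }
}
```

Passing the time in explicitly keeps the model easy to test. In the Akka code of this article, `System.currentTimeMillis()` serves as the clock, and the scheduler triggers the check.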
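Task submission flow
1. On the Driver side, the spark-submit script launches the SparkSubmit process. This process creates a central object, the SparkContext, which begins sending messages to the Master.
2. When the Master receives the message, it builds the task information and puts it in a queue.
3. The Master filters out all valid (alive) Workers and sorts them by free resources.
4. The Master tells the selected Workers to fetch the task information and launch the corresponding Executors.
5. Each Worker launches an Executor, and the Executor registers back with the Driver (reverse registration).
6. The Driver sends the generated tasks to the corresponding Executors, which then execute them.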
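Steps 2 to 4 can be illustrated with a simplified, Akka-free sketch. It assumes one Executor per application and uses the hypothetical types `WorkerSnapshot` and `AppRequest`. It is not Spark's actual scheduler. Spark's standalone Master does something broadly similar: it considers only alive Workers that have enough free memory and cores, and it prefers Workers with more free cores.

```scala
import scala.collection.mutable

// Hypothetical view of a Worker's state as seen by the Master
final case class WorkerSnapshot(id: String, alive: Boolean, freeMemoryMb: Int, freeCores: Int)

// Hypothetical "task information" waiting in the Master's queue
final case class AppRequest(name: String, memoryPerExecutorMb: Int, coresPerExecutor: Int)

object SchedulingSketch {
  // Step 3: keep alive Workers that can fit one Executor, then sort by
  // free cores (descending), breaking ties by free memory (descending)
  def usableWorkers(workers: Seq[WorkerSnapshot], app: AppRequest): Seq[WorkerSnapshot] =
    workers
      .filter(w => w.alive &&
        w.freeMemoryMb >= app.memoryPerExecutorMb &&
        w.freeCores >= app.coresPerExecutor)
      .sortBy(w => (-w.freeCores, -w.freeMemoryMb))

  // Steps 2 and 4: take apps from the queue in FIFO order and launch one
  // Executor for each on the best Worker; returns (app, worker) pairs
  def schedule(waiting: mutable.Queue[AppRequest],
               initial: Seq[WorkerSnapshot]): List[(String, String)] = {
    var workers = initial
    val launched = mutable.ListBuffer[(String, String)]()
    while (waiting.nonEmpty) {
      val app = waiting.dequeue()
      usableWorkers(workers, app).headOption.foreach { chosen =>
        launched += (app.name -> chosen.id)
        // Deduct the resources taken by the new Executor
        workers = workers.map { w =>
          if (w.id == chosen.id)
            w.copy(freeMemoryMb = w.freeMemoryMb - app.memoryPerExecutorMb,
                   freeCores = w.freeCores - app.coresPerExecutor)
          else w
        }
      }
    }
    launched.toList
  }

  def main(args: Array[String]): Unit = {
    val workers = Seq(
      WorkerSnapshot("worker-1", alive = true, freeMemoryMb = 4096, freeCores = 2),
      WorkerSnapshot("worker-2", alive = true, freeMemoryMb = 2048, freeCores = 8),
      WorkerSnapshot("worker-3", alive = false, freeMemoryMb = 8192, freeCores = 16))
    val waiting = mutable.Queue(AppRequest("app-1", 1024, 4), AppRequest("app-2", 1024, 1))
    println(schedule(waiting, workers)) // List((app-1,worker-2), (app-2,worker-2))
  }
}
```

In this sketch, applications that fit on no Worker are simply dropped. A real Master would keep them waiting until resources free up.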
Cluster startup flow: implementation
1. First, create the Master class
```scala
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

import scala.collection.mutable
import scala.concurrent.duration._

class Master(val masterHost: String, val masterPort: Int) extends Actor {

  // Registration information of each Worker, keyed by Worker ID
  val idToWorker = new mutable.HashMap[String, WorkerInfo]()

  // The set of registered Workers
  val workers = new mutable.HashSet[WorkerInfo]()

  // Worker timeout (ms); also used as the period of the timeout check
  val checkInterval: Long = 15000

  // Lifecycle method: called once, after the constructor and before receive
  override def preStart(): Unit = {
    // Start a timer that periodically checks for timed-out Workers
    import context.dispatcher
    context.system.scheduler.schedule(0.millis, checkInterval.millis, self, CheckTimeOutWorker)
  }

  // Called repeatedly, once per incoming message, after preStart
  override def receive: Receive = {
    // Worker -> Master: registration
    case RegisterWorker(id, host, port, memory, cores) =>
      if (!idToWorker.contains(id)) {
        val workerInfo = new WorkerInfo(id, host, port, memory, cores)
        idToWorker += (id -> workerInfo)
        workers += workerInfo

        println("a worker registered")

        // Reply to the Worker with the Master's URL
        sender() ! RegisteredWorker(s"akka.tcp://${Master.MASTER_SYSTEM}" +
          s"@$masterHost:$masterPort/user/${Master.MASTER_ACTOR}")
      }

    // Worker -> Master: heartbeat
    case HeartBeat(workerId) =>
      // Look up the WorkerInfo; ignore heartbeats from Workers already removed
      idToWorker.get(workerId).foreach { workerInfo =>
        // Record the time of this heartbeat
        workerInfo.lastHeartbeatTime = System.currentTimeMillis()
      }

    // Master -> self: periodic timeout check
    case CheckTimeOutWorker =>
      val currentTime = System.currentTimeMillis()
      val toRemove: mutable.HashSet[WorkerInfo] =
        workers.filter(w => currentTime - w.lastHeartbeatTime > checkInterval)

      // Remove timed-out Workers from both idToWorker and workers
      toRemove.foreach { deadWorker =>
        idToWorker -= deadWorker.id
        workers -= deadWorker
      }

      println(s"num of workers: ${workers.size}")
  }
}

object Master {
  val MASTER_SYSTEM = "MasterSystem"
  val MASTER_ACTOR = "Master"

  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt

    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
      """.stripMargin

    // Configuration needed to create a remote-capable ActorSystem
    val config: Config = ConfigFactory.parseString(configStr)

    // Create the ActorSystem
    val actorSystem: ActorSystem = ActorSystem(MASTER_SYSTEM, config)

    // Create the Master actor from the ActorSystem
    actorSystem.actorOf(Props(new Master(host, port)), MASTER_ACTOR)

    // Block until the system shuts down (Akka 2.3-era API; on Akka 2.4+ use
    // Await.result(actorSystem.whenTerminated, Duration.Inf) instead)
    actorSystem.awaitTermination()
  }
}
```
2. Create the RemoteMsg trait and the message types
```scala
// Marker trait for messages sent over the network between Master and Worker
trait RemoteMsg extends Serializable

// Master -> self (Master): trigger the periodic timeout check
case object CheckTimeOutWorker

// Worker -> Master: registration request
case class RegisterWorker(id: String, host: String,
                          port: Int, memory: Int, cores: Int) extends RemoteMsg

// Master -> Worker: registration succeeded, carries the Master URL
case class RegisteredWorker(masterUrl: String) extends RemoteMsg

// Worker -> self: trigger sending one heartbeat
case object SendHeartBeat

// Worker -> Master: heartbeat
case class HeartBeat(workerId: String) extends RemoteMsg
```
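`RemoteMsg` extends `Serializable` because messages that travel between the Master and Worker JVMs have to be converted to bytes. Akka classic remoting of this era uses Java serialization by default. Scala case classes and case objects already extend `Serializable`, so the trait mainly documents which messages cross the network. The round trip below is a minimal sketch that uses only `java.io`. `DemoRemoteMsg` and `DemoHeartBeat` are hypothetical stand-ins, named differently so they do not clash with the article's types.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-ins mirroring RemoteMsg / HeartBeat
trait DemoRemoteMsg extends Serializable
final case class DemoHeartBeat(workerId: String) extends DemoRemoteMsg

object SerializationDemo {
  // Serialize a message to bytes, as a remote transport must before sending it
  def toBytes(msg: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(msg) finally out.close()
    bytes.toByteArray
  }

  // Rebuild the message from bytes, as the receiving side does
  def fromBytes(data: Array[Byte]): AnyRef = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try in.readObject() finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val sent = DemoHeartBeat("worker-1")
    val received = fromBytes(toBytes(sent))
    println(received == sent) // true: case classes compare by value
  }
}
```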
3. Create the Worker class
```scala
import java.util.UUID

import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

import scala.concurrent.duration._

class Worker(val host: String, val port: Int, val masterHost: String,
             val masterPort: Int, val memory: Int, val cores: Int) extends Actor {

  // Generate a unique Worker ID
  val workerId: String = UUID.randomUUID().toString

  // Master URL, received after successful registration
  var masterUrl: String = _

  // Heartbeat interval (ms); must be shorter than the Master's timeout
  val heartBeatInterval: Long = 10000

  // The Master actor
  var master: ActorSelection = _

  override def preStart(): Unit = {
    // Look up the Master actor by its remote path
    master = context.actorSelection(s"akka.tcp://${Master.MASTER_SYSTEM}" +
      s"@$masterHost:$masterPort/user/${Master.MASTER_ACTOR}")

    // Register with the Master
    master ! RegisterWorker(workerId, host, port, memory, cores)
  }

  override def receive: Receive = {
    // The Master confirmed the registration and sent its URL
    case RegisteredWorker(masterUrl) =>
      this.masterUrl = masterUrl
      // Use the URL returned by the Master from now on
      master = context.actorSelection(masterUrl)
      // Start a timer that periodically triggers a heartbeat to the Master
      import context.dispatcher
      context.system.scheduler.schedule(0.millis, heartBeatInterval.millis, self, SendHeartBeat)

    case SendHeartBeat =>
      // Send a heartbeat to the Master
      master ! HeartBeat(workerId)
  }
}

object Worker {
  val WORKER_SYSTEM = "WorkerSystem"
  val WORKER_ACTOR = "Worker"

  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    val masterHost = args(2)
    val masterPort = args(3).toInt
    val memory = args(4).toInt
    val cores = args(5).toInt

    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
      """.stripMargin

    // Configuration needed to create a remote-capable ActorSystem
    val config: Config = ConfigFactory.parseString(configStr)

    // Create the ActorSystem
    val actorSystem: ActorSystem = ActorSystem(WORKER_SYSTEM, config)

    // Create the Worker actor from the ActorSystem
    val worker: ActorRef = actorSystem.actorOf(
      Props(new Worker(host, port, masterHost, masterPort, memory, cores)), WORKER_ACTOR)

    // Block until the system shuts down (Akka 2.3-era API; on Akka 2.4+ use
    // Await.result(actorSystem.whenTerminated, Duration.Inf) instead)
    actorSystem.awaitTermination()
  }
}
```
4. Create the WorkerInfo class, which holds a registered Worker's information
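```scala
class WorkerInfo(val id: String, val host: String, val port: Int,
                 val memory: Int, val cores: Int) {

  // Time of the last heartbeat (ms). It is initialized to the creation
  // (registration) time so that a new Worker is not treated as timed out
  // before its first heartbeat arrives.
  var lastHeartbeatTime: Long = System.currentTimeMillis()
}
```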
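5. For local testing, pass program arguments to each main method. The Master expects: host port. The Worker expects: host port masterHost masterPort memory cores.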
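The argument order follows from the `main` methods of Master and Worker. The hypothetical helpers below are a sketch that validates the arguments before startup. The example values (127.0.0.1, ports 8888 and 9999, memory 2048, 4 cores) are made up for illustration, and the source does not state the unit of `memory`. If the Master and the Worker run on the same machine, they need different ports.

```scala
import scala.util.Try

// Hypothetical typed views of the positional arguments of Master.main / Worker.main
final case class MasterArgs(host: String, port: Int)
final case class WorkerArgs(host: String, port: Int, masterHost: String,
                            masterPort: Int, memory: Int, cores: Int)

object LaunchArgs {
  // Parse one integer argument, reporting which one was malformed
  private def toIntArg(name: String, value: String): Either[String, Int] =
    Try(value.toInt).toOption.toRight(s"$name must be an integer, got '$value'")

  // Same order as Master.main: host port
  def parseMaster(args: Array[String]): Either[String, MasterArgs] = args match {
    case Array(host, port) => toIntArg("port", port).map(p => MasterArgs(host, p))
    case _                 => Left("usage: Master <host> <port>")
  }

  // Same order as Worker.main: host port masterHost masterPort memory cores
  def parseWorker(args: Array[String]): Either[String, WorkerArgs] = args match {
    case Array(host, port, masterHost, masterPort, memory, cores) =>
      for {
        p  <- toIntArg("port", port)
        mp <- toIntArg("masterPort", masterPort)
        m  <- toIntArg("memory", memory)
        c  <- toIntArg("cores", cores)
      } yield WorkerArgs(host, p, masterHost, mp, m, c)
    case _ =>
      Left("usage: Worker <host> <port> <masterHost> <masterPort> <memory> <cores>")
  }

  def main(args: Array[String]): Unit = {
    // Example values only (hypothetical)
    println(parseMaster(Array("127.0.0.1", "8888")))
    println(parseWorker("127.0.0.1 9999 127.0.0.1 8888 2048 4".split(" ")))
  }
}
```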