這篇文章將為大家詳細(xì)講解有關(guān)如何剖析具體實(shí)現(xiàn),文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。
創(chuàng)新互聯(lián)堅(jiān)持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:成都網(wǎng)站建設(shè)、網(wǎng)站建設(shè)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿足客戶于互聯(lián)網(wǎng)時(shí)代的平樂(lè)網(wǎng)站設(shè)計(jì)、移動(dòng)媒體設(shè)計(jì)的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!
一、概述
這里我們從源碼角度剖析BypassMergeSortShuffleWriter實(shí)現(xiàn)策略的原理和具體的實(shí)現(xiàn)細(xì)節(jié)。
BypassMergeSortShuffleWriter具體的實(shí)現(xiàn)都在對(duì)應(yīng)類的write()函數(shù)中,我們直接看源碼進(jìn)行剖析
1.先看構(gòu)造函數(shù)初始化
BypassMergeSortShuffleWriter( BlockManager blockManager, IndexShuffleBlockResolver shuffleBlockResolver, BypassMergeSortShuffleHandle<K, V> handle, int mapId, TaskContext taskContext, SparkConf conf) { // 獲取spark.shuffle.file.buffer參數(shù)值,默認(rèn)32k,這里是一個(gè)比較重要的條有參數(shù), // 該參數(shù)用于設(shè)置shuffle write task的BufferedOutputStream的buffer緩沖大小。 // 將數(shù)據(jù)寫(xiě)到磁盤文件之前,會(huì)先寫(xiě)入buffer緩沖中,待緩沖寫(xiě)滿之后,才會(huì)溢寫(xiě)到磁盤 //如果作業(yè)可用的內(nèi)存資源較為充足的話,可以適當(dāng)增加這個(gè)參數(shù)的大小(比如64k),從而減少shuffle write過(guò)程中溢寫(xiě)磁盤文件的次數(shù), // 也就可以減少磁盤IO次數(shù),進(jìn)而提升性能 this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; // 是否采用NIO的從文件到文件流的復(fù)制方式,默認(rèn)值是true 一般不用修改 this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true); this.blockManager = blockManager; // 獲取shufflehandle中的ShuffleDependency對(duì)象,通過(guò)該對(duì)象得到分區(qū)器和分區(qū)個(gè)數(shù)等數(shù)據(jù)。 final ShuffleDependency<K, V, V> dep = handle.dependency(); this.mapId = mapId; this.shuffleId = dep.shuffleId(); this.partitioner = dep.partitioner(); this.numPartitions = partitioner.numPartitions(); this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics(); //設(shè)置序列化工具對(duì)象,和shuffleBlockResolver對(duì)象, // 該對(duì)象用來(lái)創(chuàng)建和維護(hù)shuffle的數(shù)據(jù)的邏輯塊和物理文件位置之間的映射的對(duì)象 this.serializer = dep.serializer(); this.shuffleBlockResolver = shuffleBlockResolver; }
2.再看write()函數(shù),源碼如下:
//這里大體意思是 為每個(gè)分區(qū)在磁盤創(chuàng)建臨時(shí)文件 并給每一個(gè)writer
上面代碼的大體思路如下:
a.確定分區(qū)數(shù),然后為每個(gè)分區(qū)創(chuàng)建DiskBlockObjectWriter和臨時(shí)文件
b.循環(huán)將record通過(guò)Partitioner進(jìn)行分區(qū),并寫(xiě)入對(duì)應(yīng)分區(qū)臨時(shí)文件
c. 將分區(qū)數(shù)據(jù)刷到磁盤
d.根據(jù)shuffleId和mapId,構(gòu)建ShuffleDataBlockId,創(chuàng)建合并文件data和合并文件的臨時(shí)文件,文件格式為:
shuffle_{shuffleId}_{mapId}_{reduceId}.data
e.將分區(qū)文件合并到一個(gè)總的臨時(shí)文件,合并后會(huì)重命名為最終輸出文件名,并返回一個(gè)對(duì)應(yīng)分區(qū)文件長(zhǎng)度的數(shù)組
f.創(chuàng)建索引文件index和索引臨時(shí)文件,每一個(gè)分區(qū)的長(zhǎng)度和offset寫(xiě)入索引文件等;并且重命名臨時(shí)data文件和臨時(shí)index文件
g.將一些信息封裝到MapStatus返回
存在問(wèn)題:
這種Writer會(huì)為每個(gè)分區(qū)創(chuàng)建一個(gè)臨時(shí)文件,如果分區(qū)過(guò)多時(shí),會(huì)創(chuàng)建很多的output輸出流和臨時(shí)文件對(duì)象,占用資源過(guò)多,性能會(huì)下降。
重點(diǎn)關(guān)注:
參數(shù):spark.shuffle.file.buffer 默認(rèn)值32k
默認(rèn)情況下,shuffle的map task,輸出到磁盤文件的時(shí)候,統(tǒng)一都會(huì)先寫(xiě)入到每個(gè)task自己關(guān)聯(lián)的一個(gè)內(nèi)存緩沖區(qū),每一次當(dāng)內(nèi)存緩沖區(qū)滿溢后,然后才會(huì)進(jìn)行溢寫(xiě)到磁盤中。如果內(nèi)存沖突可適當(dāng)調(diào)大這個(gè)參數(shù),從而減少shuffle write過(guò)程中溢寫(xiě)磁盤文件的次數(shù),也就可以減少磁盤IO次數(shù),進(jìn)而提升性能。在實(shí)踐中發(fā)現(xiàn),合理調(diào)節(jié)該參數(shù),性能會(huì)有1%~5%的提升。
關(guān)于如何剖析具體實(shí)現(xiàn)就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。
文章題目:如何剖析具體實(shí)現(xiàn)
本文路徑:http://vcdvsql.cn/article10/iiphdo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供小程序開(kāi)發(fā)、微信公眾號(hào)、品牌網(wǎng)站制作、網(wǎng)站內(nèi)鏈、靜態(tài)網(wǎng)站、軟件開(kāi)發(fā)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)