bl双性强迫侵犯h_国产在线观看人成激情视频_蜜芽188_被诱拐的少孩全彩啪啪漫画

第4課:SparkStreaming的Exactly-One的事務(wù)處理-創(chuàng)新互聯(lián)

Spark Streaming的事務(wù)處理和關(guān)系型數(shù)據(jù)庫(kù)的事務(wù)的概念有所不同,關(guān)系型數(shù)據(jù)庫(kù)事務(wù)關(guān)注的是語(yǔ)句級(jí)別的一致性,例如銀行轉(zhuǎn)賬。而Spark Streaming的事務(wù)關(guān)注的是某次job執(zhí)行的一致性。也就是如何保證Job在處理數(shù)據(jù)的過(guò)程中做到如下兩點(diǎn):

創(chuàng)新互聯(lián)專(zhuān)注于企業(yè)營(yíng)銷(xiāo)型網(wǎng)站建設(shè)、網(wǎng)站重做改版、壽光網(wǎng)站定制設(shè)計(jì)、自適應(yīng)品牌網(wǎng)站建設(shè)、H5響應(yīng)式網(wǎng)站、電子商務(wù)商城網(wǎng)站建設(shè)、集團(tuán)公司官網(wǎng)建設(shè)、外貿(mào)網(wǎng)站建設(shè)、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁(yè)設(shè)計(jì)等建站業(yè)務(wù),價(jià)格優(yōu)惠性?xún)r(jià)比高,為壽光等各大城市提供網(wǎng)站開(kāi)發(fā)制作服務(wù)。
  • 不丟失數(shù)據(jù)

  • 不重復(fù)處理數(shù)據(jù)

SparkStreaming程序執(zhí)行架構(gòu)大致如下:

第4課:Spark Streaming的Exactly-One的事務(wù)處理

一、我們先來(lái)說(shuō)說(shuō)丟失數(shù)據(jù)的情況:

  1. Receiver接收到數(shù)據(jù)后,首先會(huì)在Executor級(jí)別上保存數(shù)據(jù)(根據(jù)StorageLevel的設(shè)置),例如socketTextStream的Receiver。在內(nèi)存和磁盤(pán)上保留2份副本數(shù)據(jù)

def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

如果StorageLevel設(shè)置的是只進(jìn)行內(nèi)存級(jí)別的存儲(chǔ),那么當(dāng)程序崩潰后,即便對(duì)Driver進(jìn)行了Checkpoint,然后重新啟動(dòng)程序。該部分?jǐn)?shù)據(jù)也會(huì)丟失。因?yàn)镈river的Checkpoint并不對(duì)計(jì)算數(shù)據(jù)進(jìn)行保存。

我們假設(shè)StorageLevel設(shè)置了磁盤(pán)級(jí)別的存儲(chǔ),也不能完全保證數(shù)據(jù)不被丟失,因?yàn)镽eceiver并不是接收一條數(shù)據(jù)寫(xiě)一次磁盤(pán),而是按照數(shù)據(jù)塊為單位寫(xiě)數(shù)據(jù)。然后將數(shù)據(jù)塊的元數(shù)據(jù)信息發(fā)送給Driver,Driver的Checkpoint記錄的數(shù)Block的元數(shù)據(jù)信息。當(dāng)數(shù)據(jù)塊寫(xiě)到一半的時(shí)候,或者是元數(shù)據(jù)還沒(méi)有發(fā)送給Driver的時(shí)候,Executor崩潰了,數(shù)據(jù)也就丟失啦。

解決方案:為了減少這種情況的發(fā)送,可以在Receiver端引入WAL寫(xiě)機(jī)制,因?yàn)閃AL寫(xiě)的頻率要比數(shù)據(jù)塊的頻率高的多。這樣,當(dāng)Executor恢復(fù)的時(shí)候,可以讀取WAL日志恢復(fù)數(shù)據(jù)塊。

但是通過(guò)WAL方式會(huì)極大的損傷Spark Streaming中Receivers接受數(shù)據(jù)的性能;

WAL也不能完全的解決數(shù)據(jù)丟失的問(wèn)題,就像Oracle一樣,日志文件的寫(xiě),也是先寫(xiě)到內(nèi)存中,然后根據(jù)一定的觸發(fā)條件再將數(shù)據(jù)寫(xiě)到磁盤(pán)。如果還沒(méi)有來(lái)的及寫(xiě)WAL日志,此時(shí)數(shù)據(jù)也會(huì)有不一致的情況(數(shù)據(jù)已經(jīng)接收,但是還沒(méi)有寫(xiě)到WAL的這部分?jǐn)?shù)據(jù)是恢復(fù)不出來(lái)的。)。

Spark Streaming 1.3的時(shí)候?yàn)榱吮苊釽AL的性能損失和實(shí)現(xiàn)Exactly Once而提供了Kafka Direct API,把Kafka作為文件存儲(chǔ)系統(tǒng)?。。〈藭r(shí)兼具有流的優(yōu)勢(shì)和文件系統(tǒng)的優(yōu)勢(shì),至此,Spark Streaming+Kafka就構(gòu)建了完美的流處理世界?。。∷械腅xecutors通過(guò)Kafka API直接消費(fèi)數(shù)據(jù),直接管理Offset,所以也不會(huì)重復(fù)消費(fèi)數(shù)據(jù);事務(wù)實(shí)現(xiàn)啦!??!

2. Driver崩潰,此時(shí)Job正在處理的數(shù)據(jù),包括Receiver已經(jīng)接收到還未被處理的數(shù)據(jù)將全部丟失。

解決方案:對(duì)Driver進(jìn)行Checkpoint,此處的Checkpoint和RDD的Checkpoint并不一樣。

我們看看Checkpoint都包含哪些屬性:

private[streaming]
class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
  extends Logging with Serializable {
  val master = ssc.sc.master
  val framework = ssc.sc.appName
  val jars = ssc.sc.jars
  val graph = ssc.graph
  val checkpointDir = ssc.checkpointDir
  val checkpointDuration = ssc.checkpointDuration
  val pendingTimes = ssc.scheduler.getPendingTimes().toArray
  val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
  val sparkConfPairs = ssc.conf.getAll

其中g(shù)raph是DStreamGraph的實(shí)例化,它里面包含了InputDStream

private val inputStreams = new ArrayBuffer[InputDStream[_]]()

我們以DirectKafkaInputDStream為例,其中包含了checkpointData

protected[streaming] override val checkpointData =
  new DirectKafkaInputDStreamCheckpointData

其中只是包含:

class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
  def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
    data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
  }

就是每個(gè)batch 的唯一標(biāo)識(shí) time 對(duì)象,以及每個(gè)KafkaRDD對(duì)應(yīng)的的Kafka偏移信息。

所以:

 checkpoint 是非常高效的。沒(méi)有涉及到實(shí)際數(shù)據(jù)的存儲(chǔ)。一般大小只有幾十K,因?yàn)橹淮媪薑afka的偏移量等信息。

 checkpoint 采用的是序列化機(jī)制,尤其是DStreamGraph的引入,里面包含了可能如ForeachRDD等,而ForeachRDD里面的函數(shù)應(yīng)該也會(huì)被序列化。如果采用了CheckPoint機(jī)制,而你的程序包做了做了變更,恢復(fù)后可能會(huì)有一定的問(wèn)題。

二、關(guān)于數(shù)據(jù)重復(fù)處理涉及兩個(gè)方面:

  1. 數(shù)據(jù)被重復(fù)讀?。涸谑褂肒afka的情況下,Receiver收到數(shù)據(jù)且保存到了HDFS等持久化引擎但是沒(méi)有來(lái)得及進(jìn)行updateOffsets,此時(shí)Receiver崩潰后重新啟動(dòng)就會(huì)通過(guò)管理Kafka的ZooKeeper中元數(shù)據(jù)再次重復(fù)讀取數(shù)據(jù),但是此時(shí)SparkStreaming認(rèn)為是成功的,但是Kafka認(rèn)為是失敗的(因?yàn)闆](méi)有更新offset到ZooKeeper中),此時(shí)就會(huì)導(dǎo)致數(shù)據(jù)重新消費(fèi)的情況。

  2. 數(shù)據(jù)輸出多次重寫(xiě)

    為什么會(huì)有這個(gè)問(wèn)題,因?yàn)镾park Streaming在計(jì)算的時(shí)候基于Spark Core,Spark Core天生會(huì)做以下事情導(dǎo)致Spark Streaming的部分結(jié)果重復(fù)輸出(例如數(shù)據(jù)輸出后,該Task的后續(xù)程序發(fā)生錯(cuò)誤,而任務(wù)發(fā)生錯(cuò)誤,Spark Core會(huì)進(jìn)入如下程序):

    Task重試;慢任務(wù)推測(cè)(兩個(gè)相同任務(wù)可能會(huì)同時(shí)執(zhí)行),Stage重復(fù);Job重試;

 具體解決方案:

設(shè)置spark.task.maxFailures次數(shù)為1;

設(shè)置spark.speculation為關(guān)閉狀態(tài)(因?yàn)槁蝿?wù)推測(cè)其實(shí)非常消耗性能,所以關(guān)閉后可以顯著提高Spark Streaming處理性能)

Spark Streaming on Kafka的話(huà),Job失敗后可以設(shè)置auto.offset.reset為“l(fā)argest”的方式;

Exactly Once的事務(wù)處理必須滿(mǎn)足:

  1. Receiver數(shù)據(jù)零丟失:必須有可靠的數(shù)據(jù)來(lái)源和可靠的Receiver,且通過(guò)WAL來(lái)保證數(shù)據(jù)安全。

  2. 整個(gè)應(yīng)用程序的metadata必須進(jìn)行checkpoint;

最后再次強(qiáng)調(diào)可以通過(guò)transform和foreachRDD基于業(yè)務(wù)邏輯代碼進(jìn)行邏輯控制來(lái)實(shí)現(xiàn)數(shù)據(jù)不重復(fù)消費(fèi)和輸出不重復(fù)!這兩個(gè)方式類(lèi)似于Spark Streaming的后門(mén),可以做任意想象的控制操作!

備注:

1、DT大數(shù)據(jù)夢(mèng)工廠微信公眾號(hào)DT_Spark
2、IMF晚8點(diǎn)大數(shù)據(jù)實(shí)戰(zhàn)YY直播頻道號(hào):68917580
3、新浪微博: http://www.weibo.com/ilovepains

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無(wú)理由+7*72小時(shí)售后在線(xiàn),公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性?xún)r(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專(zhuān)為企業(yè)上云打造定制,能夠滿(mǎn)足用戶(hù)豐富、多元化的應(yīng)用場(chǎng)景需求。

網(wǎng)站欄目:第4課:SparkStreaming的Exactly-One的事務(wù)處理-創(chuàng)新互聯(lián)
文章源于:http://vcdvsql.cn/article14/ccsode.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供營(yíng)銷(xiāo)型網(wǎng)站建設(shè)、網(wǎng)站維護(hù)、網(wǎng)站制作微信小程序、用戶(hù)體驗(yàn)、服務(wù)器托管

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話(huà):028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

網(wǎng)站優(yōu)化排名