這篇文章主要講解了“flink中窗口的作用是什么”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“flink中窗口的作用是什么”吧!
創新互聯建站致力于網站設計制作、網站設計,成都網站設計,集團網站建設等服務標準化,推過標準化降低中小企業的建站的成本,并持續提升建站的定制化服務水平進行質量交付,讓企業網站從市場競爭中脫穎而出。 選擇創新互聯建站,就選擇了安全、穩定、美觀的網站建設服務!
窗口計算是流式計算中常用的數據計算方式之一,通過按照固定時間或長度將數據流切分成不同的窗口,再對數據進行相應的聚合操作,得到一定時間范圍內的統計結果,例如統計最近5分鐘內某網站的點擊數,此時,點擊數據在不斷產生,通過5分鐘窗口將數據限定在固定時間范圍內,就可以對該范圍內的有界數據執行聚合,得到最近5分鐘的網站點擊數。
代碼接口規則
stream.keyBy(...) //keyed類型數據集 .window(...) //指定窗口分配器類型 [.trigger(...)] //指定觸發器類型(可選) [.evictor(...)] //指定evictor(可選) [.allowedLateness(...)] //指定是否延遲處理數據(可選) [.sideOutputLateData(...)] //指定Output Lag(可選) .reduce/aggregate/fold/apply() //指定窗口計算函數 [.getSideOutput(...)] //根據Tag輸出數據(可選)
算子
Windows Assigner:指定窗口類型,定義如何將數據流分配到一個或多個窗口
Windows Trigger:指定窗口觸發的時機,定義窗口滿足什么樣的條件觸發計算;
Evictor:用于數據剔除
Lateness:標記是否處理遲到數據,當遲到數據到達窗口中是否觸發計算。
Output Tag:標記輸出標簽,然后通過getSideOutput將窗口中的數據根據標簽輸出。
Windows Function:定義窗口上數據處理的邏輯,例如對數據進行sum操作。
在運用窗口計算時,Flink根據上游數據集是否為KeyedStream類型(將數據集按Key分區),對應的Window Assigner會不同,
上游數據集為KeyedStream類型,則調用DataStream API的Windwo()方法指定Windows Assigner,數據將根據Key在不同的Task實例中并行分別計算,最后得出針對每個Key統計的結果。
如果是Non-Keyed類型,則調用WindowsAll()方法來指定Windows Assigner,所有數據都被窗口算子路由到一個Task中計算,并得到結果。
建議數據進行KeyedStream處理,這樣啟動并行計算,加速效率。
flink支持兩種類型的窗口,一種基于時間,窗口大小由開始和結束時間戳約束,一種基于數量,根據固定數量定義窗口大小。
根據Windows Assigner數據分配方式的不同將Windows分為4大類:滾動窗口(Tumbling Windows)、滑動窗口(Sliding Windows)、會話窗口(Session Windows)和全局窗口(Global Windows)
滾動窗口根據固定時間或大小切分,且窗口與窗口間元素互不重疊,適合于固定時間大小和周期統計某一指標的窗口計算。
DataStream API提供了基于Event Time和Process Time兩種時間類型的Tumbling窗口,對應的Assigner分別為TumblingEventTimeWindows和TumblingProcessTimeWindows,窗口大小童工of()指定,時間單位分別為Time.milliseconds(x)、Time.seconds(x)或Time.minutes(x),也可以是不同時間單位的組合。
如下實例,窗口時間按10S進行切分,窗口的時間是[1:00:00.000-1:00:09.999] 到[1:00:10.000-1:00:19.999]的等固定時間范圍。
val inputStream:DataStream[T]= ... //定義Event Time Tumbling Windows val tumblingEventTimeWindows=inputStream.keyBy(_.id) //通過使用TumblingEventTimeWindows定義Event Time滾動窗口 .window(TumblingEventTimeWindows.of(Time.seconds(10))) .process(...) //定義窗口函數 //定義Process Time Tumbling Windows val tumblingProcessingTimeWindows = inputStream.keyBy(_.id) //通過TumblingProcessTimeWindows定義Evnet Time滾動窗口 .window(TumblingProcessTimeWindows.of(Times.seconds(10))) .process(...) //定義窗口函數
滑動窗口是一種常見的窗口類型,特點是在滾動窗口基礎上增加了窗口滑動時間(Slide Time),且允許窗口數據發生重疊。這種窗口不像滾動窗口按照Windows Size向前移動,而是根據設定的Slide Time向前滑動。窗口之間的數據重疊大小根據Windows Size和Slide time決定,當Slide Time小于Windows Size便會發生窗口重疊,Slide Size大于WindowsSize會出現窗口不連續,數據可能不會再任何一個窗口內計算。
DataStream API針對Sliding Windows根據不同時間類型Assigner,包括基于Event Time的SlidingEventTimeWindows和基于Process Time的SlidingProcessingTimeWindows。
實例如下,指定Windows Size為1h,Slide Time為10m。
val inputStream:DataStream[T]= ... //定義Event Time Sliding Windows val slidingEventTimeWindows=inputStream.keyBy(_.id) //通過使用SlidingEventTimeWindows定義Event Time滾動窗口 .window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(10))) .process(...) //定義窗口函數 //定義Process Time Sliding Windows val slidingProcessTimeWindows = inputStream.keyBy(_.id) //通過SlidingProcessTimeWindows定義Evnet Time滾動窗口 .window(SlidingProcessTimeWindows.of(Time.hours(1),Time.minutes(10))) .process(...) //定義窗口函數
將某個時間段內活躍較高的數據聚合為一個窗口進行計算,窗口的觸發條件為Session Gap,指規定時間內沒有數據活躍接入,則任務窗口結束,觸發窗口計算。
注意:如果數據一直不間斷,會導致窗口始終不觸發。
與滑動、滾動窗口不同,Session Windows不需要定義Windows Size和Slide Time,只需要定義session gap,規定不活躍數據的時間上線即可。
Session Windows比較適合非連續型數據處理或周期性產生數據的場景。DataStream API中可以創建基于Event Time和Process Time的Session Windows,對應的有Assigner分別為EventTimeSessionWindow和ProcessTimerSessionWindows。
實例代碼如下:
val inputStream:DataStream[T]= ... //定義Event Time Session Windows val eventTimeSessionWindows=inputStream.keyBy(_.id) //通過使用EventTimeSessionWindows定義Event Time滾動窗口 .window(EventTimeSessionWindows.withGap(Time.milliseconds(10))) .process(...) //定義窗口函數 //定義Process Time Session Windows val processTimeSessionWindows = inputStream.keyBy(_.id) //通過ProcessTimeSessionWindows定義Evnet Time滾動窗口 .window(ProcessTimeSessionWindows.withGap(Time.milliseconds(10))) .process(...) //定義窗口函數
flink支持動態調整的Session Gap,需要實現SessionWindowTimeGapExtractor接口,并復寫extract方法,完成Session Gap的抽取,然后將創建好的Session Gap抽取器傳入ProcessiongTimeSessionWindows.withDynamicGap()方法即可。
val inputStream:DataStream[T]= ... //定義Event Time Session Windows val eventTimeSessionWindows=inputStream.keyBy(_.id) //通過使用EventTimeSessionWindows定義Event Time滾動窗口 .window(EventTimeSessionWindows.withDynamicGap( //實例化SessionWindowTimeGapExtractor接口 new SessionWindowTimeGapExtractor[String]{ override def extract(element:String):Long={ //動態指定并返回Session Gap } } )) .process(...) //定義窗口函數 //定義Process Time Session Windows val processTimeSessionWindows = inputStream.keyBy(_.id) //通過ProcessTimeSessionWindows定義Evnet Time滾動窗口 .window(ProcessTimeSessionWindows.withDynamicGap( //實例化SessionWindowTimeGapExtractor接口 new SessionWindowTimeGapExtractor[String]{ override def extract(element:String):Long={ //動態指定并返回Session Gap } } )) .process(...) //定義窗口函數
全局會話窗口將所有相同的key數據分配到單個窗口中計算,窗口沒有起始和結束時間,窗口需要借助Triger觸發計算,如果不指定,則不會觸發計算。
使用全局窗口要非常謹慎,必須明確自己在整個窗口中統計出的結果是什么,并指定對應的觸發器,同時指定相應的數據清理機制,否則數據將一直留在內存中。
val inputStream:DataStream[T]= ... val globalWindows = inputStream.keyBy(_.id) .window(GlobalWindows.create()) //通過GlobalWindows定義Global Windows .process()
flink定義的四種窗口,容易和時間窗口和事件窗口混淆,他們是不同維度的的窗口定義,需要特別注意下。
越長大越孤單,珍惜好身邊人。
感謝各位的閱讀,以上就是“flink中窗口的作用是什么”的內容了,經過本文的學習后,相信大家對flink中窗口的作用是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是創新互聯,小編將為大家推送更多相關知識點的文章,歡迎關注!
分享名稱:flink中窗口的作用是什么
地址分享:http://vcdvsql.cn/article44/phoshe.html
成都網站建設公司_創新互聯,為您提供網站改版、關鍵詞優化、網站維護、企業網站制作、網站導航、定制開發
聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯