国产午夜男女在线|欧美日本一道高清国产|亚洲日韩乱码中文字幕|麻豆国产97在线精品一区|日韩一区2区三区另类图片|亚洲精品国产99在线观看|亚洲国产午夜福利精品大秀在线|一级做a爰片性色毛片免费网站

您當前的位置 :寧夏資訊網(wǎng) > 資訊 >  內(nèi)容正文
投稿

Flink教程:DataStream上的Join操作!

寧夏資訊網(wǎng) 2020-11-19 05:35:10 來源: 閱讀:-

批處理經(jīng)常要解決的問題是將兩個數(shù)據(jù)源做關(guān)聯(lián)Join操作。比如,很多手機APP都有一個用戶數(shù)據(jù)源User,同時APP會記錄用戶的行為,我們稱之為Behavior,兩個表按照userId來進行Join。在流處理場景下,F(xiàn)link也支持了Join,只不過Flink是在一個時間窗口上來進行兩個表的Join。

Flink教程:DataStream上的Join操作

Join示例圖

目前,F(xiàn)link支持了兩種Join:Window Join(窗口連接)和Interval Join(時間間隔連接。

Window Join

從名字中能猜到,Window Join主要在Flink的窗口上進行操作,它將兩個流中落在相同窗口的元素按照某個Key進行Join。一個Window Join的大致骨架結(jié)構(gòu)為:

input1.join(input2)    .where()      <- input1使用哪個字段作為Key    .equalTo()    <- input2使用哪個字段作為Key    .window()  <- 指定WindowAssigner    [.trigger()]      <- 指定Trigger(可選)    [.evictor()]      <- 指定Evictor(可選)    .apply()     <- 指定JoinFunction

下圖展示了Join的大致過程。兩個輸入數(shù)據(jù)流先分別按Key進行分組,然后將元素劃分到窗口中。窗口的劃分需要使用WindowAssigner來定義,這里可以使用Flink提供的滾動窗口、滑動窗口或會話窗口等默認的WindowAssigner。隨后兩個數(shù)據(jù)流中的元素會被分配到各個窗口上,也就是說一個窗口會包含來自兩個數(shù)據(jù)流的元素。相同窗口內(nèi)的數(shù)據(jù)會以INNER JOIN的語義來相互關(guān)聯(lián),形成一個數(shù)據(jù)對。當窗口的時間結(jié)束,F(xiàn)link會調(diào)用JoinFunction來對窗口內(nèi)的數(shù)據(jù)對進行處理。當然,我們也可以使用Trigger或Evictor做一些自定義優(yōu)化,他們的使用方法和普通窗口的使用方法一樣。

Flink教程:DataStream上的Join操作

Join的大致流程

接下來我們重點分析一下兩個數(shù)據(jù)流是如何INNER JOIN的:

Flink教程:DataStream上的Join操作

窗口內(nèi)數(shù)據(jù)INNER JOIN示意圖

一般滴,INNER JOIN只對兩個數(shù)據(jù)源都出現(xiàn)的元素做Join,形成一個數(shù)據(jù)對,即數(shù)據(jù)源input1中的某個元素與數(shù)據(jù)源input2中的所有元素逐個配對。當數(shù)據(jù)源某個窗口內(nèi)沒數(shù)據(jù)時,比如圖中的第三個窗口,Join的結(jié)果也是空的。

class MyJoinFunction extends JoinFunction[(String, Int), (String, Int), String] {  override def join(input1: (String, Int), input2: (String, Int)): String = {    "input 1 :" + input1._2 + ", input 2 :" + input2._2  }}val input1: DataStream[(String, Int)] = ...val input2: DataStream[(String, Int)] = ...val joinResult = input1.join(input2)      .where(i1 => i1._1)      .equalTo(i2 => i2._1)      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))      .apply(new MyJoinFunction)

上面的代碼自定義了JoinFunction,并將Join結(jié)果打印出來。無論代碼中演示的滾動窗口,還是滑動窗口或會話窗口,其原理都是一樣的。除了JoinFunction,F(xiàn)link還提供了FlatJoinFunction,其功能是輸出零到多個結(jié)果。

如果INNER JOIN不能滿足我們的需求,CoGroupFunction提供了更多可自定義的功能。需要注意的是,在調(diào)用時,要寫成input1.coGroup(input2).where().equalTo()。

class MyCoGroupFunction extends CoGroupFunction[(String, Int), (String, Int), String] {  // 這里的類型是Java的Iterable,需要引用 collection.JavaConverters._ 并轉(zhuǎn)成Scala  override def coGroup(input1: lang.Iterable[(String, Int)], input2: lang.Iterable[(String, Int)], out: Collector[String]): Unit = {    input1.asScala.foreach(element => out.collect("input1 :" + element.toString()))    input2.asScala.foreach(element => out.collect("input2 :" + element.toString()))  }}val input1: DataStream[(String, Int)] = ...val input2: DataStream[(String, Int)] = ...val coGroupResult = input1.coGroup(input2)      .where(i1 => i1._1)      .equalTo(i2 => i2._1)      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))      .apply(new MyCoGroupFunction)

Interval Join

與Window Join不同,Interval Join不依賴Flink的WindowAssigner,而是根據(jù)一個時間間隔(Interval)界定時間。Interval需要一個時間下界(lower bound)和上界(upper bound),如果我們將input1和input2進行Interval Join,input1中的某個元素為input1.element1,時間戳為input1.element1.ts,那么一個Interval就是[input1.element1.ts + lower bound, input1.element1.ts + upper bound],input2中落在這個時間段內(nèi)的元素將會和input1.element1組成一個數(shù)據(jù)對。用數(shù)學公式表達為,凡是符合下面公式input1.element1.ts + lower bound <= input2.elementx.ts <=input1.element1.ts + upper bound的元素使用INNER JOIN語義,兩兩組合在一起。上下界可以是正數(shù)也可以是負數(shù)。

注意,目前Flink(1.9)的Interval Join只支持Event Time語義。

Flink教程:DataStream上的Join操作

Interval Join示意圖

下面的代碼展示了如何對兩個數(shù)據(jù)流進行Interval Join:

class MyProcessFunction extends ProcessJoinFunction[(String, Long, Int), (String, Long, Int), String] {  override def processElement(input1: (String, Long, Int),                              input2: (String, Long, Int),                              context: ProcessJoinFunction[(String, Long, Int), (String, Long, Int), String]#Context,                              out: Collector[String]): Unit = {    out.collect("input 1: " + input1.toString() + ", input 2: " + input2.toString)  }}// 數(shù)據(jù)流有三個字段:(key, 時間戳, 數(shù)值)val input1: DataStream[(String, Long, Int)] = ...val input2: DataStream[(String, Long, Int)] = ...val intervalJoinResult = input1.keyBy(_._1)      .intervalJoin(input2.keyBy(_._1))      .between(Time.milliseconds(-5), Time.milliseconds(10))      .process(new MyProcessFunction)

默認的時間間隔是包含上下界的,我們可以使用.lowerBoundExclusive() 和.upperBoundExclusive來確定是否需要包含上下界。

val intervalJoinResult = input1.keyBy(_._1)      .intervalJoin(input2.keyBy(_._1))      .between(Time.milliseconds(-5), Time.milliseconds(10))      .upperBoundExclusive()      .lowerBoundExclusive()      .process(new MyProcessFunction)

Interval Join內(nèi)部是用緩存來存儲所有數(shù)據(jù)的,因此需要注意緩存數(shù)據(jù)不能太大,以免對內(nèi)存造成絕大壓力。

(正文已結(jié)束)

推薦閱讀:大股網(wǎng)

免責聲明及提醒:此文內(nèi)容為本網(wǎng)所轉(zhuǎn)載企業(yè)宣傳資訊,該相關(guān)信息僅為宣傳及傳遞更多信息之目的,不代表本網(wǎng)站觀點,文章真實性請瀏覽者慎重核實!任何投資加盟均有風險,提醒廣大民眾投資需謹慎!

網(wǎng)站簡介 - 聯(lián)系我們 - 營銷服務(wù) - XML地圖 - 版權(quán)聲明 - 網(wǎng)站地圖TXT
Copyright.2002-2019 寧夏資訊網(wǎng) 版權(quán)所有 本網(wǎng)拒絕一切非法行為 歡迎監(jiān)督舉報 如有錯誤信息 歡迎糾正