批處理經(jīng)常要解決的問題是將兩個數(shù)據(jù)源做關(guān)聯(lián)Join操作。比如,很多手機APP都有一個用戶數(shù)據(jù)源User,同時APP會記錄用戶的行為,我們稱之為Behavior,兩個表按照userId來進行Join。在流處理場景下,F(xiàn)link也支持了Join,只不過Flink是在一個時間窗口上來進行兩個表的Join。
目前,F(xiàn)link支持了兩種Join:Window Join(窗口連接)和Interval Join(時間間隔連接。
Window Join
從名字中能猜到,Window Join主要在Flink的窗口上進行操作,它將兩個流中落在相同窗口的元素按照某個Key進行Join。一個Window Join的大致骨架結(jié)構(gòu)為:
input1.join(input2) .where() <- input1使用哪個字段作為Key .equalTo() <- input2使用哪個字段作為Key .window() <- 指定WindowAssigner [.trigger()] <- 指定Trigger(可選) [.evictor()] <- 指定Evictor(可選) .apply() <- 指定JoinFunction
下圖展示了Join的大致過程。兩個輸入數(shù)據(jù)流先分別按Key進行分組,然后將元素劃分到窗口中。窗口的劃分需要使用WindowAssigner來定義,這里可以使用Flink提供的滾動窗口、滑動窗口或會話窗口等默認的WindowAssigner。隨后兩個數(shù)據(jù)流中的元素會被分配到各個窗口上,也就是說一個窗口會包含來自兩個數(shù)據(jù)流的元素。相同窗口內(nèi)的數(shù)據(jù)會以INNER JOIN的語義來相互關(guān)聯(lián),形成一個數(shù)據(jù)對。當窗口的時間結(jié)束,F(xiàn)link會調(diào)用JoinFunction來對窗口內(nèi)的數(shù)據(jù)對進行處理。當然,我們也可以使用Trigger或Evictor做一些自定義優(yōu)化,他們的使用方法和普通窗口的使用方法一樣。
接下來我們重點分析一下兩個數(shù)據(jù)流是如何INNER JOIN的:
一般滴,INNER JOIN只對兩個數(shù)據(jù)源都出現(xiàn)的元素做Join,形成一個數(shù)據(jù)對,即數(shù)據(jù)源input1中的某個元素與數(shù)據(jù)源input2中的所有元素逐個配對。當數(shù)據(jù)源某個窗口內(nèi)沒數(shù)據(jù)時,比如圖中的第三個窗口,Join的結(jié)果也是空的。
class MyJoinFunction extends JoinFunction[(String, Int), (String, Int), String] { override def join(input1: (String, Int), input2: (String, Int)): String = { "input 1 :" + input1._2 + ", input 2 :" + input2._2 }}val input1: DataStream[(String, Int)] = ...val input2: DataStream[(String, Int)] = ...val joinResult = input1.join(input2) .where(i1 => i1._1) .equalTo(i2 => i2._1) .window(TumblingProcessingTimeWindows.of(Time.seconds(60))) .apply(new MyJoinFunction)
上面的代碼自定義了JoinFunction,并將Join結(jié)果打印出來。無論代碼中演示的滾動窗口,還是滑動窗口或會話窗口,其原理都是一樣的。除了JoinFunction,F(xiàn)link還提供了FlatJoinFunction,其功能是輸出零到多個結(jié)果。
如果INNER JOIN不能滿足我們的需求,CoGroupFunction提供了更多可自定義的功能。需要注意的是,在調(diào)用時,要寫成input1.coGroup(input2).where(
class MyCoGroupFunction extends CoGroupFunction[(String, Int), (String, Int), String] { // 這里的類型是Java的Iterable,需要引用 collection.JavaConverters._ 并轉(zhuǎn)成Scala override def coGroup(input1: lang.Iterable[(String, Int)], input2: lang.Iterable[(String, Int)], out: Collector[String]): Unit = { input1.asScala.foreach(element => out.collect("input1 :" + element.toString())) input2.asScala.foreach(element => out.collect("input2 :" + element.toString())) }}val input1: DataStream[(String, Int)] = ...val input2: DataStream[(String, Int)] = ...val coGroupResult = input1.coGroup(input2) .where(i1 => i1._1) .equalTo(i2 => i2._1) .window(TumblingProcessingTimeWindows.of(Time.seconds(60))) .apply(new MyCoGroupFunction)
Interval Join
與Window Join不同,Interval Join不依賴Flink的WindowAssigner,而是根據(jù)一個時間間隔(Interval)界定時間。Interval需要一個時間下界(lower bound)和上界(upper bound),如果我們將input1和input2進行Interval Join,input1中的某個元素為input1.element1,時間戳為input1.element1.ts,那么一個Interval就是[input1.element1.ts + lower bound, input1.element1.ts + upper bound],input2中落在這個時間段內(nèi)的元素將會和input1.element1組成一個數(shù)據(jù)對。用數(shù)學公式表達為,凡是符合下面公式input1.element1.ts + lower bound <= input2.elementx.ts <=input1.element1.ts + upper bound的元素使用INNER JOIN語義,兩兩組合在一起。上下界可以是正數(shù)也可以是負數(shù)。
注意,目前Flink(1.9)的Interval Join只支持Event Time語義。
下面的代碼展示了如何對兩個數(shù)據(jù)流進行Interval Join:
class MyProcessFunction extends ProcessJoinFunction[(String, Long, Int), (String, Long, Int), String] { override def processElement(input1: (String, Long, Int), input2: (String, Long, Int), context: ProcessJoinFunction[(String, Long, Int), (String, Long, Int), String]#Context, out: Collector[String]): Unit = { out.collect("input 1: " + input1.toString() + ", input 2: " + input2.toString) }}// 數(shù)據(jù)流有三個字段:(key, 時間戳, 數(shù)值)val input1: DataStream[(String, Long, Int)] = ...val input2: DataStream[(String, Long, Int)] = ...val intervalJoinResult = input1.keyBy(_._1) .intervalJoin(input2.keyBy(_._1)) .between(Time.milliseconds(-5), Time.milliseconds(10)) .process(new MyProcessFunction)
默認的時間間隔是包含上下界的,我們可以使用.lowerBoundExclusive() 和.upperBoundExclusive來確定是否需要包含上下界。
val intervalJoinResult = input1.keyBy(_._1) .intervalJoin(input2.keyBy(_._1)) .between(Time.milliseconds(-5), Time.milliseconds(10)) .upperBoundExclusive() .lowerBoundExclusive() .process(new MyProcessFunction)
Interval Join內(nèi)部是用緩存來存儲所有數(shù)據(jù)的,因此需要注意緩存數(shù)據(jù)不能太大,以免對內(nèi)存造成絕大壓力。
(正文已結(jié)束)
推薦閱讀:大股網(wǎng)
免責聲明及提醒:此文內(nèi)容為本網(wǎng)所轉(zhuǎn)載企業(yè)宣傳資訊,該相關(guān)信息僅為宣傳及傳遞更多信息之目的,不代表本網(wǎng)站觀點,文章真實性請瀏覽者慎重核實!任何投資加盟均有風險,提醒廣大民眾投資需謹慎!