五分钟搞定 Flink 双流 JOIN 面试题
作者西贝
在实际的开发中,JOIN操作是我们经常使用的。JOIN的本质是数据拼接,由于我们无法将所有的数据都存储到一张表中,所以也就有了JOIN操作,JOIN操作可以很方便地根据需要将不同表的数据拼接在一起。本文总结了Flink双流JOIN的常见面试题,希望对你有所帮助。
1、双流JOIN与传统数据库JOIN之间的区别是什么
- 数据集合 :传统数据库左右两个表的数据集合是有限的,双流JOIN的数据会源源不断的流入
- 结果更新 :传统数据库表JOIN是一次执行产生最终结果后退出,双流JOIN会持续不断的产生新的结果
- 计算驱动 :双流JOIN由于左右两边的流的速度不一样,需要状态存储,双流驱动。比如:左边数据到来的时候右边数据还没有到来,或者右边数据到来的时候左边数据没有到来,所以在实现中要将左右两边的流数据进行保存,以保证JOIN的语义。
2、Flink INNER JOIN与LEFT JOIN的实现原理
LEFT JOIN示意图
LEFT OUTER JOIN 可以简写 LEFT JOIN,语义上和INNER JOIN的区别是不论右流是否有JOIN的事件,左流的事件都需要流入下游节点,但右流没有可以JION的事件时候,右边的事件补NULL。
如上图所示:当左边先流入1,2事件时候,右边没有可以join的事件时候会向下游发送左边事件并补NULL向下游发出,当右边第一个相同的Join key到来的时候会将左边先来的事件发出的带有NULL的事件撤回(+代表正向记录,-代表撤回记录)。这里需要注意的是:
- 左流的事件当右边没有JOIN的事件时候,将右边事件列补NULL后流向下游 ;
- 当右边事件流入发现左边已经有可以JOIN的key的时候,并且是第一个可以JOIN上的右边事件(比如上面的3事件是第一个可以和左边JOIN key P001进行JOIN的事件)需要撤回左边下发的NULL记录,并下发JOIN完整(带有右边事件列)的事件到下游。后续来的4,5,6,8等待后续P001的事件是不会产生撤回记录的。
- 在Flink系统内部事件类型分为正向事件标记为' + '和撤回事件标记为' - '。
INNER JOIN
这里关于INNER JOIN的语义和大家强调两点:
- INNER JOIN只有符合JOIN条件时候才会有JOIN结果流出到下游,如果关联不上,流入时候没有任何输出,因为左边还没有可以JOIN的事件;
- INNER JOIN两边的数据不论如何乱序,都能够保证和传统数据库语义一致,因为我们保存了左右两个流的所有事件到state中。
3、如何保障左右流的JOIN数据在同一个节点进行处理的
分布式流计算所有数据会进行Shuffle,怎么才能保障左右两边流的要JOIN的数据会在相同的节点进行处理呢?在双流JOIN的场景,我们会利用JOIN中ON的联接key进行partition,确保两个流相同的联接key会在同一个节点处理。
4、Flink双流JOIN的state状态
State分类
-
OperatorState
顾名思义, OperatorState 是与单个并发 operator 绑定, Kafka Connector 是一个典型使用范例, 即每一个并发 operator 实例都会把读取的 partitions 及对应 offset 保存在 OperatorState 中, 以便出现 failover 后恢复状态;
-
KeyedState
用于 Keyed 算子, 这里的 key 是指 SQL 中的比如聚合算子的 group key, Rank 的 partition key 或双流 Join 的 join key, KeyedState 提供了多种类型的 state 接口以方便用户灵活的操作当前 key scope 下的状态, 常用有:
- MapState: 使用方式类似 Java 的 Map, 提供了 <MapKey, MapValue> (注意区分这里的 MapKey 和 KeyedState 的 key) 的增删改查各种操作接口, 常用于 UV 聚合计算以及双流 Join 的状态存储;
- ValueState: 单一变量值, 比如聚合计算 sum, 一个 key 对应一个 value 值;
双流JOIN的State数据结构
双流 Join 状态 State数据结构
双流 Join 算子需要把左右流的数据都存储到 State 中, 对于此案例中, 均使用KeyedState<JoinKey,Map<rowData, count>>进行存储, 其中:
JoinKey
:第一级MAP的key是Join key,比如示例中的P001, value是流上面的所有完整事件 Join 算子是 Keyed 算子,上游数据按照 Join Key 进行 shuffle, 发送到 Join 算子;Map<rowData, count>
:第二级MAP的key是行数据,比如示例中的P001, 2,value是相同事件值的个数,即rowData 出现的次数
数据结构的几点说明
- 记录重复记录 - 利用第二级MAP的value记录重复记录的个数,这样大大减少存储和读取
- 正向记录和撤回记录 - 利用第二级MAP的value记录,当count=0时候删除改元素
5、Flink版数据的状态保存时间是多久?
-
实时计算Flink版1.0版本。
#使用rocksdb作为statebackend。 state.backend.type=rocksdb #rocksdb的数据生命周期,单位为毫秒。 state.backend.rocksdb.ttl.ms=129600000
-
实时计算Flink版2.0及以上版本。
#使用niagara作为statebackend。 state.backend.type=niagara #niagara的数据生命周期,单位为毫秒。 state.backend.niagara.ttl.ms=129600000
-
实时计算Flink版3.0及以上版本。
#使用Gemini作为statebackend。。 state.backend.type=gemini #gemini的数据生命周期,单位为毫秒。 state.backend.gemini.ttl.ms=129600000