作者:Piotr Nowojsk

整理:崔星灿

  • 为何选择 SQL

  • 流式 SQL 面临的挑战

  • Flink 流式 SQL 提供的多种 Join

  • 模式识别

  • 其他近期成果

 

为何选择 SQL?

 

SQL 最大的优势在于普及率高,用户基数大。和其他编程语言相比,它的入门相对简单。作为一类声明式语言,它允许用户只告诉引擎想要做什么,而无需指定如何做。因此即使是小白用户,也可轻松(讲道理有时候也不太轻松)利用 SQL 表达自己所需的逻辑。SQL 优化器会最大限度帮助你提高效率。

流式 SQL 面临的挑战

虽然 SQL 很优秀,但想要在数据流上执行它却并不容易。我们通过一个实际例子看一下:

上图展示了一个简单的双流 Join,其语义很简单,找出表 A 和表 B 中相同的 id。为了执行 Join ,需要用到相关算法,我们首先来回顾一下两个经典的 Join 算法:

  • 归并连接算法

归并连接算法的思路很简单。拿到两表后,我们首先将它们 id 由小到大排好序,然后从前往后同时遍历两表,一旦遇到相等的 id 就输出一条 Join 结果。

  • 哈希连接算法

在执行哈希连接算法之前,我们首先要对两表的规模有一个估计,然后选取较小的一张表(示例中的表 B),以 id 值为 key,以数据行为 value,建立哈希索引。随后就可以遍历另一侧大表,对每一个 id 值到建立好的哈希索引中查找有没有 id 相等的数据行,如果有则输出 Join 结果。

上述两种算法有一个区别:在哈希连接算法中我们只需要将较小的一张表加载到内存;而在归并连接算法中,我们需要将两张表都加载到内存。这个区别非常关键,因为在现实世界中两表通常会有大小之分。

回顾完经典算法,我们再来看一下流式场景下的持续查询该怎样执行。所谓持续查询指的是在没有外界干预的情况下,查询会一直输出结果,不会停止。由于输入数据流本身无穷无尽,因此在上面的查询通常都是持续查询,即每当新数据到来,可能都会有新的查询结果产生。

还是以之前的 Join 为例,假设最开始只有表B中有一条数据,由于表 A 为空,这时候显然没有结果输出。随后表A中插入一条数据1,但由于表 B 中没有与之相等的 id,因此依然没有结果。直到表 A 中42到来,才有和表 B 中已有数据相等的 id,并生成相应的 Join 结果。之后表 B 中插入数据7,但不会对结果产生影响。

在传统数据库中允许针对一张表进行全表扫描,但在流式场景下却做不到,其原因在于:1、我们无法控制下一条到来的数据属于表 A 还是表 B;2、流数据具有无尽性。既然无法进行全表扫描,传统的归并连接算法和哈希连接算法都无法简单应用。

Flink 流式 SQL 提供的多种 Join

普通连接

那 Flink SQL 里面是怎么实现 Join 的呢?简单来说,内部的 Join 算子会同时维持 A、B 两张表的哈希索引。当表 A 中插入一条新数据时,会去表 B 的哈希索引中尝试寻找满足 Join 条件的数据,同时将新到来的数据加入表 A 的哈希索引中。表 B 亦然。但这种思路有一个问题:随着数据的到来,哈希表可能会无限增长下去。这时候可以通过状态 TTL 等手段加以限制,也可以考虑选用其他种类的 Join。

时间窗口连接

在介绍时间窗口连接之前需要首先普及一下 Watermark(水位线)的概念。

现实世界中,由于数据来源多种多样且传输过程充满不确定性,因此数据乱序时有发生。为了在一定程度上缓解该问题,Flink 引入了 Watermark 机制。所谓 Watermark 就是安插在数据流中的一些特殊控制数据。既然数据流存在“乱序”的概念,那代表着每条数据都会有相应的事件时间戳(也可能是其他的次序标记),Watermark 也有自己的时间戳。每当算子遇到时间戳为 t 的 Watermark,都可以认为将不会再收到事件时间戳小于或等于 t 的数据。

了解完 Watermark 的概念后,我们回到时间窗口连接。

上图是一个简单的时间窗口连接查询示例。和普通 equi-join 相比,它多出来一个查询条件,即限制一侧表的时间需要在另一侧表的时间所定义的一个窗口内(示例查询中,限制运输时间需要在订单记录产生后的4小时内)。有了这个时间窗口条件,就可以帮助我们清理无用的哈希表(状态)。因为对任意一条流中的数据 x ,在另一条流上都有一个满足 Join 条件的时间下限(最迟时间),一旦 Watermark 超过这个时间,就表示另外一条流上后面到来的数据将不会再和 x 产生 Join 结果,此时就可以安全地将 x 从状态中清除。

时间窗口连接只适合天然存在窗口条件的场景,因为某条数据一旦过期就会被永久删除,再也无法产生包含它的 Join 结果。为了应对更多场景,Flink SQL 在近期版本新加入了历史表(Temporal Table)和相应的 Join 功能。

历史表连接

在介绍此类 Join 之前,我们需要理解历史表的概念。它是在 ANSI SQL 2011中新引入的特性,可以简单理解为,对于一个随时间不断变化的表 T ,每给一个时间 t,都会有一个对应该时间(或版本)的表格快照 T(t)。下面我们来看一个示例。

上图展示了一个货币汇率随时间变化的 changelog (不是表本身!),每条记录表示某货币在对应时间点的汇率值。为了在 Flink SQL 中为它注册一个历史表,需要用到 TemporalTableFunction(没错,历史表可以在一定程度上理解为一个 Time-Versioned TableFunction)。具体注册过程如下图所示,其中第一个参数“time”是时间字段,第二个参数“currency”表示主键。

如此,每给一个时间,就会返回所有币种在那个时间的最新汇率情况。

具体怎么用历史表做 Join 呢?还是通过示例说明,假设我们还有一个订单表,里面每行记录表示在对应时间(time)利用一定数量(amount)的某种货币(currency)所生成的订单。

一个很容易想到的操作就是根据汇率,将每个订单的货币量都转化为本地货币的量。由于货币汇率不断变化,我们需要用一些复杂的 Join 条件来完成上述任务,但如果用历史表连接,就变得很直观:

这里除了之前提到的历史表,还引入了一个 LATERAL 关键字,它表示针对订单表中的每一条数据,都需要生成一个新的汇率表。

Join 的执行过程也比较直观,每到来一条新的订单数据,根据对应时间的最新汇率计算即可(这里面其实还有一些没讲到的细节问题,留给读者思考吧)。当订单流的 Watermark 超过一定数值后,可以安全地将过期的汇率记录删除,从而限制状态的无限增长。

简单总结一下不同 Join 的特点。

OK,接下来我们再看一看 Flink SQL 近期新加的模式识别功能。这里的模式指的是一些能用正则表达式描述的某些数据特征序列。

SQL 中有专门的“MATCH_RECOGNIZE”语句来做模式检测,如下图所示:

这个查询的大致要做的事情是:从 Ticker 表内按照不同 symbol 分组,找出一段时间内价格均值小于15的连续事件序列。虽然看上去很吓人,但语句其实不难理解,我们一步一步来看:

  • 最上方的“SELECT * FROM Ticker”表示要针对 Ticker 表做模式识别。

  • 接下来的“PARTITION BY symbol”和传统 SQL 的“GROUP BY”类似,都是将数据按照某些列进行分组。

  • 随后的“ORDER BY rowtime”用于指定每个分组内的数据顺序(时间升序)。

  • 进入匹配环节,首先看最下面的“PATTERN…DEFINE…”子句,它用来定义我们想要识别的模式。在示例中所定义的模式表示“事件A出现1次以上,在事件A后面跟着一个事件B”。其中事件A定义为当前所有匹配A的事件(包括当前待匹配的事件)的价格均值小于15,由于未提供事件B的定义,可以认为任意事件都可以匹配B。

  • 回到上方的“MEASURES…”子句,它定义在发现模式后我们希望输出的具体结果。在示例中,一旦匹配成功将会返回模式 B 以及模式 A 中的事件数。

  • 下方的“ONE ROW PER MATCH”表示针对每个匹配成功的模式,输出一条结果。除了ONE ROW PER MATCH,SQL标准中还支持ALL ROWS PER MATCH,它表示对于每个满足模式的数据流中的每条数据,产生一条输出结果。Flink SQL目前还不支持ALL ROWS PER MATCH语句。

  • 最后“AFTER MATCH TO FIRST B”是一个匹配选项,示例中表示当产生一个成功的匹配串之后,下一个匹配串的查找,从这次匹配成功的匹配串中的模式B开始。

总结一下利用 MATCH_RECOGNIZE 子句进行模式识别的两个点。其一是它和 GROUP BY 子句类似,可以理解为执行一个特殊的聚合;其二是这个子句已经成为 SQL 2016 标准的一部分。

其他成果

最后我们来看一下2018年其他已经完成或正在进行的 Flink SQL 相关工作。

  • 2018年 Flink 新添加了一个 SQL Client 模块,允许用户在配置好数据源等信息的前提下,直接通过命令行调用 Flink SQL,无须用 Java、Scala、Python 等语言进行编码。现阶段 SQL Client 的功能还有些局限,比较适合快速开发原型等场合,大家如果有兴趣可以多参与贡献。

  • 社区一直在不遗余力地拓展 Flink 与其他项目之间的连接及适配工作,近期正在添加外部 Catalog 功能。该功能完成后 Flink 将可以直接访问 Cassandra、Hive 等系统的元数据。

除了上述两点,Flink SQL 在2018年还有很多新功能及改进,大家可以去阅读官方文档或源码学习。


欢迎转载,敬请在正文中标注并保留文章来源、原文链接、作者/译者等信息。

浙公网安备 33010802010061号 浙ICP备19028187号