Stream SQL 的执行原理与 Flink 的实现

在数据仓库应用中,执行 ETL 过程是一种常见的需求。我们希望通过 ETL 过程预处理我们的原始数据, 从而达到抽取有用信息和将数据转换为适合进一步查询的格式等目的。MapReduce 和 Spark 等批数据处理系统已经很好地解决了在高延迟的场景下的需求,目前低延迟的流式处理和增量计算是主要的发展方向。 本文将结合 Apache Flink 系统讨论相关技术课题。

伴随着流处理系统的发展,SQL 特别是 Stream SQL 系统也渐渐流行起来。这可能得益于 SQL 是一门已经被学界和工业界充分研究过的查询语言。 尽管 SQL 可以认为是图灵完备的,但其相较于更加通用的一些计算模型, 其表现力和应用范围是受限的。在这篇文章中将讨论一下 Stream SQL 实现原理、应用场景和能力范围。

一般来讲,现实应用中各个计算模型能力范围可以表示成如下图所示。

Computing Models

尽管已经知道这件事情,我们仍然对如下一些问题感兴趣:

  1. SQL 和 Stream SQL 的能力的边界在哪里?给定任意一个 SQL 查询,我们是否可以判断其能否使用 Stream SQL 执行?
  2. 如果一条 SQL 可以使用流式处理来执行,具体要如何实现?我们格外感兴趣的是 Group By 和 Join 操作的实现。
  3. 在了解过 Stream SQL 的基本原理之后,我们进一步感兴趣的问题是现存的系统(特别是 Flink)的相关实现细节。

接下来的文章我们就来讨论这些问题。在增量 SQL 查询算法这一章首先来介绍诸如 Flink 这类系统所采用的实现 Stream SQL 查询的理论, 在流式处理与时间控制这一章,我们将讨论 Stream 处理系统的一些基本的概念和如何操作时间。最后, 我们将会讨论 Apache Flink 的能力与局限,致力于对 Flink 执行相关任务方法的进行一个简单刻画。特别地,我们会花费一些篇幅在 Flink 内部状态的管理。

增量 SQL 查询算法

一般来讲,给定一条 SQL 如果其源数据表中有一些数据发生了改变,我们需要重新全量执行这条 SQL 才能得到更新过的结果。 增量 SQL 查询则意味着我们可以只依赖源数据的改变量,局部地执行查询并更新原来的结果。使用增量模型, 我们往往可以得到更快的执行方案。很显然,Stream SQL 执行就是增量 SQL 查询:新到达的数据就是在一张“源数据表” 当中新加入的数据项。

为了介绍增量 SQL 查询算法,首先来看一些术语的解释:

  1. 视图(View):是在关系型数据库当中被保存的一个查询
  2. 物化视图(Materialized View):是将一个视图的内容“物化/预计算保存”下来的结果
  3. 表(Table):是一类可以支持扫描(Scan)或查询(Query)的数据源
  4. 索引(Index):是附加于表或者视图上用于加快查询的数据结构

请注意,在本文的讨论当中,索引指代的是一类需要引用原始数据的数据结构,也就是说索引只包含了原始数据当中的一部分信息。 包含原数据的全部信息并且可以无损翻译回去的可以认为是物化视图。另外,物化视图显然也是一种表:他支持扫描和查询操作。

容易得到,增量 SQL 查询的问题就是物化视图内容维护的问题。我们显然希望增量地维护物化视图的内容, 而不是每当源数据表改变就全量刷新。

物化视图的概念最早由 Oracle 和 SQL Server 等商业数据库作为索引的一种补充而引入。 学术界和工业界至今已经积累了很多相关研究,从而形成了一整套方法论。 很多现代的流式处理系统也采用了这些方案:Stream SQL 是物化视图维护问题的一个子问题。

为了阐释为什么物化视图是一种有效地加速查询的功能,我们先来花一些时间在 SQL 查询的优化与执行规划问题上。

SQL 优化与执行规划

SQL 查询优化原理与 Volcano Optimizer 介绍 一文中,我们已经对相关算法进行了详细介绍。简单重复一下的话,作为一种声明式的查询语言,我们可以将 SQL 转化成一种叫关系代数(Relation Algebra)的抽象表示来进行计算。这样的一种抽象表示被称为 SQL 执行的逻辑计划(Logical Plan)。 逻辑计划一般由一棵算子(Operator)组成的的树表示,这棵树表示了算子之间的依赖关系和执行顺序。特别地, 我们还可以为这课树表示的方案及其每棵子树进行执行花费的估算。下图展示的就是一条 SQL 的执行方案。

Relation Algebra

上图左边是逻辑计划,右侧是对计划成本的估算,其中展示的算子有 Join、Project 和 Filter 等。这里 Project 表示的是对上游输入的每个元素进行变换处理(如选取列、对每行进行数值变换等)的算子。利用右侧成本估算进行 SQL 查询优化的优化器被称为基于成本(Cost-based)的优化器。在实现上, 每个抽象算子可以结合子树传入的成本和自己的内部实现向上返回一个对自己成本的估算。也就是说成本估算是递归执行的。

比较先进的优化器大多基于成本估算,其基本原理是反复应用查询变换的规则将原查询修改为语义相同等另一方案, 在这一过程中不断估算成本并最终选取成本最低的方案。关于高效实现这一思路的 Volcano/Cascades 算法, 前面提到的文章已经进行了充分介绍,这里不再赘述。

在本文的语境当中,值得关注的是这种优化器提供的两种特别的可能性:

  1. 将逻辑计划转换为物理计划的能力
  2. 将整棵子树转换为单个算子节点返回预计算结果的能力

所谓的物理计划,指的是方案中每个算子都包含了执行这个方案所需要的信息的方案。在有的实现中, 物理计划当中的算子可以直接被调用执行。将计划划分为逻辑计划和物理计划的意义在于, 我们可以为语义相同的算子提供不同的物理实现。比方说如下图所示,针对 Join 节点我们有两种不同的实现方法: 使用哈希表的 Hash Join 和使用排序方法的 Sort Join。这两种实现方法有不同的适用场景, 比方说,Hash Join 一般性能较好,但是适用的数据规模有限,而使用外部排序的方法 Sort Join 一般可以支持非常大的数据范围。在现代数据库的优化器当中,这两种物理算子的选取是依据方案的成本估计来选取的。

Choosing Physical Plan

可以看到,在上述方案选取当中,尽管 Hash Join 本身可以以更高的性能执行,然而 Sort Join 可以具备返回有序结果的特性。 因此,在规划时更上层的算子可以利用这一特性,反而可以得到更优的成本估算,因此包含 Sort Join 算子的方案会最后胜出。 在这里,逻辑计划到物理计划的转变同样也是通过规则来实现的,我们可以看到在这一模型中, 查询优化和物理计划的生成被统一成一个过程,不得不说是十分精巧的设计。

使用预计算结果代换整棵子树更容易理解:获得子树计算结果最快的方式就是直接从预计算结果当中读取。 这恰恰就是使用物化视图的思路所在:将所有的物化视图注册在优化器当中, 优化器就有可能自动发现一个方案可以利用这些预计算结果。

Materialized View

物化视图增量维护的简单算法

如上文所述,物化视图就是对一条 SQL 查询的缓存。由于 SQL 是一门封闭的查询语言,它具有以下两条特点:

  1. 如果一颗树是 SQL 的代数表示,那这棵树的所有子树也是某条 SQL 查询的代数表示
  2. 将一颗表示 SQL 的树表示插入到另一颗树中作为子树,得到的树仍然是 SQL 的代数表示

因此,如果我们使用子查询或者视图进行查询,我们就可以将子查询和视图的代数表示直接插入到父查询之中, 作为一个整体优化和计算。这一过程称为子查询/视图展开。反过来,视图/物化视图也就可以完全由 SQL 查询来定义。 特别地,我们可以把视图/物化视图直接作为一个表来看待,在思考问题的时候不去考虑他的内部结构, 这样就比较容易进行讨论。

因此,对物化视图进行增量维护的最简单算法就是从根算子开始,将其左右两颗子树作为整体看作“似表(Table-Like)”。 显然,这些似表都支持扫描和查询功能。与一般的 SQL 查询不同,在增量 SQL 查询中,当一个表的内容改变, 我们希望这些表将内容的修改表示成包含增加的行和减少的行的增量表(Delta Table)的形式。 这些增量表将会被送入上层算子进行处理。

当上层算子接收到增量表了之后,他可以通过三种数据来源来判断要如何增量地执行这一查询, 并进一步将生成的增量表向上发送。这三种数据来源是:

  1. 这一算子自己维护的内部状态
  2. 某一子节点向上发送的增量表
  3. 把子树当成表并发送请求查询其内容

下图展示了这一框架:

Incremental Operating

在最上层的根节点完成这样的计算后所得的增量表,就可以应用在物化视图原先保存的结果上,从而得到新的结果。 而如果我们能为每一个算子,如 Project、Filter 和 Join 都设计这样的执行方案(实现对应的物理算子) 是不是我们就可以为任意的 SQL 实现增量查询了呢?答案是肯定的: 对于任何标准 SQL 里的算子, 我们总可以将其转换成上述产生增量表的形式。也就是说我们有 $\alpha(T + \Delta T) = \alpha(T) + Q(T, \Delta T)$。 这里 $\alpha$ 代表一个算子,$T$ 是基表,$\Delta T$ 是增量表,$Q$ 是一个以 $T$ 和 $\Delta T$ 为参数的查询。 显然 $Q(T,\Delta T)$ 即我们所需输出增量表。

尽管上述增量维护的简单算法(或称为代数算法)提供了一条实现通用增量执行的道路, 仍有一些问题值得讨论:

  1. 时间成本:执行增量刷新操作的时间花费
  2. 空间花费:为了支持增量刷新,算子内部需要保存的状态大小
  3. 刷新时机:激发刷新操作的正确时机是什么?

问题3的答案很简单,通过时算子缓冲一些修改批量执行或者添加其他的控制策略,可以有效的提高执行效率,节约资源。 本文不对这一问题进一步展开,而将主要关注时间和空间花费两方面。

查询放大问题及其解决思路

上文提到,增量查询的简单算法(代数算法)有时需要通过查询上游表来获得必要的数据,从而生成输出给下游的增量表。 一个问题就是增量表的计算公式 $Q(T, \Delta T)$ 的执行效率。遗憾的是,在前述算法当中的一些相当简单的算子(如 Project)等, 这一公式的执行效率都不高,有的甚至隐含着需要全表刷新。下面是一些例子:

  1. Project 查询放大问题:考虑简单查询SELECT MAX(a, 42) FROM example。当下游给定了一个删除某以a = 100 的行, 这时因为在物化视图当中,每一个因 MAX 函数执行而产生的数值 42 都是相同的,但 Project 算子又必须保留元素的排序, 从而出现了不知道删除哪条记录的情况,因此必须全量查询原表并重新刷新视图
  2. 聚合计算查询放大问题:考虑查询SELECT count(distinct a) FROM example 由于表中 a 元素可能会有重复, 在没有其他附加信息的情况下,每次增量查询执行只能重新扫描全表。这种情况在执行 SELECT a, count(distinct b) FROM example 的时候也会出现,根据 (a, b) 数据的分布,相关聚合算子 AGG 可能需要进行规模不等的扫描。
  3. TopK (Sort Limit) 查询放大问题,设想查询SELECT a FROM example ORDER BY a LIMIT 10。很显然, 虽然输出的视图里只包含10条元素,然而为了处理某一增量表,相关查询不得不重新扫描全表来选取最小的十个数字.
  4. Theta Join 问题。对于比较自由的 Join 条件,如查询 SELECT T1.a, T2.a FROM T1 JOIN T2 ON T1.a * T2.a < 100 尽管最终符合条件的元素数量并不多,但仍然可能需要对两个表进行大量查询

然而,解决查询放大问题并没有一定之规。现有的方法主要是针对各种特定情况进行优化。几种可能的思路:

  1. 确保处理过程当中的行唯一性。也就是说首先为每一行指定一个唯一的 ID,保证这些 ID 在整个计算过程中不断地在算子之间传递。 有些操作如 Group By 等需要根据条件修改这些 ID。保证这些 ID 被增量表中的每一行携带。 这样就容易获知应该修改目标视图当中的哪些行。然而对于比较自由的 Join 算子,可能无法为下游行生成合理的 ID, 这也是有些系统只支持 EquiJoin(带有相等条件的 Join)的原因。
  2. 使得算子保存额外的内部状态,将 $Q(T, \Delta T)$ 变换为 $Q’(T, \Delta T, S)$ 这里 $S$ 是算子的内部状态。 这样得到的函数 $Q’$ 就可以支持更加快速的查询并使得扫描的范围更小。额外的内部状态可能包括对子树内容的物化(缓存)本身, 也可以有根据查询条件或 Join 条件生成的索引等。实现这种思路没有一劳永逸的方法, 大多需要根据SQL 优化与执行规划 这一章介绍的优化模型进行统筹考虑和方案选择。也就是说提供多种物理计划算子以待选择
  3. 使用近似计算。对于某些聚合查询,如COUNT(DISTINCT value))和 TopK 等,可以使用 HyperLogLog 等算法进行近似计算, 并将结果保存在内部状态中。这样查询虽然输出了近似结果,但是在时间和空间上都获得了优化
  4. 限制或扩充语义。一方面可以对 SQL 在某些方面的能力进行限制,从而防止全量查询的发生。 另一方面可以增强 SQL 的建模能力,加入诸如 Window 之类的概念,使得用户可以更好地描述自己的需求, 并将相关查询的扫描范围限制在一定的规模。这种方式往往常用于 Stream SQL,在之后会进一步介绍相关内容

在上述各种方案都无法有效解决问题的时候,一种方法就是完全退化为全量刷新。 这是因为某些场景下增量查询的执行可能比全量刷新具有更高的成本,这时根据 SQL 的成本估算选择全量刷新执行是更明智的。

修改放大问题及其解决思路

实现 SQL 增量执行最棘手的问题是修改放大问题。这一问题指的是源数据表当中一条简单的修改, 在算子增量执行的时候会产生大量的下游修改。也就是说某一个算子接收了一个很小的增量表,却向下游输出巨大的增量表。 这种情况往往出现在使用 Join 的时候。在这里,我们假设系统已经根据上文当中的查询放大问题进行了优化, 因此算子输出的增量表的内容都是必要的。研究下面的 SQL 的例子:

1
SELECT A.a, B.b FROM A CROSS JOIN B

作为一个没有条件的 Cross Join,很显然,AB表当中的任何修改都会导致另外一个表的全部内容被查询并插入到增量表。 在一些即使有限制条件,但是数据分布比较倾斜的 Join 场景下也会出现这样的问题。这时往往只有一些无奈的选择:

  1. 延迟刷新:通过选择时机,预防连续小的修改产生连续的批量计算操作
  2. 限制/扩充语义:和解决查询放大问题相同,通过限制和扩充语义,只满足用户的一部分需求。 特别是在流处理系统当中,引入一些新的 Join 形式和 Window 的概念,反而可以增强用户表达和实现流式处理需求的能力

在后文的流处理模型介绍一章将会进一步介绍新引入的流式 Join 形式。

可自我维护性(Self-maintainability)

一个算子被称为可自我维护的,当他可以完全使用内部状态处理增量表并输出数据给下游。 也就是说,对于可自我维护的算子,其增量表生成函数的形式是 $Q’’(\Delta T, S)$,其中 $\Delta T$ 是增量表、$S$是内部状态。

可自我维护性在流处理和分布式查询场景下十分有用。首先对于流处理来说,输入表也许是不可重入的, 也就是说你不能轻易地查询任意久远之前的数据。这时就要求算子能支持一定的可自我维护性, 避免反向查询输入流的操作。对于分布式查询来说,不同算子可能运行在不同的机器上, 因此跨算子的查询因为网络延迟的原因往往会比较低效。

有一些算子,如 Filter 等天然就可自我维护。另外一些算子(Join、Agg 等) 往往需要通过全量物化自己代表的视图才获得,这种做法往往需要消耗大量的状态空间。 当然也存在一些空间消耗比较适中的特别解决方案,但是他们都要根据其参数和输入数据分布, 通过成本估算来选定算法来实现,没有通用的解法。

一般来说,一个算子是否可以自我维护在 SQL 优化和计划生成阶段就完成了,但也有一些研究着眼于动态可自我维护性。 这种方法实现的算子会维护一些特别的状态,以便于分析输入的增量表,对于其中可以自行解决的项目直接利用内部状态计算, 其他的部分再反向查询。

值得注意的一点是,可自我维护性并不是取得高查询性能的必要条件。查询的执行器应该综合考虑时间和空间成本的平衡, 在整个查询产生的算子树上选择部分合适的节点物化内容和实现可自我维护性。从这里可以看出, 查询优化器及其相关算法增量 SQL 处理过程当中的重要作用。事实上,根据输入数据的分布的统计数据和算子的特性, 建立查询成本估计的数学模型是一项非常重要且紧缺的技能。

Optimizer Guy Wanted

流式处理与时间操作

流(Stream)可以被看作一种无界(Unbounded)且只可追加的(Append-Only)的数据表。 如果把新来的事件看作插入条目的增量表,我们就可以用之前提到的物化视图维护算法增量地执行 SQL。 Apache Flink 和 Samza 之类的系统大多采用了这种方式。

由于流处理系统的输入是无限增长的,我们希望能就以下问题进行讨论:

  1. 如何在流处理系统当中处理时间,并利用这一特性限制内部状态的大小
  2. 如何扩展 SQL 以支持描述时间方面的需求,使得执行器更好地理解需求并执行

流处理系统的时间操作

在现实世界的分布式系统当中处理流主要面对的问题有:

  1. 绝对时钟(Absolute Clock)问题:现实世界当中的时钟往往不精确,而且在分布式系统当中实现时间绝对同步是(物理上)不可能的
  2. 时间倾斜(Time Skew)问题:由于系统中必然会存在网络延迟、网络中断和系统崩溃等问题, 发送到系统的消息和系统内部的消息很可能失序乃至丢失

对于第一个问题,现代操作系统普遍使用事件驱动(Event-Driven)模型来处理。通过使用消息本身携带的生成时间(Event-Time) 而不是系统接收到消息的时间(Processing Time)来进行处理。这样的系统当中,时钟是由事件来驱动的。 没有新的事件到开,系统的状态就如同冻结起来了一样。

激发器(Trigger)是一类特别的事件,当这种事件的消息被接收到时,某些任务会被激发和执行。 可以被当作激发器的消息有很多:

  1. 新的数据从消息队列中到达
  2. 某一算子计算产生的增量表发送到下游算子
  3. 下游算子对上游算子发送的 ACK 消息

有了激发器之后,我们的模型当中就不需要存在物理时间, 采用了这种事件驱动模型或者反应式设计模型的系统可以变得更加函数式和无状态。

值得注意的是,每个算子可以自行决定自己处理激发器的逻辑。它们可以自由地忽略、收集、聚合和发送事件。 这些逻辑设计有可能有助于提高系统的性能和降低通讯开销。

接下来考虑时间倾斜问题,可以回忆一下 TCP 是如何处理丢包和不按顺序到达的包的: 为每个包编号并维护已经获得 ACK 的包的编号。在流式系统里也采用了类似的方法。

水印(Watermark)就是用来处理这一问题的。简单来说,水印就是根据消息的事件时间来决定一条消息应该被处理还是被丢弃的标记。 下图展示了水印起作用的方式:

Watermark

上图中,从右到左是消息到达的时间,在某时刻,消息8通过激发器激发了一次对水印的修改。此时水印的时间限制被修改为 4 。 这意味着之后到达的标号时间小于4 的消息都会被丢弃。在消息 8 之后到达的消息 7 和 5,虽然时间戳比消息 8 要早, 但是因为仍在 Watermark 的范围里,因此会被考虑在内。最后到达的消息有时间标号 9,他是一条当前观察到过的消息之后的消息, 因此也会被处理。

从上述讨论当中我们可以看到:

  1. 水印应该永远小于当前处理过的事件的时间戳
  2. 水印是通过激发器的激发来移动的,算子可以自己决定移动水印的时间,而不是每个接收到的事件都会改变水印
  3. 水印必须是单调递增的。否则,一旦水印向前移动,我们无法知道是否已经有被包含在水印范围里的消息被丢弃

水印不仅仅是处理时间偏移问题的利器,他也有助于实现限制算子内部状态大小的逻辑。算子可以通过检查水印标记的时间, 将自己内部状态中较老的不会被用到的条目移除掉。在实现严格单次发送(Exactly-Once Delivery)的系统中, 正确管理水印对于防止移除之后仍可能被查询到的条目非常重要。因此,在比较先进的系统中使用者都可以指定一个如下形式的函数:

$$t_{watermark}=F(t_{processed}, t_{ack}, Env)$$

在上述公式中,水印 $t_{watermark}$ 是由处理过的消息的时间 $t_{processed}$、从下游接收到的 ACK 消息附带的时间 $t_{ack}$ 和系统的其他环境参数 $Env$ 所决定的。算子决定水印的逻辑十分有灵活性,但是设计这样一个函数也需要一些灵感:

  1. 如果水印前进的太慢,算子的内部状态可能膨胀于过大
  2. 如果水印前进太快,过多的消息可能被丢弃掉

窗口(Window)是一种设计出来让用户更好地描述它们对时间的需求的工具。 他可以让用户在一个窗口里以有限时间/数据范围的方式操作数据,同时也为进一步优化时间空间成本提供了可能。

流处理系统提供的常见的窗口类型有:

  1. 固定窗口(Fixed Window):长度固定的窗口,每个窗口一个紧跟着一个将时间维度划分成片段
  2. 滑动窗口(Sliding Window):长度固定,但每个窗口的开始时间相比于前一个窗口都有一个固定的时间偏移
  3. 会话窗口(Session Window):使用事件的属性和相互的时间间隔把他们组织在一个窗口里。这些窗口的开始时间、 持续长度等都会变化。这类窗口用于用户追踪、线索跟踪等场景十分有效

Windows

有了窗口的语义,流处理引擎就可以强制一些不适合全局使用的计算(如 Join 和 Group By 等操作)在同一个窗口内完成。 这样,执行任务时需要处理的数据量和计算成本都有了边界。事实上,有些系统只支持窗口内的 Stream Join。

值得注意的是,窗口和水印是两个不同的概念。一个窗口的结束时间已经过去, 并不意味着这个窗口不能再接收迟到的落入这个窗口的消息。实际上,这个窗口的内部状态可以被一直保存, 以便于在接收到新的消息之后刷新窗口内容并输出增量表。一个窗口及其内部状态将只会在水印完全通过之后被回收。 事实上,一个窗口的打开和关闭都有赖于激发器的作用。

Window with Watermark/Trigger

从上图可以看出,窗口不仅是一个查询聚合的单位,也是状态管理的单位。水印则要么完全包含一个窗口, 要么完全通过一个窗口。图中也介绍了一个新的概念句点(Punctation)。句点是一些在窗口关闭之前激发的激发器, 他使得窗口可以输出他的中间结果而不必等带整个窗口的消息都处理过。这对于提供低延迟数据传达十分有用。

对于激发器、水印、窗口和句点这些概念,十分建议进一步阅读如下文章和论文:

  1. Stream 101: The world beyond batch
  2. Stream 102: The world beyond batch
  3. The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing

Stream Join 的语义

了解完流式处理当中的时间处理,接下来我们总结一下在流式系统当中实现 Join 的语义。 显而易见,由于 Stream 都是无边界的数据,传统数据表当中的 Join 概念在流处理系统当中可能不完全适用。 来流处理系统当中的 Join 往往有以下几种类别和语义:

  1. Stream 与纯静态表 Join。这里的纯静态表的内容不会改变,因此 Join 的实现只是在 Stream 端对每个消息在静态表内进行查询
  2. Stream 与动态表的快照 Join。动态表的内容可能会出现增删改等情况,这里的 Join 的语义是, 当对流当中的某个消息实施 Join ,相当于查询了动态表在那一时刻的快照
  3. Stream 与 Stream Join,操作的两边都是 Stream,这种情况最为复杂也很难实现,在之后将会进一步介绍

Stream Joins

与纯静态表 Join 是简单的。动态表与Stream Join 则是通过将动态表处理成类似 MVCC 并发控制那样的形式, 因此在每一个来自于 Stream 的消息需要 Join 时,只需要查询对应事件时间下动态表的快照即可。 前面说过,Stream 也可以认为是一种表,如果把 Stream 里的消息解读成插入或覆盖操作, 就可以得到一个动态表。请注意,在 Stream Join 动态表这个模型中,动态表这端虽然可能也是由 Stream 而来,但是对动态表的插入和修改操作(也表示称 Stream 消息)并不会激发 Join 结果的刷新。 这些消息只是被加入到内部状态中,等待 Stream 里的消息激发刷新时查询。 这是动态表 Join Stream 与Stream Join Stream 最大的区别。

Stream Join Stream 是最难以实现的 Join 方式,前文提过,这种类型的 Join 在两边有消息来的时候都有可能激发大量查询和修改操作,因此面临着严峻的查询放大放大和修改放大问题。 因此虽然可以利用物化视图的算法来解决,实际应用中却不十分方便。 在流式处理系统中,一种解决方法是结合窗口语义实现局部的 Join,下图描述了不同窗口下 Join 的语义。

Windowed Join

可以看到,如此 Stream Join 就被转换为了局部的窗口 Join。除了这篇文章内介绍的 Join 方式, 学术界和工业界还提出了许多其他的形式和实现方法,在这里不再一一枚举。这些 Join 都是为了满足实际需求所提出 Join 的简化变种。

在介绍完的一些基本理论之后,这一章节结合实际系统 Apache Flink 对相关技术实现进行一点总结。 简单来说,前面所介绍的流式处理的各种概念,基本上都已经被 Flink 采纳和实现了。 因而我们将主要讨论其他涉及的问题:

  1. Flink 内部的状态如何管理?
  2. Flink 如何实现严格单次发送(Exactly-once Delivery)?
  3. Flink 的 SQL 系统如何实现,其能力范围如何?

状态管理

Apache Flink 实现了所谓的有状态(Stateful)流式处理的模型。它适用了常用于事件驱动开发或反应式设计模式的 Akka 和 Actor 模型实现算子。当一个 Flink 任务进程开始之后,可以托管一个或多个算子。每个算子将会在本地维护自己的状态。 Flink 的状态储存后段可以选用 In-Memory、FileSystem 和 RocksDB 几种。只有 RocksDB 后端支持增量 Checkpoint。

在比较早的流处理系统如 MillWheel 中,选择了远程状态储存, 如 HBase、BigTable 等。 而一些新近的系统则声称本地储存才是流处理的最佳拍档。 现在,诸如 Apache Flink 和 Apache Samza 的系统都使用本地储存来实现超低延迟的数据处理, 这是因为远程状态由于网络通讯的原因会导致数据处理请求变慢。此外,本地状态也更容易实现并发控制等功能。

不过,本地状态也并非完美,他的主要缺点包括:

  1. 难以正确实现持久化和容错
  2. 容易受到偏斜数据分布的影响,数据难以再平衡(Rebalance)

为了实现持久化和容错,Flink 支持 Checkpoint 系统状态。这一过程也很类似 MVCC 的过程。 在两次检查点之间的状态修改将会以增量的方式储存,如果处理管线错误推出,这些中间修改的内容将会一起被丢弃, 因此系统就恢复到上次 Checkpoint 时的一致状态。

由于每个算子不断地和其他算子相交互,Checkpoint 也必须是对系统全局一致的快照, 否则在恢复系统是,不同的算子可能有状态不匹配的问题。Apache Flink 通过 Chandy-Lamport 快照算法 一个变种来实现异步的全局一致 Checkpoint。

Flink 也提供了 Savepoint 的概念,Savepoint 是一种包含更全面信息的 Checkpoint。 虽然需要花费更多时间来构建 Savepoint,它却使得系统状态回滚和迁移变得方便。

严格单次发送

严格单次发送对于流式处理系统意义重大。设想我们需要实现一个点赞数量统计的功能, 这时我们显然不希望消息被多发或者少发而导致统计不准确。遗憾的是在现有网络条件下这点不能轻松实现。

Flink 依赖于其强一致的状态管理来实现严格单次发送的语义。在分布式系统当中无法完美发送消息的主要原因是:

  1. 网络延迟或中断导致无法可靠传递消息
  2. 重发协议会可能会导致消息被发送多次

通过消息编号、重发和去重,我们可以在一个相对稳定的网络条件下实现副作用意义上的严格单次发送。 所谓副作用意义上的严格单次发送,意思就是指在至少一次发送(At Least Once)发送的前提下, 通过去重消除重复消息对系统状态产生的作用,这样计算得到的结果就根所有消息只发送了一遍相同。

Flink 将记录发送进度的标志一并保存在算子状态中并交由 Checkpoint 定时保存。 这时,当系统崩溃重启,不光系统状态恢复到之前,进度记录也恢复回去了。 因此系统就像那个时候一样重新开始了工作。

为了能真正获得准确的结果,不单需要 Flink 系统内部的保证,外部系统也要符合一些要求:

  1. 输入的消息队列对消息有编号
  2. 消息队列支持重设到某一过去编号的位置开始消费
  3. 输出端需要支持事务类型的写入或者写入是幂等的

第三条的原因是,当 Flink 因重启恢复到之前状态时,部分工作结果可能已经被写出。 而从恢复的状态开始,这些结果又将会被重复输出一遍。因此,要么输出系统支持 Flink 提供的 2-Phase Commit 协议从而可以撤回未 Commit 的数据,要么程序被设计成重复写入所得结果不变的形式。

值得高兴的是,Kafka 恰好满足上述条件。Flink 也提供了使用 Kafka 实现严格单次发送的 Connector。

Stream SQL

在事件驱动模型上,Flink 实现了流式处理和批量处理,并在这基础上进一步提供了 Table API 和 SQL 的支持。 其 Table API 和 SQL 基本上实现了之前提到的物化视图增量更新算法。特别地,Flink 还使用了 Apache Calcite 提供的 SQL 解析和优化模块来执行相关任务。

从 SQL 的语义上来讲,Flink 的 SQL 也实现了窗口语义,它们分别是:

  1. TUMBLE 函数提供了固定窗口
  2. HOP 函数提供了滑动窗口
  3. SESSION 函数提供了会话窗口

当这些函数出现在 Group By 当中时,Flink 将会把计算转变为窗口化执行的。

就 Join 而言,通过 Temporal Table 的概念,Flink 提供了覆盖静态表和动态表 Join Stream 的能力(静态表是不会变的动态表)。 由于实现了物化视图的增量更新算法,Flink 的 Table API 理论上可以实现无边界的 Stream 到 Stream 的 Join。 然而由于内部状态大小的限制,Flink 可能会按照 LRU 的 Cache 管理策略清理一些比较老的项目, 因此会导致结果的不准确。

因此,Flink 十分建议使用 Group By 等方法将 Join 处理成窗口内的。在这方面,使用 Java 的 Table API 要比直接输入 SQL 查询更方便些。这是因为 SQL 解析器需要通过其他方面的语法分析来推断窗口, 这方面实际上非常困难。

此外,Flink 的 SQL 只支持 EquiJoin,也就是说 Join 条件中一定要符合 (T1.a = T2.b) AND (....) 这样的形式。 这里 T1.aT2.b 是来自两个表当中的字段。怀疑 Flink 可能需要利用这个条件进行状态的内部管理和加快查询。

当构造好合适的查询之后,可以将某个查询输出到RetractStreamSink从而转换成附带增加/删除标记的行。 这样就可以把这些行输出到外部系统储存和进一步使用了。

总结

在本文中,我们介绍了 Stream SQL 查询执行的基本原理。 介绍了物化视图增量维护的算法并指出流式 SQL 处理实际上是物化视图增量维护的子问题。 我们还介绍了一些常见的流式处理的概念并结合 Apache Flink 对这些技术原理的实现进行了描述。

我们可以看到,现阶段大多数 Stream SQL 系统在实现功能和语义上仍然还有局限。 完全实现任意 SQL 查询高效增量执行仍又不可及。尽管如此,针对特定的查询需求和模式引入特别的实现方法和扩展新的语义, 现在的流式 SQL 系统已经可以覆盖相当宽广的需求。

同时我们也看到了好的 SQL 优化器和成本估算模型的重要性。它们在执行方案、 状态管理逻辑和时间空间成本平衡方面都发挥重要的作用,相关的研究更是方兴未艾。 一些研究方向包括:

  1. 更准确的统计信息搜集
  2. 更有效的成本估算模型
  3. 有动态适应能力的优化算法和执行模型

同时,作为数据集成(Data Integration)应用的一部分,数据输入输出源的管理、用户应用接口和改变查询语句的 DDL 操作等需求的实现也是非常有价值的方向。