Skip to content

第 12 章 流处理

一个能正常运转的复杂系统,无一例外都是从一个能正常运转的简单系统演化而来的。其逆命题似乎也成立:从零设计的复杂系统永远跑不起来,也无法被改造成能运转。

—— John Gall,《Systemantics》(1975)

第 11 章中我们讨论了批处理——读入一组文件作为输入、再产出一组新的输出文件。输出是一种派生数据,必要时再跑一次批处理就能重新生成。我们也看到这一简单却强大的想法可以用来构建搜索索引、推荐系统、分析等等。

然而第 11 章一直默认了一个大前提:输入是有界的——大小已知且有限——因此批处理过程知道自己何时读完输入。例如 MapReduce 中的核心排序操作必须先读完整个输入再开始产出输出,因为输入的最后一条记录有可能恰好是 key 最小的那一条、必须排在输出最前——因此提前产出结果并非选项。

可现实中很多数据是无界的:它们随时间逐步到达。你的用户昨天和今天产生了数据,明天还会继续产生。除非你倒闭,这一过程永不止息——数据集在任何有意义的层面上从不"完成"[1]。批处理器只能人为地把数据切成固定时长的块——比如每天结束时处理这一天的数据,或每小时结束时处理这一小时的数据。

按天跑批处理的问题在于:输入的变动要等一整天后才能反映到输出里,对许多缺乏耐心的用户来说太慢了。要缩短延迟,可以更频繁地跑——比如每秒处理上一秒的数据——甚至彻底放弃固定时间切片,每个事件来一条就处理一条。这正是流处理背后的思路。

笼统地说,指的是随时间逐步变得可用的数据。这一概念在许多地方都能见到:Unix 的 stdin 与 stdout、编程语言中的(惰性)列表 [2]、文件系统 API(如 Java 的 FileInputStream)、TCP 连接、互联网上的音视频传输等等。

本章我们将把事件流视为一种数据管理机制:它是上一章批数据的对应物——无界、增量处理的版本。我们先看流如何被表示、存储、跨网络传输;继而探讨流与数据库之间的关系;最后在第 513 页"处理流"中考察持续处理流的若干思路与工具,以及它们如何被用来构建应用。

传输事件流

批处理世界里,作业的输入与输出是文件(或许在分布式文件系统上)。流式世界中与之对应的又是什么?

输入是文件(一段字节流)时,第一步处理通常是把它解析成一连串记录。流处理语境中,记录更常被称为事件,但实质大同小异:一个小型、自包含、不可变的对象,承载着某一时刻发生之事的细节。事件通常含有一个时间戳,按时刻时钟(见第 359 页"单调时钟与时刻时钟")记录其发生的时间。

事件描述的"事"可能是用户的某次操作,比如查看一个页面或下单;也可能来自机器,比如温度传感器的周期读数或 CPU 利用率指标。第 454 页"用 Unix 工具做批处理"的例子里,Web 服务器日志的每一行就是一个事件。

事件可以编码成文本字符串、JSON,也可以采用第 5 章讨论过的二进制形式。这种编码让你既能存储一个事件——比如追加到文件、插入关系表、写到文档数据库——也能把它通过网络发给另一个节点处理。

批处理中一份文件写入一次后可被多个作业读取;类似地,流式术语里事件由生产者(也叫发布者发送者)生成一次,可能被多个消费者订阅者接收者)处理 [3]。文件系统中文件名标识一组相关记录;流式系统中相关事件通常被归入同一个主题

原则上文件或数据库就足以连接生产者与消费者:生产者把每个事件写入数据存储,每个消费者周期性地轮询数据存储,看自上次以来出现了哪些新事件。本质上这就是批处理在每天结束时处理一天数据所做的事。

然而要做到低延迟的持续处理,轮询就变得昂贵——数据存储并非为这种用法设计。轮询越频繁,每次返回新事件的请求所占比例越低、开销占比越大。更好的做法是:新事件出现时主动通知消费者。

数据库历来对这种通知机制支持得并不好。关系数据库通常带触发器,可对变更(如某行被插入到某张表)作出反应,但触发器能做的事很有限,且在数据库设计中本属事后追加 [4]。专门用来投递事件通知的工具因此应运而生。

消息系统

通知消费者有新事件的常见做法是使用消息系统:生产者发出包含事件的消息,消息再被推给消费者。这种系统我们在第 189 页"事件驱动架构"中已略有提及,下面更深入讨论。

Unix 管道或 TCP 连接这样的直接通信通道本身就可作为简易消息系统使用。但多数消息系统在此基础上做了扩展:Unix 管道与 TCP 只把单个发送者连到单个接收者,而消息系统允许多个生产者节点把消息发到同一主题,也允许多个消费者节点从某主题接收消息。

在这种发布/订阅模型下,不同系统采用的做法千差万别,没有放之四海而皆准的答案。要区分不同系统,下面两个问题尤其有用:

如果生产者发消息的速度比消费者处理的速度快,会怎样?

大体上系统有三种选择:丢消息、放队列里缓冲、对生产者施加背压(也叫流量控制,即阻塞生产者继续发送)。例如 Unix 管道与 TCP 都用背压:它们有一段固定大小的小缓冲区,缓冲区填满后发送方会被阻塞,直到接收方把数据从缓冲区取走(见第 353 页"网络拥塞与排队")。

如果消息被放进队列里缓冲,队列变大时会发生什么也很关键:队列大到装不下内存时系统会崩溃吗?还是把消息写到磁盘?写磁盘的话,磁盘访问对消息系统的性能影响有多大 [5]?磁盘写满了又怎么办 [6]?

如果节点崩溃或暂时下线,会丢消息吗?

与数据库一样,持久性可能要靠写盘、复制或两者并用来获得(见第 283 页旁注"复制与持久性"),这些都有代价。如果可以接受偶尔丢消息,往往能在同样硬件上获得更高的吞吐与更低的延迟。

是否可接受丢消息严重依赖具体应用。比如周期性传输的传感器读数与指标,偶尔丢一两条数据点也许无关紧要——反正稍后会再发一个更新值。然而要注意:一旦丢消息变多,指标可能不准而你都察觉不到 [7]。如果你在统计事件计数,那就更要可靠投递了——丢一条消息就意味着一次错计。

第 11 章中讨论过的批处理系统的一项好处是它们提供强可靠性保证:失败任务会被自动重试,失败任务的部分输出会被自动丢弃。最终结果就好像未发生过任何故障,这有助于简化编程模型。本章稍后我们将考察如何在流式语境中提供类似保证。

生产者到消费者的直接消息

不少消息系统采用生产者与消费者之间的直接网络通信,不经任何中介节点:

  • UDP 组播在金融行业被广泛用于股票行情等延迟敏感的流 [8]。尽管 UDP 本身不可靠,应用层协议可以恢复丢失的数据包(生产者必须保留已发包的记录,以便按需重传)。
  • ZeroMQ、nanomsg 等无代理消息库采用类似做法,在 TCP 或 IP 组播之上实现发布/订阅。
  • 一些指标采集代理(如 StatsD [9])使用不可靠的 UDP 收集网络上各机器的指标并监控(在 StatsD 协议下,只有所有消息都被收到时计数指标才正确;用 UDP 意味着指标充其量只是近似 [10],参阅第 354 页"TCP vs UDP")。
  • 如果消费者在网络上对外暴露了某种服务,生产者可以用 HTTP 或 RPC(见第 180 页"通过服务的数据流:REST 与 RPC")直接向其推送消息。这正是 webhook 的思路 [11]——一个服务的回调 URL 注册到另一个服务上,每当某事件发生便请求该 URL。

直接消息系统在它们所针对的场景下工作良好,但通常要求应用代码自己意识到丢消息的可能性;它们能容忍的故障也相当有限。即便协议本身能检测并重传网络上丢失的数据包,它们一般也假定生产者与消费者始终在线。

如果消费者暂时离线,它可能错过期间发送的消息。有些协议允许生产者重试失败的投递;但若生产者自己也崩溃、丢掉了原本要重发的消息缓冲区,这一方案就站不住脚了。

消息代理

另一种被广泛采用的方案是经由消息代理(也叫消息队列)发送消息:它本质上就是一种针对处理消息流而优化的数据库 [12],以服务器形式运行,生产者与消费者作为客户端连接到它。生产者把消息写入代理,代理再把消息投递给消费者。

把数据集中在代理中后,这类系统就更易容忍来去无常的客户端(连接、断开、崩溃),持久性的问题也转交到代理身上。一些消息代理只把消息保留在内存里,另一些(视配置而定)把消息写到磁盘以避免在代理崩溃时丢失。面对慢消费者,它们一般允许无界排队(而非丢消息或施加背压),不过这同样可能依赖配置。

排队的一项后果是消费方一般是异步的:生产者发消息时通常只等到代理确认已缓冲下来,并不等消息真正被消费者处理。消息投递给消费者会发生在未来某个不确定的时刻——常常在 1 秒之内,但有积压时也可能晚不少。

消息代理与数据库的对比

某些消息代理甚至能借 XA 或 JTA 参与两阶段提交协议(见第 328 页"跨异构系统的分布式事务")。这一能力让它们与数据库颇为相像。然而消息代理与数据库在实践中仍有重要差异:

  • 数据库通常一直保留数据,直到被显式删除;某些消息代理则在消息成功投递给消费者后自动删除——这类代理不适合用作长期数据存储。
  • 由于消息会被很快删除,多数消息代理假定其工作集相当小——也就是队列很短。如果代理因消费者慢而需要缓冲大量消息(甚至当消息装不下内存时溢出到磁盘),处理每条消息都会变慢,整体吞吐也可能下降 [5]。
  • 数据库通常支持二级索引以及多种用查询语言搜索数据的方式;消息代理通常的支持方式则是订阅与某种模式匹配的主题子集。两者本质上都是让客户端选取自己想知道的数据片段,但数据库提供的查询功能往往要先进得多。
  • 查询数据库时结果通常基于某一时点的快照:之后若另一客户端写入了什么改变了查询结果,第一个客户端并不会发现自己的旧结果已过期(除非它再查或轮询)。消息代理则相反——它不支持任意查询,消息发出后也不允许修改,但一旦数据发生变化(即出现新消息)便会通知客户端。

这是消息代理的传统视角,已被 JMS [13]、AMQP [14] 等标准固化,并在 RabbitMQ、ActiveMQ、HornetQ、Qpid、TIBCO Enterprise Message Service、IBM MQ、Azure Service Bus、Google Cloud Pub/Sub [15] 等软件中实现。也可以把数据库当成队列用,但要把它调到性能良好并不容易 [16]。

多消费者

多个消费者在同一主题里读消息时,常用两种主要模式(如图 12-1 所示):

负载均衡

每条消息只投给消费者中的某一个,消费者由此分担处理工作。代理可以任意把消息分给消费者。这种模式适合处理代价较高、希望并行加速的消息(在 AMQP 里可通过多个客户端从同一队列消费来实现负载均衡,JMS 里则称之为共享订阅)。

扇出

每条消息都投给所有消费者。扇出让若干彼此独立的消费者各自"收听"同一组广播消息而互不干扰——这相当于多个批处理作业读取同一份输入文件(这一特性在 JMS 里由主题订阅提供,在 AMQP 里由 exchange 绑定提供)。

(a) 负载均衡让多个消费者分担消费同一主题;(b) 扇出则把每条消息投给多个消费者

图 12-1. (a) 负载均衡让多个消费者分担消费同一主题;(b) 扇出则把每条消息投给多个消费者 两种模式可以组合——比如借助 Kafka 的消费者组特性。消费者组订阅某主题时,主题中的每条消息都会被投给该组中的某一个消费者(在组内负载均衡);如果两个独立的消费者组订阅了同一主题,每条消息都会被投给每组里的其一(即在消费者组之间扇出)。

确认与重投递

消费者随时可能崩溃。因此可能出现这种情形:代理把消息投给消费者,消费者从未处理它,或者只处理了一半就崩了。为确保消息不丢,消息代理使用确认机制:客户端必须显式告诉代理某条消息已处理完毕,代理才能把它从队列中移除。

如果与客户端的连接被关闭,或在收到确认前超时,代理就会假定该消息未被处理,于是把它投给另一个消费者。(注意一种情况:消息其实已被完整处理,只是确认在网络中丢了;处理这种情况需要原子提交协议——参阅第 329 页"恰好一次消息处理"——除非操作本就幂等或不要求恰好一次语义。)

与负载均衡结合起来时,重投递机制对消息顺序会带来有趣的影响。图 12-2 中消费者总体上按生产者发送的顺序处理消息。然而消费者 2 在处理 m3 时崩了,此时消费者 1 正在处理 m4。这条未被确认的 m3 随后被重投给消费者 1,结果消费者 1 处理消息的顺序变成 m4m3m5——m3m4 的投递顺序就与生产者 1 发送时的顺序不同了。

消费者 2 在处理 m3 时崩溃,因此 m3 稍后被重投给消费者 1

图 12-2. 消费者 2 在处理 m3 时崩溃,因此 m3 稍后被重投给消费者 1 即便消息代理本身尽力保留消息顺序(如 JMS 与 AMQP 标准要求那样),负载均衡与重投递结合起来仍不可避免会让消息被重排。要避免这一点,可以让每个消费者对应一个独立队列(即不使用负载均衡)。如果消息彼此完全独立则乱序无伤大雅;但若消息间存在因果依赖,重排就可能很关键,本章稍后会讲。

重投递还可能造成资源浪费、资源饥饿或流的永久阻塞。一个常见情形是生产者错误地序列化了一条消息——比如某个 JSON 编码对象漏了一个必需键。如果缺键的消息让消费者崩溃重启,消费者就不会确认该消息,代理便会再发一遍、把另一个消费者也搞崩,如此无限循环下去。若代理保证强顺序,它就再也无法前进;允许消息重排的代理还能继续前进,但会把资源浪费在永远不会被确认的消息上。

为此设计的死信队列(DLQ)派上用场:与其把消息留在当前队列里反复重试,不如把它移到另一个队列以解锁消费者 [17, 18]。监控通常会针对 DLQ 设置——队列里出现的任何消息都是错误。一旦发现新消息,运维可以选择直接丢弃它、手动修改并重投,或修复消费者代码以妥善处理。DLQ 在多数排队系统里很常见;Apache Pulsar 这类基于日志的消息系统、Kafka Streams 这类流处理系统现在也支持 DLQ [19]。

基于日志的消息代理

在网络上发送数据包或向网络服务发起请求,通常都是不留永久痕迹的瞬时操作。原则上可以把这种操作永久记录下来(用抓包加日志),但我们一般并不那么想。AMQP/JMS 风格的消息代理继承了这种瞬时消息的思路:即便把消息写到磁盘,投递给消费者后也会很快删除。

数据库与文件系统则采取相反思路:写入数据库或文件的一切通常都会被永久保留,至少要等到有人显式地把它删掉。

这一思路差异对派生数据的创建方式影响巨大。如第 11 章所讨论,批处理的一项关键特性是:你可以反复跑、对处理步骤反复实验,且没有损坏输入的风险(因为输入只读)。AMQP/JMS 风格的消息却不是这样:处理某条消息有破坏性后果——确认会让该消息从代理上被删除——所以你不能再跑同一个消费者并指望得到相同结果。

如果你给消息系统添加新的消费者,它通常只能开始接收注册之后发送的消息;先前的消息已经没了、找不回来。文件与数据库则相反:你可以随时加入新的客户端,它能读到任意久远以前写入的数据(只要应用没有显式覆盖或删除它)。

能否做出一个混合体——把数据库的持久存储与消息系统的低延迟通知能力结合起来?这正是基于日志的消息代理背后的思路,它近年来变得非常流行。

用日志做消息存储

日志就是磁盘上一段只追加的记录序列。第 4 章中我们以日志结构存储引擎与预写日志为语境讨论过它,第 6 章在复制语境中、第 10 章在共识语境中也讨论过。

同样的结构也可以用来实现消息代理:生产者把消息追加到日志末尾即可发送,消费者顺序读日志即可接收消息;消费者读到日志末尾时,便等待"有新消息追加"的通知。Unix 的 tail -f 命令——监视某个文件被追加新数据——本质上就是这样工作的。

为了把吞吐扩展到超出单块磁盘所能提供的水平,可以把日志分片(第 7 章意义上的分片)。不同分片可以放在不同机器上,让每个分片成为一份可独立读写、与其他分片彼此独立的日志;一个主题则可定义为一组承载同类消息的分片。如图 12-3 所示。

生产者把消息追加到主题的分区文件,消费者顺序读取这些文件

图 12-3. 生产者把消息追加到主题的分区文件,消费者顺序读取这些文件 在 Kafka 称为分区的每个分片中,代理为每条消息分配一个单调递增的序号,即偏移(图 12-3 中方框里的数字就是消息偏移)。这种序号有意义的前提是分区(分片)只追加,因此分区内消息完全有序;不同分区之间则不保证顺序。

Apache Kafka [20] 与 Amazon Kinesis Streams 都是基于日志的消息代理。Google Cloud Pub/Sub 在架构上类似,但对外暴露的是 JMS 风格 API 而非日志抽象 [15]。这些消息代理虽然把所有消息都写到磁盘,却能通过跨多机分片把吞吐做到每秒数百万条,并通过对消息做副本来实现容错 [21, 22]。

日志 vs 传统消息

基于日志的方法天然支持扇出消息——多个消费者可彼此独立地读日志,因为读取消息不会从日志中删除它。要在一组消费者之间做负载均衡,代理可以把整个分片分配给消费者组中的某个节点,而不是把单条消息分给消费者客户端。

每个客户端随后会消费它被分配到的那些分片中的全部消息。通常一个消费者拿到一个日志分片后,便以单线程方式按顺序读取分片中的消息。这种粗粒度负载均衡有几点不足:

  • 在一个主题上分担消费工作的节点数最多只能等于该主题的日志分片数——因为同一分片中的消息都被投给同一节点。(也可以设计共享分片的负载均衡方案:让两个消费者都读完整消息集,但一方只看偶数偏移、另一方只看奇数偏移;或者把消息处理分散到线程池里——但这种做法让消费者偏移的管理变得复杂。一般而言,单线程处理一个分片更可取;要提高并行度可以增加分片数。)
  • 若某条消息处理得慢,会拖住该分片中后续消息的处理(一种队头阻塞;见第 37 页"性能描述")。

因此,当消息处理代价较高、希望按消息粒度并行化、且消息顺序不太重要时,JMS/AMQP 风格的消息代理更可取;而当消息吞吐量大、每条消息处理快、消息顺序又很重要时,基于日志的方法效果非常好 [23, 24]。两种架构的边界正在变得模糊:Kafka 这类基于日志的消息系统现在也支持 JMS/AMQP 风格的消费者组,让多个消费者可以从同一分区接收消息 [25, 26]。

由于分片日志通常只在单分片内保持消息顺序,所有需要按一致顺序处理的消息都得被路由到同一分片。例如某应用可能要求与某个用户相关的事件按固定顺序到达,这可以通过基于事件中的 user ID 选择分片来实现(即让 user ID 作为分区键)。

消费者偏移

顺序消费一个分片让"哪些消息已被处理"很容易判断:偏移小于某消费者当前偏移的所有消息都已被处理,大于的尚未处理。因此代理无须为每条消息逐一跟踪确认,只需周期性地记录消费者偏移即可。这一较低的簿记开销以及由此带来的批处理与流水化机会,能显著提升基于日志系统的吞吐。一旦消费者失败,会从最近一次记录的偏移恢复,而非它最后看到的偏移;这意味着部分消息可能会被处理两次。

事实上这一偏移与单主数据库复制中常见的日志序列号非常相似,第 201 页"建立新的从节点"中讨论过:日志序列号让从节点在断开连接后能重新接上主节点恢复复制而不漏写。这里采用的是相同原理——消息代理扮演主节点的角色,消费者像从节点。

若某消费者节点失败,它的分片由消费者组中的另一个节点接手,从最近记录的偏移开始消费。如果该消费者已经处理了后续消息但还没记录其偏移,重启后这些消息会被重复处理一次。本章稍后会讨论应对这一问题的几种办法。

磁盘空间使用

如果一直只往日志追加,最终会把磁盘空间耗光。为回收磁盘空间,日志通常被切成段,旧段不时被删除或移到归档存储(更精巧的回收办法见第 505 页"日志压实")。

这意味着若某慢消费者落后到其偏移所指的段已被删除,它就会错过部分消息。日志实际上扮演了有界的环形缓冲区的角色——满了之后丢弃旧消息(也叫循环缓冲环形缓冲)。不过这一缓冲在磁盘上,可以做得相当大。

我们来粗略算一下。截至本文撰写时,一块大型机械硬盘的容量约 20 TB、顺序写吞吐约 250 MB/s。如果以最大速率写,22 小时左右就能写满,之后才不得不开始删除最旧的消息。这意味着基于磁盘的日志至少能缓冲 22 小时的消息——即便有许多机器、许多盘也是如此(更多盘既增加可用空间也增加总写带宽)。实际部署中很少把磁盘写带宽用满,所以日志通常能保留好几天乃至好几周的消息。

许多基于日志的消息代理现在把消息存放到对象存储中以扩大存储容量——做法类似第 202 页"由对象存储支撑的数据库"所讨论的数据库。Apache Kafka、Redpanda 这类代理把较旧的消息从对象存储中读出,作为分层存储的一部分;WarpStream、Confluent Freight、Bufstream 等则把所有数据都存在对象存储中。这种架构除了成本上的好处外,也让数据集成变得更简单:对象存储中的消息以 Iceberg 表形式存放,因此批处理与数据仓库作业可以直接对这些数据执行,而无须复制到另一个系统。

消费者跟不上生产者时

第 489 页"消息系统"开头我们讨论了消费者跟不上生产者时的三种选择:丢消息、缓冲,或施加背压。在这一分类中,基于日志的方法属于一种"用大但大小固定的缓冲区做缓冲"(受可用磁盘空间限制)。

若某消费者落后到所需的消息已经比磁盘上保留的更旧,它就再也读不到这些消息了——代理实际上已经丢掉了那些早于缓冲区容纳范围的旧消息。你可以监控消费者距日志头有多远,落后过多就告警。由于缓冲区很大,运维有充足时间去修慢消费者、让它在开始丢消息前赶上来。

即使消费者真的落后到开始漏消息,受影响的也只是那个消费者本身,并不会影响其他消费者的服务。这是一项巨大的运维优势:你可以出于开发、测试或调试目的实验性地消费一份生产日志,而不必担心扰乱生产服务。当一个消费者关闭或崩溃时,它就不再消耗资源——它留下的只有消费者偏移。

这一行为与传统消息代理形成对比:传统代理中,你必须小心把那些消费者已经关掉的队列也删掉,否则它们会无谓地堆积消息、占用本应留给活动消费者的内存。

重放旧消息

前面提过,AMQP/JMS 风格的消息代理中,处理并确认消息是一种破坏性操作——它会让消息从代理中被删除。基于日志的消息代理则不同:消费消息更像从文件中读取——这是一种只读操作,不会改变日志。

除了消费者输出之外,处理产生的唯一副作用是消费者偏移向前移动;而偏移由消费者控制,必要时可以轻易操控。例如你可以用昨天的偏移启动一份消费者副本,把输出写到另一位置,从而重新处理昨天的消息;你可以用不同的处理代码反复重做。

正是这一点让基于日志的消息更接近上一章的批处理:派生数据通过可重复的转换过程从输入数据中清晰地分离出来,便于实验、也便于从错误与 bug 中恢复,是在组织内部整合数据流的好工具 [27]。

数据库与流

我们已经把消息代理与数据库做了一些比较。尽管两者历来被视为不同类别的工具,我们看到基于日志的消息代理已经成功地把数据库的思想引入消息系统。我们也可以反过来,把消息与流的思想引入数据库。

一种做法是把事件流作为存储数据的真实数据源(见第 10 页"真实数据源与派生数据")。这正是事件溯源所做的事,第 101 页"事件溯源与 CQRS"中讨论过。与其用更新与删除来改变数据模型中的状态,你可以把每个状态变化都建模为一个不可变事件并写入只追加日志;任何读优化的物化视图都从这些事件派生而来。基于日志的消息代理(配置为永不删除旧事件)非常适合事件溯源——它们使用只追加存储,能以低延迟通知消费者新事件。

但你不必走到事件溯源那一步:即便用可变数据模型,事件流对数据库也很有用。事实上,对数据库的每次写都是一个可被捕获、存储与处理的事件。数据库与流之间的联系远不止"日志在磁盘上的物理存储"那么浅——这是相当根本的。

例如复制日志(见第 206 页"复制日志的实现")就是一条数据库写事件流,由主节点在处理事务时产出。从节点把这一写流应用到自己的数据库副本上,最终就得到一份与之准确一致的数据副本。复制日志中的事件描述的就是已经发生的数据变化。

我们在第 433 页"使用共享日志"中也见过状态机复制原则:如果每个事件代表对数据库的一次写、每个副本以相同顺序处理相同事件,那么所有副本最终会到达同一个最终状态(前提是事件处理是确定性的)。这就是事件流的又一个例子!

本节我们先看异构数据系统中出现的一个问题,再探讨如何借鉴事件流的思想到数据库上来解决它。

保持系统同步

如本书所见,没有哪一个系统能满足所有的数据存储、查询与处理需求。实际上多数稍复杂的应用都得组合几种技术来满足要求——例如用一个 OLTP 数据库服务用户请求,用缓存加速常见请求,用全文索引处理搜索查询,用数据仓库做分析。每种系统都按各自用途、以各自的表示形式存放着自己的一份数据。

由于同一份或相关的数据出现在多个地方,它们之间必须保持同步。数据库中某一项被更新时,缓存、搜索索引、数据仓库中相应项也都得更新。对数据仓库而言,这种同步通常由 ETL 流程完成(见第 7 页"数据仓库"),常见做法是把数据库整体复制一份、做转换,再批量加载到数据仓库——也就是一种批处理。第 476 页"批处理用例"中我们也看到搜索索引、推荐系统、其他派生数据系统同样可以用批处理建立。

如果周期性的全量数据库导出太慢,有时会改用双写:应用代码在数据变化时显式写入每个系统——例如先写数据库、再更新搜索索引、再让缓存项失效(或者并发地做这些写入)。

然而双写有严重问题,其一便是图 12-4 所示的竞态条件。例子中两个客户端并发想更新同一项 X:客户端 1 想把它设为 A,客户端 2 想把它设为 B。两个客户端都先把新值写入数据库,再写入搜索索引。由于时序不巧,请求被交错执行:数据库先看到客户端 1 的写、再看到客户端 2 的写,因此最终 X 在数据库中为 B;搜索索引先看到客户端 2 的写、再看到客户端 1 的写,因此最终 X 在搜索索引中为 A。两个系统从此永久彼此不一致,尽管整个过程并未发生任何错误。

在数据库中 X 先被设为 A 再被设为 B,而在搜索索引中两次写入到达的顺序相反

图 12-4. 在数据库中 X 先被设为 A 再被设为 B,而在搜索索引中两次写入到达的顺序相反 除非你有额外的并发检测机制(比如第 237 页"检测并发写"中讨论过的版本向量),否则你甚至不会注意到曾发生过并发写——一个值会被另一个静悄悄地覆盖。

双写的另一个问题是:其中一次写可能成功、另一次失败。这是个容错问题而非并发问题,但同样会让两个系统彼此不一致。要确保两次写要么都成功、要么都失败,就是原子提交问题,代价昂贵(见第 324 页"两阶段提交")。

如果你只有一个具备单一主节点的复制数据库,主节点决定写顺序,因此状态机复制方法在数据库的副本之间就能工作。然而图 12-4 中并没有单一主节点:数据库可能有自己的主节点,搜索索引也可能有自己的主节点,但谁也不跟随谁,因此会发生冲突(见第 215 页"多主复制")。

如果真的有一个单一主节点(比如就是数据库),并能让搜索索引成为数据库的从节点,情况就会更好。但实践中这是否可行?

变更数据捕获

多数数据库的复制日志的问题在于:它们历来被视为数据库的内部实现细节,而非公开 API。客户端应通过数据模型与查询语言来查询数据库,而不是去解析复制日志、试图从中抽取数据。

数十年间,许多数据库根本没有任何文档化的方式来获取已写入的变更日志。这让你很难把一个数据库里的所有变更取走,再复制到另一种存储技术(如搜索索引、缓存或数据仓库)。

近年来对变更数据捕获(CDC)的兴趣与日俱增。CDC 即观察写入数据库的所有数据变更、并以一种可复制到其他系统的形式抽取出来的过程 [28]。一旦变更能在写入的同时以的形式立即可用,CDC 就格外有意思。

例如你可以在数据库里捕获变更,并持续把这些变更应用到搜索索引上。只要变更日志按相同顺序应用,搜索索引中的数据就可以与数据库中的数据保持一致。搜索索引以及任何其他派生数据系统都不过是变更流的消费者。

图 12-5 展示了如何用 CDC 解决图 12-4 的并发问题。即使把 X 设为 A 与 B 的两个请求并发到达数据库,数据库会决定以何种顺序执行它们、并按该顺序写入复制日志;搜索索引随后按相同顺序应用。如果你需要把数据放到其他系统(如数据仓库),只需让它作为 CDC 事件流的另一个消费者订阅即可。

把数据库提交的变更同序传播到下游系统

图 12-5. 把数据库提交的变更同序传播到下游系统

实现 CDC

我们可以把日志消费者称为派生数据系统,如第 10 页"真实数据源与派生数据"所述:搜索索引或数据仓库中存的数据不过是真实数据源中数据的另一种视图。CDC 即一种机制,确保所有对真实数据源的变更都被反映到派生数据系统上,让派生系统拥有数据的准确副本。

本质上 CDC 把一个数据库变成主节点(变更从这里被捕获)、把其他都变成从节点。基于日志的消息代理非常适合把变更事件从源数据库送到派生系统,因为它能保留消息顺序(避免图 12-2 中的乱序问题)。

可以借助逻辑(行级)复制日志实现 CDC(见第 208 页"逻辑(基于行的)日志复制"),但其中有些挑战,例如处理 schema 变更、正确建模更新等。Debezium 这一开源项目正是为此而生,它提供针对 MySQL、PostgreSQL、Oracle、SQL Server、Db2、Cassandra 等众多数据库的 source connector。这些连接器挂在数据库的复制日志上,把变更以标准事件 schema 暴露出来;消息随后可被转换并写到下游数据库。Kafka Connect 框架也提供了多种数据库的 CDC 连接器。Maxwell 通过解析 binlog 为 MySQL 做类似的事 [29],GoldenGate 为 Oracle 提供类似能力,pgcapture 则为 PostgreSQL 做同样的事。

与消息代理一样,CDC 通常是异步的:真实数据源数据库不会等变更被应用到消费者后才提交事务。这一设计的运维优势是慢消费者不会过分影响真实数据源;缺点是所有复制延迟问题都适用(见第 209 页"复制延迟下的问题")。

初始快照

如果你拥有数据库自创建以来所有变更的日志,就可以通过重放日志重建数据库的整个状态。然而很多情况下,永远保留所有变更会占用太多磁盘空间,重放也会花太久,所以日志要被截断。

例如建立一个新的全文索引,需要数据库的完整副本——只应用最近变更日志是不够的,因为它会漏掉最近未更新的项。因此如果你没有全部日志历史,就需要从一份一致快照开始(见第 201 页"建立新的从节点")。

数据库快照必须对应到变更日志中已知的某个位置或偏移,这样你才知道处理完快照后从哪一点开始应用变更。一些 CDC 工具集成了这一快照功能,另一些则把它当作手动操作。Debezium 使用 Netflix 的 DBLog 水位线算法提供增量快照 [30, 31]。

日志压实

如果你只能保留有限的日志历史,每次想加一个新的派生数据系统时都得跑一次快照过程。日志压实提供了一个不错的替代。

我们之前在第 118 页"日志结构存储"中以日志结构存储引擎为例讨论过日志压实(图 4-3 给出例子)。原理简单:存储引擎周期性地查找带有相同 key 的日志记录,丢弃任何重复,只保留每个 key 最近的一次更新。这能让日志段大幅缩小,所以这些段也可以在压实过程中合并,如图 12-6 所示。整个过程在后台运行。

在这一键值对日志中,键是某个猫视频的 ID(mew、purr、scratch 或 yawn),值是该视频被播放的次数;日志压实只为每个 key 保留最近的值

图 12-6. 在这一键值对日志中,键是某个猫视频的 ID(mew、purr、scratch 或 yawn),值是该视频被播放的次数;日志压实只为每个 key 保留最近的值 在日志结构存储引擎中,带特殊 null 值(墓碑)的更新表示某 key 被删除,导致它在日志压实时被移除。但只要某 key 没有被覆盖或删除,它就会永远留在日志里。压实后日志所需的磁盘空间只取决于数据库当前内容,与对数据库做过的写入次数无关。如果同一 key 频繁被覆盖,旧值最终会被回收,只保留最新值。

同样的思想也适用于基于日志的消息代理与 CDC。如果 CDC 系统设置成每个变更带一个主键、且每个 key 的更新都替换该 key 的前一个值,那么对某个 key 只保留最近一次写就够了。

现在每当你想重建某个派生数据系统(如搜索索引),就可以从日志压实主题的偏移 0 起一个新的消费者,顺序扫描日志中所有消息。日志保证含有数据库中每个 key 最近的值(也许还有些更老的值)。换言之,无须再对源数据库做一次快照,你就能用它得到数据库内容的完整副本。

Apache Kafka 支持这一日志压实特性。如本章后面所见,它让消息代理可以用作持久存储而不仅是瞬时消息。

变更流的 API 支持

如今多数流行数据库已把变更流作为一等接口暴露出来,而不是事后改造、反向工程的 CDC。MySQL、PostgreSQL 等关系型数据库通常通过其用于自身副本的同一个复制日志发出变更。多数云厂商也提供其产品的 CDC 解决方案——例如 Datastream 为 Google Cloud 的关系型数据库与数据仓库提供数据流式访问。

即便是最终一致、基于 quorum 的数据库(如 Cassandra)现在也支持 CDC。如第 411 页"实现线性一致系统"所见,客户端必须把写持久化到多数节点才能被认为可见。给 quorum 写做 CDC 支持有难度,因为没有单一真实源可以订阅。能否看到数据取决于每个读者的一致性偏好。Cassandra 通过为每个节点暴露原始日志段绕开这一问题,而非提供单一变更流。希望消费数据的系统必须读取每个节点的原始日志段,并自行决定如何把它们合并为单一流(与 quorum 读者所做大体相似)[32]。

Kafka Connect [33] 把适用于多种数据库的 CDC 工具与 Kafka 集成。一旦变更事件流入 Kafka,便可被用来更新搜索索引等派生数据系统,或喂给流处理系统——后者本章稍后会讨论。

CDC 与事件溯源

CDC 与事件溯源比较起来如何?与 CDC 相似,事件溯源也涉及把对应用状态的所有变更存为变更事件日志。最大区别在于事件溯源把这一思想应用在不同的抽象层次:

  • 在 CDC 中,应用是以可变方式使用数据库的——任意更新、删除记录。变更日志是从数据库的低层(例如解析复制日志)抽取的,从而保证从数据库抽出的写顺序与它们实际写入时的顺序一致,避免图 12-4 的竞态。
  • 在事件溯源中,应用逻辑显式建立在不可变事件被写入事件日志这一基础之上。事件存储是只追加的,对事件的更新或删除被劝阻或禁止。事件被设计为反映应用层级发生的事情,而非低层状态变化。

哪种更好取决于你的情形。对一个尚未采用事件溯源的应用而言,引入事件溯源是一次大改动,有不少利弊,第 101 页"事件溯源与 CQRS"中已讨论过。CDC 则可以以最小改动加到既有数据库上——写入数据库的应用甚至可能根本不知道 CDC 在发生。

变更数据捕获与数据库 schema

虽然 CDC 看起来比事件溯源更易采用,它也有自己一套挑战。在微服务架构里,数据库通常只被一个服务访问,其他服务通过该服务的公共 API 与之交互,并不直接访问数据库。这让数据库成了服务的内部实现细节,开发者可以在不影响公共 API 的情况下修改其 schema。

然而 CDC 系统在复制数据时通常使用上游数据库的 schema,使这些 schema 变成必须像服务公共 API 一样被管理的公共 API。从数据库表中删一列会破坏依赖该字段的下游消费者。这种挑战在数据流水线里早已存在,但通常它们仅由数据仓库 ETL 处理。由于 CDC 通常以数据流形式实现,其他生产服务也可能成为消费者。让这些消费者中断会导致面向用户的故障 [34]。数据契约常被用来防止此类破坏。

把内部 schema 与外部 schema 解耦的常见做法是采用 outbox 模式。outbox 是带有自己 schema 的表,对外暴露给 CDC 系统使用,而非数据库内部领域模型 [35, 36]。开发者可以放手修改内部 schema,而 outbox 表保持不变。这看起来很像一次双写——确实是。但 outbox 避免了第 501 页"保持系统同步"中讨论的问题,因为两次写都在同一系统(数据库)内进行:可以让两次写在同一事务里出现。

不过 outbox 也有一些取舍。开发者仍需维护内外 schema 之间的转换,这可能颇具挑战。outbox 还增加了数据库要写入底层存储的数据量,可能引发性能问题。

与 CDC 相同,重放事件日志可以重建系统当前状态。但日志压实对它们的处理方式不同:

  • 表示更新某条记录的 CDC 事件通常包含该记录的全新版本,因此某主键的当前值完全由该主键的最近事件决定,日志压实可以丢弃同一 key 的旧事件。
  • 事件溯源中事件建模在更高层次。一个事件通常表达用户动作的意图,而非该动作引发的具体状态更新机制。这种情况下后续事件通常并不覆盖前面的事件,因此你需要完整事件历史才能重建最终状态。同样意义上的日志压实并不可行。

使用事件溯源的应用通常会有一种机制,把从事件日志派生的当前状态做快照保存,从而无须反复重新处理整个日志。然而这只是加速读取与崩溃恢复的性能优化;意图是系统能永远保留所有原始事件、必要时重新处理整份事件日志。第 512 页"不可变性的局限"中将讨论这一假设。

状态、流与不可变性

第 11 章中我们看到批处理因输入不可变而获益良多——你可以在既有输入文件上做实验性处理而不必担心损坏它们。这一不可变性原则正是事件溯源与 CDC 之所以如此强大的根本。

我们通常把数据库视为存储应用当前状态的东西。这种表示是为读优化的,通常也是服务查询最方便的形式。状态本就会变,所以数据库支持更新与删除,而非只支持插入。这与不可变性如何调和?

任何会变的状态,都是随时间累积、修改它的事件的结果。例如你当前的可订座位列表,是你已处理过的所有预订的结果;当前账户余额是账户上历次贷记借记的结果;Web 服务器响应时间图表则是发生过的所有 Web 请求各自响应时间的聚合。

无论状态怎么变,总有一连串事件引起了这些变化。即使事情被做又被撤销,这些事件曾经发生过这一事实始终为真。关键思想是:可变状态不可变事件的只追加日志并不矛盾——它们是同一枚硬币的两面。所有变更的日志,即变更日志(changelog),代表着状态随时间的演化。

如果你有数学倾向,会说应用状态是事件流随时间的积分、而变更流是状态对时间的微分(如图 12-7 所示)[37, 38]。这一类比有局限(例如状态的二阶导数似乎并无意义),但作为思考数据的一个起点很有用。

当前应用状态与事件流的关系

图 12-7. 当前应用状态与事件流的关系 如果你把变更日志持久化下来,那么状态就变得可重现。如果你把事件日志视为真实数据源,把任何可变状态都视为它的派生,那么对系统中数据流的推理就会容易得多。正如 Jim Gray 与 Andreas Reuter 在 1992 年所言 [39]:

从根本上说没必要保留数据库;日志里已含一切信息。保留数据库(也就是日志的当前末端)的唯一原因是检索操作的性能。

日志压实是一种弥合日志与数据库状态二者差异的方法:压实只保留每条记录的最新版本,丢弃被覆盖的版本。

不可变事件的优势

不可变性在数据库中并非新思想。例如会计师们已经把不可变性用于记账数百年。每次发生交易就被记入只追加的总账,本质上就是一份描述资金、商品或服务易手的不可变事件日志。账目(如损益表或资产负债表)通过把总账中的交易加和派生出来 [40]。

如果出错,会计师并不擦除或修改总账中的错误交易;他们另加一条补偿交易抵消错的——例如退还误扣金额。错的交易仍永远留在总账中,因为出于审计目的它可能很重要。如果由错总账派生出的错误数字已经发布出去,下个会计期间的数字会包含一份订正。这一过程在会计中完全正常 [41]。

虽然这种可审计性在金融系统中尤为重要,对许多并不受这种严格监管的系统也很有益。如果你不小心部署了把坏数据写到数据库的有 bug 代码,倘若代码可以破坏性地覆盖数据,恢复就会困难得多;用只追加的不可变事件日志,诊断发生了什么并恢复就容易得多。同样客服可以用审计日志来诊断客户请求与投诉。

不可变事件还能捕获到比当前状态更多的信息。例如在购物网站上,顾客可能把商品加进购物车再取出。尽管从订单履行角度看第二个事件抵消了第一个,但出于分析目的"顾客曾考虑过某商品但又决定放弃"也许是有用的——也许他们将来会买,也许找了替代品。这一信息记录在事件日志中,但在删除项就抹去信息的数据库里就丢失了。

从同一事件日志派生多种视图

通过把可变状态从不可变事件日志中分离出来,你可以从同一事件日志派生多种读优化的表示——就像有多个流的消费者一样(图 12-5)。例如分析数据库 Druid 通过这种做法直接从 Kafka 摄入数据,Kafka Connect sink 可以把数据从 Kafka 导出到各种数据库与索引中。

显式做"从事件日志到数据库"的翻译,让你的应用更易随时间演化。如果想引入一种以新方式呈现既有数据的新功能,可以用事件日志为这一新功能构建一个独立的读优化视图、与既有系统并行运行而无需修改它们。运行新旧系统并行通常比在既有系统中做复杂 schema 迁移更容易。一旦读者切换到新系统、旧系统不再需要,就可以关掉它、回收资源 [42, 43]。

我们在第 101 页"事件溯源与 CQRS"中遇到过这一思路——用一种写优化形式写数据,再按需翻译为各种读优化表示。这一过程不一定需要事件溯源;从 CDC 事件流出发也能构建多个物化视图 [44]。

传统的数据库与 schema 设计基于"数据要按它将被查询的形式写入"这一谬误。围绕规范化与反规范化的争论(见第 72 页"规范化、反规范化与连接")若你能从一份写优化的事件日志翻译到读优化的应用状态,便基本无关紧要了:在读优化视图里把数据反规范化是完全合理的,因为翻译过程为你提供了让它与事件日志保持一致的机制。

第 34 页"案例研究:社交网络主页时间线"讨论过社交网络的主页时间线——某用户所关注的人最近发帖的缓存(就像收件箱)。这是另一个读优化状态的例子:主页时间线是高度反规范化的,因为你的帖子被复制到所有关注你的人的时间线里。然而扇出服务在新帖与新关注关系下持续保持这一重复状态同步,让重复变得可管理。

并发控制

CQRS 的最大缺点是事件日志的消费者通常是异步的——用户可能给日志写一条然后从派生视图读取,发现自己的写还未被反映出来。我们在第 210 页"读己之写"中讨论过这一问题及可能的解法。

一种解法是与把事件追加到日志同步地更新读视图。这要求在事件日志与派生视图之间做分布式事务,或者用某种方式等到事件被反映到视图里。两种通常都不实用,所以视图通常异步更新。

另一方面,从事件日志派生当前状态也简化了并发控制的某些方面。多对象事务的需求(见第 284 页"单对象与多对象操作")很多源自单一用户动作要在多处变更数据。借助事件溯源,你可以设计一个事件,使其本身就是某用户动作的自包含描述:该用户动作就只在一个位置需要写——把事件追加到日志——而这容易做成原子操作。

如果事件日志与应用状态以同样方式分片(例如某分片 3 上顾客对应的事件只更新该应用状态的分片 3),单线程消费日志的写就无须并发控制——按构造它一次只处理一个事件(见第 309 页"实际串行执行")。日志通过为某分片中的事件定义串行顺序,消除了并发的非确定性 [27]。如果一个事件触及多个状态分片,要做的工作多一点,第 13 章会讨论。

许多并不使用事件溯源模型的系统也依赖不可变性来做并发控制。许多数据库内部使用不可变数据结构或多版本数据来支持点-时刻快照(见第 298 页"索引与快照隔离")。Git、Mercurial、Fossil 这类版本控制系统也依赖不可变数据来保留文件版本历史。

不可变性的局限

把所有变更的不可变历史永远保留下去在多大程度上可行?答案取决于数据集中的更改量。某些工作负载主要是新增、很少更新或删除——容易做成不可变的。另一些工作负载在相对较小的数据集上更新与删除频繁——这种情况下不可变历史可能膨胀得难以承受、出现碎片,压实与垃圾回收的性能就成为运维稳健性的关键 [45, 46]。

除性能原因外,你也可能因行政或法律原因不得不删除数据,尽管它本来不可变。例如 GDPR 这类隐私法规要求按需删除用户的个人信息、抹除错误信息,或封堵敏感信息的意外泄露。

这类情况下,仅向日志再追加一个事件以表示"先前数据应被视为删除"是不够的——你实际上想要重写历史,假装那条数据从未写过。例如 Datomic 把这一特性叫excision [47],Fossil 版本控制系统有个相似概念叫 shunning [48]。

真正删除数据出乎意料地难 [49]:副本可能存在于很多地方。存储引擎、文件系统、SSD 通常写到新位置而不是原地覆盖 [41],备份也常被有意做成不可变以防止意外删除或损坏。

让不可变数据可被删除的一种办法是加密粉碎(crypto-shredding)[50]:未来可能想删除的数据加密存储;想删除时丢掉加密密钥即可。加密的数据仍在那里,但没人能用了。

某种意义上这只是把问题挪了挪:实际数据仍不可变,但你的密钥存储是可变的。此外你必须事先决定哪些数据用同一密钥加密,又何时要换不同密钥——这是个重要决定,因为之后你可以加密粉碎掉某密钥下的所有或没有数据,但不能只删一部分。给每个数据项单独存一份密钥太笨重——密钥存储会膨胀到与主数据存储一样大。可撤销加密(puncturable encryption)[51] 这类更精细的方案能选择性地撤销某密钥的解密能力,但还没被广泛使用。

总的来说,删除更多关乎"让数据更难被检索",而非真正"让数据无法被检索"。尽管如此,有时也得试一试,第 596 页"立法与自我监管"中将再讨论。

处理流

到目前为止本章谈了流从哪里来(用户活动事件、传感器、对数据库的写)以及流如何被传输(直接消息、通过消息代理、事件日志)。

剩下要讨论的,便是拿到流之后能用它做什么——也就是处理它。大体上有三种选择:

  1. 把事件中的数据写入数据库、缓存、搜索索引或类似存储系统,再由其他客户端查询。如图 12-5 所示,这是让数据库与系统其他部分保持同步的好办法——尤其当流的消费者是该数据库的唯一写者时。把数据写到存储系统是流式版的"批处理用例"(见第 476 页)。
  2. 以某种形式把事件推给用户——比如发邮件提醒或推送通知,或把事件流到实时仪表盘上做可视化。此时人是流的最终消费者。
  3. 处理一个或多个输入流以产出一个或多个输出流。流可能要经过若干处理阶段组成的流水线,最终落到某个输出(选项 1 或 2)。

本章余下篇幅讨论选项 3:处理流以产出其他派生流。处理流的这种代码段被称为算子作业,与第 11 章里讨论的 Unix 进程与 MapReduce 作业密切相关,数据流模式也相似:流处理器以只读方式消费输入流,把输出以追加方式写到另一位置。

流处理器中的分片与并行化模式也与 MapReduce 及第 11 章中的数据流引擎非常相似,这里不再赘述。基本的映射操作(如转换与过滤记录)也工作方式相同。

与批作业相比一个关键差别是:流永远不会结束。这一差异蕴含许多影响。如本章开头所讨论,对无界数据集做排序毫无意义,因此排序-归并连接(见第 471 页"连接与分组")就用不上了。容错机制也必须改变:跑了几分钟的批作业里失败的任务可以简单地从头重启;但跑了几年的流作业崩溃后从头重启就未必可行。

流处理的用途

流处理长期以来用于监控——组织希望在某些事发生时收到告警。例如:

  • 欺诈检测系统需要判断信用卡使用模式是否出现异常变化,若可能被盗就冻结该卡。
  • 交易系统需要监视金融市场上的价格变化,并按规定规则执行交易。
  • 制造系统需要监控工厂中机器的状态,出现故障迹象时迅速定位问题。
  • 军事与情报系统需要追踪潜在攻击者的活动,在出现攻击迹象时报警。

这类应用需要相当精细的模式匹配与关联分析。不过流处理的其他用法也随时间陆续出现,本节我们简要比较其中几种。

复杂事件处理

复杂事件处理(CEP)是 1990 年代发展起来的一种分析事件流的方法,特别面向那种需要查找某些事件模式的应用 [52]。正如正则表达式让你在字符串中查找特定字符模式,CEP 让你定义规则在流中查找特定事件模式。

CEP 系统通常用 SQL 这类高层声明式查询语言或 GUI 来描述要检测的事件模式。这些查询被提交给一个处理引擎——它消费输入流并在内部维护一台状态机以执行所需的匹配。匹配上时引擎发出一个复杂事件(CEP 的名字由此而来),其中带有所检测到事件模式的细节 [53]。

这类系统里查询与数据的关系与普通数据库正好相反。数据库通常持久存放数据、把查询当作瞬时的——查询到来时数据库搜索匹配该查询的数据,查询完成后就把它忘掉。CEP 引擎反其道而行之:查询长期驻留;事件到达时引擎检查它是否匹配某条常驻查询 [54]。

CEP 的实现包括 Esper、Apama、TIBCO StreamBase。Flink、Spark Streaming 这类分布式流处理器也支持对流的声明式 SQL 查询。

流分析

流处理也用于在流上做分析。CEP 与流分析之间的边界模糊,但一般来说,流分析较少关注检测特定事件序列,更多关注大量事件上的聚合与统计指标。例如:

  • 度量某种事件的发生速率(每段时间出现多少次)
  • 计算某值在某时间段内的滚动平均
  • 把当前统计与此前时间段比较(如检测趋势、对比上周同期异常高或异常低的指标做告警)

这类统计通常按固定时间区间计算——比如你可能想知道过去 5 分钟里每秒平均查询数及该期间 99 百分位响应时间。在几分钟上做平均能平掉秒级的不相关波动,又能及时反映流量模式的变化。聚合所用的时间区间叫做窗口,第 518 页"关于时间的推理"会更详细讨论。

流分析系统有时使用概率算法,比如布隆过滤器(见第 122 页"布隆过滤器")做集合成员测试、HyperLogLog [55] 做基数估计,以及各种百分位估计算法(见第 42 页"计算百分位")。概率算法产出近似结果,但好处是在流处理器上所需的内存远少于精确算法。这一近似算法的使用有时让人误以为流处理系统总是有损、不精确——其实并非如此。流处理本身并不内含近似,使用概率算法只是一种优化 [56]。

许多开源分布式流处理框架——如 Apache Storm、Spark Streaming、Flink、Samza、Apache Beam、Kafka Streams——都是为分析而设计的 [57]。托管服务包括 Google Cloud Dataflow 与 Azure Stream Analytics。

维护物化视图

我们看到对数据库的变更流可以用来让派生数据系统(如缓存、搜索索引、数据仓库)与源数据库保持最新。这就是维护物化视图的例子:在数据集上派生一种替代视图以高效查询,并在底层数据变化时更新该视图 [37]。

类似地,事件溯源中应用状态由事件日志逐步应用维护——这里应用状态本身也是一种物化视图。与流分析场景不同的是,仅考虑某段时间窗口里的事件通常不够;构建物化视图潜在地需要任意时间区间上的所有事件——除了被日志压实丢弃的过期事件。实际上你需要一个一直延伸到时间起点的窗口。

原则上任何流处理器都能做物化视图维护,尽管"必须永远保留事件"这一需求与一些主要在有限时长窗口上工作的分析框架的假设相悖。Kafka Streams 与 Confluent 的 ksqlDB 在 Kafka 对日志压实的支持之上,明确支持这种用法 [58]。

增量视图维护

数据库似乎很适合维护物化视图——毕竟它们就是用来保留某数据集的完整副本的。许多数据库也支持物化视图。我们在第 143 页"物化视图与数据立方体"中看到,数据仓库中典型的分析查询可以物化成 OLAP 数据立方体。

然而数据库通常通过周期性批作业或按需请求(如 PostgreSQL 的 REFRESH MATERIALIZED VIEW)来刷新物化视图表,并不在每次源数据更新时都做。这种做法对流处理视图维护有两个明显缺点:

效率差

每次更新视图都重新处理所有数据,尽管多数数据可能并未变化。

数据新鲜度低

源数据的变更直到下一次计划更新时才反映到物化视图里。

当数据易于分区且计算天然增量时,可以写数据库触发器来高效地更新物化视图。例如某物化视图维护"按天的总销售额",每次新销售出现时把对应那天的行更新即可。这类定制方案对少数情况奏效,但许多 SQL 查询并不易被转化为高效的增量计算。

增量视图维护(IVM)是上述问题的更通用解。IVM 技术把用 SQL 或其他语言写的查询转换成能做增量计算的算子。它不再处理整个数据集,而只重算并更新那些变了的数据 [38, 59, 60]。视图计算因此高效得多;这意味着更新可以更频繁地跑,从而显著提高数据新鲜度。

Materialize [61]、RisingWave、ClickHouse、Feldera 等数据库都用 IVM 技术高效提供增量物化视图。这些数据库摄入事件流以实时暴露物化视图。最近事件先在内存中缓冲,再周期性地用以更新磁盘上的物化视图。读则把最近事件与已物化的数据合起来,提供单一的实时视图。由于读常以 SQL 表达、物化视图常以 OLAP 风格格式存储,这类系统也支持第 11 章中讨论的那种数据仓库风格的大规模查询。

在流上搜索

除了 CEP 那种由多个事件构成的模式搜索,有时也需要按复杂条件(例如全文搜索查询)在每个事件上做搜索。比如媒体监控服务订阅来自媒体机构的新闻文章与广播信息源,搜索任何提及公司、产品、感兴趣话题的新闻。要实现的方法是预先制定一份搜索查询,然后持续把每条新闻流过来时都和这个查询对比。某些网站也有类似功能——例如房产网站的用户可以请求在符合其搜索条件的新房源上市时收到通知。Elasticsearch 的 percolator 特性 [62] 是实现这种流上搜索的一种选择。

传统搜索引擎先索引文档、再对索引执行查询;而流上搜索把这个过程颠倒过来:查询被存放,文档随它们到达而被一一与查询比对——就像 CEP 一样。最简单情形下,可以让每个文档都对照每个查询测试一遍——但若查询数很多就会慢。为优化此过程,可以同时给查询与文档建索引,从而缩小可能匹配的查询集合 [63]。

事件驱动架构与 RPC

第 189 页"事件驱动架构"中我们把消息传递系统作为 RPC 的替代讨论过。这一服务间通信机制被用于例如 actor 模型。

虽然这些系统也基于消息和事件,我们通常不把它们视为流处理器,原因有几条:

  • actor 框架主要是一种管理并发与通信模块分布式执行的机制,而流处理主要是数据管理技术。
  • actor 之间的通信通常是瞬时的、一对一的,而事件日志是持久的、多订阅者的。
  • actor 之间可以以任意方式通信(包括循环的请求/响应模式),而流处理器通常被设置为无环流水线——其中每个流是某个特定作业的输出,并由一组定义良好的输入流派生而来。

不过 RPC 风格系统与流处理之间也有重叠。例如 Apache Storm 有个特性叫分布式 RPC,让用户查询能扇出给一组也处理事件流的节点;这些查询随后与输入流中的事件交织起来,结果再聚合发回用户。

也可以借助 actor 框架处理流。然而许多此类框架在崩溃情况下不保证消息投递,因此除非另外加重试逻辑,处理就并非容错的。

关于时间的推理

流处理器经常要处理时间,尤其是跑分析任务时——这类任务常用"过去五分钟的平均值"这样的时间窗口。"过去五分钟"看似清晰无歧义,意外的是它其实相当棘手。

在一次批处理中,处理任务一气呵成、迅速地跑完大批历史事件。如果需要按时间做某种切分,批处理过程必须看每条事件中嵌入的时间戳——看运行该处理那台机器的系统时钟是没意义的,因为它的运行时间与事件实际发生的时间毫无关系。

批处理可以在几分钟内读完一年的历史事件;多数情况下我们感兴趣的时间线是这一年历史,而不是处理过程那几分钟。况且使用事件中的时间戳让处理具备确定性:在同样输入上再跑一次会得到同样结果。

而许多流处理框架使用处理机器上的本地系统时钟(处理时间)来决定窗口 [64]。这种做法简单,且当事件创建与事件处理之间的延迟可忽略时倒也合理;但一旦处理出现明显延迟(即处理明显晚于事件实际发生时间),它就站不住脚了。

事件时间 vs 处理时间

处理可能因多种原因延迟:排队、网络故障、消息代理或处理器中的性能争用、流消费者重启,或从故障恢复后、修复 bug 后对过去事件做重新处理,等等。

消息延迟还可能让消息出现不可预测的顺序。例如某用户先发起一次 Web 请求(由 Web 服务器 A 处理),再发起第二次请求(由 Web 服务器 B 处理)。A 与 B 都发出描述各自处理过程的事件,但 B 的事件在 A 的事件之前到达消息代理。于是流处理器先看到 B 事件再看到 A 事件——尽管它们实际是反向发生的。

打个比方:想想《星球大战》电影。第 4 集 1977 年上映、第 5 集 1980 年、第 6 集 1983 年;第 1、2、3 集分别是 1999、2002、2005 年;第 7、8、9 集分别是 2015、2017、2019 年 [65]。如果你按上映顺序观看,你处理它们的顺序就与故事发生的顺序不一致(集数像事件时间戳,看电影的日期像处理时间)。作为人类我们能应对这种顺序的不连续;但流处理算法则需要专门写来应对这种时序与排序问题。

把事件时间与处理时间搞混会得到坏数据。例如假设你有一个度量请求速率(每秒请求数)的流处理器。重新部署该处理器时它可能停一分钟,再处理这一分钟里积压的事件。如果按处理时间衡量速率,看起来像是处理积压时出现了一次异常的请求尖峰;但实际上请求率是稳定的(图 12-8)。

按处理时间开窗会因处理速率变化而引入伪迹

图 12-8. 按处理时间开窗会因处理速率变化而引入伪迹

处理掉队事件

按事件时间定义窗口的一个棘手问题是:你永远无法确定某个窗口的事件是否已全部到齐,还是后面还会有更多。

例如你按 1 分钟把事件分组以统计每分钟请求数。某小时第 37 分钟里的事件已统计过一些,时间也在往前推进;如今到来的事件大多落在该小时的第 38、39 分钟。何时你才能宣告第 37 分钟那个窗口已结束、输出其计数?

可以这么做:当某个窗口在一段时间内未再收到新事件时超时,宣告该窗口已就绪。但有些事件可能被另一台机器缓冲、或被网络中断延迟。你必须能处理那些在窗口已被宣告完成之后才到达的掉队事件。大体上有两种选择 [1]:

  • 忽略掉队事件——正常情况下它们大概只占事件总数的一小部分。可以把丢弃事件的数量作为指标跟踪、一旦开始丢大量数据就告警。
  • 发布订正——一个把掉队事件包含进来的窗口更新值。可能还要撤回之前的输出。

某些情况下可以用一条特殊消息表示"从此以后不会再有时间戳早于 t 的消息了",消费者据此触发窗口 [66]。若不同生产者在不同机器上各自生成事件、各有自己的最小时间戳门槛,消费者就需要为每个生产者单独跟踪——这种情形下增减生产者就更棘手。

你用的是谁的时钟?

当事件可能在系统中多个点被缓冲时,给事件分配时间戳就更困难了。例如某移动 App 把使用度量事件上报给服务器:App 可能在设备离线时被使用——这时它会把事件本地缓冲,等下次互联网连接可用(可能在数小时乃至数天之后)再发给服务器。从该流的任何消费者看来,这些事件像是极度延迟的掉队者。

在这一情境下,事件时间戳真正应该是的,是用户与 App 交互发生时按移动设备本地时钟计的时刻。然而用户控制的设备上时钟通常不可信——它可能因故意或意外被设错(见第 360 页"时钟同步与精度")。事件被服务器接收的时间(按服务器钟)更可能准确(因为服务器在你掌控之中),但用它来描述用户交互那件事就没那么有意义了。

要校正错误的设备时钟,一种做法是记录三个时间戳 [67]:

  • 事件按设备时钟发生的时间
  • 事件按设备时钟发送给服务器的时间
  • 事件按服务器时钟被服务器接收的时间

用第三个减第二个,便能估出设备时钟与服务器时钟之间的偏差(假定网络延迟相比所需精度可以忽略)。再把该偏差应用到事件时间戳上,便能估出事件实际发生的真实时间(假定事件发生与发送给服务器期间设备时钟偏差未变)。

这一问题并非流处理独有——批处理也有完全相同的时间推理问题,只是在我们对时间流逝更敏感的流式语境里更显眼罢了。

窗口类型

知道事件时间戳应当如何确定后,下一步是决定如何按时间段定义窗口。窗口可被用于聚合——例如统计窗口内事件数或计算值的平均。常用的几种窗口 [64, 68]:

滚动窗口

滚动窗口长度固定,每个事件恰好属于一个窗口。例如一分钟的滚动窗口里,时间戳在 10:03:00 至 10:03:59 之间的所有事件归到一个窗口、10:04:00 至 10:04:59 归到下一个,依此类推。要实现一分钟滚动窗口,把每个事件时间戳向下舍入到最近的整分钟,便能确定它所属的窗口。

跳跃窗口

跳跃窗口长度也固定,但相邻窗口之间有重叠以提供平滑。例如步长为一分钟的五分钟窗口将覆盖 10:03:00 至 10:07:59 的事件,下一个窗口覆盖 10:04:00 至 10:08:59,依此类推。要实现,可先算一分钟滚动窗口,再聚合相邻几个。

滑动窗口

滑动窗口包含彼此之间在某时间间隔内发生的所有事件。例如五分钟的滑动窗口会把 10:03:39 与 10:08:12 的两件事覆盖在一起,因为它们间隔不到五分钟(注意五分钟的滚动或跳跃窗口不会把这两件事放进同一个窗口,因为它们用的是固定边界)。滑动窗口可以通过维护一份按时间排序的事件缓冲、并在事件过期时移除来实现。

会话窗口

与其他窗口类型不同,会话窗口没有固定时长。它的定义是:把同一用户在时间上紧密发生的事件聚合到一起,用户一段时间没活动(例如 30 分钟)就关闭窗口。"会话化"是网站分析的一项常见需求。

窗口操作通常需要维护临时状态。某些情况下状态大小固定,与窗口大小或事件多少无关——例如一个计数操作只有一个计数器,不管窗口大小或事件数。另一方面像滑动窗口或下一节讨论的流连接,则需要把事件缓冲到窗口结束。因此窗口大、或吞吐高的流可能让流处理器维持大量临时状态。你必须确保跑流处理任务的机器有足够能力在内存或磁盘上承载这些状态。

流连接

第 471 页"连接与分组"中我们讨论了批作业如何按 key 连接数据集,以及这种连接如何成为数据流水线的重要组成部分。由于流处理把数据流水线推广为对无界数据集做增量处理,对流做连接的需求同样存在。

然而"事件可在任何时刻出现在流上"这一事实让流上的连接比批作业里的连接更具挑战。为更好地理解,我们区分三种连接类型:流-流连接、流-表连接、表-表连接。下面分别举例说明。

流-流连接(窗口连接)

假设你的网站有搜索功能,你想检测最近被搜索的 URL 的趋势。每当有人输入搜索查询,记录一个包含查询与返回结果的事件;每当有人点击其中一个搜索结果,再记录一个事件。要计算每个搜索结果中 URL 的点击通过率,需要把搜索动作与点击动作的事件配在一起——它们由同一会话 ID 关联。广告系统也有类似分析需求 [69]。

如果用户放弃搜索,点击可能根本不来;即便来了,搜索与点击之间的时间差也可能高度可变。多数情况下也许是几秒,但也可能长达数日数周(用户做了一次搜索,忘掉那个浏览器标签页,然后稍后回到那个标签页点了一个结果)。由于可变的网络延迟,点击事件甚至可能比搜索事件还早到达。你可以为连接选择合适的窗口——例如只在搜索之后最多一小时内发生的点击进行连接。

注意把搜索的细节嵌入点击事件并不等同于连接事件——那样只能告诉你用户点了搜索结果的情况,而不能告诉你用户没点任何结果时所做的搜索。要衡量搜索质量,你需要准确的点击通过率,而这需要搜索事件与点击事件都齐全。

要实现这种连接,流处理器需要维护状态——例如把过去一小时内发生的所有事件按会话 ID 索引。每当出现搜索或点击事件,把它加进相应索引,流处理器还要查另一个索引看同一会话 ID 是否已有另一事件到达。如果有,就发出一条事件说明哪个搜索结果被点了;如果搜索事件过期且未看到匹配的点击事件,就发出一条事件说明哪些搜索结果没被点击。

流-表连接(流富化)

第 471 页"连接与分组"(图 11-2)中我们见过一个批作业连接两份数据集的例子:一组用户活动事件与一份用户资料数据库。把用户活动事件视为一个流、并以同样方式持续在流处理器中执行 join 也很自然。输入是带 user ID 的活动事件流,输出是同样的活动事件流,只是 user ID 已被该用户的资料信息富化。这种做法有时也被叫做用数据库信息富化活动事件。

要做这一连接,流处理需要每次拿一条活动事件,到数据库里查它的 user ID,再把资料信息加到该活动事件上。数据库查找可通过查询远程数据库实现;不过如第 471 页"连接与分组"所讨论,这种远程查询很可能慢,且有把数据库压垮的风险 [58]。

另一种做法是把数据库的副本加载到流处理器内部,让它能本地查询而无需网络往返。这一技术叫做哈希连接,因为本地数据库副本可能是装得下的内存哈希表,或本地磁盘上的索引。

与批作业的差别在于:批作业用数据库的某一时点快照作为输入;而流处理器是长跑的,数据库内容很可能随时间变化,所以流处理器的本地数据库副本需要保持最新。这一问题可以用 CDC 解决:流处理器既订阅用户活动事件流,也订阅用户资料数据库的变更日志。资料被创建或修改时,流处理器更新自己的本地副本。这样就得到两个流之间的连接:活动事件与资料更新。

流-表连接很像流-流连接,最大区别是表的变更日志一侧的连接所用的窗口一直延伸到"时间起点"(一个概念上无限的窗口),且较新的记录会覆盖较老的;流输入侧的连接可能根本不需要维护窗口。

表-表连接(物化视图维护)

回想第 34 页"案例研究:社交网络主页时间线"里讨论的社交网络时间线例子。我们说过用户想看自己的主页时间线时,遍历他所关注的所有人、找出他们最近的帖子再合并起来代价太大。

我们要的是一个时间线缓存——一种按用户的"收件箱"——帖子被发出时直接写入这一收件箱,让读取时间线只需一次查询即可。物化与维护这一缓存需要以下事件处理:

  • 用户 u 发新帖时,加入到所有关注 u 的用户的时间线。
  • 用户删帖或注销账户时,从所有用户的时间线里把它移除。
  • 用户 u₁ 开始关注 u₂ 时,u₂ 最近的帖子加入到 u₁ 的时间线。
  • 用户 u₁ 取消关注 u₂ 时,u₂ 的帖子从 u₁ 的时间线中移除。

要在流处理器中实现这种缓存维护,你需要帖子(发布与删除)与关注关系(关注与取关)的事件流。流过程要维护一个数据库,记录每个用户的关注者集合,以便在新帖到达时知道要更新哪些时间线。

看待这一流过程的另一种方式是:它在维护一份针对"连接两张表(postsfollows)"的查询的物化视图——大致如下:

SELECT follows.follower_id AS timeline_id,
  array_agg(posts.* ORDER BY posts.timestamp DESC)
FROM posts
JOIN follows ON follows.followee_id = posts.sender_id
GROUP BY follows.follower_id

流的连接直接对应这条查询里两张表的连接。时间线实际上是这条查询结果的一份缓存,每次底层表变化都被更新。

如果像图 12-7 那样把流视为表的导数、把连接视为两张表 u·v 的乘积,会发生有趣的事:物化连接结果的变更流遵循乘积法则 (u·v)′ = u′v + uv′——posts 的任何变更与当前关注者连接,follows 的任何变更与当前帖子连接 [37]。

连接的时间依赖

上述三种连接(流-流、流-表、表-表)有许多共同点。它们都要求流处理器维护一份从某连接输入派生而来的状态(搜索与点击事件、用户资料、关注列表),并在处理另一输入的记录时查询该状态。

维护这一状态所用事件的顺序很重要——比如先关注、再取关一个用户,与反过来,结果不同。在 Kafka 这样的分片事件日志中,单个分片(分区)内事件的顺序被保留,但跨流或跨分片通常没有顺序保证。

这就引出一个问题:如果不同流上的事件在差不多同一时间发生,应按什么顺序处理它们?例如流-表连接里,如果用户在更新资料的同时有活动事件在被处理,该把这些活动事件与新资料、还是旧资料连接?换一种说法:如果状态会随时间变化、而你又要与某个状态连接,你用哪一时间点的状态做连接?

这种时间依赖在很多地方出现。例如卖东西时要给发票应用正确税率——它取决于国家或地区、商品类型与销售日期(税率会随时间变)。把销售连接到税率表时,你大概想用销售发生那一刻的税率,而不是当前税率(如果你在重新处理历史数据,两者很可能不同)。

如果跨流的事件顺序未定,连接就成为非确定的 [70]:你不能在同一输入上重跑同一作业、必然得到相同结果。再次跑作业时输入流上的事件可能以不同方式交错。

数据仓库里这一问题被称为缓慢变化维度(SCD),常见的处理办法是给被连接记录的某个特定版本一个唯一标识符——比如每次税率变化就给它一个新标识符,发票里包含销售时税率的标识符 [71, 72]。这样连接就变得确定,但代价是日志压实做不了——表中所有记录版本都要保留下来。或者你可以反规范化数据,把当时适用的税率直接放进每个销售事件里。

容错

本章最后一节,让我们看看流处理器如何容错。第 11 章中我们看到批处理框架能相当容易地容忍故障:任务失败时只需在另一台机器上重新启动它,失败任务的输出就直接丢弃。这种透明重试之所以可行,是因为输入文件不可变、每个任务把输出写到独立文件、输出仅在任务成功完成后才可见。

特别地,批处理的容错方法保证批作业的输出与"什么也没出错"时一致——即便确有任务失败也是如此。看起来就像每条输入记录都被恰好处理过一次——没有跳过、也没处理两次。尽管重启任务意味着某些记录可能实际被处理过多次,但输出中的可见效果就好像它们只被处理过一次。这一原理叫恰好一次语义,不过实际只一次会是更准确的描述 [73]。

同样的容错问题也出现在流处理上,但解决起来不那么直接。等任务结束才让输出可见的做法在这里行不通——流是无限的,你永远没法处理完它。

微批与检查点

一种解法是把流切成小块,每块当作一个迷你批处理来处理。这种做法叫微批,被 Spark Streaming 采用 [74]。批大小通常约一秒,这是性能权衡的结果:批越小,调度与协调开销越大;批越大,流处理结果可见的延迟就越长。

微批还隐式地提供了一个等于批大小的滚动窗口(按处理时间分窗,不是按事件时间戳);任何需要更大窗口的作业都得显式地把状态从一个微批带到下一个。

Apache Flink 采用的另一种相关做法是周期性生成滚动状态检查点,写到持久存储 [75, 76]。如果某个流算子崩溃,可以从最近的检查点重启,丢弃最后一次检查点与崩溃之间产生的任何输出。检查点由消息流中的屏障触发,类似微批之间的边界,但不强制特定的窗口大小。

在流处理框架的范围内,微批与检查点方法提供了与批处理一样的恰好一次语义。然而一旦输出离开流处理器(如写入数据库、向外部消息代理发布消息或触发邮件发送),框架就再也无法丢弃失败微批的输出。这种情况下重启失败任务会让外部副作用发生两次,单靠微批或检查点不足以防止这一问题。

原子提交再讨论

要在故障下呈现"恰好一次处理"的表象,需要确保处理某事件的所有输出与副作用当且仅当处理成功时才生效。这包括发到下游算子或外部消息系统的任何消息(包括邮件或推送通知)、对数据库的写、对算子状态的修改,以及对输入消息的确认(包括在基于日志的消息代理中把消费者偏移向前推进)。

这些动作要么全部以原子方式发生,要么都不发生。如果这听起来熟悉,那是因为我们在第 329 页"恰好一次消息处理"中以分布式事务与两阶段提交的语境讨论过它。

我们考察过 XA 等传统分布式事务实现的问题。然而在更受限的环境里高效实现这种原子提交设施是可行的。Google Cloud Dataflow [66, 75]、VoltDB [77]、Apache Kafka [78, 79] 都采用此法。与 XA 不同,这些实现并不试图跨异构技术提供事务,而是把状态变更与消息收发都放在流处理框架内部,让事务保持在"内部"。事务协议的开销可以通过把多条输入消息放在同一事务里处理来摊销。

幂等性

我们的目标是把失败任务的部分输出丢弃,以便它们能被安全重试。分布式事务是达成这一目标的一种方式;另一种是依赖幂等性,第 187 页"持久执行与工作流"中我们讨论过 [80]。

幂等操作是这样的操作:你可以多次执行它,效果与只执行一次相同。例如在键值存储中删除某 key 是幂等的(再删一遍没有进一步效果),而把计数器加一就不是幂等的(再加一意味着值被加了两次)。

即便某操作并非天然幂等,也常常可以靠加一点元数据让它变得幂等。例如从 Kafka 消费消息时,每条消息都有一个持久、单调递增的偏移。把值写入外部数据库时可以把"触发该写的消息的偏移"也包含进去——这样你就能判断某更新是否已被应用、避免重复应用。Storm 的 Trident 中状态处理就基于这一思路。

依赖幂等性蕴含若干假设:失败的任务被重启时必须按相同顺序重放相同消息(基于日志的消息代理可以做到);处理必须确定性;并且没有别的节点可以并发更新同一个值 [81, 82]。当处理从一个节点故障转移到另一个时,可能需要 fencing(见第 373 页"分布式锁与租约")以防被认为已死却其实仍活着的节点造成干扰。尽管有这些注意事项,幂等操作仍是以小开销实现恰好一次语义的有效方法。

故障后重建状态

任何需要状态的流处理——例如带窗口的聚合(计数器、平均值、直方图)以及连接所用的任何表与索引——都必须在故障后能够恢复其状态。

一种选择是把状态保存在远程数据存储中并复制;不过对每条消息都查远程数据库可能慢。另一种是把状态保留在流处理器本地、并周期性地复制。当流处理器从故障恢复时,新任务可以读取被复制的状态、不丢数据地继续处理。

例如 Flink 周期性地捕获算子状态的快照、把它们写到分布式文件系统这类持久存储 [75, 76];Kafka Streams 通过把状态变更发到一个用日志压实的专用 Kafka 主题来复制状态,类似 CDC [83];VoltDB 则通过在多个节点上冗余地处理同一输入消息来复制状态(见第 309 页"实际串行执行")。

某些情况下甚至无须复制状态——它可以从输入流重建。例如某状态由对相当短的窗口的聚合构成,那么直接重放该窗口对应的输入事件可能足够快;如果状态是某数据库的本地副本(由 CDC 维护),数据库也可以从日志压实的变更流中重建。

这一切都取决于底层基础设施的性能特征。某些系统中网络延迟可能低于磁盘访问延迟、网络带宽可与磁盘带宽相当。没有一种方案能在所有情形下都最理想;本地状态与远程状态的优劣也可能随存储与网络技术演进而变化。

总结

本章我们讨论了事件流——它们的用途以及如何处理它们。某种意义上流处理与第 11 章里的批处理颇为相似,但它在无界(永不终止)的流上持续进行,而不是在固定大小的输入上跑完即止。从这个视角看,消息代理与事件日志便是流式版的文件系统。

我们花了一些篇幅比较两类消息代理:

AMQP/JMS 风格的消息代理

代理把单条消息分配给消费者,消费者处理完成后逐条确认。消息一经确认便从代理上删除。这一做法适合做一种异步形式的 RPC(亦见第 189 页"事件驱动架构")——例如任务队列里消息处理顺序无关紧要、且消息处理完后无需回头重读旧消息。

基于日志的消息代理

代理把某分片中的所有消息分给同一个消费者节点,并按相同顺序投递。并行通过分片实现,消费者通过记录上次处理消息的偏移来跟踪自己的进度。代理把消息保留在磁盘上,因此必要时可回头重读旧消息。

基于日志的做法与数据库中的复制日志(见第 6 章)以及日志结构存储引擎(见第 4 章)有相似之处,也是一种共识形式(见第 10 章)。我们看到这种做法尤其适合那种消费输入流并产生派生状态或派生输出流的流处理系统。

关于流的来源,我们讨论了几种可能:用户活动事件、提供周期性读数的传感器,以及天然就以流的形式呈现的数据(如金融市场行情)。我们也看到把"对数据库的写"视为流大有用处:可以通过 CDC 隐式地、或通过事件溯源显式地捕获变更日志——数据库历来发生的所有变更的历史。日志压实让流能保留数据库内容的完整副本。

把数据库表达为流为整合各系统打开了强大的可能。你可以通过消费变更日志并把它应用到派生系统,让搜索索引、缓存、分析系统等派生数据系统持续保持最新;你甚至可以从零开始、自始消费整份变更日志,对既有数据构建全新的视图。

把状态以流的形式维护、重放消息的能力,也是各种流处理框架中实现流连接与容错技术的基础。我们讨论了流处理的几种用途,包括搜索事件模式(复杂事件处理)、计算开窗聚合(流分析)、维护派生数据系统(物化视图)。

随后我们讨论了在流处理器中推理时间的难点,包括处理时间与事件时间戳的区别,以及如何处理在你以为窗口已结束之后才到达的掉队事件。

我们区分了流处理中可能出现的三种连接类型:

流-流连接

两路输入流都由活动事件构成,连接算子在某时间窗口内寻找相关事件。例如算子可能匹配同一用户在 30 分钟内做的两次动作。如果你想找出某个流内部的相关事件,两路连接输入实际上也可能是同一个流(自连接)。

流-表连接

一路输入流是活动事件,另一路是数据库的变更日志。变更日志让数据库的本地副本保持最新。每来一条活动事件,连接算子查数据库、输出富化后的活动事件。

表-表连接

两路输入流都是数据库的变更日志。这种情况下,一侧的每个变更都与另一侧的最新状态相连接。结果是一条变更流:两张表之间连接的物化视图,随每次变更而更新。

最后,我们讨论了在流处理器中实现容错与恰好一次语义的若干技术。和批处理一样,我们要丢弃任何失败任务的部分输出。然而由于流过程是长跑的、持续产出输出,不能简单丢弃所有输出,因此可以用更细粒度的恢复机制——基于微批、检查点、事务或幂等写。

参考文献

[1] Tyler Akidau 等. "The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing." Proceedings of the VLDB Endowment, volume 8, issue 12, pages 1792–1803, August 2015. doi:10.14778/2824032.2824076

[2] Harold Abelson, Gerald Jay Sussman, and Julie Sussman. Structure and Interpretation of Computer Programs, 2nd edition. MIT Press, 1996. ISBN: 9780262510875. 归档于 archive.org

[3] Patrick Th. Eugster 等. "The Many Faces of Publish/Subscribe." ACM Computing Surveys, volume 35, issue 2, pages 114–131, June 2003. doi:10.1145/857076.857078

[4] Don Carney 等. "Monitoring Streams—A New Class of Data Management Applications." 见 28th International Conference on Very Large Data Bases (VLDB), August 2002. doi:10.1016/B978-155860869-6/50027-5

[5] Matthew Sackman. "Pushing Back." wellquite.org, May 2016. 归档于 perma.cc/3KCZ-RUFY

[6] Thomas Figg (tef). "How (Not) to Write a Pipeline." cohost.org, June 2023. 归档于 perma.cc/A3V8-NYCM

[7] Vicent Martí. "Brubeck, a statsd-Compatible Metrics Aggregator." github.blog, June 2015. 归档于 perma.cc/TP3Q-DJYM

[8] Seth Lowenberger. "MoldUDP64 Protocol Specification V 1.00." nasdaqtrader.com, July 2009. 归档于 perma.cc/7CRQ-QBD7

[9] Ian Malpass. "Measure Anything, Measure Everything." codeascraft.com, February 2011. 归档于 archive.org

[10] Dieter Plaetinck. "25 Graphite, Grafana and statsd Gotchas." grafana.com, March 2016. 归档于 perma.cc/3NP3-67U7

[11] Jeff Lindsay. "Web Hooks to Revolutionize the Web." progrium.com, May 2007. 归档于 perma.cc/BF9U-XNX4

[12] Jim N. Gray. "Queues Are Databases." Microsoft Research Technical Report MSR-TR-95-56, December 1995. 归档于 arxiv.org

[13] Mark Hapner 等. "JSR-343 Java Message Service (JMS) 2.0 Specification." jms-spec.java.net, March 2013. 归档于 perma.cc/E4YG-46TA

[14] Sanjay Aiyagari 等. "AMQP: Advanced Message Queuing Protocol Specification." Version 0-9-1, November 2008. 归档于 perma.cc/6YJJ-GM9X

[15] "Architectural Overview of Pub/Sub." cloud.google.com, 2025. 归档于 perma.cc/VWF5-ABP4

[16] Aris Tzoumas. "Lessons from Scaling PostgreSQL Queues to 100k Events Per Second." rudderstack.com, July 2025. 归档于 perma.cc/QD8C-VA4Y

[17] Robin Moffatt. "Kafka Connect Deep Dive—Error Handling and Dead Letter Queues." confluent.io, March 2019. 归档于 perma.cc/KQ5A-AB28

[18] Dunith Danushka. "Message Reprocessing: How to Implement the Dead Letter Queue." redpanda.com. 归档于 perma.cc/R7UB-WEWF

[19] Damien Gasparina, Loic Greffier, and Sebastien Viale. "KIP-1034: Dead Letter Queue in Kafka Streams." cwiki.apache.org, April 2024. 归档于 perma.cc/3VXV-QXAN

[20] Jay Kreps, Neha Narkhede, and Jun Rao. "Kafka: A Distributed Messaging System for Log Processing." 见 6th International Workshop on Networking Meets Databases (NetDB), June 2011. 归档于 perma.cc/CSW7-TCQ5

[21] Jay Kreps. "Benchmarking Apache Kafka: 2 Million Writes Per Second (On Three Cheap Machines)." engineering.linkedin.com, April 2014. 归档于 archive.org

[22] Kartik Paramasivam. "How We're Improving and Advancing Kafka at LinkedIn." engineering.linkedin.com, September 2015. 归档于 perma.cc/3S3V-JCYJ

[23] Philippe Dobbelaere and Kyumars Sheykh Esmaili. "Kafka Versus RabbitMQ: A Comparative Study of Two Industry Reference Publish/Subscribe Implementations." 见 11th ACM International Conference on Distributed and Event-Based Systems (DEBS), June 2017. doi:10.1145/3093742.3093908

[24] Kate Holterhoff. "Why Message Queues Endure: A History." redmonk.com, December 2024. 归档于 perma.cc/6DX8-XK4W

[25] Andrew Schofield. "KIP-932: Queues for Kafka." cwiki.apache.org, May 2023. 归档于 perma.cc/LBE4-BEMK

[26] Jack Vanlightly. "The Advantages of Queues on Logs." jack-vanlightly.com, October 2023. 归档于 perma.cc/WJ7V-287K

[27] Jay Kreps. "The Log: What Every Software Engineer Should Know About Real-Time Data's Unifying Abstraction." engineering.linkedin.com, December 2013. 归档于 perma.cc/2JHR-FR64

[28] Andy Hattemer. "Change Data Capture Is Having a Moment. Why?" materialize.com, September 2021. 归档于 perma.cc/AL37-P53C

[29] Prem Santosh Udaya Shankar. "Streaming MySQL Tables in Real-Time to Kafka." engineeringblog.yelp.com, August 2016. 归档于 perma.cc/5ZR3-2GVV

[30] Andreas Andreakis and Ioannis Papapanagiotou. "DBLog: A Watermark Based Change-Data-Capture Framework." 归档于 arXiv:2010.12597, October 2020.

[31] Jiri Pechanec. "Percolator." debezium.io, October 2021. 归档于 perma.cc/EQ8E-W6KQ

[32] Debezium maintainers. "Debezium Connector for Cassandra." debezium.io. 归档于 perma.cc/WR6K-EKMD

[33] Neha Narkhede. "Announcing Kafka Connect: Building Large-Scale Low-Latency Data Pipelines." confluent.io, February 2016. 归档于 perma.cc/8WXJ-L6GF

[34] Chris Riccomini. "Kafka Change Data Capture Breaks Database Encapsulation." cnr.sh, November 2018. 归档于 perma.cc/P572-9MKF

[35] Gunnar Morling. "'Change Data Capture Breaks Encapsulation'. Does It, Though?" decodable.co, November 2023. 归档于 perma.cc/YX2P-WNWR

[36] Gunnar Morling. "Revisiting the Outbox Pattern." decodable.co, October 2024. 归档于 perma.cc/M5ZL-RPS9

[37] Ashish Gupta and Inderpal Singh Mumick. "Maintenance of Materialized Views: Problems, Techniques, and Applications." IEEE Data Engineering Bulletin, volume 18, issue 2, pages 3–18, June 1995. 归档于 archive.org

[38] Mihai Budiu 等. "DBSP: Incremental Computation on Streams and Its Applications to Databases." SIGMOD Record, volume 53, issue 1, pages 87–95, March 2024. doi:10.1145/3665252.3665271

[39] Jim Gray and Andreas Reuter. Transaction Processing: Concepts and Techniques. Morgan Kaufmann, 1992. ISBN: 9781558601901

[40] Martin Kleppmann. "Accounting for Computer Scientists." martin.kleppmann.com, March 2011. 归档于 perma.cc/9EGX-P38N

[41] Pat Helland. "Immutability Changes Everything." 见 7th Biennial Conference on Innovative Data Systems Research (CIDR), January 2015. 归档于 perma.cc/33WX-3669

[42] Martin Kleppmann. Making Sense of Stream Processing. Report, O'Reilly Media, May 2016. 归档于 perma.cc/RAY4-JDVX

[43] Kartik Paramasivam. "Stream Processing Hard Problems—Part 1: Killing Lambda." engineering.linkedin.com, June 2016. 归档于 archive.org

[44] Stéphane Derosiaux. "CQRS: What? Why? How?" sderosiaux.medium.com, September 2019. 归档于 perma.cc/FZ3U-HVJ4

[45] Baron Schwartz. "Immutability, MVCC, and Garbage Collection." xaprb.com, December 2013. 归档于 archive.org

[46] Daniel Eloff 等. "Re: Turning the Database Inside-out with Apache Samza." Hacker News 讨论, news.ycombinator.com, March 2015. 归档于 perma.cc/ML9E-JC83

[47] Cognitect, Inc. "Datomic Documentation: Excision." docs.datomic.com. 归档于 perma.cc/J5QQ-SH32

[48] "Fossil Documentation: Deleting Content from Fossil." fossil-scm.org, 2025. 归档于 perma.cc/DS23-GTNG

[49] Jay Kreps. "The irony of distributed systems is that data loss is really easy but deleting data is surprisingly hard." x.com, March 2015. 归档于 perma.cc/7RRZ-V7B7

[50] Brent Robinson. "Crypto Shredding: How It Can Solve Modern Data Retention Challenges." medium.com, January 2019. 归档于 perma.cc/4LFK-S6XE

[51] Matthew D. Green and Ian Miers. "Forward Secure Asynchronous Messaging from Puncturable Encryption." 见 IEEE Symposium on Security and Privacy, May 2015. doi:10.1109/SP.2015.26

[52] David C. Luckham. "What's the Difference Between ESP and CEP?" complexevents.com, June 2019. 归档于 perma.cc/E7PZ-FDEF

[53] Arvind Arasu, Shivnath Babu, and Jennifer Widom. "The CQL Continuous Query Language: Semantic Foundations and Query Execution." The VLDB Journal, volume 15, issue 2, pages 121–142, June 2006. doi:10.1007/s00778-004-0147-z

[54] Julian Hyde. "Data in Flight: How Streaming SQL Technology Can Help Solve the Web 2.0 Data Crunch." ACM Queue, volume 7, issue 11, December 2009. doi:10.1145/1661785.1667562

[55] Philippe Flajolet 等. "HyperLogLog: The Analysis of a Near-Optimal Cardinality Estimation Algorithm." 见 Conference on Analysis of Algorithms (AofA), June 2007. doi:10.46298/dmtcs.3545

[56] Jay Kreps. "Questioning the Lambda Architecture." oreilly.com, July 2014. 归档于 perma.cc/2WY5-HC8Y

[57] Ian Reppel. "An Overview of Apache Streaming Technologies." ianreppel.org, March 2016. 归档于 perma.cc/BB3E-QJLW

[58] Jay Kreps. "Why Local State Is a Fundamental Primitive in Stream Processing." oreilly.com, July 2014. 归档于 perma.cc/P8HU-R5LA

[59] RisingWave Labs. "Deep Dive into the RisingWave Stream Processing Engine—Part 2: Computational Model." risingwave.com, November 2023. 归档于 perma.cc/LM74-XDEL

[60] Frank McSherry, Derek G. Murray, Rebecca Isaacs, and Michael Isard. "Differential Dataflow." 见 6th Biennial Conference on Innovative Data Systems Research (CIDR), January 2013. 归档于 perma.cc/T83W-ZBR2

[61] Andy Hattemer. "Incremental Computation in the Database." materialize.com, March 2020. 归档于 perma.cc/AL94-YVRN

[62] Shay Banon. "Percolator." elastic.co, February 2011. 归档于 perma.cc/LS5R-4FQX

[63] Alan Woodward and Martin Kleppmann. "Real-Time Full-Text Search with Luwak and Samza." martin.kleppmann.com, April 2015. 归档于 perma.cc/2U92-Q7R4

[64] Tyler Akidau. "The World Beyond Batch: Streaming 102." oreilly.com, January 2016. 归档于 perma.cc/4XF9-8M2K

[65] Stephan Ewen. "Streaming Analytics with Apache Flink." 见 Kafka Summit, April 2016. 归档于 perma.cc/QBQ4-F9MR

[66] Tyler Akidau 等. "MillWheel: Fault-Tolerant Stream Processing at Internet Scale." Proceedings of the VLDB Endowment, volume 6, issue 11, pages 1033–1044, August 2013. doi:10.14778/2536222.2536229

[67] Alex Dean. "Improving Snowplow's Understanding of Time." snowplow.io, September 2015. 归档于 perma.cc/6CT9-Z3Q2

[68] "Azure Stream Analytics: Windowing Functions." learn.microsoft.com, July 2025. 归档于 archive.org

[69] Rajagopal Ananthanarayanan 等. "Photon: Fault-Tolerant and Scalable Joining of Continuous Data Streams." 见 ACM International Conference on Management of Data (SIGMOD), June 2013. doi:10.1145/2463676.2465272

[70] Ben Kirwin. "Doing the Impossible: Exactly-Once Messaging Patterns in Kafka." ben.kirw.in, November 2014. 归档于 perma.cc/A5QL-QRX7

[71] Pat Helland. "Data on the Outside Versus Data on the Inside." 见 2nd Biennial Conference on Innovative Data Systems Research (CIDR), January 2005. 归档于 perma.cc/K9AH-LQPS

[72] Ralph Kimball and Margy Ross. The Data Warehouse Toolkit: The Definitive Guide to Dimensional Modeling, 3rd edition. John Wiley & Sons, 2013. ISBN: 9781118530801

[73] Viktor Klang. "I'm coining the phrase 'effectively-once' for message processing with at-least-once + idempotent operations." x.com, October 2016. 归档于 perma.cc/7DT9-TDG2

[74] Matei Zaharia 等. "Discretized Streams: An Efficient and Fault-Tolerant Model for Stream Processing on Large Clusters." 见 4th USENIX Conference in Hot Topics in Cloud Computing (HotCloud), June 2012.

[75] Kostas Tzoumas, Stephan Ewen, and Robert Metzger. "High-Throughput, Low-Latency, and Exactly-Once Stream Processing with Apache Flink." ververica.com, August 2015. 归档于 archive.org

[76] Paris Carbone 等. "Lightweight Asynchronous Snapshots for Distributed Dataflows." 归档于 arXiv:1506.08603, June 2015.

[77] Ryan Betts and John Hugg. Fast Data: Smart and at Scale. Report, O'Reilly Media, October 2015. 归档于 perma.cc/VQ6S-XQQY

[78] Neha Narkhede and Guozhang Wang. "Exactly-Once Semantics Are Possible: Here's How Kafka Does It." confluent.io, June 2019. 归档于 perma.cc/Q2AU-Q2ED

[79] Jason Gustafson 等. "KIP-98—Exactly Once Delivery and Transactional Messaging." cwiki.apache.org, November 2016. 归档于 perma.cc/95PT-RCTG

[80] Pat Helland. "Idempotence Is Not a Medical Condition." Communications of the ACM, volume 55, issue 5, pages 56–65, May 2012. doi:10.1145/2160718.2160734

[81] Jay Kreps. "Re: Trying to Achieve Deterministic Behavior on Recovery/Rewind." samza-dev mailing list, September 2014. 归档于 perma.cc/7DPD-GJNL

[82] E. N. (Mootaz) Elnozahy 等. "A Survey of Rollback-Recovery Protocols in Message-Passing Systems." ACM Computing Surveys, volume 34, issue 3, pages 375–408, September 2002. doi:10.1145/568522.568525

[83] Adam Warski. "Kafka Streams—How Does It Fit the Stream Processing Landscape?" softwaremill.com, June 2016. 归档于 perma.cc/WQ5Q-H2J2

[84] Stephan Ewen, Fabian Hueske, and Xiaowei Jiang. "Batch as a Special Case of Streaming and Alibaba's contribution of Blink." flink.apache.org, February 2019. 归档于 perma.cc/A529-SKA9

原书 © 2026 Martin Kleppmann & Chris Riccomini · 中文翻译仅供学习交流