流处理
- 复杂事件处理(CEP):存储一个搜索模式,在流数据流经时判断是否符合这样的模式
- 流分析:对一定窗口期内的数据进行计算、分析
- 通过流来进行RPC
消息系统
早期使用消息队列来实现流处理:
stateDiagram direction LR 数据源 --> 消息队列1 消息队列1 --> 处理逻辑1 处理逻辑1 --> 消息队列2 消息队列2 --> 处理逻辑3 消息队列2 --> 处理逻辑4 处理逻辑3 --> 消息队列3 处理逻辑4 --> 消息队列3
消息系统与传统的数据库有着本质的区别:数据临时与永久之分
分区日志消息系统:结合了传统消息系统与数据库:既是流,又能存
graph TB subgraph 以日志为中心的基础设施栈 A[Log] B[Graph DB, OLAP Store, Etc] C[Key-Value Query Layer] D[Search Query Layer] E[Monitoring & Graphs] F[Stream Processing] G[Hadoop] B --> A C --> A A --> C D --> A E --> A F --> A A --> F G --> A A --> G end
流与数据库
- 数据库的变更通过流与系统异构存储保持同步
- 变更数据捕获(CDC):初始快照 + 后续变更操作
- 事件溯源:回放所有日志得到数据的最终状态
本质上就是状态复制机的实现
本质上数据库的日志就是流,数据库里的数据就是当前流重放的快照
---title: 带有快速追随者分析数据库的变更数据获取---graph LR A[应用程序] B[生产数据库] C[分析快速跟随数据库] D[分析] A -- 生产事务 --> B B -- CDC --> C C -- 查询 --> D
这种模式通过实时数据同步优化性能和数据分析,但增加了系统复杂性和维护成本
DataFlow模型
核心概念:
- ParDo,地位相当于 MapReduce 里的 Map 阶段。所有的输入数据,都会被一个 DoFn,也就是处理函数处理
- GroupByKey,地位则是 MapReduce 里的 Shuffle 操作。把相同的 Key 汇总到一起,然后再通过一个 ParDo 下的 DoFn 进行处理
时间问题
流处理依赖于本地时间戳,时钟是不可靠的,同时考虑消息堆积、软件错误等问题,基于时间戳的流分析可能不准
流处理中有两种时间:
- 事件时间:代表了事件在数据源中实际发生的时间。事件时间通常由数据生产者(如传感器、日志系统等)记录,并嵌入到数据中
- 处理时间:由流处理框架记录的时间,反映了数据进入系统并被处理时的系统时间
水位线
水位线用于追踪和管理事件时间进度,帮助系统处理乱序事件,确保计算结果的及时性和准确性。它是一个时间标记,表示在此时间点之前的所有事件都已经到达,系统可以基于这个时间点进行窗口计算、触发输出等操作
时间值为X的水位线表示:“所有事件时间小于X的输入数据都被观察到。”因此,当我们在观察一个无界数据源时,水位线会作为一种进度度量
sequenceDiagram participant Source as 数据源 participant System as 流处理系统 participant Window as 窗口计算 Note right of Source: 事件流 Source->>System: 事件A (时间戳t1) Source->>System: 事件B (时间戳t2) Source->>System: 事件C (时间戳t3) Source->>System: 水位线W1 (时间t3) Note over System: <br>当水位线W1到达<br>表示在t3之前的事件都已接收<br> System->>Window: 触发窗口计算 (t1到t3) Window->>System: 计算结果 System->>Source: 输出结果 Note right of Source: <br>延迟到达的事件 Source->>System: 事件D (时间戳t2, 延迟) System->>Window: 处理延迟事件 (如果策略允许)
水位线创建
- 完美水位线:确保水位线涵盖了所有数据,即水印后不会再有早于该水位线时间的数据,生成方法包括直接使用处理时间而非事件时间,处理一个静态、有序的数据
- 启发式水位线:对没有早于水位线时间的数据将不再出现的一个估计。使用启发式水位线的管道可能需要处理一些延迟数据,延迟数据是指在水位线前进到数据事件时间之后才到达的数据
水位线传播
指的是从输入源开始,随着数据通过各个阶段逐步传递的过程。水位线在每个阶段都被更新,以反映该阶段处理数据的进度
- 输入水位线:每个阶段都有一个输入水位线,它表示该阶段上游所有输入源的进度。对于源输入,输入水位线由源特定的函数创建。对于非源阶段,输入水位线是所有上游阶段的输出水位线的最小值。
- 输出水位线:每个阶段还有一个输出水位线,它表示该阶段本身的处理进度。输出水位线通常定义为输入水位线与该阶段内所有非延迟数据事件时间的最小值。
- 计算事件时间延迟:通过比较一个阶段的输入和输出水位线,可以计算该阶段引入的事件时间延迟。这表示输出数据相对于输入数据和实际时间的延迟程度。例如,执行10秒窗口聚合的阶段会有至少10秒的延迟。
- 细粒度水位线:每个阶段内部的处理可以分成多个缓冲区,每个缓冲区都有自己的水位线。阶段的输出水位线是所有这些缓冲区水位线的最小值。这种细粒度的水位线跟踪提供了更好的系统行为可见性
处理时间水位线
处理时间水位线基于系统当前的操作时间而不是数据的事件时间,处理时间水位线与事件时间水位线组合使用可以被用来监控系统延迟与数据延迟
数据处理模式
时间无关
这类流数据处理由于不涉及到时间,所以较简单,延迟也更低
窗口化处理
窗口类型:
- 固定窗口:固定长度,相互之间没有重叠且紧邻 [1,3] [4,6]
- 滑动窗口:固定长度,允许之间重叠以进行平滑过度 [1,3] [2,4]
- 会话窗口:没有固定时间,将同一用户的事件组合在一起,并过滤掉没有事件发生的非活动期
窗口的划分跟时间是强相关的,如果使用处理时间进行窗口划分,实现简单,不用对数据进行排序及缓存,适用于对实时性要求高但对时间顺序不敏感的场景,如实时监控和报警系统,缺点是一旦输入的数据的顺序发生变化,则计算结果也会不同
而基于事件时间进行窗口划分,需要对数据进行排序和缓存,即根据数据生成时的时间戳将数据分配到相应的窗口中,适用于需要严格按照数据发生时间进行统计分析的场景,如金融交易分析、用户行为分析等。基于事件时间的窗口对于输入的数据顺序不敏感
自定义窗口:
在 Beam 中,其可以通过窗口分配策略与窗口合并策略来自定义窗口策略
- 窗口分配:元素应该属于哪个窗口
- 窗口合并(可选):允许窗口根据时间进行合并,这使得窗口会随时间演变
触发器
用来控制窗口何时进行计算和输出结果的机制。触发器决定了窗口何时触发执行其聚合函数,并产生结果数据
Repeated update triggers
允许在窗口尚未完全关闭的情况下多次触发计算,每次触发都会输出当前的计算结果。这种触发器的特点就是更新频繁、延迟低、但代价也大。
Completeness triggers
会在确认窗口中的所有数据都已到达后,才触发窗口计算和输出。这通常通过检测Watermarks(水位线)或特定的业务逻辑条件来实现
- 水位线触发器:水位线是一个时间戳,通过根据事件时间周期性地生成水位线,当水位线达到窗口的结束时间时,此时认为这个窗口所有数据已到达,不再等待,触发窗口的计算
- 完美水位线:在完全了解所有输入数据的情况下,没有迟到的数据;所有数据都是提前或按时到达的
- 启发式水位线:使用关于输入的任何可用信息(分区、分区内的排序(如果有的话)、文件的增长率等)来提供尽可能准确的进度估计
时间触发器
- 过早触发器:会定期触发计算,直到水位线通过窗口的末尾
- 准时触发器:在水位线越过窗口后就认为数据已经完整了,此时触发窗口的计算
- 延迟触发器:在水位线越过窗口后,还会定期地去触发窗口计算
垃圾回收
流处理的窗口计算势必会需要再内存中缓存一些数据,所以需要清理掉过期的窗口数据
累加模式
如何处理多个窗口的聚合结果
丢弃模式
系统仅保留最新一个窗口的聚合结果,丢弃其他窗口的聚合结果
聚合模式
聚合是指在同一窗口中观察到的多个结果之间的关系。这些结果可能是完全脱节的;例如,他们可以完全独立的或者有重叠的(eg: 滚动窗口聚合和滑动窗口的聚合)
累加并撤销
系统不仅保留所有的聚合结果,还能够撤销先前发出的聚合结果。当新事件导致聚合结果变化时,系统会撤销之前计算过的结果,并用新的结果替换掉它们
精确一次性
- 精确一次性(Exactly-once semantics)是指在流处理系统中,每条记录只被处理一次的语义保障,即确保记录不会被遗漏或重复处理
- 副作用(Side effects)指在流处理过程中,操作对外部系统(如数据库、文件系统或其他服务)产生的影响。这些副作用有可能是多次执行的,因为系统可能会在处理失败时重试某些操作,从而导致非幂等操作的重复执行
精确一致性被用来解决数据丢失及重复时的计算结果准确性问题,而为了实现至少一次处理(at-least-once processing),系统在面对处理失败时会重试操作,这可能导致外部系统的操作被多次执行,这会导致副作用
为了实现精确一致性及消除由此带来的副作用,有如下方法:
- 状态快照与重放机制,在发生故障时,将系统状态进行回滚并重试
- 幂等处理,但不是所有操作都能转为幂等
- 唯一标识符:可以准确记录防止重复处理,但需要维护大量的状态信息
容错
- 微批处理:将流切成固定大小的块,如果这个块发生错误,则丢弃这个块的所有输出
- 校验点:定期生成检查点,如果流处理发生错误,就回到上一个检查点重新跑
这需要消费端保证幂等性,否则为了容错会输出不止一次导致副作用
持久状态
长期运行的流处理管道失败不可避免,鉴于这个问题,持久化状态提供了:
- 确保了在处理无界数据时,即使输入数据源已经丢失或移动,系统仍能继续处理
- 有助于在应对故障时减少重复工作和持久化数据
隐式状态
在数据处理过程中依赖计算逻辑和数据流顺序维护的状态,而不是通过显式存储管理的状态
例如,当处理一个流数据时,如果计算逻辑中包含了累积的计算结果或某些条件的中间结果,而这些结果没有被显式地存储为持久状态,这些信息就是隐式状态的一部分
广义状态
为了更加灵活地持久化状态,更进一步
- 要支持多种数据结构,以便针对不同任务选择最合适和高效的数据存储方式
- 根据需求调整数据的读写量和类型,以优化效率。这意味着在任何时间点,能够精确地读写所需的数据量,并尽可能并行化操作
- 提供更加灵活的触发器,满足不同的数据处理聚合需求
Streaming SQL
对传统SQL的扩展,使其能够处理持续不断的实时数据流。它不仅支持对静态数据表进行查询,还能处理不断变化的数据流,适应实时数据处理的需求
Beam 模型
- 偏向于流,涉及表的地方,都需要进行某种转换以隐藏表
sources:通常会硬编码表的触发方式。可能会在每次更新时触发,也可能会批量处理更新,或提供某个时间点的单一、有界数据快照
sinks:硬编码输入流的分组方式。有时通过用户指定的键进行分组,其他情况下则可能根据随机物理分区进行分组
group:由用户代码决定如何对数据进行分组聚合
SQL 模型
- 偏向于表,主要特点是查询应用于表并产生新的表
输入表:总是隐式地在特定时间点触发生成一个有界流,包含该时间点的表快照。
输出表:可以是查询结果的直接表现,或者在没有分组操作的查询中,通过将最终流隐式分组生成。
group 操作:SQL 提供完整的 group 操作灵活性,但只有一种隐式的解组操作,即触发整个中间表并将其转化为流,再进行后续操作
SQL 通过物化视图提供了一种流处理方式,物化视图是物理上物化为表并随源表变化而持续更新的视图。它使 SQL 能够执行持续查询,实时处理数据更新
流式连接
在流式数据处理中,对两个或多个实时数据流进行连接操作,以便将相关联的数据组合在一起。这种连接操作与传统数据库中的连接类似,但它需要处理的是不断到达的实时数据
join 操作可以当做一种特殊的分组操作,分组操作总是消费流并产生表。这意味着连接操作在本质上也是处理流,并且最终输出为表的一部分
与 SQL 模型一致的流式连接模型:
- 内连接(INNER JOIN):只返回两个流中键值匹配的记录。
- 左外连接(LEFT OUTER JOIN):返回左侧流中的所有记录,以及右侧流中键值匹配的记录。如果没有匹配的记录,则右侧记录为空。
- 右外连接(RIGHT OUTER JOIN):返回右侧流中的所有记录,以及左侧流中键值匹配的记录。如果没有匹配的记录,则左侧记录为空。
- 全外连接(FULL OUTER JOIN):返回两个流中的所有记录,不论是否有匹配。没有匹配的记录将用空值填充。
- 半连接(SEMI JOIN):只返回在另一个流中有匹配的左侧流记录。
- 反连接(ANTI JOIN):只返回在另一个流中没有匹配的左侧流记录
---title: 表连接---graph LR A[数据流1] --> B[表1] C[数据流2] --> D[表2] B --> E[新表] D --> E[新表]
---title: 丰富数据---graph LR 数据流 --> 更丰富的数据 其他数据源 --> 更丰富的数据
---title: 双流连接---graph LR 数据流1 --> 流缓存1 数据流2 --> 流缓存2 流缓存1 --> 连接处理器 流缓存2 --> 连接处理器 连接处理器 --> 新的流
窗口化的流式连接:时间窗口用于限制连接操作在一定的时间范围内,以控制内存使用和计算复杂度,但窗口化在流式连接中不是必须的。
流与表
- 数据处理管道(包括批处理和流处理)由表、流以及对这些表和流进行的操作组成
- 表是静态数据的存储容器,用于数据的积累和随时间观察
- 流是动态数据,编码了表随时间演化的离散视图
流表转换
流 -> 表:随着时间的推移,对流进行聚合会产生一张表
表 -> 流:观察表发生的变化,可以产生流
批处理中
graph TD A[Table] --> B[MapRead] B --> C[Stream] C --> D[Map] D --> E[Stream] E --> F[MapWrite] F --> G[Table] G --> H[ReduceRead] H --> I[Stream] I --> J[Reduce] J --> K[Stream] K --> L[ReduceWrite] L --> M[Table]