Flink Demo

TaskManager 会在同一个JVM 进程内以多线程的方式执行任务。和独立进程 相比,线程更加轻量并且通信开销更低,但无法严格地将任务彼此隔离。 因此只要有一个任务运行异常,就有可能“杀死” TaskManager整个进程,导致 它上面运行的所有任务都停止。如果将每个 TaskManager配置成只有一个slot, 则可以限制应用在 TaskManag 级别进行隔离,即每个 TaskManager 只运 行单个应用的任务。

为了执行应用的全部任务, Flink 需要足够数量的slot 假设Flink此设置包含4个TaskManager ,每个 TaskManager2个slot,那么 个流式应用最多支持以并行度8来运行。 如果有一个 TaskManager 出现故障,则可用处理槽的数量就降到了6个。 这时候 JobManager就会向ResourceManager 申请更多的slot。若无法完成(例如应用运行在一个独立集群上), JobManager 将无法重启应用,直至有足够数量的可用slot。 应用的重启策略决定了 JobManager 以何种频率重启应用以及重启尝试之间的等待间隔。

它用于控制流式应用执行以及保存该过程中的元数据(如已完成checkpoint的存储路径) 如果负 责管理的 JobManager 进程消失,流式应用将无怯继续处理数据。这就导致 JobManager 成为 Flink 应用中的 个单点失效组件。为了解决该问题,Flink 提供了高可用模式,支持在原 JobManager 消失的情况下将作业的管理职责及 元数据迁移到另 JobManager

  1. JobManager 在高可用模式下工作时,会将 JobGraph 以及全部所需的元数据(例如应用的 JAR 文件)写人一个远程持久化存储系统中。
  2. JobManager 还会将存储位的路径地址 ZooKeeper 的数据存储。

在应用执行过程中, JobManager 会接收每个任务检点的状态句柄(存储位置)。 在检查点即将完成的时候,如果所有任务已经将各自状态成功写入远程存储, JobManager 就会将状态句柄写入远程存储, 将远程位置的路径地址写入 ZooKeeper 。因此所有用于 JobManager 故障恢复的数据都在远程存储上面, ZooKeeper 持有这些存储位置的路径。

  1. 向ZooKeeper请求存储位置,以获取JobGraph,JAR文件及应用最新检 查点在远程存储的状态句柄。
  2. 向ResourceManager 申请处理槽来继续执行应用
  3. 重启应用并利用最近 次检查点重置任务状态。

TaskManager 负责将数据从发送任务传输至接收任务 。它的网络模块在记录传输前会先将它们收集到缓冲区中。 换言之,记录并非逐个发送的,而是在缓冲区中以批次形式发送。 该技术是有效利用网络资源、实现高吞吐的基础。它的机制类似于网络以及 磁盘 I/O 协议中的缓冲技术。

  • 每个 TaskManager 都有一个用于收发数据的网络缓冲地(每个缓冲默认 32KB 大小)。
  • 如果发送端和接收端的任务运行在不同的 TaskManager 进程中,它们就要用到操作系统的网络技进行通信。
  • 流式应用需要以流水线方式交换数据,因此每对 TaskManager 之间都要维护一个或多个永久的 TCP 连接来执行 数据交换。 shuffle 连接模式下,每个发送端任务都需要向任意一个接收 任务传输数据。
  • 对于每一个接收任务, TaskManager 都要提供一个专用的网络缓冲区,用于接收其他任务发来的数据。

当发送任务和接收任务处于同一个 TaskManager 进程时,发送任务会将要发 送的记录序列化到一个字节缓冲区中,一旦该缓冲区占满就会被放到一个队 列里。接收任务会从这个队列里获取缓冲区并将其中的记录反序列 。这意 味着同一个TaskManager 内不同任务之间的数据传输不会涉及网络通信。

基于信用值的流量控制

Flink实现了一个基于信用值的流量控制机制,它的工作原理如下 接收任务 会给发送任务授予一定的信用值,其实就是保留一些用来接收它数据的网络 缓冲。一旦发送端收到信用通知,就会在信用值所限定的范围 尽可能多 传输缓冲数据,并会附带上积压量(已经填满准备传输 网络缓冲数目)大 接收端使用保留的缓冲来处理收到的数据,同时依据各发送端的积压量信息 来计算所有相连的发送端在下一轮的信用优先级。

由于发送端可以在接收端有足够资源时立即传输数据,所以基于信用值 量控制有效降低延迟。 此外 ,信用值的授予是根据各发送端的数据积压 量来完成的,因此该制还能在出现数据倾斜( data skew )时有效 分配网 络资源。不难看出,基于信用值的流量控制是 Flink 实现高吞吐低延迟的重要 一环。

任务链接

任务链接的前提条件

  1. 多个算子必须有相同的井行度且通过本地转发通道 (local forward channe )相连

单线程执行的链接任务“融合” 了多个函数 ,并通过方法调用进行数据传输

时间戳将记录和特定时间点进行关联,这些时间点通常是记录所对应事件的发生时间。

水位线用于在事件时间应用中推断每个任务当前的事件时间。基于时间的算 子会使用这个时间来触发计算并推动进度前进。

包含带有时间戳的记录及水值线的数据流

水位线拥有两个基本属性

  1. 必须单调递增。这是为了确保任务中的事件时间时钟正确前进,不会倒退。
  2. 和记录的时间戳存在联系。 一个时间戳为T的水位线表示,接下来所有记 录的时间戳一定都大于T

水位线的意义之一在于它允许应用控制结果的完整性和延迟。如果水位线和 记录的时间戳非常接近,那结果的处理延迟就会很低,因为任务无须等待过 多记录就可以触发最终计算。但同时结果的完整性可能会受影响,因为可能 有部分相关记录被视为迟到记录,没能参与运算。相反,非常“保守”的水 位线会增加处理延迟,但同时结果的完整性也会有所提升。

水位线传播和事件时间

任务内部的时间服务(time service 会维护一些计时器( timer ),它们依赖接收到水位线来激活。 这些计时器是 由任务在时间服务内注册,并在将来的某个时间点执行计算。 例如:算子会为每个活动窗口注册一个计时器,它们会在事件时间超过窗口的结束时间清理窗口状态

当任务接收到一个水位线时会执行以下操作

  1. 基于水位线记录的时间戳更新内部事件时间时钟
  2. 任务的时间服务会找出所有触发时间小于更新后事件时间的计时器。对于每个到期的计时器, 周用 回调函数 ,利用它 执行计算或发 出记录
  3. 任务根据更新后的事件时间将水位线发出

每个分区作为一个数据流,都会含带有时间戳的记录以及水位线。根据算子的上下游连接情况,其任务可能需要同时接收来自多个输入分区的记录和水位线,也可能需要将它们发送到多个输出分区。

利用水位线更新任务的事件时间

Flink 的水位线处理和传播算在去保证了算子任务所发出的记录时间戳和水位线 -定会对齐。然而,这依赖于 个事实:所有分区都会持续提供自增的水位线。 只要有一个分区的水位线没有前进,或分区完全空闲下来不再发送任何记录 或水位线,任务的事件时间时钟就不会前进,继而导致计时器无法触发。 这种情形会给那些靠时钟前进来执行计算或清除状态的时间相关算子带来麻烦。

因此,如果一个任务没有从全部输入任务以常规间隔接收新的水位线,就会 导致时间相关算子的处理延迟或状态大小激增。

当算子两个输入流的水位线差距很大时,也会产生类似影响。对于一个有两 个输入流的任务而言,其事件时间时钟会受制于那个相对较慢的流,而较快 流的记录或中间结果会在状态、中缓冲,直到事件时间时钟到达允许处理它们 的那个点。

时间戳分自己和水位线生成

  1. 在数据源完成:我们可以利用 Source Function 在应用读入数据流的时候分 配时间戳和生成水位线。源 函数会发出一条记录流。每个发出的记录都可以 附加 个时间戳,水位线可以作为特殊记录在任何时间点发出。

  2. 周期分配器( periodic assigner)

  3. 定点分配器( punctuated assigner)

带有状态的流处理任务

算子状态(operator state )和键值分区状态( keyed state )

列表状态( list state) 将状态表示为一个条目列表。 联合列表状态( union list state) 同样是将状态表示为 个条目列表,但在进行故障恢复或从某个保存点启 动应用时,状态的恢复方式和普通列表状态有所不同 详细内容将在本章 稍后讨论 广播状态( broadcast state) 门为那些需要保证算子的每个任务状态都相同的场 而设计。这种相同 的特性将有利于检查点保存或算子扩缩容,我们同样会在本

键值分区状态会按照算子输入记录所定义的键值来进行维护或访问, Flink为每个键值都维护了一个状态实例,该实例总是位于那个处理对应键值记录的 算子任务上。当任务在处理一个记录时,会自动把状态的访问范围限制为当前记录的键值。

  • 单值状态( value state) 每个键对应存储 个任意类型的值,该值也可以是某个复杂数据结构。
  • 列表状态( list state) 每个键对应存储 个值的列表。列表中的条目可以是任意类型。
  • 映射状态( map state) 每个键对应存储 个键值映射,该映射的键和值可以是任意类型。

为了保证快速访问状态,每个并行任务都会把状态维护在本地。 至于状态具体的存储、访问和维护,则是由个名为状态后端的可插拔组件来决定。 状态后端主要负责两件事:

  1. 本地状态管理
  2. 将状态、以检查点的形式写入远程存储。

RocksDB 状态后端支持增量检查点。这对于大规模的状态而言,会显著降低生成检查点的开销。

在扩缩容时会根据新的任务数量对键值重新分区。 但为了降低状态在不同任务之间迁移的必要成本, Flink 不会对单独的键值实 施再分配,而是会把所有键值分为不同的键值组( key group )。 每个键值组都包含了部分键值, Flink 以此为单位把键值分配给不同任务。

在扩缩容时会对列表中的条目进行重新分配。理论 上,所有并行算子任务的列表条目会被统一收集起来,随后均匀分配到更少 或更多的任务之上。如果列表条目的数量小于算子新设置的并行度,部分 务在启动时的状态就可能为空。

算子会在扩缩容时把状态列表的全部条目广播到全 部任务上。随后由任务自己决定哪些条目该保留,哪些该丢弃。

在扩缩容时会把状态拷贝到全部新任务上。这样做的 原因是广播状态能确保所有任务的状态相同。在缩容的情况下 ,由于状态 经过制不会丢失

检查点、保存点及状态恢复

一致性检查点 Flink 的故障恢复机制需要基于应用状态的一致性检查点。有状态的流式应用 一致性检查点是在所有任务处理完等量的原始输入后对全部任务状态进行 个拷贝。我们可以通过一个朴素算怯对应用建立一致性检查点的过程进 行解释。朴素算法的步骤包括

  1. 暂停接收所有输入流。
  2. 等待已经流入系统的数据被完全处理,即所有任务已经处理完所有的输入 数据。
  3. 将所有任务的状态拷贝到远程持久化存储 ,生成检查点。在所有任务完成 自己的拷贝工作后,检查点生成完
  4. 恢复所有数据流的接收

负责从一个递增数字(1,2,3…)流中读取数据。 数字流会被分成奇数流和偶数流,求和算子的两个任务会分别对它们求和 数据源算子的任务会把输入流的当前偏移量存为状态;求和算子的任务会 当前和值存为状态。 Flink 会在输入偏移到达5的时候生成个检查点, 此时两个和值分别为 6和9

Flink 检查点算法 Flink查点是基于Chandy-Lamport 分布式快照算法来实现的。该算法不会暂停整个 应用,而是会把生成检查点的过程和处理过程分离,这样在部分任务持久

Flink 的检查点算怯中会用到一类名为检查点分隔符(checkpoint barrier )的特殊记 录。和水位线类似,这些检查点分隔符会通过数据 原算子注入到常规的记录流中。 相对其他记录,它们在流中的位置无告提前或延后。为了标识所属的检查点,每 个检查点分隔符都会带有一个检查点编号,这样就把一条数据流从逻辑上分成了 两个部分。所有先于分隔符的记录所引起的状态更改者 会被包含在分隔符所对 应的检查点之中;而所有晚于分隔符的记录所引起的状态更改者目会被纳入之 后的检查点中。

拥有两个有状态的数据源 两个有状态的任务 ,以及两个无状态数据汇的流式应

事sss