Flink 任务容错机制详解
Flink 的容错能力核心依赖于其内置的检查点(Checkpoint)机制,该机制确保在算子因异常中断时,整个流处理作业能够恢复至故障前的一致状态,从而实现端到端的精确一次(Exactly Once)语义。其底层原理源自 Chandy-Lamport 分布式快照算法。
每个启用检查点的 Flink 作业在启动时,由 JobManager 创建一个 CheckpointCoordinator,负责协调整个作业的快照流程。

- CheckpointCoordinator 定期向所有数据源(Source)发送 barrier 标记。
- 当某个 Source 收到 barrier 后,暂停处理当前数据,将自身状态序列化并持久化至外部存储,完成后向 Coordinator 报告,并将该 barrier 转发给下游算子,恢复处理。
- 下游算子接收到 barrier 后,同样暂停处理,保存本地状态,报告进度,并继续向下传播 barrier。
- 此过程持续传递直至 sink 算子完成状态保存,标志着一次完整检查点结束。
- 当 Coordinator 收集到所有算子的确认信息,即判定本次快照成功;若超时未收到全部响应,则视为失败。
对于具有多个输入流的算子(如双路合并),系统会延迟处理,直到所有上游输入均到达相同编号的 barrier。此时才触发本地状态快照,并统一向下游广播合并后的 barrier。
两个输入源的检查点同步示意图:

- 算子 C 接收来自 A 与 B 的输入流。
- 若第 i 个周期中,来自 A 的 barrier 先抵达,算子 C 将阻塞来自 A 的通道,仅接收 B 的数据。
- 当来自 B 的同编号 barrier 到达后,算子 C 触发状态保存,将两路 barrier 合并为一条,向下游统一转发。
一旦发生故障,CheckpointCoordinator 会通知所有算子回滚至最近成功的检查点状态,重新开始数据处理,保证数据不丢失、不重复。
检查点存储后端
目前支持三种状态后端配置:
内存型状态后端(MemStateBackend)
- 快照数据驻留在 JobManager 内存中。
- 仅适用于小规模测试场景,不推荐用于生产环境。
文件系统状态后端(FsStateBackend)
- 状态数据写入分布式或本地文件系统。
- 支持 HDFS(路径以
hdfs://开头)或本地文件(file:///)。 - 部署于集群时应避免使用本地路径,否则跨节点恢复将导致状态不可读。
RocksDB 状态后端(RocksDBStateBackend)
- 结合本地 RocksDB 存储与远程持久化能力。
- 默认将状态暂存于本地磁盘,检查点时上传至 HDFS 或其他远程存储。
- 使用方式:
new RocksDBStateBackend("hdfs://...")或new RocksDBStateBackend("file:///...")。 - 特别注意:若使用自定义窗口且状态以
ListState形式管理,性能可能显著下降,因 RocksDB 对大列表读取效率较低。建议根据实际需求选择FsStateBackend + HDFS或RocksDBStateBackend + HDFS。
检查点配置语法
val env = StreamExecutionEnvironment.getExecutionEnvironment()
// 每 1 秒触发一次检查点
env.enableCheckpointing(1000)
// 设置检查点模式为精确一次
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 设置最大超时时间为 60 秒
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 是否在快照失败时终止任务
env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)
// 允许同时执行的最大检查点数
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
应用示例:每 6 秒进行一次检查点,统计滑动窗口内数据量
需求说明:
- 每秒生成约 10,000 条数据。
- 数据结构:
(id: Long, name: String, info: String, count: Int)。 - 每隔 1 秒对过去 4 秒内的数据进行聚合统计。
- 每 6 秒触发一次检查点,结果输出至控制台。
实现代码:
case class SEvent(id: Long, name: String, info: String, count: Int)
class SEventSourceWithChk extends RichSourceFunction[SEvent] {
private var count = 0L
private var isRunning = true
private val alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
override def cancel(): Unit = isRunning = false
override def run(sourceContext: SourceContext[SEvent]): Unit = {
while (isRunning) {
for (_ <- 0 until 10000) {
sourceContext.collect(SEvent(1, s"hello-$count", alphabet, 1))
count += 1L
}
Thread.sleep(1000)
}
}
}
object FlinkEventTimeAPIChkMain {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new FsStateBackend("hdfs://hadoop01:9000/flink-checkpoint/checkpoint/"))
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setCheckpointInterval(6000)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val source = env.addSource(new SEventSourceWithChk)
source.assignTimestampsAndWatermarks(
new AssignerWithPeriodicWatermarks[SEvent] {
override def getCurrentWatermark: Watermark = new Watermark(System.currentTimeMillis())
override def extractTimestamp(t: SEvent, l: Long): Long = System.currentTimeMillis()
}
)
.keyBy(0)
.window(SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1)))
.apply(new WindowStatisticWithChk)
.print()
env.execute("Window Checkpoint Example")
}
}
class UDFState extends Serializable {
private var count = 0L
def setState(s: Long): Unit = count = s
def getState: Long = count
}
class WindowStatisticWithChk extends WindowFunction[SEvent, Long, Tuple, TimeWindow] with ListCheckpointed[UDFState] {
private var total = 0L
override def apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent], out: Collector[Long]): Unit = {
val count = input.size.toLong
total += count
out.collect(count)
}
override def restoreState(state: util.List[UDFState]): Unit = {
total = state.get(0).getState
}
override def snapshotState(checkpointId: Long, timestamp: Long): util.List[UDFState] = {
val list = new util.ArrayList[UDFState]
val state = new UDFState
state.setState(total)
list.add(state)
list
}
}