当前位置:首页 > 技术 > 正文内容

Flink 任务容错机制详解

访客 技术 2026年6月11日 1

Flink 的容错能力核心依赖于其内置的检查点(Checkpoint)机制,该机制确保在算子因异常中断时,整个流处理作业能够恢复至故障前的一致状态,从而实现端到端的精确一次(Exactly Once)语义。其底层原理源自 Chandy-Lamport 分布式快照算法。

每个启用检查点的 Flink 作业在启动时,由 JobManager 创建一个 CheckpointCoordinator,负责协调整个作业的快照流程。

  1. CheckpointCoordinator 定期向所有数据源(Source)发送 barrier 标记。
  2. 当某个 Source 收到 barrier 后,暂停处理当前数据,将自身状态序列化并持久化至外部存储,完成后向 Coordinator 报告,并将该 barrier 转发给下游算子,恢复处理。
  3. 下游算子接收到 barrier 后,同样暂停处理,保存本地状态,报告进度,并继续向下传播 barrier。
  4. 此过程持续传递直至 sink 算子完成状态保存,标志着一次完整检查点结束。
  5. 当 Coordinator 收集到所有算子的确认信息,即判定本次快照成功;若超时未收到全部响应,则视为失败。

对于具有多个输入流的算子(如双路合并),系统会延迟处理,直到所有上游输入均到达相同编号的 barrier。此时才触发本地状态快照,并统一向下游广播合并后的 barrier。

两个输入源的检查点同步示意图:

  1. 算子 C 接收来自 A 与 B 的输入流。
  2. 若第 i 个周期中,来自 A 的 barrier 先抵达,算子 C 将阻塞来自 A 的通道,仅接收 B 的数据。
  3. 当来自 B 的同编号 barrier 到达后,算子 C 触发状态保存,将两路 barrier 合并为一条,向下游统一转发。

一旦发生故障,CheckpointCoordinator 会通知所有算子回滚至最近成功的检查点状态,重新开始数据处理,保证数据不丢失、不重复。

检查点存储后端

目前支持三种状态后端配置:

内存型状态后端(MemStateBackend)

  • 快照数据驻留在 JobManager 内存中。
  • 仅适用于小规模测试场景,不推荐用于生产环境。

文件系统状态后端(FsStateBackend)

  • 状态数据写入分布式或本地文件系统。
  • 支持 HDFS(路径以 hdfs:// 开头)或本地文件(file:///)。
  • 部署于集群时应避免使用本地路径,否则跨节点恢复将导致状态不可读。

RocksDB 状态后端(RocksDBStateBackend)

  • 结合本地 RocksDB 存储与远程持久化能力。
  • 默认将状态暂存于本地磁盘,检查点时上传至 HDFS 或其他远程存储。
  • 使用方式:new RocksDBStateBackend("hdfs://...")new RocksDBStateBackend("file:///...")
  • 特别注意:若使用自定义窗口且状态以 ListState 形式管理,性能可能显著下降,因 RocksDB 对大列表读取效率较低。建议根据实际需求选择 FsStateBackend + HDFSRocksDBStateBackend + HDFS

检查点配置语法

val env = StreamExecutionEnvironment.getExecutionEnvironment()

// 每 1 秒触发一次检查点
env.enableCheckpointing(1000)

// 设置检查点模式为精确一次
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// 设置最大超时时间为 60 秒
env.getCheckpointConfig.setCheckpointTimeout(60000)

// 是否在快照失败时终止任务
env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)

// 允许同时执行的最大检查点数
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

应用示例:每 6 秒进行一次检查点,统计滑动窗口内数据量

需求说明:

  • 每秒生成约 10,000 条数据。
  • 数据结构:(id: Long, name: String, info: String, count: Int)
  • 每隔 1 秒对过去 4 秒内的数据进行聚合统计。
  • 每 6 秒触发一次检查点,结果输出至控制台。

实现代码:

case class SEvent(id: Long, name: String, info: String, count: Int)

class SEventSourceWithChk extends RichSourceFunction[SEvent] {
  private var count = 0L
  private var isRunning = true
  private val alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

  override def cancel(): Unit = isRunning = false

  override def run(sourceContext: SourceContext[SEvent]): Unit = {
    while (isRunning) {
      for (_ <- 0 until 10000) {
        sourceContext.collect(SEvent(1, s"hello-$count", alphabet, 1))
        count += 1L
      }
      Thread.sleep(1000)
    }
  }
}

object FlinkEventTimeAPIChkMain {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(new FsStateBackend("hdfs://hadoop01:9000/flink-checkpoint/checkpoint/"))
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setCheckpointInterval(6000)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val source = env.addSource(new SEventSourceWithChk)
    source.assignTimestampsAndWatermarks(
      new AssignerWithPeriodicWatermarks[SEvent] {
        override def getCurrentWatermark: Watermark = new Watermark(System.currentTimeMillis())
        override def extractTimestamp(t: SEvent, l: Long): Long = System.currentTimeMillis()
      }
    )
    .keyBy(0)
    .window(SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1)))
    .apply(new WindowStatisticWithChk)
    .print()

    env.execute("Window Checkpoint Example")
  }
}

class UDFState extends Serializable {
  private var count = 0L
  def setState(s: Long): Unit = count = s
  def getState: Long = count
}

class WindowStatisticWithChk extends WindowFunction[SEvent, Long, Tuple, TimeWindow] with ListCheckpointed[UDFState] {
  private var total = 0L

  override def apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent], out: Collector[Long]): Unit = {
    val count = input.size.toLong
    total += count
    out.collect(count)
  }

  override def restoreState(state: util.List[UDFState]): Unit = {
    total = state.get(0).getState
  }

  override def snapshotState(checkpointId: Long, timestamp: Long): util.List[UDFState] = {
    val list = new util.ArrayList[UDFState]
    val state = new UDFState
    state.setState(total)
    list.add(state)
    list
  }
}
标签: FlinkCheckpoint

相关文章

Linux crontab 详解

1) crontab 是什么cron 是 Linux 的定时任务守护进程;crontab 是用来编辑/查看“按时间周期执行命令”的表(cron table)。常见两类:用户 crontab:每个用户一份(crontab -e 编辑)系统级 crontab / cron.d:可指定执行用户(/etc/crontab、/etc/cron.d/*)2) crontab 时间...

富文本里可以允许的 HTML 属性

一、所有标签默认允许的安全属性(极少)class        (可选)id           (通常建议禁用)title️ 注意:id 容易被滥用做锚点注入,很多系统直接禁用class 允许的话最好只允许固定前缀(如 editor-*)二、a 标签允许属性<a href="" t...

Mac 安装 Node.js 指南

方法一:通过官网安装包(最简单,适合初学者)如果你只是想快速安装并开始使用,这是最直接的方法。访问 Node.js 官网。页面会显示两个版本:LTS (Recommended For Most Users):长期支持版,最稳定。建议选这个。Current:最新特性版,包含最新功能但可能不够稳定。下载 .pkg 安装包并运行。按照安装向导点击“下一步”即可完成。方法二:使用 Homebrew 安装(...

Dom\HTML_NO_DEFAULT_NS 的副作用:自动加闭合标签

在使用Dom\HTMLDocument时,Dom\HTML_NO_DEFAULT_NS 将禁止在解析过程中设置元素的命名空间, 此设置是为了与DOMDocument向后兼容而存在的。当使用它时,已知的一个副作用就是:自动加闭合标签例如 </img> 为什么会这样?当你使用:Dom\HTML_NO_DEFAULT_NS文档会变成 无命名空间模式,此时内部更接近 XML...

Laravel 事件和监听器创建

在 Laravel 中,使用 Artisan 命令创建 Events(事件) 和 Listeners(监听器) 是非常高效的。你可以通过以下几种方式来实现:1. 手动创建单个 Event如果你只想创建一个事件类,可以使用 make:event 命令:Bashphp artisan make:event UserRegistered执行后,文件将生成在 app/Even...

自定义域名解析神器 dnsmasq

什么是 dnsmasq?dnsmasq 是一个轻量级、功能强大的网络服务工具,专为小型和中等规模网络设计。它是一个综合的网络基础设施解决方案[1]。dnsmasq 能做什么?功能说明应用场景DNS 转发与缓存将 DNS 查询转发到上游服务器(ISP、Google DNS 等),并在本地缓存结果加快 DNS 查询速度,减少外部 DNS 流量本地 DNS解析本地网络设备的主机名,无需编辑&n...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。