当前位置:首页 > 技术 > 正文内容

Kafka 消息发送确认机制:防止程序关闭引起的数据丢失

访客 技术 2026年5月24日 3

在使用 spring-kafka 开发时,一个常见的隐患是:当程序通过信号量(如 SignalHandler)优雅关闭时,如果在 kafkaTemplate.send() 调用后、消息被实际发送到 Kafka broker 之前程序终止,就可能导致已发送的消息丢失。要避免这种情况,需要理解 spring-kafka 的消息发送确认机制。

环境准备

组件版本
JDK8
Kafka2.0.1
spring-boot2.1.8.RELEASE

spring-kafka 消息发送核心方法

spring-kafka 提供了多种发送消息的 send 方法,均来自 KafkaOperations 接口,KafkaTemplate 实现了这些方法。以下是常见签名:

ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);

每个方法都返回 ListenableFuture 对象。在 KafkaTemplate 内部,多个 send 最终会调用 doSend 方法:

protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
    if (this.transactional) {
        Assert.state(inTransaction(),
            "No transaction is in process; "
                + "possible solutions: run the template operation within the scope of a "
                + "template.executeInTransaction() operation, start a transaction with @Transactional "
                + "before invoking the template method, "
                + "run in a transaction started by a listener container when consuming a record");
    }
    final Producer<K, V> producer = getTheProducer();
    final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
    producer.send(producerRecord, buildCallback(producerRecord, producer, future));
    if (this.autoFlush) {
        flush();
    }
    return future;
}

该方法返回 SettableListenableFuture,它继承了 Future 接口。设计意图是实现异步计算:先将消息放入待发送队列(由 ProducerBatch 管理),累积到一定条件后统一发送,以提高效率。由于具体发送时间不确定,应避免粗暴调用 get() 阻塞等待,转而使用回调函数监听发送结果。

消息发送确认策略

1. 同步阻塞

调用 Future.get() 并设定超时,强制等待发送结果。代码示例如下:

try {
    kafkaTemplate.send(topic, partition,
                        String.valueOf(Math.round(Math.random() * 100)),
                        message).get(100, TimeUnit.SECONDS);
    handleSuccess(data);
}
catch (ExecutionException e) {
    handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
    handleFailure(data, record, e);
}

2. 异步回调

直接获取 ListenableFuture 并注册回调,发送完成后自动触发:

ListenableFuture<SendResult<String, String>> future = 
    kafkaTemplate.send(topic, partition,
                       String.valueOf(Math.round(Math.random() * 100)),
                       message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
    @Override
    public void onSuccess(SendResult<String, String> result) {
        log.trace("消息成功发送,主题:{}", topic);
    }

    @Override
    public void onFailure(Throwable ex) {
        log.error("消息发送失败,主题:{},异常:{}", topic, ex.getMessage());
        handleFailureData(topic, message);
    }
});

在生产中推荐使用异步回调方式,既不阻塞主流程,又能及时感知发送结果,在 onFailure 中记录失败消息以便后续补偿。

要点总结

spring-kafkasend 方法本质是异步操作,返回的 ListenableFuture 提供了结果查询和回调机制。通过在 onFailure 回调中处理发送失败的消息(如记录到日志或重试队列),可以有效防止因程序退出导致的数据丢失。

相关文章

Linux crontab 详解

1) crontab 是什么cron 是 Linux 的定时任务守护进程;crontab 是用来编辑/查看“按时间周期执行命令”的表(cron table)。常见两类:用户 crontab:每个用户一份(crontab -e 编辑)系统级 crontab / cron.d:可指定执行用户(/etc/crontab、/etc/cron.d/*)2) crontab 时间...

富文本里可以允许的 HTML 属性

一、所有标签默认允许的安全属性(极少)class        (可选)id           (通常建议禁用)title️ 注意:id 容易被滥用做锚点注入,很多系统直接禁用class 允许的话最好只允许固定前缀(如 editor-*)二、a 标签允许属性<a href="" t...

Mac 安装 Node.js 指南

方法一:通过官网安装包(最简单,适合初学者)如果你只是想快速安装并开始使用,这是最直接的方法。访问 Node.js 官网。页面会显示两个版本:LTS (Recommended For Most Users):长期支持版,最稳定。建议选这个。Current:最新特性版,包含最新功能但可能不够稳定。下载 .pkg 安装包并运行。按照安装向导点击“下一步”即可完成。方法二:使用 Homebrew 安装(...

Dom\HTML_NO_DEFAULT_NS 的副作用:自动加闭合标签

在使用Dom\HTMLDocument时,Dom\HTML_NO_DEFAULT_NS 将禁止在解析过程中设置元素的命名空间, 此设置是为了与DOMDocument向后兼容而存在的。当使用它时,已知的一个副作用就是:自动加闭合标签例如 </img> 为什么会这样?当你使用:Dom\HTML_NO_DEFAULT_NS文档会变成 无命名空间模式,此时内部更接近 XML...

Laravel 事件和监听器创建

在 Laravel 中,使用 Artisan 命令创建 Events(事件) 和 Listeners(监听器) 是非常高效的。你可以通过以下几种方式来实现:1. 手动创建单个 Event如果你只想创建一个事件类,可以使用 make:event 命令:Bashphp artisan make:event UserRegistered执行后,文件将生成在 app/Even...

自定义域名解析神器 dnsmasq

什么是 dnsmasq?dnsmasq 是一个轻量级、功能强大的网络服务工具,专为小型和中等规模网络设计。它是一个综合的网络基础设施解决方案[1]。dnsmasq 能做什么?功能说明应用场景DNS 转发与缓存将 DNS 查询转发到上游服务器(ISP、Google DNS 等),并在本地缓存结果加快 DNS 查询速度,减少外部 DNS 流量本地 DNS解析本地网络设备的主机名,无需编辑&n...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。