Kafka 消息发送确认机制:防止程序关闭引起的数据丢失
在使用 spring-kafka 开发时,一个常见的隐患是:当程序通过信号量(如 SignalHandler)优雅关闭时,如果在 kafkaTemplate.send() 调用后、消息被实际发送到 Kafka broker 之前程序终止,就可能导致已发送的消息丢失。要避免这种情况,需要理解 spring-kafka 的消息发送确认机制。
环境准备
| 组件 | 版本 |
|---|---|
| JDK | 8 |
| Kafka | 2.0.1 |
| spring-boot | 2.1.8.RELEASE |
spring-kafka 消息发送核心方法
spring-kafka 提供了多种发送消息的 send 方法,均来自 KafkaOperations 接口,KafkaTemplate 实现了这些方法。以下是常见签名:
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
每个方法都返回 ListenableFuture 对象。在 KafkaTemplate 内部,多个 send 最终会调用 doSend 方法:
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
if (this.transactional) {
Assert.state(inTransaction(),
"No transaction is in process; "
+ "possible solutions: run the template operation within the scope of a "
+ "template.executeInTransaction() operation, start a transaction with @Transactional "
+ "before invoking the template method, "
+ "run in a transaction started by a listener container when consuming a record");
}
final Producer<K, V> producer = getTheProducer();
final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
producer.send(producerRecord, buildCallback(producerRecord, producer, future));
if (this.autoFlush) {
flush();
}
return future;
}
该方法返回 SettableListenableFuture,它继承了 Future 接口。设计意图是实现异步计算:先将消息放入待发送队列(由 ProducerBatch 管理),累积到一定条件后统一发送,以提高效率。由于具体发送时间不确定,应避免粗暴调用 get() 阻塞等待,转而使用回调函数监听发送结果。
消息发送确认策略
1. 同步阻塞
调用 Future.get() 并设定超时,强制等待发送结果。代码示例如下:
try {
kafkaTemplate.send(topic, partition,
String.valueOf(Math.round(Math.random() * 100)),
message).get(100, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
2. 异步回调
直接获取 ListenableFuture 并注册回调,发送完成后自动触发:
ListenableFuture<SendResult<String, String>> future =
kafkaTemplate.send(topic, partition,
String.valueOf(Math.round(Math.random() * 100)),
message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
log.trace("消息成功发送,主题:{}", topic);
}
@Override
public void onFailure(Throwable ex) {
log.error("消息发送失败,主题:{},异常:{}", topic, ex.getMessage());
handleFailureData(topic, message);
}
});
在生产中推荐使用异步回调方式,既不阻塞主流程,又能及时感知发送结果,在 onFailure 中记录失败消息以便后续补偿。
要点总结
spring-kafka 的 send 方法本质是异步操作,返回的 ListenableFuture 提供了结果查询和回调机制。通过在 onFailure 回调中处理发送失败的消息(如记录到日志或重试队列),可以有效防止因程序退出导致的数据丢失。