当前位置:首页 > 技术 > 正文内容

Apache Kafka:分布式消息系统的原理与实践

访客 技术 2026年7月2日 1

概述

Apache Kafka 是一种高性能、分布式的流处理平台,最初由 LinkedIn 开发并贡献给 Apache 基金会。它被设计用于构建实时数据管道和流式应用,支持高吞吐量的消息发布与订阅机制。Kafka 的核心是一个可持久化、多副本、分区的日志服务,广泛应用于日志聚合、事件溯源、监控指标收集以及异步通信等场景。

Kafka 核心概念

  • Producer(生产者):向 Kafka 主题发送消息的客户端。
  • Consumer(消费者):从主题中读取消息的应用程序,通常以消费组的形式组织。
  • Topic(主题):消息的逻辑分类,每条消息都归属于某个主题。
  • Partition(分区):一个主题可以划分为多个分区,提升并发处理能力和水平扩展性。
  • Broker(代理):Kafka 集群中的服务器节点,负责存储和转发消息。
  • ZooKeeper 或 KRaft:早期版本依赖 ZooKeeper 管理集群元数据;新版本支持 KRaft 协议实现去中心化协调。

消息传递语义保障

Kafka 支持三种级别的消息投递保证,开发者可根据业务需求进行配置:

  1. 最多一次(At Most Once):消息可能丢失但不会重复。启用自动提交偏移量即可实现:
    props.put("enable.auto.commit", "true");
  2. 至少一次(At Least Once):确保消息不丢失,但可能出现重复。需关闭自动提交,并手动异步提交:
    props.put("enable.auto.commit", "false");
    consumer.commitAsync();
  3. 精确一次(Exactly Once):通过幂等生产者和事务机制实现。在发生异常时重置消费位置以避免重复处理:
    try {
        // 消费消息
    } catch (Exception e) {
        consumer.seek(partition, record.offset());
    }

优势与局限

优点

  • 高吞吐低延迟:利用 Linux 零拷贝技术减少内核态切换,显著提升 I/O 性能。
  • 强持久化能力:消息写入磁盘并支持多副本备份,保障数据可靠性。
  • 横向扩展性强:通过增加分区和 Broker 实现无缝扩容。
  • 容错机制完善:Controller 故障后可通过选举机制快速恢复,维持集群稳定运行。

限制

  • 仅保证单个分区内消息有序,跨分区顺序无法保证。
  • 传统架构依赖 ZooKeeper,增加了运维复杂度(新版已逐步淘汰)。
  • "脑裂"问题可能导致短暂状态不一致,需合理设置超时参数。

典型应用场景

  • 用户行为追踪:将注册、登录、点击等事件按类型划分到不同主题,供后续分析使用。
  • 集中式日志系统:结合 Flume 或 Filebeat 收集应用日志,统一写入 Kafka 并导入 HDFS、Hive 或 Elasticsearch。
  • 多路数据分发:作为中间通道将同一份数据同时输出至数据库、缓存和数据分析系统。

API 使用示例

命令行操作(Shell)

启动本地单机环境:

# 启动 ZooKeeper 和 Kafka 服务
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties

# 创建主题
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 --topic test-topic

# 查看所有主题
bin/kafka-topics.sh --list --zookeeper localhost:2181

# 启动控制台生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic

# 启动控制台消费者(从头开始读取)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic test-topic --from-beginning

Java API 示例

Maven 依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.0</version>
</dependency>

生产者代码:

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        String topic = "test-topic";

        try {
            for (int i = 1; i <= 10; i++) {
                String key = "key-" + i;
                String value = "message-" + i;
                ProducerRecord<String, String> record =
                    new ProducerRecord<>(topic, 0, key, value);
                producer.send(record);
                Thread.sleep(500);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            producer.close();
        }
    }
}

消费者代码:

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d, key=%s, value=%s%n",
                        record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

管理操作实战

重置消费组偏移量

查看当前消费进度:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group test-group

重置为最新位置(跳过历史消息):

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group test-group --reset-offsets --to-latest --execute --topic test-topic

注意:必须先停止所有该组下的活跃消费者,否则会提示"Group is active"错误。

调整单条消息最大大小

修改服务器配置文件 server.properties

# 允许接收的最大请求大小(默认约100MB)
socket.request.max.bytes=209715200

# Topic 层面允许的最大消息字节
message.max.bytes=209715200

重启 Broker 生效。若仅对特定主题生效,也可使用动态配置命令设置。

自定义主题数据保留时间

将名为 toptest 的主题数据保留周期设为 24 小时:

bin/kafka-configs.sh --zookeeper localhost:2181 \
--entity-type topics --entity-name toptest \
--alter --add-config retention.ms=86400000

验证配置结果:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic toptest

如需恢复默认策略,删除配置项即可:

bin/kafka-configs.sh --zookeeper localhost:2181 \
--entity-type topics --entity-name toptest \
--alter --delete-config retention.ms

相关文章

Linux crontab 详解

1) crontab 是什么cron 是 Linux 的定时任务守护进程;crontab 是用来编辑/查看“按时间周期执行命令”的表(cron table)。常见两类:用户 crontab:每个用户一份(crontab -e 编辑)系统级 crontab / cron.d:可指定执行用户(/etc/crontab、/etc/cron.d/*)2) crontab 时间...

富文本里可以允许的 HTML 属性

一、所有标签默认允许的安全属性(极少)class        (可选)id           (通常建议禁用)title️ 注意:id 容易被滥用做锚点注入,很多系统直接禁用class 允许的话最好只允许固定前缀(如 editor-*)二、a 标签允许属性<a href="" t...

Mac 安装 Node.js 指南

方法一:通过官网安装包(最简单,适合初学者)如果你只是想快速安装并开始使用,这是最直接的方法。访问 Node.js 官网。页面会显示两个版本:LTS (Recommended For Most Users):长期支持版,最稳定。建议选这个。Current:最新特性版,包含最新功能但可能不够稳定。下载 .pkg 安装包并运行。按照安装向导点击“下一步”即可完成。方法二:使用 Homebrew 安装(...

Dom\HTML_NO_DEFAULT_NS 的副作用:自动加闭合标签

在使用Dom\HTMLDocument时,Dom\HTML_NO_DEFAULT_NS 将禁止在解析过程中设置元素的命名空间, 此设置是为了与DOMDocument向后兼容而存在的。当使用它时,已知的一个副作用就是:自动加闭合标签例如 </img> 为什么会这样?当你使用:Dom\HTML_NO_DEFAULT_NS文档会变成 无命名空间模式,此时内部更接近 XML...

Laravel 事件和监听器创建

在 Laravel 中,使用 Artisan 命令创建 Events(事件) 和 Listeners(监听器) 是非常高效的。你可以通过以下几种方式来实现:1. 手动创建单个 Event如果你只想创建一个事件类,可以使用 make:event 命令:Bashphp artisan make:event UserRegistered执行后,文件将生成在 app/Even...

自定义域名解析神器 dnsmasq

什么是 dnsmasq?dnsmasq 是一个轻量级、功能强大的网络服务工具,专为小型和中等规模网络设计。它是一个综合的网络基础设施解决方案[1]。dnsmasq 能做什么?功能说明应用场景DNS 转发与缓存将 DNS 查询转发到上游服务器(ISP、Google DNS 等),并在本地缓存结果加快 DNS 查询速度,减少外部 DNS 流量本地 DNS解析本地网络设备的主机名,无需编辑&n...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。