Apache Kafka 最初由 LinkedIn 开发,通过 Scala 和 Java 编写的一个分布式消息系统,在 2011 年成为 Apache 的孵化项目,随后于 2012 年成为 Apache 的主要项目之一。
因为其可扩展、高吞吐、高可用等特性被广泛应用在大规模的消息处理场景中,一些常见的流处理工具都支持与 Kafka 的集成。
简介
消息队列是服务常用的中间件,在消息系统最常见的几个场景:
- 系统解耦,将非核心的任务以异步方式调用,例如交易系统中的下单、减库存原子操作完成,通知则异步发送。
- 削峰填谷,业务需要允许异步处理,对实时性要求不高,通常处理机器资源有限,但是偶尔会有较大的业务量,这样可以将请求先临时缓存。
- 消息订阅,通常就是包含某个消息源,然后多个系统需要接收此消息进行处理。
Kafka 是一种基于发布/订阅机制的分布式消息系统,其主要的特性或者设计目标有:
- 支持高性能、在线水平扩容。
- 以时间复杂度为
O(1)
的方式提供消息持久化能力,支持 TB 级的消息处理。 - 支持分布式消费,可以保证每个 Partition 内的消息顺序传输。
基本概念
如下简单介绍一下 Kafka 中一些常见的概念:
- Broker 服务器进程,负责接收并保存生产者消息以及响应消费者,会选举一台作为 Leader 管理分区状态、监听元数据变化。
- Topic 逻辑概念,可以理解为一个队列,屏蔽了底层实现,物理上会再划分为多个 Partition,通常由多个 Broker 组成,增加吞吐量以及提供高可用。
- Partition 分区,一个 Topic 分成多个 Partition ,每个 Partition 作为 Topic 的持久化单元,生产者可以根据 Key 的哈希选择 Partition 存储,也可以随机选择。
- Producer 生产者,发送消息到指定 Topic ,可以配置发送到 Partition 的策略,支持批量发送,所以吞吐量很高。
- Consumer 消费者,接收消息,不过需要周期性的轮询,而非订阅方式。
- Consumer Group 消费组,逻辑概念,用于实现单播和广播两种消息模型,同一个 Topic 的数据会广播给不同的 Group,而同一个 Group 中只有一个 Worker 能拿到这个数据。
- Message 消息,包括了 Key (可以是空) 和 Content ,其中 Key 可以作为路由。
- Replication 副本,提供高可用,每个 Partition 可以存在 N 个副本,包含了一个主副本,消息会从主副本复制到从副本上。
- Offset 偏移量,用来记录消息消费的偏移,也就是某个 Consumer Group 在某个 Partiton 中当前已经消费的位置。
其中 Topic 可以在逻辑上认为是一个 Queue ,为了保证 Kafka 的吞吐量可以线性扩展,可以将 Topic 分成了多个 Partition ,每个 Partition 会对应磁盘上的一个文件夹,保存了所有的消息以及索引文件。
Partition 作为底层数据的物理存储单元,将同一 Topic 的数据分散存储到不同的机器上,从而方便水平扩展,避免单机的容量、性能的限制,同时可以通过复制来增加数据冗余性,提高容灾能力,为了做到均匀分布,通常 Partition 的数量通常是 Broker 数量的整数倍。
安装部署
在 2.8 版本之前,Kafka 会强依赖于 Zookeeper 管理元数据,包括 Broker 会监听 ZooKeeper 中的数据变化,而之后,Kafka 内置了基于 RAFT 的元数据管理。
部署一个单机的Kafka节点,直接从 kafka.apache.org 下载相应版本。
基于Zookeeper
----- 已经包含了一个ZooKeeper的安装包,默认数据保存在/tmp目录下,会在前台运行
$ nohup bin/zookeeper-server-start.sh config/zookeeper.properties >> logs/zookeeper.out 2>&1 </dev/null &
----- 修改默认的主机配置,关键是如下三个配置项,如果要在单台机器上启动多个进程,同样需要保证如下配置不同
$ cat config/server.properties
broker.id=0
log.dirs=kafka-logs-0
listeners=PLAINTEXT://IP-ADDR:9092
zookeeper.connect=localhost:2181
----- 启动Kafka集群
$ nohup bin/kafka-server-start.sh config/server.properties >> logs/kafka.out 2>&1 </dev/null &
这里大部分的与分区相关的元数据不在保存在 ZooKeeper 中的,不过还有功能是依赖的。
基于KRaft
也就是完全通过 Kafka 自身管理元数据了。
----- 生成kraft相关元数据,详见kraft/server.properties中的log.dirs配置项
$ bin/kafka-storage.sh random-uuid
$ bin/kafka-storage.sh format -t axef7030SpWLCFKORT0NzA -c config/kraft/server.properties
$ nohup bin/kafka-server-start.sh config/kraft/server.properties >> logs/kafka.out 2>&1 </dev/null &
其中比较关键的配置项有如下几个:
# 其中 controller 角色就是 raft 的控制器
process.role=broker,controller
# 类似于broker.id参数,这个是用来区分raft节点
node.id=1
# 用于选举的节点信息
controller.quorum.voters=1@localhost:9093
在 config/kraft
目录下包含了多个配置文件,可以直接修改 log.dirs
的配置项。