一个典型的 Kafka 体系架构包括若干 Producer、若干 broker、若干 Consumer,以及一个ZooKeeper集群。其中ZooKeeper是Kafka用来负责集群元数据的管理、控制器的选举等操作的。Producer将消息发送到broker,broker负责将收到的消息存储到磁盘中,而Consumer负责从broker订阅并消费消息。
一、生产者
生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区。这通常是通过消息键和分区器来实现的,分区器为键生成一个散列值,并将其映射到指定的分区上。这样可以保证包含同一个键的消息会被写到同一个分区上。生产者也可以使用自定义的分区器,根据不同的业务规则将消息映射到分区。
二、消费者
消费者:消费者订阅一个或多个主题,并按照消息生成的顺序读取它们。消费者通过检查消息的偏移量来区分已经读取过的消息。
偏移量:偏移量是一个不断递增的整数值,在创建消息时,Kafka会把它添加到消息里。在给定的分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的消息偏移量保存在Zookeeper或Kafka上,如果消费者关闭或重启,它的读取状态不会丢失。
消费者群组:消费者是消费者群组的一部分,会有一个或多个消费者共同读取一个主题。群组保证每个分区只能被一个消费者使用。一个消费者失效,群组里的其他消费者可以接管失效消费者的工作。
三、broker
一个独立的Kafka服务器被称为broker。broker接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。根据特定的硬件及其性能特征,单个broker可以轻松处理数千个分区以及每秒百万级的消息量。
broker是集群的组成部分。每个集群都有一个broker同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。控制器负责管理工作,包括将分区分配给broker和监控broker。在集群中,一个分区从属于一个broker,该broker被称为分区的首领。一个分区可以分配给多个broker,这个时候会发生分区复制。这种复制机制为分区提供了消息冗余,如果有一个broker失效,其他broker可以接管领导权。
保留消息:Kafka broker默认的消息保留策略是这样的:要么保留一段时间(比如7天),要么保留到消息达到一定大小的字节数(比如1GB)。当消息数量达到这些上限时,旧消息就会过期并被删除,所以在任何时刻,可用消息的总量都不会超过配置参数所指定的大小。
四、主题、分区
kafka的消息通过主题进行分类,主题可以被分为若干个分区,消息以追加的方式写入分区,然后以先入先出的顺序读取。由于一个主题包含几个分区,因此无法在整个主题范围内保证消息的顺序,但可以保证消息在单个分区内的顺序。
同一主题下的不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。offset是消息在分区中的唯一标识,Kafka通过它来保证消息在分区内的顺序性,不过offset并不跨越分区,也就是说,Kafka保证的是分区有序而不是主题有序。
如图,主题中有 4 个分区,消息被顺序追加到每个分区日志文件的尾部。Kafka中的分区可以分布在不同的服务器(broker)上,也就是说,一个主题可以横跨多个broker,以此来提供比单个broker更强大的性能。
1、broker配置
- log.dirs:Kafka把所有的消息都保存到磁盘上,存放这些日志片段的目录是通过 log.dirs来指定的。
- auto.create .topics.enable:默认情况下,Kafka会在如下几种情形下自动创建主题
当一个生产者开始往主题写入消息时。
当一个消费者开始从主题读取消息时。
当任意一个客户端向主题发送元数据请求时。
- num.partitions:指定了新创建的主题将包含多少个分区。注意,我们可以增加主题分区的个数,但不能减少分区的个数。
- log.retention.hours:决定数据可以被保留的时间,默认168小时,也就是一周。通过检查磁盘上日志片段文件的最后修改时间来实现的,一般指日志片段的关闭时间。
- log.retention.bytes:通过保留的字节数来判断消息是否过期,作用在每个分区上,例如,如果一个包含8个分区的主题,log.retention.bytes被设为1GB,那么这个主题最多可以保留8GB的数据,所以当主题的分区个数增加时,整个主题可以保留的数据也随之增加。如果同时指定了log.retention.bytes和log.retention.ms,只要任意一个条件满足,消息就会被删除。
- log.segment.bytes:当日志片段大小达到log.segment.bytes指定的上限时(默认1GB),当前日志片段就会被关闭,一个新的日志片段被打开。如果一个日志片段被关闭,就开始等待过期。这个参数的值越小,就会越频繁地关闭和分配新文件,从而降低磁盘写入的整体效率。过大可能会导致很长一段时间才能填满一个日志片段。
- log.segment.ms:控制日志片段关闭时间,指定了多长时间之后日志片段会被关闭。
- message.max.bytes:限制单个消息的大小,默认1MB,如果生产者尝试发送的消息超过这个大小,不仅消息不会被接收,还会收到broker返回的错误信息。值过大会导致负责处理网络连接和请求的线程花更多的时间来处理这些请求,增大磁盘写入块的大小,影响IO吞吐量。消费者客户端设置的fetch.message.max.bytes必须与服务器端设置的消息大小进行协调。如果这个值比message.max.bytes小,那么消费者就无法读取比较大的消息,导致出现消费者被阻塞的情况。
2、broker首领
集群里第一个启动的 broker 通过在 Zookeeper 里创建一个临时节点/controuer 让自己成为控制器。 其他 broker 在启动时也会尝试创建这个节点,不过它们会收到一个“节点已存在”的异常,然后“意 识”到控制器节点已存在, 也就是说集群里已经有一个控制器了 。 其他 broker 在控制器节点上创建 Zookeeperwatch 对象,这样它们就可以收到这个节点 的变更通知。这种方式可以确保集群里一次只有一个控制器存在。
控制器其实就是一个 broker,只不过它除了具有一般 broker 的功能之外,还负责分区首领的选举。
3、分区首领
Kafka 为分区引入了多副本(Replica)机制,可通过增加副本数量来提升容灾能力。同一分区的副本保存的是相同的消息(不过在同一时刻,副本之间并非完全一样)。副本之间是 “一主多从” 的关系,其中 leader 副本负责处理读写请求,follower 副本只负责与 leader 副本的消息同步。副本处于不同的broker中,当 leader 副本出现故障时,从 follower 副本中重新选举新的 leader 副本对外提供服务。
4、消费者首领
当消费者要加入群组时,它会向群组协调器发送一个 JoinGroup 请求。第一个加入群组的消费者将成为“群主”。群主从协调器那里获得群组的成员列表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的),并负责给每一个消费者分配分区。它使用一个实现了 PartitionAssignor接口的类来决定哪些分区应该被分配给哪个消费者。
Kafka 内置了两种分配策略。分配完毕之后,群主把分配情况列表发送给群组协调器,协调器再把这些信息发送给所有消费者。每个消费者只能看到自己的分配信息,只有群主知道群组里所有消费者的分配信息。这个过程会在每次再均衡时重复发生。
四、Zookeeper群组
Kafka使用Zookeeper保存集群的元数据信息和消费者信息。Zookeeper使用的是一致性协议,建议每个群组里应该包含奇数个节点,只有当群组里的大多数节点处于可用状态,Zookeeper才能处理外部的请求,但节点过多会降低整个群组的性能。