Kafka——kafkaProducer 分析

由于本人最近在学习 kafka,看了kafka 的源码解析以及厮大的深入理解 kafka 之后决定自己在源码 debug 更加深入的学习 kafka。
先从 producer 看起:

在这里插入图片描述


运行 zookeeper,kafka server, producer 之后。在控制台随意输入一条消息进行 debug。
首先他会把消息封装成 ProducerRecord

在这里插入图片描述


主要的6个参数:
headers:可以是多个 header 组成也可以为 null 也可以是单个 header。这个 header 会和 kafka 的幂等,事务有所关联
timestamp:如果没有指定的话,会默认使用System.currentTimeMillis()当前时间戳。
key:如果指定 key 的话 会影响到 log compression 以及分区
value:具体的消息内容
topic:主题
partition:分区

之后通过 KafkaProducer 发送消息:

在这里插入图片描述


1.首先如果你经过的是拦截器。默认是没有拦截器的,可以自己实现拦截器实现ProducerInterceptor 接口即可。

在这里插入图片描述


其中有两个方法

在这里插入图片描述


在这里插入图片描述


在 onSend 中可以进行自己的业务逻辑,onAcknowledgement 可以对 ack 响应进行预处理,这个方法是最先获取从服务端获取的 response。

在这里插入图片描述


之后 producer 调用了 doSend 方法

在这里插入图片描述


在这里插入图片描述


在这里插入图片描述


该方法的整体流程:
2.首先根据 topic,partition,maxBlcokTimeMs(等待更新 kafka 元数据的最长时间默认60000ms)在 waitOnMetadata方法中获取该 topic 的元数据 如果集群中没有该topic 那就会添加,通过partitionCountForTopic(topic)获取该 topic 下的分区数。如果有如果有缓存的元数据,并且记录的分区没有定义,则返回缓存的元数据。
返回元数据以及等待时间:

元数据


元数据

元数据


3.然后对key以及 value 进行序列化。这里也是可以自定义序列化 只要实现 Serializer 和 Deserializer 即可

在这里插入图片描述


4.下一步操作就是选择消息的分区,优先根据 ProducerRecord 指定的分区,如果没有则通过partitioner.partition 方法进行分区

在这里插入图片描述


在这里插入图片描述


具体的分区策略:

在这里插入图片描述


首先根据 topic 找出该 topic 下的所有 partitions 的数量
keyBytes 是 producerRecord 中的 key序列化之后的值,所以说如果指定 key 是会对分区造成影响
如果有 key 会对 key 进行 hash(murmur2这种hash 的方式) 然后跟分区数量进行取模
如果没有key且可用分区大于0的话:通过 counter 和可用分区进行取模 counter 不断自增
如果没有key且可用分区小于0的话:通过 counter 和分区进行取模 counter 不断自增

5.然后通过计算出来的 partition 和 topic 封装成 TopicPartition 形成 topic-partition 的映射关系
6.预估这个 record 处理大小的上限值,没有考虑到压缩算法的开销
7.生产回调讲调用拦截器的回调,这就是为什么之前说可以在onAcknowledgement做预处理
8.判断是否开启事务
9.把TopicPartition,时间戳,序列化 key,序列化值,请求头,拦截器回调,剩余等待时间 预估一下需要处理的值放入 buffer 存入RecordAccumulator 中
10.如果batch已经满了或者创建了新的batch 唤醒 sender 所以说 KafkaProducer 它不会对消息进行发送,而且将消息进行处理存入RecordAccumulator 中,有 sender 线程进行发送数据。

相关文章

# 前言 现有主流消息中间件都是生产者-消费者模型,主要角色...
错误的根源是:kafka版本过高所致,2.2+=的版本,已经不需要...
DWS层主要是存放大宽表数据,此业务中主要是针对Kafka topic...
不多BB讲原理,只教你怎么用,看了全网没有比我更详细的了,...
终于写完了,其实最开始学kafka的时候是今年2月份,那时候还...
使用GPKafka实现Kafka数据导入Greenplum数据库踩坑问题记录(...