问题描述
我有一个包含15个分区[0-14]的kafka主题,并且正在以5个并行度运行flink。因此,理想情况下,每个并行flink使用者均应使用3个分区。但是,即使在多次重新启动后,也没有任何flink从属服务器订阅过kafka分区。
org.apache.kafka.clients.consumer.KafkaConsumer assign Subscribed to partition(s): topic_name-13,topic_name-8,topic_name-9
org.apache.kafka.clients.consumer.KafkaConsumer assign Subscribed to partition(s): topic_name-11,topic_name-12,topic_name-13
org.apache.kafka.clients.consumer.KafkaConsumer assign Subscribed to partition(s): topic_name-14,topic_name-0,topic_name-10
org.apache.kafka.clients.consumer.KafkaConsumer assign Subscribed to partition(s): topic_name-5,topic_name-6,topic_name-10
org.apache.kafka.clients.consumer.KafkaConsumer assign Subscribed to partition(s): topic_name-2,topic_name-3,topic_name-7
从上面的日志中可以看到,分区10和13已被2个使用者预订,分区1和4根本没有预订。
注意:如果我以1个并行度开始工作,则该工作将正常进行。
Flink版本:1.3.3
解决方法
这听起来像https://issues.apache.org/jira/browse/FLINK-7143。
通读Jira票证和拉取请求(https://github.com/apache/flink/pull/4301)中的详细信息,这听起来像是如果您使用的是Flink 1.3.x,则只有进行全新的重启才能受益于此错误修复程序。从保存点重新启动不足以从此修复程序中受益。