Spring Batch Kafka Consumer Job:跨多个JVM进程的荣誉消息组

问题描述

我有一个简单的Spring Batch Kafka Consumer Job,它可以从Kafka主题中读取数据并将数据写入文件中。

我想到了生成 5个实例的我的Kafka消费者工作,以便该工作可以更快地完成。也就是说,我启动了5次程序,以便在自己的JVM进程中启动 5个消费者Jobs

方法的直接问题是将有5个进程写入同一文件。我通过在文件名后附加一个唯一的进程ID来解决此问题。我更新的writer bean如下:

private static final String UNIQUE_PROCESS_IDENTIFIER = System.currentTimeMillis();    

@Bean
public FlatFileItemWriter<String> testFileWriter() {
    FlatFileItemWriter<String> writer = new FlatFileItemWriter<>();
    writer.setResource(new FileSystemResource(
            "I:/CK/data/output_from_consumer_"+UNIQUE_PROCESS_IDENTIFIER+".dat"));
    writer.setAppendAllowed(false);
    writer.setShouldDeleteIfExists(true);
    DelimitedLineAggregator<String> lineAggregator = new DelimitedLineAggregator<>();
    lineAggregator.setDelimiter(",");
    writer.setLineAggregator(lineAggregator);
    return writer;
}

通过将时间戳附加到输出文件名,可以确保每个Consumer JVM进程都写入其自己的文件

当我最终启动同一程序的5个实例(JVM进程)时,我的期望是,如果在其自己的JVM进程中运行的一个使用者作业从分区中读取一条消息,则在他们自己的JVM进程中运行的其他使用者作业将不会从同一分区再次读取同一条消息(因为所有5个Java进程将使用相同的使用者组,即 mygroup

但是,我可以看到每个使用者作业进程(JVM)最终都读取了所有消息。结果,我现在有5个文件,每个文件包含相同的内容。示例输出文件名以及每个文件中的记录数,以进行更好的说明:

output_from_consumer_1600530320385.dat -> 1 million records
output_from_consumer_1600530335555.dat -> 1 million reocrds
output_from_consumer_1900530335555.dat -> 1 million records
output_from_consumer_1900530335556.dat -> 1 million records
output_from_consumer_1900730334556.dat -> 1 million records

Total records: 5 million

问题:如何配置Spring Batch作业,以使即使使用该使用者作业启动了多个Java进程,该Java进程也只能读取尚未由同一组中的使用者读取的数据。是作为单独的Java进程启动的?

这是我的预期输出(仅代表):

output_from_consumer_1600530320385.dat -> 100,000 records
output_from_consumer_1600530335555.dat -> 200,000 records
output_from_consumer_1900530335555.dat -> 200,000 records
output_from_consumer_1900530335556.dat -> 400,000 records
output_from_consumer_1900730334556.dat -> 100,000 records 

Total records : 1 million

解决方法

在同一组ID中运行具有相同消费者ID的多个Kafka消费者实例并不能帮助您实现并行性。

可以通过使用多个具有不同消费者ID和相同消费者组ID的消费者来实现Kafka消费者中的并行化。消费者组是一个组中多个消费者的分组机制。数据在组的所有使用者之间平均分配,组中没有两个使用者接收相同的数据。

在将分区分配给使用者之前,Kafka首先会检查是否存在具有给定组ID的现有使用者。 当不存在具有给定组ID的现有使用者时,它将为该新使用者分配该主题的所有分区。 当已经有两个使用给定组ID的消费者并且第三个消费者想要使用相同的组ID进行消费时。它将在所有三个使用者之间平均分配分区。不会将两个具有相同group-id的使用者分配到同一分区。

示例 假设有一个包含4个分区和两个使用者的主题,consumer-Aconsumer-B希望通过组ID为my-consumer-group的使用者使用,那么Kafka将为每个使用者分配相等数量的分区consumer-A2 to the consumer-B的2。

在您的用例中,由于Kafka主题包含4个分区,因此您可以使用4个使用者,每个使用者具有不同的使用者ID和相同的组ID。

,

创建KafkaItemReader时,可以指定要从哪个分区读取:

KafkaItemReader reader = new KafkaItemReader(myConsumerProperties,"topic1",0)

上述阅读器将从0中的分区topic1中读取消息。因此,在您的情况下,您可以并行运行作业,并配置每个作业以读取来自不同主题的消息(例如,将主题/分区作为作业参数传递)。