问题描述
我有一个简单的Spring Batch Kafka Consumer Job,它可以从Kafka主题中读取数据并将数据写入文件中。
我想到了生成 5个实例的我的Kafka消费者工作,以便该工作可以更快地完成。也就是说,我启动了5次程序,以便在自己的JVM进程中启动 5个消费者Jobs 。
此方法的直接问题是将有5个进程写入同一文件。我通过在文件名后附加一个唯一的进程ID来解决此问题。我更新的writer bean如下:
private static final String UNIQUE_PROCESS_IDENTIFIER = System.currentTimeMillis();
@Bean
public FlatFileItemWriter<String> testFileWriter() {
FlatFileItemWriter<String> writer = new FlatFileItemWriter<>();
writer.setResource(new FileSystemResource(
"I:/CK/data/output_from_consumer_"+UNIQUE_PROCESS_IDENTIFIER+".dat"));
writer.setAppendAllowed(false);
writer.setShouldDeleteIfExists(true);
DelimitedLineAggregator<String> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(",");
writer.setLineAggregator(lineAggregator);
return writer;
}
通过将时间戳附加到输出文件名,可以确保每个Consumer JVM进程都写入其自己的文件。
当我最终启动同一程序的5个实例(JVM进程)时,我的期望是,如果在其自己的JVM进程中运行的一个使用者作业从分区中读取一条消息,则在他们自己的JVM进程中运行的其他使用者作业将不会从同一分区再次读取同一条消息(因为所有5个Java进程将使用相同的使用者组,即 mygroup )
但是,我可以看到每个使用者作业进程(JVM)最终都读取了所有消息。结果,我现在有5个文件,每个文件包含相同的内容。示例输出文件名以及每个文件中的记录数,以进行更好的说明:
output_from_consumer_1600530320385.dat -> 1 million records
output_from_consumer_1600530335555.dat -> 1 million reocrds
output_from_consumer_1900530335555.dat -> 1 million records
output_from_consumer_1900530335556.dat -> 1 million records
output_from_consumer_1900730334556.dat -> 1 million records
Total records: 5 million
问题:如何配置Spring Batch作业,以使即使使用该使用者作业启动了多个Java进程,该Java进程也只能读取尚未由同一组中的使用者读取的数据。是作为单独的Java进程启动的?
这是我的预期输出(仅代表):
output_from_consumer_1600530320385.dat -> 100,000 records
output_from_consumer_1600530335555.dat -> 200,000 records
output_from_consumer_1900530335555.dat -> 200,000 records
output_from_consumer_1900530335556.dat -> 400,000 records
output_from_consumer_1900730334556.dat -> 100,000 records
Total records : 1 million
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)