Kafka回复在PCF中自动缩放应用程序时超时

问题描述

我正在使用 ReplyingKafkaTemplate 进行Kafka同步响应,并且只有一次实例运行时,我才能获得响应。但是,如果应用程序扩展到多个实例,我会收到超时错误

来自文档

使用单个答复主题进行配置时,每个实例必须使用不同的group.id。在这种情况下,所有实例都会收到每个回复

根据文档,如果我们需要使用其他使用者组,这是否意味着我们需要手动使用其他使用者组运行实例?如果使用PCF等工具,如何处理自动缩放。以下是我的kafka配置。

clang-format

解决方法

replyContainer bean中,添加

containerProperties.setGroupId(UUID.randomUUID().toString()); // unique
Properties props = new Properties();
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest"); // so the new group doesn't get old replies
containerProperties.setKafkaConsumerProperties(props);

replyingKafkaTemplate中,添加

rkt.setSharedReplyTopic(true);

请求主题至少需要与最大扩展量一样多的分区。 回复主题可以有任意数量的分区(包括1个)。

使用PCF,您可以使用instanceIndex来构造groupId,而不是使其随机化。

您还可以将instanceIndex用作REPLY_PARTITION标头,并使用固定的答复分区。在这种情况下,您至少需要与预期使用的最大instanceIndex一样多的分区。