问题描述
我正在使用 ReplyingKafkaTemplate 进行Kafka同步响应,并且只有一次实例运行时,我才能获得响应。但是,如果应用程序扩展到多个实例,我会收到超时错误。
来自文档
根据文档,如果我们需要使用其他使用者组,这是否意味着我们需要手动使用其他使用者组运行实例?如果使用PCF等工具,如何处理自动缩放。以下是我的kafka配置。
clang-format
解决方法
在replyContainer
bean中,添加
containerProperties.setGroupId(UUID.randomUUID().toString()); // unique
Properties props = new Properties();
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest"); // so the new group doesn't get old replies
containerProperties.setKafkaConsumerProperties(props);
在replyingKafkaTemplate
中,添加
rkt.setSharedReplyTopic(true);
请求主题至少需要与最大扩展量一样多的分区。 回复主题可以有任意数量的分区(包括1个)。
使用PCF,您可以使用instanceIndex
来构造groupId,而不是使其随机化。
您还可以将instanceIndex
用作REPLY_PARTITION
标头,并使用固定的答复分区。在这种情况下,您至少需要与预期使用的最大instanceIndex
一样多的分区。