问题描述
我们有一个集成测试,我们使用 EmbeddedKafka 并向主题生成消息,我们的应用程序处理该消息,并将结果发送到我们使用和断言输出的第二个主题。在 CI 中,这可能有 2/3 的时间有效,但我们会遇到 KafkaTestUtils.getSingleRecord
抛出 java.lang.IllegalStateException: No records found for topic
的情况(参见下面的 [1])。
为了尝试解决这个问题,我为注册表中的每个侦听器容器添加了 ContainerTestUtils.waitForAssignment
(请参阅下面的 [2])。在 CI 中成功运行了几次之后,我看到了一个新的异常:java.lang.IllegalStateException: Expected 1 but got 0 partitions
。这让我想知道这是否真的是未找到记录的原始异常的根本原因。
有什么想法可以帮助解决这里的随机故障?如果您有任何有关如何进行故障排除的建议,我将不胜感激。
spring-kafka 和 spring-kafka-test v2.6.4。
编辑:添加了 newConsumer
以供参考。
我们的设置示例:
@SpringBoottest
@RunWith(springrunner.class)
@DirtiesContext
@EmbeddedKafka(
topics = { "topic1","topic2" },partitions = 1,brokerProperties = {"listeners=PLAINTEXT://localhost:9099","port=9099"})
public class IntegrationTest {
@Autowired
private EmbeddedKafkabroker embeddedKafkabroker;
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Test
public void testExample() {
try (Consumer<String,String> consumer = newConsumer()) {
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
[2]
ContainerTestUtils.waitForAssignment(messageListenerContainer,embeddedKafkabroker.getPartitionsPerTopic());
}
try (Producer<String,String> producer = newProducer()) {
embeddedKafkabroker.consumeFromAnEmbeddedTopic(consumer,"topic2"); // [1]
producer.send(new ProducerRecord<>(
"topic1","test payload"));
producer.flush();
}
String result = KafkaTestUtils.getSingleRecord(consumer,"topic2").value();
assertEquals(result,"expected result");
}
}
private Consumer<String,String> newConsumer() {
Map<String,Object> consumerProps = KafkaTestUtils.consumerProps("groupId","false",embeddedKafkabroker);
ConsumerFactory<String,AssetTransferResponse> consumerFactory = new DefaultKafkaConsumerFactory<>(
consumerProps,new StringDeserializer(),new CustomDeserializer<>());
return consumerFactory.createConsumer();
}
}
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)