使用 EmbeddedKafka

问题描述

我正在尝试使用 Springboot 和 EmbeddedKafka 为单元测试设置一个类。 我有两个主题,topicA 和 topicB,我将测试消息生成到 topicA 和 topicB。

这是我的课程:

    @EmbeddedKafka()
    @SpringBootTest
    class ApplicationTests {
        private String topicA = "A";
        private String topicB = "B";
    
        @Autowired
        private EmbeddedKafkaBroker embeddedKafkaBroker;
    
        BlockingQueue<ConsumerRecord<String,String>> topicAContent;
        BlockingQueue<ConsumerRecord<String,String>> topicBContent;
    
        KafkaMessageListenerContainer<String,String> container;

        @BeforeEach
        void setup() {
            Map<String,Object> consumerConfigs = new HashMap<>(
                KafkaTestUtils.consumerProps("consumer","true",embeddedKafkaBroker)
            );

            DefaultKafkaConsumerFactory<String,String> consumerFactory =
                new DefaultKafkaConsumerFactory<>(consumerConfigs,new StringDeserializer(),new StringDeserializer());


            ContainerProperties containerProperties = new ContainerProperties(topicA,topicB);
            container = new KafkaMessageListenerContainer<>(consumerFactory,containerProperties);
            topicAContent = new LinkedBlockingQueue<>();
            topicBContent = new LinkedBlockingQueue<>();

            container.setupMessageListener((MessageListener<String,String>) this::pushRecord);
            container.start();

            ContainerTestUtils.waitForAssignment(container,embeddedKafkaBroker.getPartitionsPerTopic());
        }

        private void pushRecord(ConsumerRecord<String,String> record) {
            String topic = record.topic();
            if(topic.equals(topicA)) {
                topicAContent.add(record);
            }
            else if(topic.equals(topicB)) {
                topicBContent.add(record);
            }
        }

        @Test
        public void produceIntoTopicA() {
            Map<String,Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
            Producer<String,String> producer = new DefaultKafkaProducerFactory<>(configs,new StringSerializer(),new StringSerializer()).createProducer();
            producer.send(new ProducerRecord<>(topicA,"a","Hello A"));
            producer.flush();

            ConsumerRecord<String,String> singleRecord = topicAContent.poll(100,TimeUnit.MILLISECONDS);
            assertThat(singleRecord).isNotNull();
            assertThat(singleRecord.key()).isEqualTo("a");
            assertThat(singleRecord.value()).isEqualTo("Hello A");
        }

        @Test
        public void produceIntoTopicB() {
            Map<String,new StringSerializer()).createProducer();
            producer.send(new ProducerRecord<>(topicB,"b","Hello B"));
            producer.flush();

            ConsumerRecord<String,String> singleRecord = topicBContent.poll(100,TimeUnit.MILLISECONDS);
            assertThat(singleRecord).isNotNull();
            assertThat(singleRecord.key()).isEqualTo("b");
            assertThat(singleRecord.value()).isEqualTo("Hello B");
        }
    }

现在,如果我运行测试,produceIntoTopicB 测试将失败并显示此错误:

java.lang.IllegalStateException: Expected 1 but got 2 partitions

    at org.springframework.kafka.test.utils.ContainerTestUtils.waitForSingleContainerAssignment(ContainerTestUtils.java:115)
    at org.springframework.kafka.test.utils.ContainerTestUtils.waitForAssignment(ContainerTestUtils.java:51)
    at it.test.ApplicationTests.setup(ApplicationTests.java:92)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptLifecycleMethod(TimeoutExtension.java:126)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptBeforeAllMethod(TimeoutExtension.java:68)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeBeforeAllMethods$9(ClassBasedTestDescriptor.java:384)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeBeforeAllMethods(ClassBasedTestDescriptor.java:382)
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:196)
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:78)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:136)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)

而另一个测试因此错误而失败:

java.lang.AssertionError: 
Expecting actual not to be null

我错在哪里?

解决方法

对于第二个,你需要

consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

默认情况下是 latest,因此存在竞争(尽管 waitForAssignment() 应该可以防止这种情况发生,请尝试 DEBUG 日志记录)。

对于第一个,编辑问题以显示完整的堆栈跟踪。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...