How to test Spring Kafka Streams

Question

I wrote this streams application in the following way. The StreamConfigs class:

@Configuration
@EnableKafka
public class StreamConfigs {
  @Value(...)
  private String applicationId;
    
  @Value(...)
  private String bootstrapServer;
    
  @Bean
  public KafkaStreamsConfiguration streamsConfig() {
    Map<String,Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG,applicationId);
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServer);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,0);
    return new KafkaStreamsConfiguration(props);
  }
    
  @Bean
  public StreamsBuilderFactoryBean streamBuilder() {
    return new StreamsBuilderFactoryBean(streamsConfig());
  }
    
  @Bean
  public StreamsBuilderFactoryBean streamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
  }
}

And this class:

@Component
public class MyStream {
  @Value(value = ...)
  private String inputTopicA;
    
  @Value(value = ...)
  private String inputTopicB;
    
  @Value(value = ...)
  private String outputTopic;
    
  public MyStream() {}
    
  public MyStream(String inputTopicA,String inputTopicB,String outputTopic) {
    this.inputTopicA = inputTopicA;
    this.inputTopicB = inputTopicB;
    this.outputTopic = outputTopic;
  }
    
  @Bean
  public KStream<String,String> kStream(StreamsBuilder streamBuilder) {
    KTable<String,String> aKTable = streamBuilder.table(inputTopicA);
    KTable<String,String> bKTable = streamBuilder.table(inputTopicB);    
    KTable<String,String> outputKTable = aKTable
        .join(bKTable,(a,b) -> {...})
        .toStream()
        .groupByKey()
        .reduce((aggregate,current) -> {...});
    
    KStream<String,String> stream = outputKTable.toStream();
    stream.to(outputTopic);
    
    return stream;
  }
}

The kStream() method of the MyStream class contains the stream logic of the application that I want to test.

So I wrote this class for testing, using embedded Kafka:

@EmbeddedKafka(partitions = 1)
@SpringBootTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class MyStreamApplicationTests {
  @Value(...) private String topicA;
  @Value(...) private String topicB;
  @Value(...) private String outputTopic;

  @Autowired
  private EmbeddedKafkaBroker embeddedKafkaBroker;

  private BlockingQueue<ConsumerRecord<String,String>> aTopicQueue;
  private BlockingQueue<ConsumerRecord<String,String>> bTopicQueue;
  private BlockingQueue<ConsumerRecord<String,String>> outputTopicQueue;

  KafkaMessageListenerContainer<String,String> container;

  @BeforeAll
  void setupKafka() {
    Map<String,Object> consumerConfigs = new HashMap<>(
      KafkaTestUtils.consumerProps("consumer","true",embeddedKafkaBroker)
    );

    consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

    DefaultKafkaConsumerFactory<String,String> consumerFactory =
            new DefaultKafkaConsumerFactory<>(consumerConfigs,new StringDeserializer(),new StringDeserializer());

    ContainerProperties containerProperties = new ContainerProperties(topicA,topicB,outputTopic);
    container = new KafkaMessageListenerContainer<>(consumerFactory,containerProperties);

    //init record queues
    aTopicQueue = new LinkedBlockingQueue<>();
    bTopicQueue = new LinkedBlockingQueue<>();
    outputTopicQueue = new LinkedBlockingQueue<>();

    container.setupMessageListener((MessageListener<String,String>) this::pushRecordIntoQueue);
    container.start();

    ContainerTestUtils.waitForAssignment(container,embeddedKafkaBroker.getTopics().size() * embeddedKafkaBroker.getPartitionsPerTopic());
  }

  @Test
  void testStream() throws Exception {
    //test logic of the stream
  }

}

This is the testStream() method:

 @Test
 void testStream() throws Exception {
   Map<String,Object> props = new HashMap<>();
   props.put(StreamsConfig.APPLICATION_ID_CONFIG,"test");
   props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,embeddedKafkaBroker.getBrokersAsString());
   props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());
   props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());
   props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,0);
        
   KafkaStreamsConfiguration configuration = new KafkaStreamsConfiguration(props);
   
   //ERROR HERE!!!
   StreamsBuilder builder = new StreamConfigs().streamBuilder(configuration).getObject();

   MyStream myStream = new MyStream(topicA,topicB,outputTopic);
   myStream.kStream(builder);

   //produce a record into inputTopicA
   //produce a record into inputTopicB

   ConsumerRecord<String,String> outputRecord = outputTopicQueue.take();

   assertThat(outputRecord).isNotNull();
}

This does not work, because I get an error on this line of code:

StreamsBuilder builder = new StreamConfigs().streamBuilder(configuration).getObject();

This is the stack trace:

org.springframework.beans.factory.FactoryBeanNotInitializedException: org.springframework.kafka.config.StreamsBuilderFactoryBean does not support circular references

    at org.springframework.beans.factory.config.AbstractFactoryBean.getEarlySingletonInstance(AbstractFactoryBean.java:172)
    at org.springframework.beans.factory.config.AbstractFactoryBean.getObject(AbstractFactoryBean.java:156)
    at test.MyStreamApplicationTests.testStream(MyStreamApplicationTests.java:226)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)

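How can I obtain a StreamsBuilder instance so that I can reuse my kStream() method in the test with the embedded broker? If I create the StreamsBuilder myself with new, my test gets stuck waiting for an active broker on my machine.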

Can you help me?

Edit: I think this solves the problem. I changed the setupKafka() method of the MyStreamApplicationTests class as follows:

@BeforeAll
void setupKafka() {
  ...
  Properties streamConfig = new Properties();
  streamConfig.put(StreamsConfig.APPLICATION_ID_CONFIG,"test");
  streamConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,embeddedKafkaBroker.getBrokersAsString());
  streamConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());
  streamConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());
  streamConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,0);

  StreamsBuilder streamsBuilder = new StreamsBuilder();
  MyStream myStream = new MyStream(topicA,topicB,outputTopic);
  myStream.kStream(streamsBuilder);
  Topology topology = streamsBuilder.build();
  new KafkaStreams(topology,streamConfig).start();

  ContainerTestUtils.waitForAssignment(container,embeddedKafkaBroker.getTopics().size() * embeddedKafkaBroker.getPartitionsPerTopic());
}

Answer

You can't do that; Spring needs to manage the factory bean.
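The stack trace in the question shows the exception being raised from AbstractFactoryBean.getObject(), which is what happens when a factory bean is asked for its object before the container has run its initialization callback. The following is a minimal plain-Java sketch of that lifecycle. SketchFactoryBean and FactoryLifecycleDemo are hypothetical stand-ins written for illustration; they are not Spring classes and only mimic the "initialize first, then getObject()" contract.

```java
import java.util.function.Supplier;

/** Hypothetical, simplified stand-in for a container-managed factory bean. */
class SketchFactoryBean<T> {
    private final Supplier<T> creator;
    private T instance;
    private boolean initialized;

    SketchFactoryBean(Supplier<T> creator) {
        this.creator = creator;
    }

    /** A container would call this after construction; code that uses "new" usually does not. */
    void afterPropertiesSet() {
        instance = creator.get();
        initialized = true;
    }

    /** Refuses to hand out the object until the initialization callback has run. */
    T getObject() {
        if (!initialized) {
            throw new IllegalStateException("factory bean not initialized");
        }
        return instance;
    }
}

class FactoryLifecycleDemo {
    public static void main(String[] args) {
        // Created by hand: nobody calls afterPropertiesSet(), so getObject() fails.
        SketchFactoryBean<StringBuilder> manual = new SketchFactoryBean<>(StringBuilder::new);
        try {
            manual.getObject();
        } catch (IllegalStateException e) {
            System.out.println("manual use failed: " + e.getMessage());
        }

        // "Managed": the initialization callback runs before the object is requested.
        SketchFactoryBean<StringBuilder> managed = new SketchFactoryBean<>(StringBuilder::new);
        managed.afterPropertiesSet();
        System.out.println("managed use ok: " + (managed.getObject() != null));
    }
}
```

In the real application the container performs the initialization step, which is why the builder should be obtained from the Spring context rather than from a hand-constructed StreamsBuilderFactoryBean.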

Just add bootstrapServersProperty = ... to the @EmbeddedKafka annotation; the bootstrapServer field in StreamConfigs will then be set to the embedded broker's address(es). Set it to the name of the property used in your @Value.

Then you can simply @Autowired the StreamsBuilder created by the factory bean into your test.
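As a rough illustration of those two steps, the test class could be annotated along the following lines. This is a configuration sketch, not a verified setup: the property name app.kafka.bootstrap-servers is hypothetical and stands for whatever key the @Value on StreamConfigs.bootstrapServer actually reads, and the test method name is made up for the example.

```java
import static org.assertj.core.api.Assertions.assertThat;

import org.apache.kafka.streams.StreamsBuilder;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

// "app.kafka.bootstrap-servers" is a hypothetical property name; replace it with
// the key referenced by the @Value on the bootstrapServer field in StreamConfigs.
@EmbeddedKafka(partitions = 1, bootstrapServersProperty = "app.kafka.bootstrap-servers")
@SpringBootTest
class MyStreamApplicationTests {

  // Injected from the StreamsBuilderFactoryBean that Spring created and initialized.
  @Autowired
  private StreamsBuilder streamsBuilder;

  @Test
  void contextProvidesStreamsBuilder() {
    assertThat(streamsBuilder).isNotNull();
  }
}
```

Since MyStream is a @Component, its kStream bean should already be built against this builder when the context starts, so a test would typically only need to produce records to the input topics and read the result from the output topic.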

Also, you only need one of these:

  @Bean
  public StreamsBuilderFactoryBean streamBuilder() {
    return new StreamsBuilderFactoryBean(streamsConfig());
  }
    
  @Bean
  public StreamsBuilderFactoryBean streamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
  }
