Testing Apache Kafka integration in a Spring Boot application with JUnit 5 and EmbeddedKafkaBroker

Problem description

I have a simple producer class, defined as follows:

@Configuration
public class MyKafkaProducer {

    private final static Logger log = LoggerFactory.getLogger(MyKafkaProducer.class);

    @Value("${my.kafka.producer.topic}")
    private String topic;

    @Autowired
    KafkaTemplate<String,String> kafkaTemplate;

    public void sendDataToKafka(@RequestParam String data) {

        ListenableFuture<SendResult<String,String>> listenableFuture = kafkaTemplate.send(topic,data);

        listenableFuture.addCallback(new ListenableFutureCallback<>() {

            @Override
            public void onSuccess(SendResult<String,String> result) {
                log.info("Sent data {}",result.getProducerRecord().value());
            }

            @Override
            public void onFailure(Throwable ex) {
                log.error("Unable to send data {} due to: {}",data,ex.getMessage());
            }
        });
    }
}

This is the test class, which is still a work in progress:

@EmbeddedKafka
@ExtendWith(SpringExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class MyKafkaProducerTest {

    private static final String TOPIC = "device";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private MyKafkaProducer producer;

    BlockingQueue<ConsumerRecord<String,String>> records;

    KafkaMessageListenerContainer<String,String> container;

    @BeforeAll
    void setUp() {
        Map<String,Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer","false",embeddedKafkaBroker));
        DefaultKafkaConsumerFactory<String,String> consumerFactory = new DefaultKafkaConsumerFactory<>(configs,new StringDeserializer(),new StringDeserializer());
        ContainerProperties containerProperties = new ContainerProperties(TOPIC);
        container = new KafkaMessageListenerContainer<>(consumerFactory,containerProperties);
        records = new LinkedBlockingQueue<>();
        container.setupMessageListener((MessageListener<String,String>) records::add);
        container.start();
        ContainerTestUtils.waitForAssignment(container,embeddedKafkaBroker.getPartitionsPerTopic());
    }

    @AfterAll
    void tearDown() {
        container.stop();
    }

    @Test
    public void testIfWorks() throws InterruptedException {
        // Arrange
        Map<String,Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        Producer<String,String> producer = new DefaultKafkaProducerFactory<>(configs,new StringSerializer(),new StringSerializer()).createProducer();

        // Act
        producer.send(new ProducerRecord<>(TOPIC,"my-aggregate-id","{\"event\":\"Test Event\"}"));
        producer.flush();

        // Assert
        ConsumerRecord<String,String> singleRecord = records.poll(100,TimeUnit.MILLISECONDS);
        assertThat(singleRecord).isNotNull();
        assertThat(singleRecord.key()).isEqualTo("my-aggregate-id");
        assertThat(singleRecord.value()).isEqualTo("{\"event\":\"Test Event\"}");
    }
}

The problem is that the test creates a default producer:

Producer<String,String> producer = new DefaultKafkaProducerFactory<>(configs,new StringSerializer(),new StringSerializer()).createProducer();

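How can I use my own producer, MyKafkaProducer, and call its sendDataToKafka method instead? How should this case be tested?

The source code can be found here. The work-in-progress test branch is here. Thanks.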

Solution

This is a Spring Boot application, and you are using the auto-configured KafkaTemplate.

To override bootstrap-servers so that it points at the embedded Kafka broker, see https://docs.spring.io/spring-kafka/docs/2.5.5.RELEASE/reference/html/#kafka-testing-embeddedkafka-annotation

@EmbeddedKafka(topics = "someTopic",bootstrapServersProperty = "spring.kafka.bootstrap-servers")

You can then call your producer from the test case.
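Putting these pieces together, a test that drives MyKafkaProducer itself might look roughly like the sketch below. It assumes the test lives in or under the package of the @SpringBootApplication class so that MyKafkaProducer is picked up as a bean, that the topic is named device as in the question, and that the auto-configured KafkaTemplate keeps Spring Boot's default String serializers. The class name MyKafkaProducerIT, the consumer group name, and the 10-second timeout are illustrative choices, not taken from the original project.

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.kafka.test.utils.KafkaTestUtils;

// Assumption: the producer's topic property is overridden here so that it
// matches the topic created on the embedded broker.
@SpringBootTest(properties = "my.kafka.producer.topic=device")
@EmbeddedKafka(topics = "device", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class MyKafkaProducerIT {

    private static final String TOPIC = "device";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    // The real bean under test; its KafkaTemplate now targets the embedded broker.
    @Autowired
    private MyKafkaProducer producer;

    private BlockingQueue<ConsumerRecord<String, String>> records;
    private KafkaMessageListenerContainer<String, String> container;

    @BeforeAll
    void setUp() {
        // Test-side consumer that collects whatever arrives on the topic.
        Map<String, Object> configs = new HashMap<>(
                KafkaTestUtils.consumerProps("test-consumer", "false", embeddedKafkaBroker));
        DefaultKafkaConsumerFactory<String, String> consumerFactory =
                new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer());
        container = new KafkaMessageListenerContainer<>(consumerFactory, new ContainerProperties(TOPIC));
        records = new LinkedBlockingQueue<>();
        container.setupMessageListener((MessageListener<String, String>) records::add);
        container.start();
        // Wait until partitions are assigned so no record is missed.
        ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }

    @AfterAll
    void tearDown() {
        container.stop();
    }

    @Test
    void sendsDataThroughMyKafkaProducer() throws InterruptedException {
        String payload = "{\"event\":\"Test Event\"}";

        // Act: call the application's own producer instead of a hand-built one.
        producer.sendDataToKafka(payload);

        // Assert: the send is asynchronous, so allow a generous timeout.
        ConsumerRecord<String, String> received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received).isNotNull();
        assertThat(received.value()).isEqualTo(payload);
    }
}
```

Because sendDataToKafka publishes without a key, the received record's key is null, so only the value is asserted here. The timeout is deliberately longer than the question's 100 ms, on the assumption that an asynchronous send through a freshly started embedded broker can take noticeably longer than that.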
