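Integration testing in a multi-module Maven project with Spring

Problem description

I have a multi-module Maven project with several modules (parent, service, updater1, updater2). The @SpringBootApplication class is in the 'service' module; the other modules have no artifacts.

'updater1' is a module with a Kafka listener and an HTTP client: when it receives a Kafka event, it sends a request to an external API. I want to write integration tests in this module with Testcontainers, so I created the containers and a Kafka producer that uses a KafkaTemplate to send events to my consumer.

My problem is that the Kafka producer is autowired as null, so the tests throw a NullPointerException. I think it is a Spring configuration problem, but I can't find it. Can you help me? Thanks!

This is my test class: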

@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = {KafkaConfiguration.class, CacheConfiguration.class, ClientConfiguration.class})
public class InvoicingTest {

    @ClassRule
    public static final Containers containers = Containers.Builder.aContainer()
            .withKafka()
            .withServer()
            .build();

    private final MockHttpClient mockHttpClient =
            new MockHttpClient(containers.getHost(SERVER), containers.getPort(SERVER));

    @Autowired
    private KafkaEventProducer kafkaEventProducer;

    @BeforeEach
    @Transactional
    void setUp() {
        mockHttpClient.reset();
    }

    @Test
    public void createElementSuccesfullResponse() throws ExecutionException, InterruptedException, TimeoutException {

        mockHttpClient.whenPost("/v1/endpoint")
                .respond(HttpStatusCode.OK_200);

        kafkaEventProducer.produce("src/test/resources/event/invoiceCreated.json");

        mockHttpClient.verify();
    }
}

This is the event producer:

@Component
public class KafkaEventProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    private final String topic;

    // The constructor name must match the class name.
    @Autowired
    KafkaEventProducer(KafkaTemplate<String, String> kafkaTemplate,
                       @Value("${kafka.topic.invoicing.name}") String topic) {
        this.kafkaTemplate = kafkaTemplate;
        this.topic = topic;
    }

    public void produce(String event) {
        kafkaTemplate.send(topic, event);
    }
}

Solution

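It is not clear how KafkaEventProducer ends up in the test context (it is a @Component, but is it picked up by any of the classes listed in @ContextConfiguration?). Your test class is also not annotated with @SpringBootTest, nor with a runner via @RunWith, which is the JUnit 4 counterpart of @ExtendWith(SpringExtension.class).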
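One way to read the symptom: a field that is simply null, rather than a startup failure about a missing bean, usually suggests that nothing processed the test instance at all, for example because the Spring extension or runner never ran for that test. The following is a toy, JDK-only sketch of that difference. It is not Spring code, and Inject, EventProducer, SampleTest and ToyInjector are hypothetical names invented for the illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for @Autowired; this is NOT a Spring annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Inject {}

// Hypothetical stand-in for a bean such as KafkaEventProducer.
class EventProducer {}

// Hypothetical stand-in for the test class.
class SampleTest {
    @Inject
    EventProducer producer; // stays null until an injector processes this instance
}

// Toy container: keeps one instance per concrete type and fills annotated fields.
class ToyInjector {
    private final Map<Class<?>, Object> beans = new HashMap<>();

    void register(Object bean) {
        beans.put(bean.getClass(), bean);
    }

    void inject(Object target) {
        for (Field field : target.getClass().getDeclaredFields()) {
            if (!field.isAnnotationPresent(Inject.class)) {
                continue;
            }
            Object bean = beans.get(field.getType());
            if (bean == null) {
                // A missing bean fails loudly instead of leaving the field null.
                throw new IllegalStateException("No bean of type " + field.getType().getName());
            }
            try {
                field.setAccessible(true);
                field.set(target, bean);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
    }
}
```

With this toy injector, a SampleTest that is never passed to inject keeps producer == null, while passing it to an injector without a matching bean fails immediately. Spring behaves analogously for required @Autowired fields, so a reasonable first check is that the test really runs with the Spring extension and that JUnit 4 annotations (such as @ClassRule) are not mixed with JUnit 5 ones (such as @ExtendWith and @BeforeEach).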

See this example, which uses the Apache KafkaProducer directly:

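import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

// Opens a short-lived producer against the container's broker and sends one event.
public void sendRecord(String topic, String event) {
    try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(producerProps(bootstrapServers, false))) {
        send(producer, topic, event);
    }
}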

where:

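public void send(KafkaProducer<String, byte[]> producer, String topic, String event) {
    try {
        ProducerRecord<String, byte[]> record =
                new ProducerRecord<>(topic, event.getBytes(StandardCharsets.UTF_8));
        // Block until the broker acknowledges the record so the test proceeds deterministically.
        producer.send(record).get();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        fail("Unexpected exception: " + e.getMessage());
    } catch (ExecutionException e) {
        fail("Unexpected exception: " + e.getMessage());
    }
}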


protected Properties producerProps(String bootstrapServer, boolean transactional) {
    Properties producerProperties = new Properties();
    producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    if (transactional) {
        // Only needed when the producer is used with Kafka transactions.
        producerProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
    }
    return producerProperties;
}

bootstrapServers is taken from the Kafka container:

KafkaContainer kafka = new KafkaContainer();
kafka.start();
bootstrapServers = kafka.getBootstrapServers();
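
A side note on the payload: in the question's test, produce(...) receives the string "src/test/resources/event/invoiceCreated.json", so the path itself, not the file's JSON, would be sent as the event. If the intent is to send the file contents, they could be loaded first. Below is a minimal JDK-only sketch under that assumption; EventFiles and read are hypothetical names, not part of the original project.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper for tests; not part of the original project.
final class EventFiles {
    private EventFiles() {}

    // Reads the whole file as UTF-8 text so it can be used as the event payload.
    static String read(Path path) {
        try {
            return new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
        } catch (IOException e) {
            // Surface I/O problems as an unchecked exception so test code stays short.
            throw new UncheckedIOException("Cannot read event file " + path, e);
        }
    }
}
```

The test could then call sendRecord(topic, EventFiles.read(Paths.get("src/test/resources/event/invoiceCreated.json"))), where topic is whichever topic the listener under test consumes.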