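Problem description
We need an example of how to test ReactiveKafkaConsumerTemplate and ReactiveKafkaProducerTemplate with an embedded-kafka-broker. Thanks.
The corrected code, updated after the discussion, follows.
You can write a custom deserializer in the same way and use it with a ReactiveKafkaConsumerTemplate.
Custom serializer: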
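import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class EmployeeSerializer implements Serializer<Employee> {
    // ObjectMapper is thread-safe once configured, so one instance is reused.
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, Employee data) {
        if (data == null) {
            return null;
        }
        try {
            // Write the employee as UTF-8 encoded JSON.
            return mapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            // Fail the send instead of silently publishing a null value.
            throw new SerializationException("Could not serialize Employee for topic " + topic, e);
        }
    }
}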
Use the serializer as part of the embedded Kafka reactive test:
import java.time.Duration;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;

import reactor.kafka.sender.SenderOptions;
import reactor.test.StepVerifier;

@EmbeddedKafka(topics = EmbeddedKafkaReactiveTest.REACTIVE_INT_KEY_TOPIC,
        brokerProperties = { "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1" })
public class EmbeddedKafkaReactiveTest {

    public static final String REACTIVE_INT_KEY_TOPIC = "reactive_int_key_topic";
    private static final Integer DEFAULT_KEY = 1;
    // Upper bound for the verification so a stuck send cannot hang the test.
    private static final Duration DEFAULT_VERIFY_TIMEOUT = Duration.ofSeconds(10);

    private ReactiveKafkaProducerTemplate<Integer, Employee> reactiveKafkaProducerTemplate;

    @BeforeEach
    public void setUp() {
        reactiveKafkaProducerTemplate = new ReactiveKafkaProducerTemplate<>(
                setupSenderOptionsWithDefaultTopic(), new MessagingMessageConverter());
    }

    private SenderOptions<Integer, Employee> setupSenderOptionsWithDefaultTopic() {
        // Base producer properties pointing at the embedded broker (Integer keys by default).
        Map<String, Object> senderProps = KafkaTestUtils
                .producerProps(EmbeddedKafkaCondition.getBroker().getBrokersAsString());
        SenderOptions<Integer, Employee> senderOptions = SenderOptions.create(senderProps);
        // Non-transactional producer: no transactional.id is set, so send() can be called directly.
        // Values are written with the custom EmployeeSerializer shown above.
        return senderOptions.producerProperty(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, EmployeeSerializer.class.getName());
    }

    @Test
    public void test_When_Publish() {
        Employee employee = new Employee();
        ProducerRecord<Integer, Employee> producerRecord =
                new ProducerRecord<>(REACTIVE_INT_KEY_TOPIC, DEFAULT_KEY, employee);
        // The send completes without error once the broker acknowledges the record.
        StepVerifier.create(reactiveKafkaProducerTemplate.send(producerRecord).then())
                .expectComplete()
                .verify(DEFAULT_VERIFY_TIMEOUT);
    }

    @AfterEach
    public void tearDown() {
        reactiveKafkaProducerTemplate.close();
    }
}
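The post mentions a custom deserializer for the consumer side but does not show one. A minimal sketch of what it might look like follows, mirroring the serializer with Jackson. The Employee class here is a hypothetical stand-in with a single `name` field, since the real class is not shown in the post; drop it and use your own POJO. The sketch also assumes a kafka-clients version in which `configure` and `close` have default implementations on `Deserializer`.

```java
import java.io.IOException;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical stand-in for the Employee POJO, which the post does not show.
class Employee {
    public String name;
}

public class EmployeeDeserializer implements Deserializer<Employee> {
    // One shared mapper; ObjectMapper is thread-safe once configured.
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public Employee deserialize(String topic, byte[] data) {
        if (data == null) {
            // Kafka passes null for tombstone records; pass it through.
            return null;
        }
        try {
            // Parse the JSON bytes back into an Employee.
            return mapper.readValue(data, Employee.class);
        } catch (IOException e) {
            throw new SerializationException("Could not deserialize Employee from topic " + topic, e);
        }
    }
}
```

To use it on the consumer side, it would be registered under `ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG` in the properties passed to the receiver options, in the same way the serializer is registered for the producer above.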
Solution
The framework's own tests use an embedded Kafka broker.
@EmbeddedKafka(topics = ReactiveKafkaProducerTemplateIntegrationTests.REACTIVE_INT_KEY_TOPIC,partitions = 2)
public class ReactiveKafkaProducerTemplateIntegrationTests {
...
Correct serialization has been added, using a non-transactional producer. See the code at the top of this page for the answer.