Kafka JUnit test for the onFailure callback

Problem description

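I have the following code that sends data to Kafka. I can successfully test the onSuccess callback, but I cannot test the onFailure() method.

I get the following error:

java.lang.ClassCastException: org.mockito.codegen.Throwable$MockitoMock$2137573915 cannot be cast to org.springframework.kafka.core.KafkaProducerException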

@Test
void test() throws InterruptedException, ExecutionException {

    Throwable ex = mock(Throwable.class);
    Employee employee = new Employee();

    when(kafkaTemplate.send(null, employee)).thenReturn(responseFuture);
    when(sendResult.getProducerRecord()).thenReturn(producerRecord);
    when(producerRecord.value()).thenReturn(employee);

    doAnswer(invocationOnMock -> {
        ListenableFutureCallback<SendResult<String, Employee>> listenableFutureCallback = invocationOnMock.getArgument(0);

        listenableFutureCallback.onFailure(ex);

        return null;
    }).when(responseFuture).addCallback(any(ListenableFutureCallback.class));

    kafkaSender.sendMessage(employee);

}

@Service
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String, Employee> kafkaTemplate;

    public void sendMessage(Employee employee) {

        ObjectMapper objectMapper = new ObjectMapper();

        ListenableFuture<SendResult<String, Employee>> listenableFuture = kafkaTemplate.send(topic, employee);

        listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, Employee>>() {

            @Override
            public void onSuccess(SendResult<String, Employee> result) {
                saveInDatabaseMethod(result.getProducerRecord()); // method to save in DB
            }

            @Override
            public void onFailure(Throwable ex) {
                // the ClassCastException occurs here
                ProducerRecord<String, Employee> producerRecord = ((KafkaProducerException) ex).getFailedProducerRecord();

                saveInDatabaseMethod(producerRecord);
            }

        });
    }
}

Solution

ProducerRecord<String,Employee> producerRecord = ((KafkaProducerException) ex).getFailedProducerRecord();

Your mock does not invoke the callback with a KafkaProducerException (KPE); it invokes it with this:

Throwable ex = mock(Throwable.class);

You need to wrap it in a KafkaProducerException (KPE), because onFailure() casts its argument to that type.
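
To make the cause of the ClassCastException concrete, here is a minimal sketch that runs on the JDK alone. FakeRecord and FakeProducerException are hypothetical stand-ins for ProducerRecord and KafkaProducerException, and the class name OnFailureCastDemo is made up for this illustration; only the cast inside onFailure() mirrors the code in the question.

```java
public class OnFailureCastDemo {

    // Hypothetical stand-in for org.apache.kafka.clients.producer.ProducerRecord.
    static final class FakeRecord {
        final String value;

        FakeRecord(String value) {
            this.value = value;
        }
    }

    // Hypothetical stand-in for org.springframework.kafka.core.KafkaProducerException.
    static final class FakeProducerException extends RuntimeException {
        private final FakeRecord failedRecord;

        FakeProducerException(FakeRecord failedRecord, String message, Throwable cause) {
            super(message, cause);
            this.failedRecord = failedRecord;
        }

        FakeRecord getFailedProducerRecord() {
            return failedRecord;
        }
    }

    // Mirrors the cast in the question's onFailure(): it only works for the specific subtype.
    static FakeRecord onFailure(Throwable ex) {
        return ((FakeProducerException) ex).getFailedProducerRecord();
    }

    // Returns true when onFailure() rejects the argument with a ClassCastException.
    static boolean failsWithClassCast(Throwable ex) {
        try {
            onFailure(ex);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        FakeRecord record = new FakeRecord("employee-1");

        // A plain Throwable (comparable to mock(Throwable.class)) is not the expected subtype.
        System.out.println(failsWithClassCast(new Throwable("plain"))); // prints true

        // Wrapping the cause in the expected exception type lets the cast succeed.
        Throwable wrapped = new FakeProducerException(record, "send failed", new RuntimeException("broker down"));
        System.out.println(onFailure(wrapped).value); // prints employee-1
    }
}
```

In the actual test, the equivalent change would be to replace `Throwable ex = mock(Throwable.class);` with something like `KafkaProducerException ex = new KafkaProducerException(producerRecord, "send failed", new RuntimeException("simulated"));`, assuming your spring-kafka version provides that three-argument constructor. The message and cause here are placeholders.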
