Spring Kafka交易-尚无交易在进行,请在template.executeInTransaction

问题描述

我正在尝试使用KafkaTemplate通过事务将消息发布到Kafka:

@Autowired
KafkaTemplate<GenericRecord,GenericRecord> kafkaTemplate;

@Transactional
@RabbitListener(queues = "queueName")
void input(final List<Message> messages) {
     for (Message msg : messages) {
          PublishRequest request = prepareRequest(msg);
          kafkaTemplate.sendDefault(request.getKey(),reguest.getValue());
     }
     transactionalDatabaseInserts();
}

但是当我这样做时,我得到了这个异常:

由于:java.lang.IllegalStateException:没有事务在 处理;可能的解决方案:在 在template.executeInTransaction()操作的范围内,启动 在调用模板方法之前使用@Transactional进行事务处理, 在消耗容器时,在由侦听器容器启动的事务中运行 记录

KafkaTemplate的配置:

@EnableTransactionManagement
@Configuration
public class KafkaConfig{
    @Bean
    KafkaTransactionManager<GenericRecord,GenericRecord> kafkaTransactionManager(final ProducerFactory<GenericRecord,GenericRecord> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    KafkaTemplate<GenericRecord,GenericRecord> kafkaTemplate(final ProducerFactory<GenericRecord,GenericRecord> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}

在我的application.yaml中,我包括了:

spring.kafka.producer.transaction-id-prefix: tx-

我希望我的方法可以使用@Transactional而不是kafkaTemplate.executeInTransaction()。为什么我会收到该异常?

解决方法

您必须配置有误-可以按预期运行...

@SpringBootApplication
@EnableTransactionManagement
public class So63596919Application {

    public static void main(String[] args) {
        SpringApplication.run(So63596919Application.class,args);
    }

    @Autowired
    private KafkaTemplate<String,String> template;

    private final CountDownLatch latch = new CountDownLatch(1);

    @Transactional
    @RabbitListener(queues = "so63596919")
    public void listen(List<String> in) throws InterruptedException {
        System.out.println(in);
        in.forEach(str -> this.template.send("so63596919",str));
        System.out.println("Hit enter to exit listener and commit transaction");
        this.latch.await();
    }

    @KafkaListener(id = "so63596919",topics = "so63596919")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public Queue queue() {
        return new Queue("so63596919");
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so63596919").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template,AbstractRabbitListenerContainerFactory<?> factory) {
        factory.setBatchListener(true);
        factory.setContainerCustomizer(container -> {
                ((SimpleMessageListenerContainer) container).setConsumerBatchEnabled(true);
                container.setDeBatchingEnabled(true);
        });
        return args -> {
            template.convertAndSend("so63596919","foo");
            template.convertAndSend("so63596919","bar");
            System.in.read();
            this.latch.countDown();
        };
    }

}
spring.kafka.producer.transaction-id-prefix: tx-
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.isolation.level=read_committed

spring.rabbitmq.listener.simple.batch-size=2

如果您可以将项目缩减为一个这样的小示例,那么我可以看看有什么问题。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...