Kafka Producer回调性能

问题描述

我有Kafka Produce,它将消息发送到kafka。然后,我使用帮助存储过程将消息记录在onsucess和onFailure中。如代码所示,我正在使用异步

  1. 我应该在存储库中将我的callStoredProcedure方法标记为已同步以避免死锁吗?我认为不需要同步,因为回调将在单个线程中按顺序执行。

  2. 通过下面的链接

    https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html

请注意,回调通常会在的I / O线程中执行 生产者,因此应该相当快,否则会延迟 从其他线程发送消息。如果要执行 建议阻止或计算量大的回调 在回调主体中使用您自己的Executor来并行化处理。

我应该在其他线程中执行回调吗? 您是否可以共享代码片段如何在其他线程中执行回调。像在3个线程中并行化回调

我的代码段

@Autowired
private Myrepository  myrepository;

public void sendMessageToKafka(List<String> message) {
    
            for (String s : message) {
    
                future = kafkaTemplate.send(topicName,message);
    
                future.addCallback(new ListenableFutureCallback<SendResult<String,String>>() {
    
                    @Override
                    public void onSuccess(SendResult<String,String> result) {
    
                        System.out.println("Message Sent  " + result.getRecordMetadata().timestamp());
                        
                        myrepository.callStoredProcedure(result,"SUCCESS");
    
                    }
    
                    @Override
                    public void onFailure(Throwable ex) {
    
                        System.out.println(" sending failed  ");
                        myrepository.callStoredProcedure(result,"FAILED");
    
                    }
                });
    
            }

解决方法


private final ExecutorService exec = Executors.newSingleThreadExecutor();


...

this.exec.submit(() -> myrepository.callStoredProcedure(result,"SUCCESS"));

任务仍将在单个线程(而不是Kafka IO线程)上运行。

如果它不能跟上您的发布速度,则可能需要使用其他执行程序,例如缓存的线程池执行程序或Spring的ThreadPoolTaskExecutor

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...