问题描述
我有Kafka Produce,它将消息发送到kafka。然后,我使用帮助存储过程将消息记录在onsucess和onFailure中。如代码所示,我正在使用异步
-
我应该在存储库中将我的callStoredProcedure方法标记为已同步以避免死锁吗?我认为不需要同步,因为回调将在单个线程中按顺序执行。
-
通过下面的链接
https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
请注意,回调通常会在的I / O线程中执行 生产者,因此应该相当快,否则会延迟 从其他线程发送消息。如果要执行 建议阻止或计算量大的回调 在回调主体中使用您自己的Executor来并行化处理。
我应该在其他线程中执行回调吗? 您是否可以共享代码片段如何在其他线程中执行回调。像在3个线程中并行化回调
我的代码段
@Autowired
private Myrepository myrepository;
public void sendMessageToKafka(List<String> message) {
for (String s : message) {
future = kafkaTemplate.send(topicName,message);
future.addCallback(new ListenableFutureCallback<SendResult<String,String>>() {
@Override
public void onSuccess(SendResult<String,String> result) {
System.out.println("Message Sent " + result.getRecordMetadata().timestamp());
myrepository.callStoredProcedure(result,"SUCCESS");
}
@Override
public void onFailure(Throwable ex) {
System.out.println(" sending failed ");
myrepository.callStoredProcedure(result,"FAILED");
}
});
}
解决方法
private final ExecutorService exec = Executors.newSingleThreadExecutor();
...
this.exec.submit(() -> myrepository.callStoredProcedure(result,"SUCCESS"));
任务仍将在单个线程(而不是Kafka IO线程)上运行。
如果它不能跟上您的发布速度,则可能需要使用其他执行程序,例如缓存的线程池执行程序或Spring的ThreadPoolTaskExecutor
。