问题描述
在Quarkus应用程序中,我想使用Kafka消息,并使用实体管理器将其信息保存在数据库中。
这是我到目前为止得到的:
@ApplicationScoped
public class ClientEventConsumer {
@Inject ClientRepository repository;
private final BlockingQueue<Message<ClientEvent>> messages = new LinkedBlockingQueue<>();
void startup(@Observes StartupEvent startupEvent) {
scheduledexecutorservice executor = Executors.newScheduledThreadPool(5);
executor.scheduleAtFixedrate(() -> {
if (messages.size() > 0) {
try {
Message<ClientEvent> message = messages.take();
ClientEvent clientEvent = message.getPayload();
ClientEntity clientEntity = new ClientEntity();
clientEntity.setId(clientEvent.getId());
clientEntity.setName(clientEvent.getName());
repository.merge(clientEntity);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},1000,500,TimeUnit.MILLISECONDS);
}
@Incoming("qualificationCheck")
public CompletionStage<Void> consume(Message<ClientEvent> msg) {
messages.add(msg);
return msg.ack();
}
}
但是使用这种方法,在实际将记录持久存储在数据库中之前就已确认消息。如果JPA交易成功,有什么方法只能确认消息吗?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)