问题描述
这是之前响应式 kafka 问题 (Issue while sending the Flux of data to the reactive kafka) 的后续问题。
我正在尝试使用反应式方法向 kafka 发送一些日志记录。这是使用反应式 kafka 发送消息的反应式代码。
public class LogProducer {
private final KafkaSender<String,String> sender;
public LogProducer(String bootstrapServers) {
Map<String,Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONfig,bootstrapServers);
props.put(ProducerConfig.CLIENT_ID_CONfig,"log-producer");
props.put(ProducerConfig.ACKS_CONfig,"all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONfig,StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONfig,StringSerializer.class);
SenderOptions<String,String> senderOptions = SenderOptions.create(props);
sender = KafkaSender.create(senderOptions);
}
public void sendMessages(String topic,Flux<Logs.Data> records) throws InterruptedException {
AtomicInteger sentCount = new AtomicInteger(0);
sender.send(records
.map(record -> {
LogRecord lrec = record.getRecords().get(0);
String id = lrec.getId();
Thread.sleep(0,5); // sleep for 5 ns
return SenderRecord.create(new ProducerRecord<>(topic,id,lrec.toString()),id);
})).doOnNext(res -> sentCount.incrementAndGet()).then()
.doOnError(e -> {
log.error("[FAIL]: Send to the topic: '{}' Failed. "
+ e,topic);
})
.doOnSuccess(s -> {
log.info("[SUCCESS]: {} records sent to the topic: '{}'",sentCount,topic);
})
.subscribe();
}
}
public class ExecuteQuery implements Runnable {
private LogProducer producer = new LogProducer("localhost:9092");
@Override
public void run() {
Flux<Logs.Data> records = ...
producer.sendMessages(kafkaTopic,records);
.....
.....
// processing related to the messages sent
}
}
因此,即使存在 Thread.sleep(0,5);
,有时它也不会将所有消息发送到 kafka,并且程序会提前打印 SUCCESS 消息 (log.info("[SUCCESS]: {} records sent to the topic: '{}'",topic);
)。有没有更具体的方法来解决这个问题。例如,使用某种回调,让线程等待所有消息发送成功。
我有一个 spring 控制台应用程序并通过调度程序以固定速率运行 ExecuteQuery,就像这样
public class Main {
private scheduledexecutorservice scheduler = Executors.newSingleThreadScheduledExecutor();
private ExecutorService executor = Executors.newFixedThreadPool(POOL_SIZE);
public static void main(String[] args) {
QueryScheduler scheduledQuery = new QueryScheduler();
scheduler.scheduleAtFixedrate(scheduledQuery,5,TimeUnit.MINUTES);
}
class QueryScheduler implements Runnable {
@Override
public void run() {
// preprocessing related to time
executor.execute(new ExecuteQuery());
// postprocessing related to time
}
}
}
解决方法
您的 Thread.sleep(0,5); // sleep for 5 ns
没有任何值可以阻止主线程,因此它会在需要时退出,而您的 ExecuteQuery
可能尚未完成其工作。
不清楚您如何启动您的应用程序,但我建议 Thread.sleep()
正好在主线程中进行阻塞。准确地说是在 public static void main(String[] args) {
方法实现中。