反应式 Kafka 接收器可以与非反应式 Elasticsearch 客户端一起使用吗?

问题描述

下面是一个示例代码,它使用 reactor-kafka 并从一个主题(具有重试逻辑)读取数据,该主题具有通过非反应式生产者发布的记录。在我的 doOnNext() 消费者中,我使用非反应式 Elasticsearch 客户端来索引索引中的记录。所以我有几个我仍然不清楚的问题:

  1. 我知道消费者和生产者是独立的解耦系统,但是否建议使用反应性生产者以及其消费者是反应性的?
  2. 如果我使用的是非反应性的东西,在这种情况下 Elasticsearch 客户端 org.elasticsearch.client.RestClient代码“反应性” 是否有效?如果有或没有,我该如何测试? (通过“反应性”,我的意思是它的非阻塞 IO 部分,即如果我产生三个反应性消费者并且一个由于某种原因是潜在的,线程应该被解锁并用于其他反应性消费者)。
  3. 一般来说,问题是,如果我用响应式客户端包装一些 API,该 API 是否也应该是响应式的?

public disposable consumeRecords() {
    long maxAttempts = 3,duration = 10;
    RetryBackoffSpec retrySpec = Retry.backoff(maxAttempts,Duration.ofSeconds(duration)).transientErrors(true);
    Consumer<ReceiverRecord<K,V>> doOnNextConsumer = x -> {
        // use non-reactive elastic search client and index record x
    };

    return KafkaReceiver.create(receiverOptions)
            .receive()
            .doOnNext(record -> {
                try {
                    // calling the non-reactive consumer
                    doOnNextConsumer.accept(record);
                } catch (Exception e) {
                    throw new ReceiverRecordException(record,e);
                }
                record.receiverOffset().ackNowledge();
            })
            .doOnError(t -> log.error("Error occurred: ",t))
            .retrywhen(retrySpec)
            .onErrorContinue((e,record) -> {
                ReceiverRecordException receiverRecordException = (ReceiverRecordException) e;
                log.error("Retries exhausted for: " + receiverRecordException);
                receiverRecordException.getRecord().receiverOffset().ackNowledge();
            })
            .repeat()
            .subscribe();
}

解决方法

对它有一些了解。

Reactive KafkaReceiver 会在内部调用一些 API;如果 API 是 阻塞 API,那么即使 KafkaReceiver 是“反应式”,非阻塞 IO 也不会工作,并且接收器线程将被阻塞,因为您正在调用 阻塞式 API / 非反应式 API

您可以通过创建一个简单的服务器(阻止调用某个时间/睡眠)并从此接收器调用该服务器来测试这一点