使用 webflux 构建响应式长轮询 API

问题描述

我正在尝试使用 spring webflux 来构建一个反应式长轮询 API 服务,我已经实现了一个从我的数据资源 (Kafka) 生成数据流的服务。这个长轮询 API 的调用者应该使用请求的密钥获取/等待第一个数据。但不知何故,如果应用程序获取数据后收到数据,API 不会返回数据。我是否使用了正确的反应器方法?非常感谢!

我的服务:

@Service

public class KafkaService {
    // the processor is a in-memory cache that keeps tracking of the old messages
    @Autowired private KafkaMessageProcessor kafkaMessageProcessor;
    

@Autowired private ApplicationEventService applicationEventService;


    private Flux<KafkaMessage> eventFlux;

    @PostConstruct
    
public void setEventFlux() {

        this.eventFlux = Flux.create(applicationEventService).share();

    }

    public Flux<KafkaMessage> listenToKeyFlux(String key) {
    
        return kafkaMessageProcessor

            .getAll()

            .concatWith(this.eventFlux.share())

            .filter(x -> x.getKey().equals(key));
}
}

我的处理程序:

@RestController

@RequestMapping("/longpolling")

public class LongPollingController {


    private static final String MSG_PREFIX = "key_";


    @Autowired private KafkaService kafkaService;

    @GetMapping("/message/{key}")
    
@CrossOrigin(allowedHeaders = "*",origins = "*")
    
public CompletableFuture<KafkaMessage> getMessage(@PathVariable String key) {

    
        return kafkaService.listenToKeyFlux(MSG_PREFIX + key).shareNext().toFuture();
    
}
}

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)