问题描述
我正在尝试使用 spring webflux 来构建一个反应式长轮询 API 服务,我已经实现了一个从我的数据资源 (Kafka) 生成数据流的服务。这个长轮询 API 的调用者应该使用请求的密钥获取/等待第一个数据。但不知何故,如果应用程序获取数据后收到数据,API 不会返回数据。我是否使用了正确的反应器方法?非常感谢!
我的服务:
@Service
public class KafkaService {
// the processor is a in-memory cache that keeps tracking of the old messages
@Autowired private KafkaMessageProcessor kafkaMessageProcessor;
@Autowired private ApplicationEventService applicationEventService;
private Flux<KafkaMessage> eventFlux;
@PostConstruct
public void setEventFlux() {
this.eventFlux = Flux.create(applicationEventService).share();
}
public Flux<KafkaMessage> listenToKeyFlux(String key) {
return kafkaMessageProcessor
.getAll()
.concatWith(this.eventFlux.share())
.filter(x -> x.getKey().equals(key));
}
}
我的处理程序:
@RestController
@RequestMapping("/longpolling")
public class LongPollingController {
private static final String MSG_PREFIX = "key_";
@Autowired private KafkaService kafkaService;
@GetMapping("/message/{key}")
@CrossOrigin(allowedHeaders = "*",origins = "*")
public CompletableFuture<KafkaMessage> getMessage(@PathVariable String key) {
return kafkaService.listenToKeyFlux(MSG_PREFIX + key).shareNext().toFuture();
}
}
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)