将Reactivex用于GraphQL时将Spring上下文集成到线程中

问题描述

我正在尝试将GraphQL集成到SpringBoot应用程序中。数据应通过WebSocket发布。为此,我已经配置了订阅解析器,如下所示:

@Component
@RequiredArgsConstructor
public class TSubscriptionResolver implements GraphQLSubscriptionResolver {

private final TPublisher tPublisher;

public Publisher<TDTO> getT() {
    return tPublisher.getPublisher();
 }
}

接下来的步骤是实际链接发布者。我一直在尝试遵循this repo example,它可以解决直到在内存中生成数据并正确将其发布到套接字为止。它正在使用reactx。 当我尝试使用已经存在的Spring服务来从数据库读取/生成数据时,就会出现问题。从下面的代码和示例中可以看到,它正在创建一个新线程。 当然,我知道这样的线程驻留在Spring的上下文之外,因此无法注入Spring服务。但是我无法找到合适的方法来配置线程服务以与应用程序上下文连接。

@Service
@Slf4j
public class TPublisher implements DisposableBean {

private static final Integer BROADCAST_RATE_SECONDS = 5;

private final ScheduledExecutorService executorService;
private final Flowable<TrDTO> publisher;

@Autowired
private TGenerator tGenerator;

public TPublisher() {
    executorService = Executors.newSingleThreadScheduledExecutor();
    Observable<TDTO> quoteObservable = Observable.create(emitter -> {
        executorService.scheduleAtFixedRate(broadcastMessage(emitter),BROADCAST_RATE_SECONDS,TimeUnit.SECONDS);
    });

    ConnectableObservable<TDTO> connectableObservable = quoteObservable.share().publish();
    connectableObservable.connect();

    publisher = connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}


private Runnable broadcastMessage(ObservableEmitter<TDTO> emitter) {
    return () -> {
        TDTO message = tGenerator.generate().get(0); // --> Cannot be autowired as i'm creating a new thread
        log.info("subscription was send");
        try {
            emitter.onNext(message);
        } catch (RuntimeException e) {
            log.error("Cannot publish message",e);
        }
    };
}

public Flowable<TDTO> getPublisher() {
    return publisher;
}

@Override
public void destroy() throws Exception {
    executorService.shutdown();
}
}

在此,我如何创建新线程以适应此结构?或者我应该采取哪种方法实现这一目标? 附言:我试图创建自己的Runnable,为其指定范围并将其传递给TaskExecutor。但是我没有满足通过发射器的需要。 我还没有发现GaphQL-SpringBoot-Reactivex完整流程的任何示例,其中数据来自底层服务(来自数据库或消息系统)。

任何提示将不胜感激。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)