如何为 ActiveMQ 主题创建 DGS GraphQL 订阅

问题描述

我有一些复杂的技术堆栈。我正在利用 Netflix DGS 提供 GraphQL 服务。在幕后是一堆从各种服务发送和接收数据的 JMS 组件。我可以在 GraphQL 订阅之外进行所有工作。

具体来说,我想要做的是为来自 ActiveMQ 主题的消息创建 GraphQL 订阅

所以我有一个 SubscriptionDataFetcher 如下:

@DgsComponent
public class SurveyResultsSubscriptionDataFetcher {

    private final Publisher<SurveyResult> surveyResultsReactiveSource;

    @Autowired
    public SurveyResultsSubscriptionDataFetcher(Publisher<SurveyResult> surveyResultsReactiveSource) {
        this.surveyResultsReactiveSource = surveyResultsReactiveSource;
    }

    @DgsData(parentType = DgsConstants.SUBSCRIPTION.TYPE_NAME,field = DgsConstants.SUBSCRIPTION.SurveyResultStream)
    public Publisher<SurveyResult> surveyResults() {
        return surveyResultsReactiveSource;
    }
}

在我的 Spring 配置中,我使用以下 Spring 集成流程:

@Bean
@Scope(BeanDeFinition.ScopE_PROTOTYPE)
public Publisher<SurveyResult> surveyResultsReactiveSource() {
    SurveyResultMessageConverter converter = new SurveyResultMessageConverter();

    return Flux.from(
            IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination(surveyDestination))
                    .log(LoggingHandler.Level.DEBUG)
                    .log()
                    .toReactivePublisher())
            .map((message) -> converter.fromMessage(message,SurveyResult.class));
}

我想说几件事:

  1. 我有一个单独的 @JmsListener 接收这些与主题无关的消息
  2. 看不到多个消费者,即使在建立网络套接字连接之后也是如此。
  3. 如果我将 Mongo Reactive Spring Data Repository 连接到这个 GraphQL 订阅,客户端会收到数据。

当我将客户端连接到订阅时,我看到以下日志:

PublishSubscribeChannel      : Channel 'unkNown.channel.name' has 1 subscriber(s).
DgsWebSocketHandler          : Subscription started for 1

我怀疑在建立 Web 套接字连接时消息侦听器容器未激活。我应该“激活”通道适配器吗?我错过了什么?

技术栈:

    // spring boots - version 2.4.3
    implementation "org.springframework.boot:spring-boot-starter-web"
    implementation "org.springframework.boot:spring-boot-starter-activemq"
    implementation "org.springframework.boot:spring-boot-starter-data-mongodb-reactive"
    implementation "org.springframework.boot:spring-boot-starter-integration"
    implementation 'org.springframework.boot:spring-boot-starter-security'

    // spring integration
    implementation group: 'org.springframework.integration',name: 'spring-integration-jms',version: '5.4.4'

    // dgs
    implementation "com.netflix.graphql.dgs:graphql-dgs-spring-boot-starter:3.10.2"
    implementation 'com.netflix.graphql.dgs:graphql-dgs-subscriptions-websockets-autoconfigure:3.10.2'

更新 1:

就其价值而言,如果我将订阅更新为以下内容,我会在客户端获得结果。

    @DgsData(parentType = DgsConstants.SUBSCRIPTION.TYPE_NAME,field = DgsConstants.SUBSCRIPTION.SurveyResultStream)
    public Publisher<SurveyResult> surveyResults() {
        // repository is a ReactiveMongoRepository
        return repository.findAll();
    }

更新 2:

这是最终的 bean,以防它根据公认的解决方案帮助某人。我需要一个主题的监听器,而不是一个队列。

    @Bean
    public Publisher<Message<SurveyResult>> surveyResultsReactiveSource() {
        SurveyResultMessageConverter converter = new SurveyResultMessageConverter();

        return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(
                Jms.container(connectionFactory(),surveyDestination).pubSubDomain(true))
                .jmsMessageConverter(converter))
                .toReactivePublisher();
    }

解决方法

您的问题在这里:

return Flux.from(
        IntegrationFlows.from(

框架只是没有看到内部 IntegrationFlow 实例来正确解析和注册。

要使其工作,您需要考虑将 IntegrationFlow 声明为顶级 bean。

像这样:

@Bean
public Publisher<Message<SurveyResult>> surveyResultsReactiveSource() {  
    return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination(surveyDestination))
                    .log(LoggingHandler.Level.DEBUG)
                    .transform([transform to SurveyResult])
                    .toReactivePublisher();
}

现在框架知道必须解析这个逻辑 IntegrationFlow 容器,并且必须注册和启动所有 bean。

如果您的转换器无法提供 SurveyResultMessageConverter,您可能需要将 transform() 逻辑重新考虑为普通的 Jms.messageDrivenChannelAdapter

然后在您的 SurveyResultsSubscriptionDataFetcher 中,您只需要:

 return surveyResultsReactiveSource.map(Message::getPayload);