Handling subscriptions in Vert.x GraphQL

Problem description

I tried to consume a GraphQL subscription using the Vert.x HttpClient/WebClient, but it did not work as expected.

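The relevant server-side code (written with Vert.x Web GraphQL) is shown below. When a comment is added, it calls onNext to push the new comment to the Publisher.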

    public VertxDataFetcher<UUID> addComment() {
        return VertxDataFetcher.create((DataFetchingEnvironment dfe) -> {
            var commentInputArg = dfe.getArgument("commentInput");
            var jacksonMapper = DatabindCodec.mapper();
            var input = jacksonMapper.convertValue(commentInputArg,CommentInput.class);
            return this.posts.addComment(input)
                .onSuccess(id -> this.posts.getCommentById(id.toString())
                                        .onSuccess(c ->subject.onNext(c)));
        });
    }

    private BehaviorSubject<Comment> subject = BehaviorSubject.create();

    public  DataFetcher<Publisher<Comment>> commentAdded() {
        return (DataFetchingEnvironment dfe) -> {
            ConnectableObservable<Comment> connectableObservable = subject.share().publish();
            connectableObservable.connect();
            return connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
        };
    }

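On the client side I use a mix of HttpClient and WebClient. Most of the time I prefer WebClient because it makes posting forms easier, but it does not seem to support WebSocket connections.

So for the WebSocket part I fall back to HttpClient.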


    var options = new HttpClientOptions()
        .setDefaultHost("localhost")
        .setDefaultPort(8080);

    var httpClient = vertx.createHttpClient(options);
    httpClient.webSocket("/graphql")
        .onSuccess(ws -> {
            ws.textMessageHandler(text -> log.info("web socket message handler:{}", text));

            JsonObject messageInit = new JsonObject()
                .put("type", "connection_init")
                .put("id", "1");

            JsonObject message = new JsonObject()
                .put("payload", new JsonObject()
                    .put("query", "subscription onCommentAdded { commentAdded { id content } }"))
                .put("type", "start")
                .put("id", "1");

            ws.write(messageInit.toBuffer());
            ws.write(message.toBuffer());
        })
        .onFailure(e -> log.error("error: {}", e));


    // The client here is a WebClient.
    client.post("/graphql")
        .sendJson(Map.of(
            "query", "mutation addComment($input:CommentInput!){ addComment(commentInput:$input) }",
            "variables", Map.of(
                "input", Map.of(
                    "postId", id,
                    "content", "comment content of post id" + LocalDateTime.now()
                )
            )
        ))
        .onSuccess(
            data -> log.info("data of addComment: {}", data.bodyAsString())
        )
        .onFailure(e -> log.error("error: {}", e));

When I run the client and the server, the comment is added, but the WebSocket client does not print anything about WebSocket messages. The server console shows this message:

2021-06-25 18:45:44,356 DEBUG [vert.x-eventloop-thread-1] graphql.GraphQL: Execution '182965bb-80de-416d-b5fe-fe157ab87f1c' completed with zero errors

It seems the backend commentAdded data fetcher is never called at all.

The complete code for the GraphQL client and server is shared on my GitHub.

Solution

After reading some of the Vert.x Web GraphQL test code, I found that I had to add a connectionInitHandler to the ApolloWSHandler, like this:

    .connectionInitHandler(connectionInitEvent -> {
        JsonObject payload = connectionInitEvent.message().content().getJsonObject("payload");
        if (payload != null && payload.containsKey("rejectMessage")) {
            connectionInitEvent.fail(payload.getString("rejectMessage"));
            return;
        }
        connectionInitEvent.complete(payload);
    })

When the client sends the connection_init message, connectionInitEvent.complete must be called to start communication between the client and the server.
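
To make the handshake order easier to follow, here is a small plain-Java toy model of it. It is only a sketch and not Vert.x code: the class HandshakeSketch and its methods onConnectionInit and onStart are hypothetical names invented for illustration. It assumes, as described above, that a "start" message only results in a subscription once the connection_init step has been completed, and it mirrors the rejectMessage check from the handler. The reply types connection_ack and connection_error are taken from the Apollo subscriptions-transport-ws protocol.

```java
import java.util.Map;

/**
 * Toy model of the connection_init handshake (hypothetical, not Vert.x's implementation).
 * Assumption: a "start" message is only honoured after the init step completed.
 */
class HandshakeSketch {
    private boolean acknowledged = false;

    /** Handles connection_init and returns the reply message type. */
    String onConnectionInit(Map<String, Object> payload) {
        // Mirrors the handler above: reject when the payload carries "rejectMessage".
        if (payload != null && payload.containsKey("rejectMessage")) {
            return "connection_error";
        }
        // Corresponds to calling connectionInitEvent.complete(payload).
        acknowledged = true;
        return "connection_ack";
    }

    /** Handles "start" and reports whether the subscription would be registered. */
    boolean onStart() {
        return acknowledged;
    }

    public static void main(String[] args) {
        HandshakeSketch server = new HandshakeSketch();
        System.out.println("start before init accepted: " + server.onStart());
        System.out.println("reply to init: " + server.onConnectionInit(Map.of()));
        System.out.println("start after init accepted: " + server.onStart());
    }
}
```

In this model a subscription sent before the init step completes is simply dropped, which matches the symptom in the question: the mutation succeeds, but no subscription data ever reaches the WebSocket client.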