将PubSub流传输到Spanner-等待步骤

问题描述

要求是在从pubsub消息中插入数据之前删除扳手表中的数据。由于MutationGroup不保证执行顺序,因此将Delete突变分为单独的集合,因此有两组,一组用于Delete,另一组用于AddReplace Mutations。

PCollection<Data> dataJson =
        pipeLine
            .apply(PubsubIO.readStrings().fromSubscription(options.getInputSubscription()))
            .apply("ParsePubSubMessage",ParDo.of(new PubSubToDataFn()))
            .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
        ;
SpannerWriteResult deleteResult = dataJson
        .apply("DeleteDataMutation",MapElements.via(......))
        .apply("DeleteData",SpannerIO.write().withSpannerConfig(spannerConfig).grouped());

    dataJson
        .apply("WaitOnDeleteMutation",Wait.on(deleteResult.getOutput()))
        .apply("AddReplaceMutation",MapElements.via(...))
        .apply("UpsertInfoToSpanner",SpannerIO.write().withSpannerConfig(spannerConfig).grouped());

这是一个流数据流作业,我尝试了多个Windowing,但是它从未执行“ UpsertInfoToSpanner”步骤。

如何解决此问题?有人可以建议前进的道路。

更新: 要求是对相同的输入数据依次应用两个突变组,即从PubSub消息中读取JSON,以从具有突变组的多个表中删除现有数据,然后插入从JSON PubSub消息中读取的数据。

解决方法

尽早重新粘贴评论以提高可见度:

保证单个MutationGroup中的Mutation操作可以在单个事务中按顺序执行,所以我看不出这里是什么问题... Wait.on()从未释放的原因是因为输出正在等待的流在全局窗口中,因此永远不会在流管道中关闭。

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...