Apache flink 全外连接中的错误结果

问题描述

我有 2 个数据流,它们是从 2 个表创建的,例如:

Table orderRes1 = ste.sqlQuery(
               "SELECT orderId,userId,SUM(bidPrice) as q FROM " + tble +
                       " Group by orderId,userId");

Table orderRes2 = ste.sqlQuery(
               "SELECT orderId,SUM(askPrice) as q FROM " + tble +
                       " Group by orderId,userId");

DataStream<Tuple2<Boolean,Row>> ds1 = ste.toRetractStream(orderRes1,Row.class).
               filter(order-> order.f0);

DataStream<Tuple2<Boolean,Row>> ds2 = ste.toRetractStream(orderRes2,Row.class).
               filter(order-> order.f0);

我想对这 2 个流执行完全外部联接,我同时使用了 orderRes1.fullOuterJoin(orderRes2,$(exp)) 以及一个包含完整外连接的 sql 查询,如下所示:

  Table bidOrdr = ste.fromDataStream(bidTuple,$("orderId"),$("userId"),$("price"));
  
  Table askOrdr = ste.fromDataStream(askTuple,$("price"));

 Table result = ste.sqlQuery(
                "SELECT COALESCE(bidTbl.orderId,askTbl.orderId)," +
                        " COALESCE(bidTbl.userId," +
                        " COALESCE(bidTbl.bidTotalPrice,0) as bidTotalPrice," +
                        " COALESCE(askTbl.askTotalPrice,0) as askTotalPrice," + 
                        " FROM " +
                        " (SELECT orderId," +
                        " SUM(price) AS bidTotalPrice " +
                        " FROM " + bidOrdr +
                        " Group by orderId,userId) bidTbl full outer JOIN " +
                        " (SELECT orderId," +
                        " SUM(price) AS askTotalPrice" +
                        " FROM " + askOrdr +
                        " Group by orderId,userId) askTbl " +
                        " ON (bidTbl.orderId = askTbl.orderId" +
                        " AND bidTbl.userId= askTbl.userId) ") ;

 DataStream<Tuple2<Boolean,Row>> =  ste.toRetractStream(result,Row.class).filter(order -> order.f0);

然而,在某些情况下的结果是不正确的:假设用户 A 以价格向 B 出售 3 次,在用户 B 向 A 出售 2 次之后,第二次结果是:

7> (true,123,a,300.0,0.0)

7> (true,200.0)

10> (true,b,0.0,300.0)

10> (true,200.0,300.0)

第二行和第四行是流的预期结果,但它也会生成第一行和第三行。 值得一提的是,coGroup 是另一种解决方案,但我不想在这种情况下使用窗口化,非窗口化解决方案只能在有界流(DataSet)中访问。

提示:orderId 和 userId 将在两个流中重复,我想在每个操作中生成 2 行,包含: orderId、userId1、bidTotalPrice、askTotalPrice AND orderId、userId2、bidTotalPrice、askTotalPrice

解决方法

流式查询(或者换句话说,在动态表上执行查询)会出现这样的情况。与在查询执行期间查询的输入关系保持静态的传统数据库不同,流式查询的输入不断更新 - 因此结果也必须不断更新。

如果我理解这里的设置,第 1 行和第 3 行的“不正确”结果是正确的,直到 orderRes2 中的相关行被处理。如果这些行从未到达,则第 1 行和第 3 行将保持正确。

您应该期待的是最终正确的结果,包括必要时撤回。您可以通过打开 mini-batch aggregation 来减少中间结果的数量。

mailing list thread 提供了更多见解。如果我误解了您的情况,请提供一个可重现的示例来说明问题。