Flink Table API 中如何为表中的每一行分配唯一 ID?

问题描述

我正在使用 Flink 来计算一系列操作。每个操作都会生成一个表,该表既用于下一个操作,也用于存储在 S3 中。这样就可以在计算中查看每个中间步骤的数据,并查看每个操作的效果

我需要为每个表中的每一行分配一个唯一标识符,以便当该标识符在接下来的步骤中再次出现时(可能在不同的列中)我知道两行彼此关联。

一个明显的候选似乎是 ROW_NUMBER() 函数,但是:

  1. 它似乎没有出现在表表达式 API 中的任何地方。我必须构造 sql 字符串吗?

  2. 我如何使用它?当我尝试此查询时:

    SELECT *,ROW_NUMBER() OVER (ORDER BY f0) AS rn FROM inp

    我收到此错误

    org.apache.flink.table.api.ValidationException: Over Agg: The window rank function without order by. please re-check the over window statement.

  3. 是否总是需要对表格进行排序?这似乎是我宁愿避免的开销。

一个选项只是为每一行生成一个随机 UUID。但是当我尝试这个时,同一个 UUID 从来没有使用过两次,所以它完全没用。举个例子:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object SandBox {
  def main(args: Array[String]): Unit = {

    val env = StreamTableEnvironment.create(
      StreamExecutionEnvironment.getExecutionEnvironment
    )

    val inp = env.fromValues(1.as("id"))
    val out1 = inp.addColumns(uuid().as("u"))
    val out2 = out1.addColumns($"u".as("u2"))

    env.executesql("""
      CREATE TABLE out1 ( id INTEGER,u VARCHAR(36) )
      WITH ('connector' = 'print')
    """)

    env.executesql("""
      CREATE TABLE out2 ( id INTEGER,u VARCHAR(36),u2 VARCHAR(36) )
      WITH ('connector' = 'print')
    """)

    env.createStatementSet()
      .addInsert("out1",out1)
      .addInsert("out2",out2)
      .execute()

    // Equivalent to the createStatementSet method:
    out1.executeInsert("out1")
    out2.executeInsert("out2")
  }
}

我得到的输出

[info] +I(1,4e6008ad-868a-4f95-88b0-38ee7969067d)
[info] +I(1,55da264d-1e15-4c40-94d4-822e1cd5db9c,c9a78f93-580c-456d-9883-08bc998124ed)

我需要 out1 的 UUID 重新出现在 out2 的两列中,例如:

[info] +I(1,4e6008ad-868a-4f95-88b0-38ee7969067d,4e6008ad-868a-4f95-88b0-38ee7969067d)

我想这是由于 docs 中的这个注释:

函数不是确定性的,这意味着将为每条记录重新计算值。

如何只计算一次 UUID 并使其“具体”,以便将相同的值发送到 out1out2

我用用户定义的函数得到了类似的结果:

    class uuidUdf extends ScalarFunction {
      def eval(): String = UUID.randomUUID().toString
    }

    val out1 = inp.addColumns(call(new uuidUdf()).as("u"))

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)