问题描述
我想在 Flink 的“Select .. From .. GROUP BY ..”查询中保留每个键的所有原始行。我定义了一个名为 RowToJsonAgg 的 AggregateFunction,它将行聚合成一个 Json 字符串。
class RowToJsonAgg extends AggregateFunction[String,ListBuffer[String]]{
def accumulate(accumulator: ListBuffer[String],row: Any*): Unit = {
....
// 假设该行看起来像“ $field1_name,$field1_value,$field2_name,$field2_value,... ” // 尝试从行生成 json。但是当我运行查询时,Flink 似乎找不到这个功能 }
def merge(accumulator: ListBuffer[String],its: java.lang.Iterable[ListBuffer[String]]): Unit = {
accumulator.append(
WrapAsScala.iterableAsScalaIterable(its).flatten.toList:_*
)
}
def resetAccumulator(accumulator: ListBuffer[String]): Unit = {
accumulator.clear()
}
override def getValue(accumulator: ListBuffer[String]): String = {
accumulator.mkString("{",","}")
}
override def createAccumulator(): ListBuffer[String] = ListBuffer.empty
override def getAccumulatorType(): Typeinformation[ListBuffer[String]] = {
Typeinformation.of(classOf[ListBuffer[String]])
}
override def getResultType: Typeinformation[String] = Typeinformation.of(classOf[String])
}
数据类和查询如下所示:
case class Stock(id:Int,price: Int,volumn: Int,ts: Long)
val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)
bbTableEnv.createTemporarySystemFunction("row_to_json_agg",classOf[RowToJsonAgg])
val table = bbTableEnv.fromValues(...)
bbTableEnv.createTemporaryView("Stock",table)
bbTableEnv.executesql(
"select price,row_to_json_agg('volumn',volumn,'ts',ts) as details from Stock group by price"
)
当我运行应用程序时,我得到了 sql 验证异常,详细消息是“找不到匹配的函数签名 row_to_json_agg(CHaraCTER,NUMERIC,CHaraCTER,NUMERIC) "
似乎 Flink 找不到合适的 accumulate 函数来调用。
如果我声明累加函数如下
def accumulate(accumulator: ListBuffer[String],volumn: Integer,ts: Long)
并更改查询如
"select price,row_to_json_agg(volumn,ts) from Stock group by price"
我遇到了同样的异常,消息是“找不到匹配的函数签名 row_to_json_agg(NUMERIC,NUMERIC)”
任何想法如何使聚合函数工作?
解决方法
我自己想出来的。
-
通过运行 SQL 注册 UDF,如下所示:
bbTableEnv.executeSQL( String.format("创建临时函数 $udf_name 为 '%s'","$full_class_name_of_your_udf") )
代替
bbTableEnv.createTemporarySystemFunction("row_to_json_agg",classOf[RowToJsonAgg])
- 更喜欢用Java来实现UDF而不是Scala