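Problem description
I am trying to map the output of my Scala UDF, a scala.collection.immutable.Map object (Map[String,String]), to a valid data type in the Table API, namely via the Java type (java.util.Map), as recommended here: Flink Table API & SQL and map types (Scala). However, I get the error below.
Any ideas on the right way to proceed? If so, is there a way to generalize the conversion to (nested) Scala objects of type Map[String,Any]?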
Scala UDF
class dummyMap() extends ScalarFunction {
  def eval() = {
    val whatevermap = Map("key1" -> "val1", "key2" -> "val2")
    whatevermap.asInstanceOf[java.util.Map[java.lang.String, java.lang.String]]
  }
}
Sink
my_sink_ddl = f"""
create table mySink (
output_of_dummyMap_udf MAP<STRING,STRING>
) with (
...
)
"""
Py4JJavaError: An error occurred while calling o430.execute.
: org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink `default_catalog`.`default_database`.`mySink` do not match.
Query result schema: [output_of_my_scala_udf: GenericType<java.util.Map>]
TableSink schema: [output_of_my_scala_udf: Map<String,String>]
Thanks!
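Solution
This is Wei Zhong's original answer; I am only relaying it. Thanks, Wei!
At this point (Flink 1.11), both of the following approaches work:
- Current: DataTypeHint in the UDF definition + SQL for UDF registration
- Deprecated: overriding getResultType in the UDF definition + t_env.register_java_function for UDF registration
Code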
Scala UDF
package com.dummy
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row
class dummyMap extends ScalarFunction {
  // If the UDF is registered via the SQL statement, you need to add this type hint.
  @DataTypeHint("ROW<s STRING, t STRING>")
  def eval(): Row = {
    Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
  }
  // If the UDF is registered via the method 'register_java_function', you need to
  // override this method.
  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
    // The return type must be described as a TypeInformation.
    Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), Types.STRING()))
  }
}
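The example above returns a ROW, while the sink in the question expects MAP<STRING,STRING>. Note also that asInstanceOf in the question's UDF does not convert anything: a Scala Map is not a java.util.Map, so an actual conversion (for example asJava from scala.collection.JavaConverters) is needed. The following is only a sketch of how the same DataTypeHint approach might look for a map. The class name dummyJavaMap is hypothetical, and this variant is an assumption that has not been verified against Flink 1.11; it relies on the function being registered via the SQL statement so that the hint is honored.

```scala
package com.dummy

import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.functions.ScalarFunction
import scala.collection.JavaConverters._

// Hypothetical variant of the UDF above (not part of the original answer).
class dummyJavaMap extends ScalarFunction {
  // Assumption: the hint lets the planner derive MAP<STRING, STRING>
  // instead of a generic java.util.Map type.
  @DataTypeHint("MAP<STRING, STRING>")
  def eval(): java.util.Map[String, String] = {
    // asJava wraps the Scala map in a java.util.Map view; a plain cast would not.
    Map("key1" -> "val1", "key2" -> "val2").asJava
  }
}
```

If this works in your setup, the sink column would be declared as MAP<STRING,STRING>, as in the question's DDL.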
Python code
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
s_env = StreamExecutionEnvironment.get_execution_environment()
st_env = StreamTableEnvironment.create(s_env)
# load the Scala UDF jar file; change the path to point to your own jar
# (you can also load the jar file via other approaches)
st_env.get_config().get_configuration().set_string("pipeline.jars","file:///Users/zhongwei/the-dummy-udf.jar")
# register the udf via a SQL statement
st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE SCALA")
# or register via the method
# st_env.register_java_function("dummyMap","com.dummy.dummyMap")
# prepare source and sink
t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
st_env.execute_sql("""create table mySink (
output_of_my_scala_udf ROW<s STRING,t STRING>
) with (
'connector' = 'print'
)""")
# execute query
t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
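The answer does not address the second part of the question (nested Map[String,Any] objects). A minimal sketch of a recursive conversion from Scala collections to their java.util counterparts, using only the standard library, could look like the following. The object name NestedMapConversion and the method toJava are hypothetical. This only handles the Scala-to-Java conversion; as far as I know, the Table API still needs one declared value type per MAP, so a heterogeneous Map[String,Any] has no direct MAP<STRING,...> equivalent and would need a concrete schema (for example nested ROW or MAP types).

```scala
object NestedMapConversion {
  // Recursively converts Scala maps and sequences into java.util collections.
  // Any other value (String, boxed numbers, null, ...) is passed through unchanged.
  def toJava(value: Any): AnyRef = value match {
    case m: scala.collection.Map[_, _] =>
      val out = new java.util.LinkedHashMap[AnyRef, AnyRef]()
      m.foreach { case (k, v) => out.put(toJava(k), toJava(v)) }
      out
    case s: Seq[_] =>
      val out = new java.util.ArrayList[AnyRef]()
      s.foreach(v => out.add(toJava(v)))
      out
    case other =>
      other.asInstanceOf[AnyRef]
  }
}
```

For example, NestedMapConversion.toJava(Map("a" -> "x", "b" -> Map("c" -> 1))) yields a java.util.Map whose "b" entry is itself a java.util.Map.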