Problem description
I am using Flink 1.13. I am trying to convert a table result to a DataStream as shown below, but I keep getting errors.
import java.time.Duration;
import java.time.LocalDateTime;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class HybridTrial {

  public static class Address {
    public String street;
    public String houseNumber;

    public Address() {}

    public Address(String street, String houseNumber) {
      this.street = street;
      this.houseNumber = houseNumber;
    }
  }

  public static class User {
    public String name;
    public Integer score;
    public LocalDateTime event_time;
    public Address address;

    // default constructor for DataStream API
    public User() {}

    // fully assigning constructor for Table API
    public User(String name, Integer score, LocalDateTime event_time, Address address) {
      this.name = name;
      this.score = score;
      this.event_time = event_time;
      this.address = address;
    }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<User> dataStream =
        env.fromElements(
                new User("Alice", 4, LocalDateTime.now(), new Address()),
                new User("Bob", 6, LocalDateTime.now(), new Address("NBC", "204")),
                new User("Alice", 10, LocalDateTime.now(), new Address("ABC", "1033")))
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<User>forBoundedOutOfOrderness(Duration.ofSeconds(60)));

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    Table table = tableEnv.fromDataStream(dataStream, Schema.newBuilder().build());
    table.printSchema();

    Table t = table.select($("*"));
    DataStream<User> dsRow = tableEnv.toDataStream(t, User.class);
    dsRow.print();
    env.execute();
  }
}
The error I get is:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Column types of query result and sink for registered table 'default_catalog.default_database.Unregistered_DataStream_Sink_1' do not match.
Cause: Incompatible types for sink column 'event_time' at position 2.
Query schema: [name: STRING, score: INT, event_time: RAW('java.time.LocalDateTime', '...'), address: *flinksqlExperiments.HybridTrial$Address<`street` STRING, `houseNumber` STRING>*]
Sink schema:  [name: STRING, score: INT, event_time: TIMESTAMP(9), address: *flinksqlExperiments.HybridTrial$Address<`street` STRING, `houseNumber` STRING>*]
at org.apache.flink.table.planner.connectors.DynamicSinkUtils.createSchemaMismatchException(DynamicSinkUtils.java:437)
at org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:256)
at org.apache.flink.table.planner.connectors.DynamicSinkUtils.convertSinkToRel(DynamicSinkUtils.java:198)
at org.apache.flink.table.planner.connectors.DynamicSinkUtils.convertExternalToRel(DynamicSinkUtils.java:143)
I also tried a custom conversion from DataStream to Table, but I still ran into an error when converting from the Table back to a DataStream. I'm stuck, so any help is appreciated.
Solution
The reflection-based type extraction in the DataStream API is not as powerful as the one in the Table API. This is also due to state backward-compatibility constraints in the DataStream API.
The problematic field is event_time in the DataStream API: it is extracted as a GenericType, which shows up as a RAW type on the Table API side. You have the following options:
- provide correct TypeInformation for the field instead of letting it fall back to RAW
- override the extracted DataType by passing explicit TypeInformation to the stream created with fromElements
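A minimal sketch of the second option, assuming Flink 1.13 and the User/Address POJOs from the question. Types.POJO with an explicit field map tells the DataStream API that event_time is a LocalDateTime rather than a generic type, so the Table API can map it to TIMESTAMP instead of RAW. The field names mirror the question's POJOs; this is an illustrative fragment, not a verified fix:

```java
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Spell out the POJO's field types so event_time is not extracted as RAW.
Map<String, TypeInformation<?>> fields = new HashMap<>();
fields.put("name", Types.STRING);
fields.put("score", Types.INT);
fields.put("event_time", Types.LOCAL_DATE_TIME); // explicit, instead of GenericType/RAW
fields.put("address", Types.POJO(HybridTrial.Address.class));

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<HybridTrial.User> dataStream =
    env.fromElements(
            new HybridTrial.User("Alice", 4, LocalDateTime.now(), new HybridTrial.Address()))
        // override the reflectively extracted type information
        .returns(Types.POJO(HybridTrial.User.class, fields));
```

With the explicit type information in place, tableEnv.fromDataStream should report event_time as a TIMESTAMP column, and the conversion back with toDataStream(t, User.class) should no longer hit the RAW-vs-TIMESTAMP mismatch.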