问题描述
我正在使用 pyflink 表 api 从 Kafka 读取数据。现在我想将结果表转换为 Pandas 数据框。这是我的代码,
exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = StreamTableEnvironment \
.create(exec_env,environment_settings=EnvironmentSettings
.new_instance()
.in_streaming_mode()
.use_blink_planner().build())
INPUT_TABLE = "source"
t_env \
.connect( # declare the external system to connect to
Kafka()
.version("universal")
.topic("Rides")
.start_from_earliest()
.property("zookeeper.connect","zookeeper:2181")
.property("bootstrap.servers","kafka:9092")) \
.with_format( # declare a format for this system
Json()
.fail_on_missing_field(True)
.schema(DataTypes.ROW([
DataTypes.FIELD("rideId",DataTypes.BIGINT()),DataTypes.FIELD("isstart",DataTypes.BOOLEAN()),DataTypes.FIELD("eventTime",DataTypes.STRING()),DataTypes.FIELD("lon",DataTypes.FLOAT()),DataTypes.FIELD("lat",DataTypes.FIELD("psgCnt",DataTypes.INT()),DataTypes.FIELD("taxiId",DataTypes.BIGINT())]))) \
.with_schema( # declare the schema of the table
Schema()
.field("rideId",DataTypes.BIGINT())
.field("taxiId",DataTypes.BIGINT())
.field("isstart",DataTypes.BOOLEAN())
.field("lon",DataTypes.FLOAT())
.field("lat",DataTypes.FLOAT())
.field("psgCnt",DataTypes.INT())
.field("eventTime",DataTypes.STRING())) \
.in_append_mode() \
.create_temporary_table(INPUT_TABLE)
table = t_env.from_path(INPUT_TABLE)
df = table.to_pandas()
但在这里我既没有得到错误也没有得到结果。我正在使用 Flink 1.11.3。有没有办法将这个动态表转换成静态表或使 table.to_pandas() 工作的东西?
解决方法
我们不能在无界流表中调用 to_pandas
。 to_pandas
只能在有界表中调用。