Apache Flink 1.11 流式接收器到 S3

问题描述

我正在使用 Flink FileSystem sql 连接器从 Kafka 读取事件并写入 S3(使用 MinIo)。这是我的代码

exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
# start a checkpoint every 10 s
exec_env.enable_checkpointing(10000)
exec_env.set_state_backend(FsstateBackend("s3://test-bucket/checkpoints/"))
t_config = TableConfig()
t_env = StreamTableEnvironment.create(exec_env,t_config)

INPUT_TABLE = "source"
INPUT_TOPIC = "Rides"
LOCAL_KAFKA = 'kafka:9092'
OUTPUT_TABLE = "sink"

ddl_source = f"""
       CREATE TABLE {INPUT_TABLE} (
           `rideId` BIGINT,`isstart` BOOLEAN,`eventTime` STRING,`lon` FLOAT,`lat` FLOAT,`psgCnt` INTEGER,`taxiId` BIGINT
       ) WITH (
           'connector' = 'kafka','topic' = '{INPUT_TOPIC}','properties.bootstrap.servers' = '{LOCAL_KAFKA}','format' = 'json'
       )
   """

ddl_sink = f"""
       CREATE TABLE {OUTPUT_TABLE} (
           `rideId` BIGINT,`taxiId` BIGINT
       ) WITH (
           'connector' = 'filesystem','path' = 's3://test-bucket/kafka_output','format' = 'parquet'
       )
   """

t_env.sql_update(ddl_source)
t_env.sql_update(ddl_sink)

t_env.execute_sql(f"""
    INSERT INTO {OUTPUT_TABLE}
    SELECT * 
    FROM {INPUT_TABLE}
""")

我使用的是 Flink 1.11.3 和 flink-s3-fs-hadoop 1.11.3。我已将 flink-s3-fs-hadoop-1.11.3.jar 复制到 plugins 文件夹中。

cp /opt/flink/lib/flink-s3-fs-hadoop-1.11.3.jar /opt/flink/plugins/s3-fs-hadoop/;

我还在 flink-conf.yaml 中添加了以下配置。

    state.backend: filesystem
    state.checkpoints.dir: s3://test-bucket/checkpoints/
    s3.endpoint: http://127.0.0.1:9000
    s3.path.style.access: true
    s3.access-key: minio
    s3.secret-key: minio123

MinIo 运行正常,我在 MinIo 中创建了“test-bucket”。当我运行这个作业时,作业提交不会发生,Flink Dashboard 进入某种等待状态。 15-20 分钟后,我收到以下异常,

pyflink.util.exceptions.TableException: Failed to execute sql

这里似乎有什么问题?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)