基于 Apache Beam KafkaIO python SDK 处理 Avro 消息

问题描述

我目前正在尝试将消息读取为类似于以下示例 KafkaIO 的 ByteDeserializer。我的测试设置如下:

选项 1: 配置为使用 --runner=PortableRunner

选项 2: 启动本地flink作业服务器,

docker run --net=host apache/beam_flink1.10_job_server:latest

发布测试 kafka avro 消息

管道参数定义为,

pipeline_args = ['--runner','FlinkRunner','--job_endpoint','localhost:8099','--environment_type','LOOPBACK','--flink_version','1.10','--flink_master','localhost:8081']
pipeline_options = PipelineOptions(pipeline_args,save_main_session=True,streaming=True)   

管道设置,

_ = (pipeline | ReadFromKafka(
                consumer_config= {'bootstrap.servers':'localhost:9092'},topic = ['beam-test-topic'])
              | beam.Flatmap(lambda kv: log_topic_contents(kv[1])))

当我执行pipeline时,使用认的扩展服务SDK镜像(apache/beam_python3.7_sdk:2.29.0),并将作业提交到flink作业服务器。 flink 作业服务器失败并显示消息“无法提交 JobGraph”和“Rest 端点关闭”。

我是否会遗漏管道的任何运行时配置?

解决方法

“LOOPBACK”环境类型目前不支持跨语言转换。您可以使用“DOCKER”类型重试吗?