GCP Dataflow with Kafka-enabled Azure Event Hubs -> BigQuery: timeout errors in the Dataflow job

Problem description

TL;DR

I have a Kafka-enabled Azure Event Hub that I am trying to connect to from Google Cloud's Dataflow service in order to stream data into Google BigQuery. I can successfully talk to the Azure Event Hub with the Kafka CLI tools. With GCP, however, after 5 minutes I get timeout errors in the GCP Dataflow job window.

Kafka-enabled Azure EH -> GCP Dataflow -> GCP BigQuery table

Details

To set up the Kafka-enabled Event Hub I followed the details on this GitHub page. It has the developer add a jaas.conf and a client_common.properties. jaas.conf contains a reference to the login module along with a username/password. The username for Event Hubs with Kafka is $ConnectionString; the password is the connection string copied from the CLI. client_common.properties contains two flags: security.protocol=SASL_SSL and sasl.mechanism=PLAIN. With these files configured, I was able to send and receive data with the Kafka CLI tools and Azure Event Hubs. I could see data flowing from the producer to the consumer through the Azure Event Hub.
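For reference, the two files end up looking roughly like this (a sketch based on the description above; the connection string is a placeholder, shown as <removed> as elsewhere in this question):

jaas.conf:

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="$ConnectionString"
    password="<removed>";
};

client_common.properties:

security.protocol=SASL_SSL
sasl.mechanism=PLAIN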

export KAFKA_OPTS="-Djava.security.auth.login.config=jaas.conf"

(echo -n "1|"; cat message.json | jq . -c) | kafka-conle-producer.sh --topic test-event-hub --broker-list test-eh-namespace.servicebus.windows.net:9093 --producer.config client_common.properties --property "parse.key=true" --property "key.separator=|"

kafka-console-consumer.sh --topic test-event-hub --bootstrap-server test-eh-namespace.servicebus.windows.net:9093 --consumer.config client_common.properties --property "print.key=true"
# prints: 1 { "transaction_time": "2020-07-20 15:14:54","first_name": "Joe","last_name": "Smith" }

I modified Google's Data Flow template for Kafka -> BigQuery. There was already a configuration map for resetting the offset, and I added extra configuration to match the Azure Event Hubs Kafka tutorial. While not best practice, I added the connection string to the password field for testing. When I uploaded the template to the GCP Dataflow engine and ran the job, I got a timeout error in the logs every 5 minutes and nothing arrived in Google BigQuery.

Job command

gcloud dataflow jobs run kafka-test --gcs-location=<removed> --region=us-east1 --worker-zone=us-east4-a --parameters bootstrapServers=test-eh-namespace.servicebus.windows.net:9093,inputTopic=test-event-hub,outputTableSpec=project:Kafka_Test.test --service-account-email my-service-account.iam.gserviceaccount.com

Errors in GCP Dataflow

# these errors show up in the worker logs
Operation ongoing in step ReadFromKafka/KafkaIO.Read/Read(KafkaUnboundedSource)/DataflowRunner.StreamingUnboundedRead.ReadWithIds for at least 05m00s without outputting or completing in state process
  at java.lang.Thread.sleep(Native Method)
  at org.apache.kafka.common.utils.SystemTime.sleep(SystemTime.java:45)
  at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:366)
  at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1481)
  at com.google.cloud.teleport.kafka.connector.KafkaUnboundedSource.updatedSpecWithAssignedPartitions(KafkaUnboundedSource.java:85)
  at com.google.cloud.teleport.kafka.connector.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:125)
  at com.google.cloud.teleport.kafka.connector.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:45)
  at org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReader.iterator(WorkerCustomSources.java:433)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:186)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:92)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1426)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:163)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1105)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Execution of work for computation 'S4' on key '0000000000000001' Failed with uncaught exception. Work will be retried locally.

# this error shows up in the Job log
Error message from worker: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic Metadata
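
The stack trace above blocks inside KafkaConsumer.partitionsFor while fetching topic metadata. A minimal standalone consumer (a sketch reusing the broker, topic, and SASL settings from this question; the class name and group.id are made up for the test) can exercise that same call outside of Dataflow and help separate an authentication or connectivity problem from a template problem:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class EventHubMetadataCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "test-eh-namespace.servicebus.windows.net:9093");
        // same SASL_SSL settings used by the CLI tools and the Dataflow job
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"$ConnectionString\" password=\"<removed>\";");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("group.id", "metadata-check"); // arbitrary group id for this test
        props.put("request.timeout.ms", "60000");

        // partitionsFor() triggers the same metadata fetch that times out in
        // KafkaUnboundedSource.updatedSpecWithAssignedPartitions above.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            System.out.println(consumer.partitionsFor("test-event-hub"));
        }
    }
}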

Updated configuration

Map<String,Object> props = new HashMap<>();
// azure event hub authentication
props.put("sasl.mechanism","PLAIN");
props.put("security.protocol","SASL_SSL")
props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"<removed>\";");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONfig,"earliest");

// https://github.com/Azure/azure-event-hubs-for-kafka/blob/master/CONFIGURATION.md
props.put("request.timeout.ms",60000);
props.put("session.timeout.ms",15000);
props.put("max.poll.interval.ms",30000);
props.put("offset.Metadata.max.bytes",1024);
props.put("connections.max.idle.ms",180000);
props.put("Metadata.max.age.ms",180000);

Pipeline

    PCollectionTuple convertedTableRows =
                pipeline
                        /*
                         * Step #1: Read messages in from Kafka
                         */
                        .apply(
                                "ReadFromKafka",KafkaIO.<String,String>read()
                                        .withConsumerConfigUpdates(ImmutableMap.copyOf(props))
                                        .withBootstrapServers(options.getBootstrapServers())
                                        .withTopics(topicsList)
                                        .withKeyDeserializerAndCoder(
                                                StringDeserializer.class,NullableCoder.of(StringUtf8Coder.of()))
                                        .withValueDeserializerAndCoder(
                                                StringDeserializer.class,NullableCoder.of(StringUtf8Coder.of()))
                                        .withoutMetadata())

                        /*
                         * Step #2: Transform the Kafka Messages into TableRows
                         */
                        .apply("ConvertMessagetoTableRow",new MessagetoTableRow(options));

Solution

No working solution for this issue has been found yet.
