Dataflow template "Pub/Sub Avro to BigQuery" fails to decode messages

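Problem

I am trying to stream data from Pub/Sub to BigQuery using the Dataflow template "Pub/Sub Avro to BigQuery". The data in Pub/Sub is in Avro format and originates from a Kafka topic. I got the corresponding schema file from the schema registry. This is what it looks like: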

{"type":"record","name":"KsqlDataSourceSchema","namespace":"io.confluent.ksql.avro_schemas","fields":[{"name":"ID","type":["null","string"],"default":null},{"name":"TIMESTAMP","type":["null","string"],"default":null}]}

The schema is saved as schema.avsc without any line breaks, and I get this error in Dataflow:

2021-01-22 10:31:28.231 MEZ Error message from worker: java.lang.RuntimeException: Could not decode Pubsub message
    org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.lambda$parsePayloadUsingCoder$839baa85$1(PubsubIO.java:1139)
    org.apache.beam.sdk.transforms.Contextful.lambda$fn$36334a93$1(Contextful.java:112)
    org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:140)
Caused by: org.apache.beam.sdk.coders.CoderException: 47 unexpected extra bytes after decoding {"ID": null,"TIMESTAMP": null}
    org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:101)
    org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
    org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.lambda$parsePayloadUsingCoder$839baa85$1(PubsubIO.java:1137)
    org.apache.beam.sdk.transforms.Contextful.lambda$fn$36334a93$1(Contextful.java:112)
    org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:140)
    org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:335)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
    org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1400)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:156)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1101)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    java.lang.Thread.run(Thread.java:748)

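When I consume the messages from the topic manually, I can decode them with exactly the same schema, but I have to deal with five extra bytes at the front of each message. A raw message from Pub/Sub looks like this: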

b'\x00\x00\x00\x00\x0c\x02\x1656173684800\x02:2021-01-22T10:21:40.384+01:00'
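For illustration, the five leading bytes look like the framing that Confluent's Schema Registry serializers put in front of each Avro payload: one magic byte (0x00) followed by a 4-byte big-endian schema ID. That is an assumption based on the io.confluent.ksql namespace in the schema, not something the question confirms. A minimal Python sketch that separates this header from the Avro body (the helper name split_confluent_header is hypothetical):

```python
import struct

# Raw Pub/Sub payload copied from the question.
raw = b'\x00\x00\x00\x00\x0c\x02\x1656173684800\x02:2021-01-22T10:21:40.384+01:00'


def split_confluent_header(payload: bytes):
    """Split a payload into (schema_id, avro_body).

    Assumes Confluent-style framing: 1 magic byte (0x00) followed by a
    4-byte big-endian schema ID, then the plain Avro binary body.
    """
    if len(payload) < 5 or payload[0] != 0:
        raise ValueError("payload does not start with magic byte 0x00")
    _magic, schema_id = struct.unpack(">bI", payload[:5])
    return schema_id, payload[5:]


schema_id, avro_body = split_confluent_header(raw)
print(schema_id)       # ID of the schema in the registry
print(len(avro_body))  # number of bytes left for the Avro decoder
```

Under that assumption, only avro_body is a plain Avro binary record that matches schema.avsc; the header is not described by the schema at all.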

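I suspect I need to change my schema.avsc file so that Dataflow handles the extra bytes correctly, but I am not sure how, or whether that is even the right approach.

I hope someone can point me in the right direction. Thanks in advance.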

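Solution

Based on the exception and the information you provided, the problem appears to lie in the input data you are consuming from the Pub/Sub topic.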

The template ultimately reads the messages with the readAvroGenericRecords method. https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L515

That method in turn uses Beam's AvroCoder to binary-decode the data, relying on the Java Avro library to produce Avro GenericRecord objects. https://github.com/apache/beam/blob/9e0920c401258ec38bddbd9298747378126b8fe4/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java#L327

So, as a manual test, you could try decoding the data with the Java Avro library to see whether it can produce an Avro GenericRecord object.
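As a rough stand-in for that manual test, here is a Python sketch that decodes the raw message from the question by hand instead of using the Java Avro library. It assumes both fields are ["null","string"] unions (the ID field is declared that way in the schema, and the bytes suggest the same for TIMESTAMP) and implements only the small part of the Avro binary encoding this record needs. The function names are hypothetical.

```python
# Raw Pub/Sub payload copied from the question.
raw = b'\x00\x00\x00\x00\x0c\x02\x1656173684800\x02:2021-01-22T10:21:40.384+01:00'


def read_long(buf, pos):
    """Read one zigzag-encoded varint (Avro int/long) starting at pos."""
    shift = 0
    value = 0
    while True:
        byte = buf[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return (value >> 1) ^ -(value & 1), pos


def read_nullable_string(buf, pos):
    """Read a ["null", "string"] union: branch index, then the string if any."""
    branch, pos = read_long(buf, pos)
    if branch == 0:
        return None, pos
    if branch != 1:
        raise ValueError(f"unexpected union branch {branch}")
    length, pos = read_long(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


def decode_record(buf):
    """Decode ID and TIMESTAMP; return (record, number of undecoded bytes)."""
    record = {}
    pos = 0
    for name in ("ID", "TIMESTAMP"):
        record[name], pos = read_nullable_string(buf, pos)
    return record, len(buf) - pos


# Decoding the payload as-is: the first two 0x00 bytes are read as two nulls.
with_header, extra_with_header = decode_record(raw)
# Decoding after dropping the five leading bytes.
without_header, extra_without_header = decode_record(raw[5:])

print(with_header, extra_with_header)
print(without_header, extra_without_header)
```

Decoding the payload unchanged yields both fields as null with 47 bytes left over, which matches the CoderException in the question. Decoding after dropping the first five bytes consumes the whole message. This suggests the leading bytes need to be removed before the message reaches the template, rather than changing schema.avsc.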