Dataflow WriteToParquet fails without a clear message: "Workflow Failed"

Problem description

I have already read these posts:

They were helpful, and I ended up writing something similar for Pub/Sub messages such as {"id": "1"} (for testing only):

import apache_beam as beam
import pyarrow as pa

# `project` and `pipeline_options` are defined elsewhere in the script.
subscription = f"projects/{project}/subscriptions/test-subscriber"
with beam.Pipeline(options=pipeline_options) as p:
    records = p | 'Read' >> beam.io.ReadFromPubSub(subscription=subscription)
    _ = records | 'Write' >> beam.io.parquetio.WriteToParquet(
        'gs://<bucket>/parquet/output/new',
        pa.schema([('id', pa.string())]),
        file_name_suffix=".parquet",
    )

The only error I can see is "Workflow Failed", and only with DataflowRunner; with DirectRunner I have no problem. Here is the run command:

python code/dataflow/pubsub_to_gcs.py \
      --project=${PROJECT_NAME} \
      --output_path=gs://"${BUCKET_NAME}"/dataflow_output \
      --region=${REGION} \
      --job_name=testdataflow \
      --runner=DataflowRunner \
      --staging_location=gs://"${BUCKET_NAME}"/staging_location \
      --temp_location=gs://"${BUCKET_NAME}"/temp

Here are the logs for this job (the first row is the most recent entry):

insertId,"labels.""dataflow.googleapis.com/job_id""","labels.""dataflow.googleapis.com/job_name""","labels.""dataflow.googleapis.com/log_type""","labels.""dataflow.googleapis.com/region""",logName,receiveTimestamp,resource.labels.job_id,resource.labels.job_name,resource.labels.project_id,resource.labels.region,resource.labels.step_id,resource.type,severity,textPayload,timestamp
kj98obbko,2021-01-13_06_39_40-5574094972724851911,testdataflow,system,us-west1,projects/<proj>/logs/dataflow.googleapis.com%2Fjob-message,2021-01-13T14:39:46.649539202Z,504796790819,dataflow_step,INFO,Worker pool stopped.,2021-01-13T14:39:45.784300690Z
kj98obbkn,DEBUG,Cleaning up.,2021-01-13T14:39:45.751440904Z
kj98obbkm,ERROR,Workflow Failed.,2021-01-13T14:39:45.733187585Z
kj98obbkl,Fusing consumer WritetoParquet/Write/WriteImpl/FinalizeWrite/_DataflowIterableAsMultimapSideInput(MapToVoidKey2.out.0)/StreamingPCollectionViewWriter into WritetoParquet/Write/WriteImpl/FinalizeWrite/_DataflowIterableAsMultimapSideInput(MapToVoidKey2.out.0)/Values,2021-01-13T14:39:45.666536683Z
kj98obbkk,Fusing consumer WritetoParquet/Write/WriteImpl/FinalizeWrite/_DataflowIterableAsMultimapSideInput(MapToVoidKey2.out.0)/Values into WritetoParquet/Write/WriteImpl/FinalizeWrite/_DataflowIterableAsMultimapSideInput(MapToVoidKey2.out.0)/GroupByKey/MergeBuckets,2021-01-13T14:39:45.651338237Z
kj98obbkj,Fusing consumer WritetoParquet/Write/WriteImpl/FinalizeWrite/_DataflowIterableAsMultimapSideInput(MapToVoidKey2.out.0)/GroupByKey/MergeBuckets into WritetoParquet/Write/WriteImpl/FinalizeWrite/_DataflowIterableAsMultimapSideInput(MapToVoidKey2.out.0)/GroupByKey/ReadStream,2021-01-13T14:39:45.635839235Z
kj98obbki,projects/<proj>

For now I have given up on using Dataflow, but if anyone knows where I should look, I would appreciate it.

Solution

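I recently ran into a similar problem and solved it by going through all the logs related to the job, not only the job messages. In my case, the job failed because a dependency was not installed on the workers. I suggest checking the logs to see whether the dependencies installed correctly.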
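
As a rough sketch of what "all the logs" could mean in practice: the export in the question only contains entries from the job-message log, so one option is to query Cloud Logging by job ID without restricting logName, which should also return worker and worker-startup entries if any exist. The helper names below are hypothetical, and "my-project" is a placeholder; the resource type and label names are taken from the column headers of the log export in the question. The script only builds the gcloud command and prints it, it does not run anything.

```python
import shlex


def dataflow_log_filter(job_id, min_severity=None):
    """Build a Cloud Logging filter matching entries of one Dataflow job.

    No logName clause is added on purpose, so the filter is not limited
    to the job-message log.
    """
    clauses = [
        'resource.type="dataflow_step"',
        f'resource.labels.job_id="{job_id}"',
    ]
    if min_severity:
        # Optional: keep only entries at or above this severity, e.g. "ERROR".
        clauses.append(f"severity>={min_severity}")
    return " AND ".join(clauses)


def gcloud_read_command(project, job_id, min_severity=None, limit=200):
    """Return a `gcloud logging read` invocation as an argument list.

    Nothing is executed here; the caller decides whether to run it.
    """
    return [
        "gcloud", "logging", "read",
        dataflow_log_filter(job_id, min_severity),
        f"--project={project}",
        f"--limit={limit}",
        "--format=json",
    ]


if __name__ == "__main__":
    # Job ID copied from the log export in the question; project is a placeholder.
    cmd = gcloud_read_command(
        "my-project", "2021-01-13_06_39_40-5574094972724851911"
    )
    print(shlex.join(cmd))
```

Running the printed command first without a severity filter, then again with min_severity="ERROR", is one way to spot a failed package installation that never surfaces in the job messages.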