Problem description
I have read these posts:
They were helpful, and I ended up building something similar for Pub/Sub messages such as {"id": "1"} (just for testing):
import apache_beam as beam
import pyarrow as pa

subscription = f"projects/{project}/subscriptions/test-subscriber"
with beam.Pipeline(options=pipeline_options) as p:
    records = p | 'Read' >> beam.io.ReadFromPubSub(subscription=subscription)
    _ = records | 'Write' >> beam.io.parquetio.WriteToParquet(
        'gs://<bucket>/parquet/output/new',
        pa.schema([('id', pa.string())]),
        file_name_suffix=".parquet",
    )
All I can see is the error "Workflow Failed." — but only with DataflowRunner; with DirectRunner I have no problem. Here is the run command:
python code/dataflow/pubsub_to_gcs.py \
--project=${PROJECT_NAME} \
--output_path=gs://"${BUCKET_NAME}"/dataflow_output \
--region=${REGION} \
--job_name=testdataflow \
--runner=DataflowRunner \
--staging_location=gs://${BUCKET_NAME}/staging_location \
--temp_location=gs://${BUCKET_NAME}/temp
Here are the logs for this job (the first line is the most recent entry):
insertId,"labels.""dataflow.googleapis.com/job_id""","labels.""dataflow.googleapis.com/job_name""","labels.""dataflow.googleapis.com/log_type""","labels.""dataflow.googleapis.com/region""",logName,receiveTimestamp,resource.labels.job_id,resource.labels.job_name,resource.labels.project_id,resource.labels.region,resource.labels.step_id,resource.type,severity,textPayload,timestamp
kj98obbko,2021-01-13_06_39_40-5574094972724851911,testdataflow,system,us-west1,projects/<proj>/logs/dataflow.googleapis.com%2Fjob-message,2021-01-13T14:39:46.649539202Z,504796790819,dataflow_step,INFO,Worker pool stopped.,2021-01-13T14:39:45.784300690Z
kj98obbkn,DEBUG,Cleaning up.,2021-01-13T14:39:45.751440904Z
kj98obbkm,ERROR,Workflow Failed.,2021-01-13T14:39:45.733187585Z
kj98obbkl,Fusing consumer WritetoParquet/Write/WriteImpl/FinalizeWrite/_DataflowIterableAsMultimapSideInput(MapToVoidKey2.out.0)/StreamingPCollectionViewWriter into WritetoParquet/Write/WriteImpl/FinalizeWrite/_DataflowIterableAsMultimapSideInput(MapToVoidKey2.out.0)/Values,2021-01-13T14:39:45.666536683Z
kj98obbkk,Fusing consumer WritetoParquet/Write/WriteImpl/FinalizeWrite/_DataflowIterableAsMultimapSideInput(MapToVoidKey2.out.0)/Values into WritetoParquet/Write/WriteImpl/FinalizeWrite/_DataflowIterableAsMultimapSideInput(MapToVoidKey2.out.0)/GroupByKey/MergeBuckets,2021-01-13T14:39:45.651338237Z
kj98obbkj,Fusing consumer WritetoParquet/Write/WriteImpl/FinalizeWrite/_DataflowIterableAsMultimapSideInput(MapToVoidKey2.out.0)/GroupByKey/MergeBuckets into WritetoParquet/Write/WriteImpl/FinalizeWrite/_DataflowIterableAsMultimapSideInput(MapToVoidKey2.out.0)/GroupByKey/ReadStream,2021-01-13T14:39:45.635839235Z
kj98obbki,projects/<proj>
For now I have given up on Dataflow, but if anyone knows where I should look, I would be grateful.
Solution
I recently ran into a similar problem and solved it by going through all the logs related to the job. In my case, the job failed because its dependencies were not installed. I suggest you check the logs to see whether the dependencies installed correctly.