Python - Apache Beam [GCP] - Pub/Sub to GCS works with the Direct runner but not with the Dataflow runner

Problem description

I have an Apache Beam pipeline written in Python that reads data from a Pub/Sub topic, aggregates and transposes it, and writes the result to GCS.

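When I run the Python script with the Direct runner, the pipeline works and the GCS files are populated. When I try to deploy it on Cloud Dataflow, it throws an error: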

Error Message for Dataflow runner

Error logs in the Dataflow UI:

Dataflow logs

I tried the same transforms writing back to a BigQuery table instead, and that works fine with the Dataflow runner.

Apache Beam version: apache-beam==2.27.0

Command used to run the pipeline:

python3 pubSubToGCS.py --runner DataFlow --project <PROJECT_ID> \
--temp_location gs://<PROJECT_ID>-temp-bucket/tmp \
--staging_location gs://<PROJECT_ID>-temp-bucket/staging \
--streaming --job_name dataloading --region us-central1 \
--SUBSCRIPTION_NAME projects/<PROJECT_ID>/subscriptions/RawSensorReadings-sub \
--GCS_PATH gs://<PROJECT_ID>/RawDataReadings/TEST

pubSubToGCS.py:

import argparse
import datetime
import logging
import time
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam.transforms.window as window
import pandas as pd
from apache_beam.io import WriteToText

def convertToCsv(sensorValues):
  (timestamp,values) =  sensorValues
  df = pd.DataFrame(values)
  df.columns = ["Sensor","Value"]
  csvStr =  str(timestamp)+","+",".join(str(x) for x in list(df.groupby(["Sensor"]).mean().T.iloc[0]))
  return csvStr

def roundTime(dt=None,roundTo=60):
   if dt is None: dt = datetime.datetime.now()
   seconds = (dt.replace(tzinfo=None) - dt.min).seconds
   rounding = (seconds+roundTo/2) // roundTo * roundTo
   return str(dt + datetime.timedelta(0,rounding-seconds,-dt.microsecond))

def run(subscription_path,output_gcs_path,interval=1,pipeline_args=None):

    with beam.Pipeline(options=PipelineOptions( pipeline_args,streaming=True,save_main_session=True)) as p:
      data = (p
        | 'ReadData' >> beam.io.ReadFromPubSub(subscription=subscription_path)
        | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
        # Raw messages arrive in this format:

        # ....
        # 2021-02-15 11:58:25.637077,Pressure_2,2064.6582700689737
        # 2021-02-15 11:58:25.638462,Pressure_3,2344.5496650659934
        # 2021-02-15 11:58:25.639584,Pressure_4,2124.8628543586183
        # 2021-02-15 11:58:25.640560,Pressure_5,2026.8313489428847
        # ....      
        | "Convert to list" >> beam.Map(lambda x: x.split(","))
        | "to tuple" >> beam.Map(lambda x: (roundTime(datetime.datetime.strptime(x[0],'%Y-%m-%d %H:%M:%S.%f'),roundTo = interval),[x[1],float(x[2]) ] ))
        # After converting to tuple : 
        # ('2021-02-15 11:58:23',['Pressure_4',2053.5058784228463])
        # ('2021-02-15 11:58:23',['Pressure_5',1554.4377708096179])
        # ('2021-02-15 11:58:23',1584.4305417337491])
        # ('2021-02-15 11:58:24',['Pressure_1',2392.3361259850535])
        # ('2021-02-15 11:58:24',['Pressure_2',1975.1719957768883])
        # ('2021-02-15 11:58:24',['Pressure_3',2713.306279034812])
        # ('2021-02-15 11:58:24',1902.3089540631288])
        # ('2021-02-15 11:58:24',1932.9791896923427])
        # ('2021-02-15 11:58:26',2022.983225929031])
        # ('2021-02-15 11:58:26',1799.7312618480091])
        # ('2021-02-15 11:58:26',2166.0263912901733])

        | "Window" >> beam.WindowInto(window.FixedWindows(15))
        | "Groupby" >> beam.GroupByKey()
        | "convert to csv" >> beam.Map(convertToCsv)
        # After aggregation and transpose (header shown for reference only, not included in the PCollection):
        # Timestamp,Pressure_1,Pressure_2,Pressure_3,Pressure_4,Pressure_5
        # 2021-02-15 12:09:35,1965.012352391227,2004.429036333092,2251.4681479138094,1771.5563402502842,1756.392145648632
        # 2021-02-15 12:09:36,1825.3075719663318,2259.6443056432713,2072.838378531564,2120.7269395022995,1783.3072412809652
        # 2021-02-15 12:09:37,2275.9356184022963,1989.6216653552221,2157.401713549938,1657.4214496485536,1923.4706666235156
        # 2021-02-15 12:09:38,1892.8002062318985,1945.9966592074343,2317.891325891871,1636.2570086301153,1971.247401688967
        # 2021-02-15 12:09:39,1944.2184082592953,1913.9882349839,2244.7614223797464,1893.985945158099,1579.50215334007
        | "Write to GCS" >> WriteToText(output_gcs_path)
        # This works with the Direct runner but fails with the Dataflow runner
      )

if __name__ == "__main__": 
    logging.getLogger().setLevel(logging.INFO)
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--SUBSCRIPTION_NAME",help="The Cloud Pub/Sub subscription to read from.\n"
        '"projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>".',)
    parser.add_argument(
        "--GCS_PATH",help = "Path to GCP cloud storage bucket\n"
        '"<gs://<PROJECT_ID>/<BUCKET>/<PATH>"')
    parser.add_argument(
        "--AGGREGATION_INTERVAL",type = int,default = 1,help="Number of seconds to aggregate.\n",)
    args,pipeline_args = parser.parse_known_args()
    run(
        args.SUBSCRIPTION_NAME,args.GCS_PATH,args.AGGREGATION_INTERVAL,pipeline_args
      )
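
The two helper steps in the script (rounding the timestamp, then averaging per sensor into one CSV row) can be checked locally without Beam or Dataflow, which helps rule them out when the two runners behave differently. The following is only a minimal sketch under stated assumptions: `round_time` mirrors the logic of `roundTime` above, while `to_csv_row` is a hypothetical pandas-free stand-in for `convertToCsv` that uses the standard library and assumes the grouped input has the `(timestamp, [[sensor, value], ...])` shape shown in the comments. Both names are illustrative and not part of the original script.

```python
import datetime
from collections import defaultdict
from statistics import mean


def round_time(dt, round_to=60):
    """Round dt to the nearest multiple of round_to seconds; microseconds are dropped."""
    # Seconds elapsed since midnight (timedelta.seconds excludes whole days).
    seconds = (dt.replace(tzinfo=None) - dt.min).seconds
    rounding = (seconds + round_to / 2) // round_to * round_to
    return str(dt + datetime.timedelta(0, rounding - seconds, -dt.microsecond))


def to_csv_row(element):
    """Average the readings per sensor and emit 'timestamp,mean_1,mean_2,...'.

    Hypothetical stand-in for convertToCsv: sensors are ordered by name,
    which matches the default sorted-key behavior of a pandas groupby.
    """
    timestamp, values = element
    by_sensor = defaultdict(list)
    for sensor, value in values:
        by_sensor[sensor].append(value)
    means = [mean(by_sensor[name]) for name in sorted(by_sensor)]
    return ",".join([str(timestamp)] + [str(m) for m in means])


if __name__ == "__main__":
    raw = datetime.datetime(2021, 2, 15, 11, 58, 25, 637077)
    key = round_time(raw, round_to=1)
    print(key)  # 2021-02-15 11:58:25
    grouped = (key, [["Pressure_1", 1.0], ["Pressure_2", 4.0], ["Pressure_1", 2.0]])
    print(to_csv_row(grouped))  # 2021-02-15 11:58:25,1.5,4.0
```

If both helpers return the expected rows locally, the transforms themselves are unlikely to be the cause, which is consistent with the same transforms working when the output goes to BigQuery.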
