将Python脚本提交为数据流作业

问题描述

我有一个简单的处理作业,想作为数据流作业提交。它只是一个将csv作为输入,将两个模型应用于输入并将输出添加到csv的python文件。如何将其作为数据流作业提交?我可以在本地计算机上执行此操作,但是每当我尝试在GCP上提交时,都会出现错误

WARNING:root:Make sure that locally built Python SDK docker image has 
Python 3.7 interpreter.
Traceback (most recent call last):
File "predict_DF.py",line 119,in <module>
run()
File "predict_DF.py",line 113,in run
p.run()
File "/opt/anaconda3/lib/python3.7/site- 
packages/apache_beam/pipeline.py",line 513,in run
allow_proto_holders=True).run(False)
File "/opt/anaconda3/lib/python3.7/site- 
packages/apache_beam/pipeline.py",line 526,in run
return self.runner.run_pipeline(self,self._options)
File "/opt/anaconda3/lib/python3.7/site- 
packages/apache_beam/runners/dataflow/dataflow_runner.py",line 582,in run_pipeline
self.dataflow_client.create_job(self.job),self)
File "/opt/anaconda3/lib/python3.7/site- 
packages/apache_beam/utils/retry.py",line 236,in wrapper
return fun(*args,**kwargs)
File "/opt/anaconda3/lib/python3.7/site- 
packages/apache_beam/runners/dataflow/internal/apiclient.py",line 
659,in create_job
self.create_job_description(job)
File "/opt/anaconda3/lib/python3.7/site- 
packages/apache_beam/runners/dataflow/internal/apiclient.py",line 
712,in create_job_description
io.BytesIO(job.proto_pipeline.SerializetoString()))
File "/opt/anaconda3/lib/python3.7/site- 
packages/apache_beam/runners/dataflow/internal/apiclient.py",line 
637,in stage_file
response = self._storage_client.objects.Insert(request,upload=upload)
File "/opt/anaconda3/lib/python3.7/site- packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py",line 1156,in Insert
upload=upload,upload_config=upload_config)
File "/opt/anaconda3/lib/python3.7/site-packages/apitools/base/py/base_api.py",line 729,in _RunMethod
http,HTTP_Request,**opts)
File "/opt/anaconda3/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py",line 350,in MakeRequest
check_response_func=check_response_func)
File "/opt/anaconda3/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py",line 400,in _MakeRequestnoretry
redirections=redirections,connection_type=connection_type)
File "/opt/anaconda3/lib/python3.7/site-packages/oauth2client/transport.py",line 169,in new_request
redirections,connection_type)
File "/opt/anaconda3/lib/python3.7/site-packages/oauth2client/transport.py",connection_type)
File "/opt/anaconda3/lib/python3.7/site-packages/httplib2/__init__.py",line 1991,in request
cachekey,File "/opt/anaconda3/lib/python3.7/site-packages/httplib2/__init__.py",line 1651,in _request
conn,request_uri,method,body,headers
File "/opt/anaconda3/lib/python3.7/site-packages/httplib2/__init__.py",line 1558,in _conn_request
conn.request(method,headers)
File "/opt/anaconda3/lib/python3.7/http/client.py",line 1252,in request
self._send_request(method,url,headers,encode_chunked)
File "/opt/anaconda3/lib/python3.7/http/client.py",line 1298,in _send_request
self.endheaders(body,encode_chunked=encode_chunked)
File "/opt/anaconda3/lib/python3.7/http/client.py",line 1247,in endheaders
self._send_output(message_body,line 1065,in _send_output
self.send(chunk)
File "/opt/anaconda3/lib/python3.7/http/client.py",line 987,in send
self.sock.sendall(data)
File "/opt/anaconda3/lib/python3.7/ssl.py",line 1034,in sendall
v = self.send(byte_view[count:])
File "/opt/anaconda3/lib/python3.7/ssl.py",line 1003,in send
return self._sslobj.write(data)
socket.timeout: The write operation timed out

这是除导入代码外的代码

命令行

python predict_DF.py \ 
--region $REGION \
--input $INPUT \
--output $OUTPUT \
--text_extraction_model $TEXT_EXTRACTION_MODEL \
--model $MODEL \
--translation_dictionary $TRANSLATION_DICTIONARY \
--runner $RUNNER \
--project $PROJECT \
--temp_location $TEMP_LOCATION \
--staging_location $STAGING

predict_DF.py

   def run(argv=None,save_main_session=True):
   """Main entry point; defines and runs the prediction pipeline."""
       parser = argparse.ArgumentParser()
       parser.add_argument(
       '--input',dest='input',required=True,default='/Users/me/input.csv',help='Input file for predictions'
    )
    parser.add_argument(
        '--output',dest='output',default='/Users/me/output.csv,help='Output file to write results to'
    )
    parser.add_argument(
        '--model',dest='model',default='/Users/me/model.joblib'
    )
    parser.add_argument(
        '--text_extraction_model',dest='text_extraction_model',default='/Users/me/tfidfit',help='Text extraction model for processing input'
    )
    parser.add_argument(
        '--translation_dictionary',dest='translation_dictionary',default='/Users/me/id_to_category',help='Dictionary relating numerical id to classification group'
    )
    kNown_args,pipeline_args = parser.parse_kNown_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    p = beam.Pipeline(options=pipeline_options)

    model = joblib.load(kNown_args.model)
    tfidfit = pickle.load(open(kNown_args.text_extraction_model,'rb'))
    id_to_category = pickle.load(open(kNown_args.translation_dictionary,'rb'))



    def get_pred(x):
        pred = id_to_category[model.predict(tfidfit.transform([x]).toarray()).tolist()[0]]
        return pred

    def parse_method(line):
        import csv
        reader = csv.reader(line.split('\n'))
        for csv_row in reader:
            values = [x for x in csv_row]
            row = []
            for value in csv_row:
                if value != None:
                    row.append(value)
            row.append(get_pred(row[16]))
        return ",".join(row)

    lines = (p | beam.io.ReadFromText(kNown_args.input) |
         'Split' >> beam.Map(lambda s: parse_method(s)) |
         'Output to file' >> beam.io.WritetoText(kNown_args.output)
         )

    p.run()

run()

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)