Why doesn't ParDo run on DataflowRunner?

Problem description

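I ran into a problem when switching from DirectRunner to DataflowRunner: ParDo apparently does not work. When I set the runner to DataflowRunner, def process(self, query) never runs. I can see the job running on GCP, but my InsertPostgresql DoFn does nothing under DataflowRunner.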

According to the error logs, ParDo apparently does not recognize 'psycopg2'.

in process NameError: name 'psycopg2' is not defined During handling of the above exception

I would like to know why.

# The imports were not shown in the original post; module-level imports are assumed here.
import os

import apache_beam as beam
import psycopg2
from apache_beam.options.pipeline_options import (
    PipelineOptions,
    SetupOptions,
    StandardOptions,
)


def run_pipeline():
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = r'C:\Users\julianocm\Desktop\POC\<mycredentialfile>.json'

    optionsGCP = {
        'streaming': True,
        'project': "<myproject>",
        'region': "us-central1",
        'temp_location': "gs://poc360-bucket/temp",
        'staging_location': "gs://poc360-bucket/staging",
        'drivername': "postgresql",
        'save_main_session': True,
        'setup_file': r'C:\Users\julianocm\Desktop\POC\setup.py',
    }

    paramsDB = {
        'database': '<mydatabase>',
        'user': '<myuser>',
        'password': '<mypassword>',
        'host': 'localhost',
        'port': '5000',
    }

    class InsertPostgresql(beam.DoFn):

        def __init__(self, **server_config):
            self.config = server_config

        def process(self, query):
            # psycopg2 is resolved as a module-level name here;
            # this is the line that raises NameError on the Dataflow worker.
            con = psycopg2.connect(**self.config)

            cur = con.cursor()
            cur.execute(query)
            con.commit()
            resultado = cur.fetchall()
            cur.close()
            con.close()

            yield resultado

    runner = 'DataflowRunner'
    options = PipelineOptions(**optionsGCP)
    options.view_as(SetupOptions).save_main_session = True
    options.view_as(StandardOptions).streaming = True

    sql = "select public.Insert_tbCadastro('01','010',431,'A',501741,000000,'2020-10-26','A')"

    p = beam.Pipeline(runner=runner, options=options)
    data = (p
        | beam.Create([sql])
        | beam.ParDo(InsertPostgresql(**paramsDB))
    )

    data | 'teste' >> beam.Map(print)
    print("Lines: ", data)

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    run_pipeline()

After letting the job keep running for a while, I got:

2020-11-12T11:54:12.718405344ZError message from worker: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -134644: Traceback (most recent call last): File "apache_beam/runners/common.py",line 1213,in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py",line 569,in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py",line 1344,in apache_beam.runners.common._OutputProcessor.process_outputs File "c:\Users\julianocm\Desktop\POC\PipelinePoc360.py",line 45,in process NameError: name 'psycopg2' is not defined During handling of the above exception,another exception occurred: Traceback (most recent call last): File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",line 256,in _execute response = task() File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",line 313,in <lambda> lambda: self.create_worker().do_instruction(request),request) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",line 482,in do_instruction return getattr(self,request_type)( File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",line 518,in process_bundle bundle_processor.process_bundle(instruction_id)) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",line 982,in process_bundle input_op_by_transform_id[element.transform_id].process_encoded( File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",line 219,in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py",line 330,in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py",line 332,line 195,in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py",line 670,in 
apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py",line 671,in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py",line 1215,line 1279,in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py",line 1371,in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py",line 1294,in apache_beam.runners.common.DoFnRunner._reraise_augmented File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py",line 446,in raise_with_traceback raise exc.with_traceback(traceback) File "apache_beam/runners/common.py",in process NameError: name 'psycopg2' is not defined [while running 'generatedPtransform-134636'] java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57) org.apache.beam.runners.dataflow.worker.fn.control.RegisterandProcessBundleOperation.finish(RegisterandProcessBundleOperation.java:333) org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85) org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1365) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:154) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1085) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -134644: Traceback (most 
recent call last): File "apache_beam/runners/common.py",in process NameError: name 'psycopg2' is not defined [while running 'generatedPtransform-134636'] org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177) org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) java.lang.Thread.run(Thread.java:748)

On top of all of the above, the job also shows:

(screenshot: timeout)

After applying a setup.py file, I got:

(screenshot: warning)

setup.py:

import setuptools 

setuptools.setup(
    name='psycopg2',
    version='2.8.6',
    install_requires=[],
    packages=setuptools.find_packages(),
)

Dataflow log error:

2020-11-12T21:11:50.946327861ZError message from worker: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error 
received from SDK harness for instruction -1396: Traceback (most recent call last): File "apache_beam/runners/common.py",in
 apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py",in 
apache_beam.runners.common._OutputProcessor.process_outputs File "c:\Users\julianocm\Desktop\POC\PipelinePoc360.py",line 48,in process
 NameError: name 'psycopg2' is not defined During handling of the above exception,another exception occurred: Traceback
 (most recent call last): File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",request) 
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",in do_instruction return 
getattr(self,in process_bundle bundle_processor.process_bundle(instruction_id))
 File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",in process_bundle
 input_op_by_transform_id[element.transform_id].process_encoded( 
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",in process NameError: name 'psycopg2' is not defined [while running 'generatedPtransform-1388'] 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) 
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57) 
org.apache.beam.runners.dataflow.worker.fn.control.RegisterandProcessBundleOperation.finish(RegisterandProcessBundleOperation.java:333) 
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85) 
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1365) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:154) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1085) 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
java.lang.Thread.run(Thread.java:748) Caused by: 
java.lang.RuntimeException: Error received from SDK harness for instruction -1396: 
Traceback (most recent call last): File "apache_beam/runners/common.py",

Using requirements.txt:

(screenshot: execution)

I am still waiting for the job logs... So far, my PostgreSQL table is still empty.

A few minutes later I got the logs, and psycopg2 is still not defined:

line 48,in process NameError: name 'psycopg2' is not defined [while running 'generatedPtransform-208'] org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177) org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) java.lang.Thread.run(Thread.java:748)

My requirements.txt:

(screenshot: file)

Troubleshooting guide for NameErrors

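Apparently we fixed the NameError. Now I am trying to figure out why my SQL is not being executed. The job still runs, but my PostgreSQL table has no records, so something is still wrong.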

(screenshot: Job)

When I debug, the lines inside "process" are skipped. Are the parameters being used correctly?


In the end:

(screenshot: timeout)

Thanks, Juliano

Solutions

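It looks like the psycopg2 package is not being made available to the workers. For how to manage dependencies in Python, see https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/. In this case, since psycopg2 is available on PyPI, you should only need to include it in a requirements.txt file passed through the --requirements_file pipeline option.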
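As a rough illustration of the requirements-file approach, the sketch below builds the pipeline flags for a Dataflow run that ships a requirements.txt to the workers. The helper names (build_dataflow_args, make_options) and the placeholder project and bucket values are assumptions made for illustration; the flags mirror the options already used in the question, plus --requirements_file.

```python
def build_dataflow_args(project, region, bucket, requirements_file):
    """Return Beam pipeline flags that stage requirements.txt for the workers."""
    return [
        "--runner=DataflowRunner",
        f"--project={project}",
        f"--region={region}",
        f"--temp_location=gs://{bucket}/temp",
        f"--staging_location=gs://{bucket}/staging",
        # Packages listed in this file are installed on each worker at startup.
        f"--requirements_file={requirements_file}",
        "--streaming",
    ]


def make_options(args):
    """Turn the flag list into PipelineOptions.

    apache_beam is imported here so the helper above can be used
    (and inspected) on a machine that does not have Beam installed.
    """
    from apache_beam.options.pipeline_options import PipelineOptions

    return PipelineOptions(args)


# Hypothetical values; replace them with your own project and bucket.
args = build_dataflow_args(
    project="<myproject>",
    region="us-central1",
    bucket="poc360-bucket",
    requirements_file="requirements.txt",
)
```

Here requirements.txt would contain a single line naming the package, for example psycopg2==2.8.6 (the version that appears in the question's setup.py). The result of make_options(args) can then be passed to beam.Pipeline(options=...).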

,

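The problem is probably related to global imports. As this troubleshooting guide explains, it is a common issue when switching from the DirectRunner to the DataflowRunner, which matches your situation.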
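The effect can be reproduced without Beam. The following is a simplified simulation, not how Dataflow actually serializes a DoFn: two toy functions are defined in a fresh namespace that never ran the module-level import, which is roughly the position a worker is in when it receives the function but not the main module's globals. The json module stands in for psycopg2 so the sketch runs anywhere, and all names in it are illustrative.

```python
# Source of two toy functions; json stands in for psycopg2.
FUNCTIONS_SOURCE = '''
def process_global(value):
    # Relies on a module-level "import json" that was never executed here.
    return json.dumps(value)


def process_local(value):
    # The import is part of the function body, so it runs wherever the function runs.
    import json
    return json.dumps(value)
'''


def load_on_fake_worker(source):
    """Define the functions in an empty namespace, mimicking a worker that
    received the function code but not the main module's global imports."""
    namespace = {}
    exec(source, namespace)
    return namespace


worker = load_on_fake_worker(FUNCTIONS_SOURCE)

try:
    worker["process_global"]({"id": 1})
    global_result = "ok"
except NameError as err:
    # Same kind of failure as in the Dataflow logs: the name is not defined.
    global_result = str(err)

local_result = worker["process_local"]({"id": 1})

print(global_result)
print(local_result)
```

The first call fails with a NameError because the name json was never bound in that namespace, while the second succeeds because the import executes inside the function.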

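Given that you have already tried the solution of adding the flag --save_main_session=True, I would try importing the module inside the function itself. Instead of: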

import psycopg2
(...)
def process(self,query):
  con = psycopg2.connect(**self.config)

Try:

def process(self,query):
  import psycopg2
  con = psycopg2.connect(**self.config)

- Edit

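Given that we got past that error and that the pipeline runs correctly with the DirectRunner, I doubt the problem lies in the code itself. I would look at the firewall rules to make sure the Dataflow workers can connect to the PostgreSQL database, and check that the controller service account has the necessary permissions.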

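I see that you specify some credentials in the code. Keep in mind that Dataflow workers run as the default controller service account, <project-number>-compute@developer.gserviceaccount.com, or as a custom one, and that account needs the necessary permissions to connect to the database.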

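Both of these (the firewall rules and the service account used by the workers) are potential differences between the DirectRunner and the DataflowRunner, so either could be the cause. Please review them.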
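One way to narrow down the connectivity side is a plain TCP reachability check that uses only the standard library. This is a minimal sketch; the function name can_reach and the 3-second default timeout are assumptions for illustration. It only tells you whether a TCP connection can be opened, not whether the database credentials are valid.

```python
import socket


def can_reach(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port opens within the timeout."""
    try:
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable host names.
        return False
```

To test the workers' network rather than your own machine, the call has to run on a worker, for example inside process with the result written to the job logs. Note also that paramsDB in the question sets host to 'localhost'; on a Dataflow worker that name refers to the worker itself, so it may be worth checking that the host is an address the workers can actually reach.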