如何在Google Cloud Dataflow中设置到外部数据库服务器的SSH隧道?

问题描述

使用DataflowRunner使我的Apache Beam管道在Cloud Dataflow上工作时,我面临一个问题。

管道的第一步是连接到VM上托管的外部Postgresql服务器,该服务器只能通过SSH端口22从外部进行访问,并提取一些数据。我无法更改这些防火墙规则,因此只能通过SSH隧道(也称为端口转发)通过 连接到数据库服务器。

在我的代码中,我使用了python库sshtunnel。当使用 DirectRunner 从我的开发计算机启动管道时,它可以完美工作:

from sshtunnel import open_tunnel

with open_tunnel(
        (user_options.ssh_tunnel_host,user_options.ssh_tunnel_port),ssh_username=user_options.ssh_tunnel_user,ssh_password=user_options.ssh_tunnel_password,remote_bind_address=(user_options.dbhost,user_options.dbport)
    ) as tunnel:
        with beam.Pipeline(options=pipeline_options) as p:
            (p | "Read data" >> ReadFromsql(
                host=tunnel.local_bind_host,port=tunnel.local_bind_port,username=user_options.dbusername,password=user_options.dbpassword,database=user_options.dbname,wrapper=PostgresWrapper,query=select_query
            )
                | "Format CSV" >> DictToCSV(headers)
                | "Write CSV" >> WritetoText(user_options.export_location)
            )

在非认VPC中使用 DataflowRunner 启动的相同代码(其中所有入口均被拒绝但没有出口限制,并且配置了CloudNAT)失败,并显示以下消息:

psycopg2.OperationalError:无法连接到服务器:连接被拒绝服务器是否在主机“ 0.0.0.0”上运行并在端口41697上接受TCP / IP连接? [在运行“读取数据/读取”时]

因此,很明显,我的隧道出了点问题,但我无法确切地发现是什么。我开始怀疑是否甚至可以通过CloudNAT进行直接SSH隧道设置,直到找到这篇博客文章https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1指出:

Cloud Dataflow的核心优势在于您可以调用外部服务来进行数据丰富。例如,您可以调用微服务以获取元素的其他数据。 在DoFn中,调出服务(通常通过HTTP完成)。 只要您在项目/网络中设置的防火墙规则允许,您就可以完全控制自己选择的任何类型的连接

因此应该可以建立此隧道!我不想放弃,但是我不知道下一步该怎么做。有什么主意吗?

感谢阅读

解决方法

问题解决了!我不敢相信我已经花了整整两天的时间...我的方向完全错误。

问题不在于某些Dataflow或GCP网络配置,据我所知...

只要您在项目/网络中设置的防火墙规则允许,您就可以完全控制自己选择的任何类型的连接

是真的。

问题当然在我的代码中:只有在分布式环境中才发现问题。我犯了从主管道处理器而不是工人那里打开隧道的错误。因此SSH隧道已建立,但不在工作服务器与目标服务器之间,仅在主管道与目标服务器之间!

要解决此问题,我必须更改请求的DoFn,以使用隧道包装查询执行:

class TunnelledSQLSourceDoFn(sql.SQLSourceDoFn):
"""Wraps SQLSourceDoFn in a ssh tunnel"""

def __init__(self,*args,**kwargs):
    self.dbport = kwargs["port"]
    self.dbhost = kwargs["host"]
    self.args = args
    self.kwargs = kwargs
    super().__init__(*args,**kwargs)

def process(self,query,**kwargs):
    # Remote side of the SSH Tunnel
    remote_address = (self.dbhost,self.dbport)
    ssh_tunnel = (self.kwargs['ssh_host'],self.kwargs['ssh_port'])
    with open_tunnel(
        ssh_tunnel,ssh_username=self.kwargs["ssh_user"],ssh_password=self.kwargs["ssh_password"],remote_bind_address=remote_address,set_keepalive=10.0
    ) as tunnel:
        forwarded_port = tunnel.local_bind_port
        self.kwargs["port"] = forwarded_port
        source = sql.SQLSource(*self.args,**self.kwargs)
        sql.SQLSouceInput._build_value(source,source.runtime_params)
        logging.info("Processing - {}".format(query))
        for records,schema in source.client.read(query):
            for row in records:
                yield source.client.row_as_dict(row,schema)

如您所见,我不得不重写pysql_beam库的某些位。

最后,每个工作人员为每个请求打开自己的隧道。可能可以优化此行为,但足以满足我的需求。