Apache Beam / Dataflow pub/sub 端输入与 python

问题描述

我是 Apache Beam 的新手,所以我在以下场景中遇到了一些困难:

  • 使用流模式的发布/订阅主题
  • 转化为取出customerId
  • 带有 Transform/ParDo 的并行 PCollection,它根据 Pub/Sub 主题中收到的“customerId”从 Firestore 获取数据(使用侧输入)
  • ...

尝试获取 Firestore 数据的 ParDo 转换根本没有运行。如果使用“customerId”固定值,一切都按预期工作......尽管没有使用正确的 Firestore 提取(简单的 ParDo),它仍然有效。我在做不应该做的事情吗? 包括我的代码如下:

class getFirestoreUsers(beam.DoFn):
    def process(self,element,customerId):

        print(f'Getting Users from Firestore,ID: {customerId}')

        # Call function to initialize Database
        db = intializefirebase()

        """ # get customer information from the database
        doc = db.document(f'Customers/{customerId}').get()
        customer = doc.to_dict() """
        usersList = {}

        # Get Optin Users
        try:
            docs = db.collection(
                f'Customers/{customerId}/DevicesWiFi_v3').where(u'optIn',u'==',True).stream()
            usersList = {user.id: user.to_dict() for user in docs}
        except Exception as err:
            print(f"Error: Couldn't retrieve OPTIN users from DevicesWiFi")
            print(err)

        return([usersList])

主要代码

def run(argv=None):
    """Build and run the pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--topic',type=str,help='Pub/Sub topic to read from')
    parser.add_argument(
        '--output',help=('Output local filename'))

    args,pipeline_args = parser.parse_kNown_args(argv)

    options = PipelineOptions(pipeline_args)
    options.view_as(SetupOptions).save_main_session = True
    options.view_as(StandardOptions).streaming = True

    p = beam.Pipeline(options=options)

    users = (p | 'Create chars' >> beam.Create([
        {
             "clientMac": "7c:d9:5c:b8:6f:38","username": "Louis"
             },{
            "clientMac": "48:fd:8e:b0:6f:38","username": "Paul"
        }
    ]))


    # Get Dictionary from Pub/Sub
    data = (p | 'Read from PubSub' >> beam.io.ReadFrompubSub(topic=args.topic)
            | 'Parse JSON to Dict' >> beam.Map(lambda e: json.loads(e))
            )

    # Get customerId from Pub/Sub information
    PcustomerId = (data | 'get customerId from Firestore' >>
                   beam.ParDo(lambda x: [x.get('customerId')]))
    PcustomerId | 'print customerId' >> beam.Map(print)

    # Get Users from Firestore
    custUsers = (users | 'Read from Firestore' >> beam.ParDo(
        getFirestoreUsers(),customerId=beam.pvalue.AsSingleton(PcustomerId)))
    custUsers | 'print Users from Firestore' >> beam.Map(print)

为了避免运行该函数时出错,我必须初始化“users”字典,之后我完全忽略了它。 我想我在这里有几个错误,因此非常感谢您的帮助。

解决方法

我不清楚示例代码中如何使用 users PCollection(因为 element 未在 process 定义中处理)。我使用窗口重新排列了代码,并使用 customer_id 作为主要输入。

class GetFirestoreUsers(beam.DoFn):
  def setup(self):
    # Call function to initialize Database
    self.db = intializeFirebase()

  def process(self,element):
    print(f'Getting Users from Firestore,ID: {element}')

    """ # get customer information from the database
    doc = self.db.document(f'Customers/{element}').get()
    customer = doc.to_dict() """
    usersList = {}

    # Get Optin Users
    try:
        docs = self.db.collection(
            f'Customers/{element}/DevicesWiFi_v3').where(u'optIn',u'==',True).stream()
        usersList = {user.id: user.to_dict() for user in docs}
    except Exception as err:
        print(f"Error: couldn't retrieve OPTIN users from DevicesWiFi")
        print(err)

    return([usersList])



data = (p | 'Read from PubSub' >> beam.io.ReadFromPubSub(topic=args.topic)
          | beam.WindowInto(window.FixedWindow(60))
          | 'Parse JSON to Dict' >> beam.Map(lambda e: json.loads(e)))

# Get customerId from Pub/Sub information
customer_id = (data | 'get customerId from Firestore' >>
               beam.Map(lambda x: x.get('customerId')))
customer_id | 'print customerId' >> beam.Map(print)

# Get Users from Firestore
custUsers = (cutomer_id | 'Read from Firestore' >> beam.ParDo(
    GetFirestoreUsers())
custUsers | 'print Users from Firestore' >> beam.Map(print)

来自您的评论:

使用来自 Pub/Sub 的原始 JSON 数据运行“主”PCollection 时,所需的数据(先客户 ID 后客户数据)未准备好

您的意思是在阅读 Pub/Sub 主题时 firestore 中的数据还没有准备好?

您始终可以在主函数中将逻辑拆分为 2 个管道,然后一个接一个地运行它们。