问题描述
我是 Apache Beam 的新手,所以我在以下场景中遇到了一些困难:
- 以流模式读取 Pub/Sub 主题
- 通过转换(Transform)提取 customerId
- 带有 Transform/ParDo 的并行 PCollection,它根据 Pub/Sub 主题中收到的“customerId”从 Firestore 获取数据(使用侧输入)
- ...
尝试获取 Firestore 数据的 ParDo 转换根本没有运行。如果把“customerId”写成固定值,一切都按预期工作——虽然这只是一个简单的 ParDo,并不是规范的 Firestore 读取方式,但它确实能用。是我做了什么不该做的事吗?我的代码如下:
class getFirestoreUsers(beam.DoFn):
    """DoFn that fetches a customer's opted-in WiFi device users from Firestore.

    The main-input ``element`` is not read; the customer is selected solely by
    the ``customerId`` argument.
    """

    def process(self, element, customerId):
        """Return a one-item list holding a ``{document id: document dict}`` map.

        The map is left empty when the Firestore query raises; the error is
        printed rather than propagated.
        """
        print(f'Getting Users from Firestore,ID: {customerId}')
        # Call function to initialize Database
        client = intializefirebase()
        # Disabled lookup of the customer document itself:
        #   doc = client.document(f'Customers/{customerId}').get()
        #   customer = doc.to_dict()
        optin_users = {}
        # Get Optin Users
        try:
            devices = client.collection(
                f'Customers/{customerId}/DevicesWiFi_v3')
            snapshots = devices.where(u'optIn', u'==', True).stream()
            optin_users = {snap.id: snap.to_dict() for snap in snapshots}
        except Exception as exc:
            print(f"Error: Couldn't retrieve OPTIN users from DevicesWiFi")
            print(exc)
        return [optin_users]
主要代码
def run(argv=None):
    """Build and run the pipeline.

    Reads JSON messages from a Pub/Sub topic in streaming mode, extracts the
    ``customerId`` of each message, and passes it as a singleton side input to
    ``getFirestoreUsers``, whose main input is a small hard-coded ``users``
    collection.

    Args:
        argv: Command-line arguments. ``--topic`` and ``--output`` are parsed
            here; everything else is forwarded to ``PipelineOptions``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--topic', type=str, help='Pub/Sub topic to read from')
    parser.add_argument(
        '--output', help=('Output local filename'))
    # parse_known_args leaves unrecognised flags in pipeline_args so they can
    # be handed to Beam below.
    args, pipeline_args = parser.parse_known_args(argv)
    options = PipelineOptions(pipeline_args)
    options.view_as(SetupOptions).save_main_session = True
    options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(options=options)
    # Placeholder main input for the Firestore lookup; the DoFn ignores it.
    users = (p | 'Create chars' >> beam.Create([
        {
            "clientMac": "7c:d9:5c:b8:6f:38", "username": "Louis"
        }, {
            "clientMac": "48:fd:8e:b0:6f:38", "username": "Paul"
        }
    ]))
    # Get Dictionary from Pub/Sub
    data = (p | 'Read from PubSub' >> beam.io.ReadFromPubSub(topic=args.topic)
            | 'Parse JSON to Dict' >> beam.Map(lambda e: json.loads(e))
            )
    # Get customerId from Pub/Sub information
    PcustomerId = (data | 'get customerId from Firestore' >>
                   beam.ParDo(lambda x: [x.get('customerId')]))
    PcustomerId | 'print customerId' >> beam.Map(print)
    # Get Users from Firestore
    # NOTE(review): PcustomerId comes from the streaming Pub/Sub read and is
    # not windowed here, yet it is consumed through AsSingleton by a ParDo over
    # the bounded `users` collection. This is presumably why the lookup step
    # never fires -- confirm against Beam's side-input/windowing rules.
    custUsers = (users | 'Read from Firestore' >> beam.ParDo(
        getFirestoreUsers(), customerId=beam.pvalue.AsSingleton(PcustomerId)))
    custUsers | 'print Users from Firestore' >> beam.Map(print)
    # NOTE(review): no p.run() call is visible in this snippet; the code that
    # actually starts the pipeline is presumably elided.
为了避免运行该函数时出错,我不得不先初始化“users”字典,之后就完全没有再用到它。我想这里面可能有好几处错误,非常感谢您的帮助。
解决方法
我不清楚示例代码中 `users` PCollection 是如何使用的(因为 `process` 的定义中并没有处理 `element`)。我加入了窗口并重新组织了代码,改用 `customer_id` 作为主输入。
class GetFirestoreUsers(beam.DoFn):
    """DoFn that maps a customer id to that customer's opted-in WiFi users.

    The Firestore client is created once in ``setup`` and reused by every
    ``process`` call on this instance.
    """

    def setup(self):
        """Create the Firestore client and keep it on ``self.db``."""
        # Call function to initialize Database
        self.db = intializeFirebase()

    def process(self, element):
        """Return a one-item list holding a ``{document id: document dict}`` map.

        ``element`` is used as the customer id. The map is left empty when the
        Firestore query raises; the error is printed rather than propagated.
        """
        print(f'Getting Users from Firestore,ID: {element}')
        # Disabled lookup of the customer document itself:
        #   doc = self.db.document(f'Customers/{element}').get()
        #   customer = doc.to_dict()
        optin_users = {}
        # Get Optin Users
        try:
            devices = self.db.collection(
                f'Customers/{element}/DevicesWiFi_v3')
            snapshots = devices.where(u'optIn', u'==', True).stream()
            optin_users = {snap.id: snap.to_dict() for snap in snapshots}
        except Exception as exc:
            print(f"Error: couldn't retrieve OPTIN users from DevicesWiFi")
            print(exc)
        return [optin_users]
# Read the raw Pub/Sub messages, assign them to 60-second fixed windows, and
# decode each JSON payload into a dict.
data = (p | 'Read from PubSub' >> beam.io.ReadFromPubSub(topic=args.topic)
        | beam.WindowInto(window.FixedWindows(60))
        | 'Parse JSON to Dict' >> beam.Map(lambda e: json.loads(e)))
# Get customerId from Pub/Sub information
customer_id = (data | 'get customerId from Firestore' >>
               beam.Map(lambda x: x.get('customerId')))
customer_id | 'print customerId' >> beam.Map(print)
# Get Users from Firestore; the customer id is now the main input, so no side
# input is needed.
custUsers = (customer_id | 'Read from Firestore' >> beam.ParDo(
    GetFirestoreUsers()))
custUsers | 'print Users from Firestore' >> beam.Map(print)
来自您的评论:
使用来自 Pub/Sub 的原始 JSON 数据运行“主”PCollection 时,所需的数据(先客户 ID 后客户数据)未准备好
您的意思是,在读取 Pub/Sub 主题时,Firestore 中的数据还没有准备好吗?
您始终可以在主函数中将逻辑拆分为 2 个管道,然后一个接一个地运行它们。