问题描述
我正在尝试执行反规范化操作,在该操作中,我需要使用以下逻辑重新组织表:
| itemid | class | value |
+--------+-------+-------+
| 1 | A | 0.2 | | itemid | value A | value B | value C |
| 1 | B | 10.3 | ==> +--------+---------+---------+---------+
| 2 | A | 3.0 | ==> | 1 | 0.2 | 10.3 | |
| 2 | B | 0.2 | ==> | 2 | 3.0 | 0.2 | |
| 3 | A | 0.0 | | 3 | 0.0 | 1.2 | 5.4 |
| 3 | B | 1.2 |
| 3 | C | 5.4 |
考虑到我知道a-priori类的列表,然后加入结果的pcollection,我的方法是执行一个for循环,以便用class
进行过滤。
高级代码:
CLASSES = ["A","B","C"]
tables = [
(
data
| "Filter by Language" >> beam.Filter(lambda elem: elem["class"]==c)
| "Add id as key" >> beam.Map(lambda elem: (elem["itemid"],elem))
)
for cin CLASSES
]
和联接:
_ = (
tables
| "Flatten" >> beam.Flatten()
| "Join Collections" >> beam.GroupByKey()
| "Remove key" >> beam.MapTuple(lambda _,val: val)
| "Merge dicts" >> beam.ParDo(mergeDicts())
| "Write to GCS" >> beam.io.WritetoText(output_file)
)
with(根据Peter Kim的建议进行编辑):
class mergeDicts(beam.DoFn):
process(self,elements):
result = {}
for dictionary in elements:
if len(dictionary)>0:
result["itemid"] = dictionary["itemid"]
result["value {}".format(dictionary["class"])] = dictionary["value"]
yield result
我的问题是,在Apache Beam计算引擎中执行管道时,我会获得由列表的最后一个元素(在本例中为C)过滤的相同pcollections。
[ADDED]似乎Apache Beam引擎在所有最终调用的分支中,将迭代变量置于最终状态,这意味着迭代列表的最后一个元素。
解决方法
您遇到的是关于closures,loops,and Python scoping的令人惊讶的陷阱。您可以通过分配变量而不是从闭包中拉出变量来解决此问题。例如
tables = [
(
data
# Pass it as a side input to Filter.
| "Filter by Language" >> beam.Filter(lambda elem,cls: elem["class"],c)
| "Add id as key" >> beam.Map(lambda elem: (elem["itemid"],elem))
)
for c in CLASSES
]
或
tables = [
(
data
# Explicitly capture it as a default value in the lambda.
| "Filter by Language" >> beam.Filter(lambda elem,cls=c: elem["class"])
| "Add id as key" >> beam.Map(lambda elem: (elem["itemid"],elem))
)
for c in CLASSES
]
分区在这里也很有效,既可以避免这种陷阱,也可以表达您的意图。
,根据您显示的结果表,我假设您希望输出看起来像这样:
{'itemid': '1','value B': 10.3,'value A': 0.2}
{'itemid': '2','value B': 0.2,'value A': 3.0}
{'itemid': '3','value B': 1.2,'value A': 0.0,'value C': 5.4}
您的mergeDicts正在覆盖值,因为每个键只能容纳一个值。将mergeDicts更新为类似这样的内容以指定键:
class mergeDicts(beam.DoFn):
process(self,elements):
result = {}
for dictionary in elements:
if len(dictionary)>0:
result["itemid"] = dictionary["itemid"]
result["value {}".format(dictionary["class"])] = dictionary["value"]
yield result
,
我在这里发布了一个由我自己找到的解决方案,但是由于我想更好地了解Beam引擎的执行逻辑,因此我并没有将其作为正确的答案。 >
为了使用条件获取单独的pcollection,我使用了beam.Partition
类,而不是通过循环中的项目过滤表。通过直接在文档中应用代码示例,我将pcollection划分为多个表,准备加入。
这样可以避免该问题,但是我不清楚为什么for循环无法按我的预期工作。