问题描述
我是Apache Beam的新手,在这个看似非常简单的事情上停留了几个小时:
如何在Apache Beam中完成 pandas.DataFrame.pct_change ?
我正在从CSV中读取数据(使用 beam.io.ReadFromText ),说:
npm
我想将其转换为行之间的百分比变化,即
0 90
1 91
2 85
在 Apache Beam 管道中如何做到这一点?
祝一切顺利!
解决方法
Beam优于Pandas的主要优点是能够并行执行许多操作。并行性也在读取中发生,因此没有像熊猫那样简单的“下一个”概念。
这是为什么需要固定顺序的操作(例如,Pandas的所有滚动功能)在Beam(和其他并行ETL框架)中更难完成的主要原因。他们几乎需要将所有元素发送到同一工作人员并在那里执行操作,因此您将失去Beam的优势,而使用Pandas可能会更好。
但,因为您有一个row
字段来告诉我们订单,所以我们可以使用row
字段作为timestamps
和{{ 1}},而不会失去并行性。
由于组合器(我们对事物进行分组的方式)不是可交换/关联的,因此我们需要高级组合器。这两个答案1 2
中有关于此概念的更多信息SlidingWindows
输出为(请注意顺序可能会改变)
p = beam.Pipeline()
class RollingChange(beam.CombineFn):
def create_accumulator(self):
return []
def add_input(self,list,input):
list.append(input)
return list
def merge_accumulators(self,accumulators):
final_list = []
for list in accumulators:
final_list += list
return final_list
def extract_output(self,list_of_list):
if len(list_of_list) == 2:
first = list_of_list[0]
second = list_of_list[1]
second["change"] = second["value"] / first["value"] - 1
return second
elif len(list_of_list) == 1 and list_of_list[0]['row'] == 0:
list_of_list[0]["change"] = 0
return list_of_list[0]
else:
pass
elements = [
{"row": 0,"value": 90},{"row": 1,"value": 91},{"row": 2,"value": 85},{"row": 3,"value": 100},{"row": 4,"value": 200}
]
(p | Create(elements)
| Map(lambda x: window.TimestampedValue(x,x['row'])) # adds row as timestamp for windows
| WindowInto(window.SlidingWindows(2,1))
| beam.core.CombineGlobally(RollingChange()).without_defaults()
| beam.core.Filter(lambda x: x != None) # filters the last row (4)
| Map(print))
p.run()