如何计算Apache Beam中的百分比变化?即pandas.DataFrame.pct_change

问题描述

我是Apache Beam的新手,在这个看似非常简单的事情上停留了几个小时:

如何在Apache Beam中完成 pandas.DataFrame.pct_change

我正在从CSV中读取数据(使用 beam.io.ReadFromText ),说:

npm

我想将其转换为行之间的百分比变化,即

0    90 
1    91 
2    85

Apache Beam 管道中如何做到这一点?

祝一切顺利!

解决方法

Beam优于Pandas的主要优点是能够并行执行许多操作。并行性也在读取中发生,因此没有像熊猫那样简单的“下一个”概念。

这是为什么需要固定顺序的操作(例如,Pandas的所有滚动功能)在Beam(和其他并行ETL框架)中更难完成的主要原因。他们几乎需要将所有元素发送到同一工作人员并在那里执行操作,因此您将失去Beam的优势,而使用Pandas可能会更好。

,因为您有一个row字段来告诉我们订单,所以我们可以使用row字段作为timestamps和{{ 1}},而不会失去并行性。

由于组合器(我们对事物进行分组的方式)不是可交换/关联的,因此我们需要高级组合器。这两个答案1 2

中有关于此概念的更多信息
SlidingWindows

输出为(请注意顺序可能会改变)

p = beam.Pipeline()

class RollingChange(beam.CombineFn):
    def create_accumulator(self):
        return []

    def add_input(self,list,input):
        list.append(input)
        return list

    def merge_accumulators(self,accumulators):
        final_list = []
        for list in accumulators:
            final_list += list
        return final_list

    def extract_output(self,list_of_list):
        if len(list_of_list) == 2:
            first = list_of_list[0]
            second = list_of_list[1]
            second["change"] = second["value"] / first["value"] - 1
            return second
        elif len(list_of_list) == 1 and list_of_list[0]['row'] == 0:
            list_of_list[0]["change"] = 0
            return list_of_list[0]
        else:
            pass

elements = [
    {"row": 0,"value": 90},{"row": 1,"value": 91},{"row": 2,"value": 85},{"row": 3,"value": 100},{"row": 4,"value": 200}
]

(p | Create(elements)
 | Map(lambda x: window.TimestampedValue(x,x['row'])) # adds row as timestamp for windows
 | WindowInto(window.SlidingWindows(2,1))
 | beam.core.CombineGlobally(RollingChange()).without_defaults()
 | beam.core.Filter(lambda x: x != None) # filters the last row (4)
 | Map(print))

p.run()