Pyspark and Graphframes: Aggregate Messages with a Power Mean

Problem description

Given the following graph:

[image: example graph — vertex A with child vertices B and C]

where A has a value of 20, B has a value of 5, and C has a value of 10, I want to use pyspark/graphframes to calculate the power mean. That is,

power_mean = (1/n * (x_1^p + x_2^p + ... + x_n^p))^(1/p)

In this case, n is the number of items (3 in our case, for the three vertices at A, including A itself), our p is taken to be n * 2, and the normalization factor is 1/n, or 1/3. So the resulting value for A should be:

n = 3
norm_factor = 1/n
p = n * 2
result = (norm_factor * (20^p + 5^p + 10^p))^(1/p) = 16.697421658890875
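
As a quick sanity check, the same arithmetic in plain Python (no Spark involved; the variable names just mirror the ones above) reproduces that number:

# Plain-Python check of the arithmetic above.
values = [20, 5, 10]
n = len(values)                 # 3
p = n * 2                       # 6
norm_factor = 1 / n             # 1/3
result = (norm_factor * sum(x ** p for x in values)) ** (1 / p)
print(result)                   # 16.697421658890875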

So the question is, how do I calculate this with pyspark/graphframes? I have the following graph:

from pyspark.sql import SparkSession
from graphframes import GraphFrame

spark = SparkSession.builder.appName('get-the-power').getOrCreate()

vertices = spark.createDataFrame([('1', 'A', 20), ('2', 'B', 5), ('3', 'C', 10)], ['id', 'name', 'value'])

edges = spark.createDataFrame([('1', '2'), ('1', '3')], ['src', 'dst'])

g = GraphFrame(vertices, edges)
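
For reference, a quick look at the graph I just built (vertex ids 1, 2, 3 map to A, B, C, and the edges go from A to each child):

g.vertices.show()
g.edges.show()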

I assume I need to aggregate the values coming from the child vertices, and I've been playing around with message aggregation.

from pyspark.sql.functions import sum as sqlsum
from graphframes.lib import AggregateMessages as AM

agg = g.aggregateMessages(
    sqlsum(AM.msg).alias("totalValue"),
    sendToSrc=AM.dst['value'],
    sendToDst=AM.dst['value'])

agg.show()

which results in:

+---+----------+
| id|totalValue|
+---+----------+
|  3|        10|
|  1|        15|
|  2|         5|
+---+----------+
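
To make sense of those numbers: sendToSrc=AM.dst['value'] sends each child's value up to its parent (so vertex 1 receives 5 + 10 = 15), while sendToDst=AM.dst['value'] sends each child's own value to itself (so vertices 2 and 3 just receive 5 and 10). A minimal sketch to look at only the parent-bound messages, assuming the imports above:

# Only messages flowing to the source (parent) side: vertex 1 receives 5 and 10.
to_src = g.aggregateMessages(
    sqlsum(AM.msg).alias("fromChildren"),
    sendToSrc=AM.dst['value'],
    sendToDst=None)
to_src.show()  # only vertex 1 appears, with fromChildren = 15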

How do I replace that totalValue (the sqlsum) with the power mean? Surely there's a way to do this with pyspark's built-in Spark functions?

--- UPDATE ---

It seems I can approximate this with a UDF.

import pyspark.sql.functions as func
from pyspark.sql.functions import collect_list, concat, array, col
from pyspark.sql.types import DoubleType

def power_mean(values):
    n = len(values)
    norm_factor = 1/n
    p = n * 2
    return (norm_factor * sum([(x)**p for x in values]))**(1/p)

udf_power_mean = func.udf(power_mean, returnType=DoubleType())

# Aggregate the values from the child vertices, as I was doing before,
# but collect them into a list instead of summing.
agg = g.aggregateMessages(
    collect_list(AM.msg).alias("totalValue"),
    sendToSrc=AM.dst['value'],
    sendToDst=None)

# `concat` the value for this vertex with its children's values.
# We end up with an `array<int>` that we then pass to `udf_power_mean`.
new_vertices = agg.join(vertices, vertices.id == agg.id, "left")\
                .select(vertices.id,
                        'name',
                        'value',
                        concat(array(col('value')), 'totalValue').alias("allValues"))\
                .withColumn('totalScore', udf_power_mean(col('allValues')))\
                .drop('allValues')

new_vertices.show()

This produces:

+---+----+-----+------------------+
| id|name|value|        totalScore|
+---+----+-----+------------------+
|  1| foo|   20|16.697421658890875|
+---+----+-----+------------------+

Is there any way to do this without a UDF? With just plain Spark functions?

Solution

For Spark 2.4+, you can use the aggregate function.

A simple version:

power_mean = lambda col: func.expr(f"""
    aggregate(
      `{col}`,
      0D,
      (acc, x) -> acc + power(x, 2*size(`{col}`)),
      acc -> power(acc/size(`{col}`), 0.5/size(`{col}`))
    )
""")

One problem with the above solution is that if any array element is NULL, the resulting totalScore will be NULL. To avoid this, you can do the following:

power_mean = lambda col: func.expr(f"""
    aggregate(
      /* expr: array column to iterate through */
      `{col}`,
      /* start: set the zero value and the accumulator as a struct<psum:double,n:int> */
      (0D as psum, size(filter(`{col}`, x -> x is not null)) as n),
      /* merge: calculate `sum([(x)**p for x in values])` */
      (acc, x) -> (acc.psum + power(coalesce(x, 0), 2*acc.n) as psum, acc.n as n),
      /* finish: post processing */
      acc -> power(acc.psum/acc.n, 0.5/acc.n)
    )
""")

It depends on how you want to define n: the above skips NULL values when counting n. If you want to count them as well, just change the second argument from:

(0D as psum, size(filter(`{col}`, x -> x is not null)) as n),

to:

(0D as psum, size(`{col}`) as n),
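
Spelled out in full, the NULL-counting variant might look like this (power_mean_count_nulls is just an illustrative name; only the start value differs from the NULL-safe version above, so NULL elements still contribute 0 to the sum but do count toward n):

# NULL-counting variant: n = size of the whole array, including NULL elements.
power_mean_count_nulls = lambda col: func.expr(f"""
    aggregate(
      `{col}`,
      (0D as psum, size(`{col}`) as n),
      (acc, x) -> (acc.psum + power(coalesce(x, 0), 2*acc.n) as psum, acc.n as n),
      acc -> power(acc.psum/acc.n, 0.5/acc.n)
    )
""")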

Example:

df = spark.createDataFrame([([20, 5, None, 10],)], ['value'])
df.select("value", power_mean("value").alias('totalScore')).show(truncate=False)
+------------+------------------+
|value       |totalScore        |
+------------+------------------+
|[20, 5,, 10]|16.697421658984894|
+------------+------------------+

By the way, if you want to concat() totalScore with other columns even in the presence of NULL values, just use the coalesce() function, or concat_ws() where applicable.
