获取 RDD 中每个键的最大值和最小值

问题描述

spark = SparkSession.builder.getorCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc,10)
rdd = ssc.sparkContext.parallelize(pd_binance)
rdd.take(1)

这是结果的一小部分:

[['0.02703300','1.30900000'],['0.02703300','0.61800000'],['0.02704600','3.90800000'],['0.02704700','4.00000000'],'7.44600000']

我想获得每个键的最大值和最小值,如何?

解决方法

您可以使用reduceByKey

minimum = rdd.reduceByKey(min)
maximum = rdd.reduceByKey(max)
,

正如@mck 所说,您可以使用 reduceByKey,但如果您从未使用过函数式编程,理解起来可能有点复杂。

该方法的作用是将函数应用于执行 groupByKey 的结果值。我们来一步一步分析。

>>> rdd.groupByKey().take(1)
[('0.02704600',<pyspark.resultiterable.ResultIterable object at 0x7fac15f1fd90>)]

这样做我们得到一个 RDD,每个键一个条目(配对 RDD 中的第一列),并且值是可迭代的。我们可以将其视为一个列表。

我们从基础 RDD 得到

[['0.02703300','1.30900000'],['0.02703300','0.61800000'],['0.02704600','3.90800000'],['0.02704700','4.00000000'],'7.44600000']]

以一组为一组

[('0.02704600',<pyspark.resultiterable.ResultIterable object at 0x7fac15f2fe20>),('0.02704700',<pyspark.resultiterable.ResultIterable object at 0x7fac15f2f910>),('0.02703300',<pyspark.resultiterable.ResultIterable object at 0x7fac15f2f550>)]

然后我们必须做的是对值应用所需的函数。我们可以将所需的函数传递给 mapValues method(在我的例子中,我直接传递一个 lambda 函数)

>>> rdd.groupByKey().mapValues(lambda k: (max(k),min(k))).collect()
[('0.02704600',('3.90800000','3.90800000')),('7.44600000','4.00000000')),('1.30900000','0.61800000'))]

有一些注意事项:

  1. reducebyKey 更加简洁高效。虽然它可能会令人困惑
  2. 如果你想要最大值和最小值,试着像我展示的那样同时做(你也可以使用 reduceByKey 来做)。这样,您只需执行一次,而不是对数据进行两次传递。
  3. 尝试使用 DataFrame (SQL) API。它更现代,并尝试为您优化计算。
  4. reduceByKey 函数需要有点不同,因为它得到两个项目而不是一个可迭代的
>>> rdd.reduceByKey(lambda a,b: (max(a,b),min(a,b))).collect()
[('0.02704600','3.90800000'),'0.61800000'))]