问题描述
spark = SparkSession.builder.getorCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc,10)
rdd = ssc.sparkContext.parallelize(pd_binance)
rdd.take(1)
这是结果的一小部分:
[['0.02703300','1.30900000'],['0.02703300','0.61800000'],['0.02704600','3.90800000'],['0.02704700','4.00000000'],'7.44600000']
我想获得每个键的最大值和最小值,如何?
解决方法
您可以使用reduceByKey
:
minimum = rdd.reduceByKey(min)
maximum = rdd.reduceByKey(max)
,
正如@mck 所说,您可以使用 reduceByKey,但如果您从未使用过函数式编程,理解起来可能有点复杂。
该方法的作用是将函数应用于执行 groupByKey
的结果值。我们来一步一步分析。
>>> rdd.groupByKey().take(1)
[('0.02704600',<pyspark.resultiterable.ResultIterable object at 0x7fac15f1fd90>)]
这样做我们得到一个 RDD,每个键一个条目(配对 RDD 中的第一列),并且值是可迭代的。我们可以将其视为一个列表。
我们从基础 RDD 得到
[['0.02703300','1.30900000'],['0.02703300','0.61800000'],['0.02704600','3.90800000'],['0.02704700','4.00000000'],'7.44600000']]
以一组为一组
[('0.02704600',<pyspark.resultiterable.ResultIterable object at 0x7fac15f2fe20>),('0.02704700',<pyspark.resultiterable.ResultIterable object at 0x7fac15f2f910>),('0.02703300',<pyspark.resultiterable.ResultIterable object at 0x7fac15f2f550>)]
然后我们必须做的是对值应用所需的函数。我们可以将所需的函数传递给 mapValues
method(在我的例子中,我直接传递一个 lambda 函数)
>>> rdd.groupByKey().mapValues(lambda k: (max(k),min(k))).collect()
[('0.02704600',('3.90800000','3.90800000')),('7.44600000','4.00000000')),('1.30900000','0.61800000'))]
有一些注意事项:
-
reducebyKey
更加简洁高效。虽然它可能会令人困惑 - 如果你想要最大值和最小值,试着像我展示的那样同时做(你也可以使用 reduceByKey 来做)。这样,您只需执行一次,而不是对数据进行两次传递。
- 尝试使用 DataFrame (SQL) API。它更现代,并尝试为您优化计算。
-
reduceByKey
函数需要有点不同,因为它得到两个项目而不是一个可迭代的
>>> rdd.reduceByKey(lambda a,b: (max(a,b),min(a,b))).collect()
[('0.02704600','3.90800000'),'0.61800000'))]