How to add 2 rows with the same key column values in Spark Streaming?

Problem description

I have this data streaming into my HDFS:

nyu,-0.0,1.36,0.64,1.3616,15131c3f-7ad9-489d-bbc4-0f99305b7db0
madrid,-0.494,1.506,0.0,-1.3616,15131c3f-7ad9-489d-bbc4-0f99305b7db0
cynthia,-0.085,1.4300000000000002,0.485,1.6916,15131c3f-7ad9-489d-bbc4-0f99305b7db0
rachel,1.322,0.6779999999999999,1.8496000000000001,15131c3f-7ad9-489d-bbc4-0f99305b7db0
haha,0.921,1.079,1.5928,15131c3f-7ad9-489d-bbc4-0f99305b7db0
spain,-0.16499999999999998,1.419,0.417,1.6442999999999999,-0.076,1.608,0.317,1.334,-0.142,1.363,0.497,1.8187,15131c3f-7ad9-489d-bbc4-0f99305b7db0
american,-0.028,1.888,0.084,0.8658,15131c3f-7ad9-489d-bbc4-0f99305b7db0
middleburi,-0.148,1.6880000000000002,0.164,0.5698000000000001,15131c3f-7ad9-489d-bbc4-0f99305b7db0
metro,-0.096,1.654,0.249,1.3209,15131c3f-7ad9-489d-bbc4-0f99305b7db0
simon,-0.047,1.797,0.155,1.2171,15131c3f-7ad9-489d-bbc4-0f99305b7db0
korea,15131c3f-7ad9-489d-bbc4-0f99305b7db0
anthoni,1.855,0.097,0.9211,15131c3f-7ad9-489d-bbc4-0f99305b7db0
anderson,1.7349999999999999,0.217,1.6118999999999999,-0.20700000000000002,1.6949999999999998,0.3662000000000001,15131c3f-7ad9-489d-bbc4-0f99305b7db0
america,1.338,0.614,1.679,15131c3f-7ad9-489d-bbc4-0f99305b7db0

I want to sum the scores of rows whose word (first column) and document id (last column) are the same.

So far I have the following code:

from pyspark.streaming import StreamingContext
import time
import pprint
from pyspark.sql import functions as F
ssc = StreamingContext(sc,60)
lines = ssc.textFileStream("hdfs://thesis:9000/user/kush/data/")
data = lines.map(lambda x: x.split(','))
# trying to do the task here
m_Data = data.reduceByKey(lambda x,y: (x[1] + y[1],x[2] + y[2],x[3]+y[3],x[4]+y[4]))
m_Data.pprint()
ssc.start()
time.sleep(5)

How can this be done in PySpark Streaming?

Solution

To use reduceByKey, you actually need to help Spark figure out what the key is. I created a key/value RDD named pair.

The key is made up of the word and the document id. The value is a tuple holding the scores, each cast to float (or whatever numeric type suits your dataset) so they can be summed.

data = lines.map(lambda x: x.split(','))
# key = (word, document id); value = the four scores cast to float
pair = data.map(lambda x: ((x[0], x[5]), (float(x[1]), float(x[2]), float(x[3]), float(x[4]))))
# element-wise sum of the score tuples sharing the same key
aggregation = pair.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1], x[2] + y[2], x[3] + y[3]))
aggregation.pprint(20)

Sample output:

(('haha', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.0, 0.921, 1.079, 1.5928))
(('american', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.028, 1.888, 0.084, 0.8658))
(('madrid', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.617, 4.968999999999999, 0.41400000000000003, 0.8935000000000002))
(('middleburi', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.148, 1.6880000000000002, 0.164, 0.5698000000000001))
(('cynthia', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.085, 1.4300000000000002, 0.485, 1.6916))
(('metro', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.096, 1.654, 0.249, 1.3209))
(('korea', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.047, 1.797, 0.155, 1.2171))
(('anthoni', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (1.855, 0.097, 0.9211))
(('anderson', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (0.9211))
(('spain', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.6549999999999999, 9.806, 1.538, 7.8753))
(('nyu', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (1.36, 0.64, 1.3616))
(('rachel', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (2.7520000000000002, 1.1629999999999998, 3.5412))
(('simon', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (1.2171))
(('america', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (1.338, 0.614, 1.679))
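
Note that the sample rows are not uniform: korea carries no scores at all, and several rows carry three scores instead of four, so float(x[1]) or x[5] would raise on those lines. Below is a minimal defensive variant of the mapping step; the pad-to-four-scores policy and the to_pair helper name are my assumptions, not part of the original answer.

# Hypothetical helper: normalize a split row into ((word, doc_id), 4-tuple of floats).
# Assumption: missing scores are treated as 0.0 and any extra scores are dropped.
def to_pair(fields):
    word, doc_id = fields[0], fields[-1]
    scores = [float(s) for s in fields[1:-1]]
    scores = (scores + [0.0] * 4)[:4]
    return ((word, doc_id), tuple(scores))

pair = data.map(to_pair)
aggregation = pair.reduceByKey(lambda x, y: tuple(a + b for a, b in zip(x, y)))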

As a separate note, the case where a (word, document) key shows up once and a second entry for it arrives, say, 30 minutes later is out of scope here: reduceByKey only combines records that land in the same micro-batch.
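
If that cross-batch case matters, one common approach (not shown in the original answer) is updateStateByKey, which keeps a running total per key across micro-batches and requires a checkpoint directory. A rough sketch under those assumptions; the checkpoint path and update_totals name are illustrative.

# Stateful transformations require checkpointing; the path here is illustrative.
ssc.checkpoint("hdfs://thesis:9000/user/kush/checkpoint/")

def update_totals(new_scores, running):
    # new_scores: score tuples seen for this key in the current batch
    # running: the previously accumulated 4-tuple, or None on first appearance
    total = running or (0.0, 0.0, 0.0, 0.0)
    for s in new_scores:
        total = tuple(a + b for a, b in zip(total, s))
    return total

running_totals = pair.updateStateByKey(update_totals)
running_totals.pprint(20)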