如何基于pyspark中包含元组数组的rdd中的第一个元素进行过滤?

问题描述

从rdd过滤元组列表时遇到问题。

示例business.json

{"business_id":"gnKjwL_1w79qoiV3IC_xQQ","state":"NC","postal_code":"28210","latitude":35.092564,"longitude":-80.859132,"stars":4.0},{"business_id":"xvX2CttrVhyG2z1dFg_0xw","state":"AZ","postal_code":"85338","latitude":33.4556129678,"longitude":-112.3955963552,"stars":5.0}
from pyspark import SparkContext
sc = SparkContext.getorCreate()

stars = "4.0"
input_business_lines = sc.textFile('data/business.json') \
    .map(lambda lines: json.loads(lines))

business_ids = input_business_lines \
    .map(lambda kv: (kv['business_id'],kv['stars'],kv['state'])) \
    .filter(lambda kv: kv[1] >= float(stars)).map(lambda kv: (kv[0],kv[2])).collect()

上面的代码返回每个元组具有(first element = business_id,second element = state)元组列表

[('gnKjwL_1w79qoiV3IC_xQQ','NC'),('xvX2CttrVhyG2z1dFg_0xw','AZ'),...,('HhyxOkGAM07SRYtlQ4wMFQ','NC')]

到目前为止,一切都很好。 现在,我需要与评论表进行联接,并希望使用评论的rdd筛选所有匹配的业务ID。如果那是一个数据帧,那就容易多了。但是如果是元组,我不确定我们怎么做。

这是我的尝试

-注意:但是仅当business_ids是business_id而不是元组的列表时有效

示例review.json

{"review_id": "c-6aA9Bd7JxpmMroRoas9A","user_id": "bK4Y_GZUoAUTXIrmeEUGYw","business_id": "gnKjwL_1w79qoiV3IC_xQQ","stars": 4.0,"text": "Went there Saturday noon they open at 12pm but people were waiting outside before 12pm so you can tell it should be a good place. Nice Katsu & Eel with rice. Many Japanese go there.","date": "2014-07-13 20:28:18"},{"review_id": "EhvpZ1-MzemK1EMBUf19gQ","user_id": "p0IderpL5zE4D021cxvxtA","business_id": "KWywu2tTEPWmR9JnBc0WyQ","stars": 5.0,"text": "I came here for my 40th .First of they offer free limo service to and from which was really cool.  When we first arrived we were greeted by the manager he walked us up to the top floor where the male review was and told us we had the choice of seats wherever we wanted.  We were also given two drink coupons per person so we ordered a drink and got seated. The show started off slowly but what I really liked is the guys came out to sit and talk to you,also they were really cool about taking pictures. By the end of the night it was so crowded but it was amazing had the best time I definitely will be back. I hope soon.","date": "2015-11-20 05:24:45"}

代码

input_review_lines = sc.textFile('data/review.json').map(lambda lines: json.loads(lines))
rew_ids_bus_ids = input_review_lines \
                    .map(lambda kv: (kv['user_id'],kv['business_id'])) \
                    .filter(lambda kv: kv[1] in business_ids).collect()
rew_ids_bus_ids

解决方法

您可以加入这些rdd

import json

stars = 4.0
input_business_lines = sc.textFile('test.json') \
    .map(lambda lines: json.loads(lines))

business_ids = input_business_lines \
    .filter(lambda kv: kv['stars'] >= stars) \
    .map(lambda kv: (kv['business_id'],kv['state']))

print(business_ids.collect())

input_review_lines = sc.textFile('test2.json') \
    .map(lambda lines: json.loads(lines))

rew_ids_bus_ids = input_review_lines \
    .map(lambda kv: (kv['business_id'],kv['user_id']))

joined = business_ids \
    .join(rew_ids_bus_ids)

print(joined.collect())


# [('gnKjwL_1w79qoiV3IC_xQQ','NC'),('xvX2CttrVhyG2z1dFg_0xw','AZ')]
# [('gnKjwL_1w79qoiV3IC_xQQ',('NC','bK4Y_GZUoAUTXIrmeEUGYw'))]
,
import json
from pyspark import SparkContext

if __name__ == '__main__':
    input_review_json_path = "publicdata/review.json"
    input_business_json_path = "publicdata/business.json"
    output_csv_path = "outputs/user_state.csv"
    stars = "4.0"

    sc = SparkContext.getOrCreate()

    input_business_lines = sc.textFile(input_business_json_path) \
                             .map(lambda lines: json.loads(lines))

    business_ids = input_business_lines \
                        .map(lambda kv: (kv['business_id'],kv['stars'],kv['state'])) \
                        .filter(lambda kv: kv[1] >= float(stars)).map(lambda kv: (kv[0],kv[2]))

    input_review_lines = sc.textFile(input_review_json_path) \
                            .map(lambda lines: json.loads(lines))

    rew_ids_bus_ids = input_review_lines.map(lambda kv: (kv['business_id'],kv['user_id']))
    finalRdd = business_ids.join(rew_ids_bus_ids).map(lambda kv: (kv[0],kv[1][0]))

    review_rdd = finalRdd.collect()