问题描述
我正在尝试使用 Python Core API(通过 Apache Spark)将这些 RDD 连接在一起;但是,我没有运气尝试完成此操作。
- users_rdd:user_id
- reviews_rdd:review_id、company_id 和 user_id
- company_rdd:company_id
现在,当将两个 RDD 连接在一起时,它可以正常工作,没有任何问题:
user_rev_rdd = (users_rdd
.keyBy(lambda user: user['user_id'])
.join(
reviews_rdd.keyBy(lambda rev: rev['user_id'])
)
)
虽然,为了将所有三个连接在一起,我已经尝试过这个,但由于某种原因它对我根本不起作用:
user_rev_com_rdd = (users_rdd
.keyBy(lambda user: user['user_id'])
.join(
reviews_rdd.keyBy(lambda rev: rev['user_id'])
)
.join(
companies_rdd.keyBy(lambda com: com['company_id'])
)
)
关于如何将我的三个 RDD 连接在一起的任何帮助都会非常有帮助,因为我不确定如何正确地做这样的事情。谢谢。
解决方法
第一次加入后,键是user_id
,但是你加入到companies_rdd
,键是company_id
,所以加入键不正确。您需要将密钥更改为 company_id
,例如
user_rev_com_rdd = (users_rdd
.keyBy(lambda user: user['user_id'])
.join(
reviews_rdd.keyBy(lambda rev: rev['user_id'])
)
.map(lambda r: (r[1][1]['company_id'],r[1]))
.join(
companies_rdd.keyBy(lambda com: com['company_id'])
)
)
要合并三个RDD中的元素并在join后删除join键,可以在末尾添加一个map
:
user_rev_com_rdd = (users_rdd
.keyBy(lambda user: user['user_id'])
.join(
reviews_rdd.keyBy(lambda rev: rev['user_id'])
)
.map(lambda r: (r[1][1]['company_id'],r[1]))
.join(
companies_rdd.keyBy(lambda com: com['company_id'])
)
.map(lambda r: (*r[1][0],r[1][1]))
)