问题描述
我是 pyspark RDD 的新手,并且有一个从 JSON 文件中获取的数据帧:
Row(created_at='2021-05-05 23:37:51',hash_tags=None,id=1390088382659895296,replyto_id=None,replyto_user_id=None,retweet_id=13900297910tweet_id=13900297910tweet_id=1390027950tweet_tweet_time=1390088382659895296 :突发新闻:拜登政府将支持取消对 Covid-19 疫苗的专利保护,这是全球的突破……',user_id=17799542,user_mentions=[Row(id=807095,index=[3,11])])>
这是我的全部代码:
spark = SparkSession \
.builder \
.appName("Python Spark sql basic example") \
.config("spark.some.config.option","some-value") \
.getorCreate()
data_rdd = spark.read.option("multiline","true")\
.json("tweets.json")
print(data_rdd.collect()[0])
def extractColumns(record):
return (record[8],[record[4],record[6]])
ddata_frame = data_rdd.rdd.map(extractColumns)\
.groupByKey()\
.map(lambda r: (r[0],list(r[1])))
我以如下形式获取了 RDD 数据: [(17799542,[[无,807095]]),
...
(3094649957,[[None,3094649957],[None,None],None]])]
如何消除值中的 None 以实现以下目标: [(17799542,[807095]),[3094649957,3094649957,3094649957])]
我在下面尝试过但没有工作:
def eliminateNone(record):
s = list(filter(lambda s: each != None for each in s))
return (record[0],s)
data_frame.mapValues(eliminateNone)
print(data_frame.collect())
感谢您的帮助。
解决方法
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option","some-value") \
.getOrCreate()
data_rdd = spark.read.option("multiline","true")\
.json("tweets.json")
print(data_rdd.collect()[0])
def extractColumns(record):
return (record[8],[record[3],record[5]])
def merge_values(data):
result = []
for l in data:
for x in l:
if x != None:
result.append(x)
return result
data_frame = data_rdd.rdd.map(extractColumns)\
.groupByKey()\
.map(lambda r: (r[0],list(r[1])))
data_frame = data_frame.mapValues(merge_values)
print(data_frame.collect())
你可以试试这个。