How do I union two PySpark DataFrames produced by loop logic?

Problem Description

I have looked at similar questions and answers on Stack Overflow, but the looping aspect of this problem poses a challenge I can't figure out.

I am trying to concatenate the output of some code that loops over the different products in the Spark DataFrame below and selects specific ones, for example the 12-month and 2-month subscriptions.

I have this Spark DataFrame (df):

+--------------+-------------------+------------------------+
|user_id       |purchase_date_all  |product                 |
+--------------+-------------------+------------------------+
|226575        |2018-04-04 17:41:23|12 months of global news|
|227729        |2018-04-19 16:50:09|2  months of global news|
|228544        |2018-04-28 17:01:16|18 months of global news|
|231795        |2018-06-11 20:27:48|36 months of global news|
|234206        |2018-07-19 00:52:10|12 months of global news|
|234607        |2018-07-23 20:41:47|12 months of global news|
|235133        |2018-07-30 02:34:58|12 months of global news|
|237883        |2018-08-07 18:52:53|1 months of global news | 
|237924        |2018-08-08 01:31:13|6 months of global news |
|238892        |2018-08-14 02:45:51|9 months of global news |
|242200        |2018-08-19 21:22:05|3 months of global news |
|242413        |2018-08-21 06:26:57|13 months of global news|
|249034        |2018-10-11 15:01:06|16 months of global news|
|254415        |2018-12-28 12:13:18|16 months of global news|
|256866        |2019-02-02 16:34:41|36 months of global news|
|257317        |2019-02-09 18:49:12|11 months of global news|
+--------------+-------------------+------------------------+

For example, this is my function for selecting the "12 months of global news" product:

def renewals(df, n):
    prod_type = str(n) + ' months of global news'
    df_first_xmo = df[df['product'] == prod_type]
    return df_first_xmo.show()  # .show() prints the frame and returns None

If I run the function, I get the desired result:

renewals(df,12)
+--------------+-------------------+------------------------+
|user_id       |purchase_date_all  |product                 |
+--------------+-------------------+------------------------+
|226575        |2018-04-04 17:41:23|12 months of global news|
|234206        |2018-07-19 00:52:10|12 months of global news|
|234607        |2018-07-23 20:41:47|12 months of global news|
|235133        |2018-07-30 02:34:58|12 months of global news|
+--------------+-------------------+------------------------+

If I loop over several months (the way I would in pandas), it prints two unnamed DataFrames:

temp=[]
month = [12,2]
for x in month:
    temp.append(renewals(all_direct_renew,x))
+--------------+-------------------+------------------------+
|user_id       |purchase_date_all  |product                 |
+--------------+-------------------+------------------------+
|226575        |2018-04-04 17:41:23|12 months of global news|
|234206        |2018-07-19 00:52:10|12 months of global news|
|234607        |2018-07-23 20:41:47|12 months of global news|
|235133        |2018-07-30 02:34:58|12 months of global news|
+--------------+-------------------+------------------------+
+--------------+-------------------+------------------------+
|user_id       |purchase_date_all  |product                 |
+--------------+-------------------+------------------------+
|227729        |2018-04-19 16:50:09|2  months of global news|
+--------------+-------------------+------------------------+

I am trying to figure out how to concatenate the output above into a single Spark DataFrame. In pandas I could do that by running:

foo = pd.concat(temp)
foo
user_id     purchase_date_all       product
226575      2018-04-04 17:41:23     12 months of global news
234206      2018-07-19 00:52:10     12 months of global news
234607      2018-07-23 20:41:47     12 months of global news
235133      2018-07-30 02:34:58     12 months of global news
227729      2018-04-19 16:50:09     2 months of global news

How can I change the loop logic so that the output is concatenated into one DataFrame (as I did in pandas) and displayed?

Solution

If the column names (and their order) are the same, you can use union:

df = spark.createDataFrame(
    [
        (226575, '2018-04-04 17:41:23', '12 months of global news'),
        (234206, '2018-07-19 00:52:10', '12 months of global news'),
        (234607, '2018-07-23 20:41:47', '12 months of global news'),
        (235133, '2018-07-30 02:34:58', '12 months of global news'),
    ],
    ['user_id', 'purchase_date_all', 'product']
)
df.show(truncate=False)

df2 = spark.createDataFrame(
    [
        (227729, '2018-04-19 16:50:09', '2 months of global news'),
    ],
    ['user_id', 'purchase_date_all', 'product']
)
df2.show(truncate=False)

+-------+-------------------+------------------------+
|user_id|purchase_date_all  |product                 |
+-------+-------------------+------------------------+
|226575 |2018-04-04 17:41:23|12 months of global news|
|234206 |2018-07-19 00:52:10|12 months of global news|
|234607 |2018-07-23 20:41:47|12 months of global news|
|235133 |2018-07-30 02:34:58|12 months of global news|
+-------+-------------------+------------------------+

+-------+-------------------+-----------------------+
|user_id|purchase_date_all  |product                |
+-------+-------------------+-----------------------+
|227729 |2018-04-19 16:50:09|2 months of global news|
+-------+-------------------+-----------------------+

df.union(df2).show()

+-------+-------------------+------------------------+
|user_id|purchase_date_all  |product                 |
+-------+-------------------+------------------------+
|226575 |2018-04-04 17:41:23|12 months of global news|
|234206 |2018-07-19 00:52:10|12 months of global news|
|234607 |2018-07-23 20:41:47|12 months of global news|
|235133 |2018-07-30 02:34:58|12 months of global news|
|227729 |2018-04-19 16:50:09|2 months of global news |
+-------+-------------------+------------------------+