How to do a test/train split by column value rather than by row in PySpark

Problem description

I want to generate train and test sets for machine learning. Suppose I have a dataframe with the following columns:

account_id | session_id | feature_1 | feature_2 | label

In this dataset, each row has a unique session_id, but an account_id can appear multiple times. However, I want my train and test sets to have mutually exclusive account_ids. (It is almost the opposite of stratified sampling.)

In pandas, this is straightforward. I have something like the following:

def train_test_split(df, split_col, feature_cols, label_col, test_fraction=0.2):
    """
    While sklearn's train_test_split splits by each row in the dataset,
    this function will split by a specific column. In that way, we can
    separate account_id such that train and test sets will have mutually
    exclusive accounts, to minimize cross-talk between train and test sets.
    """
    split_values = df[split_col].drop_duplicates()
    test_values = split_values.sample(frac=test_fraction, random_state=42)

    df_test = df[df[split_col].isin(test_values)]
    df_train = df[~df[split_col].isin(test_values)]

    return df_test, df_train
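
For reference, here is a minimal sanity check of this function on toy data; the account/session values and the feature numbers below are made up purely for illustration:

import pandas as pd

# Hypothetical toy frame: account a1 owns two sessions, a2 and a3 one each.
df = pd.DataFrame({
    "account_id": ["a1", "a1", "a2", "a3"],
    "session_id": ["s1", "s2", "s3", "s4"],
    "feature_1":  [0.1, 0.2, 0.3, 0.4],
    "feature_2":  [1.0, 2.0, 3.0, 4.0],
    "label":      [0, 1, 0, 1],
})

df_test, df_train = train_test_split(df, "account_id",
                                     feature_cols=["feature_1", "feature_2"],
                                     label_col="label",
                                     test_fraction=0.5)

# No account appears on both sides of the split, even though a1 spans two rows.
assert set(df_test["account_id"]).isdisjoint(df_train["account_id"])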

Now my dataset has grown too large to fit in memory, and I have to switch from pandas and do all of this in PySpark instead. How can I split train and test sets in PySpark so that they have mutually exclusive account_ids, without pulling everything into memory?

Solution

You can use the rand() function from pyspark.sql.functions to generate a random number for each distinct account_id, and build the train and test dataframes based on that random number.

from pyspark.sql import functions as F

TEST_FRACTION = 0.2

train_test_split = (df.select("account_id")
                      .distinct()  # removing duplicate account_ids
                      .withColumn("rand_val", F.rand())
                      .withColumn("data_type",
                                  F.when(F.col("rand_val") < TEST_FRACTION, "test")
                                   .otherwise("train")))

train_df = (train_test_split.filter(F.col("data_type") == "train")
                            .join(df, on="account_id"))  # inner join removes all rows other than train

test_df = (train_test_split.filter(F.col("data_type") == "test")
                           .join(df, on="account_id"))

Since an account_id cannot be "train" and "test" at the same time, train_df and test_df will have mutually exclusive account_ids.
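
If you want to confirm the two splits really are disjoint without collecting data to the driver, a minimal check (reusing the train_df and test_df built above) is:

# Count the account_ids that appear in both splits; this should be 0.
overlap = (train_df.select("account_id").distinct()
                   .intersect(test_df.select("account_id").distinct())
                   .count())
print("accounts in both train and test:", overlap)

Note that F.rand() is seeded randomly on each run, so the split will differ between runs; if you need it to be reproducible, pass an explicit seed, e.g. F.rand(seed=42).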