问题描述
我有一个名为 incito
(注:以下这段 Django migrate 命令代码与本问题无关,疑为粘贴错误:)
from django.core.management.commands import migrate
class Command(migrate.Command):
    """Custom migrate command that, after the normal migration run,
    additionally applies the ``logs`` app's migrations to the ``logs`` database.
    """

    def handle(self, *args, **options):
        # First run the standard migrate with whatever options were passed in.
        super(Command, self).handle(*args, **options)
        # this is equal to python manage.py migrate logs --database=logs
        # This will execute only the logs migrations in the logs database
        options['app_label'] = options['database'] = 'logs'
        # Re-invoke the parent handler with the modified options.
        # (Original line was `super(Command,**options)`, which is invalid:
        # it neither passed `self` nor called any method.)
        super(Command, self).handle(*args, **options)
的数据框 incito,并且该数据框的 supplier Inv No
列由逗号分隔的值组成。我需要通过使用 pyspark 适当地重复这些逗号分隔值来重新创建数据框。我为此使用了以下 python 代码。我可以将其转换为 pyspark 吗?是否可以通过 pyspark 实现?
(相关列名:supplier Inv No)
这是我在 pyspark 中尝试过的。但我没有得到预期的结果。
from itertools import chain
def chainer(s):
    """Split each comma-separated string in the Series *s* and flatten
    the resulting lists of tokens into a single Python list."""
    return [token for parts in s.str.split(',') for token in parts]
# Normalize the split column to string so .str.split works uniformly.
incito['supplier Inv No'] = incito['supplier Inv No'].astype(str)
# calculate lengths of splits (how many times each row must be repeated)
lens = incito['supplier Inv No'].str.split(',').map(len)
# create new dataframe, repeating or chaining as appropriate.
# NOTE(review): the original line was truncated — every np.repeat call after
# the first had lost its `lens` argument and closing paren (a syntax error);
# reconstructed from the surviving `np.repeat(col, lens)` pattern.
dfnew = pd.DataFrame({
    'supplier Inv No': chainer(incito['supplier Inv No']),
    'Forwarder': np.repeat(incito['Forwarder'], lens),
    'Mode': np.repeat(incito['Mode'], lens),
    'File No': np.repeat(incito['File No'], lens),
    'ETD': np.repeat(incito['ETD'], lens),
    'Flight No': np.repeat(incito['Flight No'], lens),
    'Shipped Country': np.repeat(incito['Shipped Country'], lens),
    'Port': np.repeat(incito['Port'], lens),
    'Delivered_Country': np.repeat(incito['Delivered_Country'], lens),
    'AirWeight': np.repeat(incito['AirWeight'], lens),
    'FREIGHT CHARGE': np.repeat(incito['FREIGHT CHARGE'], lens),
})
解决方法
像这样的东西,使用 repeat 函数?
from pyspark.sql import functions as F

# Build a one-column demo DataFrame.
# Fixed: the second tuple was written `('EFGH',]` in the original —
# an unclosed tuple / syntax error.
df = (spark
      .sparkContext
      .parallelize([('ABCD',), ('EFGH',)])
      .toDF(['col_a'])
      )

(df
 # repeat the value of col_a twice
 .withColumn('col_b', F.repeat(F.col('col_a'), 2))
 # repeat the literal 'X' ten times
 .withColumn('col_c', F.repeat(F.lit('X'), 10))
 .show()
 )
# +-----+--------+----------+
# |col_a|   col_b|     col_c|
# +-----+--------+----------+
# | ABCD|ABCDABCD|XXXXXXXXXX|
# | EFGH|EFGHEFGH|XXXXXXXXXX|
# +-----+--------+----------+