Luigi task does not write Pandas df to csv

Problem description

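I have the following code, which simply creates an Excel file and returns only the required columns. It is written as a Luigi task containerized with Docker, and it does not produce the csv file even though the _SUCCESS flag is created.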

Function code

from pathlib import Path

import click
import pandas as pd


def _save_datasets(simplified, outdir: Path, flag):
    out_clean = outdir / 'transformed.csv'
    flag = outdir / flag
    # save as csv and create flag file
    simplified.to_csv(str(out_clean), index=False)
    flag.touch()


@click.command()
@click.option('--in-csv')
@click.option('--out-dir')
@click.option('--flag')
def transform_data(in_csv, out_dir, flag):
    out_dir = Path(out_dir)
    data = pd.read_csv(in_csv)
    req_dp = data[['description', 'points']]

    # simplify the points according to range
    def transform_points_simplified(points):
        if points < 84:
            return 1
        elif points >= 84 and points < 88:
            return 2
        elif points >= 88 and points < 92:
            return 3
        elif points >= 92 and points < 96:
            return 4
        else:
            return 5

    # use req_dp (the original referenced an undefined name, dp)
    simplified = req_dp.assign(points_simplified=req_dp['points'].apply(transform_points_simplified))
    # pass out_dir as well; _save_datasets takes (simplified, outdir, flag)
    _save_datasets(simplified, out_dir, flag)
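
One way to make the flag trustworthy is to create it only after confirming that the CSV is actually on disk. The sketch below is a minimal illustration of that idea and not the code from the question: `save_rows_then_flag` is a hypothetical helper, and it uses the standard-library `csv` module in place of `DataFrame.to_csv` purely so that it runs without pandas.

```python
import csv
import tempfile
from pathlib import Path


def save_rows_then_flag(rows, header, out_dir: Path, flag_name: str) -> Path:
    """Write rows to transformed.csv, then create the flag only if the file exists."""
    out_dir.mkdir(parents=True, exist_ok=True)
    out_csv = out_dir / "transformed.csv"
    tmp_csv = out_dir / "transformed.csv.tmp"

    # Write under a temporary name first so a crash never leaves a
    # partial file under the final name
    with tmp_csv.open("w", newline="") as fh:
        writer = csv.writer(fh)
        writer.writerow(header)
        writer.writerows(rows)
    tmp_csv.replace(out_csv)

    # Refuse to signal success unless the CSV is really there and non-empty
    if not out_csv.is_file() or out_csv.stat().st_size == 0:
        raise RuntimeError(f"{out_csv} was not written; not creating flag")
    (out_dir / flag_name).touch()
    return out_csv


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        written = save_rows_then_flag(
            rows=[("fruity", 3), ("oaky", 5)],
            header=("description", "points_simplified"),
            out_dir=Path(tmp),
            flag_name=".SUCCESS_TransformData",
        )
        print(written.name)
```

With this ordering, a missing `transformed.csv` surfaces as an error in the transform step itself instead of in the downstream task.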

Luigi task code

#Transform
class TransformData(DockerTask):
    """Task to simplify datasets"""

    in_path = '/usr/share/data/created_csv/'
    in_csv = luigi.Parameter(default=in_path + 'cleaned.csv')
    out_dir = luigi.Parameter(default='/usr/share/data/created_csv/')
    flag = luigi.Parameter(default='.SUCCESS_TransformData')

    @property
    def image(self):
        return f'code-chal/transform-data:{VERSION}'

    def requires(self):
        return CleanData()

    @property
    def command(self):
        return [
            'python', 'clean_data.py',
            '--in-csv', self.in_csv,
            '--out-dir', self.out_dir,
            '--flag', self.flag,
        ]

    def output(self):
        return luigi.LocalTarget(
            path=str(Path(self.out_dir) / self.flag)
        )

Because the _SUCCESS flag is created, Luigi moves on to the next task, but that task fails because it depends on the transformed.csv file, which was never created.

Thanks

Solution

In your Luigi task you need a run function, and it needs to use the output target returned by the output function to save the file you want.

So you need to add:

def run(self):
    # self.output().path is the path of the target returned by output()
    target = Path(self.output().path)
    # transform_data is a click command; .callback is the undecorated function
    transform_data.callback(self.in_csv, target.parent, target.name)