问题描述
我有以下代码来简单地创建一个 excel 文件并仅返回所需的列。它被写成一个在 docker 上容器化的 luigi 任务,并且在创建 _SUCCESS 标志时它不返回 csv 文件。
def _save_datasets(simplified,outdir: Path,flag):
out_clean = outdir / 'transformed.csv/'
flag = outdir / flag
simplified.to_csv(str(out_clean),index=False)
# save as csv and create flag file
flag.touch()
@click.command()
@click.option('--in-csv')
@click.option('--out-dir')
@click.option('--flag')
def transform_data(in_csv,out_dir,flag):
out_dir = Path(out_dir)
data=pd.read_csv(in_csv)
req_dp = data[['description','points']]
#simplifying the points according to range
def transform_points_simplified(points):
if points < 84:
return 1
elif points >= 84 and points < 88:
return 2
elif points >= 88 and points < 92:
return 3
elif points >= 92 and points < 96:
return 4
else:
return 5
simplified = req_dp.assign(points_simplified = dp['points'].apply(transform_points_simplified))
_save_datasets(simplified,flag)
路易吉任务代码:
#Transform
class TransformData(DockerTask):
"""Task to simplify datasets"""
in_path = '/usr/share/data/created_csv/'
in_csv = luigi.Parameter(default= in_path + 'cleaned.csv')
out_dir = luigi.Parameter(default='/usr/share/data/created_csv/')
flag = luigi.Parameter('.SUCCESS_TransformData')
@property
def image(self):
return f'code-chal/transform-data:{VERSION}'
def requires(self):
return CleanData()
@property
def command(self):
return [
'python','clean_data.py','--in-csv',self.in_csv,'--out-dir',self.out_dir,'--flag',self.flag
]
def output(self):
return luigi.LocalTarget(
path=str(Path(self.out_dir) / self.flag)
)
由于 _SUCCESS 标志的创建,luigi 任务继续执行下一个任务,但下一个任务失败,因为它依赖于未创建的transformed.csv 文件。
谢谢
解决方法
在您的 LuigiTask 中,您需要一个运行函数,该函数需要使用输出函数的输出目标来保存您想要的文件。
所以你需要添加:
def run(self):
outfile = open(self.output().path,'wb') # Notice that it references to the path of the self.output function
transform_data(self.in_csv,outfile,self.flag)