Luigi/SQLite:初始加载后如何更新数据库?

问题描述

我正在使用以下代码通过 luigi 将数据加载到 sqlite 数据库中:

class LoadData(luigi.Task):   
    
    def requires(self):
        return TransformData()
        
    def run(self):
        with sqlite3.connect('database.db') as db:
            cursor = db.cursor()
            cursor.execute("INSERT INTO prod SELECT * FROM staging;")
    def output(self):
        return luigi.LocalTarget('database.db')

这可行,但是当我想更新或插入新数据时,任务不会执行,因为 luigi 认为它已完成(database.db 已存在)

也许我没有理解LocalTarget的好用处。解决这个问题的正确方法是什么?

///EDIT:我的问题适用于 this page 上给出的示例(le_create_db.py代码)。您如何解决该示例中的更新和插入问题?

///EDIT: This question 关于附加到文件是类似的,但使用标记文件解决方案不起作用,因为 sqla 需要 sqlAlchemyTarget 输出。还有其他答案吗,特别是关于附加到数据库的答案?

解决方法

考虑使用模拟文件: http://gouthamanbalaraman.com/blog/building-luigi-task-pipeline.html

在每次执行中,您将创建一个新文件。

另一种解决方案可能是使用在数据库内创建标记表的策略,例如:https://luigi.readthedocs.io/en/stable/api/luigi.contrib.postgres.html#luigi.contrib.postgres.PostgresTarget