问题描述
我有一个打包的 DAG,我将模块存储在子文件夹中。 我正在使用 PythonVirtualenvOperator 并希望从虚拟环境访问此模块。
文件夹系统-
dags/
packaged_dag.zip/
dag.py
package/
my_module.py
__init__.py
dag.py
from airflow.operators.python_operator import PythonVirtualenvOperator
def my_function(**kwargs):
from package import my_module
with models.DAG(
default_args=default_dag_args) as dag:
virtualenv_task = PythonVirtualenvOperator(
task_id="virtualenv_python",python_callable=my_function,system_site_packages=True,dag=dag,)
为此,我将找不到包模块。 如果我将导入移动到主 dag 文件(如 PythonVirtualenvOperator) - 它会工作正常,但我想要来自 virtualenv 的文件。
解决方法
这对我有用:
dags/
folder/
my_module.py
main.py
main.py
default_args = {
"owner": "airflow"}
dag = DAG(
dag_id='example',default_args=default_args,schedule_interval=None,start_date=days_ago(2),)
def my_module_virtualenv():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
from dags.folder.my_module import main as my_module
my_module()
check_database = PythonVirtualenvOperator(
task_id="my_module",python_callable=my_module_virtualenv,requirements=["psycopg2-binary==2.8.6"],system_site_packages=True,dag=dag,)
这不是标准做法,因为没有 __init__.py
但它有效。当我尝试__init__.py
docker-compose up airflow-init
给我带来了麻烦