将模块导入到气流上的虚拟环境

问题描述

我有一个打包的 DAG,我将模块存储在子文件夹中。 我正在使用 PythonVirtualenvOperator 并希望从虚拟环境访问此模块。

文件夹系统-

dags/
    packaged_dag.zip/
        dag.py
        package/
            my_module.py
            __init__.py

dag.py

from airflow.operators.python_operator import PythonVirtualenvOperator

def my_function(**kwargs):
    from package import my_module


with models.DAG(
default_args=default_dag_args) as dag:
virtualenv_task = PythonVirtualenvOperator(
    task_id="virtualenv_python",python_callable=my_function,system_site_packages=True,dag=dag,)

为此,我将找不到包模块。 如果我将导入移动到主 dag 文件(如 PythonVirtualenvOperator) - 它会工作正常,但我想要来自 virtualenv 的文件

解决方法

这对我有用:

dags/
  folder/
    my_module.py
  main.py

main.py

default_args = {
    "owner": "airflow"}

dag = DAG(
    dag_id='example',default_args=default_args,schedule_interval=None,start_date=days_ago(2),)

def my_module_virtualenv():
    """
    Example function that will be performed in a virtual environment.

    Importing at the module level ensures that it will not attempt to import the
    library before it is installed.
    """
    from dags.folder.my_module import main as my_module
    my_module()

check_database = PythonVirtualenvOperator(
    task_id="my_module",python_callable=my_module_virtualenv,requirements=["psycopg2-binary==2.8.6"],system_site_packages=True,dag=dag,)

这不是标准做法,因为没有 __init__.py 但它有效。当我尝试__init__.py

时,docker-compose up airflow-init给我带来了麻烦