Airflow Docker commands communicating via XCom

Problem description

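I have an Airflow Docker container and two other containers (dc1 and dc2). I am trying to run a command in dc1 in task1 (via DockerOperator) and use its output in the dc2 command in task2.

I have a working solution, but unfortunately it isn't reliable :(

I'm reading the dc1 logs, and this works correctly 99% of the time.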

command1.py

# a simple version of the real script
import json

# json.dumps emits valid JSON (double quotes), which json.loads can parse
print(json.dumps({'date': '2020-05-03'}))
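Calling `print()` on a dict writes its Python repr, which uses single quotes. `json.loads` rejects that. The dc1 log shows double quotes, so the script has to serialize with `json.dumps` for the wrapper's `json.loads` to succeed. A quick standalone check of the difference:

```python
import json

payload = {'date': '2020-05-03'}

# print(payload) would emit the repr: {'date': '2020-05-03'}
as_repr = repr(payload)
# json.dumps emits valid JSON: {"date": "2020-05-03"}
as_json = json.dumps(payload)


def parses_as_json(text):
    """Return True if text is valid JSON."""
    try:
        json.loads(text)
        return True
    except ValueError:  # json.JSONDecodeError subclasses ValueError
        return False


print(as_repr, parses_as_json(as_repr))  # {'date': '2020-05-03'} False
print(as_json, parses_as_json(as_json))  # {"date": "2020-05-03"} True
```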

airflow/dags/dag1.py

import json

from airflow.operators.docker_operator import DockerOperator


# a wrapper class
class DOperator(DockerOperator):
    def __init__(self, task_id, command, dag, *args, **kwargs):
        super().__init__(
            image='docker_image:latest',
            task_id=task_id,
            command=command,
            api_version='auto',
            auto_remove=True,
            docker_url='unix://var/run/docker.sock',
            network_mode='bridge',
            tty=True,
            xcom_push=True,
            dag=dag,
            **kwargs
        )

    def execute(self, context):
        # get the last log line from docker stdout
        docker_log = super().execute(context)

        # push one XCom per key of the JSON object
        if docker_log:
            try:
                result = json.loads(docker_log)
                for key in result.keys():
                    context['ti'].xcom_push(key=key, value=result[key])
            except (ValueError, AttributeError):
                # not JSON, or JSON that is not an object
                pass

        return docker_log

# Docker container 1
task1 = DOperator(
    dag=dag,
    task_id='task1',
    command='python command1.py',  # its output is {"date": "2020-05-03"}
)

# Docker container 2
task2 = DOperator(
    dag=dag,
    task_id='task2',
    command='python command2.py --date={}'.format(
        "{{{{ task_instance.xcom_pull(dag_id='{}',task_ids='{}',key='{}') }}}}".format(
            dag.dag_id, task1.task_id, 'date'
        )
    ),
)

task1 >> task2

dc1 log

[2020-08-31 06:50:38,868] {{docker_operator.py:242}} INFO - {"date": "2020-05-03"}

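But the other 1% of the time it doesn't work.

In that case, the dc1 log ends with an extra empty line, and I can't extract the output correctly:

dc1 log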

[2020-08-31 06:50:38,868] {{docker_operator.py:242}} INFO - {"date": "2020-05-03"}
[2020-08-31 06:50:38,868] {{docker_operator.py:242}} INFO - 
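The JSON handling in `execute` can be moved into a plain function. That makes the failure mode visible and testable without Airflow. This is a sketch: `parse_xcom_json` is a hypothetical name. Accepting `bytes` as well as `str` is an assumption, because the type that `DockerOperator.execute` returns varies between Airflow versions.

```python
import json


def parse_xcom_json(docker_log):
    """Parse the operator's return value into a dict of XCom key/values.

    Returns an empty dict when the value is empty, not JSON, or not a
    JSON object, instead of silently swallowing every exception.
    """
    if not docker_log:
        return {}
    # Assumption: depending on the Airflow version the value may be bytes.
    if isinstance(docker_log, bytes):
        docker_log = docker_log.decode('utf-8')
    try:
        result = json.loads(docker_log)
    except ValueError:  # includes json.JSONDecodeError
        return {}
    return result if isinstance(result, dict) else {}


# Inside execute() the loop would then become:
#   for key, value in parse_xcom_json(docker_log).items():
#       context['ti'].xcom_push(key=key, value=value)

print(parse_xcom_json(b'{"date": "2020-05-03"}'))  # {'date': '2020-05-03'}
print(parse_xcom_json(b''))  # {} : the blank-last-line case
```

This only makes the blank-last-line case explicit by returning `{}`. It cannot recover the date, because with `xcom_all=False` the operator hands back only that last, empty line.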

So my questions are:

  • Do you know how to fix this issue?
  • Or do you know a better way to communicate between two Docker operators?

Solution

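You can try adding xcom_all=True when instantiating the DockerOperator. From the Airflow DockerOperator docs:

xcom_push (bool) – Whether stdout will be pushed to the next step using XCom. The default is False.

xcom_all (bool) – Push all of stdout or just the last line. The default is False (last line).

Setting xcom_all to True pushes all of the log output to XCom. You can then parse it in the downstream task to retrieve the information you need. That way you don't depend on the data being on the last line of the log.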
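With `xcom_all=True`, the downstream task receives the whole stdout and can search it instead of trusting the last line. This is a minimal sketch. It assumes the payload is printed as one JSON object on a single line. `last_json_object` is a hypothetical helper name.

```python
import json


def last_json_object(stdout):
    """Return the last line of stdout that parses as a JSON object.

    Blank lines and non-JSON log noise are skipped, so a trailing empty
    line no longer hides the payload. Returns None if nothing matches.
    """
    if isinstance(stdout, bytes):
        stdout = stdout.decode('utf-8')
    for line in reversed(stdout.splitlines()):
        line = line.strip()
        if not line:
            continue  # the stray empty line
        try:
            value = json.loads(line)
        except ValueError:
            continue  # ordinary log output
        if isinstance(value, dict):
            return value
    return None


# Full stdout from the container, including the extra blank line.
full_stdout = 'starting\n{"date": "2020-05-03"}\n\n'
print(last_json_object(full_stdout))  # {'date': '2020-05-03'}
```

In a downstream PythonOperator, this might be called on the value from `ti.xcom_pull(task_ids='task1')`. Scanning from the end and skipping blank or non-JSON lines handles both the stray empty line and any log output printed before the payload. `splitlines()` also copes with the `\r\n` endings that a TTY (`tty=True`) produces.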


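The following should work for you. My solution uses XCom traffic to pass data between containers. It does require running a task between the dockerized tasks so that the XCom can be pulled. Below is an example I use to show others how to pass XCom traffic into and out of containerized Airflow tasks. I know it works on Airflow 1.10.6. I just upgraded to 1.10.12 and ran into some issues because XCOM_JSON is no longer passed to the container. I hope this helps. If you have any questions, ask and I'll do my best to elaborate.

Example DAG using Docker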

example_docker_dag.py

import time
import logging
import datetime
from airflow.hooks.base_hook import BaseHook
from airflow.models import DAG,Variable,XCom
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.docker_operator import DockerOperator
from airflow.utils.dates import days_ago
import ast


logger = logging.getLogger(__name__)

args = {
        'start_date': days_ago(1),
        'owner': 'mjcamp',
        }

dag = DAG(
        dag_id ='example-docker-dag',default_args=args,max_active_runs=6,schedule_interval='@daily'
        )

def make_xcom_traffic(**context):
    print("some task that does things..")
    time.sleep(2)
    data = dict(test="Pass some data to the next task or tasks.",number=545)
    Variable.set("Example-XCom",data)
    return data  # the conventional way of doing XCom; you don't have to return anything, but it keeps things uniform


def read_xcom(**context):
    print("Read XCOM from DockerOperator.. and maybe do things..")
    time.sleep(1)
    # Grab the XCom from the previous (dockerized) task.
    data = context['task_instance'].xcom_pull(task_ids=context['task'].upstream_task_ids,key='return_value')
    # Airflow seems to be broken here and returns all of stdout as the XCom value, so we have to parse it,
    # or write our code to `print` only the serialized thing we want. In this case we just print a dictionary.
    # If you do something like this and have string data in your return value, either don't use
    # newlines in your strings or choose a different way to split things.

    # rstrip() drops trailing blank lines so that [-1] is the printed dictionary
    xcom_data = data[0].rstrip().split('\n')[-1]
    print("THIS IS WHAT I WANT:",xcom_data)
    xcom_data = ast.literal_eval(xcom_data)
    # Showing we have a python dictionary now...
    print("Int =>",xcom_data["Int"])
    print("Str =>",xcom_data["Str"])
    print("Float =>",xcom_data["Float"])


t1 = PythonOperator(
        task_id='make-xcom-traffic',provide_context=True,python_callable=make_xcom_traffic,dag=dag)

t2 = DockerOperator(
        # Note: this Variable.get() runs when the DAG file is parsed, not when the task runs
        environment={
            "Example-XCom": Variable.get("Example-XCom"),
        },
        task_id="docker-text",
        image="example-docker-task",
        auto_remove=True,
        xcom_push=True,
        xcom_all=False,  # <<<=== this seems broken in Airflow and doesn't do what you'd expect; it has no effect
        docker_url='unix:///var/run/docker.sock',
        # docker_conn_id='containeryard',
        api_version='auto',
        dag=dag
        )

t3 = PythonOperator(
        task_id='var-test',
        provide_context=True,  # needed in Airflow 1.10 so read_xcom receives the context
        python_callable=read_xcom,
        dag=dag
        )

t1 >> t2 >> t3
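The parsing step in `read_xcom` can be checked on its own. This sketch assumes the pulled value is the container's stdout (as `str` or `bytes`), with the dict printed on the last non-blank line, as `code.py` does. `parse_printed_dict` is a hypothetical name, and the sample values are made up.

```python
import ast


def parse_printed_dict(stdout):
    """Turn the last non-blank line of stdout (a printed dict) into a dict."""
    if isinstance(stdout, bytes):
        stdout = stdout.decode('utf-8')
    # rstrip() drops trailing blank lines so that [-1] is the printed dict
    last_line = stdout.rstrip().split('\n')[-1]
    # literal_eval parses Python literals only, unlike eval
    return ast.literal_eval(last_line.strip())


sample = ("Hello, from Airflow-Worker!\n"
          "{'Int': 7, 'Str': 'test_string ===>>x', 'Float': 1.5}\n\n")
result = parse_printed_dict(sample)
print(result['Int'], result['Float'])  # 7 1.5
```

`ast.literal_eval` is used, as in the original, because `print()` of a dict produces a Python literal rather than JSON.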

Containerized task

Dockerfile

FROM python:3.6

ENV EXAMPLE_XCOM=not-set

COPY ./requirements.txt /tmp/requirements.txt

RUN pip install -r /tmp/requirements.txt

COPY ./code.py /tmp/code.py

CMD ["/usr/local/bin/python","/tmp/code.py"]

requirements.txt

pandas==0.24.2
numpy==1.17.1

code.py

import ast
import pandas as pd
import numpy as np
import os
from datetime import datetime
import json
import glob


xcom_json = os.getenv('XCOM_JSON')

print('Hello,from Airflow-Worker!')
print('Time on deck is...',datetime.now())
# Just to show what gets passed in as environment variables
print('My ENV is >>',os.environ.keys())
# Data passed in as an XCom-like variable
print("Received EXAMPLE XCOM TRAFFIC >>",os.getenv("Example-XCom"))
xcom_data = ast.literal_eval(os.getenv("Example-XCom"))

print("The Number =>",xcom_data["number"])
intvar = np.random.randint(0,100)
strvar = "test_string ===>>" + xcom_data["test"]
floatvar = np.random.rand() * xcom_data["number"]

# make a dictionary / json like object to return in `stdout`
result = dict(Int=intvar,Str=strvar,Float=floatvar)

# return the results as stdout for airflow
print(result)

Airflow Variable

Key: Example-XCom
Value: {'test': 'Pass some data to the next task or tasks.','number': 545}
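The value above is the `str()` form of the dict set in `make_xcom_traffic`. By default, `Variable.set` stores `str(value)` unless `serialize_json=True` is passed. Below is a standalone sketch of the round trip into the container. It assumes the Variable's string reaches the container's environment unchanged.

```python
import ast
import json

# The dict that make_xcom_traffic stores in the Variable
data = dict(test="Pass some data to the next task or tasks.", number=545)

# Assumption: the Variable holds str(data), which is then passed
# unchanged as the container's environment variable value.
env = {"Example-XCom": str(data)}

# str() of a dict uses single quotes, so it is not valid JSON...
try:
    json.loads(env["Example-XCom"])
    is_json = True
except ValueError:
    is_json = False

# ...but ast.literal_eval recovers the original dict, as code.py does.
recovered = ast.literal_eval(env["Example-XCom"])
print(is_json, recovered["number"])  # False 545
```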
