Airflow DockerOperator无法找到某些图像,但可以找到其他图像

问题描述

尝试在Airflow中使用Docker运算符时出现以下错误。气流设置对我不可见(它由另一个团队在我无法访问的计算机上运行,​​并且负责的团队没有响应)。我从自己编写的docker文件创建了docker映像。 cmprod名称是指docker映像。

ImageNotFound: 404 Client Error: Not Found ("pull access denied for cmprod,repository does not exist or may require 'docker login': denied: requested access to the resource is denied")

我不熟悉docker login的使用,并且我不确定它是否适用于这种情况,因为我能够运行某些映像,而不能运行其他映像。 起初,我虽然输入了不正确的docker映像名称,但是我检查并再次检查。以下是docker images的输出。我能够通过气流成功运行图像。

REPOSITORY               TAG                 IMAGE ID            CREATED             SIZE
cm_prod                  latest              08f408557eb7        15 hours ago        2.12GB
cmprod                   latest              08f408557eb7        15 hours ago        2.12GB
<none>                   <none>              4af8c991ea19        15 hours ago        730MB
<none>                   <none>              9da4759a3316        15 hours ago        64.2MB
condatest                latest              e24563f9bb48        5 days ago          2.12GB

我以为我可能错误地使用了docker运算符,但是我可以运行其他图像。我认为可能存在一个气流配置问题,其中不允许某些操作系统或不允许使用某些权限运行,但是我无法找到有关是否可行的任何文档。

我的测试未显示上述任何因素,无法确定使用docker操作员通过气流是否可以找到docker镜像的100%。这个问题似乎不适合反复试验。关于可能发生的事情的任何建议将不胜感激。

我能够在浏览器中看到气流UI并触发dags,并且有一个共享目录可以在其中转储我的dag规范脚本。气流是Version:1.10.3。

码头工人的版本信息遵循docker version

Client: Docker Engine - Community
 Version:           19.03.6
 API version:       1.40
 Go version:        go1.12.16
 Git commit:        369ce74a3c
 Built:             Thu Feb 13 01:29:29 2020
 OS/Arch:           linux/amd64
 Experimental:      false

Server: Docker Engine - Community
 Engine:
  Version:          19.03.6
  API version:      1.40 (minimum version 1.12)
  Go version:       go1.12.16
  Git commit:       369ce74a3c
  Built:            Thu Feb 13 01:28:07 2020
  OS/Arch:          linux/amd64
  Experimental:     false
 containerd:
  Version:          1.2.10
  GitCommit:        b34a5c8af56e510852c35414db4c1f4fa6172339
 runc:
  Version:          1.0.0-rc8+dev
  GitCommit:        3e425f80a8c931f88e6d94a8c831b9d5aa481657
 docker-init:
  Version:          0.18.0
  GitCommit:        fec3683

编辑: 请求了气流DAG代码。我犹豫要发布整件事,因为我从一个离开的团队成员那里继承了一些代码,我觉得最好将dag中的某些代码实现为单独的脚本。以下是最相关的代码块。让我知道是否缺少任何东西。为了清楚起见,在这些块之间有一个部分,但如果似乎没有任何作用,则可以包括在内。

代码块1:导入依赖项

from functools import reduce
import os,os.path
from datetime import datetime,timedelta
from airflow import DAG
from airflow.operators.mssql_operator import MsSqlOperator
from airflow.operators.docker_operator import DockerOperator
from airflow.utils.helpers import chain

代码块2:DAG和操作符实例化

# create SQL operators
def create_SQL_operator(taskfile,dag):
    """
    Creates a MsSQL operator for a given DAG.
    """
    op = MsSqlOperator(
        task_id=taskfile,sql=readSQL(os.path.join(ProjDir,taskfile)),mssql_conn_id='clarity',autocommit=True,database='clarity',dag=dag
        )
    return op

# Airflow arguments
default_args = {
    'owner': 'airflow','description': 'Parallel SQL DAG','depend_on_past': False,'start_date': datetime(2020,1,1),'email': ['*PERSONTOEMAIL*'],'email_on_failure': False,'email_on_retry': True
}

# DAG definition
DAG = DAG(ProjName + '_and_infer',description='Running parallel SQLs for project: {} and inference on the data'.format(ProjName),default_args=default_args,schedule_interval=CronTime,# '0 */2 * * *',#every 2 hours
          concurrency=50,# setup to allow 50 concurrent parallel tasks
          catchup=False)
t_predict = DockerOperator(
        task_id='dockerPredict',image='cmprod',api_version='auto',auto_remove=True,volumes=['*ABSOLUTEPATHTOMOUNT*:/ds-cm'],command='bash inference.sh ',docker_url='unix://var/run/docker.sock',network_mode='bridge',dag=DAG)

# Create SQL task operators in Airflow global space
ops = []
ops = [(order,create_SQL_operator(taskfile,DAG)) for order,taskfile in sql_rank]
ops.sort(key=lambda tup: tup[0])

# create cluster ops list
from itertools import groupby
from operator import itemgetter
opsList = []
opsList = [[j for i,j in grouper] for order,grouper in groupby(ops,key=itemgetter(0))]

# flatten list with only 1 element: Airflow chain() cannot accept list of lists!!
chainList = []
chainList = [reduce(plus,list) if len(list) == 1 else list for list in opsList]
chainList.append(t_predict)

# create final DAG graph
exec(r' >> '.join([r'chainList['+str(i)+r']' for i in range(len(chainList))]))

更新 自从我最初发布此问题以来,我将condatest图像替换为上述代码,并设法以另一种方式出错:挂载的目录中缺少Shell脚本。

当我复制丢失的文件并再次运行时,气流无法再找到最新的图像。我检查了一下,发现新复制的脚本没有执行权限,并添加了该权限。 Airflow仍然找不到以前正常工作的Docker容器。

我删除了shell脚本,气流可以再次找到该容器。这是否意味着问题与Linux权限有关?我不清楚安装的驱动器中的物品如何影响气流检测容器的能力。此外,我知道我过去可以使用dockerobject在气流中启动的docker容器运行相同的脚本。

解决方法

将气流升级到气流 2 后,日志提供了一些附加信息。 Airflow 已配置为在多台服务器上运行,并且已在每台服务器上设置了 docker,但未使用任何映像注册表。似乎当作业调度程序尝试在我构建 docker 映像的服务器以外的服务器上执行 dag 时,该映像不可用。看来我之前找到的解决方法恰逢我的工作安排在哪个服务器上的幸运抽奖。

为了解决这个问题,我们将调度程序配置为仅使用一台服务器。

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...