Airflow DAG fails when a PythonOperator tries to call an API and download data

Problem description

I am trying to set up Airflow on my laptop for the first time (no Docker, just following the documentation). My goal is to build a simple ETL job.

I wrote the simplest possible DAG with a single PythonOperator:

from datetime import timedelta
from view import spotify_etl
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG(
    'airflow_dag_tutorial-new',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)

run_etl = PythonOperator(
    task_id='main_task',
    python_callable=spotify_etl,
    dag=dag,
)

run_etl

When I pass a dummy function that only contains a print statement, the DAG runs successfully (a sketch of that dummy callable is shown after the function below). However, when I pass spotify_etl, the function that calls the Spotify API, the DAG fails. Here is the function:

import datetime

import requests


def spotify_etl():
    token = 'xxx'  # Spotify OAuth access token (redacted)

    headers = {
        'Accept': "application/json",
        'Content-Type': "application/json",
        'Authorization': 'Bearer {token}'.format(token=token),
    }

    # Spotify's "after" cursor expects a Unix timestamp in milliseconds
    today = datetime.datetime.now()
    yesterday = today - datetime.timedelta(days=100)
    yesterday_unix_timestamp = int(yesterday.timestamp()) * 1000

    r = requests.get(
        "https://api.spotify.com/v1/me/player/recently-played?after={time}".format(
            time=yesterday_unix_timestamp
        ),
        headers=headers,
    )
    data = r.json()
    print(data)
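
For reference, the dummy callable that ran successfully was roughly the following (a reconstruction; the only point is that it does nothing but print):

def dummy_etl():
    # No network calls, no libraries that fork or touch macOS frameworks
    print("dummy ETL ran")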

The error I get when running spotify_etl is:

[2020-11-08 12:35:23,453] {local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGABRT

Does anyone know how to use the PythonOperator correctly with a function that calls an API? What is causing this error?

I tried setting export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES in my venv (as suggested here: Airflow task running tweepy exits with return code -6, and here: https://github.com/ansible/ansible/issues/32499#issuecomment-341578864), but that did not seem to fix it.

Solution

It turned out that export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES had not been set correctly. It has to be added to .zshrc, not .bash_profile. That solved it.
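
For anyone hitting the same thing on macOS, the steps that made it stick were roughly the following (assuming zsh is the login shell, the macOS default, and that Airflow is restarted from a shell that has sourced the file):

# Append the variable to ~/.zshrc so every new zsh session exports it
echo 'export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES' >> ~/.zshrc

# Reload the config and verify the variable is visible
source ~/.zshrc
echo $OBJC_DISABLE_INITIALIZE_FORK_SAFETY   # should print YES

# Restart the scheduler from this shell so forked task processes inherit it
airflow scheduler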