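Problem description
I am trying to set up Airflow on my laptop for the first time (without Docker, just following the documentation). My goal is to build a simple ETL job.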
from datetime import timedelta
from view import spotify_etl
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

# Defaults applied to every task in the DAG
default_args = {
    'owner': 'airflow', 'depends_on_past': False, 'start_date': days_ago(2),
    'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False,
    'retries': 1, 'retry_delay': timedelta(minutes=1),
}

dag = DAG(
    'airflow_dag_tutorial-new',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)

# Single task that runs the ETL function
run_etl = PythonOperator(task_id='main_task', python_callable=spotify_etl, dag=dag)

run_etl
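When I pass a dummy function that only contains a print statement, the DAG runs successfully. However, when I pass spotify_etl, the function that calls the Spotify API, the DAG fails. This is the function: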
import datetime
import requests

def spotify_etl():
    token = 'xxx'
    headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        'Authorization': 'Bearer {token}'.format(token=token),
    }
    today = datetime.datetime.now()
    # Despite the variable name, this looks back 100 days
    yesterday = today - datetime.timedelta(days=100)
    yesterday_unix_timestamp = int(yesterday.timestamp()) * 1000
    r = requests.get(
        'https://api.spotify.com/v1/me/player/recently-played?after={time}'.format(time=yesterday_unix_timestamp),
        headers=headers,
    )
    data = r.json()
    print(data)
The error I get is:
[2020-11-08 12:35:23,453] {local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGABRT
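Does anyone know how to correctly use PythonOperator with a function that calls an API? What is causing this error?
I tried running export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES in my venv (as suggested in "Airflow task running tweepy exits with return code -6" and here: https://github.com/ansible/ansible/issues/32499#issuecomment-341578864), but that did not seem to fix it.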
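Solution
It turned out that export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES had not been set correctly. It had to be added to .zshrc rather than .bash_profile, presumably because the shell in use was zsh, which does not read .bash_profile. That solved it.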
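To confirm that the variable is actually visible to the process that launches Airflow, a small check like the one below might help. This is only a minimal sketch and not part of the original post: the helper name fork_safety_disabled is hypothetical, and it assumes that the value YES is what matters.

```python
import os

# Name of the macOS variable discussed in this post.
FORK_SAFETY_VAR = "OBJC_DISABLE_INITIALIZE_FORK_SAFETY"


def fork_safety_disabled(environ=None):
    """Return True if the variable is set to YES in the given environment.

    `environ` defaults to the current process environment; a plain dict
    can be passed instead, which makes the helper easy to test.
    """
    env = os.environ if environ is None else environ
    return env.get(FORK_SAFETY_VAR, "").strip().upper() == "YES"


if __name__ == "__main__":
    if fork_safety_disabled():
        print(f"{FORK_SAFETY_VAR}=YES is visible to this process")
    else:
        print(f"{FORK_SAFETY_VAR} is not set; check which shell startup file exports it")
```

Running this from the same terminal session (and the same venv) that starts the Airflow scheduler should show whether the export in .zshrc has taken effect; a new terminal or `source ~/.zshrc` is needed after editing the file.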