데이터과학 삼학년
Airflow 본문
python 파일을 이용하여 workflow 단위인 dag(directed acyclic graph)을 만들어 전체적인 파이프라인을 관리
~/airflow/dags/ 폴더 아래에 해당 python 파일을 넣으면 UI에 dag가 나타나게 됨
web에서 dag을 생성하고, code를 수정할 수 없음
python 파일을 직접 수정해서 환경 폴더내에 넣어줘야함
python 기반으로 쉽게 workflow를 생성할 수 있다는 장점은 있으나, 웹 ui에서 모두 수정이 가능한 젠킨스에 비해 활용성이 떨어지는 것으로 보임
GCP에서는 COMPOSER라는 개념으로 AIRFLOW를 제공해주고 있음
Import Modules
airflow 모듈을 통해 필요한 operator, 기능 등 import 해서 사용
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
Default Arguments
dag생성을 위한 정보들을 담을 수 있는 args입력
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
Intantiate a DAG
airflow의 dag 인스턴스를 생성
dag의 명칭과 참고할 정보인 args, 그리고 실행 schedule을 넣어줄 수 있음
dag = DAG(
description='A simple tutorial DAG',
workflow 안에서 실행될 task (job)들을 의미함
task 실행을 위한 Operator를 가지고 실행을 조절할 수 있음
bash, python, gcp 관련 등
t1 = BashOperator(
t2 = BashOperator(
bash_command='sleep 5',
Templating with Jinja
Jinja 템플릿 방식 적용이 가능함
파라미터를 {{ dd }} 식으로 넣어서 사용 가능
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
t3 = BashOperator(
params={'my_param': 'Parameter I passed in'},
Adding DAG and Tasks documentation
DAG과 각 Task에 대한 설명을 달 수 있음
dag.doc_md = __doc__
t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.

Setting up Dependencies
각 task의 종속관계, 즉 실행 순서등을 입력할 수 있음
# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
# The bit shift operator can also be
# used to chain operations:
t1 >> t2
# And the upstream dependency with the
# bit shift operator:
t2 << t1
# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3
# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
[ ] : 병렬 처리
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
dag = DAG(
description='A simple tutorial DAG',
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
t2 = BashOperator(
bash_command='sleep 5',
dag.doc_md = __doc__
t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.

templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
t3 = BashOperator(
params={'my_param': 'Parameter I passed in'},
t1 >> [t2, t3]
UI example
출처 : https://airflow.apache.org/docs/stable/tutorial.html#setting-up-dependencies
Tutorial — Airflow Documentation
Airflow 다양한 Tip
pip로 설치하면 생기는 기본 설정
sequential executor : 기본적으로 1개만 실행할 수 있음. 병렬 실행 불가능
celery executor를 사용해 병렬로 실행할 수 있음
이건 RabbitMQ나 Redis가 필요함
sqlite : 기본 meta store는 sqlite인데, 동시 접근이 불가능해서 병렬 불가능
병렬 실행하기 위해 mysql이나 postgresql을 사용해야 함
위 설정을 서버에 매번 하는 일은 번거로운 일
Docker로 만들면 쉽게 가능
docker-airflow Github에 보면 이미 만들어진 Dockerfile이 있음
docker-compose로 실행하는 방법도 있고, Airflow 버전이 올라가면 빠르게 업데이트하는 편
Airflow의 DAG 폴더(default는 ~/airflow/dags)를 더 쉽게 사용하려면 해당 폴더를 workspace로 하는 jupyter notebook을 띄워도 좋음
데이터 분석가도 쉽게 DAG 파일 생성 가능
Error: Already running on PID XXXX 에러 발생시
Airflow webserver 실행시 ~/airflow/airflow-webserver.pid에 process id를 저장함
쉘에서 빠져나왔지만 종료되지 않는 경우가 있음
아래 명령어로 pid를 죽이면 됨(다른 포트를 사용할 경우 8080쪽 수정)
kill -9 $(lsof -t -i:8080)
Task간 데이터를 주고 받아야 할 경우
xcom 사용
Admin - xcom에 들어가면 값이 보임
xcom에 데이터 저장(xcom_push)
task_instance = kwargs['task_instance'] task_instance.xcom_push(key='the_key', value=my_str)
다른 task에서 데이터 불러오기(xcom_pull)
task_instance.xcom_pull(task_ids='my_task', key='the_key')
참고로 PythonOperator에서 사용하는 python_callable 함수에서 return하는 값은 xcom에 자동으로 push됨
DAG에서 다른 DAG에 종속성이 필요한 경우
1개의 DAG에서 하면 좋지만, 여러 사람이 만든 DAG이 있고 그 중 하나를 사용해야 할 경우도 있음
특정 DAG을 Trigger하고 싶은 경우
특정 Task의 성공/실패에 따라 다른 Task를 실행시키고 싶은 경우
Airflow Trigger Rule 사용
예를 들어 앞의 두 작업중 하나만 실패한 경우
Document 참고
Jinja Template이 작동하지 않는 경우
우선 provide_context=True 조건을 주었는지 확인
Jinja Template이 있는 함수의 파라미터가 어떤 것인지 확인
Operator마다 Jinja Template을 해주는 template_fields가 있는데, 기존 정의가 아닌 파라미터에서 사용하고 싶은 경우 새롭게 정의
class MyPythonOperator(PythonOperator): template_fields = ('templates_dict','op_args')
Airflow 변수를 저장하고 싶은 경우
Variable 사용
Admin - Variables에서 볼 수 있음
json 파일로 변수 저장해서 사용하는 방식을 자주 사용함
from airflow.models import Variable config=Variable.get(f"{HOME}/config.json", deserialize_json=True) environment=config["environment"] project_id=config["project_id"]
Task를 그룹화하고 싶은 경우
dummy_operator 사용
workflow 목적으로 사용하는 경우도 있음. 일부 작업을 건너뛰고 싶은 경우, 빈 작업 경로를 가질 수 없어서 dummy_operator를 사용하기도 함
1개의 Task이 완료된 후에 2개의 Task를 병렬로 실행하기
task1 >> [task2_1, task_2_2]
앞선 Task의 결과에 따라 True인 경우엔 A Task, False인 경우엔 B Task를 실행해야 하는 경우
BranchPythonOperator 사용
python_callable에 if문을 넣어 그 다음 task_id를 정의할 수 있음
단순하게 앞선 task가 성공, 1개만 실패 등이라면 trigger_rule만 정의해도 됨
Airflow에서 Jupyter Notebook의 특정 값만 바꾸며 실행하고 싶은 경우
Papermill 사용
UTC 시간대를 항상 생각 --> Airflow는 UTC 기준
execution_date이 너무 헷갈림
2017년에 Airflow 처음 사용할 때 매우 헷갈렸던 개념
박호균님의 블로그 참고
추후 다른 글로 정리할 예정
Task가 실패했을 경우 슬랙 메세지 전송하기
Integrating Slack Alerts in Airflow 참고하면 잘 나와있음
Hook은 외부 플랫폼, 데이터베이스(예: Hive, S3, MySQL, Postgres, Google Cloud Platfom 등)에 접근할 수 있도록 만든 인터페이스
대부분 Operator가 실행되기 전에 Hook을 통해 통신함
공식 문서 참고
머신러닝에서 사용한 예시는 Github 참고
airflow Github에 많은 예제 파일이 있음
Context Variable이나 Jinja Template의 ds를 사용해 Airflow에서 날짜를 컨트롤 하는 경우, Backfill을 사용할 수 있음
과거값 기준으로 재실행
단, 쿼리에 CURRENT_DATE() 등을 쓰면 Airflow에서 날짜를 컨트롤하지 않고 쿼리에서 날짜 컨트롤하는 경우라 backfill해도 CURRENT_DATE()이 있어서 현재 날짜 기준으로 될 것임
airflow backfill -s 2020-01-05 -e 2020-01-10 dag_id
재시도시 Exponential하게 실행되길 원하면
retry_exponential_backoff 참고
출처 : https://zzsza.github.io/data/2018/01/04/airflow-1/
Apache Airflow - Workflow 관리 도구(1)
오늘은 Workflow Management Tool인 Apache Airflow 관련 포스팅을 하려고 합니다. 이 글은 1.10.3 버전에서 작성되었습니다 최초 작성은 2018년 1월 4일이지만, 2020년 2월 9일에 글을 리뉴얼했습니다 슬라이드
