DevOps

Airflow

Dan-k 2020. 6. 3. 15:45
반응형

Airflow

  • python 파일을 이용하여 workflow 단위인 dag(directed acyclic graph)을 만들어 전체적인 파이프라인을 관리

  • ~/airflow/dags/ 폴더 아래에 해당 python 파일을 넣으면 UI에 dag가 나타나게 됨

    • web에서 dag을 생성하고, code를 수정할 수 없음

    • python 파일을 직접 수정해서 환경 폴더내에 넣어줘야함

  • python 기반으로 쉽게 workflow를 생성할 수 있다는 장점은 있으나, 웹 ui에서 모두 수정이 가능한 젠킨스에 비해 활용성이 떨어지는 것으로 보임

  • GCP에서는 COMPOSER라는 개념으로 AIRFLOW를 제공해주고 있음

 

Import Modules

  • airflow 모듈을 통해 필요한 operator, 기능 등 import 해서 사용

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago


Default Arguments

  • dag생성을 위한 정보들을 담을 수  있는 args입력

# You can override them on a per-task basis during operator initialization

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

 


Intantiate a DAG

  • airflow의 dag 인스턴스를 생성

  • dag의 명칭과 참고할 정보인 args, 그리고 실행 schedule을 넣어줄 수 있음

dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)

 

Tasks

  • workflow 안에서 실행될 task (job)들을 의미함

  • task 실행을 위한 Operator를 가지고 실행을 조절할 수 있음

    • bash, python, gcp 관련 등

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)


t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
    dag=dag,
)

 

Templating with Jinja

  • Jinja 템플릿 방식 적용이 가능함

    • 파라미터를 {{ dd }} 식으로 넣어서 사용 가능

templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
{% endfor %}
"""


t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)


Adding DAG and Tasks documentation

  • DAG과 각 Task에 대한 설명을 달 수 있음

dag.doc_md = __doc__
t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""


Setting up Dependencies

  • 각 task의 종속관계, 즉 실행 순서등을 입력할 수 있음

t1.set_downstream(t2)
# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:

t2.set_upstream(t1)

# The bit shift operator can also be
# used to chain operations:
t1 >> t2

# And the upstream dependency with the
# bit shift operator:

t2 << t1

# Chaining multiple dependencies becomes
# concise with the bit shift operator:

t1 >> t2 >> t3

# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

 

  • [ ] : 병렬 처리

Recap

from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
    dag=dag,
)
dag.doc_md = __doc__

t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)

t1 >> [t2, t3]

 

 

 

UI example

 

 

 

출처 : https://airflow.apache.org/docs/stable/tutorial.html#setting-up-dependencies

 

Tutorial — Airflow Documentation

 

airflow.apache.org

 


Airflow 다양한 Tip

  • pip로 설치하면 생기는 기본 설정

    • sequential executor : 기본적으로 1개만 실행할 수 있음. 병렬 실행 불가능

      • celery executor를 사용해 병렬로 실행할 수 있음

      • 이건 RabbitMQ나 Redis가 필요함

    • sqlite : 기본 meta store는 sqlite인데, 동시 접근이 불가능해서 병렬 불가능

      • 병렬 실행하기 위해 mysql이나 postgresql을 사용해야 함

  • 위 설정을 서버에 매번 하는 일은 번거로운 일

    • Docker로 만들면 쉽게 가능

    • docker-airflow Github에 보면 이미 만들어진 Dockerfile이 있음

    • docker-compose로 실행하는 방법도 있고, Airflow 버전이 올라가면 빠르게 업데이트하는 편

  • Airflow의 DAG 폴더(default는 ~/airflow/dags)를 더 쉽게 사용하려면 해당 폴더를 workspace로 하는 jupyter notebook을 띄워도 좋음

    • 데이터 분석가도 쉽게 DAG 파일 생성 가능

  • Error: Already running on PID XXXX 에러 발생시

    • Airflow webserver 실행시 ~/airflow/airflow-webserver.pid에 process id를 저장함

    • 쉘에서 빠져나왔지만 종료되지 않는 경우가 있음

    • 아래 명령어로 pid를 죽이면 됨(다른 포트를 사용할 경우 8080쪽 수정)

      kill -9 $(lsof -t -i:8080)

  • Task간 데이터를 주고 받아야 할 경우

    • xcom 사용

    • Admin - xcom에 들어가면 값이 보임

    • xcom에 데이터 저장(xcom_push)

      task_instance = kwargs['task_instance'] task_instance.xcom_push(key='the_key', value=my_str)

    • 다른 task에서 데이터 불러오기(xcom_pull)

      task_instance.xcom_pull(task_ids='my_task', key='the_key')

    • 참고로 PythonOperator에서 사용하는 python_callable 함수에서 return하는 값은 xcom에 자동으로 push됨

  • DAG에서 다른 DAG에 종속성이 필요한 경우

    • ExternalTaskSensor 사용

    • 1개의 DAG에서 하면 좋지만, 여러 사람이 만든 DAG이 있고 그 중 하나를 사용해야 할 경우도 있음

  • 특정 DAG을 Trigger하고 싶은 경우

  • 특정 Task의 성공/실패에 따라 다른 Task를 실행시키고 싶은 경우

    • Airflow Trigger Rule 사용

    • 예를 들어 앞의 두 작업중 하나만 실패한 경우

    • Document 참고

  • Jinja Template이 작동하지 않는 경우

    • 우선 provide_context=True 조건을 주었는지 확인

    • Jinja Template이 있는 함수의 파라미터가 어떤 것인지 확인

    • Operator마다 Jinja Template을 해주는 template_fields가 있는데, 기존 정의가 아닌 파라미터에서 사용하고 싶은 경우 새롭게 정의

      class MyPythonOperator(PythonOperator): template_fields = ('templates_dict','op_args')

  • Airflow 변수를 저장하고 싶은 경우

    • Variable 사용

    • Admin - Variables에서 볼 수 있음

    • json 파일로 변수 저장해서 사용하는 방식을 자주 사용함

    from airflow.models import Variable config=Variable.get(f"{HOME}/config.json", deserialize_json=True) environment=config["environment"] project_id=config["project_id"]

  • Task를 그룹화하고 싶은 경우

    • dummy_operator 사용

    • workflow 목적으로 사용하는 경우도 있음. 일부 작업을 건너뛰고 싶은 경우, 빈 작업 경로를 가질 수 없어서 dummy_operator를 사용하기도 함

  • 1개의 Task이 완료된 후에 2개의 Task를 병렬로 실행하기

    task1 >> [task2_1, task_2_2]

  • 앞선 Task의 결과에 따라 True인 경우엔 A Task, False인 경우엔 B Task를 실행해야 하는 경우

    • BranchPythonOperator 사용

    • python_callable에 if문을 넣어 그 다음 task_id를 정의할 수 있음

    • 단순하게 앞선 task가 성공, 1개만 실패 등이라면 trigger_rule만 정의해도 됨

  • Airflow에서 Jupyter Notebook의 특정 값만 바꾸며 실행하고 싶은 경우

  • UTC 시간대를 항상 생각 --> Airflow는 UTC 기준

  • execution_date이 너무 헷갈림

    • 2017년에 Airflow 처음 사용할 때 매우 헷갈렸던 개념

    • 박호균님의 블로그 참고

    • 추후 다른 글로 정리할 예정

  • Task가 실패했을 경우 슬랙 메세지 전송하기

  • Hook이란?

    • Hook은 외부 플랫폼, 데이터베이스(예: Hive, S3, MySQL, Postgres, Google Cloud Platfom 등)에 접근할 수 있도록 만든 인터페이스

    • 대부분 Operator가 실행되기 전에 Hook을 통해 통신함

    • 공식 문서 참고

  • 머신러닝에서 사용한 예시는 Github 참고

  • airflow Github에 많은 예제 파일이 있음

  • Context Variable이나 Jinja Template의 ds를 사용해 Airflow에서 날짜를 컨트롤 하는 경우, Backfill을 사용할 수 있음

    • 과거값 기준으로 재실행

    • 단, 쿼리에 CURRENT_DATE() 등을 쓰면 Airflow에서 날짜를 컨트롤하지 않고 쿼리에서 날짜 컨트롤하는 경우라 backfill해도 CURRENT_DATE()이 있어서 현재 날짜 기준으로 될 것임

    airflow backfill -s 2020-01-05 -e 2020-01-10 dag_id

  • 재시도시 Exponential하게 실행되길 원하면

    • retry_exponential_backoff 참고

 

출처 : https://zzsza.github.io/data/2018/01/04/airflow-1/

 

Apache Airflow - Workflow 관리 도구(1)

오늘은 Workflow Management Tool인 Apache Airflow 관련 포스팅을 하려고 합니다. 이 글은 1.10.3 버전에서 작성되었습니다 최초 작성은 2018년 1월 4일이지만, 2020년 2월 9일에 글을 리뉴얼했습니다 슬라이드

zzsza.github.io

 

728x90
반응형
LIST