일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- BigQuery
- flask
- top_k
- GCP
- API
- 유튜브 API
- TensorFlow
- Airflow
- login crawling
- grad-cam
- GenericGBQException
- chatGPT
- subdag
- youtube data
- hadoop
- Counterfactual Explanations
- 공분산
- session 유지
- integrated gradient
- 상관관계
- UDF
- API Gateway
- spark udf
- gather_nd
- airflow subdag
- requests
- Retry
- XAI
- correlation
- tensorflow text
- Today
- Total
데이터과학 삼학년
Airflow 본문
Airflow
-
python 파일을 이용하여 workflow 단위인 dag(directed acyclic graph)을 만들어 전체적인 파이프라인을 관리
-
~/airflow/dags/ 폴더 아래에 해당 python 파일을 넣으면 UI에 dag가 나타나게 됨
-
web에서 dag을 생성하고, code를 수정할 수 없음
-
python 파일을 직접 수정해서 환경 폴더내에 넣어줘야함
-
python 기반으로 쉽게 workflow를 생성할 수 있다는 장점은 있으나, 웹 ui에서 모두 수정이 가능한 젠킨스에 비해 활용성이 떨어지는 것으로 보임
-
GCP에서는 COMPOSER라는 개념으로 AIRFLOW를 제공해주고 있음
Import Modules
-
airflow 모듈을 통해 필요한 operator, 기능 등 import 해서 사용
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
Default Arguments
-
dag생성을 위한 정보들을 담을 수 있는 args입력
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
}
Intantiate a DAG
-
airflow의 dag 인스턴스를 생성
-
dag의 명칭과 참고할 정보인 args, 그리고 실행 schedule을 넣어줄 수 있음
dag = DAG(
'tutorial',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
)
Tasks
-
workflow 안에서 실행될 task (job)들을 의미함
-
task 실행을 위한 Operator를 가지고 실행을 조절할 수 있음
-
bash, python, gcp 관련 등
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3,
dag=dag,
)
Templating with Jinja
-
Jinja 템플릿 방식 적용이 가능함
-
파라미터를 {{ dd }} 식으로 넣어서 사용 가능
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag,
)
Adding DAG and Tasks documentation
-
DAG과 각 Task에 대한 설명을 달 수 있음
dag.doc_md = __doc__
t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
Setting up Dependencies
-
각 task의 종속관계, 즉 실행 순서등을 입력할 수 있음
t1.set_downstream(t2)
# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)
# The bit shift operator can also be
# used to chain operations:
t1 >> t2
# And the upstream dependency with the
# bit shift operator:
t2 << t1
# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3
# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
-
[ ] : 병렬 처리
Recap
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
}
dag = DAG(
'tutorial',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
)
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3,
dag=dag,
)
dag.doc_md = __doc__
t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag,
)
t1 >> [t2, t3]
UI example
출처 : https://airflow.apache.org/docs/stable/tutorial.html#setting-up-dependencies
Airflow 다양한 Tip
-
pip로 설치하면 생기는 기본 설정
-
sequential executor : 기본적으로 1개만 실행할 수 있음. 병렬 실행 불가능
-
celery executor를 사용해 병렬로 실행할 수 있음
-
이건 RabbitMQ나 Redis가 필요함
-
-
sqlite : 기본 meta store는 sqlite인데, 동시 접근이 불가능해서 병렬 불가능
-
병렬 실행하기 위해 mysql이나 postgresql을 사용해야 함
-
-
-
위 설정을 서버에 매번 하는 일은 번거로운 일
-
Docker로 만들면 쉽게 가능
-
docker-airflow Github에 보면 이미 만들어진 Dockerfile이 있음
-
docker-compose로 실행하는 방법도 있고, Airflow 버전이 올라가면 빠르게 업데이트하는 편
-
-
Airflow의 DAG 폴더(default는 ~/airflow/dags)를 더 쉽게 사용하려면 해당 폴더를 workspace로 하는 jupyter notebook을 띄워도 좋음
-
데이터 분석가도 쉽게 DAG 파일 생성 가능
-
-
Error: Already running on PID XXXX 에러 발생시
-
Airflow webserver 실행시 ~/airflow/airflow-webserver.pid에 process id를 저장함
-
쉘에서 빠져나왔지만 종료되지 않는 경우가 있음
-
아래 명령어로 pid를 죽이면 됨(다른 포트를 사용할 경우 8080쪽 수정)
kill -9 $(lsof -t -i:8080)
-
-
Task간 데이터를 주고 받아야 할 경우
-
xcom 사용
-
Admin - xcom에 들어가면 값이 보임
-
xcom에 데이터 저장(xcom_push)
task_instance = kwargs['task_instance'] task_instance.xcom_push(key='the_key', value=my_str)
-
다른 task에서 데이터 불러오기(xcom_pull)
task_instance.xcom_pull(task_ids='my_task', key='the_key')
-
참고로 PythonOperator에서 사용하는 python_callable 함수에서 return하는 값은 xcom에 자동으로 push됨
-
-
DAG에서 다른 DAG에 종속성이 필요한 경우
-
1개의 DAG에서 하면 좋지만, 여러 사람이 만든 DAG이 있고 그 중 하나를 사용해야 할 경우도 있음
-
특정 DAG을 Trigger하고 싶은 경우
-
특정 Task의 성공/실패에 따라 다른 Task를 실행시키고 싶은 경우
-
Airflow Trigger Rule 사용
-
예를 들어 앞의 두 작업중 하나만 실패한 경우
-
Document 참고
-
-
Jinja Template이 작동하지 않는 경우
-
우선 provide_context=True 조건을 주었는지 확인
-
Jinja Template이 있는 함수의 파라미터가 어떤 것인지 확인
-
Operator마다 Jinja Template을 해주는 template_fields가 있는데, 기존 정의가 아닌 파라미터에서 사용하고 싶은 경우 새롭게 정의
class MyPythonOperator(PythonOperator): template_fields = ('templates_dict','op_args')
-
-
Airflow 변수를 저장하고 싶은 경우
-
Variable 사용
-
Admin - Variables에서 볼 수 있음
-
json 파일로 변수 저장해서 사용하는 방식을 자주 사용함
from airflow.models import Variable config=Variable.get(f"{HOME}/config.json", deserialize_json=True) environment=config["environment"] project_id=config["project_id"]
-
-
Task를 그룹화하고 싶은 경우
-
dummy_operator 사용
-
workflow 목적으로 사용하는 경우도 있음. 일부 작업을 건너뛰고 싶은 경우, 빈 작업 경로를 가질 수 없어서 dummy_operator를 사용하기도 함
-
-
1개의 Task이 완료된 후에 2개의 Task를 병렬로 실행하기
task1 >> [task2_1, task_2_2]
-
앞선 Task의 결과에 따라 True인 경우엔 A Task, False인 경우엔 B Task를 실행해야 하는 경우
-
BranchPythonOperator 사용
-
python_callable에 if문을 넣어 그 다음 task_id를 정의할 수 있음
-
단순하게 앞선 task가 성공, 1개만 실패 등이라면 trigger_rule만 정의해도 됨
-
-
Airflow에서 Jupyter Notebook의 특정 값만 바꾸며 실행하고 싶은 경우
-
Papermill 사용
-
-
UTC 시간대를 항상 생각 --> Airflow는 UTC 기준
-
execution_date이 너무 헷갈림
-
2017년에 Airflow 처음 사용할 때 매우 헷갈렸던 개념
-
박호균님의 블로그 참고
-
추후 다른 글로 정리할 예정
-
-
Task가 실패했을 경우 슬랙 메세지 전송하기
-
Integrating Slack Alerts in Airflow 참고하면 잘 나와있음
-
-
Hook이란?
-
Hook은 외부 플랫폼, 데이터베이스(예: Hive, S3, MySQL, Postgres, Google Cloud Platfom 등)에 접근할 수 있도록 만든 인터페이스
-
대부분 Operator가 실행되기 전에 Hook을 통해 통신함
-
공식 문서 참고
-
-
머신러닝에서 사용한 예시는 Github 참고
-
airflow Github에 많은 예제 파일이 있음
-
Context Variable이나 Jinja Template의 ds를 사용해 Airflow에서 날짜를 컨트롤 하는 경우, Backfill을 사용할 수 있음
-
과거값 기준으로 재실행
-
단, 쿼리에 CURRENT_DATE() 등을 쓰면 Airflow에서 날짜를 컨트롤하지 않고 쿼리에서 날짜 컨트롤하는 경우라 backfill해도 CURRENT_DATE()이 있어서 현재 날짜 기준으로 될 것임
airflow backfill -s 2020-01-05 -e 2020-01-10 dag_id
-
-
재시도시 Exponential하게 실행되길 원하면
-
retry_exponential_backoff 참고
-
출처 : https://zzsza.github.io/data/2018/01/04/airflow-1/
'DevOps' 카테고리의 다른 글
젠킨스 workspace clean-up 설정 변경 (feat. 난 clean-up을 원치 않아!) (1) | 2020.06.05 |
---|---|
Airflow VS Jenkins (0) | 2020.06.04 |
jenkins workspace cleanup 설정 (0) | 2020.05.08 |
Docker 명령어 정리 (3) | 2020.04.07 |
Crontab (스케쥴러 설정) (0) | 2020.03.26 |