DevOps
[Airflow] Sequential task loop로 연결하기
Dan-k
2023. 11. 20. 19:29
반응형
Sequential task loop로 연결하기
예를 들어 단순 반복되어 길게 연결되는 task가 있다면...어떻게 연결할까?
t1 >> t2 >> t3 >> t4 >> t5 >> ... >> t100 까지 직접 연결하는 것은 매우 비효율적이다..
병렬 연결이야 start >> [t1, t2, t3,..] 로 묶으면 되지만...
이럴때 간단한 파이썬 코드로 구성하는 방법을 정리한다.
1. 파이썬코드 설정
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
default_args = {
"owner": "airflow",
"start_date": datetime(2023, 1, 1),
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
dag = DAG(
"sequential_task_dag",
default_args=default_args,
schedule_interval=timedelta(days=1),
)
def my_python_function(task_name, **kwargs):
print(f"Executing {task_name}")
# Define tasks in a loop
task_names = ["Task_1", "Task_2", "Task_3", "Task_4"]
first_task = PythonOperator(
task_id=f"{task_names[0]}",
python_callable=my_python_function,
op_args=[task_names[0]],
provide_context=True,
dag=dag,
)
prev_task = first_task
for task_name in task_names[1:]:
task = PythonOperator(
task_id=task_name,
python_callable=my_python_function,
op_args=[task_name],
provide_context=True,
dag=dag,
)
prev_task >> task
prev_task = task
2. chain 이용
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models.baseoperator import chain
default_args = {
"owner": "airflow",
"start_date": datetime(2023, 1, 1),
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
dag = DAG(
"sequential_task_dag",
default_args=default_args,
schedule_interval=timedelta(days=1),
)
def my_python_function(task_name, **kwargs):
print(f"Executing {task_name}")
# Define tasks in a loop
task_names = ["Task_1", "Task_2", "Task_3", "Task_4"]
tasks = [PythonOperator(
task_id=task_name,
python_callable=my_python_function,
op_args=[task_name],
provide_context=True,
dag=dag,
) for task_name in task_names
]
chain(*task)
728x90
반응형
LIST