데이터과학 삼학년

[Airflow] Sequential task loop로 연결하기 본문

DevOps

[Airflow] Sequential task loop로 연결하기

Dan-k 2023. 11. 20. 19:29
반응형

Sequential task loop로 연결하기

예를 들어 단순 반복되어 길게 연결되는 task가 있다면...어떻게 연결할까?

t1 >> t2 >> t3 >> t4 >> t5 >> ... >> t100 까지 직접 연결하는 것은 매우 비효율적이다..

병렬 연결이야 start >> [t1, t2, t3,..] 로 묶으면 되지만...

이럴때 간단한 파이썬 코드로 구성하는 방법을 정리한다.

 

1. 파이썬코드 설정

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


default_args = {
    "owner": "airflow",
    "start_date": datetime(2023, 1, 1),
    "depends_on_past": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "sequential_task_dag",
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)


def my_python_function(task_name, **kwargs):
    print(f"Executing {task_name}")


# Define tasks in a loop
task_names = ["Task_1", "Task_2", "Task_3", "Task_4"]


first_task = PythonOperator(
    task_id=f"{task_names[0]}",
    python_callable=my_python_function,
    op_args=[task_names[0]],
    provide_context=True,
    dag=dag,
)

prev_task = first_task
for task_name in task_names[1:]:
    task = PythonOperator(
        task_id=task_name,
        python_callable=my_python_function,
        op_args=[task_name],
        provide_context=True,
        dag=dag,
    )
    prev_task >> task
    prev_task = task

 

2. chain 이용

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models.baseoperator import chain


default_args = {
    "owner": "airflow",
    "start_date": datetime(2023, 1, 1),
    "depends_on_past": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "sequential_task_dag",
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)


def my_python_function(task_name, **kwargs):
    print(f"Executing {task_name}")


# Define tasks in a loop
task_names = ["Task_1", "Task_2", "Task_3", "Task_4"]

tasks = [PythonOperator(
        task_id=task_name,
        python_callable=my_python_function,
        op_args=[task_name],
        provide_context=True,
        dag=dag,
    ) for task_name in task_names
]
chain(*task)
728x90
반응형
LIST
Comments