250x250
반응형
Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
Tags
- airflow subdag
- top_k
- correlation
- flask
- spark udf
- TensorFlow
- hadoop
- API
- Airflow
- 유튜브 API
- XAI
- youtube data
- integrated gradient
- chatGPT
- tensorflow text
- login crawling
- requests
- subdag
- 공분산
- gather_nd
- UDF
- Retry
- GenericGBQException
- session 유지
- GCP
- Counterfactual Explanations
- API Gateway
- grad-cam
- BigQuery
- 상관관계
Archives
- Today
- Total
데이터과학 삼학년
[Airflow] Sequential task loop로 연결하기 본문
반응형
Sequential task loop로 연결하기
예를 들어 단순 반복되어 길게 연결되는 task가 있다면...어떻게 연결할까?
t1 >> t2 >> t3 >> t4 >> t5 >> ... >> t100 까지 직접 연결하는 것은 매우 비효율적이다..
병렬 연결이야 start >> [t1, t2, t3,..] 로 묶으면 되지만...
이럴때 간단한 파이썬 코드로 구성하는 방법을 정리한다.
1. 파이썬코드 설정
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
default_args = {
"owner": "airflow",
"start_date": datetime(2023, 1, 1),
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
dag = DAG(
"sequential_task_dag",
default_args=default_args,
schedule_interval=timedelta(days=1),
)
def my_python_function(task_name, **kwargs):
print(f"Executing {task_name}")
# Define tasks in a loop
task_names = ["Task_1", "Task_2", "Task_3", "Task_4"]
first_task = PythonOperator(
task_id=f"{task_names[0]}",
python_callable=my_python_function,
op_args=[task_names[0]],
provide_context=True,
dag=dag,
)
prev_task = first_task
for task_name in task_names[1:]:
task = PythonOperator(
task_id=task_name,
python_callable=my_python_function,
op_args=[task_name],
provide_context=True,
dag=dag,
)
prev_task >> task
prev_task = task
2. chain 이용
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models.baseoperator import chain
default_args = {
"owner": "airflow",
"start_date": datetime(2023, 1, 1),
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
dag = DAG(
"sequential_task_dag",
default_args=default_args,
schedule_interval=timedelta(days=1),
)
def my_python_function(task_name, **kwargs):
print(f"Executing {task_name}")
# Define tasks in a loop
task_names = ["Task_1", "Task_2", "Task_3", "Task_4"]
tasks = [PythonOperator(
task_id=task_name,
python_callable=my_python_function,
op_args=[task_name],
provide_context=True,
dag=dag,
) for task_name in task_names
]
chain(*task)
728x90
반응형
LIST
'DevOps' 카테고리의 다른 글
[GitHub] git revert 와 reset의 차이 (0) | 2023.12.13 |
---|---|
[Airflow] task, dag 우선순위 설정 (priority_weight) (0) | 2023.11.28 |
Airflow execution_date (logical_date) (0) | 2023.09.05 |
쿠버네티스(Kubernetes) (0) | 2023.07.24 |
[Jenkins] 파이프라인 빌드시 git branch목록을 선택하여 배포! (0) | 2023.05.03 |
Comments