250x250
반응형
Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
Tags
- youtube data
- Airflow
- 상관관계
- BigQuery
- XAI
- login crawling
- hadoop
- GCP
- gather_nd
- Counterfactual Explanations
- API
- Retry
- chatGPT
- top_k
- integrated gradient
- 유튜브 API
- tensorflow text
- spark udf
- UDF
- TensorFlow
- correlation
- 공분산
- subdag
- grad-cam
- flask
- requests
- API Gateway
- GenericGBQException
- airflow subdag
- session 유지
Archives
- Today
- Total
데이터과학 삼학년
[Airflow] Sequential task loop로 연결하기 본문
반응형
Sequential task loop로 연결하기
예를 들어 단순 반복되어 길게 연결되는 task가 있다면...어떻게 연결할까?
t1 >> t2 >> t3 >> t4 >> t5 >> ... >> t100 까지 직접 연결하는 것은 매우 비효율적이다..
병렬 연결이야 start >> [t1, t2, t3,..] 로 묶으면 되지만...
이럴때 간단한 파이썬 코드로 구성하는 방법을 정리한다.
1. 파이썬코드 설정
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
default_args = {
"owner": "airflow",
"start_date": datetime(2023, 1, 1),
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
dag = DAG(
"sequential_task_dag",
default_args=default_args,
schedule_interval=timedelta(days=1),
)
def my_python_function(task_name, **kwargs):
print(f"Executing {task_name}")
# Define tasks in a loop
task_names = ["Task_1", "Task_2", "Task_3", "Task_4"]
first_task = PythonOperator(
task_id=f"{task_names[0]}",
python_callable=my_python_function,
op_args=[task_names[0]],
provide_context=True,
dag=dag,
)
prev_task = first_task
for task_name in task_names[1:]:
task = PythonOperator(
task_id=task_name,
python_callable=my_python_function,
op_args=[task_name],
provide_context=True,
dag=dag,
)
prev_task >> task
prev_task = task
2. chain 이용
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models.baseoperator import chain
default_args = {
"owner": "airflow",
"start_date": datetime(2023, 1, 1),
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
dag = DAG(
"sequential_task_dag",
default_args=default_args,
schedule_interval=timedelta(days=1),
)
def my_python_function(task_name, **kwargs):
print(f"Executing {task_name}")
# Define tasks in a loop
task_names = ["Task_1", "Task_2", "Task_3", "Task_4"]
tasks = [PythonOperator(
task_id=task_name,
python_callable=my_python_function,
op_args=[task_name],
provide_context=True,
dag=dag,
) for task_name in task_names
]
chain(*task)
728x90
반응형
LIST
'DevOps' 카테고리의 다른 글
[GitHub] git revert 와 reset의 차이 (0) | 2023.12.13 |
---|---|
[Airflow] task, dag 우선순위 설정 (priority_weight) (0) | 2023.11.28 |
Airflow execution_date (logical_date) (0) | 2023.09.05 |
쿠버네티스(Kubernetes) (0) | 2023.07.24 |
[Jenkins] 파이프라인 빌드시 git branch목록을 선택하여 배포! (0) | 2023.05.03 |
Comments