250x250
반응형
Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
Tags
- gather_nd
- login crawling
- spark udf
- requests
- Retry
- TensorFlow
- session 유지
- flask
- correlation
- API
- tensorflow text
- 상관관계
- UDF
- Airflow
- hadoop
- Counterfactual Explanations
- grad-cam
- BigQuery
- top_k
- 유튜브 API
- 공분산
- chatGPT
- youtube data
- GCP
- integrated gradient
- XAI
- airflow subdag
- API Gateway
- GenericGBQException
- subdag
Archives
- Today
- Total
데이터과학 삼학년
[Airflow] Xcom을 이용한 task간 변수 전달 본문
반응형
Airflow Xcom
- Xcom : cross-communication의 약자
- Airflow에는 task의 실행에서 야기되는 결과물을 xcom이라는 곳에 넣어두고 다른 task 실행시 해당 결과물을 참조해서 사용할 수 있도록 지원
- 쉽게 말해 Xcom이라는 주머니에 key:value 형태로 데이터를 저장해 놓고 각 task가 해당 데이터를 참조해서 사용하도록 하는 것
# xcom에 데이터 넣기!
def _train_model(**context):
model_id = "model_sample_1"
context["task_instance"].xcom_push(key="model_id", value=model_id)
task_train_model = PythonOperator(
task_id="train_model",
python_callable=_train_model
)
# xcom에 있는 데이터 가져와 사용하기!
def _deploy_model(**context):
model_id = context["task_instance"].xcom_pull(task_ids="train_model", key="model_id")
print(f"Deploying model {model_id}")
task_deploy_model = PythonOperator(
task_id="deploy_model",
python_callable=_deploy_model
)
- python operator의 경우, 별도의 key, value를 구성하여 xcom_push를 하지 않아도 return값 자체가 xcom에 저장됨
# xcom에 데이터 넣기!
def _train_model(**context):
model_id = "model_sample_1"
return model_id
task_train_model = PythonOperator(
task_id="train_model",
python_callable=_train_model
)
# xcom에 있는 데이터 가져와 사용하기!
def _deploy_model(**context):
model_id = context["task_instance"].xcom_pull(task_ids="train_model")
print(f"Deploying model {model_id}")
task_deploy_model = PythonOperator(
task_id="deploy_model",
python_callable=_deploy_model
)
- template_dcit 에서 xcom을 사용할 수 도 있음
# xcom에 있는 데이터 가져와 사용하기!
def _deploy_model(template_dict, **context):
model_id = template_dict["model_id"]
print(f"Deploying model {model_id}")
task_deploy_model = PythonOperator(
task_id="deploy_model",
python_callable=_deploy_model,
template_dict={
"model_id":"{{ task_instance.xcom_pull(task_ids='train_model', key='model_id') }}"
}
)
Xcom 로그 확인 방법(UI)
참조
https://livebook.manning.com/book/data-pipelines-with-apache-airflow/chapter-5/142
728x90
반응형
LIST
'DevOps' 카테고리의 다른 글
[Airflow] SubDag을 이용하여 MainDag과 스케쥴을 다르게 설정하기!!!!! (feat. ChatGPT) (0) | 2023.03.13 |
---|---|
[Airflow] SubDag 개념, 장단점, 샘플 코드 (feat. ChatGPT) (0) | 2023.03.11 |
[Airflow] Airflow context variable (0) | 2023.01.24 |
[GIT] LFS (Large File System) (0) | 2022.12.02 |
Load balancing을 위한 crontab - 젠킨스 스케쥴러 (h * * * * ) (0) | 2021.04.07 |
Comments