데이터과학 삼학년

[Airflow] Xcom을 이용한 task간 변수 전달 본문

DevOps

[Airflow] Xcom을 이용한 task간 변수 전달

Dan-k 2023. 1. 26. 20:29
반응형

Airflow Xcom

- Xcom : cross-communication의 약자

- Airflow에는 task의 실행에서 야기되는 결과물을 xcom이라는 곳에 넣어두고 다른 task 실행시 해당 결과물을 참조해서 사용할 수 있도록 지원

- 쉽게 말해 Xcom이라는 주머니에 key:value 형태로 데이터를 저장해 놓고 각 task가 해당 데이터를 참조해서 사용하도록 하는 것

# xcom에 데이터 넣기!
def _train_model(**context):
    model_id = "model_sample_1"
    context["task_instance"].xcom_push(key="model_id", value=model_id)

task_train_model = PythonOperator(
    task_id="train_model",
    python_callable=_train_model
)

# xcom에 있는 데이터 가져와 사용하기!
def _deploy_model(**context):
    model_id = context["task_instance"].xcom_pull(task_ids="train_model", key="model_id")
    print(f"Deploying model {model_id}")

task_deploy_model = PythonOperator(
    task_id="deploy_model",
    python_callable=_deploy_model
)

- python operator의 경우, 별도의 key, value를 구성하여 xcom_push를 하지 않아도 return값 자체가 xcom에 저장됨

# xcom에 데이터 넣기!
def _train_model(**context):
    model_id = "model_sample_1"
    return model_id
    
task_train_model = PythonOperator(
    task_id="train_model",
    python_callable=_train_model
)

# xcom에 있는 데이터 가져와 사용하기!
def _deploy_model(**context):
    model_id = context["task_instance"].xcom_pull(task_ids="train_model")
    print(f"Deploying model {model_id}")

task_deploy_model = PythonOperator(
    task_id="deploy_model",
    python_callable=_deploy_model
)

- template_dcit 에서 xcom을 사용할 수 도 있음

# xcom에 있는 데이터 가져와 사용하기!
def _deploy_model(template_dict, **context):
    model_id = template_dict["model_id"]
    print(f"Deploying model {model_id}")

task_deploy_model = PythonOperator(
    task_id="deploy_model",
    python_callable=_deploy_model,
    template_dict={
    	"model_id":"{{ task_instance.xcom_pull(task_ids='train_model', key='model_id') }}"
    }
)

Xcom 로그 확인 방법(UI)

 

참조

https://livebook.manning.com/book/data-pipelines-with-apache-airflow/chapter-5/142

 

5 Defining dependencies between tasks · Data Pipelines with Apache Airflow

Examining how to define task dependencies in an Airflow DAG · Explaining how to implement joins using trigger rules · Showing how to make tasks conditional on certain conditions · Giving a basic idea of how trigger rules affect the execution of your tas

livebook.manning.com

 

728x90
반응형
LIST
Comments