본문 바로가기

airflow

[airflow] argument 전달

airflow graph view

1. 직접 trigger할때 dag_run 입력

airflow ui를 통해서 인자를 전달할 수 있다.
json 으로 입력해야 하며, opreator에 provide_context=True로 옵션을 주면 함수에 전달된다.

def print_test0(**context):
    dag_run: DagRun = context['dag_run']
    print(dag_run.conf[name])

t0 = PythonOperator(
    task_id='print_test0',
    python_callable=print_test0,
    execution_timeout=timedelta(minutes=2),
    provide_context=True,
    dag=dag
)

 

2. default 값 입력

def print_test1(*op_args, **op_kwargs):
    print(f'op_args : {op_args}')
    print(f'context : {context}')
    
t1 = PythonOperator(
    task_id='print_test1',
    python_callable=print_test1,
    execution_timeout=timedelta(minutes=2),
    op_args=['a', 'b'],
    op_kwargs={'op_kwargs': 'op_kwargs'},
    dag=dag
)

 

3. xcom 이용 (cross-communication)

argumentxcom은 데이터를 전달하는 용도가 아니고, 간단히 상태를 전달할 수 있다.
operator를 사용할때 macro로 변수를 전달하고 싶을때 쓸 수 있다.

def init_task(**context):
    task_instance = context['task_instance']
    task_instance.xcom_push(key='prefix', value='test_prefix')

init_task = PythonOperator(
    task_id='init_task',
    python_callable=init_task,
    execution_timeout=timedelta(minutes=10),
    provide_context=True,
    op_kwargs={},
    dag=dag
)
example_task = AWSAthenaOperator(
    query = """
        SELECT * 
        FROM {{ task_instance.xcom_pull(task_ids='init_task', key='prefix') }}__tablename
        """
)  # 대충 예제

init_task >> example_task

 

4. 조합

직접실행할때는 dag_run을 사용하고 싶고, 그렇지 않을때에는 기본값을 사용하고 싶다면,
context와 op_kwargs를 조합한다.

default_op_args = {
    'interval_day': 5,
    'n': 0.6,
}

def get_arg(context, name):
    dag_run: DagRun = context['dag_run']
    if dag_run.conf is not None and dag_run.conf.__contains__(name):
        return dag_run.conf[name]

    if context.__contains__(name):
        return context[name]

    return None

def print_test2(**context):
	print(get_arg(context, 'interval_day'))

t2 = PythonOperator(
    task_id='print_test2',
    python_callable=print_test2,
    execution_timeout=timedelta(minutes=2),
    provide_context=True,
    op_kwargs=default_op_args,
    dag=dag
)