1. 직접 trigger할때 dag_run 입력
airflow ui를 통해서 인자를 전달할 수 있다.
json 으로 입력해야 하며, opreator에 provide_context=True로 옵션을 주면 함수에 전달된다.
def print_test0(**context):
dag_run: DagRun = context['dag_run']
print(dag_run.conf[name])
t0 = PythonOperator(
task_id='print_test0',
python_callable=print_test0,
execution_timeout=timedelta(minutes=2),
provide_context=True,
dag=dag
)
2. default 값 입력
def print_test1(*op_args, **op_kwargs):
print(f'op_args : {op_args}')
print(f'context : {context}')
t1 = PythonOperator(
task_id='print_test1',
python_callable=print_test1,
execution_timeout=timedelta(minutes=2),
op_args=['a', 'b'],
op_kwargs={'op_kwargs': 'op_kwargs'},
dag=dag
)
3. xcom 이용 (cross-communication)
argumentxcom은 데이터를 전달하는 용도가 아니고, 간단히 상태를 전달할 수 있다.
operator를 사용할때 macro로 변수를 전달하고 싶을때 쓸 수 있다.
def init_task(**context):
task_instance = context['task_instance']
task_instance.xcom_push(key='prefix', value='test_prefix')
init_task = PythonOperator(
task_id='init_task',
python_callable=init_task,
execution_timeout=timedelta(minutes=10),
provide_context=True,
op_kwargs={},
dag=dag
)
example_task = AWSAthenaOperator(
query = """
SELECT *
FROM {{ task_instance.xcom_pull(task_ids='init_task', key='prefix') }}__tablename
"""
) # 대충 예제
init_task >> example_task
4. 조합
직접실행할때는 dag_run을 사용하고 싶고, 그렇지 않을때에는 기본값을 사용하고 싶다면,
context와 op_kwargs를 조합한다.
default_op_args = {
'interval_day': 5,
'n': 0.6,
}
def get_arg(context, name):
dag_run: DagRun = context['dag_run']
if dag_run.conf is not None and dag_run.conf.__contains__(name):
return dag_run.conf[name]
if context.__contains__(name):
return context[name]
return None
def print_test2(**context):
print(get_arg(context, 'interval_day'))
t2 = PythonOperator(
task_id='print_test2',
python_callable=print_test2,
execution_timeout=timedelta(minutes=2),
provide_context=True,
op_kwargs=default_op_args,
dag=dag
)