Contents
- 1 Что такое DAG Context?
- 2 Концепция планирования (Airflow Schedule)
- 3 Разбор кейса с переменными из контекста с запуском 1 раз в день
- 4 Статьи на английском по Airflow
Что такое DAG Context?
Контекст DAG в Apache Airflow — это словарь, содержащий информацию о работающем DAG и его среде Airflow, к которой можно получить доступ из задачи. Одним из наиболее распространенных значений для извлечения из контекста Airflow является ключевое слово ti/task_instance, которое позволяет получить доступ к атрибутам и методам task instance объекта.
Другими распространенными причинами доступа к контексту Airflow являются:
- Вы хотите использовать параметры уровня DAG в своих задачах Airflow.
- Вы хотите использовать logical date запуска DAG в задаче Airflow, например, как часть имени файла.
- Вы хотите явно передавать и извлекать значения в XCom с помощью custom key.
- Вы хотите, чтобы действие в вашей задаче зависело от настройки определенной конфигурации Airflow.
Контекст DAG в Apache Airflow — это набор переменных и метаданных, которые доступны внутри каждой задачи и самого DAG во время выполнения.
Эти переменные могут содержать полезную информацию, такую как:
- dag_run: Информация о текущем запуске DAG.
- task_instance: Текущая задача и информация о её состоянии.
- logical_date: Логическая дата выполнения, которая совпадает с execution_date (Дата и время запуска DAG).
- ds: Дата запуска DAG в формате YYYY-MM-DD.
Контекст DAG полезен для получения доступа к этим переменным и метаданным внутри задач. Например, вы можете использовать их для логирования, настройки условий выполнения задач и передачи информации между задачами.
Доступ к контексту
Контекст Airflow доступен во всех задачах Airflow. Вы можете получить доступ к информации из контекста, используя следующие методы:
- Передайте
**context
аргумент функции, используемой в@task
декорированной задаче или PythonOperator. - Используйте шаблоны Jinja в традиционных операторах Airflow.
- Доступ к контексту
kwarg
в.execute
методе любого традиционного или custom operator.
Доступ к контекстному словарю Airflow за пределами задачи невозможен.
Извлечение контекста Airflow с помощью @task декоратора
Чтобы получить доступ к контексту Airflow в @task
декорированной задаче или задаче PythonOperator, вам нужно добавить **context аргумент в функцию задачи. Это сделает контекст доступным как словарь в вашей задаче.
1 2 3 4 5 |
from pprint import pprint @task def print_context(**context) pprint(context) |
В этой вызываемой функции мы извлекаем компоненты datetime из для execution_date динамического построения URL.
Извлечение контекста Airflow с использованием Jinja шаблона
Доступ ко многим элементам контекста Airflow можно получить с помощью шаблонизации Jinja. Вы можете получить список всех параметров, которые допускают шаблоны для любого оператора, распечатав его .template_fields
атрибут.
Например, вы можете получить доступ к логической дате запуска DAG в формате, YYYY-MM-DD
используя шаблон {{ ds }}
в bash_command параметре BashOperator.
1 2 3 4 |
print_logical_date = BashOperator( task_id="print_logical_date", bash_command="echo {{ ds }}", ) |
Также распространено использование шаблонов Jinja для доступа к значениям XCom в параметре традиционной задачи.
В приведенном ниже фрагменте кода первая задача return_greeting передаст строку «Hello» в XCom, а вторая задача greet_friend будет использовать шаблон Jinja для извлечения этого значения из ti
объекта (экземпляра задачи) контекста Airflow и печати Hello friend! 🙂 в log.
1 2 3 4 5 6 7 8 9 10 |
@task def return_greeting(): return "Hello" greet_friend = BashOperator( task_id="greet_friend", bash_command="echo '{{ ti.xcom_pull(task_ids='return_greeting') }} friend! :)'", ) return_greeting() >> greet_friend |
Пример DAG Airflow с выводом контекста на печать
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
from __future__ import annotations import pendulum from airflow.models.dag import DAG from airflow.operators.python import PythonOperator import pprint with DAG( "print_dag_context", default_args={ "retries": 2, 'max_active_runs': 1, }, description="Airflow DAG Context Tutorial — Best Practices", schedule='0 9 1 * *', start_date=pendulum.datetime(2024, 1, 1, tz="UTC"), catchup=True, tags=["dag_context"], ) as dag: def print_dag_context(**context): print("Контекст DAG:") pprint.pprint(context) print_dag_context = PythonOperator( task_id="print_dag_context", python_callable=print_dag_context, ) |
Пример выведенных значений из словаря context
Для вышеприведенного DAG значения взяты из запуска для дага от 1 февраля 2024 года:
Выведенный DAG context в лог:
Название | Значение |
conf | <***.configuration.AirflowConfigParser object at 0x762213008cb0> |
conn | None |
dag | <DAG: print_dag_context> |
dag_run | <DagRun print_dag_context @ 2024-02-01 09:00:00+00:00: scheduled__2024-02-01T09:00:00+00:00, state:running, queued_at: 2024-08-31 19:09:52.687357+00:00. externally triggered: False> |
data_interval_end | DateTime(2024, 3, 1, 9, 0, 0, tzinfo=Timezone(‘UTC’)) |
data_interval_start | DateTime(2024, 2, 1, 9, 0, 0, tzinfo=Timezone(‘UTC’)) |
ds | ‘2024-02-01’ |
ds_nodash | ‘20240201’ |
execution_date | <Proxy at 0x7621f884ed00 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7621f85c2840>, ‘execution_date’, DateTime(2024, 2, 1, 9, 0, 0, tzinfo=Timezone(‘UTC’)))> |
inlet_events | InletEventsAccessors(_inlets=[], _datasets={}, _dataset_aliases={}, _session=<sqlalchemy.orm.session.Session object at 0x762211e20590>) |
inlets | [] |
logical_date | DateTime(2024, 2, 1, 9, 0, 0, tzinfo=Timezone(‘UTC’)) |
macros | <module ‘***.macros’ from ‘/home/***/.local/lib/python3.12/site-packages/***/macros/__init__.py’> |
map_index_template | None |
next_ds | <Proxy at 0x7621f8a90700 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7621f85c2840>, ‘next_ds’, ‘2024-03-01’)> |
next_ds_nodash | <Proxy at 0x7621f8c0c7c0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7621f85c2840>, ‘next_ds_nodash’, ‘20240301’)> |
next_execution_date | <Proxy at 0x7621f86d7f00 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7621f85c2840>, ‘next_execution_date’, DateTime(2024, 3, 1, 9, 0, 0, tzinfo=Timezone(‘UTC’)))> |
outlet_events | <***.utils.context.OutletEventAccessors object at 0x7621f85b2930> |
outlets | [] |
params | {} |
prev_data_interval_end_success | None |
prev_data_interval_start_success | None |
prev_ds | <Proxy at 0x7621f85d1b40 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7621f85c2840>, ‘prev_ds’, ‘2024-01-01’)> |
prev_ds_nodash | <Proxy at 0x7621f87017c0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7621f85c2840>, ‘prev_ds_nodash’, ‘20240101’)> |
prev_end_date_success | None |
prev_execution_date | <Proxy at 0x7621f871f300 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7621f85c2840>, ‘prev_execution_date’, DateTime(2024, 1, 1, 9, 0, 0, tzinfo=Timezone(‘UTC’)))> |
prev_execution_date_success | <Proxy at 0x7621f8697f40 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7621f85c2840>, ‘prev_execution_date_success’, None)> |
prev_start_date_success | None |
run_id | ‘scheduled__2024-02-01T09:00:00+00:00’ |
task | <Task(PythonOperator): print_dag_context> |
task_instance | <TaskInstance: print_dag_context.print_dag_context scheduled__2024-02-01T09:00:00+00:00 [running]> |
task_instance_key_str | ‘print_dag_context__print_dag_context__20240201’ |
templates_dict | None |
test_mode | False |
ti | <TaskInstance: print_dag_context.print_dag_context scheduled__2024-02-01T09:00:00+00:00 [running]> |
tomorrow_ds | <Proxy at 0x7621f85e1f00 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7621f85c2840>, ‘tomorrow_ds’, ‘2024-02-02’)> |
tomorrow_ds_nodash | <Proxy at 0x7621f86403c0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7621f85c2840>, ‘tomorrow_ds_nodash’, ‘20240202’)> |
triggering_dataset_events | <Proxy at 0x7621f85d2ec0 with factory <function _get_template_context.<locals>.get_triggering_events at 0x7621f85c18a0>> |
ts | ‘2024-02-01T09:00:00+00:00’ |
ts_nodash | ‘20240201T090000’ |
ts_nodash_with_tz | ‘20240201T090000+0000’ |
var | {‘json’: None, ‘value’: None} |
yesterday_ds | <Proxy at 0x7621f8640240 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7621f85c2840>, ‘yesterday_ds’, ‘2024-01-31’)> |
yesterday_ds_nodash | <Proxy at 0x7621f8640540 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7621f85c2840>, ‘yesterday_ds_nodash’, ‘20240131’)> |
Описание параметров контекста
- conf: Объект, представляющий текущие конфигурации Airflow. Содержит настройки и параметры, используемые Airflow для выполнения задач.
- conn: Переменная для хранения соединений (connections). Может быть None, если соединения не требуются или не используются в контексте.
- dag: Объект текущего DAG, в рамках которого выполняется задача.
- dag_run: Объект текущего выполнения DAG, который содержит информацию о запуске DAG, состоянии выполнения, времени постановки в очередь и т.д.
- data_interval_end: Конечная точка временного интервала, к которому относится текущий DAG run (запуск).
- data_interval_start: Начальная точка временного интервала, к которому относится текущий DAG run.
- ds: Дата запуска DAG в формате YYYY-MM-DD.
- ds_nodash: Дата запуска DAG в формате YYYYMMDD (без тире).
- execution_date: Дата и время запуска DAG. Может быть проксированным объектом для отслеживания изменений.
- expanded_ti_count: Число развернутых задач, используется в контексте динамических задач.
- inlet_events: Объект, содержащий события, связанные с входными данными для текущей задачи.
- inlets: Список входных данных или задач, от которых зависит текущая задача.
- logical_date: Логическая дата выполнения, которая совпадает с execution_date.
- macros: Модуль, содержащий макросы Airflow, которые можно использовать для шаблонизации команд и выражений.
- map_index_template: Шаблон для индексов маппинга задач, используется в контексте динамически распределяемых задач.
- next_ds: Дата следующего запуска DAG в формате YYYY-MM-DD. Может быть проксированным объектом.
- next_ds_nodash: Дата следующего запуска DAG в формате YYYYMMDD (без тире). Может быть проксированным объектом.
- next_execution_date: Дата и время следующего выполнения DAG. Может быть проксированным объектом.
- outlet_events: Объект, содержащий события, связанные с выходными данными для текущей задачи.
- outlets: Список выходных данных или задач, на которые влияет текущая задача.
- params: Словарь параметров, передаваемых в DAG и задачи. Обычно используется для передачи динамических значений.
- prev_data_interval_end_success: Конечная точка временного интервала предыдущего успешного запуска DAG.
- prev_data_interval_start_success: Начальная точка временного интервала предыдущего успешного запуска DAG.
- prev_ds: Дата предыдущего запуска DAG в формате YYYY-MM-DD. Может быть проксированным объектом.
- prev_ds_nodash: Дата предыдущего запуска DAG в формате YYYYMMDD (без тире). Может быть проксированным объектом.
- prev_end_date_success: Дата и время окончания предыдущего успешного выполнения DAG.
- prev_execution_date: Дата и время предыдущего выполнения DAG. Может быть проксированным объектом.
- prev_execution_date_success: Дата и время предыдущего успешного выполнения DAG. Может быть проксированным объектом.
- prev_start_date_success: Дата и время начала предыдущего успешного выполнения DAG.
- run_id: Уникальный идентификатор текущего выполнения DAG.
- task: Объект текущей задачи (Task), которая выполняется в рамках DAG.
- task_instance: Объект TaskInstance, представляющий текущее выполнение задачи, содержащий информацию о её состоянии и параметрах.
- task_instance_key_str: Строковое представление ключа текущего выполнения задачи, уникальное для каждой задачи и запуска.
- templates_dict: Словарь с динамическими значениями, используемыми в шаблонах задач.
- test_mode: Булев флаг, указывающий, выполняется ли задача в тестовом режиме.
- ti: Короткий алиас для task_instance.
- tomorrow_ds: Дата завтрашнего дня относительно текущего запуска DAG в формате YYYY-MM-DD. Может быть проксированным объектом.
- tomorrow_ds_nodash: Дата завтрашнего дня в формате YYYYMMDD (без тире). Может быть проксированным объектом.
- triggering_dataset_events: События, которые активировали текущий запуск DAG, связанные с наборами данных.
- ts: Метка времени запуска DAG в формате YYYY-MM-DDTHH:MM:SS+00:00.
- ts_nodash: Метка времени запуска DAG в формате YYYYMMDDTHHMMSS (без тире).
- ts_nodash_with_tz: Метка времени запуска DAG в формате YYYYMMDDTHHMMSS+ZZZZ, включая временную зону.
- var: Словарь переменных Airflow, включающий значения в виде JSON и простых строк.
- yesterday_ds: Дата вчерашнего дня относительно текущего запуска DAG в формате YYYY-MM-DD. Может быть проксированным объектом.
- yesterday_ds_nodash: Дата вчерашнего дня в формате YYYYMMDD (без тире). Может быть проксированным объектом.
Airflow 2.10.0 по умолчанию передает несколько переменных для работы с интервалами:
Переменная | Описание |
---|---|
{{ data_interval_start }} |
Начало интервала данных. |
{{ data_interval_end }} |
Конец интервала данных |
{{ logical_date }} |
Дата-время, которое логически идентифицирует текущий запуск DAG. Это значение не содержит никакой семантики, а является просто значением для идентификации.
Используйте
data_interval_start and data_interval_end вместо этого, если вам нужно значение, имеющее реальную семантику,например, для получения среза строк из базы данных на основе временных меток.
|
{{ ds }} |
Логическая дата запуска DAG —
YYYY-MM-DD .То же, что и
{{ logical_date | ds }} . |
{{ ds_nodash }} |
То же, что и {{ logical_date | ds_nodash }} . |
{{ ts }} |
То же, что и
{{ logical_date | ts }} .Пример:
2018-01-01T00:00:00+00:00 . |
{{ ts_nodash_with_tz }} |
То же, что и
{{ logical_date | ts_nodash_with_tz }} Пример:
20180101T000000+0000 . |
{{ ts_nodash }} |
То же, что и
{{ logical_date | ts_nodash }} .Пример:
20180101T000000 . |
{{ prev_data_interval_start_success }} |
Начало интервала данных предыдущего успешного
DagRun . |
{{ prev_data_interval_end_success }} |
Конец интервала данных предыдущего успешного
DagRun . |
{{ prev_start_date_success }} |
Дата начала с предыдущего успешного выполнения DagRun (если доступно). |
{{ prev_end_date_success }} |
Дата окончания предыдущего успешного периода DagRun(если имеется). |
Устаревшие переменные (Deprecated variables) Apache Airflow 2.10.0
Следующие переменные устарели. Они сохранены для обратной совместимости, но вам следует преобразовать существующий код, чтобы использовать вместо них другие переменные.
Deprecated Variable | Описание |
---|---|
{{ execution_date }} |
the execution date (logical date), same as logical_date |
{{ next_execution_date }} |
the logical date of the next scheduled run (if applicable); you may be able to use data_interval_end instead |
{{ next_ds }} |
the next execution date as YYYY-MM-DD if exists, else None |
{{ next_ds_nodash }} |
the next execution date as YYYYMMDD if exists, else None |
{{ prev_execution_date }} |
the logical date of the previous scheduled run (if applicable) |
{{ prev_ds }} |
the previous execution date as YYYY-MM-DD if exists, else None |
{{ prev_ds_nodash }} |
the previous execution date as YYYYMMDD if exists, else None |
{{ yesterday_ds }} |
the day before the execution date as YYYY-MM-DD |
{{ yesterday_ds_nodash }} |
the day before the execution date as YYYYMMDD |
{{ tomorrow_ds }} |
the day after the execution date as YYYY-MM-DD |
{{ tomorrow_ds_nodash }} |
the day after the execution date as YYYYMMDD |
{{ prev_execution_date_success }} |
execution date from prior successful DAG run; you may be able to use prev_data_interval_start_success instead if the timetable/schedule you use for the DAG defines data_interval_start compatible with the legacy execution_date . |
Концепция планирования (Airflow Schedule)
Чтобы лучше понять DAG Schedule, важно ознакомиться со следующими терминами и параметрами:
- Data Interval (Интервал данных): Cвойство каждого запуска DAG, представляющее период данных, с которым должна работать каждая задача. Например, для DAG, запланированного на час, каждый интервал данных начинается в начале часа (минута 0) и заканчивается в конце часа (минута 59). Запуск DAG обычно выполняется в конце интервала данных.
- Logical Date (Логическая дата): Начало интервала данных. Она не показывает, когда будет выполнена группа DAG. До Airflow 2.2 это называлось датой выполнения (execution date).
Logical Date используется для идентификации конкретного выполнения DAG-а и помогает с версиями данных, которые обрабатываются. - Timetable (Расписание): Cвойство DAG, которое определяет интервал данных и логическую дату для каждого запуска DAG, а также определяет, когда планируется запуск DAG. Timetable заменяет стандартный schedule_interval и дает более гибкий контроль над запуском DAG-а, позволяя точно задать моменты времени или сложные интервалы для запуска.
- Run After — это фактическое время, когда задача (DAG Run) была запущена или должна была быть запущена. Это время указывает на то, когда Airflow реально пытается выполнить задачу. Эта дата отображается в пользовательском интерфейсе Airflow и может совпадать с концом интервала данных в зависимости от расписания вашего DAG.
- Backfilling — это процесс запуска задач за прошедшие периоды времени, которые были пропущены, например, из-за недоступности DAG-а или отказа системы. Он позволяет «догнать» все пропущенные данные.
- Catchup — это возможность Airflow автоматически запускать пропущенные выполнения DAG-а за предыдущие периоды, если в них были пропуски. Этот механизм включен по умолчанию и помогает поддерживать данные актуальными.
Разбор кейса с переменными из контекста с запуском 1 раз в месяц
Рассмотрим пример, для DAG:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
from __future__ import annotations import pendulum from airflow.models.dag import DAG from airflow.operators.python import PythonOperator import pprint with DAG( "print_dag_context", default_args={ "retries": 2, 'max_active_runs': 1, }, description="Airflow DAG Context Tutorial — Best Practices", schedule='0 9 1 * *', start_date=pendulum.datetime(2024, 1, 1, tz="UTC"), catchup=True, tags=["dag_context"], ) as dag: def print_dag_context(**context): print("Контекст DAG:") print(f"logical_date = { context['logical_date'] }") print(f"ds = { context['ds'] }") print(f"data_interval_start = { context['data_interval_start'] }") print(f"data_interval_end = { context['data_interval_end'] }") print_dag_context = PythonOperator( task_id="print_dag_context", python_callable=print_dag_context, ) |
В Airflow запуск будет выглядеть так:
Тогда схема выполнения дага будет следущей:
Разбор кейса с переменными из контекста с запуском 1 раз в день
Рассмотрим DAG с запуском каждый день в 9:00 по UTC:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
from __future__ import annotations import pendulum from airflow.models.dag import DAG from airflow.operators.python import PythonOperator import pprint with DAG( "print_dag_context", default_args={ "retries": 1, 'max_active_runs': 1, }, description="Airflow DAG Context Tutorial — Best Practices", schedule='0 9 * * *', start_date=pendulum.datetime(2024, 1, 1, tz="UTC"), end_date=pendulum.datetime(2024, 2, 1, tz="UTC"), catchup=True, tags=["dag_context"], ) as dag: def print_dag_context(**context): print("Контекст DAG:") print(f"logical_date = { context['logical_date'] }") print(f"ds = { context['ds'] }") print(f"data_interval_start = { context['data_interval_start'] }") print(f"data_interval_end = { context['data_interval_end'] }") print_dag_context = PythonOperator( task_id="print_dag_context", python_callable=print_dag_context, ) |
Получаем:
Leave a Reply