Airflow DAG Context Tutorial — Best Practices

Что такое DAG Context?

Контекст DAG в Apache Airflow — это словарь, содержащий информацию о работающем DAG и его среде Airflow, к которой можно получить доступ из задачи. Одним из наиболее распространенных значений для извлечения из контекста Airflow является ключевое слово ti/task_instance, которое позволяет получить доступ к атрибутам и методам task instance объекта.

Другими распространенными причинами доступа к контексту Airflow являются:

  • Вы хотите использовать параметры уровня DAG в своих задачах Airflow.
  • Вы хотите использовать logical date запуска DAG в задаче Airflow, например, как часть имени файла.
  • Вы хотите явно передавать и извлекать значения в XCom с помощью custom key.
  • Вы хотите, чтобы действие в вашей задаче зависело от настройки определенной конфигурации Airflow.

Контекст DAG в Apache Airflow — это набор переменных и метаданных, которые доступны внутри каждой задачи и самого DAG во время выполнения.

Эти переменные могут содержать полезную информацию, такую как:

  • dag_run: Информация о текущем запуске DAG.
  • task_instance: Текущая задача и информация о её состоянии.
  • logical_date: Логическая дата выполнения, которая совпадает с execution_date (Дата и время запуска DAG).
  • ds: Дата запуска DAG в формате YYYY-MM-DD.

Контекст DAG полезен для получения доступа к этим переменным и метаданным внутри задач. Например, вы можете использовать их для логирования, настройки условий выполнения задач и передачи информации между задачами.

Доступ к контексту

Контекст Airflow доступен во всех задачах Airflow. Вы можете получить доступ к информации из контекста, используя следующие методы:

  • Передайте **context аргумент функции, используемой в @task декорированной задаче или PythonOperator.
  • Используйте шаблоны Jinja в традиционных операторах Airflow.
  • Доступ к контексту kwarg в .execute методе любого традиционного или custom operator.

Доступ к контекстному словарю Airflow за пределами задачи невозможен.

Извлечение контекста Airflow с помощью @task декоратора

Чтобы получить доступ к контексту Airflow в @task декорированной задаче или задаче PythonOperator, вам нужно добавить **context аргумент в функцию задачи. Это сделает контекст доступным как словарь в вашей задаче.

В этой вызываемой функции мы извлекаем компоненты datetime из для execution_date динамического построения URL.

Извлечение контекста Airflow с использованием Jinja шаблона

Доступ ко многим элементам контекста Airflow можно получить с помощью шаблонизации Jinja. Вы можете получить список всех параметров, которые допускают шаблоны для любого оператора, распечатав его .template_fields атрибут.

Например, вы можете получить доступ к логической дате запуска DAG в формате, YYYY-MM-DD используя шаблон {{ ds }} в bash_command параметре BashOperator.

Также распространено использование шаблонов Jinja для доступа к значениям XCom в параметре традиционной задачи.

В приведенном ниже фрагменте кода первая задача return_greeting передаст строку «Hello» в XCom, а вторая задача greet_friend будет использовать шаблон Jinja для извлечения этого значения из ti объекта (экземпляра задачи) контекста Airflow и печати Hello friend! 🙂 в log.

Пример DAG Airflow с выводом контекста на печать

Пример выведенных значений из словаря context

Для вышеприведенного DAG значения взяты из запуска для дага от 1 февраля 2024 года:

Выведенный DAG context в лог:

Название Значение
conf <***.configuration.AirflowConfigParser object at 0x762213008cb0>
conn None
dag <DAG: print_dag_context>
dag_run <DagRun print_dag_context @ 2024-02-01 09:00:00+00:00: scheduled__2024-02-01T09:00:00+00:00, state:running, queued_at: 2024-08-31 19:09:52.687357+00:00. externally triggered: False>
data_interval_end DateTime(2024, 3, 1, 9, 0, 0, tzinfo=Timezone(‘UTC’))
data_interval_start DateTime(2024, 2, 1, 9, 0, 0, tzinfo=Timezone(‘UTC’))
ds ‘2024-02-01’
ds_nodash ‘20240201’
execution_date <Proxy at 0x7621f884ed00 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7621f85c2840>, ‘execution_date’, DateTime(2024, 2, 1, 9, 0, 0, tzinfo=Timezone(‘UTC’)))>
inlet_events InletEventsAccessors(_inlets=[], _datasets={}, _dataset_aliases={}, _session=<sqlalchemy.orm.session.Session object at 0x762211e20590>)
inlets []
logical_date DateTime(2024, 2, 1, 9, 0, 0, tzinfo=Timezone(‘UTC’))
macros <module ‘***.macros’ from ‘/home/***/.local/lib/python3.12/site-packages/***/macros/__init__.py’>
map_index_template None
next_ds <Proxy at 0x7621f8a90700 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7621f85c2840>, ‘next_ds’, ‘2024-03-01’)>
next_ds_nodash <Proxy at 0x7621f8c0c7c0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7621f85c2840>, ‘next_ds_nodash’, ‘20240301’)>
next_execution_date <Proxy at 0x7621f86d7f00 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7621f85c2840>, ‘next_execution_date’, DateTime(2024, 3, 1, 9, 0, 0, tzinfo=Timezone(‘UTC’)))>
outlet_events <***.utils.context.OutletEventAccessors object at 0x7621f85b2930>
outlets []
params {}
prev_data_interval_end_success None
prev_data_interval_start_success None
prev_ds <Proxy at 0x7621f85d1b40 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7621f85c2840>, ‘prev_ds’, ‘2024-01-01’)>
prev_ds_nodash <Proxy at 0x7621f87017c0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7621f85c2840>, ‘prev_ds_nodash’, ‘20240101’)>
prev_end_date_success None
prev_execution_date <Proxy at 0x7621f871f300 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7621f85c2840>, ‘prev_execution_date’, DateTime(2024, 1, 1, 9, 0, 0, tzinfo=Timezone(‘UTC’)))>
prev_execution_date_success <Proxy at 0x7621f8697f40 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7621f85c2840>, ‘prev_execution_date_success’, None)>
prev_start_date_success None
run_id ‘scheduled__2024-02-01T09:00:00+00:00’
task <Task(PythonOperator): print_dag_context>
task_instance <TaskInstance: print_dag_context.print_dag_context scheduled__2024-02-01T09:00:00+00:00 [running]>
task_instance_key_str ‘print_dag_context__print_dag_context__20240201’
templates_dict None
test_mode False
ti <TaskInstance: print_dag_context.print_dag_context scheduled__2024-02-01T09:00:00+00:00 [running]>
tomorrow_ds <Proxy at 0x7621f85e1f00 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7621f85c2840>, ‘tomorrow_ds’, ‘2024-02-02’)>
tomorrow_ds_nodash <Proxy at 0x7621f86403c0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7621f85c2840>, ‘tomorrow_ds_nodash’, ‘20240202’)>
triggering_dataset_events <Proxy at 0x7621f85d2ec0 with factory <function _get_template_context.<locals>.get_triggering_events at 0x7621f85c18a0>>
ts ‘2024-02-01T09:00:00+00:00’
ts_nodash ‘20240201T090000’
ts_nodash_with_tz ‘20240201T090000+0000’
var {‘json’: None, ‘value’: None}
yesterday_ds <Proxy at 0x7621f8640240 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7621f85c2840>, ‘yesterday_ds’, ‘2024-01-31’)>
yesterday_ds_nodash <Proxy at 0x7621f8640540 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7621f85c2840>, ‘yesterday_ds_nodash’, ‘20240131’)>

Описание параметров контекста

  • conf: Объект, представляющий текущие конфигурации Airflow. Содержит настройки и параметры, используемые Airflow для выполнения задач.
  • conn: Переменная для хранения соединений (connections). Может быть None, если соединения не требуются или не используются в контексте.
  • dag: Объект текущего DAG, в рамках которого выполняется задача.
  • dag_run: Объект текущего выполнения DAG, который содержит информацию о запуске DAG, состоянии выполнения, времени постановки в очередь и т.д.
  • data_interval_end: Конечная точка временного интервала, к которому относится текущий DAG run (запуск).
  • data_interval_start: Начальная точка временного интервала, к которому относится текущий DAG run.
  • ds: Дата запуска DAG в формате YYYY-MM-DD.
  • ds_nodash: Дата запуска DAG в формате YYYYMMDD (без тире).
  • execution_date: Дата и время запуска DAG. Может быть проксированным объектом для отслеживания изменений.
  • expanded_ti_count: Число развернутых задач, используется в контексте динамических задач.
  • inlet_events: Объект, содержащий события, связанные с входными данными для текущей задачи.
  • inlets: Список входных данных или задач, от которых зависит текущая задача.
  • logical_date: Логическая дата выполнения, которая совпадает с execution_date.
  • macros: Модуль, содержащий макросы Airflow, которые можно использовать для шаблонизации команд и выражений.
  • map_index_template: Шаблон для индексов маппинга задач, используется в контексте динамически распределяемых задач.
  • next_ds: Дата следующего запуска DAG в формате YYYY-MM-DD. Может быть проксированным объектом.
  • next_ds_nodash: Дата следующего запуска DAG в формате YYYYMMDD (без тире). Может быть проксированным объектом.
  • next_execution_date: Дата и время следующего выполнения DAG. Может быть проксированным объектом.
  • outlet_events: Объект, содержащий события, связанные с выходными данными для текущей задачи.
  • outlets: Список выходных данных или задач, на которые влияет текущая задача.
  • params: Словарь параметров, передаваемых в DAG и задачи. Обычно используется для передачи динамических значений.
  • prev_data_interval_end_success: Конечная точка временного интервала предыдущего успешного запуска DAG.
  • prev_data_interval_start_success: Начальная точка временного интервала предыдущего успешного запуска DAG.
  • prev_ds: Дата предыдущего запуска DAG в формате YYYY-MM-DD. Может быть проксированным объектом.
  • prev_ds_nodash: Дата предыдущего запуска DAG в формате YYYYMMDD (без тире). Может быть проксированным объектом.
  • prev_end_date_success: Дата и время окончания предыдущего успешного выполнения DAG.
  • prev_execution_date: Дата и время предыдущего выполнения DAG. Может быть проксированным объектом.
  • prev_execution_date_success: Дата и время предыдущего успешного выполнения DAG. Может быть проксированным объектом.
  • prev_start_date_success: Дата и время начала предыдущего успешного выполнения DAG.
  • run_id: Уникальный идентификатор текущего выполнения DAG.
  • task: Объект текущей задачи (Task), которая выполняется в рамках DAG.
  • task_instance: Объект TaskInstance, представляющий текущее выполнение задачи, содержащий информацию о её состоянии и параметрах.
  • task_instance_key_str: Строковое представление ключа текущего выполнения задачи, уникальное для каждой задачи и запуска.
  • templates_dict: Словарь с динамическими значениями, используемыми в шаблонах задач.
  • test_mode: Булев флаг, указывающий, выполняется ли задача в тестовом режиме.
  • ti: Короткий алиас для task_instance.
  • tomorrow_ds: Дата завтрашнего дня относительно текущего запуска DAG в формате YYYY-MM-DD. Может быть проксированным объектом.
  • tomorrow_ds_nodash: Дата завтрашнего дня в формате YYYYMMDD (без тире). Может быть проксированным объектом.
  • triggering_dataset_events: События, которые активировали текущий запуск DAG, связанные с наборами данных.
  • ts: Метка времени запуска DAG в формате YYYY-MM-DDTHH:MM:SS+00:00.
  • ts_nodash: Метка времени запуска DAG в формате YYYYMMDDTHHMMSS (без тире).
  • ts_nodash_with_tz: Метка времени запуска DAG в формате YYYYMMDDTHHMMSS+ZZZZ, включая временную зону.
  • var: Словарь переменных Airflow, включающий значения в виде JSON и простых строк.
  • yesterday_ds: Дата вчерашнего дня относительно текущего запуска DAG в формате YYYY-MM-DD. Может быть проксированным объектом.
  • yesterday_ds_nodash: Дата вчерашнего дня в формате YYYYMMDD (без тире). Может быть проксированным объектом.

Airflow 2.10.0 по умолчанию передает несколько переменных для работы с интервалами:

Переменная Описание
{{ data_interval_start }} Начало интервала данных.
{{ data_interval_end }} Конец интервала данных
{{ logical_date }}
Дата-время, которое логически идентифицирует текущий запуск DAG. Это значение не содержит никакой семантики, а является просто значением для идентификации.
Используйте data_interval_start and data_interval_end вместо этого, если вам нужно значение, имеющее реальную семантику,
например, для получения среза строк из базы данных на основе временных меток.
{{ ds }}
Логическая дата запуска DAG — YYYY-MM-DD.
То же, что и {{ logical_date | ds }}.
{{ ds_nodash }} То же, что и {{ logical_date | ds_nodash }}.
{{ ts }}
То же, что и {{ logical_date | ts }}.
Пример: 2018-01-01T00:00:00+00:00.
{{ ts_nodash_with_tz }}
То же, что и {{ logical_date | ts_nodash_with_tz }}
Пример: 20180101T000000+0000.
{{ ts_nodash }}
То же, что и {{ logical_date | ts_nodash }}.
Пример: 20180101T000000.
{{ prev_data_interval_start_success }}
Начало интервала данных предыдущего успешного DagRun.
{{ prev_data_interval_end_success }}
Конец интервала данных предыдущего успешного DagRun.
{{ prev_start_date_success }} Дата начала с предыдущего успешного выполнения DagRun(если доступно).
{{ prev_end_date_success }} Дата окончания предыдущего успешного периода DagRun(если имеется).

Устаревшие переменные (Deprecated variables) Apache Airflow 2.10.0

Следующие переменные устарели. Они сохранены для обратной совместимости, но вам следует преобразовать существующий код, чтобы использовать вместо них другие переменные.

Deprecated Variable Описание
{{ execution_date }} the execution date (logical date), same as logical_date
{{ next_execution_date }} the logical date of the next scheduled run (if applicable); you may be able to use data_interval_end instead
{{ next_ds }} the next execution date as YYYY-MM-DD if exists, else None
{{ next_ds_nodash }} the next execution date as YYYYMMDD if exists, else None
{{ prev_execution_date }} the logical date of the previous scheduled run (if applicable)
{{ prev_ds }} the previous execution date as YYYY-MM-DD if exists, else None
{{ prev_ds_nodash }} the previous execution date as YYYYMMDD if exists, else None
{{ yesterday_ds }} the day before the execution date as YYYY-MM-DD
{{ yesterday_ds_nodash }} the day before the execution date as YYYYMMDD
{{ tomorrow_ds }} the day after the execution date as YYYY-MM-DD
{{ tomorrow_ds_nodash }} the day after the execution date as YYYYMMDD
{{ prev_execution_date_success }} execution date from prior successful DAG run; you may be able to use prev_data_interval_start_success instead if the timetable/schedule you use for the DAG defines data_interval_start compatible with the legacy execution_date.

Концепция планирования (Airflow Schedule)

Чтобы лучше понять DAG Schedule, важно ознакомиться со следующими терминами и параметрами:

  • Data Interval (Интервал данных): Cвойство каждого запуска DAG, представляющее период данных, с которым должна работать каждая задача. Например, для DAG, запланированного на час, каждый интервал данных начинается в начале часа (минута 0) и заканчивается в конце часа (минута 59). Запуск DAG обычно выполняется в конце интервала данных.
  • Logical Date (Логическая дата): Начало интервала данных. Она не показывает, когда будет выполнена группа DAG. До Airflow 2.2 это называлось датой выполнения (execution date).
    Logical Date используется для идентификации конкретного выполнения DAG-а и помогает с версиями данных, которые обрабатываются.
  • Timetable (Расписание): Cвойство DAG, которое определяет интервал данных и логическую дату для каждого запуска DAG, а также определяет, когда планируется запуск DAG. Timetable заменяет стандартный schedule_interval и дает более гибкий контроль над запуском DAG-а, позволяя точно задать моменты времени или сложные интервалы для запуска.
  • Run After — это фактическое время, когда задача (DAG Run) была запущена или должна была быть запущена. Это время указывает на то, когда Airflow реально пытается выполнить задачу. Эта дата отображается в пользовательском интерфейсе Airflow и может совпадать с концом интервала данных в зависимости от расписания вашего DAG.
  • Backfilling — это процесс запуска задач за прошедшие периоды времени, которые были пропущены, например, из-за недоступности DAG-а или отказа системы. Он позволяет «догнать» все пропущенные данные.
  • Catchup — это возможность Airflow автоматически запускать пропущенные выполнения DAG-а за предыдущие периоды, если в них были пропуски. Этот механизм включен по умолчанию и помогает поддерживать данные актуальными.

Разбор кейса с переменными из контекста с запуском 1 раз в месяц

Рассмотрим пример, для DAG:

В Airflow запуск будет выглядеть так:

Тогда схема выполнения дага будет следущей:

Разбор кейса с переменными из контекста с запуском 1 раз в день

Рассмотрим DAG с запуском каждый день в 9:00 по UTC:

Получаем:

Статьи на английском по Airflow

0
Оставьте комментарий! Напишите, что думаете по поводу статьи.x