Apache Airflow: docker, python, DAG, конвейер обработки данных

Contents

Что такое Apache Airflow?

Apache Airflow — это платформа управления рабочими процессами обработки данных с открытым исходным кодом (фактически это инструмент для построения конвейеров обработки данных, либо оркестратор, который позволяет запускать процессы в сторонней системе). Он был запущен в Airbnb в октябре 2014 года (автор Maxime Beauchemin), как решение для управления все более сложными workflow компании. Создание Airflow позволило Airbnb программно создавать и планировать свои рабочие процессы и отслеживать их через встроенный пользовательский интерфейс Airflow. С самого начала исходный код проекта был открыт, в марте 2016 года он стал проектом Apache Incubator, а в январе 2019 года — проектом верхнего уровня Apache Software Foundation.

Airflow написан на Python, а рабочие процессы создаются с помощью скриптов Python. Apache Airflow спроектирован по принципу «конфигурация как код». В то время как существуют другие платформы рабочих процессов «конфигурация как код», использующие языки разметки, такие как XML, использование Python позволяет разработчикам импортировать библиотеки и классы.

Краткое описание функционала и предназначения

Apache Airflow — это планировщик задач, используемый для планирования, создания и отслеживания процессов. Он выходит за рамки управления надежным оркестратором данных.

Airflow — это фреймворк для оркестровки.

Что именно он может сделать?

  • Запуск заданий ETL/ELT (например python скриптов)
  • Обучение моделей машинного обучения
  • Работа Airflow как Track system
  • Создание рабочих процессов через DAGs
  • Управление расписанием задач, в том числе во внешних системах (например, запуск задач в QMC QlikView/Qlik Sense)
  • Настройка зависимостей событий/задач/рабочих процессов
  • Управление программным рабочим процессом
  • Airflow можно использовать для создания отчетов
  • Резервное копирование и другие задачи DevOps

Однако важно помнить, что Apache Airflow не обрабатывает потоковые рабочие процессы в реальном времени.

В версии Airflow 2.0+ были сделаны улучшения:

  • Горизонтальная масштабируемость. Если нагрузка задач на один планировщик увеличивается, пользователь теперь может запускать дополнительные «реплики» планировщика, чтобы увеличить пропускную способность своего развертывания воздушного потока.
  • Уменьшена задержка выполнения задачи. В Airflow 2.0 даже один планировщик позволяет планировать задачи с гораздо большей скоростью при том же уровне загрузки ЦП и памяти.
  • В Airflow 2.0 представлен новый комплексный REST API, который заложил прочную основу для нового пользовательского интерфейса и командной строки Airflow в будущем.

Плюсы Apache Airflow

  • Открытый исходный код. AirFlow активно поддерживается сообществом и имеет хорошо описанную документацию.
  • На основе Python. Python считается относительно простым языком для освоения и общепризнанным стандартом для специалистов в области Big Data и Data Science.
  • Когда ETL-процессы определены как код, они становятся более удобными для разработки, тестирования и сопровождения. Также устраняется необходимость использовать JSON- или XML-конфигурационные файлы для описания пайплайнов.
  • Богатый инструментарий и дружественный UI. Работа с AirFlow возможна при помощи CLI, REST API и веб-интерфейса, построенного на основе Python-фреймворка Flask.
  • Интеграция со множеством источников данных и сервисов. AirFlow поддерживает множество баз данных и Big Data-хранилищ: MySQL, PostgreSQL, MongoDB, Redis, Apache Hive, Apache Spark, Apache Hadoop, объектное хранилище S3 и другие.
  • Кастомизация. Есть возможность настройки собственных операторов.
  • Масштабируемость. Допускается неограниченное число DAG за счет модульной архитектуры и очереди сообщений. Worker могут масштабироваться при использовании Celery или Kubernetes.
  • Мониторинг и алертинг. Поддерживается интеграция с Statsd и FluentD — для сбора и отправки метрик и логов. Также доступен Airflow-exporter для интеграции с Prometheus.
  • Возможность настройки ролевого доступа. По умолчанию AirFlow предоставляет 5 ролей с различными уровнями доступа: Admin, Public, Viewer, Op, User. Также допускается создание собственных ролей с доступом к ограниченному числу DAG.
  • Дополнительно возможна интеграция с Active Directory и гибкая настройка доступов с помощью RBAC (Role-Based Access Control).
  • Поддержка тестирования. Можно добавить базовые Unit-тесты, которые будут проверять как пайплайны в целом, так и конкретные задачи в них.

Минусы Apache Airflow

  • При проектировании задач важно соблюдать идемпотентность: задачи должны быть написаны так, чтобы независимо от количества их запусков, для одних и тех же входных параметров возвращался одинаковый результат.
  • Необходимо разобраться в механизмах обработки execution_date. Важно понимать, что корректировки кода задач будут отражаться на всех их запусках за предыдущее время. Это исключает воспроизводимость результатов, но, с другой стороны, позволяет получить результаты работы новых алгоритмов за прошлые периоды.
  • Нет возможности спроектировать DAG в графическом виде, как это, например, доступно в Apache NiFi. Многие видят в этом, напротив, плюс, так как ревью кода проводится легче, чем ревью схем.
  • Высокая кривая обучения: Поскольку у Apache Airflow крутая кривая обучения, пользователям, особенно новичкам, может быть сложно адаптироваться к среде и выполнять такие задачи, как написание тестовых примеров для конвейеров данных, обрабатывающих необработанные данные.
  • Документация по Airflow сносная, но не очень хорошая, и трудно понять, каковы лучшие практики, когда вы начинаете, потому что существует так много разных способов сделать что-то, и большинство примеров, которые вы найдете, слишком упрощены.
  • Проблемы с переименованием: каждый раз, когда вы изменяете интервалы расписания, Apache Airflow просит вас переименовать ваши DAG, чтобы гарантировать, что ваши предыдущие экземпляры задач соответствуют новому периоду времени.
  • Удаляет метаданные: поскольку в конвейерах данных Apache Airflow отсутствует система контроля версий, если вы удаляете задание из своего кода DAG, а затем повторно развертываете его, все метаданные, связанные с транзакцией, автоматически удаляются.

Конкуренты Apache Airflow

Luigi

Luigi — это пакет Python, который выполняет длительную пакетную обработку. Это означает, что он управляет автоматическим выполнением процессов обработки данных для нескольких объектов в пакете. Задание обработки данных можно определить как серию зависимых задач в Luigi. Луиджи выясняет, какие задачи ему нужно выполнить, чтобы завершить задачу. Он обеспечивает основу для создания конвейеров обработки данных и управления ими в целом. Он был создан Spotify , чтобы помочь им управлять группами заданий, требующих извлечения и обработки данных из ряда источников.

Apache NiFi

Apache NiFi — это бесплатное приложение с открытым исходным кодом , которое автоматизирует передачу данных между системами. Приложение поставляется с пользовательским веб-интерфейсом для управления масштабируемыми ориентированными графами маршрутизации данных, преобразования и логики посредничества системы. Это сложная и надежная система обработки и распространения данных. Для редактирования данных во время выполнения он предоставляет очень гибкий и адаптируемый метод потока данных.

Dagster

Dagster — это машинное обучение , аналитика и оркестратор данных ETL . Поскольку он выполняет основную функцию планирования, эффективного упорядочивания и мониторинга вычислений, Dagster можно использовать в качестве альтернативы или замены для Airflow (и других классических механизмов рабочего процесса). Однако он выходит за рамки обычного определения оркестратора, заново изобретая весь сквозной процесс разработки и развертывания приложений для работы с данными.

Kedro

Kedro — это платформа Python с открытым исходным кодом для написания повторяемого, управляемого и модульного кода Data Science. Модульность, разделение задач и управление версиями относятся к числу идей, заимствованных из лучших практик разработки программного обеспечения и применяемых к алгоритмам машинного обучения.

Apache Oozie

Одним из сервисов/приложений планировщика рабочих процессов, работающих в кластере Hadoop, является Apache Oozie . Он используется для обработки задач Hadoop, таких как Hive, Sqoop, SQL, MapReduce и операций HDFS, таких как distcp. Это система, которая управляет рабочим процессом работ, которые зависят друг от друга. Здесь пользователи могут создавать направленные ациклические графы процессов, которые могут выполняться в Hadoop параллельно или последовательно.

Apache Oozie также вполне адаптируется . Задания можно просто запускать, останавливать, приостанавливать и перезапускать. Повторный запуск сбойных процессов с Oozie очень прост. Можно даже полностью обойти отказавший узел.

Таблица с характеристиками конкурентов (не всех вышеперечисленных)

Архитектура Apache Airflow

Обзор интерфейса Apache Airflow

Airflow поставляется с пользовательским интерфейсом, который позволяет вам видеть, что делают DAG и их задачи, запускать задачи DAG, просматривать журналы и выполнять некоторую ограниченную отладку и решение проблем с вашими DAG.

обзор интерфейса apache airflow 2.0 DAGs python run start

Краткий обзор меню Apache Airflow

Обзор основных сущностей Apache Airflow

  • Процессы обработки данных, или пайплайны, в Airflow описываются при помощи DAG (Directed Acyclic Graph). DAG — это сущность, объединяющая ваши задачи в цепочку задач, в которой явно видны зависимости между узлами.
  • В качестве узлов DAG выступают задачи (Task) — операции, применяемые к данным.
  • За реализацию задач отвечают операторы (Operator) — это шаблоны для выполнения задач.
  • Особую группу операторов составляют сенсоры (Sensor), позволяющие создавать триггер на событие.
  • Существует возможность добавления пользовательского оператора через расширение базового класса BaseOperator.
  • Hooks (Хуки) — это внешние интерфейсы для работы с различными сервисами: базы данных, внешние API ресурсы, распределенные хранилища типа S3, redis, memcached и т.д. Hooks являются строительными блоками операторов и берут на себя всю логику по взаимодействию с хранилищем конфигов и доступов.
  • XCom (кросс-коммуникация) обеспечивает способ обмена сообщениями или данными между различными задачами (или между операторами). Xcom по своей сути представляет собой таблицу, в которой хранятся пары ключ-значение, а также отслеживаются, какая пара была предоставлена ​​какой задачей и dag.

Перечень операторов

Python Operator Исполнение Python-кода
BranchPythonOperator Запуск задач в зависимости от выполнения заданного условия
BashOperator Запуск Bash-скриптов
SimpleHttpOperator Отправка HTTP-запросов
MySqlOperator Отправка запросов к базе данных MySQL
PostgresOperator Отправка запросов к базе данных PostgreSQL
S3FileTransformOperator Загрузка данных из S3 во временную директорию в локальной файловой системе, преобразование согласно указанному сценарию и сохранение результатов обработки в S3
DockerOperator Запуск Docker-контейнера под выполнение задачи
KubernetesPodOperator Создание отдельного Pod под выполнение задачи. Используется совместно с K8s
SqlSensor Проверка выполнения SQL-запроса
SlackAPIOperator Отправка сообщений в Slack
EmailOperator Отправка электронных писем
DummyOperator «Пустой» оператор, который можно использовать для группировки задач

DAG (Directed Acyclic Graph)

DAG (Направленный ациклический граф) — это основная концепция Airflow, объединяющая задачи вместе, организованная через настроенные зависимости и отношения, которые определяют как должна отработать цепочка задач (или data pipeline). Задачи, входящие в состав DAG, в Apache Airflow называются операторами(Operator).

Классическая схема DAG:

Пример DAG показан на диаграмме

Tasks и Task Instances (Экземпляр задачи)

Любой экземпляр оператора называется задачей. Каждая задача представлена ​​как узел в DAG.

Экземпляр задачи — это запуск задачи. В то время как мы определяем задачи в DAG, запуск DAG создается при выполнении DAG, он содержит идентификатор dag и дату выполнения DAG для идентификации каждого уникального запуска. Каждый запуск DAG состоит из нескольких задач, и каждый запуск этих задач называется экземпляром задачи.

Экземпляр задачи проходит через несколько состояний при запуске. Обычно поток состоит из следующих этапов:

  • No status (scheduler created empty task instance) — Нет статуса (планировщик создал пустой экземпляр задачи)
  • Scheduled (scheduler determined task instance needs to run) — Запланировано (необходим запуск определенного планировщиком экземпляра задачи)
  • Queued (scheduler sent the task to the queue – to be run) — В очереди (планировщик отправил задачу в очередь для выполнения)
  • Running (worker picked up a task and is now executing it) — Выполняется (работник взял задачу и теперь выполняет ее)
  • Success (task completed) — Успех (задача выполнена)

Жизненный цикл задачи:

Жизненный цикл задачи и экземпляра задачи

Обзор компонентов Apache Airflow

При работе с Airflow важно понимать основные компоненты его инфраструктуры. Даже если вы в основном взаимодействуете с Airflow как автор DAG, знание того, какие компоненты находятся «под капотом» и зачем они нужны, может быть полезно для разработки ваших DAG, отладки и успешного запуска в Airflow.

Обратите внимание, что в этой статье описаны компоненты и ​​функции Airflow 2.0+. Некоторые из упомянутых здесь компонентов и функций недоступны в более ранних версиях Airflow.

Основные компоненты Airflow

Apache Airflow имеет четыре основных компонента, которые работают постоянно:

  • Webserver (Веб-сервер): сервер Flask, работающий с Gunicorn, который обслуживает пользовательский интерфейс Airflow.
  • Scheduler (Планировщик): демон, отвечающий за планирование заданий. Это многопоточный процесс Python, который определяет, какие задачи нужно запускать, когда их нужно запускать и где они выполняются.
  • Database(База данных): база данных, в которой хранятся все метаданные DAG и задач. Обычно это база данных Postgres, но также поддерживаются MySQL, MsSQL и SQLite.
  • Executor: механизм запуска задач. Исполнитель запускается в планировщике всякий раз, когда Airflow работает.
  • Папка с файлами DAG, прочитанная планировщиком и исполнителем (и любыми рабочими процессами, имеющимися у исполнителя)

В дополнение к этим основным компонентам есть несколько ситуационных компонентов, которые используются только для запуска задач или использования определенных функций:

  • Worker(Рабочий): процесс, который выполняет задачи, определенные исполнителем. В зависимости от того, какого исполнителя вы выберете, у вас могут быть или не быть рабочие как часть вашей инфраструктуры Airflow.
  • Trigger (Триггер): Отдельный процесс, поддерживающий отложенные операторы. Этот компонент является необязательным и должен запускаться отдельно. Он нужен только в том случае, если вы планируете использовать отложенные (или «асинхронные») операторы.

На следующей схеме указано, как все эти компоненты работают вместе:

обзор и описание основных компонентов apache airflow architecture архитектура эирфлоу апачExecutors (Исполнители)

Пользователи Airflow могут выбрать один из нескольких доступных Executors или написать собственный. Каждый Executor имеет преимущества в определенных ситуациях:

  • SequentialExecutor: последовательно выполняет задачи внутри процесса планировщика без параллелизма. Этот исполнитель редко используется на практике, но он используется по умолчанию в конфигурации Airflow.
  • LocalExecutor: выполняет задачи локально внутри процесса планировщика, но поддерживает параллелизм и гиперпоточность. Этот исполнитель хорошо подходит для тестирования Airflow на локальном компьютере или на одном узле.
  • CeleryExecutor: использует серверную часть Celery (например, Redis, RabbitMq или другую систему очередей сообщений) для координации задач между предварительно настроенными рабочими процессами. Этот исполнитель идеально подходит, если у вас есть большое количество более коротких задач или более постоянная загрузка задач.
  • KubernetesExecutor: вызывает API Kubernetes для создания отдельного модуля для каждой выполняемой задачи, что позволяет пользователям передавать настраиваемые конфигурации для каждой из своих задач и эффективно использовать ресурсы. Этот исполнитель хорош в нескольких различных контекстах:
    • У вас есть длительные задачи, которые вы не хотите прерывать развертыванием кода или обновлениями Airflow.
    • Ваши задачи требуют очень специфических конфигураций ресурсов
    • Ваши задачи выполняются нечасто, и вы не хотите нести расходы на рабочие ресурсы, когда они не выполняются.

Обратите внимание, что есть также несколько других исполнителей, которые мы здесь не рассматриваем, в том числе CeleryKubernetes Executor и Dask Executor. Они считаются более экспериментальными и не так широко распространены, как другие описанные здесь исполнители.

Обзор процесса обработки Airflow DAGs

Чтобы увидеть, как Airflow выполняет DAG, давайте кратко рассмотрим общий процесс, связанный с разработкой и запуском Airflow DAG. На высоком уровне Airflow состоит из трех основных компонентов:

  • Airflow Scheduler — анализирует DAG, проверяет интервал их расписания и (если расписание DAG прошло) начинает планировать выполнение задач DAG, передавая их рабочим процессам Airflow.
  • Airflow Workers — берут задачи, запланированные для выполнения, и выполняют их. Таким образом, workers несут ответственность за фактическое «выполнение работы».
  • Airflow Webserver — визуализирует DAGs, проанализированные Scheduler, и предоставляет пользователям основной интерфейс для наблюдения за запусками DAGs и их результатами.

Сердцем Airflow, возможно, является планировщик, так как именно здесь происходит большая часть волшебства, которое определяет, когда и как выполняются ваши конвейеры.

Установка и настройка Apache Airflow

Установка Apache Airflow с помощью Docker-Compose на Ubuntu 20.04

Подробная официальная документация по развертыванию Apache Airflow с помощью docker-compose: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html

Моя краткая инструкция с шагами, которые у меня заработали на Ubuntu 20.04:

После установки нужно зайти по адресу http://localhost:8080/

Логин/Пароль по умолчанию: airflow/airflow

Установка Apache Airflow с помощью Docker-Compose

Используемый при развертывании выше файл docker-compose.yaml содержит несколько сервисов:

  • airflow-scheduler — Планировщик отслеживает все задачи и DAG, а затем запускает экземпляры задач после завершения их зависимостей.
  • airflow-webserver — Веб-сервер доступен по адресу http://localhost:8080.
  • airflow-worker — Выполняет задачи, определенные планировщиком.
  • airflow-init — Служба инициализации.
  • postgres — База данных.
  • redis — брокер, который пересылает сообщения от планировщика к воркеру.

Примерное потребление ресурсов контейнерами Airflow (без нагрузки):

Airflow довольно требователен к ресурсам.

Я бы указал 4 Gb Ram как минимум (в документации минимальная планка 4Gb RAM), но на мой взгляд на прод лучше сразу запросить 8 Гб RAM.

Процесс установки также описан в видео «Running Airflow 2.0 in 5 mins with Docker»:

Настройка Apache Airflow

todo

Подборка видео по тематике Airflow

Видео на русском

Что такое Apache Airflow («Школы Больших Данных» г. Москва)

Начало работы с apache Airflow — часть 1 («Школы Больших Данных» г. Москва)

ETL на airflow — часть 2 («Школы Больших Данных» г. Москва)

5 Оркестраторы и работа с Airflow (Канал Intellik)

ВВЕДЕНИЕ В AIRFLOW / ПОНЯТИЕ DAG’а / НАСТРОЙКА DAG’а В AIRFLOW (Канал DataLearn)

Airflow и MLFlow автоматизаций пайплайнов Machine Learning / MLOps (Канал miracl6)

Видео на английском

The Newcomer’s Guide to Airflow’s Architecture (Канал Apache Airflow)

How to write your first DAG in Apache Airflow — Airflow tutorials

Build your first pipeline DAG | Apache airflow for beginners

0
Оставьте комментарий! Напишите, что думаете по поводу статьи.x