Как устроена работа Iceberg на примере Trino и Rest Catalog?

Contents

Как устроена Java (краткий overview)

Код содержится в .java файлах. Java не компилируется сразу в машинный код, как C++. Вместо этого компилятор (javac) превращает исходники в байт-код, который записывается в .class файлы.

JVM (Java Virtual Machine) — это виртуальная машина Java, которая исполняет байт-код. JVM делает две важные вещи:

  • Интерпретирует байт-код (или компилирует его JIT — Just-In-Time)
  • Делает программу независимой от ОС (Java — “write once, run anywhere”)

JRE (Java Runtime Environment) — только запуск байт-кода (JVM + библиотеки)
JDK (Java Development Kit) — это набор инструментов для разработки (компилятор, отладка и т.д.)

Maven — это система сборки и управления зависимостями для Java-проектов. Он решает те же задачи, что pip в Python или npm в JavaScript, но чуть сложнее, потому что управляет ещё и сборкой, тестированием, упаковкой и т.д.

POM (Project Object Model) — это основной файл конфигурации Maven, обычно pom.xml. Он описывает:

  • что это за проект (название, версия)
  • какие зависимости нужны
  • какие плагины использовать для сборки, тестов и т.д.

Когда запускается команда mvn clean install, Maven проходит через стандартные фазы сборки:

  1. clean — очистить старые сборки
  2. compile — скомпилировать код
  3. test — запустить юнит-тесты
  4. package — упаковать в .jar или .war
  5. install — установить артефакт в локальный репозиторий

Maven сам скачает все зависимости, указанные в pom.xml, из центрального репозитория (Maven Central).

Gradle — это тоже система сборки (build system) для Java-проектов, но более современная и декларативная (конфигурация на Groovy/Kotlin). Maven является более старой build system с XML-конфигурацией.

Статьи по JVM на habr

Trino + Iceberg + Rest Catalog

В этом разделе будут приведены ссылки на основные библиотеки, которые будут рассмотрены в статье.

Link Описание
Iceberg Java implementation of Iceberg
Iceberg — Go Golang implementation
Iceberg Rust Rust implementation
Iceberg C++ C++ implementation
trino-iceberg Trino-Iceberg Plugin
java/org/apache/iceberg/rest Iceberg Rest Catalog
deepwiki.com/trinodb/trino DeepWiki Trino
deepwiki.com/apache/iceberg DeepWiki Iceberg

Краткий обзор, что такое Iceberg и как все работает:

Библиотека Apache Iceberg (пакет org.apache.iceberg) — это Java-API и набор абстракций для работы с табличными хранилищами больших данных: метаданные таблиц, транзакции, файлы данных, сканирование, поддержка форматов, изменения схем/партиций и др. То есть библиотека Java — это реализация на Java методов для работы с форматом Iceberg.

Catalog — это реализация менеджера метаданных/неймспейсов (Hive Metastore, Glue, Nessie, REST, HadoopCatalog и т.д.), которую использует Iceberg-библиотека для хранения/поиска метаданных таблиц.

Trino — распределённый SQL-движок; у него есть Iceberg-коннектор (плагин), который вызывает Iceberg Java API (и, опосредованно, выбранный Catalog) чтобы читать/писать таблицы.

Схема «Как работает Trino — Iceberg — Catalog — File System»

Общую схему «Trino — Iceberg — Catalog — HDFS/s3» можно представить в следующем виде:

Перечень наиболее значимых классов/интерфейсов Iceberg Java:

  • Table: представляет таблицу; операции создания, загрузки, модификации.
  • Catalog / TableOperations: абстракции для реализации каталога, где хранятся метаданные таблиц.
  • Schema, PartitionSpec, SortOrder: определяют структуру таблицы, партиционирование и порядок сортировки.
  • Snapshot: состояние таблицы на каком-то моменте времени; позволяет делать time-travel и возвращать данные к предыдущему состоянию.
  • TableScan / ScanTask: чтение таблицы — сканирование файлов, получение задач чтения.
  • AppendFiles, OverwriteFiles, Transaction: API записи и изменения таблицы.
  • DataFile, DeleteFile, ContentFile: представление физических файлов данных или удалений.
  • Metrics, FieldMetrics, MetricsModes: сбор статистики о столбцах, чтобы оптимизировать сканирование (например, фильтрация по статистике).
  • FileFormat: перечисление поддерживаемых форматов (Parquet, ORC, Avro и др.).
  • Метаданные как таблицы: SnapshotsTable, HistoryTable, DataFilesTable, ManifestsTable и др. позволяют работать с внутренним состоянием таблицы как с обычной таблицей.

Полная документация по Iceberg Java классам и методам Iceberg: iceberg/package-summary.html

Обзор Trino SPI и Plugin

Обзор SPI

Trino использует плагинную архитектуру для расширения своих возможностей и интеграции с различными источниками данных и другими системами. Плагины должны реализовывать интерфейсы и переопределять методы, определённые в SPI (Service Provider Interface — интерфейсе поставщика услуг).

Общая информация о плагинах представлена в разделе документации Plugin, а подробности по конкретным плагинам описаны на отдельных страницах, посвящённых каждому из них.

В этом разделе описана разработка плагинов, включая специальные страницы, посвящённые возможностям разных типов плагинов. Создание собственного плагина позволяет добавить в Trino дополнительные функции, например, реализовать коннектор для нового источника данных.

Исходный код SPI находится в каталоге core/trino-spi в дереве исходников Trino.

Метаданные плагина

Каждый плагин определяет точку входа — реализацию интерфейса Plugin. Имя этого класса предоставляется Trino через стандартный интерфейс Java ServiceLoader: в classpath находится файл-ресурс с именем io.trino.spi.Plugin в каталоге META-INF/services.
Содержимое этого файла представляет собой одну строку с полным именем класса плагина, например:

Для встроенного плагина, включённого в исходный код Trino, этот файл-ресурс создаётся автоматически, если в файле pom.xml плагина указана следующая строка:

Интерфейс Plugin

Интерфейс Plugin — это хорошая отправная точка для разработчиков, которые хотят разобраться с SPI Trino. Он содержит методы доступа для получения различных классов, которые может предоставлять плагин.

Например, метод getConnectorFactories() — это основная функция, которую Trino вызывает, когда готов создать экземпляр коннектора для работы с каталогом.
Существуют также аналогичные методы для объектов Type, ParametricType, Function, SystemAccessControl и EventListenerFactory.

Сборка плагинов с помощью Maven

Плагины зависят от SPI из Trino:

Плагин использует область provided, потому что классы из SPI предоставляются самим Trino во время выполнения, и поэтому плагин не должен включать их в свой состав при сборке.

Существуют и другие зависимости, которые Trino также предоставляет, включая Slice и аннотации Jackson. В частности, Jackson используется для сериализации дескрипторов коннекторов, поэтому плагины должны использовать ту версию аннотаций, которая поставляется вместе с Trino.

Все остальные зависимости зависят от того, что требуется самому плагину для его реализации. Плагины загружаются в отдельный загрузчик классов, что обеспечивает изоляцию и позволяет плагинам использовать версии библиотек, отличные от тех, что применяются внутри Trino.

Пример файла pom.xml можно найти в каталоге plugin/trino-example-http в дереве исходного кода Trino — это пример HTTP-коннектора.

Развёртывание собственного плагина

Плагины Trino должны использовать тип упаковки trino-plugin, предоставляемый плагином trino-maven-plugin.
Сборка плагина создаёт необходимый дескриптор сервиса и вызывает Provisio для формирования ZIP-файла в каталоге target.

Этот архив содержит JAR-файл плагина и все его зависимости в виде отдельных JAR-файлов и полностью готов к установке в Trino.

Совместимость

Успешная загрузка, установка и использование плагина зависят от его совместимости с целевым кластером Trino. Полная совместимость гарантируется только в том случае, если используется одна и та же версия Trino — как при сборке плагина, так и при его развертывании. Поэтому настоятельно рекомендуется использовать одинаковые версии.

Например, плагин Trino, скомпилированный для версии Trino 470, может не работать с более старыми или новыми версиями, такими как Trino 430 или Trino 490. Это особенно важно учитывать при установке плагинов, разработанных другими проектами, поставщиками или внутри вашей организации.

Плагины Trino реализуют SPI (Service Provider Interface), который может изменяться с каждым релизом Trino. По умолчанию в Trino отсутствует автоматическая проверка совместимости SPI во время выполнения, поэтому ответственность за проверку совместимости лежит на авторе плагина. Это можно сделать с помощью тестов при запуске.

Если исходный код плагина доступен, можно проверить, для какой версии Trino он предназначен, посмотрев файл pom.xml. Плагин должен явно указывать зависимость от SPI, и, соответственно, быть совместимым с релизом Trino, указанным в теге <version>:

Хорошей практикой является использование свойства для указания версии, чтобы упростить её изменение в будущем. Это свойство объявляется отдельно в файле pom.xml:

Диаграмма отношений классов Trino SPI

Старая диаграмма отношений классов (найдено на просторах интернета, могут быть неточности)

Как создать собственный плагин Trino: практический пример

Iceberg Java API

Таблицы (Tables)

Основная цель API Iceberg — управлять метаданными таблиц, такими как схема, спецификация партиционирования, метаданные и файлы данных, в которых хранится содержимое таблицы.

Метаданные таблицы и операции над ними доступны через интерфейс Table. Этот интерфейс предоставляет информацию о таблице.

Метаданные таблицы (Table metadata)

Интерфейс Table предоставляет доступ к метаданным таблицы:

  • schema — возвращает текущую схему таблицы
  • spec — возвращает текущую спецификацию партиционирования
  • properties — возвращает карту свойств в формате ключ–значение
  • currentSnapshot — возвращает текущий снимок (snapshot) таблицы
  • snapshots — возвращает все валидные снимки таблицы
  • snapshot(id) — возвращает конкретный снимок по ID
  • location — возвращает базовый путь таблицы

Таблицы также предоставляют метод refresh, который обновляет объект таблицы до самой последней версии, а также вспомогательные методы:

  • io — возвращает объект FileIO, используемый для чтения и записи файлов таблицы
  • locationProvider — возвращает LocationProvider для создания путей к data- и metadata-файлам

Сканирование (Scanning)

Интерфейс Table в Java API предоставляет методы для взаимодействия с таблицей Apache Iceberg, позволяя получать информацию о таких элементах, как схема (schema), спецификация партиционирования (partition spec), метаданные (metadata) и datafiles.

На уровне файлов (File level)

Чтение данных из таблицы Iceberg начинается с создания объекта TableScan с помощью метода newScan().
Объект TableScan возвращает список задач сканирования (scan tasks), включающих datafilesdelete files и другие файлы, необходимые для выполнения запроса.

Такой подход особенно полезен при интеграции Iceberg с вычислительными движками (compute engines) — например, SparkFlink или собственными Java-приложениями.
Движок может использовать результаты TableScan, чтобы получить список файлов, подлежащих чтению и обработке, без необходимости повторно реализовывать логику чтения метаданных Iceberg.


Сканирование таблицы в Iceberg начинается с создания объекта TableScan через метод newScan:

Чтобы настроить сканирование, вызывают методы filter и select, которые возвращают новый объект TableScan с применёнными изменениями:

Каждый метод конфигурации создаёт новый объект TableScan, чтобы он оставался иммутабельным и не изменялся непредсказуемо при использовании в разных потоках.

Когда сканирование настроено, методы planFiles, planTasks и schema используются для получения файлов, задач и проекции схемы:

Для time-travel запросов можно использовать методы asOfTime или useSnapshot, чтобы указать соответствующий снимок таблицы.

Уровень строк (Row level)

В Java API инициализация построчного сканирования (row-level scan), возвращающего отдельные записи данных, начинается с создания объекта ScanBuilder с помощью вызова IcebergGenerics.read().

Этот способ не всегда подходит для очень больших наборов данных, где эффективнее использовать полноценный движок запросов (query engine). Такие движки, как правило, используют рассмотренный ранее механизм TableScan, чтобы формировать список файлов и выполнять распределённое чтение.


Сканирование таблицы Iceberg на уровне строк начинается с создания объекта ScanBuilder с помощью IcebergGenerics.read:

Чтобы настроить сканирование, вызывают методы where и select у ScanBuilder, которые возвращают новый ScanBuilder с применёнными изменениями:

Когда сканирование настроено, вызывают метод build, чтобы выполнить его.
build() возвращает CloseableIterable<Record>:

Здесь Record — это запись Iceberg из модуля iceberg-data (org.apache.iceberg.data.Record).

Операции обновления (Update operations)

Интерфейс Table также предоставляет операции, позволяющие обновлять таблицу.
Эти операции используют шаблон builder, основанный на интерфейсе PendingUpdate, и фиксируются вызовом метода commit.

Например, обновление схемы таблицы выполняется вызовом updateSchema, добавлением изменений и вызовом commit, чтобы применить их:

Доступные операции обновления таблицы:

  • updateSchema — обновление схемы таблицы
  • updateSpec — изменение спецификации партиционирования
  • updateStatistics — обновление статистических файлов таблицы
  • updatePartitionStatistics — обновление статистики для определённого раздела
  • updateProperties — изменение свойств таблицы
  • updateLocation — изменение базового пути таблицы
  • expireSnapshots — удаление старых снимков
  • manageSnapshots — управление снимками таблицы
  • newAppend — добавление новых data-файлов
  • newFastAppend — добавление файлов без уплотнения метаданных
  • newOverwrite — добавление новых файлов и удаление старых, которые заменяются
  • newDelete — удаление data-файлов
  • newRewrite — переписывание файлов (замена новыми версиями)
  • newRowDelta — удаление или замена строк в существующих data-файлах
  • newTransaction — создание новой транзакции на уровне таблицы
  • rewriteManifests — переписывание манифестов для ускорения планирования сканирования
  • replaceSortOrder — замена сортировки таблицы новым правилом
  • newReplacePartitions — динамическая перезапись партиций новыми данными

Транзакции

Транзакции используются для того, чтобы зафиксировать несколько изменений таблицы в одной атомарной операции.
Транзакция создаёт отдельные операции с помощью фабричных методов (например, newAppend), точно так же, как это делается при работе напрямую с объектом Table.

Операции, созданные внутри транзакции, фиксируются группой, когда вызывается метод commitTransaction.

Пример: удаление и добавление файла в одной транзакции:

В следующем примере:

  • выполняется перезапись (overwrite) некоторых файлов на основе фильтра,
  • добавляется новый файл данных,
  • обновляется расположение таблицы (location) — всё это в рамках одной транзакции. В конце транзакция коммитится целиком, гарантируя атомарность изменений:

Типы данных (Types)

Типы данных Iceberg находятся в пакете org.apache.iceberg.types (см. документацию: package-summary.html).

Примитивные типы (Primitives)

Экземпляры примитивных типов доступны через статические методы в соответствующих классах типов.
Типы без параметров используют метод get, а такие типы, как decimal, создаются через фабричные методы:

Вложенные типы (Nested types)

Структуры (struct), карты (map) и списки (list) создаются с помощью фабричных методов в классе типов.

Как и поля структур, ключи и значения map, а также элементы списка отслеживаются как nested fields (вложенные поля). Вложенные поля имеют собственные ID полей и признак допуска null.

Поля структур создаются с помощью NestedField.optional и NestedField.required.
Nullability значений map и элементов списка задаётся в фабричных методах map/list.

Примеры:

Expressions (Выражения)

Выражения Iceberg используются для настройки сканирования таблицы.
Для создания выражений применяются фабричные методы из класса Expressions.

Поддерживаемые предикаты:

  • isNull
  • notNull
  • equal
  • notEqual
  • lessThan
  • lessThanOrEqual
  • greaterThan
  • greaterThanOrEqual
  • in
  • notIn
  • startsWith
  • notStartsWith

Поддерживаемые логические операции:

  • and
  • or
  • not

Константные выражения:

  • alwaysTrue
  • alwaysFalse

Привязка выражений (Expression binding)

При создании выражения являются непривязанными (unbound).
Прежде чем выражение будет использовано, оно должно быть привязано к конкретному типу данных:

  • Iceberg определяет ID поля, которому соответствует имя в выражении
  • Iceberg преобразует литералы предиката к типу соответствующего поля

Например, перед использованием выражения:

Iceberg должен определить:

  • к какой колонке относится имя «x»
  • нужно ли преобразовать 10 в тип этой колонки (например, long → int или наоборот)

Одно и то же выражение может быть успешно привязано к разным типам, например:

  • struct<1 x: long, 2 y: long>
  • struct<11 x: int, 12 y: int>

Expression example:

Модули

Поддержка таблиц Iceberg организована в виде библиотечных модулей:

  • iceberg-common — содержит служебные (utility) классы, используемые другими модулями
  • iceberg-api — включает публичный API Iceberg: выражения, типы данных, таблицы и операции
  • iceberg-arrow — реализация системы типов Iceberg для чтения/записи данных таблиц Iceberg с использованием Apache Arrow как формата данных в памяти
  • iceberg-aws — содержит реализации API Iceberg для работы с таблицами, размещёнными в AWS S3, и/или с каталогами AWS Glue
  • iceberg-core — содержит реализации API Iceberg и поддержку файлов Avro; именно от этого модуля должны зависеть вычислительные движки
  • iceberg-parquet — опциональный модуль для работы с таблицами, основанными на Parquet-файлах
  • iceberg-orc — опциональный модуль для работы с таблицами, основанными на ORC-файлах (экспериментальный)
  • iceberg-hive-metastore — реализация таблиц Iceberg на основе Hive Metastore через Thrift-клиент

Проект Iceberg также содержит модули для интеграции Iceberg с вычислительными движками и инструментами:

  • iceberg-spark — реализация Datasource V2 API Spark для Iceberg, с подмодулями для разных версий Spark (для избегания конфликтов зависимостей используйте runtime jars)
  • iceberg-flink — реализация Table API и DataStream API Flink для Iceberg (используйте iceberg-flink-runtime для shaded-версии)
  • iceberg-mr — реализация InputFormat для MapReduce и Hive, а также SerDe для Iceberg (для Hive используйте iceberg-hive-runtime)
  • iceberg-nessie — модуль для интеграции истории и операций метаданных Iceberg с Project Nessie
  • iceberg-data — клиентская библиотека для чтения таблиц Iceberg из JVM-приложений
  • iceberg-runtime — генерирует shaded runtime jar для Spark для интеграции с таблицами Iceberg

Java API Quickstart

В разделе рассматриваются прикладные примеры управления таблицами, схемами и файловыми данными в Apache Iceberg с помощью Java. Рассматриваются такие аспекты, как создание таблиц, схем и спецификаций партиционирования, а также операции чтения и обновления данных.

Java API предоставляет программный способ управления метаданными и данными таблиц Iceberg из пользовательских Java-приложений. Это включает операции с схемами (schema), спецификациями партиционирования (partition spec) и datafiles.

В этом разделе рассматриваются основные операции, такие как создание таблиц и схем, настройка партиционирования, а также чтение и обновление данных. Таким образом, вы сможете быстро начать работать с данным API.

Создание таблицы (Create a table)

Как и в любом другом вычислительном движке, первый шаг при работе с таблицами Apache Iceberg — это создание каталога (catalog).
В общем случае при использовании Java API таблицы создаются через реализацию интерфейсов Catalog или Table.

В этой главе будет показано, как использовать два встроенных типа каталогов для создания таблиц Iceberg. Тем не менее, вы также можете реализовать и использовать собственный кастомный каталог.

Использование каталога Hive

Каталог Hive подключается к Hive Metastore, чтобы отслеживать Iceberg-таблицы.
Вы можете инициализировать Hive-каталог, указав имя и набор свойств.

HiveCatalog реализует интерфейс Catalog, который содержит методы для работы с таблицами:
createTable, loadTable, renameTable, dropTable.

Чтобы создать таблицу, нужно передать идентификатор (Identifier) и схему (Schema) вместе с другими начальными метаданными:

Использование Hadoop-каталога

Hadoop catalog не требует подключения к Hive Metastore, но может использоваться только с HDFS или аналогичными файловыми системами, которые поддерживают атомарный rename.
Параллельные записи с Hadoop-каталогом небезопасны при использовании локальной файловой системы или S3.

Чтобы создать Hadoop-каталог:

Так же как и Hive-каталог, HadoopCatalog реализует интерфейс Catalog, поэтому он содержит такие методы, как createTable, loadTable, dropTable.

Пример создания таблицы с использованием Hadoop-каталога:

Схема таблицы (schema) и спецификация партиционирования (spec) создаются далее.

Таблицы в Spark

Spark может работать с таблицами по имени, используя HiveCatalog.

Spark также может загружать таблицу, созданную через HadoopCatalog, по пути:

Схемы (Schemas)

Создание схемы

Чтобы создать схему таблицы, необходимо создать объект класса Schema в Iceberg.

Пример создания схемы для таблицы logs:

При использовании Iceberg API напрямую необходимо указывать ID типов.
При конвертации из других форматов (Spark, Avro, Parquet) новые ID назначаются автоматически.

При создании таблицы все ID в схеме переназначаются для обеспечения уникальности.

Конвертация схемы из Avro (Convert a schema from Avro)

Чтобы создать схему Iceberg на основе существующей Avro-схемы, используйте конвертеры из AvroSchemaUtil:

Конвертация схемы из Spark (Convert a schema from Spark)

Чтобы создать схему Iceberg на основе существующей таблицы Spark, используйте SparkSchemaUtil:

Партиционирование (Partitioning)

Спецификация партиционирования в Java API определяется с помощью builder-паттерна.
В Iceberg процесс построения спецификации партиционирования начинается с вызова
PartitionSpec.builderFor(schema), где schema — это схема таблицы, на основе которой формируется партиционирование.

Создание partition spec

Partition spec описывает, как Iceberg должен группировать записи в файлы данных.
Partition spec создаётся для схемы таблицы с помощью билдера.

Пример создания partition spec для таблицы logs, где записи партиционируются по часу (event_time) и уровню логов (level):

Ветки и теги (Branching and Tagging)

Создание веток и тегов (Creating branches and tags)

Новые ветки и теги можно создавать через API ManageSnapshots в Java.

Коммиты в ветки (Committing to branches)

Запись данных в ветку выполняется с помощью параметра toBranch в операции. Полный список доступен в UpdateOperations.

Чтение из веток и тегов (Reading from branches and tags)

Чтение из ветки или тега выполняется обычным способом через Table Scan API, добавив имя ветки/тега в метод useRef.
Если указана ветка, будет использован снимок (snapshot), который является головой этой ветки.

Важно: чтение одновременно из ветки и с использованием asOfSnapshotId пока не поддерживается.

Замена и fast-forward веток и тегов (Replacing and fast forwarding branches and tags)

Снимки (snapshots), на которые указывают существующие ветки и теги, могут быть обновлены через методы replace.
Операция fast-forward похожа на fast-forward в Git: она позволяет продвинуть целевую ветку до головы исходной ветки или тега, если целевая ветка является предком исходной.

И при fast-forward, и при replace свойства ретеншена (retention) целевой ветки сохраняются по умолчанию.

Обновление параметров ретеншена (Updating retention properties)

Параметры ретеншена для веток и тегов также могут быть обновлены.
Используйте setMaxRefAgeMs для изменения ретенции для самой ветки или тега.
Параметры ретеншена snapshot-ов внутри ветки обновляются через setMinSnapshotsToKeep и setMaxSnapshotAgeMs.

Удаление веток и тегов (Removing branches and tags)

Ветки и теги можно удалять с помощью методов removeBranch и removeTag соответственно.

Что такое партиционирование?

Партиционирование — это способ ускорить выполнение запросов, группируя похожие строки вместе при записи.

Например, запросы к таблице логов обычно включают фильтрацию по диапазону времени, как в этом запросе для логов между 10 и 12 часами:

Если настроить таблицу логов на партиционирование по дате event_time, то события будут записываться в файлы, сгруппированные по дате. Iceberg отслеживает это значение и пропускает файлы с другими датами, где нет нужных данных.

Iceberg умеет партиционировать временные метки по годам, месяцам, дням и часам. Он также может использовать категориальные столбцы, например level в примере логов, чтобы группировать строки и ускорять запросы.

Что Iceberg делает иначе?

Другие форматы таблиц, такие как Hive, поддерживают партиционирование, но Iceberg предлагает скрытое партиционирование:

  • Iceberg берет на себя скучную и ошибкоопасную задачу вычисления значений партиций для строк.
  • Iceberg автоматически избегает чтения ненужных партиций. Клиентам не нужно знать схему партиционирования и добавлять вспомогательные фильтры.
  • Iceberg позволяет постепенно менять схему партиционирования по мере необходимости.

Партиционирование в Hive

Чтобы показать разницу, посмотрим, как Hive обрабатывает таблицу логов.

В Hive партиции явные и представлены столбцом, поэтому в таблице логов будет столбец event_date. При записи нужно явно указывать значение для этого столбца:

Так же и запросы должны включать фильтр по event_date в дополнение к фильтру по event_time:

Если фильтр event_date упустить, Hive будет сканировать все файлы таблицы, потому что не знает, что event_time связан с event_date.

Проблемы Hive-партиционирования

Hive необходимо явно передавать значения партиций. В примере с логами система не знает связи между event_time и event_date.

Отсюда возникают проблемы:

  • Hive не может проверить корректность значений партиций — ответственность на стороне writer’а
    • неверный формат (2018-12-01 вместо 20181201) приводит к тихим неправильным результатам
    • неверный источник данных или часовой пояс также приводит к ошибкам без предупреждений
  • Пользователь обязан писать запросы корректно
    • неверный формат фильтра снова приводит к тихим ошибкам
    • пользователи, не понимающие физическую схему таблицы, получают медленные запросы — Hive не может преобразовать фильтры автоматически
  • Рабочие запросы привязаны к схеме партиционирования, поэтому изменить её без поломки запросов невозможно

Скрытое партиционирование Iceberg

Iceberg вычисляет значения партиций, беря значение колонки и по желанию применяя трансформацию. Iceberg отвечает за преобразование event_time → event_date и хранит эту связь.

Партиционирование таблицы задаётся через такие связи. Например, таблица логов будет партиционироваться по day(event_time) и level.

Поскольку Iceberg не требует явных колонок для партиций, он может скрывать партиционирование:

  • значения партиций всегда вычисляются корректно
  • они автоматически используются для ускорения запросов
  • пользователи и процессоры данных вообще не видят event_date

Главное: запросы больше не зависят от физической структуры таблицы. Iceberg отделяет логическое представление данных от физического.

Благодаря этому Iceberg поддерживает эволюцию партиционирования: схему можно менять по мере роста объёма данных, а некорректную конфигурацию — исправлять без дорогих миграций.

Custom Catalog

Iceberg-таблицу можно читать как по пути в HDFS, так и как таблицу Hive. Также можно использовать custom metastore вместо Hive. Для этого нужно выполнить следующие шаги:

  • Custom TableOperations
  • Custom Catalog
  • Custom FileIO
  • Custom LocationProvider
  • Custom IcebergSource

Реализация пользовательских операций с таблицей (Custom TableOperations)

Нужно расширить BaseMetastoreTableOperations и реализовать логику чтения и записи метаданных.

Пример:

Обычно экземпляр TableOperations создаётся через вызов Catalog.newTableOps(TableIdentifier).

Реализация пользовательского каталога (Custom catalog implementation)

Расширьте BaseMetastoreCatalog, чтобы предоставить базовые пути хранилища (warehouse locations) и создать экземпляры CustomTableOperations.

Пример:

Реализации каталогов могут динамически загружаться в большинстве вычислительных движков.

  • В Spark и Flink вы можете указать свойство каталога catalog-impl, чтобы загрузить свою реализацию.
  • В MapReduce необходимо реализовать org.apache.iceberg.mr.CatalogLoader и указать Hadoop-свойство iceberg.mr.catalog.loader.class, чтобы загрузить каталог.
  • Если вашему каталогу требуется читать Hadoop-конфигурацию для доступа к параметрам окружения, сделайте его реализацию org.apache.hadoop.conf.Configurable.

Реализация пользовательского ввода/вывода файлов (Custom File IO implementation)

Расширьте интерфейс FileIO и реализуйте собственную логику для чтения и записи файлов данных.

Пример:

Если вы уже разрабатываете собственный каталог, вы можете реализовать метод TableOperations.io(), чтобы использовать свой CustomFileIO.

Кроме того, пользовательские реализации FileIO могут динамически загружаться в HadoopCatalog и HiveCatalog, если указать свойство каталога io-impl.

Если вашему FileIO необходимо читать Hadoop-конфигурацию для доступа к параметрам среды,
сделайте вашу реализацию наследником org.apache.hadoop.conf.Configurable.

Реализация пользовательского провайдера локаций (Custom location provider implementation)

Расширьте LocationProvider и реализуйте собственную логику определения пути, по которому будут записываться файлы данных.

Пример:

Если вы уже реализуете собственный каталог, вы можете переопределить
TableOperations.locationProvider() и указать свой CustomLocationProvider как стандартный.

Чтобы использовать другой кастомный провайдер локаций для конкретной таблицы, укажите его при создании таблицы через свойство:

write.location-provider.impl

Пример:

Custom IcebergSource

Расширьте IcebergSource и реализуйте логику чтения таблицы из CustomCatalog.

Пример:

Зарегистрируйте CustomIcebergSource, добавив его полное имя класса в файл:

META-INF/services/org.apache.spark.sql.sources.DataSourceRegister

Подпишись на телеграм канал Data Engineering Инжиниринг данных
Иван Шамаев
Более 14 лет опыта в ИТ. Разрабатывал аналитические решения и визуализации данных, строил витрины данных, автоматизировал выгрузку и обработку данных, автоматизировал data pipelines в Airflow. Разработка рассылок, финансовой отчетности, кастомизация Superset.
0
Оставьте комментарий! Напишите, что думаете по поводу статьи.x