Contents
- 1 Как устроена Java (краткий overview)
- 2 Trino + Iceberg + Rest Catalog
- 3 Обзор Trino SPI и Plugin
- 4 Iceberg Java API
- 5 Java API Quickstart
- 5.1 Создание таблицы (Create a table)
- 5.2 Схемы (Schemas)
- 5.3 Партиционирование (Partitioning)
- 5.4 Ветки и теги (Branching and Tagging)
- 5.4.1 Создание веток и тегов (Creating branches and tags)
- 5.4.2 Коммиты в ветки (Committing to branches)
- 5.4.3 Чтение из веток и тегов (Reading from branches and tags)
- 5.4.4 Замена и fast-forward веток и тегов (Replacing and fast forwarding branches and tags)
- 5.4.5 Обновление параметров ретеншена (Updating retention properties)
- 5.4.6 Удаление веток и тегов (Removing branches and tags)
- 6 Что такое партиционирование?
- 7 Custom Catalog
- 7.1 Реализация пользовательских операций с таблицей (Custom TableOperations)
- 7.2 Реализация пользовательского каталога (Custom catalog implementation)
- 7.3 Реализация пользовательского ввода/вывода файлов (Custom File IO implementation)
- 7.4 Реализация пользовательского провайдера локаций (Custom location provider implementation)
- 7.5 Custom IcebergSource
Как устроена Java (краткий overview)
Код содержится в .java файлах. Java не компилируется сразу в машинный код, как C++. Вместо этого компилятор (javac) превращает исходники в байт-код, который записывается в .class файлы.
JVM (Java Virtual Machine) — это виртуальная машина Java, которая исполняет байт-код. JVM делает две важные вещи:
- Интерпретирует байт-код (или компилирует его JIT — Just-In-Time)
- Делает программу независимой от ОС (Java — “write once, run anywhere”)
JRE (Java Runtime Environment) — только запуск байт-кода (JVM + библиотеки)
JDK (Java Development Kit) — это набор инструментов для разработки (компилятор, отладка и т.д.)
Maven — это система сборки и управления зависимостями для Java-проектов. Он решает те же задачи, что pip в Python или npm в JavaScript, но чуть сложнее, потому что управляет ещё и сборкой, тестированием, упаковкой и т.д.
POM (Project Object Model) — это основной файл конфигурации Maven, обычно pom.xml. Он описывает:
- что это за проект (название, версия)
- какие зависимости нужны
- какие плагины использовать для сборки, тестов и т.д.
Когда запускается команда mvn clean install, Maven проходит через стандартные фазы сборки:
clean— очистить старые сборкиcompile— скомпилировать кодtest— запустить юнит-тестыpackage— упаковать в.jarили.warinstall— установить артефакт в локальный репозиторий
Maven сам скачает все зависимости, указанные в pom.xml, из центрального репозитория (Maven Central).
Gradle — это тоже система сборки (build system) для Java-проектов, но более современная и декларативная (конфигурация на Groovy/Kotlin). Maven является более старой build system с XML-конфигурацией.
Статьи по JVM на habr
- Развеиваем мифы об управлении памятью в JVM
- Java-модель памяти (часть 1)
- Java-модель памяти (часть 2)
- Управление памятью Java
- Один день из жизни JVM-инженера
- Garbage Collection и JVM
- Понимание утечек памяти в Java
Trino + Iceberg + Rest Catalog
В этом разделе будут приведены ссылки на основные библиотеки, которые будут рассмотрены в статье.
| Link | Описание |
| Iceberg | Java implementation of Iceberg |
| Iceberg — Go | Golang implementation |
| Iceberg Rust | Rust implementation |
| Iceberg C++ | C++ implementation |
| trino-iceberg | Trino-Iceberg Plugin |
| java/org/apache/iceberg/rest | Iceberg Rest Catalog |
| deepwiki.com/trinodb/trino | DeepWiki Trino |
| deepwiki.com/apache/iceberg | DeepWiki Iceberg |
Краткий обзор, что такое Iceberg и как все работает:
Библиотека Apache Iceberg (пакет org.apache.iceberg) — это Java-API и набор абстракций для работы с табличными хранилищами больших данных: метаданные таблиц, транзакции, файлы данных, сканирование, поддержка форматов, изменения схем/партиций и др. То есть библиотека Java — это реализация на Java методов для работы с форматом Iceberg.
Catalog — это реализация менеджера метаданных/неймспейсов (Hive Metastore, Glue, Nessie, REST, HadoopCatalog и т.д.), которую использует Iceberg-библиотека для хранения/поиска метаданных таблиц.
Trino — распределённый SQL-движок; у него есть Iceberg-коннектор (плагин), который вызывает Iceberg Java API (и, опосредованно, выбранный Catalog) чтобы читать/писать таблицы.
Схема «Как работает Trino — Iceberg — Catalog — File System»
Общую схему «Trino — Iceberg — Catalog — HDFS/s3» можно представить в следующем виде:
Перечень наиболее значимых классов/интерфейсов Iceberg Java:
Table: представляет таблицу; операции создания, загрузки, модификации.Catalog/TableOperations: абстракции для реализации каталога, где хранятся метаданные таблиц.Schema,PartitionSpec,SortOrder: определяют структуру таблицы, партиционирование и порядок сортировки.Snapshot: состояние таблицы на каком-то моменте времени; позволяет делать time-travel и возвращать данные к предыдущему состоянию.TableScan/ScanTask: чтение таблицы — сканирование файлов, получение задач чтения.AppendFiles,OverwriteFiles, Transaction: API записи и изменения таблицы.DataFile,DeleteFile,ContentFile: представление физических файлов данных или удалений.Metrics,FieldMetrics,MetricsModes: сбор статистики о столбцах, чтобы оптимизировать сканирование (например, фильтрация по статистике).FileFormat: перечисление поддерживаемых форматов (Parquet, ORC, Avro и др.).- Метаданные как таблицы:
SnapshotsTable,HistoryTable,DataFilesTable,ManifestsTableи др. позволяют работать с внутренним состоянием таблицы как с обычной таблицей.
Полная документация по Iceberg Java классам и методам Iceberg: iceberg/package-summary.html
Обзор Trino SPI и Plugin
- Habr: Безграничная расширяемость: как экосистема плагинов помогает Trino работать в любом аналитическом ландшафте
- YouTube: Владимир Озеров — Как работает Apache Iceberg на примере Trino
Обзор SPI
Trino использует плагинную архитектуру для расширения своих возможностей и интеграции с различными источниками данных и другими системами. Плагины должны реализовывать интерфейсы и переопределять методы, определённые в SPI (Service Provider Interface — интерфейсе поставщика услуг).
Общая информация о плагинах представлена в разделе документации Plugin, а подробности по конкретным плагинам описаны на отдельных страницах, посвящённых каждому из них.
В этом разделе описана разработка плагинов, включая специальные страницы, посвящённые возможностям разных типов плагинов. Создание собственного плагина позволяет добавить в Trino дополнительные функции, например, реализовать коннектор для нового источника данных.
Исходный код SPI находится в каталоге core/trino-spi в дереве исходников Trino.
Метаданные плагина
Каждый плагин определяет точку входа — реализацию интерфейса Plugin. Имя этого класса предоставляется Trino через стандартный интерфейс Java ServiceLoader: в classpath находится файл-ресурс с именем io.trino.spi.Plugin в каталоге META-INF/services.
Содержимое этого файла представляет собой одну строку с полным именем класса плагина, например:
|
1 |
com.example.plugin.ExamplePlugin |
Для встроенного плагина, включённого в исходный код Trino, этот файл-ресурс создаётся автоматически, если в файле pom.xml плагина указана следующая строка:
|
1 |
<packaging>trino-plugin</packaging> |
Интерфейс Plugin
Интерфейс Plugin — это хорошая отправная точка для разработчиков, которые хотят разобраться с SPI Trino. Он содержит методы доступа для получения различных классов, которые может предоставлять плагин.
Например, метод getConnectorFactories() — это основная функция, которую Trino вызывает, когда готов создать экземпляр коннектора для работы с каталогом.
Существуют также аналогичные методы для объектов Type, ParametricType, Function, SystemAccessControl и EventListenerFactory.
Сборка плагинов с помощью Maven
Плагины зависят от SPI из Trino:
|
1 2 3 4 5 |
<dependency> <groupId>io.trino</groupId> <artifactId>trino-spi</artifactId> <scope>provided</scope> </dependency> |
Плагин использует область provided, потому что классы из SPI предоставляются самим Trino во время выполнения, и поэтому плагин не должен включать их в свой состав при сборке.
Существуют и другие зависимости, которые Trino также предоставляет, включая Slice и аннотации Jackson. В частности, Jackson используется для сериализации дескрипторов коннекторов, поэтому плагины должны использовать ту версию аннотаций, которая поставляется вместе с Trino.
Все остальные зависимости зависят от того, что требуется самому плагину для его реализации. Плагины загружаются в отдельный загрузчик классов, что обеспечивает изоляцию и позволяет плагинам использовать версии библиотек, отличные от тех, что применяются внутри Trino.
Пример файла pom.xml можно найти в каталоге plugin/trino-example-http в дереве исходного кода Trino — это пример HTTP-коннектора.
Развёртывание собственного плагина
Плагины Trino должны использовать тип упаковки trino-plugin, предоставляемый плагином trino-maven-plugin.
Сборка плагина создаёт необходимый дескриптор сервиса и вызывает Provisio для формирования ZIP-файла в каталоге target.
Этот архив содержит JAR-файл плагина и все его зависимости в виде отдельных JAR-файлов и полностью готов к установке в Trino.
Совместимость
Успешная загрузка, установка и использование плагина зависят от его совместимости с целевым кластером Trino. Полная совместимость гарантируется только в том случае, если используется одна и та же версия Trino — как при сборке плагина, так и при его развертывании. Поэтому настоятельно рекомендуется использовать одинаковые версии.
Например, плагин Trino, скомпилированный для версии Trino 470, может не работать с более старыми или новыми версиями, такими как Trino 430 или Trino 490. Это особенно важно учитывать при установке плагинов, разработанных другими проектами, поставщиками или внутри вашей организации.
Плагины Trino реализуют SPI (Service Provider Interface), который может изменяться с каждым релизом Trino. По умолчанию в Trino отсутствует автоматическая проверка совместимости SPI во время выполнения, поэтому ответственность за проверку совместимости лежит на авторе плагина. Это можно сделать с помощью тестов при запуске.
Если исходный код плагина доступен, можно проверить, для какой версии Trino он предназначен, посмотрев файл pom.xml. Плагин должен явно указывать зависимость от SPI, и, соответственно, быть совместимым с релизом Trino, указанным в теге <version>:
|
1 2 3 4 5 6 |
<dependency> <groupId>io.trino</groupId> <artifactId>trino-spi</artifactId> <version>470</version> <scope>provided</scope> </dependency> |
Хорошей практикой является использование свойства для указания версии, чтобы упростить её изменение в будущем. Это свойство объявляется отдельно в файле pom.xml:
|
1 2 3 4 5 6 7 8 9 |
... <dep.trino.version>470</dep.trino.version> ... <dependency> <groupId>io.trino</groupId> <artifactId>trino-spi</artifactId> <version>${dep.trino.version}</version> <scope>provided</scope> </dependency> |
Диаграмма отношений классов Trino SPI
Старая диаграмма отношений классов (найдено на просторах интернета, могут быть неточности)
Как создать собственный плагин Trino: практический пример
Iceberg Java API
Таблицы (Tables)
Основная цель API Iceberg — управлять метаданными таблиц, такими как схема, спецификация партиционирования, метаданные и файлы данных, в которых хранится содержимое таблицы.
Метаданные таблицы и операции над ними доступны через интерфейс Table. Этот интерфейс предоставляет информацию о таблице.
Метаданные таблицы (Table metadata)
Интерфейс Table предоставляет доступ к метаданным таблицы:
schema— возвращает текущую схему таблицыspec— возвращает текущую спецификацию партиционированияproperties— возвращает карту свойств в формате ключ–значениеcurrentSnapshot— возвращает текущий снимок (snapshot) таблицыsnapshots— возвращает все валидные снимки таблицыsnapshot(id)— возвращает конкретный снимок по IDlocation— возвращает базовый путь таблицы
Таблицы также предоставляют метод refresh, который обновляет объект таблицы до самой последней версии, а также вспомогательные методы:
io— возвращает объектFileIO, используемый для чтения и записи файлов таблицыlocationProvider— возвращаетLocationProviderдля создания путей к data- и metadata-файлам
Сканирование (Scanning)
Интерфейс Table в Java API предоставляет методы для взаимодействия с таблицей Apache Iceberg, позволяя получать информацию о таких элементах, как схема (schema), спецификация партиционирования (partition spec), метаданные (metadata) и datafiles.
На уровне файлов (File level)
Чтение данных из таблицы Iceberg начинается с создания объекта TableScan с помощью метода newScan().
Объект TableScan возвращает список задач сканирования (scan tasks), включающих datafiles, delete files и другие файлы, необходимые для выполнения запроса.
Такой подход особенно полезен при интеграции Iceberg с вычислительными движками (compute engines) — например, Spark, Flink или собственными Java-приложениями.
Движок может использовать результаты TableScan, чтобы получить список файлов, подлежащих чтению и обработке, без необходимости повторно реализовывать логику чтения метаданных Iceberg.
Сканирование таблицы в Iceberg начинается с создания объекта TableScan через метод newScan:
|
1 |
TableScan scan = table.newScan(); |
Чтобы настроить сканирование, вызывают методы filter и select, которые возвращают новый объект TableScan с применёнными изменениями:
|
1 |
TableScan filteredScan = scan.filter(Expressions.equal("id", 5)); |
Каждый метод конфигурации создаёт новый объект TableScan, чтобы он оставался иммутабельным и не изменялся непредсказуемо при использовании в разных потоках.
Когда сканирование настроено, методы planFiles, planTasks и schema используются для получения файлов, задач и проекции схемы:
|
1 2 3 4 5 6 |
TableScan scan = table.newScan() .filter(Expressions.equal("id", 5)) .select("id", "data"); Schema projection = scan.schema(); Iterable<CombinedScanTask> tasks = scan.planTasks(); |
Для time-travel запросов можно использовать методы asOfTime или useSnapshot, чтобы указать соответствующий снимок таблицы.
Уровень строк (Row level)
В Java API инициализация построчного сканирования (row-level scan), возвращающего отдельные записи данных, начинается с создания объекта ScanBuilder с помощью вызова IcebergGenerics.read().
Этот способ не всегда подходит для очень больших наборов данных, где эффективнее использовать полноценный движок запросов (query engine). Такие движки, как правило, используют рассмотренный ранее механизм TableScan, чтобы формировать список файлов и выполнять распределённое чтение.
Сканирование таблицы Iceberg на уровне строк начинается с создания объекта ScanBuilder с помощью IcebergGenerics.read:
|
1 |
ScanBuilder scanBuilder = IcebergGenerics.read(table); |
Чтобы настроить сканирование, вызывают методы where и select у ScanBuilder, которые возвращают новый ScanBuilder с применёнными изменениями:
|
1 |
scanBuilder.where(Expressions.equal("id", 5)); |
Когда сканирование настроено, вызывают метод build, чтобы выполнить его.
build() возвращает CloseableIterable<Record>:
|
1 2 3 |
CloseableIterable<Record> result = IcebergGenerics.read(table) .where(Expressions.lessThan("id", 5)) .build(); |
Здесь Record — это запись Iceberg из модуля iceberg-data (org.apache.iceberg.data.Record).
Операции обновления (Update operations)
Интерфейс Table также предоставляет операции, позволяющие обновлять таблицу.
Эти операции используют шаблон builder, основанный на интерфейсе PendingUpdate, и фиксируются вызовом метода commit.
Например, обновление схемы таблицы выполняется вызовом updateSchema, добавлением изменений и вызовом commit, чтобы применить их:
|
1 2 3 |
table.updateSchema() .addColumn("count", Types.LongType.get()) .commit(); |
Доступные операции обновления таблицы:
updateSchema— обновление схемы таблицыupdateSpec— изменение спецификации партиционированияupdateStatistics— обновление статистических файлов таблицыupdatePartitionStatistics— обновление статистики для определённого разделаupdateProperties— изменение свойств таблицыupdateLocation— изменение базового пути таблицыexpireSnapshots— удаление старых снимковmanageSnapshots— управление снимками таблицыnewAppend— добавление новых data-файловnewFastAppend— добавление файлов без уплотнения метаданныхnewOverwrite— добавление новых файлов и удаление старых, которые заменяютсяnewDelete— удаление data-файловnewRewrite— переписывание файлов (замена новыми версиями)newRowDelta— удаление или замена строк в существующих data-файлахnewTransaction— создание новой транзакции на уровне таблицыrewriteManifests— переписывание манифестов для ускорения планирования сканированияreplaceSortOrder— замена сортировки таблицы новым правиломnewReplacePartitions— динамическая перезапись партиций новыми данными
Транзакции
Транзакции используются для того, чтобы зафиксировать несколько изменений таблицы в одной атомарной операции.
Транзакция создаёт отдельные операции с помощью фабричных методов (например, newAppend), точно так же, как это делается при работе напрямую с объектом Table.
Операции, созданные внутри транзакции, фиксируются группой, когда вызывается метод commitTransaction.
Пример: удаление и добавление файла в одной транзакции:
|
1 2 3 4 5 6 7 8 |
Transaction t = table.newTransaction(); // фиксируем операции внутри транзакции t.newDelete().deleteFromRowFilter(filter).commit(); t.newAppend().appendFile(data).commit(); // фиксируем все изменения в таблице t.commitTransaction(); |
В следующем примере:
- выполняется перезапись (overwrite) некоторых файлов на основе фильтра,
- добавляется новый файл данных,
- обновляется расположение таблицы (location) — всё это в рамках одной транзакции. В конце транзакция коммитится целиком, гарантируя атомарность изменений:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
Transaction t = table.newTransaction(); // Перезапись файлов t.newOverwrite() .overwriteByRowFilter(Expressions.equal("id", 5)) .addFile(newDataFile) .commit(); // Обновление пути хранения таблицы t.updateLocation() .setLocation("new-location-path") .commit(); // Фиксация всех изменений в таблице t.commitTransaction(); |
Типы данных (Types)
Типы данных Iceberg находятся в пакете org.apache.iceberg.types (см. документацию: package-summary.html).
Примитивные типы (Primitives)
Экземпляры примитивных типов доступны через статические методы в соответствующих классах типов.
Типы без параметров используют метод get, а такие типы, как decimal, создаются через фабричные методы:
|
1 2 3 |
Types.IntegerType.get() // int Types.DoubleType.get() // double Types.DecimalType.of(9, 2) // decimal(9, 2) |
Вложенные типы (Nested types)
Структуры (struct), карты (map) и списки (list) создаются с помощью фабричных методов в классе типов.
Как и поля структур, ключи и значения map, а также элементы списка отслеживаются как nested fields (вложенные поля). Вложенные поля имеют собственные ID полей и признак допуска null.
Поля структур создаются с помощью NestedField.optional и NestedField.required.
Nullability значений map и элементов списка задаётся в фабричных методах map/list.
Примеры:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
// struct<1 id: int, 2 data: optional string> StructType struct = Struct.of( Types.NestedField.required(1, "id", Types.IntegerType.get()), Types.NestedField.optional(2, "data", Types.StringType.get()) ); // map<1 key: int, 2 value: optional string> MapType map = MapType.ofOptional( 1, 2, Types.IntegerType.get(), Types.StringType.get() ); // array<1 element: int> ListType list = ListType.ofRequired(1, IntegerType.get()); |
Expressions (Выражения)
Выражения Iceberg используются для настройки сканирования таблицы.
Для создания выражений применяются фабричные методы из класса Expressions.
Поддерживаемые предикаты:
isNullnotNullequalnotEquallessThanlessThanOrEqualgreaterThangreaterThanOrEqualinnotInstartsWithnotStartsWith
Поддерживаемые логические операции:
andornot
Константные выражения:
alwaysTruealwaysFalse
Привязка выражений (Expression binding)
При создании выражения являются непривязанными (unbound).
Прежде чем выражение будет использовано, оно должно быть привязано к конкретному типу данных:
- Iceberg определяет ID поля, которому соответствует имя в выражении
- Iceberg преобразует литералы предиката к типу соответствующего поля
Например, перед использованием выражения:
|
1 |
lessThan("x", 10) |
Iceberg должен определить:
- к какой колонке относится имя «x»
- нужно ли преобразовать 10 в тип этой колонки (например, long → int или наоборот)
Одно и то же выражение может быть успешно привязано к разным типам, например:
struct<1 x: long, 2 y: long>struct<11 x: int, 12 y: int>
Expression example:
|
1 2 3 |
table.newScan() .filter(Expressions.greaterThanOrEqual("x", 5)) .filter(Expressions.lessThan("x", 10)) |
Модули
Поддержка таблиц Iceberg организована в виде библиотечных модулей:
iceberg-common— содержит служебные (utility) классы, используемые другими модулямиiceberg-api— включает публичный API Iceberg: выражения, типы данных, таблицы и операцииiceberg-arrow— реализация системы типов Iceberg для чтения/записи данных таблиц Iceberg с использованием Apache Arrow как формата данных в памятиiceberg-aws— содержит реализации API Iceberg для работы с таблицами, размещёнными в AWS S3, и/или с каталогами AWS Glueiceberg-core— содержит реализации API Iceberg и поддержку файлов Avro; именно от этого модуля должны зависеть вычислительные движкиiceberg-parquet— опциональный модуль для работы с таблицами, основанными на Parquet-файлахiceberg-orc— опциональный модуль для работы с таблицами, основанными на ORC-файлах (экспериментальный)iceberg-hive-metastore— реализация таблиц Iceberg на основе Hive Metastore через Thrift-клиент
Проект Iceberg также содержит модули для интеграции Iceberg с вычислительными движками и инструментами:
iceberg-spark— реализация Datasource V2 API Spark для Iceberg, с подмодулями для разных версий Spark (для избегания конфликтов зависимостей используйте runtime jars)iceberg-flink— реализация Table API и DataStream API Flink для Iceberg (используйте iceberg-flink-runtime для shaded-версии)iceberg-mr— реализация InputFormat для MapReduce и Hive, а также SerDe для Iceberg (для Hive используйте iceberg-hive-runtime)iceberg-nessie— модуль для интеграции истории и операций метаданных Iceberg с Project Nessieiceberg-data— клиентская библиотека для чтения таблиц Iceberg из JVM-приложенийiceberg-runtime— генерирует shaded runtime jar для Spark для интеграции с таблицами Iceberg
Java API Quickstart
В разделе рассматриваются прикладные примеры управления таблицами, схемами и файловыми данными в Apache Iceberg с помощью Java. Рассматриваются такие аспекты, как создание таблиц, схем и спецификаций партиционирования, а также операции чтения и обновления данных.
Java API предоставляет программный способ управления метаданными и данными таблиц Iceberg из пользовательских Java-приложений. Это включает операции с схемами (schema), спецификациями партиционирования (partition spec) и datafiles.
В этом разделе рассматриваются основные операции, такие как создание таблиц и схем, настройка партиционирования, а также чтение и обновление данных. Таким образом, вы сможете быстро начать работать с данным API.
Создание таблицы (Create a table)
Как и в любом другом вычислительном движке, первый шаг при работе с таблицами Apache Iceberg — это создание каталога (catalog).
В общем случае при использовании Java API таблицы создаются через реализацию интерфейсов Catalog или Table.
В этой главе будет показано, как использовать два встроенных типа каталогов для создания таблиц Iceberg. Тем не менее, вы также можете реализовать и использовать собственный кастомный каталог.
Использование каталога Hive
Каталог Hive подключается к Hive Metastore, чтобы отслеживать Iceberg-таблицы.
Вы можете инициализировать Hive-каталог, указав имя и набор свойств.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 |
import java.util.HashMap; import java.util.Map; import org.apache.iceberg.hive.HiveCatalog; HiveCatalog catalog = new HiveCatalog(); catalog.setConf(spark.sparkContext().hadoopConfiguration()); // Можно использовать Hadoop-конфигурацию Spark Map<String, String> properties = new HashMap<String, String>(); properties.put("warehouse", "..."); properties.put("uri", "..."); catalog.initialize("hive", properties); |
HiveCatalog реализует интерфейс Catalog, который содержит методы для работы с таблицами:
createTable, loadTable, renameTable, dropTable.
Чтобы создать таблицу, нужно передать идентификатор (Identifier) и схему (Schema) вместе с другими начальными метаданными:
|
1 2 3 4 5 6 7 8 |
import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; TableIdentifier name = TableIdentifier.of("logging", "logs"); Table table = catalog.createTable(name, schema, spec); // или для загрузки существующей таблицы: Table table = catalog.loadTable(name); |
Использование Hadoop-каталога
Hadoop catalog не требует подключения к Hive Metastore, но может использоваться только с HDFS или аналогичными файловыми системами, которые поддерживают атомарный rename.
Параллельные записи с Hadoop-каталогом небезопасны при использовании локальной файловой системы или S3.
Чтобы создать Hadoop-каталог:
|
1 2 3 4 5 6 |
import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.hadoop.HadoopCatalog; Configuration conf = new Configuration(); String warehousePath = "hdfs://host:8020/warehouse_path"; HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath); |
Так же как и Hive-каталог, HadoopCatalog реализует интерфейс Catalog, поэтому он содержит такие методы, как createTable, loadTable, dropTable.
Пример создания таблицы с использованием Hadoop-каталога:
|
1 2 3 4 5 6 7 8 |
import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; TableIdentifier name = TableIdentifier.of("logging", "logs"); Table table = catalog.createTable(name, schema, spec); // или загрузка существующей таблицы: Table table = catalog.loadTable(name); |
Схема таблицы (schema) и спецификация партиционирования (spec) создаются далее.
Таблицы в Spark
Spark может работать с таблицами по имени, используя HiveCatalog.
|
1 2 3 |
// spark.sql.catalog.hive_prod = org.apache.iceberg.spark.SparkCatalog // spark.sql.catalog.hive_prod.type = hive spark.table("logging.logs"); |
Spark также может загружать таблицу, созданную через HadoopCatalog, по пути:
|
1 |
spark.read.format("iceberg").load("hdfs://host:8020/warehouse_path/logging/logs"); |
Схемы (Schemas)
Создание схемы
Чтобы создать схему таблицы, необходимо создать объект класса Schema в Iceberg.
Пример создания схемы для таблицы logs:
|
1 2 3 4 5 6 7 8 9 |
import org.apache.iceberg.Schema; import org.apache.iceberg.types.Types; Schema schema = new Schema( Types.NestedField.required(1, "level", Types.StringType.get()), Types.NestedField.required(2, "event_time", Types.TimestampType.withZone()), Types.NestedField.required(3, "message", Types.StringType.get()), Types.NestedField.optional(4, "call_stack", Types.ListType.ofRequired(5, Types.StringType.get())) ); |
При использовании Iceberg API напрямую необходимо указывать ID типов.
При конвертации из других форматов (Spark, Avro, Parquet) новые ID назначаются автоматически.
При создании таблицы все ID в схеме переназначаются для обеспечения уникальности.
Конвертация схемы из Avro (Convert a schema from Avro)
Чтобы создать схему Iceberg на основе существующей Avro-схемы, используйте конвертеры из AvroSchemaUtil:
|
1 2 3 4 5 6 |
import org.apache.avro.Schema; import org.apache.avro.Schema.Parser; import org.apache.iceberg.avro.AvroSchemaUtil; Schema avroSchema = new Parser().parse("{\"type\": \"record\" , ... }"); Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); |
Конвертация схемы из Spark (Convert a schema from Spark)
Чтобы создать схему Iceberg на основе существующей таблицы Spark, используйте SparkSchemaUtil:
|
1 2 3 |
import org.apache.iceberg.spark.SparkSchemaUtil; Schema schema = SparkSchemaUtil.schemaForTable(sparkSession, tableName); |
Партиционирование (Partitioning)
Спецификация партиционирования в Java API определяется с помощью builder-паттерна.
В Iceberg процесс построения спецификации партиционирования начинается с вызова
PartitionSpec.builderFor(schema), где schema — это схема таблицы, на основе которой формируется партиционирование.
Создание partition spec
Partition spec описывает, как Iceberg должен группировать записи в файлы данных.
Partition spec создаётся для схемы таблицы с помощью билдера.
Пример создания partition spec для таблицы logs, где записи партиционируются по часу (event_time) и уровню логов (level):
|
1 2 3 4 5 6 |
import org.apache.iceberg.PartitionSpec; PartitionSpec spec = PartitionSpec.builderFor(schema) .hour("event_time") .identity("level") .build(); |
Ветки и теги (Branching and Tagging)
Новые ветки и теги можно создавать через API ManageSnapshots в Java.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
/* Создаём ветку test-branch, которая будет храниться 1 неделю, при этом всегда сохраняются последние 2 снимка. Все снимки этой ветки, созданные за последний час, также будут сохранены. */ String branch = "test-branch"; table.manageSnapshots() .createBranch(branch, 3) .setMinSnapshotsToKeep(branch, 2) .setMaxSnapshotAgeMs(branch, 3600000) .setMaxRefAgeMs(branch, 604800000) .commit(); // Создаём тег historical-tag на snapshot 10, который хранится 1 день String tag = "historical-tag"; table.manageSnapshots() .createTag(tag, 10) .setMaxRefAgeMs(tag, 86400000) .commit(); |
Коммиты в ветки (Committing to branches)
Запись данных в ветку выполняется с помощью параметра toBranch в операции. Полный список доступен в UpdateOperations.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
// Добавляем FILE_A в ветку test-branch String branch = "test-branch"; table.newAppend() .appendFile(FILE_A) .toBranch(branch) .commit(); // Выполняем обновления на уровне строк в ветке "test-branch" table.newRowDelta() .addRows(DATA_FILE) .addDeletes(DELETES) .toBranch(branch) .commit(); // Выполняем перепаковку файлов: SMALL_FILE_1 и SMALL_FILE_2 заменяются на compactedFile в "test-branch" table.newRewrite() .rewriteFiles( ImmutableSet.of(SMALL_FILE_1, SMALL_FILE_2), ImmutableSet.of(compactedFile) ) .toBranch(branch) .commit(); |
Чтение из ветки или тега выполняется обычным способом через Table Scan API, добавив имя ветки/тега в метод useRef.
Если указана ветка, будет использован снимок (snapshot), который является головой этой ветки.
Важно: чтение одновременно из ветки и с использованием asOfSnapshotId пока не поддерживается.
|
1 2 3 4 5 |
// Чтение из головного snapshot ветки test-branch TableScan branchRead = table.newScan().useRef("test-branch"); // Чтение из snapshot, на который указывает тег audit-tag TableScan tagRead = table.newScan().useRef("audit-tag"); |
Снимки (snapshots), на которые указывают существующие ветки и теги, могут быть обновлены через методы replace.
Операция fast-forward похожа на fast-forward в Git: она позволяет продвинуть целевую ветку до головы исходной ветки или тега, если целевая ветка является предком исходной.
И при fast-forward, и при replace свойства ретеншена (retention) целевой ветки сохраняются по умолчанию.
|
1 2 3 4 5 6 7 8 9 10 11 |
// Обновить "test-branch", чтобы она указывала на snapshot 4 table.manageSnapshots() .replaceBranch(branch, 4) .commit(); String tag = "audit-tag"; // Заменить "audit-tag", чтобы он указывал на snapshot 4, и обновить его настройки ретенции table.manageSnapshots() .replaceBranch(tag, 4) .setMaxRefAgeMs(1000) .commit(); |
Обновление параметров ретеншена (Updating retention properties)
Параметры ретеншена для веток и тегов также могут быть обновлены.
Используйте setMaxRefAgeMs для изменения ретенции для самой ветки или тега.
Параметры ретеншена snapshot-ов внутри ветки обновляются через setMinSnapshotsToKeep и setMaxSnapshotAgeMs.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 |
String branch = "test-branch"; // Обновить параметры ретенции для test-branch table.manageSnapshots() .setMinSnapshotsToKeep(branch, 10) .setMaxSnapshotAgeMs(branch, 7200000) .setMaxRefAgeMs(branch, 604800000) .commit(); // Обновить параметры ретенции для test-tag table.manageSnapshots() .setMaxRefAgeMs("test-tag", 604800000) .commit(); |
Ветки и теги можно удалять с помощью методов removeBranch и removeTag соответственно.
|
1 2 3 4 5 6 7 8 9 |
// Удалить ветку test-branch table.manageSnapshots() .removeBranch("test-branch") .commit(); // Удалить тег test-tag table.manageSnapshots() .removeTag("test-tag") .commit(); |
Что такое партиционирование?
Партиционирование — это способ ускорить выполнение запросов, группируя похожие строки вместе при записи.
Например, запросы к таблице логов обычно включают фильтрацию по диапазону времени, как в этом запросе для логов между 10 и 12 часами:
|
1 2 |
SELECT level, message FROM logs WHERE event_time BETWEEN '2018-12-01 10:00:00' AND '2018-12-01 12:00:00'; |
Если настроить таблицу логов на партиционирование по дате event_time, то события будут записываться в файлы, сгруппированные по дате. Iceberg отслеживает это значение и пропускает файлы с другими датами, где нет нужных данных.
Iceberg умеет партиционировать временные метки по годам, месяцам, дням и часам. Он также может использовать категориальные столбцы, например level в примере логов, чтобы группировать строки и ускорять запросы.
Что Iceberg делает иначе?
Другие форматы таблиц, такие как Hive, поддерживают партиционирование, но Iceberg предлагает скрытое партиционирование:
- Iceberg берет на себя скучную и ошибкоопасную задачу вычисления значений партиций для строк.
- Iceberg автоматически избегает чтения ненужных партиций. Клиентам не нужно знать схему партиционирования и добавлять вспомогательные фильтры.
- Iceberg позволяет постепенно менять схему партиционирования по мере необходимости.
Партиционирование в Hive
Чтобы показать разницу, посмотрим, как Hive обрабатывает таблицу логов.
В Hive партиции явные и представлены столбцом, поэтому в таблице логов будет столбец event_date. При записи нужно явно указывать значение для этого столбца:
|
1 2 3 |
INSERT INTO logs PARTITION (event_date) SELECT level, message, event_time, format_time(event_time, 'YYYY-MM-dd') FROM unstructured_log_source; |
Так же и запросы должны включать фильтр по event_date в дополнение к фильтру по event_time:
|
1 2 3 |
SELECT level, count(1) as count FROM logs WHERE event_time BETWEEN '2018-12-01 10:00:00' AND '2018-12-01 12:00:00' AND event_date = '2018-12-01'; |
Если фильтр event_date упустить, Hive будет сканировать все файлы таблицы, потому что не знает, что event_time связан с event_date.
Проблемы Hive-партиционирования
Hive необходимо явно передавать значения партиций. В примере с логами система не знает связи между event_time и event_date.
Отсюда возникают проблемы:
- Hive не может проверить корректность значений партиций — ответственность на стороне writer’а
- неверный формат (2018-12-01 вместо 20181201) приводит к тихим неправильным результатам
- неверный источник данных или часовой пояс также приводит к ошибкам без предупреждений
- Пользователь обязан писать запросы корректно
- неверный формат фильтра снова приводит к тихим ошибкам
- пользователи, не понимающие физическую схему таблицы, получают медленные запросы — Hive не может преобразовать фильтры автоматически
- Рабочие запросы привязаны к схеме партиционирования, поэтому изменить её без поломки запросов невозможно
Скрытое партиционирование Iceberg
Iceberg вычисляет значения партиций, беря значение колонки и по желанию применяя трансформацию. Iceberg отвечает за преобразование event_time → event_date и хранит эту связь.
Партиционирование таблицы задаётся через такие связи. Например, таблица логов будет партиционироваться по day(event_time) и level.
Поскольку Iceberg не требует явных колонок для партиций, он может скрывать партиционирование:
- значения партиций всегда вычисляются корректно
- они автоматически используются для ускорения запросов
- пользователи и процессоры данных вообще не видят
event_date
Главное: запросы больше не зависят от физической структуры таблицы. Iceberg отделяет логическое представление данных от физического.
Благодаря этому Iceberg поддерживает эволюцию партиционирования: схему можно менять по мере роста объёма данных, а некорректную конфигурацию — исправлять без дорогих миграций.
Custom Catalog
Iceberg-таблицу можно читать как по пути в HDFS, так и как таблицу Hive. Также можно использовать custom metastore вместо Hive. Для этого нужно выполнить следующие шаги:
- Custom TableOperations
- Custom Catalog
- Custom FileIO
- Custom LocationProvider
- Custom IcebergSource
Реализация пользовательских операций с таблицей (Custom TableOperations)
Нужно расширить BaseMetastoreTableOperations и реализовать логику чтения и записи метаданных.
Пример:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
class CustomTableOperations extends BaseMetastoreTableOperations { private String dbName; private String tableName; private Configuration conf; private FileIO fileIO; protected CustomTableOperations(Configuration conf, String dbName, String tableName) { this.conf = conf; this.dbName = dbName; this.tableName = tableName; } // Метод doRefresh должен реализовывать получение пути к metadata.json @Override public void doRefresh() { // Пример: кастомный сервис, который возвращает путь к метаданным по имени БД и таблицы String metadataLocation = CustomService.getMetadataForTable(conf, dbName, tableName); // При обновлении из файла метаданных используем вспомогательный метод refreshFromMetadataLocation(metadataLocation); } // Метод doCommit должен атомарно обновлять путь к метаданным @Override public void doCommit(TableMetadata base, TableMetadata metadata) { String oldMetadataLocation = base.location(); // Записываем новые метаданные String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1); // Пример: кастомный сервис, который атомарно обновляет путь к метаданным CustomService.updateMetadataLocation(dbName, tableName, oldMetadataLocation, newMetadataLocation); } // Метод io возвращает FileIO для чтения и записи файлов метаданных @Override public FileIO io() { if (fileIO == null) { fileIO = new HadoopFileIO(conf); } return fileIO; } } |
Обычно экземпляр TableOperations создаётся через вызов Catalog.newTableOps(TableIdentifier).
Реализация пользовательского каталога (Custom catalog implementation)
Расширьте BaseMetastoreCatalog, чтобы предоставить базовые пути хранилища (warehouse locations) и создать экземпляры CustomTableOperations.
Пример:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
public class CustomCatalog extends BaseMetastoreCatalog { private Configuration configuration; // должен иметь конструктор без аргументов для динамической загрузки // initialize(String name, Map<String, String> properties) будет вызван для завершения инициализации public CustomCatalog() { } public CustomCatalog(Configuration configuration) { this.configuration = configuration; } @Override protected TableOperations newTableOps(TableIdentifier tableIdentifier) { String dbName = tableIdentifier.namespace().level(0); String tableName = tableIdentifier.name(); // создание экземпляра CustomTableOperations return new CustomTableOperations(configuration, dbName, tableName); } @Override protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { // можно использовать любое другое имя конфигурации String tableLocation = configuration.get("custom.iceberg.warehouse.location"); // может быть путь s3 или hdfs if (tableLocation == null) { throw new RuntimeException("custom.iceberg.warehouse.location configuration not set!"); } return String.format( "%s/%s.db/%s", tableLocation, tableIdentifier.namespace().levels()[0], tableIdentifier.name()); } @Override public boolean dropTable(TableIdentifier identifier, boolean purge) { // Пример сервиса для удаления таблицы CustomService.deleteTable(identifier.namespace().level(0), identifier.name()); } @Override public void renameTable(TableIdentifier from, TableIdentifier to) { Preconditions.checkArgument(from.namespace().level(0).equals(to.namespace().level(0)), "Cannot move table between databases"); // Пример сервиса для переименования таблицы CustomService.renameTable(from.namespace().level(0), from.name(), to.name()); } // реализация этого метода для чтения имени каталога и свойств при инициализации public void initialize(String name, Map<String, String> properties) { } } |
Реализации каталогов могут динамически загружаться в большинстве вычислительных движков.
- В Spark и Flink вы можете указать свойство каталога
catalog-impl, чтобы загрузить свою реализацию. - В MapReduce необходимо реализовать
org.apache.iceberg.mr.CatalogLoaderи указать Hadoop-свойствоiceberg.mr.catalog.loader.class, чтобы загрузить каталог. - Если вашему каталогу требуется читать Hadoop-конфигурацию для доступа к параметрам окружения, сделайте его реализацию
org.apache.hadoop.conf.Configurable.
Реализация пользовательского ввода/вывода файлов (Custom File IO implementation)
Расширьте интерфейс FileIO и реализуйте собственную логику для чтения и записи файлов данных.
Пример:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
public class CustomFileIO implements FileIO { // должен иметь конструктор без аргументов для динамической загрузки // метод initialize(Map<String, String> properties) будет вызван для завершения инициализации public CustomFileIO() { } @Override public InputFile newInputFile(String s) { // вам также нужно реализовать интерфейс InputFile для пользовательского входного файла return new CustomInputFile(s); } @Override public OutputFile newOutputFile(String s) { // вам также нужно реализовать интерфейс OutputFile для пользовательского выходного файла return new CustomOutputFile(s); } @Override public void deleteFile(String path) { Path toDelete = new Path(path); FileSystem fs = Util.getFs(toDelete); try { fs.delete(toDelete, false /* не рекурсивно */); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to delete file: %s", path); } } // реализуйте этот метод, чтобы прочитать свойства каталога при инициализации public void initialize(Map<String, String> properties) { } } |
Если вы уже разрабатываете собственный каталог, вы можете реализовать метод TableOperations.io(), чтобы использовать свой CustomFileIO.
Кроме того, пользовательские реализации FileIO могут динамически загружаться в HadoopCatalog и HiveCatalog, если указать свойство каталога io-impl.
Если вашему FileIO необходимо читать Hadoop-конфигурацию для доступа к параметрам среды,
сделайте вашу реализацию наследником org.apache.hadoop.conf.Configurable.
Реализация пользовательского провайдера локаций (Custom location provider implementation)
Расширьте LocationProvider и реализуйте собственную логику определения пути, по которому будут записываться файлы данных.
Пример:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
public class CustomLocationProvider implements LocationProvider { private String tableLocation; // должен иметь конструктор с 2 аргументами или конструктор без аргументов public CustomLocationProvider(String tableLocation, Map<String, String> properties) { this.tableLocation = tableLocation; } @Override public String newDataLocation(String filename) { // можно использовать любой метод для генерации пути по имени файла return String.format("%s/%s/%s", tableLocation, UUID.randomUUID().toString(), filename); } @Override public String newDataLocation(PartitionSpec spec, StructLike partitionData, String filename) { // можно использовать любой метод для генерации пути с учётом информации о партиции return newDataLocation(filename); } } |
Если вы уже реализуете собственный каталог, вы можете переопределить
TableOperations.locationProvider() и указать свой CustomLocationProvider как стандартный.
Чтобы использовать другой кастомный провайдер локаций для конкретной таблицы, укажите его при создании таблицы через свойство:
write.location-provider.impl
Пример:
|
1 2 3 4 5 6 7 8 9 |
CREATE TABLE hive.default.my_table ( id bigint, data string, category string) USING iceberg OPTIONS ( 'write.location-provider.impl'='com.my.CustomLocationProvider' ) PARTITIONED BY (category); |
Custom IcebergSource
Расширьте IcebergSource и реализуйте логику чтения таблицы из CustomCatalog.
Пример:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 |
public class CustomIcebergSource extends IcebergSource { @Override protected Table findTable(DataSourceOptions options, Configuration conf) { Optional<String> path = options.get("path"); Preconditions.checkArgument(path.isPresent(), "Cannot open table: path is not set"); // Читаем таблицу из CustomCatalog CustomCatalog catalog = new CustomCatalog(conf); TableIdentifier tableIdentifier = TableIdentifier.parse(path.get()); return catalog.loadTable(tableIdentifier); } } |
Зарегистрируйте CustomIcebergSource, добавив его полное имя класса в файл:
META-INF/services/org.apache.spark.sql.sources.DataSourceRegister











Leave a Reply