Apache Iceberg Specification

Contents

Документация Apache Iceberg Latest (1.10.0)

Apache Iceberg — это открытый табличный формат для огромных аналитических наборов данных. Iceberg добавляет поддержку таблиц в вычислительные движки, включая Spark, Trino, PrestoDB, Flink, Hive и Impala, используя высокопроизводительный табличный формат, который работает как обычная SQL-таблица.

Пользовательский опыт

Iceberg избегает неприятных сюрпризов. Эволюция схемы работает корректно и не приводит к случайному восстановлению удалённых данных. Пользователям не нужно знать о партиционировании, чтобы получать быстрые запросы.

  • Эволюция схемы поддерживает добавление, удаление, обновление и переименование полей, и не имеет побочных эффектов
  • Скрытое партиционирование предотвращает ошибки пользователей, которые могут вызвать незаметно некорректные результаты или крайне медленные запросы
  • Эволюция партиционного layout-а позволяет обновлять схему партиционирования по мере роста объёма данных или изменения шаблонов запросов
  • Путешествия во времени (time travel) позволяют выполнять воспроизводимые запросы, используя точный снимок таблицы, или легко изучать её изменения
  • Откат версии (rollback) позволяет быстро исправлять проблемы, возвращая таблицу в корректное состояние

Надёжность и производительность

Iceberg создан для огромных таблиц. Iceberg используется в продакшене, где одна таблица может содержать десятки петабайт данных, и даже такие таблицы можно читать без распределённого SQL-движка.

  • Планирование сканов очень быстрое — распределённый SQL-движок не требуется, чтобы прочитать таблицу или найти файлы
  • Продвинутые фильтры — файлы данных отбрасываются по партиционным и колонным статистикам, используя метаданные таблицы

Iceberg разработан для решения проблем корректности в облачных хранилищах с eventual consistency

Работает с любым облачным хранилищем и снижает нагрузку на NameNode в HDFS, избегая операций listing и rename

  • Сериализуемая изоляция — изменения таблицы атомарны, и читатели никогда не видят частично записанные или незафиксированные данные
  • Одновременные записи несколькими клиентами используют оптимистичную конкуренцию — операции будут ретраиться, чтобы убедиться, что совместимые обновления завершаются, даже при коллизиях.

Открытый стандарт

Iceberg спроектирован как открытый стандарт сообщества со спецификацией, обеспечивающей совместимость между языками и реализациями.

Apache Iceberg — это open-source проект, развиваемый под эгидой Apache Software Foundation.

Tables Concept

Branching and Tagging

Обзор

Метаданные таблицы Iceberg содержат журнал снимков (snapshot log), который отражает все изменения, применённые к таблице.

Снимки (snapshots) — фундаментальная часть Iceberg, так как на них основаны:

  • изоляция читателей,
  • запросы «путешествия во времени» (time travel).

Чтобы контролировать размер метаданных и затраты на хранение, Iceberg предоставляет механизмы управления жизненным циклом снимков, например, процедуру expire_snapshots, которая удаляет неиспользуемые снимки и больше не нужные файлы данных согласно политике удержания снимков таблицы.

Для более продвинутого управления жизненным циклом Iceberg поддерживает ветки (branches) и теги (tags) — именованные ссылки на снимки с собственными независимыми жизненными циклами.

Этот жизненный цикл регулируется политиками удержания на уровне веток и тегов.

  • Ветки — это независимые линии снимков, указывающие на вершину своей истории.
  • Теги — именованные ссылки на конкретные снимки.

У тегов и веток есть свойство максимальный возраст ссылки, определяющее, когда ссылка на снимок должна быть удалена.

Для веток также можно настроить:

  • минимальное количество снимков, которые нужно сохранять;
  • максимальный возраст отдельных снимков.

Эти параметры используются процедурой expireSnapshots.

Сценарии использования

Ветки и теги применяются:

  • для соблюдения требований GDPR,
  • для сохранения важных исторических снимков для аудита,
  • в рабочих процессах data engineering — например, создание экспериментальных веток для тестирования новых джобов.

Исторические теги

Теги можно использовать для сохранения важных исторических снимков для аудита.

Пример политик: Сохранять 1 снимок в неделю в течение месяца

Создаём тег для еженедельного снимка, с удержанием в течение 7 дней:

Сохранять 1 снимок в месяц в течение 6 месяцев

Сохранять 1 снимок в год навсегда

По умолчанию теги сохраняются бессрочно:

Создать временную тестовую ветку на 7 дней и хранить последние 2 снимка

Аудиторская ветка (Audit Branch)

Этот пример показывает, как использовать аудиторскую ветку для проверки (валидации) процесса записи.

Сначала необходимо включить режим WAP (write-audit-publish):

Создаём ветку audit-branch начиная с снимка 3, удерживаем её неделю:

Записи выполняются вне основной истории таблицы, только в ветке audit-branch:

Далее выполняется валидация данных (например, проверка качества).

После успешной проверки основную ветку можно перемотать вперёд к состоянию audit-branch:

Спустя неделю, при выполнении expireSnapshots, ветка будет удалена автоматически.

Использование branches и tags

Создание, чтение и запись в ветки (branches) и теги (tags) поддерживаются в:

  • Java-библиотеке Iceberg,
  • интеграциях Spark,
  • интеграциях Flink.

Доступные разделы:

  • Iceberg Java Library
  • Spark DDLs
  • Spark Reads
  • Spark Branch Writes
  • Flink Reads
  • Flink Branch Writes

Выбор схемы при работе с ветками и тегами🔗

Важно понимать:

1. Схема таблицы единая для всех веток.

При работе с ветками используется схема таблицы, потому что именно она применяется для валидации записей в ветку.

2. При запросах тега используется схема снимка (snapshot)

При запросе через тег используется схема, которая была актуальна для этого снимка в момент его создания.

Ниже приведены примеры, показывающие, какая схема используется.

Создадим таблицу и вставим данные:

Результат:

Создадим ветку test_branch, указывающую на текущий снимок, и прочитаем данные:

Результат:

Изменим схему таблицы. Удалим колонку и добавим новую:

Теперь таблица содержит:

Чтение “головы” ветки будет использовать текущую схему таблицы:

Оба запроса дают:

Запрос по ID снимка использует схему снимка:

Пример результата:

Запрос по snapshot ID:

Результат (старые данные со старой схемой):

При записи в ветку используется схема таблицы:

Результат:

 

Configuration

Свойства таблицы

Таблицы Iceberg поддерживают свойства таблицы (table properties), которые используются для настройки поведения таблицы — например, размера сплитов по умолчанию для читателей.

Read properties

Property Default Description (перевод)
read.split.target-size 134217728 (128 MB) Целевой размер при объединении входных сплитов данных
read.split.metadata-target-size 33554432 (32 MB) Целевой размер при объединении входных сплитов метаданных
read.split.planning-lookback 10 Количество корзин (bins), рассматриваемых при объединении сплитов
read.split.open-file-cost 4194304 (4 MB) Оценочная стоимость открытия файла, используется как минимальный вес при объединении сплитов
read.parquet.vectorization.enabled true Управляет использованием векторизированного чтения Parquet
read.parquet.vectorization.batch-size 5000 Размер batch для векторизированного чтения Parquet
read.orc.vectorization.enabled false Управляет использованием векторизированного чтения ORC
read.orc.vectorization.batch-size 5000 Размер batch для векторизированного чтения ORC

Write properties

Property Default Description (перевод)
write.format.default parquet Формат файлов по умолчанию для таблицы; parquet, avro или orc
write.delete.format.default data file format Формат файлов удаления по умолчанию; parquet, avro или orc
write.parquet.row-group-size-bytes 134217728 (128 MB) Размер row group для Parquet
write.parquet.page-size-bytes 1048576 (1 MB) Размер страницы Parquet
write.parquet.page-row-limit 20000 Лимит строк на страницу Parquet
write.parquet.dict-size-bytes 2097152 (2 MB) Размер dictionary page для Parquet
write.parquet.compression-codec zstd Кодек сжатия Parquet
write.parquet.compression-level null Уровень сжатия Parquet
write.parquet.bloom-filter-enabled.column.col1 (not set) Подсказка Parquet записывать Bloom-фильтр для столбца col1
write.parquet.bloom-filter-max-bytes 1048576 (1 MB) Максимальный размер Bloom-фильтра в байтах
write.parquet.bloom-filter-fpp.column.col1 0.01 Вероятность ложного срабатывания Bloom-фильтра для col1
write.parquet.stats-enabled.column.col1 (not set) Управляет сбором статистики Parquet для столбца col1
write.avro.compression-codec gzip Кодек сжатия Avro
write.avro.compression-level null Уровень сжатия Avro
write.orc.stripe-size-bytes 67108864 (64 MB) Размер stripe в ORC
write.orc.block-size-bytes 268435456 (256 MB) Размер блока ФС для ORC
write.orc.compression-codec zlib Кодек сжатия ORC
write.orc.compression-strategy speed Стратегия сжатия ORC: скорость или степень сжатия
write.orc.bloom.filter.columns (not set) Список столбцов, для которых создаётся Bloom-фильтр
write.orc.bloom.filter.fpp 0.05 Вероятность ложного срабатывания Bloom-фильтра
write.location-provider.impl null Необязательная пользовательская реализация LocationProvider
write.metadata.compression-codec none Кодек сжатия метаданных; none или gzip
write.metadata.metrics.max-inferred-column-defaults 100 Максимальное число столбцов, для которых собираются метрики
write.metadata.metrics.default truncate(16) Режим метрик по умолчанию: none, counts, truncate(length) или full
write.metadata.metrics.column.col1 (not set) Режим метрик для столбца col1
write.target-file-size-bytes 536870912 (512 MB) Желательный размер генерируемых файлов
write.delete.target-file-size-bytes 67108864 (64 MB) Желательный размер файлов удаления
write.distribution-mode not set Определяет распределение данных при записи
write.delete.distribution-mode (not set) Распределение данных при записи delete
write.update.distribution-mode (not set) Распределение данных при записи update
write.merge.distribution-mode (not set) Распределение данных при merge
write.wap.enabled false Включает режим write-audit-publish
write.summary.partition-limit 0 Включает статистику разделов, если их меньше заданного лимита
write.metadata.delete-after-commit.enabled false Удалять ли старые файлы метаданных после каждого коммита
write.metadata.previous-versions-max 100 Максимум старых версий метаданных для хранения
write.spark.fanout.enabled false Включает Spark fanout writer (не требует кластеризации данных)
write.object-storage.enabled false Включает размещение файлов с хеш-компонентом в путях
write.object-storage.partitioned-paths true Включает добавление значений партиции в путь
write.data.path table location + /data Базовый путь для файлов данных
write.metadata.path table location + /metadata Базовый путь для файлов метаданных
write.delete.mode copy-on-write Режим выполнения delete: copy-on-write или merge-on-read
write.delete.isolation-level serializable Уровень изоляции delete
write.update.mode copy-on-write Режим выполнения update
write.update.isolation-level serializable Уровень изоляции update
write.merge.mode copy-on-write Режим выполнения merge
write.merge.isolation-level serializable Уровень изоляции merge
write.delete.granularity partition Гранулярность файлов удаления: partition или file

Свойства, управляющие поведением таблицы

Property Default Description (перевод)
commit.retry.num-retries 4 Количество попыток повторить commit перед ошибкой
commit.retry.min-wait-ms 100 Минимальное время ожидания (мс) перед повторной попыткой commit
commit.retry.max-wait-ms 60000 (1 min) Максимальное время ожидания (мс) между повторными попытками commit
commit.retry.total-timeout-ms 1800000 (30 min) Общий таймаут (мс), в течение которого commit может перезапускаться
commit.status-check.num-retries 3 Количество повторных проверок статуса commit после потери соединения
commit.status-check.min-wait-ms 1000 (1s) Минимальное ожидание (мс) перед повтором проверки статуса
commit.status-check.max-wait-ms 60000 (1 min) Максимальное ожидание (мс) между проверками статуса
commit.status-check.total-timeout-ms 1800000 (30 min) Общий таймаут (мс) для успешной проверки состояния commit
commit.manifest.target-size-bytes 8388608 (8 MB) Целевой размер объединённых manifest-файлов
commit.manifest.min-count-to-merge 100 Минимальное число manifest-файлов для запуска их слияния
commit.manifest-merge.enabled true Управляет автоматическим объединением manifest-файлов при записи
history.expire.max-snapshot-age-ms 432000000 (5 days) Максимальный возраст snapshot’ов, которые хранятся при очистке истории
history.expire.min-snapshots-to-keep 1 Минимальное количество snapshot’ов, которые всегда сохраняются
history.expire.max-ref-age-ms Long.MAX_VALUE (forever) Для ссылок на snapshot’ы (кроме ветки main): максимальный возраст ссылок при очистке истории. Ветка main никогда не очищается

Reserved table properties (зарезервированные свойства таблицы)

Зарезервированные свойства таблицы используются только для управления поведением при создании или обновлении таблицы. Они не сохраняются как часть метаданных таблицы.

Property Default Description (перевод)
format-version 2 Версия формата таблицы (1 или 2), как описано в спецификации. По умолчанию 2 начиная с версии Iceberg 1.4.0.

Compatibility flags (флаги совместимости)

Property Default Description (перевод)
compatibility.snapshot-id-inheritance.enabled false Разрешает коммиты snapshot’ов без явного указания их snapshot ID (всегда true для формата версии > 1).

Catalog properties (свойства каталога)

Каталоги Iceberg поддерживают свойства, которые управляют их поведением. Ниже — список наиболее часто используемых свойств.

Property Default Description (перевод)
catalog-impl null Пользовательская реализация Catalog, которую должен использовать движок
io-impl null Пользовательская реализация FileIO для использования в каталоге
warehouse null Корневой путь data warehouse
uri null Строка URI, например URI Hive Metastore
clients 2 Размер пула клиентов
cache-enabled true Включено ли кеширование объектов каталога
cache.expiration-interval-ms 30000 Время локального кеширования записей каталога (мс); 0 — выключить кеш, отрицательное значение — выключить истечение срока
metrics-reporter-impl org.apache.iceberg.metrics.LoggingMetricsReporter Кастомная реализация MetricsReporter. См. раздел о метриках

HadoopCatalog и HiveCatalog могут получать доступ к этим свойствам в своих конструкторах.
Любой другой кастомный каталог может получить доступ к свойствам, реализовав:

Catalog.initialize(catalogName, catalogProperties).

Свойства можно:

  • собрать вручную,
  • или передать из вычислительного движка, такого как Spark или Flink.

Spark использует session properties как catalog properties.
Flink передаёт свойства каталога через команду CREATE CATALOG — подробнее в разделе Flink.

Lock catalog properties (Свойства каталога, связанные с блокировками)

Ниже представлены свойства каталога, связанные с механизмом блокировок.
Они используются некоторыми реализациями каталога для управления поведением блокировок во время коммитов.

Property Default Description (перевод)
lock-impl null Пользовательская реализация менеджера блокировок; конкретный интерфейс зависит от выбранного каталога
lock.table null Вспомогательная таблица для механизма блокировок, например в AWS DynamoDB lock manager
lock.acquire-interval-ms 5000 (5 s) Интервал ожидания между попытками получения блокировки
lock.acquire-timeout-ms 180000 (3 min) Максимальное время попыток получить блокировку
lock.heartbeat-interval-ms 3000 (3 s) Интервал отправки heartbeat после получения блокировки
lock.heartbeat-timeout-ms 15000 (15 s) Максимальное время без heartbeat, после которого блокировка считается просроченной

Hadoop configuration (Настройки Hadoop)

Следующие свойства Hadoop используются коннектором Hive Metastore. Блокировка таблиц в HMS — это двухэтапный процесс:

  • Создание блокировки: создать блокировку в Hive Metastore и поставить её в очередь на получение
  • Проверка блокировки: проверять, была ли блокировка успешно получена
Property Default Description (перевод)
iceberg.hive.client-pool-size 5 Размер пула Hive-клиентов при отслеживании таблиц в HMS
iceberg.hive.lock-creation-timeout-ms 180000 (3 min) Максимальное время (мс), отводимое на создание блокировки в HMS
iceberg.hive.lock-creation-min-wait-ms 50 Минимальное время (мс) между повторными попытками создания блокировки
iceberg.hive.lock-creation-max-wait-ms 5000 Максимальное время (мс) между повторными попытками создания блокировки
iceberg.hive.lock-timeout-ms 180000 (3 min) Максимальное время (мс), отводимое на получение блокировки
iceberg.hive.lock-check-min-wait-ms 50 Минимальное время (мс) между проверками получения блокировки
iceberg.hive.lock-check-max-wait-ms 5000 Максимальное время (мс) между проверками получения блокировки
iceberg.hive.lock-heartbeat-interval-ms 240000 (4 min) Интервал heartbeat для блокировок HMS
iceberg.hive.metadata-refresh-max-retries 2 Максимальное число повторных попыток, если отсутствует metadata-файл
iceberg.hive.table-level-lock-evict-ms 600000 (10 min) Таймаут JVM-блокировки таблицы
iceberg.engine.hive.lock-enabled true Использовать ли HMS-блокировки для обеспечения атомарности коммитов

Note (Примечание)

Свойства iceberg.hive.lock-check-max-wait-ms и iceberg.hive.lock-heartbeat-interval-ms должны быть меньше, чем timeout транзакций Hive Metastore (hive.txn.timeout или metastore.txn.timeout).

Иначе heartbeat блокировок будет истекать в Hive Metastore раньше, чем Iceberg попытается повторно получить блокировку.

Warn (Предупреждение)

Если установить:

то HiveCatalog будет делать коммиты без использования Hive-блокировок.

Это можно делать только если ВСЕ следующие условия выполнены:

  • На сервере Hive Metastore доступен фикс HIVE-26882
  • Доступен фикс HIVE-28121, если HMS использует MySQL/MariaDB
  • Все остальные HiveCatalog, коммитящие в те же таблицы, работают на Iceberg 1.3+
  • Все остальные HiveCatalog, записывающие в те же таблицы, также отключили Hive-блокировки

Если эти условия не соблюдены — есть высокий риск повредить таблицу.

Дополнительно

Даже если:

в конкретной таблице можно включить блокировки, установив свойство:

Это полезно, если другие каталоги Hive не могут быть обновлены и требуют блокировок.

Evolution (Эволюция)

Iceberg поддерживает эволюцию таблиц без переписывания данных.
Вы можете изменять схему таблицы так же, как в SQL — включая вложенные структуры — или менять схему партиционирования по мере роста объёма данных.
Iceberg не требует дорогих операций, таких как переписывание всего датасета или перенос данных в новую таблицу.

Например, в Hive схема партиционирования не может быть изменена. Переход от дневных партиций к часовым требует создания новой таблицы. Так как запросы привязаны к партициям, приходится менять SQL-запросы под новую структуру. В некоторых системах даже такие простые изменения, как переименование колонки, могут быть невозможны или приводить к ошибкам в данных.

Schema evolution (Эволюция схемы)

Iceberg поддерживает следующие изменения схемы:

  • Add — добавить новую колонку в таблицу или во вложенную структуру
  • Drop — удалить существующую колонку
  • Rename — переименовать колонку или поле внутри вложенной структуры
  • Update — расширить тип (например, int → long) для колонки, поля структуры, ключей и значений map или элементов списка
  • Reorder — изменить порядок колонок или полей структуры

Все эти операции — метаданные, поэтому переписывать файлы не нужно.

Примечание: ключи map не поддерживают добавление и удаление полей структуры — это может нарушить семантику равенства.

Correctness (Корректность)

Iceberg гарантирует, что изменения схемы независимы и не имеют побочных эффектов.
При этом файлы данных не переписываются:

  1. При добавлении колонки её значения никогда не читаются из другой колонки.
  2. Удаление колонки не изменяет значения в других колонках.
  3. Обновление типа колонки не изменяет других колонок.
  4. Перестановка колонок не меняет данные, связанные с именами колонок.

Iceberg использует уникальные ID для колонок, чтобы избежать ошибок.

Сравнение с форматами, где ID нет:

  • Форматы, отслеживающие колонки по имени, могут случайно «воскресить» удалённую колонку, если имя повторно используется — нарушение пункта 1.
  • Форматы, отслеживающие по позиции, не могут удалить колонку без смещения последующих имён — нарушение пункта 2.

Partition evolution (Эволюция партиционирования)

Партиционирование таблицы Iceberg может быть обновлено в существующей таблице, потому что запросы не ссылаются напрямую на значения партиций.

Когда вы изменяете спецификацию партиционирования, старые данные, записанные с использованием предыдущей спецификации, остаются неизменными. Новые данные записываются с использованием новой спецификации в новой схеме. Метаданные для каждой версии партиционирования хранятся отдельно. Из-за этого при выполнении запросов возникает раздельное планирование. Это означает, что каждая схема партиционирования планирует файлы отдельно, используя фильтр, который она выводит для своей конкретной схемы партиционирования. Ниже приведено визуальное представление условного примера:

Диаграмма эволюции партиционирования. Данные за 2008 год партиционированы по месяцам. Начиная с 2009 года таблица обновляется так, что данные вместо этого партиционируются по дням. Обе схемы партиционирования могут сосуществовать в одной таблице.

Iceberg использует скрытое партиционирование, поэтому вам не нужно писать запросы, ориентированные на конкретную схему партиционирования, чтобы они выполнялись быстро. Вместо этого можно писать запросы, выбирающие нужные данные, а Iceberg автоматически исключает файлы, которые не содержат подходящих данных.

Эволюция партиционирования является операцией на уровне метаданных и не выполняет немедленного переписывания файлов.

Java API таблиц Iceberg предоставляет API updateSpec для обновления спецификации партиционирования. Например, следующий код может быть использован для обновления спецификации, чтобы добавить новое поле партиционирования, которое распределяет значения колонки id по 8 бакетам, и удалить существующее поле партиционирования category:

Spark также поддерживает изменение partition spec через ALTER TABLE.

Sort order evolution (Эволюция порядка сортировки)

Аналогично спецификации партиционирования, порядок сортировки Iceberg также может быть обновлен в существующей таблице. Когда вы изменяете порядок сортировки, старые данные, записанные с использованием предыдущего порядка, остаются неизменными. Вычислительные движки всегда могут выбирать, записывать данные в соответствии с последним порядком сортировки или без сортировки, если сортировка слишком затратна.

Java API таблиц Iceberg предоставляет API replaceSortOrder для обновления порядка сортировки. Например, следующий код может быть использован для создания нового порядка сортировки, где колонка id сортируется по возрастанию с значениями null в конце, а колонка category сортируется по убыванию со значениями null в начале:

Пример API на Java:

Spark также поддерживает смену sort order через ALTER TABLE.

Maintenance

Информация
Операции maintenance обслуживания требуют экземпляр Table

Истечение срока действия снимков (Expire Snapshots)

Каждая запись в таблицу Iceberg создаёт новый snapshot, или версию таблицы. Снимки могут использоваться для запросов с путешествием во времени (time travel), или таблица может быть откатана к любому действительному snapshot’у.

Снимки накапливаются до тех пор, пока не будут удалены операцией expireSnapshots. Регулярное удаление устаревших снимков рекомендуется, чтобы удалить файлы данных, которые больше не нужны, и сохранить небольшой размер метаданных таблицы.

В этом примере удаляются снимки, которым больше 1 дня:

Также существует Spark-действие, которое может выполнять удаление снимков параллельно для больших таблиц:

Удаление старых снимков исключает их из метаданных, поэтому они больше недоступны для запросов с путешествием во времени.

Информация
Файлы данных не удаляются до тех пор, пока на них ссылается хотя бы один snapshot, который может быть использован для time travel или отката. Регулярное удаление снимков позволяет удалять неиспользуемые файлы данных.

Удаление старых файлов метаданных (Remove old metadata files)

Iceberg отслеживает метаданные таблицы с помощью JSON-файлов. Каждое изменение в таблице создаёт новый файл метаданных для обеспечения атомарности.

По умолчанию старые файлы метаданных сохраняются для истории. Таблицы с частыми коммитами, например те, что пишутся потоковыми заданиями, могут нуждаться в регулярной очистке файлов метаданных.

Каждый файл метаданных отслеживает более старые файлы метаданных в поле metadata-log. Количество отслеживаемых файлов метаданных определяется параметром write.metadata.previous-versions-max.

Чтобы автоматически удалять более старые файлы метаданных, установите write.metadata.delete-after-commit.enabled=true в свойствах таблицы. Это сохранит часть файлов метаданных как отслеживаемые (до write.metadata.previous-versions-max) и будет удалять самый старый файл метаданных каждый раз при создании нового. Обратите внимание, что это удалит только файлы метаданных, которые отслеживаются в журнале метаданных, и не удалит «осиротевшие» файлы метаданных.

Неотслеживаемые файлы метаданных также удаляются как часть процедуры удаления orphan-файлов.

Property Default Description
write.metadata.delete-after-commit.enabled false Управляет тем, следует ли удалять самые старые отслеживаемые версии файлов метаданных после каждого коммита таблицы
write.metadata.previous-versions-max 100 Максимальное количество предыдущих версий файлов метаданных для отслеживания

Примеры:

  • С write.metadata.delete-after-commit.enabled=false и write.metadata.previous-versions-max=10 после 100 коммитов будет 10 отслеживаемых файлов метаданных и 90 «осиротевших» файлов метаданных. Эти 90 «осиротевших» файлов метаданных нельзя удалить, установив write.metadata.delete-after-commit.enabled=true, потому что они уже не отслеживаются. Их можно очистить только с помощью процедуры orphan-file deletion.
  • С write.metadata.delete-after-commit.enabled=true и write.metadata.previous-versions-max=20 после 21 коммита будет 20 отслеживаемых файлов метаданных, при этом самый старый файл метаданных будет удалён писателем после коммита. С каждым последующим коммитом самый старый файл метаданных будет удаляться.

Удаление orphan-файлов

В Spark и других распределённых вычислительных движках сбои задач или джобов могут оставлять файлы, которые не упоминаются в метаданных таблицы, и в некоторых случаях обычное удаление старых снепшотов не может определить, что файл больше не нужен, и удалить его.

Чтобы очистить такие «orphan»-файлы в директории таблицы, используйте действие deleteOrphanFiles.

Это действие может занять много времени, если у вас много файлов в директориях data и metadata. Рекомендуется выполнять его периодически, но частое выполнение обычно не требуется.

Info

Опасно удалять orphan-файлы с интервалом хранения короче времени, необходимого для завершения любой операции записи, потому что это может повредить таблицу, если файлы, находящиеся в процессе записи, будут ошибочно признаны orphan-файлами и удалены. Интервал по умолчанию — 3 дня.

Info

Iceberg использует строковые представления путей при определении файлов, которые нужно удалить. В некоторых файловых системах путь может измениться со временем, но при этом указывать на тот же файл. Например, если изменить authority в кластере HDFS, старые URL путей, использованные при создании файлов, не будут совпадать с текущими. Это приведёт к потере данных при запуске RemoveOrphanFiles. Убедитесь, что записи в ваших MetadataTables совпадают с путями, возвращаемыми Hadoop FileSystem API, чтобы избежать непреднамеренного удаления.

Необязательное обслуживание (Optional Maintenance)

Некоторым таблицам требуется дополнительное обслуживание. Например, потоковые запросы могут создавать маленькие файлы данных, которые следует объединять в более крупные. А для некоторых таблиц выгодно переписывать файлы манифестов, чтобы значительно ускорить поиск данных для запросов.

Compact data files

Iceberg отслеживает каждый файл данных в таблице. Большее количество файлов данных приводит к увеличению объёма метаданных в manifest-файлах, а маленькие файлы данных создают избыточные метаданные и менее эффективные запросы из-за высокой стоимости операций открытия файлов.

Iceberg может компактировать файлы данных параллельно с помощью Spark через действие rewriteDataFiles. Это позволит объединить небольшие файлы в более крупные, чтобы уменьшить объём метаданных и снизить стоимость открытия файлов во время выполнения запросов.

Таблица метаданных files полезна для анализа размеров файлов данных и определения моментов, когда партиции стоит компактировать.

Переписывание manifest-файлов

Iceberg использует метаданные в manifest list и manifest-файлах, чтобы ускорять планирование запросов и отбрасывать ненужные файлы данных. Дерево метаданных работает как индекс для данных таблицы.

Манифесты в дереве метаданных автоматически компактируются в порядке их добавления, что ускоряет запросы, если шаблон записи совпадает с фильтрами чтения. Например, запись данных, разбитых по часам, по мере их поступления совпадает с фильтрами запросов по временным диапазонам.

Когда шаблон записи таблицы не совпадает с шаблоном запросов, метаданные можно переписать, чтобы пере-группировать файлы данных по манифестам, используя rewriteManifests или действие rewriteManifests (для параллельной обработки в Spark).

Этот пример переписывает маленькие манифесты и группирует файлы данных по первому полю партиционирования.

Metrics Reporting

Начиная с версии 1.1.0 Iceberg поддерживает MetricsReporter и MetricsReport APIs. Эти два API позволяют описывать различные отчёты метрик и обеспечивают подключаемый механизм для их доставки.

Отчет ScanReport

ScanReport содержит метрики, собираемые во время планирования скана таблицы. Помимо общей информации о таблице, такой как идентификатор снимка и имя таблицы, он включает метрики вроде:

  • total scan planning duration
  • number of data/delete files included in the result
  • number of data/delete manifests scanned/skipped
  • number of data/delete files scanned/skipped
  • number of equality/positional delete files scanned

Отчет CommitReport

CommitReport содержит метрики, собираемые после фиксации изменений в таблице (то есть после создания снимка). Помимо общей информации о таблице, такой как идентификатор снимка и имя таблицы, он включает метрики вроде:

  • total duration
  • number of attempts required for the commit to succeed
  • number of added/removed data/delete files
  • number of added/removed equality/positional delete files
  • number of added/removed equality/positional deletes

Available Metrics Reporters — LoggingMetricsReporter

Это репортёр метрик по умолчанию, если не указано иное. Его задача — записывать результаты в лог-файл. Пример выходных данных будет выглядеть следующим образом:

RESTMetricsReporter

Это репортёр метрик по умолчанию при использовании RESTCatalog. Его задача — отправлять метрики на REST-сервер по адресу:
/v1/{prefix}/namespaces/{namespace}/tables/{table}/metrics, как определено в OpenAPI — спецификации REST.

Отправку метрик через REST можно контролировать с помощью свойства rest-metrics-reporting-enabled (по умолчанию: true).

Implementing a custom Metrics Reporter (Реализация собственного репортёра метрик)

Реализация API MetricsReporter даёт полную гибкость в обработке входящих объектов MetricsReport. Например, можно отправлять результаты в Prometheus-эндпоинт или любую другую систему наблюдаемости.

Ниже приведён короткий пример InMemoryMetricsReporter, который сохраняет отчёты в список и предоставляет к ним доступ:

Registering a custom Metrics Reporter (Регистрация собственного репортёра метрик)

Via Catalog Configuration (Через конфигурацию каталога)

Свойство каталога metrics-reporter-impl позволяет зарегистрировать конкретный MetricsReporter, указав его полностью квалифицированное имя класса. Например:

Via the Java API during Scan planning (Через Java API в процессе планирования скана)

Независимо от того, зарегистрирован ли MetricsReporter на уровне каталога через свойство metrics-reporter-impl, можно дополнительно указать репортёры во время планирования скана, как показано ниже:

 

 

Partitioning

Что такое партиционирование?

Партиционирование — это способ ускорить выполнение запросов за счёт группировки похожих строк вместе при записи.

Например, запросы к таблице logs обычно включают диапазон времени, как в этом запросе для логов между 10 и 12 часами:

Если настроить таблицу logs на партиционирование по дате event_time, то события логов будут объединены в файлы с одинаковой датой события. Iceberg отслеживает эту дату и использует её, чтобы пропускать файлы за другие даты, в которых нет нужных данных.

Iceberg может партиционировать временные метки по году, месяцу, дню и часу. Он также может использовать категориальные колонки, такие как level в приведённом примере, чтобы хранить строки вместе и ускорять запросы.

Что Iceberg делает иначе?

Другие форматы таблиц, такие как Hive, поддерживают партиционирование, но Iceberg поддерживает скрытое партиционирование.

Iceberg обрабатывает утомительную и склонную к ошибкам задачу формирования значений партиций для строк таблицы.
Iceberg автоматически избегает чтения ненужных партиций. Потребителям не нужно знать, как таблица партиционирована, и добавлять дополнительные фильтры в свои запросы.
Iceberg позволяет схеме партиционирования эволюционировать по мере необходимости.

Партиционирование в Hive

Чтобы продемонстрировать разницу, рассмотрим, как Hive обрабатывал бы таблицу logs.

В Hive партиции являются явными и присутствуют как колонка, поэтому в таблице logs была бы колонка event_date. При записи оператор INSERT должен указывать данные для колонки event_date:

Аналогично, запросы, которые выполняют поиск по таблице logs, должны содержать фильтр event_date в дополнение к фильтру event_time.

Если фильтр event_date отсутствует, Hive просканирует каждый файл в таблице, потому что он не знает, что колонка event_time связана с колонкой event_date.

Проблемы с партиционированием в Hive

Hive должен получать значения партиций. В примере с logs он не знает связь между event_time и event_date.

Это приводит к нескольким проблемам:

  • Hive не может проверять корректность значений партиций — ответственность за формирование корректного значения лежит на записи.
    • Использование неправильного формата, 2018-12-01 вместо 20181201, приводит к тихим неправильным результатам, а не к ошибкам запроса.
    • Использование неправильной исходной колонки, например processing_time, или неправильного часового пояса также вызывает неправильные результаты, а не ошибки.
  • Пользователь должен сам правильно писать запросы.
    • Использование неправильного формата также приводит к тихим неправильным результатам.
    • Пользователи, которые не понимают физическое устройство таблицы, получают неоправданно медленные запросы — Hive не может автоматически преобразовать фильтры.
  • Рабочие запросы привязаны к схеме партиционирования таблицы, поэтому конфигурацию партиционирования нельзя изменить без нарушения работы запросов.

Скрытое партиционирование в Iceberg (hidden partitioning)

Iceberg формирует значения партиций, беря значение колонки и при необходимости преобразуя его. Iceberg отвечает за преобразование event_time в event_date и отслеживает эту связь.

Партиционирование таблицы настраивается с использованием этих связей. Таблица logs была бы партиционирована по day(event_time) и level.

Поскольку Iceberg не требует, чтобы пользователь поддерживал колонки партиций, он может скрывать партиционирование. Значения партиций всегда формируются корректно и всегда используются для ускорения запросов, когда это возможно. Продюсеры и потребители даже не увидят event_date.

Самое важное — запросы больше не зависят от физического устройства таблицы. Благодаря разделению физического и логического уровней таблицы Iceberg могут со временем изменять схемы партиционирования по мере роста объёмов данных. Некорректно сконфигурированные таблицы можно исправить без дорогостоящей миграции.

Performance

  • Iceberg разработан для огромных таблиц и используется в продакшене, где одна таблица может содержать десятки петабайт данных.
  • Даже многопетабайтные таблицы могут быть прочитаны с одного узла, без необходимости использовать распределённый SQL-движок для перебора метаданных таблицы.

Планирование скана

Планирование скана — это процесс определения файлов таблицы, которые требуются для выполнения запроса.

Планирование в таблице Iceberg помещается на один узел, потому что метаданные Iceberg позволяют:

  • исключать ненужные файлы метаданных (prune),
  • а также отфильтровывать файлы данных, которые не содержат подходящих данных.

Быстрое планирование скана на одном узле даёт:

  • низкую задержку SQL-запросов — за счёт исключения распределённого скана для планирования распределённого скана;
  • доступ с любого клиента — автономные процессы могут напрямую читать данные из таблиц Iceberg.

Фильтрация метаданных

Iceberg использует два уровня метаданных для отслеживания файлов в снапшоте:

  • Файлы манифестов (manifest files) содержат список файлов данных, а также данные о разделах (partition data) и статистику по столбцам для каждого файла.
  • Список манифестов (manifest list) содержит список манифестов снапшота и диапазоны значений для каждого поля партиционирования.

Для быстрого планирования скана Iceberg сначала фильтрует манифесты, используя диапазоны значений партиций из списка манифестов. Затем читает только нужные манифесты, чтобы получить файлы данных.

Таким образом, список манифестов работает как индекс по манифестам, позволяя планировать скан без чтения всех манифестов.

Помимо диапазонов значений партиций, manifest list также содержит количество добавленных и удалённых файлов в каждом манифесте, что ускоряет операции вроде удаления старых снапшотов.

Фильтрация данных

Файлы манифестов содержат:

  • данные о партициях,
  • статистику по столбцам для каждого файла данных.

Во время планирования предикаты запроса автоматически преобразуются в предикаты на данные партиций и сначала применяются для фильтрации файлов. Затем используются:

  • количество значений (value counts),
  • количество null,
  • нижние и верхние границы значений по столбцам
  • для исключения файлов, которые не могут соответствовать условию запроса.

Используя верхние и нижние границы значений для фильтрации файлов данных при планировании, Iceberg может отбрасывать целые сплиты без запуска задач.

В некоторых случаях это даёт увеличение производительности в 10 раз.

Reliability

todo

Schemas

Таблицы Iceberg поддерживают следующие типы:

Type Описание Примечания
boolean Истина или ложь  
int 32-битные знаковые целые числа Может быть повышен до long
long 64-битные знаковые целые числа  
float 32-битное число с плавающей запятой IEEE 754 Может быть повышен до double
double 64-битное число с плавающей запятой IEEE 754  
decimal(P,S) Десятичное число с фиксированной точностью P и масштабом S Масштаб фиксирован, точность должна быть 38 или меньше
date Календарная дата без часового пояса и времени  
time Время суток без даты и часового пояса Хранится в микросекундах
timestamp Таймстамп без часового пояса Хранится в микросекундах
timestamptz Таймстамп с часовым поясом Хранится в микросекундах
string Строка произвольной длины Кодируется в UTF-8
fixed(L) Байтовый массив фиксированной длины L  
binary Байтовый массив произвольной длины  
struct<...> Запись с именованными полями произвольных типов  
list Список элементов произвольного типа  
map<K, V> Карта (словарь) с ключами и значениями произвольных типов  

Iceberg отслеживает каждое поле в схеме таблицы, используя идентификатор, который никогда не используется повторно в таблице.

Views Concept / Configuration

View properties

todo

View behavior properties

todo

 

 

Подпишись на телеграм канал Data Engineering Инжиниринг данных
Иван Шамаев
Более 14 лет опыта в ИТ. Разрабатывал аналитические решения и визуализации данных, строил витрины данных, автоматизировал выгрузку и обработку данных, автоматизировал data pipelines в Airflow. Разработка рассылок, финансовой отчетности, кастомизация Superset.
0
Оставьте комментарий! Напишите, что думаете по поводу статьи.x