Данная статья — это перевод некоторых разделов документации по Trino 478.
Contents
- 1 Материалы по теме Trino
- 2 Trino Glossary
- 3 Установка Trino + Iceberg + Rest Catalog + Minio
- 4 Статистика таблиц (Table statistics)
- 5 Cost in EXPLAIN
- 6 Cost-based optimizations
- 6.1 Типы Joins в Trino (вводная часть перед рассмотрением оптимизаций)
- 6.2 Перебор соединений (Join enumeration)
- 6.3 Выбор стратегии распределения соединений (Join distribution selection)
- 6.4 Ограничение размера реплицируемой таблицы
- 6.5 Синтаксический порядок соединений
- 6.6 Реализации коннекторов
- 6.7 Проброс предикатов (Predicate pushdown)
- 6.8 Проброс проекций (Projection pushdown)
- 6.9 Проброс разыменования (Dereference pushdown)
- 6.10 Проброс агрегации (Aggregation pushdown)
- 6.11 Ограничения
- 6.12 Проброс соединений (Join pushdown)
- 6.13 Проброс ограничения (Limit pushdown)
- 6.14 Проброс Top-N (Top-N pushdown)
- 7 Адаптивные оптимизации планов (Adaptive plan optimizations)
- 8 Адаптивное переупорядочивание распределённых соединений (Adaptive reordering of partitioned joins)
- 9 Trino Iceberg connector
- 10 Управление данными Trino и Iceberg
- 11 Основные алгоритмы соединения в Trino
- 12 Обзор Explain и Explain Analyze
- 13 Session properties
- 14 Сброс на диск (Spill to disk)
- 15 Exchange (Обмен данными)
- 16 Scanning (Сканирование данных)
- 17 Fault-tolerant execution (Отказоустойчивое выполнение)
- 18 Task properties
- 19 Exchange properties
- 20 Resource management properties
- 21 Query management properties
- 21.1 query.client.timeout
- 21.2 query.execution-policy
- 21.3 query.determine-partition-count-for-write-enabled
- 21.4 query.max-hash-partition-count
- 21.5 query.min-hash-partition-count
- 21.6 query.min-hash-partition-count-for-write
- 21.7 query.max-writer-task-count
- 21.8 query.low-memory-killer.policy
- 21.9 task.low-memory-killer.policy
- 21.10 query.max-execution-time
- 21.11 query.max-length
- 21.12 query.max-planning-time
- 21.13 query.max-run-time
- 21.14 query.max-scan-physical-bytes
- 21.15 query.max-write-physical-size
- 21.16 query.max-stage-count
- 21.17 query.max-history
- 21.18 query.min-expire-age
- 21.19 query.remote-task.enable-adaptive-request-size
- 21.20 query.remote-task.guaranteed-splits-per-task
- 21.21 query.remote-task.max-error-duration
- 21.22 query.remote-task.max-request-size
- 21.23 query.remote-task.request-size-headroom
- 21.24 query.info-url-template
- 21.25 retry-policy
- 22 Optimizer properties
- 22.1 optimizer.dictionary-aggregation
- 22.2 optimizer.optimize-metadata-queries
- 22.3 optimizer.distinct-aggregations-strategy
- 22.4 optimizer.push-aggregation-through-outer-join
- 22.5 optimizer.push-table-write-through-union
- 22.6 optimizer.push-filter-into-values-max-row-count
- 22.7 optimizer.join-reordering-strategy
- 22.8 optimizer.max-reordered-joins
- 22.9 optimizer.optimize-duplicate-insensitive-joins
- 22.10 optimizer.use-exact-partitioning
- 22.11 optimizer.use-table-scan-node-partitioning
- 22.12 optimizer.table-scan-node-partitioning-min-bucket-to-task-ratio
- 22.13 optimizer.colocated-joins-enabled
- 22.14 optimizer.filter-conjunction-independence-factor
- 22.15 optimizer.join-multi-clause-independence-factor
- 22.16 optimizer.non-estimatable-predicate-approximation.enabled
- 22.17 optimizer.join-partitioned-build-min-row-count
- 22.18 optimizer.min-input-size-per-task
- 22.19 optimizer.min-input-rows-per-task
- 22.20 optimizer.use-cost-based-partitioning
Материалы по теме Trino
- Habr «Почему Trino такой быстрый: архитектура оптимизатора SQL-запросов»
- Habr «Почему Trino такой быстрый: динамические фильтры»
- Habr «Как мы ускорили Trino, научив оптимизатор удалять ненужные Join»
- Habr Как устроен massively parallel processing (MPP) в Trino
- Архитектура оптимизатора Trino
- Обработка данных в Data Lake с помощью Trino | Запись выступления
- Массивно-параллельная обработка запросов в Trino
- Как Trino читает данные из файлов Parquet
- Trino-Ebook — Eng (PDF файл)
- How to Optimize Trino Query Performance: 2025 Playbook — Eng
- Speed Trino Queries with These Performance-Tuning Tips
- Habr «Бенчмарк lakehouse-движков, часть 1: StarRocks и Doris падают под нагрузкой, Presto аутсайдер, CedrusData быстрее всех»
- BigdataSchool: Подборка статей по теме Trino
- YouTube:
- Trino Meetup #1: Как пересесть на Trino после Vertica: реальный кейс Авито, Дмитрий Рейман
- Trino Meetup #1: Архитектура оптимизатора Trino, Владимир Озеров
- Trino в Авито. Возможности CedrusData Catalog | Lakehouse Meetup
- Trino Meetup #2: Как ускорить работу Trino с Data Lake c помощью кэширования, Владимир Озеров
- Trino Meetup #2: Trino в Тинькофф, Дмитрий Зуев
- Trino в Лемана Тех, Nessie в Азбуке Вкуса, круглый стол о проблемах lakehouse | Lakehouse Meetup
- VK:
- Поднимаем Data Lakehouse на основе Trino в облаке
- Роль Trino в Тинькофф: использование встроенных возможностей, собственные доработки и future work
- Митап «Trino для нетерпеливых»
- Владимир Озеров — Быстрая обработка данных в Data Lake с помощью Trino
- Владимир Озеров — Как устроено выполнение SQL-запросов в Presto/Trino
- Роль Trino в Тинькофф: использование встроенных возможностей, собственные доработки и future work
- Поднимаем Data Lakehouse на основе Trino в облаке
- Владимир Озеров — Как работает Apache Iceberg на примере Trino
- YouTube and Articles in English:
- trino query plan analysis (video series)
- Playlist «Starburst Icehouse Architecture»
- Playlist «Starburst Galaxy»
- Playlist «Starburst Academy»
Trino Glossary
Для начала рассмотрим ряд терминов по Trino.
- Predicate Pushdown (проталкивание фильтров): Trino пытается “протолкнуть” фильтры (WHERE, JOIN условия) из SQL-запроса как можно ближе к источнику данных (Iceberg, Parquet, ORC и т.д.), чтобы уменьшить объём прочитанных данных. Работает на уровне manifest-файлов (min/max statistics), data files (file-level stats), иногда даже на уровне Parquet row groups (если включен deep filtering).
- Partition Pruning (отсечение партиций): Если таблица Iceberg партиционирована, Trino анализирует выражения фильтра (WHERE, JOIN ON) и отсекает целые партиции, которые точно не могут удовлетворить условию.
- Dynamic filtering (Динамические фильтры) — это оптимизация, при которой фильтры, полученные во время выполнения из одной части запроса (например, из результата join), автоматически применяются к другой части запроса, чтобы уменьшить объём данных, считываемых из источника. То есть используются значения для фильтрации, которые получаются динамически в процессе выполнения, а не на этапе планирования.
- Optimizer — это компонент, который берёт разобранный SQL (из Parser), строит логический план выполнения (операции: scan, filter, join, aggregate и т.д.), и далее преобразует его в физический план, который реально будет выполняться на worker-узлах. Задача optimizer — это минимизировать время и стоимость запроса, выбирая лучший способ выполнения (порядок join’ов, pushdown, partition pruning и т.д.).
- Cost-based Optimization (CBO) — это механизм планировщика, который на основе статистики таблиц (например, объёма данных, распределения значений и т.д.) оценивает «стоимость» различных вариантов выполнения запроса (например, порядок join’ов, стратегия распределения данных) и выбирает тот план, который, по оценке, будет наиболее эффективным (то есть дешевым).
- Распределенное соединение (Join distribution) — стратегия выполнения операции соединения в распределенной среде:
- PARTITIONED JOIN (Партиционированное соединение) — «каждый делает свою часть работы»: обе таблицы разбиваются (partitioned) по ключу соединения (например, customer_id), и каждый worker Trino получает только свою часть данных, чтобы соединить локально.
- BROADCAST JOIN (Широковещательное соединение) — «раздать маленькое всем»: если одна таблица (например, B) маленькая, Trino просто рассылает (broadcast) её копию на все узлы. Каждый worker соединяет свою часть большой таблицы A с полной копией B.
- [Legacy] Spill в Trino позволяет выгружать промежуточные результаты операций, интенсивно использующих память (таких как агрегации, соединения и сортировки), из оперативной памяти на локальный диск для предотвращения сбоев запросов из-за нехватки памяти. Система отслеживает использование памяти и, при превышении заданного порога, перемещает данные на диск, продолжая обработку с меньшим потреблением памяти.
- Trino Rule — это правило оптимизации, то есть отдельное “мини-правило”, которое говорит оптимизатору: «Если ты видишь вот такую конструкцию в плане запроса — перепиши её вот так, потому что это будет быстрее / проще / эффективнее.»
- Trino exchange — это механизм, отвечающий за передачу промежуточных данных между различными рабочими узлами (worker’ами) во время выполнения распределенного запроса. Этот процесс необходим, чтобы операторы (например, JOIN, AGGREGATION) могли корректно и эффективно работать с данными, распределенными по разным серверам кластера.
- Fragment — это самостоятельный кусок плана запроса, который выполняется на одном или нескольких worker-узлах параллельно. Каждый fragment — это логически независимая часть вычислений (например, чтение данных, join, агрегация), между которыми Trino передаёт результаты по сети. Проще говоря, Trino делит запрос на фрагменты, чтобы выполнять их распределённо и эффективно: один fragment готовит данные, другой их обрабатывает, а coordinator собирает итог.
- Build Side (Сторона построения) — Данные с этой стороны используются для создания структуры данных в памяти, которая обеспечивает быстрый поиск — обычно хеш-таблицы (hash table).
- Probe Side (Сторона проверки/сканирования) — Данные с этой стороны используются для проверки соответствий в уже построенной хеш-таблице. Система потоково (строка за строкой) сканирует «probe side» таблицу. Для каждой строки вычисляется хеш-ключ, и выполняется поиск в хеш-таблице, созданной на «build side», для нахождения соответствующих строк.
- Split — это минимальная единица данных для обработки, которую worker-узел читает и выполняет локально. Каждый split обычно соответствует части файла или нескольких файлов (например, блок Parquet или ORC) и включает информацию о том, где находятся эти данные и какие колонки нужно читать. Trino делит таблицу на множество split-ов, чтобы распараллелить выполнение запроса: каждый worker обрабатывает один или несколько split-ов одновременно, что позволяет эффективно использовать кластер и ускорять чтение больших объёмов данных.
- Федеративные запросы в Trino — возможность выполнять один SQL-запрос, который объединяет данные из нескольких разных источников данных (например, PostgreSQL, S3, Kafka и т.д.) без необходимости перемещать их в одно хранилище или выполнять сложный ETL-процесс. Trino подключается к источникам, получает необходимые данные, а затем обрабатывает их параллельно и возвращает результат.
- Координатор (Coordinator) — сервер Trino, который управляет входящими SQL-запросами, планирует выполнение запросов и распределяет задачи между рабочими узлами (workers).
- Рабочий узел (Worker) — сервер Trino, который выполняет задачи, назначенные координатором, и обрабатывает данные.
- Плагин (Plugin) — набор кода, реализующий SPI (Service Provider Interface) Trino для добавления новых коннекторов, типов данных, функций SQL и других возможностей.
- Fault tolerance executor — это механизм, который обеспечивает отказоустойчивое выполнение запросов, позволяя кластеру автоматически повторять выполнение запросов или отдельных задач при сбоях. При включенном режиме fault-tolerant execution промежуточные данные сохраняются в указанном пространстве (hdfs, s3) и могут быть использованы другими воркерами для повторного выполнения, что снижает вероятность провала запроса из-за сбоев и не требует ручного перезапуска.
Splits в Trino
Split — часть данных таблицы источника, которая может быть отсканирована независимо.
За выбор размера сплита отвечают параметры hive.max-initial-split-size и hive.max-split-size.
Основные термины и концепции Trino
Чтобы понять работу Trino, сначала нужно разобраться с основными терминами и концепциями, которые используются в его документации.
Хотя базовые запросы и операторы SQL понять просто, чтобы эффективно использовать Trino, важно знать, как работают такие понятия, как stages (стадии) и splits (фрагменты).
Если вы администратор или разработчик Trino, вам также нужно понимать, как стадии преобразуются в tasks (задачи) и как задачи состоят из drivers (драйверов), обрабатывающих данные.
Этот раздел даёт чёткие определения ключевых концепций, встречающихся в документации Trino, начиная от самых общих до самых конкретных.
Примечание:
За дополнительной информацией можно обратиться к книге Trino: The Definitive Guide и исследовательской работе Presto_SQL_on_Everything.pdf, в которых подробно описываются архитектура и концепции Trino.
Архитектура (Architecture)
Trino — это распределённый движок выполнения запросов, который обрабатывает данные параллельно на множестве серверов.
Существует два типа серверов Trino:
- координаторы (coordinators)
- воркеры (workers)
Кластер (Cluster)
Кластер Trino состоит из нескольких узлов (nodes) — одного координатора и одного или нескольких воркеров.
Пользователи подключаются к координатору через свой SQL-клиент. Координатор взаимодействует с воркерами, а оба типа узлов получают доступ к источникам данных, настроенным в каталогах (catalogs).
Обработка каждого запроса — это состояние процесса (stateful operation). Координатор распределяет нагрузку и выполняет её параллельно на всех воркерах.
Каждый узел запускает Trino в одном экземпляре JVM, а обработка внутри узла дополнительно распараллеливается через потоки (threads).
Узел (Node)
Любой сервер Trino в кластере считается узлом.
Технически, узел — это Java-процесс, запущенный с программой Trino, но на практике под узлом часто подразумевают физическую машину, поскольку рекомендуется запускать только один процесс Trino на сервер.
Координатор (Coordinator)
Координатор Trino — это сервер, отвечающий за:
- разбор SQL-запросов (parsing),
- построение плана выполнения (planning),
- управление воркерами.
Он является «мозгом» системы Trino и тем узлом, к которому подключается клиент для выполнения запросов.
Каждая установка Trino должна содержать один координатор и как минимум один воркер.
Для целей разработки или тестирования можно настроить один сервер, выполняющий обе роли одновременно.
Координатор:
- отслеживает активность воркеров,
- координирует выполнение запроса,
- создаёт логическую модель запроса, состоящую из стадий (stages),
- преобразует их в набор связанных задач (tasks), которые выполняются на кластере воркеров.
Координатор взаимодействует с воркерами и клиентами через REST API.
Воркер (Worker)
Воркер Trino — это сервер, который выполняет задачи (tasks) и обрабатывает данные.
Он:
- извлекает данные из коннекторов,
- обменивается промежуточными результатами с другими воркерами,
- передаёт результаты координатору, который собирает их и возвращает клиенту.
Когда воркер запускается, он регистрируется на сервере обнаружения (discovery server), встроенном в координатор.
После этого координатор может использовать воркер для выполнения задач.
Воркеры, как и координатор, обмениваются данными через REST API.
Клиент (Client)
Клиенты позволяют подключаться к Trino, отправлять SQL-запросы и получать результаты.
Клиенты могут обращаться ко всем настроенным источникам данных через каталоги (catalogs).
Клиенты представляют собой полнофункциональные приложения, драйверы или библиотеки, которые позволяют подключаться к Trino как из стандартных инструментов (например, SQL IDE или BI-систем), так и из кастомных приложений или скриптов.
Клиентские приложения включают CLI-инструменты, десктопные программы, веб-приложения и SaaS-решения с такими возможностями, как:
- интерактивное написание SQL-запросов в редакторах,
- визуальное построение запросов,
- выполнение запросов и отображение результатов,
- построение визуализаций (графики, диаграммы),
- создание отчётов и дашбордов.
Если клиентское приложение поддерживает другие языки запросов или использует графический интерфейс для построения запроса, оно должно транслировать запрос в SQL, который поддерживает Trino.
Плагин (Plugin)
Trino использует плагинную архитектуру, которая позволяет расширять функциональность движка и интегрироваться с различными источниками данных и внешними системами.
Источник данных (Data source)
Trino — это движок выполнения запросов, который может работать с различными источниками данных, включая:
- data lakes и lakehouses,
- реляционные СУБД,
key-valueхранилища,- и множество других типов систем хранения данных.
Источники данных предоставляют Trino данные для выполнения запросов.
Чтобы получить доступ к ним, необходимо настроить каталог (catalog), указав соответствующий коннектор (connector) для конкретного источника данных.
После этого можно использовать любой поддерживаемый клиент для выполнения SQL-запросов к этим источникам с использованием функционала Trino и возможностей клиента.
В документации Trino часто встречаются термины connector, catalog, schema и table — это ключевые понятия модели Trino, описывающие представление внешних источников данных.
Коннектор (Connector)
Коннектор — это адаптер, который позволяет Trino работать с конкретным источником данных — например, с data lake (через Hadoop/Hive или Apache Iceberg) или с реляционной БД (например, PostgreSQL).
По сути, коннектор можно рассматривать как драйвер для базы данных.
Он реализует Service Provider Interface (SPI) Trino, предоставляя стандартный API для взаимодействия движка с источником данных.
Trino включает множество встроенных коннекторов, включая:
- Коннекторы для data lakes и lakehouses: Delta Lake, Hive, Hudi, Iceberg.
- Коннекторы для реляционных СУБД: MySQL, PostgreSQL, Oracle, SQL Server.
- Коннекторы для других систем: Cassandra, ClickHouse, OpenSearch, Pinot, Prometheus, SingleStore, Snowflake.
- Вспомогательные коннекторы: JMX, System, TPC-H и другие.
Каждый каталог использует конкретный коннектор.
В конфигурационном файле каталога всегда присутствует обязательное свойство connector.name, которое определяет, какой коннектор используется.
Каталог (Catalog)
Каталог Trino — это набор конфигурационных свойств, необходимых для подключения к определённому источнику данных.
Каталог включает:
- выбранный коннектор,
- параметры подключения (URL, учётные данные и т. д.).
Каталоги описываются в файлах свойств (properties), которые хранятся в конфигурационном каталоге Trino (etc/catalog/).
Имя файла определяет имя каталога. Например, файл etc/example.properties создаёт каталог с именем example.
Вы можете настроить несколько каталогов — с разными или одинаковыми коннекторами — для доступа к различным источникам данных.
Например:
- два каталога с Hive-коннектором для подключения к двум разным data lakes,
- один каталог с Hive-коннектором для data lake и другой с Iceberg-коннектором для lakehouse,
- несколько каталогов с PostgreSQL-коннектором для подключения к разным БД.
Каталог содержит одну или несколько схем (schemas), которые в свою очередь содержат объекты — таблицы, представления (views), материализованные представления и т. д.
Любой объект в Trino (например, таблица) имеет полное имя (fully-qualified name), начинающееся с имени каталога.
Например:example.test_data.test — таблица test в схеме test_data каталога example.
Схема (Schema)
Схема — это способ организации таблиц внутри каталога.
Каталог и схема вместе определяют набор таблиц и других объектов, доступных для выполнения запросов.
При работе с Hive или реляционными СУБД (например, MySQL) схема в Trino напрямую соответствует схеме в целевой базе данных.
Для других типов коннекторов схемы могут определяться иначе, в зависимости от логики источника данных.
Таблица (Table)
Таблица — это набор неупорядоченных строк (rows), организованных в именованные столбцы (columns) с определёнными типами данных.
Это полностью соответствует понятию таблицы в любой реляционной базе данных.
Маппинг типов (type mapping) между исходными данными и типами Trino определяется коннектором, и может различаться в зависимости от коннектора.
Модель выполнения запросов (Query execution model)
Trino выполняет SQL-операторы (statements) и преобразует их в запросы (queries), которые обрабатываются распределённым кластером, состоящим из координатора и воркеров.
Оператор (Statement)
Trino выполняет SQL-операторы, совместимые со стандартом ANSI SQL.
Когда в документации Trino упоминается statement, имеется в виду именно оператор в смысле стандарта ANSI SQL — то есть конструкция, состоящая из клауз, выражений и предикатов.
Можно задаться вопросом, зачем различать понятия statement и query.
Это важно, потому что в Trino statement — это просто текст SQL-запроса, который пользователь отправляет системе.
Когда Trino выполняет оператор, он создаёт query — запрос, то есть внутреннее представление оператора с планом выполнения, который затем распределяется по узлам кластера Trino (воркерам).
Запрос (Query)
Когда Trino разбирает оператор (parses a statement), он преобразует его в запрос (query) и создаёт распределённый план выполнения (distributed query plan).
Этот план реализуется в виде набора связанных стадий (stages), выполняющихся на воркерах Trino.
Когда вы запрашиваете информацию о выполняющемся запросе, Trino возвращает снимок (snapshot) состояния всех компонентов, участвующих в обработке и формировании результирующего набора данных.
Разница между statement и query проста:
- Statement — это SQL-текст, отправленный в Trino.
- Query — это структура и набор компонентов (стадий, задач, разбиений и т.д.), созданных для выполнения этого SQL-запроса.
Таким образом, query включает в себя стадии (stages), задачи (tasks), разбиения (splits), коннекторы (connectors) и другие элементы, совместно обеспечивающие выполнение оператора.
Стадия (Stage)
Trino выполняет запрос, разбивая его на иерархию стадий.
Например, если Trino нужно агрегировать данные из миллиарда строк в Hive, он создаёт:
- корневую стадию (root stage) — для агрегирования итоговых данных,
- и несколько дочерних стадий, реализующих отдельные части распределённого плана.
Иерархия стадий напоминает дерево, где:
- корневая стадия агрегирует результаты других стадий,
- а каждая стадия описывает часть плана выполнения запроса.
Важно: сами стадии не выполняются на воркерах — они лишь представляют логическую модель выполнения, которую использует координатор.
Задача (Task)
Стадии не выполняются напрямую на воркерах.
Чтобы понять, как они исполняются, нужно знать, что каждая стадия реализуется как набор задач (tasks), распределённых между воркерами Trino.
Задачи — это “рабочие лошади” Trino.
Распределённый план запроса разбивается на стадии, которые затем транслируются в задачи.
Задачи обрабатывают данные (splits) и передают результаты дальше.
Каждая задача:
- имеет входные и выходные потоки данных,
- может выполняться параллельно с другими задачами,
- включает драйверы (drivers), которые реализуют низкоуровневое выполнение.
Разбиение (Split)
Задачи работают с разбиениями (splits) — это фрагменты большого набора данных.
На нижних уровнях распределённого плана стадии получают данные из коннекторов через сплиты.
На высших уровнях — промежуточные стадии получают данные из других стадий (через обмены — exchanges).
Когда Trino планирует выполнение запроса:
- Координатор запрашивает у коннектора список доступных сплитов для таблицы.
- Затем он распределяет сплиты по задачам,
- и отслеживает, какие воркеры обрабатывают какие сплиты.
Драйвер (Driver)
Каждая задача содержит один или несколько параллельных драйверов.
Драйвер — это последовательность операторов (operators), которые:
- принимают данные,
- применяют преобразования,
- и выдают результат, который передаётся другой задаче на следующей стадии.
Можно считать драйвер физической реализацией набора операторов в памяти.
Это наименьшая единица параллелизма в архитектуре Trino.
Каждый драйвер имеет один вход и один выход.
Оператор (Operator)
Оператор — это компонент, который принимает, преобразует и производит данные.
Примеры:
- Table scan operator — извлекает данные из коннектора, создавая поток строк.
- Filter operator — получает данные и применяет предикат (условие), возвращая только подходящие строки.
Операторы объединяются в драйверы, образуя цепочку обработки данных.
Обмен (Exchange)
Exchanges (обмены) обеспечивают передачу данных между узлами Trino при выполнении разных стадий запроса.
Задачи записывают результаты в выходные буферы (output buffers).
Другие задачи потребляют эти данные через exchange clients.
Таким образом, обмены обеспечивают связь между стадиями и позволяют Trino выполнять сложные запросы распределённо и параллельно.
Установка Trino + Iceberg + Rest Catalog + Minio
Для того, чтобы поиграться с инструментом, можно запустить в Docker — настройки в GitHub https://github.com/ivanshamaev/trino-iceberg-minio
Статистика таблиц (Table statistics)
Trino поддерживает оптимизации запросов, основанные на статистике. Чтобы запрос мог использовать эти оптимизации, Trino должен иметь статистическую информацию о таблицах, участвующих в этом запросе.
Статистика таблиц представляет собой оценки характеристик хранимых данных. Эти оценки предоставляются планировщику запросов коннекторами и позволяют повышать производительность при обработке запросов.
Доступная статистика
В Trino доступны следующие виды статистики:
Для таблицы:
- количество строк (row count): общее количество строк в таблице
Для каждого столбца в таблице:
- размер данных (data size): объём данных, который необходимо прочитать
- доля null-значений (nulls fraction): доля значений null
- количество уникальных значений (distinct value count): число различных значений
- минимальное значение (low value): наименьшее значение в столбце
- максимальное значение (high value): наибольшее значение в столбце
Набор доступных статистических данных для конкретного запроса зависит от используемого коннектора и может различаться для разных таблиц. Например, коннектор Hive в настоящее время не предоставляет статистику по размеру данных.
Статистику таблиц можно просмотреть через SQL-интерфейс Trino с помощью команды SHOW STATS.
В зависимости от поддержки со стороны коннектора, статистика таблиц обновляется Trino при выполнении операторов управления данными, таких как INSERT, UPDATE или DELETE. Например, коннекторы Delta Lake, Hive и Iceberg поддерживают управление статистикой таблиц непосредственно из Trino.
Также можно инициировать сбор статистики с помощью команды ANALYZE. Это необходимо в случаях, когда другие системы изменяют данные без участия Trino, и, следовательно, статистика, отслеживаемая Trino, становится неактуальной. Другие коннекторы полагаются на базовый источник данных для управления статистикой таблиц или вовсе не поддерживают использование статистики таблиц.
Cost in EXPLAIN
Во время планирования стоимость, связанная с каждым узлом плана, вычисляется на основе статистики таблиц, участвующих в запросе. Эта рассчитанная стоимость выводится как часть результата выполнения оператора EXPLAIN.
Информация о стоимости отображается в дереве плана в формате {rows: XX (XX), cpu: XX, memory: XX, network: XX}. Параметр rows указывает ожидаемое количество строк, которое будет выведено каждым узлом плана во время выполнения. Значение в скобках после количества строк обозначает ожидаемый размер данных, выводимых узлом плана, в байтах. Остальные параметры показывают оценочное количество ресурсов CPU, памяти и сети, которые будут использованы при выполнении данного узла плана. Эти значения не представляют собой реальные единицы измерения, а служат для сравнения относительных затрат между различными узлами плана, что позволяет оптимизатору выбрать наилучший план выполнения запроса. Если какое-либо значение неизвестно, выводится символ ?.
Пример:
|
1 |
EXPLAIN SELECT comment FROM tpch.sf1.nation WHERE nationkey > 3; |
Результат:
|
1 2 3 4 5 6 7 8 |
- Output[comment] => [[comment]] Estimates: {rows: 22 (1.69kB), cpu: 6148.25, memory: 0.00, network: 1734.25} - RemoteExchange[GATHER] => [[comment]] Estimates: {rows: 22 (1.69kB), cpu: 6148.25, memory: 0.00, network: 1734.25} - ScanFilterProject[table = tpch:nation:sf1.0, filterPredicate = ("nationkey" > BIGINT '3')] => [[comment]] Estimates: {rows: 25 (1.94kB), cpu: 2207.00, memory: 0.00, network: 0.00}/{rows: 22 (1.69kB), cpu: 4414.00, memory: 0.00, network: 0.00}/{rows: 22 (1.69kB), cpu: 6148.25, memory: 0.00, network: 0.00} nationkey := tpch:nationkey comment := tpch:comment |
Обычно для каждого узла плана выводится только одно значение стоимости. Однако когда оператор Scan совмещён с операторами Filter и/или Project, выводится несколько структур стоимости, каждая из которых соответствует отдельной логической части объединённого оператора. Например, для оператора ScanFilterProject отображаются три структуры стоимости, соответствующие, соответственно, частям Scan, Filter и Project.
Оценочная стоимость также выводится в EXPLAIN ANALYZE, дополнительно к фактическим статистическим данным о времени выполнения.
Cost-based optimizations
Trino поддерживает несколько оптимизаций, основанных на стоимости, описанных ниже.
Типы Joins в Trino (вводная часть перед рассмотрением оптимизаций)
Trino имеет так называемый оптимизатор на основе стоимости (Cost-Based Optimizer, CBO). Он анализирует статистику по таблицам (размер, распределение данных и т. д.) и автоматически решает, какой алгоритм соединения будет самым эффективным для данного запроса.
Broadcast Join
Это работает, когда одна из таблиц (меньшая) достаточно маленькая, чтобы поместиться в оперативную память на всех рабочих узлах.
- «Строим» хэш-таблицу: Trino берет маленькую таблицу (называемую build side) и создает из нее специальную, очень быструю таблицу для поиска — хэш-таблицу.
- «Рассылаем» ее: Эта хэш-таблица рассылается на каждый рабочий узел, где будет выполняться соединение.
- «Проверяем»: Затем, другая, большая таблица (называемая probe side) построчно сканируется на каждом узле. Для каждой строки из большой таблицы происходит очень быстрый поиск совпадения в локальной хэш-таблице.
- Соединяем: При нахождении совпадения, строки объединяются.
Когда это используется: Когда одна таблица значительно меньше другой. Это самый быстрый способ соединения, так как не нужно передавать много данных по сети между узлами.
Probe — Build schema
Partitioned Join
Этот метод применяется, когда обе таблицы слишком велики, чтобы их можно было целиком хранить в памяти каждого узла.
- «Перемешиваем» данные: Trino берет обе большие таблицы и распределяет их между рабочими узлами, используя хэш-функцию от ключевого столбца соединения. Например, все строки, у которых join_key имеет хэш 1, отправляются на первый узел, с хэшем 2 — на второй и так далее.
- «Соединяем локально»: После того как данные распределены, каждый рабочий узел получает только часть обеих таблиц. Теперь каждый узел может выполнить локальное объединение (как в broadcast join) со своей частью данных, так как все потенциальные совпадения теперь находятся на одном и том же узле.
- Собираем результат: Конечный результат собирается со всех узлов.
Когда это используется: Когда обе таблицы большие. Это медленнее, чем broadcast join, из-за необходимости перемешивать данные по сети, но позволяет работать с очень большими объемами, которые не помещаются в память одного узла.
Главная идея: Probe side всегда обрабатывается потоком, строка за строкой, чтобы избежать полной загрузки большого объёма данных в оперативную память. Trino не хранит эти данные у себя, а работает с ними напрямую из источника.
Перебор соединений (Join enumeration)
Порядок, в котором выполняются соединения (joins) в запросе, может существенно влиять на его производительность. Основным фактором, оказывающим наибольшее влияние на производительность, является объём данных, обрабатываемых и передаваемых по сети. Если соединение, генерирующее большой объём данных, выполняется на ранней стадии выполнения запроса, то последующие этапы вынуждены обрабатывать эти большие объёмы дольше, чем это необходимо, что увеличивает время выполнения и потребление ресурсов.
С помощью перебора соединений на основе стоимости Trino использует статистику таблиц, предоставляемую коннекторами, для оценки стоимости различных порядков соединений и автоматически выбирает порядок с минимальной рассчитанной стоимостью.
Стратегия перебора соединений управляется параметром сессии join_reordering_strategy, при этом параметр конфигурации optimizer.join-reordering-strategy задаёт значение по умолчанию.
Возможные значения:
- AUTOMATIC (по умолчанию) — включает полностью автоматический перебор соединений.
- ELIMINATE_CROSS_JOINS — устраняет ненужные кросс-соединения.
- NONE — чисто синтаксический порядок соединений.
Если используется AUTOMATIC и статистика недоступна или стоимость не может быть рассчитана по какой-либо причине, Trino применяет стратегию ELIMINATE_CROSS_JOINS.
Выбор стратегии распределения соединений (Join distribution selection)
Trino использует алгоритм соединения на основе хешей (hash join). Для каждого оператора соединения создаётся хеш-таблица на основе одного из входов соединения — так называемой build side. Второй вход, probe side, затем итерируется, и для каждой строки выполняется поиск совпадающих записей в хеш-таблице.
Существует два типа распределения соединений:
- Partitioned (распределённое): каждый узел, участвующий в запросе, строит хеш-таблицу только на части данных.
- Broadcast (рассылочное): каждый узел строит хеш-таблицу по всем данным, которые реплицируются на каждый узел.
Каждый тип имеет свои преимущества и недостатки. Partitioned — Распределённые соединения требуют перераспределения обеих таблиц с использованием хеша по ключу соединения. Такие соединения могут быть медленнее, чем рассылочные, но позволяют выполнять значительно более крупные соединения. Broadcast — Рассылочные соединения быстрее, если сторона build значительно меньше стороны probe. Однако Broadcast JOINs требуют, чтобы таблицы на стороне build после фильтрации помещались в память каждого узла, тогда как распределённые соединения требуют, чтобы данные помещались только в совокупную распределённую память всех узлов.
При выборе распределения соединений, основанном на стоимости, Trino автоматически определяет, использовать ли Partitioned или Broadcast соединение. При переборе соединений на основе стоимости Trino также автоматически выбирает, какая сторона будет probe, а какая build.
Стратегия распределения соединений управляется параметром сессии join_distribution_type, а параметр конфигурации join-distribution-type задаёт значение по умолчанию.
Допустимые значения:
- AUTOMATIC (по умолчанию) — тип распределения соединения выбирается автоматически для каждого соединения.
- BROADCAST — используется рассылочное соединение для всех случаев.
- PARTITIONED — используется распределённое соединение для всех случаев.
Ограничение размера реплицируемой таблицы
Тип распределения соединения выбирается автоматически, когда стратегия перебора соединений установлена в AUTOMATIC или когда тип распределения соединения установлен в AUTOMATIC. В обоих случаях можно ограничить максимальный размер реплицируемой таблицы с помощью параметра конфигурации join-max-broadcast-table-size или параметра сессии join_max_broadcast_table_size. Это позволяет повысить параллелизм выполнения в кластере и предотвратить неудачные планы в случае, если оптимизатор на основе стоимости неверно оценивает размер соединяемых таблиц.
По умолчанию максимальный размер реплицируемой таблицы ограничен 100 МБ.
Синтаксический порядок соединений
Если оптимизация на основе стоимости не используется, Trino применяет синтаксический порядок соединений по умолчанию. Хотя формальных способов оптимизации запросов в этом режиме нет, можно воспользоваться особенностями реализации соединений в Trino для повышения производительности.
Trino использует соединения в памяти (in-memory hash joins). При обработке выражения JOIN Trino загружает самую правую таблицу соединения в память в качестве build side, затем потоково читает следующую справа таблицу в качестве probe side для выполнения соединения. Если запрос содержит несколько соединений, результат первого соединения остаётся в памяти как build side, а следующая таблица справа используется как probe side, и так далее для последующих соединений. Если порядок соединений усложнён, например, за счёт скобок, задающих приоритеты соединений, Trino может выполнять несколько нижестоящих соединений одновременно, но каждая стадия следует той же логике, включая финальное объединение результатов.
Из-за такого поведения оптимально синтаксически упорядочивать соединения в SQL-запросах от самых больших таблиц к самым маленьким, что позволяет минимизировать использование памяти.
Пример:
|
1 2 3 4 5 6 |
SELECT * FROM large_table l LEFT JOIN medium_table m ON l.user_id = m.user_id LEFT JOIN small_table s ON s.user_id = l.user_id |
Предупреждение:
Такой способ оптимизации не является функцией Trino. Это побочный эффект реализации соединений, поэтому данное поведение может измениться без уведомления.
Реализации коннекторов
Чтобы оптимизатор Trino мог использовать стратегии, основанные на стоимости, реализация коннектора должна предоставлять статистику таблиц.
Trino может передавать выполнение запросов, либо отдельных частей запросов, во внешний источник данных. Это означает, что определённое предикатное условие, агрегирующая функция или другая операция передаются для выполнения непосредственно в базовую базу данных или систему хранения.
Результаты такого pushdown-а дают следующие преимущества:
- Повышение общей производительности запросов
- Снижение сетевого трафика между Trino и источником данных
- Снижение нагрузки на удалённый источник данных
Эти преимущества часто приводят к заметному сокращению затрат.
Поддержка pushdown-а является специфичной для каждого коннектора и соответствующей базы данных или системы хранения.
Проброс предикатов (Predicate pushdown)
Predicate pushdown оптимизирует фильтрацию строк. Он использует вычисленный фильтр, как правило, сформированный из условия в операторе WHERE, чтобы исключить ненужные строки. Обработка фильтра передаётся коннектором в источник данных, где и выполняется.
Если проброс предиката для конкретного условия выполнен успешно, в плане выполнения (EXPLAIN) запроса не будет операции ScanFilterProject для этого условия.
Проброс проекций (Projection pushdown)
Projection pushdown оптимизирует фильтрацию по колонкам. Он использует столбцы, указанные в предложении SELECT и других частях запроса, чтобы ограничить доступ только к этим столбцам. Обработка передаётся коннектором в источник данных, и источник данных считывает и возвращает только необходимые столбцы.
Если проброс проекций выполнен успешно, план выполнения (EXPLAIN) запроса обращается только к соответствующим колонкам в Layout операции TableScan.
Проброс разыменования (Dereference pushdown)
Projection pushdown и dereference pushdown оба ограничивают доступ только к нужным столбцам, однако dereference pushdown является более избирательным. Он ограничивает доступ, позволяя считывать только конкретные поля внутри верхнего или вложенного типа данных ROW.
Например, если в таблице коннектора Hive есть столбец типа ROW с несколькими полями, и запрос обращается только к одному полю, dereference pushdown позволяет файловому ридеру считать только это одно поле внутри строки. То же самое применяется к полям вложенных структур внутри верхнего уровня ROW. Это может привести к существенной экономии объёма данных, считываемых из системы хранения.
Проброс агрегации (Aggregation pushdown)
Проброс агрегации может выполняться при соблюдении следующих условий:
- Если коннектор в целом поддерживает проброс агрегации.
- Если коннектор поддерживает проброс конкретной функции или функций.
- Если структура запроса допускает выполнение проброса.
Проверить, выполняется ли проброс для конкретного запроса, можно с помощью анализа плана выполнения (EXPLAIN). Если агрегирующая функция была успешно передана коннектору, в плане выполнения не будет показан оператор Aggregate. В плане будут отображаться только операции, выполняемые непосредственно Trino.
Например, мы загрузили набор данных TPC-H в базу данных PostgreSQL и затем выполнили запрос к нему через коннектор PostgreSQL:
|
1 2 3 |
SELECT regionkey, count(*) FROM nation GROUP BY regionkey; |
Получить план выполнения можно, добавив в начало запроса ключевое слово EXPLAIN:
|
1 2 3 4 |
EXPLAIN SELECT regionkey, count(*) FROM nation GROUP BY regionkey; |
План выполнения этого запроса не содержит оператора Aggregate с функцией count, так как эта операция выполняется коннектором. Функция count(*) отображается как часть оператора TableScan коннектора PostgreSQL. Это подтверждает, что проброс был выполнен успешно.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
Fragment 0 [SINGLE] Output layout: [regionkey_0, _generated_1] Output partitioning: SINGLE [] Output[regionkey, _col1] │ Layout: [regionkey_0:bigint, _generated_1:bigint] │ Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?} │ regionkey := regionkey_0 │ _col1 := _generated_1 └─ RemoteSource[1] Layout: [regionkey_0:bigint, _generated_1:bigint] Fragment 1 [SOURCE] Output layout: [regionkey_0, _generated_1] Output partitioning: SINGLE [] TableScan[postgresql:tpch.nation tpch.nation columns=[regionkey:bigint:int8, count(*):_generated_1:bigint:bigint] groupingSets=[[regionkey:bigint:int8]], gro Layout: [regionkey_0:bigint, _generated_1:bigint] Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B} _generated_1 := count(*):_generated_1:bigint:bigint regionkey_0 := regionkey:bigint:int8 |
Существует ряд факторов, которые могут помешать выполнению проброса:
- добавление дополнительных условий в запрос;
- использование другой агрегирующей функции, которую нельзя пробросить в коннектор;
- использование коннектора, не поддерживающего проброс для конкретной функции.
В этом случае в плане выполнения будет показана операция Aggregate, выполняемая Trino. Это чёткий признак того, что проброс в удалённый источник данных не выполняется, и агрегирование обрабатывается самим Trino.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
Fragment 0 [SINGLE] Output layout: [regionkey, count] Output partitioning: SINGLE [] Output[regionkey, _col1] │ Layout: [regionkey:bigint, count:bigint] │ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?} │ _col1 := count └─ RemoteSource[1] Layout: [regionkey:bigint, count:bigint] Fragment 1 [HASH] Output layout: [regionkey, count] Output partitioning: SINGLE [] Aggregate(FINAL)[regionkey] │ Layout: [regionkey:bigint, count:bigint] │ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?} │ count := count("count_0") └─ LocalExchange[HASH][$hashvalue] ("regionkey") │ Layout: [regionkey:bigint, count_0:bigint, $hashvalue:bigint] │ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?} └─ RemoteSource[2] Layout: [regionkey:bigint, count_0:bigint, $hashvalue_1:bigint] Fragment 2 [SOURCE] Output layout: [regionkey, count_0, $hashvalue_2] Output partitioning: HASH [regionkey][$hashvalue_2] Project[] │ Layout: [regionkey:bigint, count_0:bigint, $hashvalue_2:bigint] │ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?} │ $hashvalue_2 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("regionkey"), 0)) └─ Aggregate(PARTIAL)[regionkey] │ Layout: [regionkey:bigint, count_0:bigint] │ count_0 := count(*) └─ TableScan[tpch:nation:sf0.01, grouped = false] Layout: [regionkey:bigint] Estimates: {rows: 25 (225B), cpu: 225, memory: 0B, network: 0B} regionkey := tpch:regionkey |
Ограничения
Проброс агрегации не поддерживает ряд более сложных конструкций:
- сложные группировки, такие как
ROLLUP,CUBEилиGROUPING SETS; - выражения внутри вызова агрегирующей функции, например
sum(a * b); - приведение типов (
coercions), напримерsum(integer_column); - агрегирования с сортировкой;
- агрегирования с фильтром.
Проброс соединений (Join pushdown)
Проброс соединений позволяет коннектору передавать операцию соединения таблиц (JOIN) в нижележащий источник данных. Это может значительно повысить производительность и позволяет Trino обрабатывать оставшуюся часть запроса на меньшем объёме данных.
Поддержка проброса соединений зависит от конкретного источника данных, а значит — и от используемого коннектора.
Тем не менее, существуют общие условия, необходимые для выполнения проброса соединения:
- все предикаты (условия), участвующие в соединении, должны поддерживать проброс;
- все таблицы, участвующие в соединении, должны находиться в одном каталоге.
Проверить, выполняется ли проброс соединения, можно с помощью плана выполнения запроса (EXPLAIN). Если соединение было проброшено в источник данных, в плане не будет показан оператор Join:
|
1 2 |
EXPLAIN SELECT c.custkey, o.orderkey FROM orders o JOIN customer c ON c.custkey = o.custkey; |
Ниже приведён пример плана, полученного при выполнении запроса через коннектор PostgreSQL к данным TPC-H в базе PostgreSQL. Он не содержит оператора Join, что означает успешный проброс соединения:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
Fragment 0 [SINGLE] Output layout: [custkey, orderkey] Output partitioning: SINGLE [] Output[custkey, orderkey] │ Layout: [custkey:bigint, orderkey:bigint] │ Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?} └─ RemoteSource[1] Layout: [orderkey:bigint, custkey:bigint] Fragment 1 [SOURCE] Output layout: [orderkey, custkey] Output partitioning: SINGLE [] TableScan[postgres:Query[SELECT l."orderkey" AS "orderkey_0", l."custkey" AS "custkey_1", r."custkey" AS "custkey_2" FROM (SELECT "orderkey", "custkey" FROM "tpch"."orders") l INNER JOIN (SELECT "custkey" FROM "tpch"."customer") r O Layout: [orderkey:bigint, custkey:bigint] Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B} orderkey := orderkey_0:bigint:int8 custkey := custkey_1:bigint:int8 |
Обычно проброс соединений полезен, однако в некоторых случаях он может привести к увеличению количества строк (например, при соединении больших таблиц), что может повлиять на производительность.
Проброс ограничения (Limit pushdown)
Операторы LIMIT или FETCH FIRST уменьшают количество возвращаемых записей. Проброс ограничения позволяет коннектору передавать обработку таких запросов в источник данных.
Это может улучшить производительность и значительно снизить объём данных, передаваемых из источника в Trino.
Такие запросы включают конструкции:
|
1 |
LIMIT N |
или
|
1 |
FETCH FIRST N ROWS |
Реализация и поддержка зависят от конкретного коннектора, поскольку разные источники данных обладают разными возможностями.
Проброс Top-N (Top-N pushdown)
Комбинация LIMIT/FETCH FIRST с оператором ORDER BY создаёт запрос, который выбирает небольшое количество строк из большого отсортированного набора данных.
Так как порядок строк имеет значение, оптимизация таких запросов отличается от обычного проброса LIMIT.
Проброс такого запроса называется Top-N pushdown, поскольку он возвращает первые N строк. Эта оптимизация позволяет передать выполнение сортировки и выборки верхних N записей источнику данных, что значительно снижает нагрузку на Trino.
Такие запросы включают части:
|
1 |
ORDER BY ... LIMIT N |
или
|
1 |
ORDER BY ... FETCH FIRST N ROWS |
Поддержка и реализация зависят от коннектора, так как разные источники данных используют разный синтаксис и методы выполнения.
Пример 1. Проброс Top-N в PostgreSQL
|
1 2 3 4 |
SELECT id, name FROM postgresql.public.company ORDER BY id LIMIT 5; |
План выполнения (EXPLAIN):
|
1 2 3 4 |
EXPLAIN SELECT id, name FROM postgresql.public.company ORDER BY id LIMIT 5; |
Результат:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
Fragment 0 [SINGLE] Output layout: [id, name] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION Output[id, name] │ Layout: [id:integer, name:varchar] │ Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?} └─ RemoteSource[1] Layout: [id:integer, name:varchar] Fragment 1 [SOURCE] Output layout: [id, name] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION TableScan[postgresql:public.company public.company sortOrder=[id:integer:int4 ASC NULLS LAST] limit=5, grouped = false] Layout: [id:integer, name:varchar] Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B} name := name:varchar:text id := id:integer:int4 |
Здесь видно, что операция сортировки и ограничения выполняется на стороне PostgreSQL — признак успешного Top-N pushdown.
Пример 2. Отсутствие Top-N pushdown в коннекторе TPC-H
|
1 2 3 4 |
SELECT custkey, name FROM tpch.sf1.customer ORDER BY custkey LIMIT 5; |
План выполнения:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
Fragment 0 [SINGLE] Output layout: [custkey, name] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION Output[custkey, name] │ Layout: [custkey:bigint, name:varchar(25)] │ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?} └─ TopN[5 by (custkey ASC NULLS LAST)] │ Layout: [custkey:bigint, name:varchar(25)] └─ LocalExchange[SINGLE] () │ Layout: [custkey:bigint, name:varchar(25)] │ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?} └─ RemoteSource[1] Layout: [custkey:bigint, name:varchar(25)] Fragment 1 [SOURCE] Output layout: [custkey, name] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION TopNPartial[5 by (custkey ASC NULLS LAST)] │ Layout: [custkey:bigint, name:varchar(25)] └─ TableScan[tpch:customer:sf1.0, grouped = false] Layout: [custkey:bigint, name:varchar(25)] Estimates: {rows: 150000 (4.58MB), cpu: 4.58M, memory: 0B, network: 0B} custkey := tpch:custkey name := tpch:name |
В данном случае операция TopN[5 by (custkey ASC NULLS LAST)] выполняется самим Trino, а не источником данных, что означает отсутствие Top-N pushdown.
Примечание:
В отличие от примера с PostgreSQL, в плане запроса TPC-H присутствует оператор TopN в Fragment 0 — это явный признак того, что проброс Top-N не выполняется.
Адаптивные оптимизации планов (Adaptive plan optimizations)
Trino предоставляет несколько адаптивных оптимизаций планов выполнения, которые динамически изменяют план запроса на основе статистики, собираемой во время выполнения.
Эти оптимизации доступны только при включённом режиме отказоустойчивого выполнения (Fault-tolerant execution).
Чтобы отключить все адаптивные оптимизации планов, необходимо установить конфигурационное свойствоfault-tolerant-execution-adaptive-query-planning-enabled=false.
Эквивалентное свойство для сессии — fault_tolerant_execution_adaptive_query_planning_enabled.
Адаптивное переупорядочивание распределённых соединений (Adaptive reordering of partitioned joins)
По умолчанию Trino включает адаптивное переупорядочивание соединений с распределением по разделам (partitioned joins).
Эта оптимизация позволяет Trino динамически менять порядок входных данных для соединений на основе фактических размеров сторон build и probe во время выполнения запроса.
Это особенно полезно, когда статистика таблиц недоступна заранее — Trino может принимать более эффективные решения о порядке соединений, основываясь на реальной информации, собранной во время выполнения.
Чтобы отключить эту оптимизацию, установите конфигурационное свойствоfault-tolerant-execution-adaptive-join-reordering-enabled=false.
Эквивалентное свойство для сессии — fault_tolerant_execution_adaptive_join_reordering_enabled.
Trino Iceberg connector
Apache Iceberg — это открытый табличный формат для работы с большими аналитическими наборами данных.
Коннектор Iceberg позволяет выполнять запросы к данным, хранящимся в файлах, записанных в формате Iceberg, как определено в Iceberg Table Spec.
Коннектор поддерживает спецификации таблиц Apache Iceberg версий 1 и 2.
Состояние таблицы хранится в метаданных. Каждое изменение состояния таблицы создаёт новый файл метаданных и заменяет старый атомарной операцией.
Файл метаданных таблицы содержит информацию о схеме таблицы, конфигурации партиционирования, пользовательских свойствах, а также снимках (snapshots) содержимого таблицы.
Данные Iceberg хранятся в файлах форматов Parquet, ORC или Avro — в зависимости от свойства format, заданного при определении таблицы.
Iceberg был разработан для устранения известных проблем масштабируемости Hive. Hive хранит метаданные таблиц в metastore, который работает поверх реляционной базы данных (например, MySQL). Hive отслеживает пути к партициям в metastore, но не управляет отдельными файлами данных. При выполнении запроса через коннектор Hive в Trino сначала запрашиваются пути к партициям из metastore, затем вызывается файловая система для получения списка всех файлов данных в каждой партиции, после чего считываются метаданные из каждого файла.
В отличие от этого, Iceberg хранит пути к файлам данных в самих файлах метаданных, поэтому при выполнении запросов обращается к файловой системе только для чтения нужных файлов данных.
Требования (Requirements)
Для использования Iceberg необходимы следующие условия:
- Сетевой доступ от координатора Trino и рабочих узлов (workers) к распределённому объектному хранилищу.
- Доступ к одному из поддерживаемых сервисов каталога: Hive metastore (HMS), AWS Glue catalog, JDBC catalog, REST catalog, Nessie server или Snowflake catalog.
- Файлы данных, сохранённые в одном из поддерживаемых форматов — Parquet (по умолчанию), ORC или Avro — и размещённые в поддерживаемой файловой системе.
Чтобы использовать Hive Thrift Metastore, необходимо задать параметр конфигурацииhive.metastore=thrift и дополнительно указать подробности с помощью следующих свойств конфигурации:
| Property name | Description | Default |
|---|---|---|
| hive.metastore.uri | URIs Hive metastore для подключения по протоколу Thrift. Если указан список URI через запятую, используется первый URI, остальные — резервные. Обязательное свойство. Пример: thrift://192.0.2.3:9083 или thrift://192.0.2.3:9083,thrift://192.0.2.4:9083 |
— |
| hive.metastore.username | Имя пользователя, используемое Trino для доступа к Hive metastore | — |
| hive.metastore.authentication.type | Тип аутентификации Hive metastore. Возможные значения: NONE или KERBEROS |
NONE |
| hive.metastore.thrift.client.connect-timeout | Время ожидания соединения для клиента metastore | 10s |
| hive.metastore.thrift.client.read-timeout | Время ожидания чтения сокета для клиента metastore | 10s |
| hive.metastore.thrift.impersonation.enabled | Включить имитацию пользователя (end user impersonation) Hive metastore | — |
| hive.metastore.thrift.use-spark-table-statistics-fallback | Использовать статистику таблиц, сгенерированную Apache Spark, если статистика Hive недоступна | true |
| hive.metastore.thrift.delegation-token.cache-ttl | Время жизни кэша делегирующих токенов для metastore | 1h |
| hive.metastore.thrift.delegation-token.cache-maximum-size | Максимальный размер кэша делегирующих токенов | 1000 |
| hive.metastore.thrift.client.ssl.enabled | Использовать SSL при подключении к metastore | false |
| hive.metastore.thrift.client.ssl.key | Путь к приватному ключу и сертификату клиента (key store) | — |
| hive.metastore.thrift.client.ssl.key-password | Пароль для приватного ключа | — |
| hive.metastore.thrift.client.ssl.trust-certificate | Путь к цепочке сертификатов сервера (trust store). Обязателен при включенном SSL | — |
| hive.metastore.thrift.client.ssl.trust-certificate-password | Пароль для trust store | — |
| hive.metastore.service.principal | Kerberos principal сервиса Hive metastore | — |
| hive.metastore.client.principal | Kerberos principal, используемый Trino для подключения к Hive metastore | — |
| hive.metastore.client.keytab | Путь к keytab клиента Hive metastore | — |
| hive.metastore.thrift.delete-files-on-drop | Активное удаление файлов для управляемых таблиц при операциях drop table/partition, если metastore не удаляет файлы | false |
| hive.metastore.thrift.assume-canonical-partition-keys | Разрешить метастору предполагать, что значения колонок партиций можно конвертировать в строки. Улучшает производительность фильтров по партициям. Колонки типа TIMESTAMP не канонизируются | false |
| hive.metastore.thrift.client.socks-proxy | SOCKS-прокси для Thrift Hive metastore | — |
| hive.metastore.thrift.client.max-retries | Максимальное число повторных попыток запросов к metastore | 9 |
| hive.metastore.thrift.client.backoff-scale-factor | Множитель для увеличения задержки повторных попыток запросов | 2.0 |
| hive.metastore.thrift.client.max-retry-time | Общее максимально допустимое время для повторных попыток запроса к metastore | 30s |
| hive.metastore.thrift.client.min-backoff-delay | Минимальная задержка между повторными попытками запроса | 1s |
| hive.metastore.thrift.client.max-backoff-delay | Максимальная задержка между повторными попытками запроса | 1s |
| hive.metastore.thrift.txn-lock-max-wait | Максимальное время ожидания блокировки транзакции Hive | 10m |
| hive.metastore.thrift.catalog-name | «Имя каталога Hive metastore» — концепция абстракции в Hive для подключения к независимым каталогам. По умолчанию: hive. Если оставить пустым, будет использоваться каталог по умолчанию |
hive |
Конфигурационные свойства Hive-каталога, специфичные для Iceberg
При использовании Hive-каталога коннектор Iceberg поддерживает те же общие параметры конфигурации Thrift Metastore, что и описанные ранее, с добавлением следующего свойства:
iceberg.hive-catalog.locking-enabled — фиксация изменений (commit) таблиц с использованием блокировок Hive. Значение по умолчанию = true.
Установка iceberg.hive-catalog.locking-enabled=false приведёт к фиксации изменений таблиц без использования блокировок Hive. Это свойство следует устанавливать в false только если выполняются все нижеперечисленные условия:
- HIVE-26882 доступен на сервере Hive Metastore. Требуется версия 2.3.10, 4.0.0-beta-1 или новее.
- HIVE-28121 доступен на сервере Hive Metastore, если он работает с MySQL или MariaDB. Требуется версия 2.3.10, 4.1.0, 4.0.1 или новее.
Все остальные каталоги, выполняющие фиксацию изменений в те же таблицы, что и данный каталог, также должны работать на Iceberg версии 1.3 или выше и иметь отключённые блокировки Hive при фиксации.
Общая конфигурация
Чтобы настроить коннектор Iceberg, необходимо создать файл свойств каталога etc/catalog/example.properties, который ссылается на коннектор Iceberg.
Каталог Hive Metastore является реализацией по умолчанию.
Необходимо выбрать и настроить одну из поддерживаемых файловых систем.
|
1 2 3 |
connector.name=iceberg hive.metastore.uri=thrift://example.net:9083 fs.x.enabled=true |
Замените свойство конфигурации fs.x.enabled на соответствующую настройку для требуемой файловой системы.
Также доступны другие типы каталогов метаданных, указанные в разделе требований данной темы. Каждый тип метастора имеет собственные специфические параметры конфигурации, а также общие параметры конфигурации метастора.
Следующие параметры конфигурации Iceberg являются независимыми от того, какая реализация каталога используется:
| Property name | Description | Default |
|---|---|---|
| iceberg.catalog.type | Определяет тип метастора. Возможные значения: hive_metastore, glue, jdbc, rest, nessie, snowflake |
hive_metastore |
| iceberg.file-format | Определяет формат файлов хранения данных Iceberg. Возможные значения: PARQUET, ORC, AVRO |
PARQUET |
| iceberg.compression-codec | Кодек сжатия при записи файлов. Возможные значения: NONE, SNAPPY, LZ4, ZSTD, GZIP |
ZSTD |
| iceberg.use-file-size-from-metadata | Использовать размеры файлов из метаданных вместо файловой системы. Только как временное решение для версии < 0.11.0 Iceberg | true |
| iceberg.max-partitions-per-writer | Максимальное число партиций, обрабатываемых одним writer. Эквивалентное свойство сессии: max_partitions_per_writer |
100 |
| iceberg.target-max-file-size | Целевой максимальный размер записываемых файлов; фактический размер может быть больше | 1GB |
| iceberg.unique-table-location | Использовать случайные, уникальные местоположения таблиц | true |
| iceberg.dynamic-filtering.wait-timeout | Максимальное время ожидания завершения динамических фильтров при генерации сплитов | 1s |
| iceberg.delete-schema-locations-fallback | Удалять ли местоположения схем, если Trino не может определить наличие внешних файлов | false |
| iceberg.minimum-assigned-split-weight | Минимальный вес, назначаемый каждому сплиту (0, 1]. Низкое значение улучшает производительность для таблиц с маленькими файлами | 0.05 |
| iceberg.table-statistics-enabled | Включение статистики таблиц. Эквивалентное свойство сессии: statistics_enabled. Используется для cost-based оптимизаций |
true |
| iceberg.extended-statistics.enabled | Включение сбора расширенной статистики с ANALYZE. Эквивалентное свойство сессии: extended_statistics_enabled |
true |
| iceberg.extended-statistics.collect-on-write | Включение сбора расширенной статистики при записи. Эквивалентное свойство сессии: collect_extended_statistics_on_write |
true |
| iceberg.projection-pushdown-enabled | Включение pushdown проекций | true |
| iceberg.hive-catalog-name | Каталог для переадресации при ссылке на Hive таблицу | — |
| iceberg.register-table-procedure.enabled | Разрешить вызов процедуры register_table пользователем |
false |
| iceberg.add-files-procedure.enabled | Разрешить вызов процедуры add_files пользователем |
false |
| iceberg.query-partition-filter-required | Принудительно требовать фильтр по партициям для запросов к схемам, указанным в iceberg.query-partition-filter-required-schemas. Эквивалентное свойство сессии: query_partition_filter_required |
false |
| iceberg.query-partition-filter-required-schemas | Список схем, для которых Trino применяет фильтр по ключам партиций. Эквивалентное свойство сессии: query_partition_filter_required_schemas. Используется, если iceberg.query-partition-filter-required=true |
[] |
| iceberg.incremental-refresh-enabled | Включение инкрементного обновления materialized view. Эквивалентное свойство сессии: incremental_refresh_enabled |
true |
| iceberg.metadata-cache.enabled | Включение in-memory кэша метаданных на координаторе. Не используется при fs.cache.enabled=true |
true |
| iceberg.object-store-layout.enabled | Включение object store file layout Iceberg. Добавляет детерминированный хэш после пути записи | false |
| iceberg.expire-snapshots.min-retention | Минимальный период хранения для команды expire_snapshots. Эквивалентное свойство сессии: expire_snapshots_min_retention |
7d |
| iceberg.remove-orphan-files.min-retention | Минимальный период хранения для команды remove_orphan_files. Эквивалентное свойство сессии: remove_orphan_files_min_retention |
7d |
| iceberg.idle-writer-min-file-size | Минимальный объем данных, записанных одним writer, чтобы считаться idle и закрытым движком. Эквивалентное свойство сессии: idle_writer_min_file_size |
16MB |
| iceberg.sorted-writing-enabled | Включение сортированной записи в таблицы с заданным порядком сортировки. Эквивалентное свойство сессии: sorted_writing_enabled |
true |
| iceberg.sorted-writing.local-staging-path | Локальная директория для staging при записи сортированных таблиц. Поддерживается ${USER} для разных пользователей. Без настройки используется целевое хранилище |
— |
| iceberg.allowed-extra-properties | Список дополнительных свойств, разрешённых для Iceberg таблиц. * — разрешить все |
[] |
| iceberg.split-manager-threads | Количество потоков для генерации сплитов | В 2 раза больше числа процессоров на координаторе |
| iceberg.metadata.parallelism | Количество потоков для получения метаданных. В данный момент параллелизируется только загрузка таблиц | 8 |
| iceberg.file-delete-threads | Количество потоков для удаления файлов при выполнении expire_snapshots |
В 2 раза больше числа процессоров на координаторе |
| iceberg.bucket-execution | Включение bucket-aware выполнения. Использует физическую информацию о бакетах для оптимизации запросов и уменьшения обмена данными | true |
Hive connector
Коннектор Hive в Trino позволяет выполнять запросы к данным, хранящимся в хранилище данных Apache Hive.
Hive представляет собой комбинацию трёх основных компонентов:
- Файлы данных в различных форматах — обычно хранятся в Hadoop Distributed File System (HDFS) или в объектных хранилищах, таких как Amazon S3.
- Метаданные, описывающие, как файлы данных сопоставляются со схемами и таблицами. Эти метаданные хранятся в базе данных (например, MySQL) и доступны через службу Hive Metastore (HMS).
- Язык запросов HiveQL, который выполняется в распределённой среде вычислений, такой как MapReduce или Tez.
Trino использует только первые два компонента — данные и метаданные. Trino не использует HiveQL и не зависит от среды выполнения Hive.
Требования
Чтобы использовать Hive-коннектор, необходимо:
- Служба Hive Metastore (HMS) или совместимая реализация (например, AWS Glue).
- Настроенная файловая система, поддерживаемая Trino, указанная в конфигурационном файле каталога (catalog).
Сетевой доступ ко всем нужным системам:
- координатор и все worker-узлы должны иметь доступ к Hive Metastore и системе хранения данных;
- доступ к Hive Metastore по протоколу Thrift по умолчанию осуществляется через порт 9083.
Файлы данных должны быть в поддерживаемом формате.
Формат можно задать через свойство таблицы format и соответствующие параметры.
Поддерживаемые форматы файлов
- ORC
- Parquet
- Avro
- Поддерживаемые SerDe (сериализаторы/десериализаторы)
Для сериализуемых форматов допустимы только определённые SerDe:
- RCText — RCFile с использованием
ColumnarSerDe - RCBinary — RCFile с использованием
LazyBinaryColumnarSerDe - SequenceFile (Text) —
org.apache.hadoop.io.Text - SequenceFile (BytesWritable) — Использует
com.twitter.elephantbird.hive.serde.ProtobufDeserializerдля десериализации protobuf-записей - CSV —
org.apache.hadoop.hive.serde2.OpenCSVSerde - JSON —
org.apache.hive.hcatalog.data.JsonSerDe - OPENX_JSON —
org.openx.data.jsonserde.JsonSerDe(OpenX JSON SerDe) — подробнее см. реализацию Trino в исходном коде - TextFile — Обычный текстовый формат
- ESRI —
com.esri.hadoop.hive.serde.EsriJsonSerDe— SerDe для данных ESRI JSON
Хочешь, я переведу следующую часть про конфигурацию Hive-коннектора (например, hive.config.properties и параметры hive.metastore.uri, hive.s3.*)?
Общая конфигурация Hive-коннектора
В таблице документации Trino приведён список общих параметров конфигурации для коннектора Hive. Помимо них, существуют дополнительные наборы параметров, описанные в других разделах документации (например, для настройки S3, GCS, Azure Blob, метастора, ORC/Parquet и др.).
| Имя параметра | Описание | Значение по умолчанию |
|---|---|---|
| hive.recursive-directories | Включает чтение данных из поддиректорий в путях таблиц или партиций. Если выключено, поддиректории игнорируются. Аналогично свойству hive.mapred.supports.subdirectories в Hive. |
false |
| hive.ignore-absent-partitions | Игнорировать партиции, если путь в файловой системе не существует, вместо ошибки. Это может привести к пропуску ожидаемых данных. | false |
| hive.storage-format | Формат файлов по умолчанию при создании новых таблиц. | ORC |
| hive.orc.use-column-names | Доступ к колонкам ORC по именам. По умолчанию доступ осуществляется по порядковому номеру. Эквивалент свойства сессии orc_use_column_names. |
false |
| hive.parquet.use-column-names | Доступ к колонкам Parquet по именам. Если false, используется порядок колонок. Эквивалент parquet_use_column_names. |
true |
| hive.parquet.time-zone | Часовой пояс для чтения и записи Parquet. | По умолчанию часовой пояс JVM |
| hive.compression-codec | Кодек сжатия при записи файлов. Возможные значения: NONE, SNAPPY, LZ4, ZSTD, GZIP. |
GZIP |
| hive.force-local-scheduling | Принудительное выполнение сплитов на том же узле, где работает DataNode, обслуживающий данные. Полезно, если Trino установлен рядом с каждым DataNode. | false |
| hive.respect-table-format | Следует ли использовать существующий формат таблицы при записи новых партиций. | true |
| hive.immutable-partitions | Разрешена ли вставка данных в существующие партиции. Если true, то hive.insert-existing-partitions-behavior=APPEND запрещено. |
false |
| hive.insert-existing-partitions-behavior | Что происходит при вставке данных в существующую партицию: • APPEND — добавляет данные • OVERWRITE — перезаписывает • ERROR — запрещает изменение |
APPEND |
| hive.target-max-file-size | Максимальный желаемый размер новых файлов. | 1GB |
| hive.create-empty-bucket-files | Создавать ли пустые файлы для бакетов без данных. | false |
| hive.validate-bucketing | Проверка корректности распределения данных по бакетам при чтении. | true |
| hive.partition-statistics-sample-size | Количество партиций, анализируемых при вычислении статистики таблицы. | 100 |
| hive.max-partitions-per-writers | Максимальное число партиций на одного writer’а. | 100 |
| hive.max-partitions-for-eager-load | Максимальное число партиций, которые координатор может заранее загрузить для одной таблицы. | 100,000 |
| hive.max-partitions-per-scan | Максимальное число партиций для одного сканирования таблицы. | 1,000,000 |
| hive.non-managed-table-writes-enabled | Разрешить запись в внешние (non-managed) таблицы. | false |
| hive.non-managed-table-creates-enabled | Разрешить создание внешних (non-managed) таблиц. | true |
| hive.collect-column-statistics-on-write | Автоматически собирать статистику по колонкам при записи данных. | true |
| hive.file-status-cache-tables | Кэшировать список файлов для указанных таблиц. Примеры: fruit.apple,fruit.orange — только для этих таблиц fruit.*,vegetable.* — для всех таблиц в схемах fruit и vegetable * — для всех таблиц во всех схемах |
— |
| hive.file-status-cache.excluded-tables | Исключения из кэширования (противоположно предыдущему параметру). | — |
| hive.file-status-cache.max-retained-size | Максимальный размер данных, удерживаемых в кэше. | 1GB |
| hive.file-status-cache-expire-time | Время жизни кэшированных данных. | 1m |
| hive.per-transaction-file-status-cache.max-retained-size | Максимальный объём кэша для всех активных транзакций. | 100MB |
| hive.rcfile.time-zone | Часовой пояс для двоичных timestamp в RCFile. Для Hive 3.1+ должно быть UTC. |
Часовой пояс JVM |
| hive.timestamp-precision | Точность столбцов типа TIMESTAMP: MILLISECONDS, MICROSECONDS, NANOSECONDS. |
MILLISECONDS |
| hive.temporary-staging-directory-enabled | Использовать ли временную директорию для промежуточных файлов при записи. | true |
| hive.temporary-staging-directory-path | Путь к временной директории для операций записи. Можно использовать ${USER} для подстановки имени пользователя. |
/tmp/presto-${USER} |
| hive.hive-views.enabled | Включить поддержку Hive Views. | false |
| hive.hive-views.legacy-translation | Использовать устаревший механизм перевода Hive Views. | false |
| hive.parallel-partitioned-bucketed-writes | Повысить параллелизм при записи в партиционированные и бакетированные таблицы. | true |
| hive.query-partition-filter-required | Требовать наличие фильтра по партициям в запросе. | false |
| hive.query-partition-filter-required-schemas | Список схем, для которых обязательно использование фильтра по партициям (работает только если вышеуказанный параметр включён). | [] |
| hive.table-statistics-enabled | Включить сбор статистики таблиц (используется для CBO). | true |
| hive.auto-purge | Устанавливает значение свойства auto_purge для управляемых таблиц по умолчанию. |
false |
| hive.partition-projection-enabled | Включает поддержку partition projection (Athena-style). | true |
| hive.s3-glacier-filter | Фильтрация S3-объектов по классу хранения: • READ_ALL — читать все • READ_NON_GLACIER — только не Glacier • READ_NON_GLACIER_AND_RESTORED — не Glacier и восстановленные |
READ_ALL |
| hive.max-partition-drops-per-query | Максимальное количество партиций, которые можно удалить за один запрос. | 100,000 |
| hive.metastore.partition-batch-size.max | Максимальное число партиций, обрабатываемых за один батч. | 100 |
| hive.single-statement-writes | Включает автокоммит для всех операций записи (запрещает многооперационные транзакции). | false |
| hive.metadata.parallelism | Количество потоков для загрузки метаданных (например, таблиц). | 8 |
| hive.protobuf.descriptors.location | Путь к каталогу, где хранятся бинарные дескрипторы Protocol Buffers для таблиц с форматом ProtobufDeserializer. |
— |
| hive.protobuf.descriptors.cache.max-size | Максимальный размер кэша дескрипторов Protocol Buffers. | 64 |
| hive.protobuf.descriptors.cache.refresh-interval | Как часто перезагружать дескрипторы Protocol Buffers с диска. | 1d |
Поддержка отказоустойчивого выполнения
Коннектор поддерживает отказоустойчивое выполнение (Fault-tolerant execution) при обработке запросов. Поддерживаются как операции чтения, так и записи с любой политикой повторных попыток (retry policy).
Конфигурация доступа к файловым системам
Коннектор поддерживает доступ к следующим файловым системам:
- Azure Storage — поддержка файловой системы Azure Storage.
- Google Cloud Storage — поддержка файловой системы Google Cloud Storage.
- S3 — поддержка файловой системы Amazon S3.
- HDFS — поддержка файловой системы Hadoop Distributed File System.
Необходимо включить и настроить доступ к конкретной файловой системе. Поддержка устаревших конфигураций не рекомендуется и будет удалена.
Сопоставление типов (Type mapping)
Коннектор читает и записывает данные в поддерживаемых форматах файлов Avro, ORC и Parquet в соответствии со спецификацией Iceberg.
Поскольку Trino и Iceberg поддерживают разные наборы типов данных, коннектор выполняет преобразование некоторых типов при чтении или записи данных. Типы данных могут не совпадать полностью в обоих направлениях между Trino и источником данных. Подробные сведения о сопоставлении типов в каждом направлении приведены в следующих разделах.
Спецификация Iceberg определяет поддерживаемые типы данных и их сопоставление с форматами хранения в файлах Avro, ORC и Parquet:
- Iceberg → Avro
- Iceberg → ORC
- Iceberg → Parquet
Сопоставление типов Iceberg с типами Trino
Коннектор сопоставляет типы Iceberg с соответствующими типами Trino в соответствии со следующей таблицей:
| Iceberg type | Trino type |
|---|---|
| BOOLEAN | BOOLEAN |
| INT | INTEGER |
| LONG | BIGINT |
| FLOAT | REAL |
| DOUBLE | DOUBLE |
| DECIMAL(p,s) | DECIMAL(p,s) |
| DATE | DATE |
| TIME | TIME(6) |
| TIMESTAMP | TIMESTAMP(6) |
| TIMESTAMPTZ | TIMESTAMP(6) WITH TIME ZONE |
| STRING | VARCHAR |
| UUID | UUID |
| BINARY | VARBINARY |
| FIXED(L) | VARBINARY |
| STRUCT(…) | ROW(…) |
| LIST(e) | ARRAY(e) |
| MAP(k,v) | MAP(k,v) |
Функции
Функции доступны в системной схеме каждого каталога. Их можно вызывать в SQL-запросах. Например, следующий фрагмент кода демонстрирует, как выполнить функцию system.bucket в каталоге Iceberg:
|
1 |
SELECT system.bucket('trino', 16); |
bucket
Эта функция предоставляет доступ к трансформации bucket из Iceberg, позволяя пользователям определить, в какой бакет (bucket) попадает конкретное значение. Функция принимает два аргумента: значение партиции и количество бакетов.
Поддерживаемые типы для первого аргумента функции:
- TINYINT
- SMALLINT
- INTEGER
- BIGINT
- VARCHAR
- VARBINARY
- DATE
- TIMESTAMP
- TIMESTAMP WITH TIME ZONE
Например, если нужно определить, к какому номеру бакета будет отнесена конкретная строка, можно выполнить следующий запрос:
|
1 |
SELECT system.bucket('trino', 16); |
Эту функцию также можно использовать в выражении WHERE, чтобы выполнять операции только для определённого бакета:
|
1 2 3 |
SELECT count(*) FROM customer WHERE system.bucket(custkey, 16) = 2; |
Управление данными Trino и Iceberg
Функциональность управления данными включает поддержку операторов INSERT, UPDATE, DELETE, TRUNCATE и MERGE.
Удаление по партициям
Для партиционированных таблиц коннектор Iceberg поддерживает удаление целых партиций, если выражение WHERE содержит фильтры только по колонкам партиционирования с идентичной трансформацией (identity transform), которые могут соответствовать целым партициям.
Например, для таблицы, определённой в разделе «Партиционированные таблицы», следующий SQL-запрос удаляет все партиции, где значение столбца country равно ‘US’:
|
1 2 |
DELETE FROM example.testdb.customer_orders WHERE country = 'US'; |
Удаление партиций выполняется, если выражение WHERE удовлетворяет этим условиям.
Удаление на уровне строк
Таблицы, использующие вторую версию (v2) спецификации Iceberg, поддерживают удаление отдельных строк с помощью записи position delete-файлов.
Управление схемами и таблицами
Функциональность управления схемами и таблицами включает поддержку следующих операций:
- CREATE SCHEMA
- DROP SCHEMA
- ALTER SCHEMA
- CREATE TABLE
- CREATE TABLE AS
- DROP TABLE
- ALTER TABLE
- COMMENT
Эволюция схемы
Iceberg поддерживает эволюцию схемы, включая безопасное добавление, удаление и переименование колонок, в том числе во вложенных структурах.
Iceberg поддерживает изменение типов колонок только в случае операций расширения (widening):
- INTEGER → BIGINT
- REAL → DOUBLE
- DECIMAL(p,s) → DECIMAL(p2,s), если p2 > p (масштаб при этом изменяться не может)
Также может быть изменена схема партиционирования, при этом коннектор сохраняет возможность выполнять запросы к данным, созданным до изменения партиционирования.
ALTER TABLE EXECUTE
Коннектор поддерживает следующие команды, используемые с оператором ALTER TABLE EXECUTE.
optimize
Команда optimize используется для переписывания содержимого указанной таблицы таким образом, чтобы данные были объединены в меньшее количество, но более крупных файлов. Если таблица партиционирована, операция компактации данных выполняется отдельно для каждой партиции, выбранной для оптимизации. Эта операция повышает производительность при чтении.
Все файлы, размер которых меньше заданного параметра file_size_threshold (пороговое значение по умолчанию — 100 МБ), объединяются в случае, если выполняется одно из следующих условий для каждой партиции:
- более одного файла данных доступно для объединения;
- присутствует хотя бы один файл данных, к которому прикреплены файлы удаления (delete files).
|
1 |
ALTER TABLE test_table EXECUTE optimize |
Следующий запрос объединяет файлы в таблице, размер которых менее 128 мегабайт:
|
1 |
ALTER TABLE test_table EXECUTE optimize(file_size_threshold => '128MB') |
Можно использовать предложение WHERE с колонками, по которым осуществляется партиционирование таблицы, чтобы ограничить оптимизацию определёнными партициями:
|
1 2 |
ALTER TABLE test_partitioned_table EXECUTE optimize WHERE partition_key = 1 |
Можно задать более сложное выражение WHERE, чтобы сузить область действия процедуры оптимизации. В следующем примере значения с типом timestamp приводятся к типу date, и используется сравнение, чтобы оптимизировать только те партиции, которые содержат данные начиная с 2022 года:
|
1 2 |
ALTER TABLE test_table EXECUTE optimize WHERE CAST(timestamp_tz AS DATE) > DATE '2021-12-31' |
Можно также использовать выражение WHERE с метаданными колонок, чтобы отфильтровать, какие файлы будут оптимизированы:
|
1 2 |
ALTER TABLE test_table EXECUTE optimize WHERE "$file_modified_time" > date_trunc('day', CURRENT_TIMESTAMP); |
optimize_manifests
Перезаписывает файлы манифестов, группируя их по колонкам партиционирования. Это может быть полезно для оптимизации планирования сканирования, когда существует множество мелких файлов манифестов или когда в запросах на чтение используются фильтры по партициям, но сами файлы манифестов не сгруппированы по партициям. Параметр таблицы Iceberg commit.manifest.target-size-bytes управляет максимальным размером файлов манифестов, создаваемых этой процедурой.
Процедура optimize_manifests выполняется следующим образом:
|
1 |
ALTER TABLE test_table EXECUTE optimize_manifests; |
expire_snapshots
Команда expire_snapshots удаляет все снапшоты, а также связанные с ними метаданные и файлы данных. Регулярное выполнение очистки снапшотов рекомендуется для удаления файлов данных, которые больше не используются, и для поддержания компактного размера метаданных таблицы. Процедура затрагивает все снапшоты, которые старше периода времени, заданного параметром retention_threshold.
Процедура expire_snapshots выполняется следующим образом:
|
1 |
ALTER TABLE test_table EXECUTE expire_snapshots(retention_threshold => '7d'); |
Значение параметра retention_threshold должно быть больше или равно значению iceberg.expire-snapshots.min-retention, заданному в каталоге, иначе выполнение процедуры завершится с ошибкой следующего вида:
- Retention specified (1.00d) is shorter than the minimum retention configured in the system (7.00d).
Значение по умолчанию для этого свойства — 7d.
remove_orphan_files
Команда remove_orphan_files удаляет все файлы из каталога данных таблицы, которые не связаны с файлами метаданных и которые старше значения, заданного параметром retention_threshold. Регулярное удаление «осиротевших» файлов рекомендуется для поддержания контролируемого размера каталога данных таблицы.
Процедура remove_orphan_files выполняется следующим образом:
|
1 |
ALTER TABLE test_table EXECUTE remove_orphan_files(retention_threshold => '7d'); |
|
1 2 3 4 5 6 |
metric_name | metric_value ----------------------------+-------------- processed_manifests_count | 2 active_files_count | 98 scanned_files_count | 97 deleted_files_count | 0 |
- Значение параметра retention_threshold должно быть больше или равно значению
iceberg.remove-orphan-files.min-retention, указанному в каталоге, иначе выполнение процедуры завершится с ошибкой следующего вида:
Retention specified (1.00d) is shorter than the minimum retention configured in the system (7.00d).
Значение по умолчанию для этого свойства — 7d.
Результат выполнения запроса содержит следующие метрики:
| Имя свойства | Описание |
|---|---|
| processed_manifests_count | Количество файлов манифестов, прочитанных процедурой remove_orphan_files. |
| active_files_count | Количество файлов, принадлежащих снапшотам, которые ещё не были удалены (не истекли). |
| scanned_files_count | Количество файлов, просканированных из файловой системы. |
| deleted_files_count | Количество файлов, удалённых процедурой remove_orphan_files. |
Основные алгоритмы соединения в Trino
Trino применяет классический алгоритм хеш‑соединения (hash join) с распределением данных по узлам, и выбирает конкретную стратегию распределения соединения в зависимости от статистики и конфигурации.
Основные режимы:
- Broadcast join — одна из сторон соединения (обычно «меньшая» таблица, build‑сторона) транслируется (broadcast) на все узлы, которые обрабатывают другую таблицу (probe‑сторона). Каждая нода строит хеш‑таблицу по build‑стороне, затем probe‑сторона сканируется и сравнивается с этой хеш‑таблицей. Преимущество: минимум сетевого обмена данных, если build‑сторона маленькая. Недостаток: build‑сторона должна помещаться в память каждой ноды.
- Partitioned (hash‑distributed) join — обе стороны соединения распределяются (shuffle) по узлам по хешу ключа соединения. То есть строки обеих таблиц хэшируются по ключу соединения и направляются на узлы таким образом, что строки с одинаковыми ключами «встречаются» на одном узле. Затем на каждом узле строится хеш‑таблица и выполняется соединение. Этот режим требует больше сетевого обмена (shuffle) и может быть дороже, но позволяет соединять большие таблицы, когда broadcast невозможен.
- Automatic (cost‑based выбор) — если свойство join_distribution_type установлено в AUTOMATIC, Trino на основании статистики (row counts, размер таблиц, распределение, доступность статистики) выбирает между broadcast и partitioned.
Также Trino применяет динамическую фильтрацию (dynamic filtering / dynamic partition pruning), которая помогает уменьшить объём данных, участвующих в соединении, за счёт использования фильтров, полученных на build‑стороне, и применяемых к probe‑стороне.
Что такое build‑сторона и probe‑сторона
Когда Trino выполняет hash join, процесс выглядит так:
- Build‑сторона – это таблица (или её часть), по которой строится хеш‑таблица в памяти. Эта таблица «кладётся» в RAM для последующего быстрого поиска.
- Probe‑сторона – это таблица, которая итерируется построчно, и для каждой строки выполняется поиск совпадений в хеш‑таблице, построенной по build‑стороне. Таблица probe side в Trino не хранится где-то постоянно, а потоково обрабатывается (то есть не загружается в память целиком).
- Важно: для broadcast join build‑сторона должна помещаться в память всех узлов кластера. Если она слишком большая, это может привести к проблемам с памятью.
- Для partitioned join build‑сторона распределяется по узлам по хешу ключа, что снижает требования к памяти на отдельном узле, но увеличивает сетевой трафик (shuffle).
Что такое OOM
OOM (Out Of Memory) – это ситуация, когда процесс не может выделить память для выполнения операции и падает.
В контексте Trino и hash join OOM возникает, если:
- build‑сторона слишком большая для оперативной памяти узлов,
- probe‑сторона слишком большая и одновременно удерживается в памяти много промежуточных данных,
- большое количество промежуточных результатов или shuffle операций.
Как это работает с Iceberg + Hive + HDFS
Когда Trino работает с Iceberg таблицами (например, хранящимися на HDFS или объектном хранилище) и/или через Hive‑метастор, совокупность форматов и оптимизаций влияет на выбор алгоритма соединения и его эффективность. Вот ключевые моменты:
- Iceberg таблицы поддерживают богатую метаинформацию: файлы данных, manifest‑лист, верх/низ границ значений в файлах, статистику по колонкам, информацию о партициях.
- Благодаря этой метаинформации Trino может ограничивать ( prune ) число файлов и разделов, которые участвуют в соединении: например, если build‑сторона фильтруется, dynamic filtering может уменьшить probe‑сторону. С меньшим входным объёмом соединения легче сделать broadcast.
- При подключении через Hive‑коннектор (или Iceberg‑таблицы, которые используют Hive метастор) Trino поддерживает динамическое разделение (dynamic partition pruning) и bucket‑pruning, что помогает уменьшать ввод данных перед соединением.
- Если таблицы большие и статистика доступна (через Iceberg метаданные или Hive статистику), Trino может выбрать типа broadcast если «меньшая» сторона после фильтров мал, либо partitioned если нет.
Рекомендованные алгоритмы соединения для Iceberg в Trino
Исходя из архитектуры и ограничений, можно сформулировать рекомендации:
- Если одна из таблиц значительно меньше другой (после применения фильтров, dynamic pruning и т.д.), стоит использовать broadcast join — это быстрее, меньше сетевого обмена.
- Если обе таблицы большие или распределены, и build‑сторона не помещается в память на узлах, использовать partitioned join.
- Учитывать метаинформацию Iceberg (файлы, статистику) — чем лучше статистика, тем точнее выбор алгоритма и меньше данные, участвующие в соединении.
- Использовать dynamic filtering и partition pruning — особенно важно для Iceberg: метаданные позволяют заранее отсеивать лишние файлы, уменьшив объём входных данных для соединения.
- Настраивать параметры Trino — например
join_max_broadcast_table_size,join_distribution_type,join_reordering_strategy, и др.
Ограничения и нюансы
Даже если таблица маленькая, если статистика недоступна или сильно распределена, Trino может выбрать partitioned join в режиме AUTOMATIC.
Broadcast требует, чтобы build‑сторона (после фильтров) помещалась в память всех узлов — если нет, узлы могут страдать от OOM или медленной работы.
Partitioned join требует сильной сети (shuffle), и если данные сильно скошены (skew) по ключам соединения, производительность может быть низкой.
При работе с Iceberg таблицами хранение в объектном хранилище (например S3) или HDFS может влиять на задержки получения файлов и метаданных, что может замедлить фазу подготовки данных к соединению.
Не все оптимизации (например dynamic filtering) могут поддерживаться одинаково для всех коннекторов и всех форматов файлов. Нужно проверять, поддерживает ли ваш коннектор (Iceberg + Hive) нужные функции.
Возможный план оптимизации запроса Trino к Iceberg
Рекомендации по улучшению запроса
- Пересмотреть join‑план:
- Убедиться, что меньшая таблица идёт в качестве build‑стороны.
- Для больших таблиц рассмотреть partitioned join вместо broadcast join.
- Использовать dynamic filtering / partition pruning:
- Для Iceberg таблиц это особенно эффективно – фильтруются лишние файлы и партиции ещё до join, уменьшая размер probe‑стороны.
- Разделять JOIN на этапы:
- Если три таблицы очень большие, попробовать выполнить join двух таблиц, сохранить результат (CTE или временную таблицу), затем join с третьей.
- Настроить Trino параметры:
join_max_broadcast_table_size– уменьшить лимит для broadcast join, чтобы большие таблицы не помещались в память.query.max-memoryиquery.max-memory-per-node– увеличить лимит памяти на узел, если инфраструктура позволяет.spill-to-disk-enabled– включить сброс промежуточных данных на диск (для partitioned join). LEGACY
- Проверить skew:
- Если есть скошенные ключи join (одни ключи встречаются очень часто), Trino можно настроить для балансировки данных или добавить фильтры на уровне подзапросов.
- Использовать ANALYZE / статистику:
- Для Iceberg это
ANALYZE– оптимизатор будет точнее выбирать порядок join и build/probe.
- Для Iceberg это
- Минимизировать количество колонок:
- Использовать
projection pushdown– считывать только нужные колонки из Iceberg таблиц, чтобы уменьшить размер в памяти.
- Использовать
Обзор Explain и Explain Analyze
EXPLAIN
Синтаксис
|
1 |
EXPLAIN [ ( option [, ...] ) ] statement |
Где option может содержать:
|
1 2 |
FORMAT { TEXT | GRAPHVIZ | JSON } TYPE { LOGICAL | DISTRIBUTED | VALIDATE | IO } |
Explain показывает логический или распределённый план выполнения оператора, либо проверяет оператор на корректность.
По умолчанию отображается распределённый план DISTRIBUTED. Каждый фрагмент (fragment) распределённого плана выполняется одним или несколькими узлами Trino. Разделение на фрагменты отражает обмен данными (exchange) между узлами Trino. Тип фрагмента (Fragment type) определяет, как именно фрагмент выполняется узлами Trino и как данные распределяются между фрагментами:
- SINGLE: Фрагмент выполняется на одном узле.
- HASH: Фрагмент выполняется на фиксированном количестве узлов, при этом входные данные распределяются с помощью хеш-функции.
- ROUND_ROBIN: Фрагмент выполняется на фиксированном количестве узлов, при этом входные данные распределяются по принципу кругового распределения (round-robin).
- BROADCAST: Фрагмент выполняется на фиксированном количестве узлов, при этом входные данные рассылаются (транслируются) на все узлы.
- SOURCE: Фрагмент выполняется на тех узлах, где находятся входные splits, к которым осуществляется доступ.
EXPLAIN ANALYZE
Синтаксис
|
1 |
EXPLAIN ANALYZE [VERBOSE] statement |
Выполняет оператор и показывает распределённый план его выполнения вместе со стоимостью (cost) каждой операции.
Опция VERBOSE выводит более подробную информацию и низкоуровневую статистику; для её понимания может потребоваться знание внутренних механизмов и деталей реализации Trino.
Примечание:
Статистика может быть не полностью точной, особенно для запросов, которые выполняются очень быстро.
Session properties
Синтаксис
|
1 2 |
SET SESSION name = expression SET SESSION catalog.name = expression |
Устанавливает значение свойства сессии или свойства сессии конкретного каталога.
Свойство сессии — это параметр конфигурации, который пользователь может временно изменить на время текущего подключения к кластеру Trino.
Многие параметры конфигурации имеют соответствующее свойство сессии, принимающее те же значения, что и конфигурационное свойство.
Существует два типа свойств сессии:
- Системные свойства сессии (System session properties) — применяются ко всему кластеру. Большинство свойств сессии являются системными, если не указано иное.
- Свойства сессии каталога (Catalog session properties) — это свойства, определённые коннектором, которые можно установить отдельно для каждого каталога. Такие свойства задаются с префиксом имени каталога, например:
catalogname.property_name.
Свойства сессии привязаны к текущей сессии, поэтому пользователь может иметь несколько подключений к одному кластеру, и в каждом из них значения свойств могут отличаться.
После завершения сессии — при отключении или создании новой — все изменения, внесённые в свойства сессии, теряются.
Сброс на диск (Spill to disk)
В случае операций, требующих большого объёма памяти, Trino позволяет выгружать промежуточные результаты на диск. Цель этого механизма — обеспечить выполнение запросов, которым требуется больше памяти, чем разрешено лимитами на запрос или на узел.
Механизм похож на подкачку страниц (page swapping) на уровне операционной системы, однако реализован на уровне приложения, чтобы учитывать специфические потребности Trino.
Свойства, связанные с механизмом сброса на диск, описаны в разделе Spilling properties.
Предупреждение:
Функциональность сброса на диск является устаревшей (legacy) в Trino. Вместо неё рекомендуется использовать Fault-tolerant execution (устойчивое к сбоям выполнение) с политикой повторного выполнения задач (task retry policy) и настроенным Exchange manager.
Управление памятью и сброс
По умолчанию Trino завершает запросы, если потребление памяти во время их выполнения превышает значения параметров query_max_memory или query_max_memory_per_node.
Этот механизм обеспечивает справедливое распределение памяти между запросами и предотвращает взаимные блокировки (deadlocks), вызванные нехваткой памяти. Он эффективен, когда в кластере выполняется много небольших запросов, но приводит к принудительной остановке крупных запросов, выходящих за лимиты.
Чтобы избежать этой неэффективности, была введена концепция revocable memory (отзываемой памяти).
Запрос может использовать память, которая не учитывается в лимитах, однако менеджер памяти может отозвать её в любой момент. Когда память отзывается, исполнитель запроса (query runner) сбрасывает промежуточные данные из памяти на диск и продолжает их обработку позже.
На практике, если кластер простаивает и доступна вся память, ресурсоёмкий запрос может использовать всю память кластера.
Если же свободной памяти мало, тот же запрос может быть вынужден использовать диск для хранения промежуточных данных.
Запрос, вынужденный сбрасывать данные на диск, может выполняться в десятки раз дольше, чем запрос, полностью работающий в памяти.
Важно понимать, что включение функции сброса на диск не гарантирует успешного выполнения всех запросов, потребляющих много памяти. Возможна ситуация, когда Trino не сможет разбить промежуточные данные на достаточно маленькие части, и при попытке их загрузки снова произойдёт ошибка Out of memory.
Дисковое пространство для сброса
Сброс промежуточных результатов на диск и последующее их чтение — это дорогостоящие операции ввода-вывода (IO). Поэтому запросы, использующие сброс, часто ограничиваются скоростью диска.
Для повышения производительности рекомендуется указать несколько путей на разных локальных устройствах для хранения сброса (параметр spiller-spill-path в Spilling properties).
Системный диск не следует использовать для сброса, особенно если на нём работает JVM или записываются логи. Это может привести к нестабильности кластера. Также рекомендуется отслеживать загрузку (saturation) дисков, используемых для сброса.
Trino рассматривает пути сброса как независимые диски (JBOD), поэтому использовать RAID для этих целей не требуется.
Сжатие при сбросе
Если включено сжатие сброса с помощью свойства spill-compression-codec, то страницы, выгружаемые на диск, сжимаются перед записью.
Это снижает объём операций ввода-вывода, но увеличивает нагрузку на CPU из-за необходимости сжатия и распаковки данных.
Шифрование при сбросе
Если включено шифрование сброса (параметр spill-encryption-enabled в Spilling properties), содержимое сброса шифруется с помощью случайно сгенерированного ключа, уникального для каждого файла сброса.
Это повышает нагрузку на процессор и снижает скорость операций записи, но защищает выгруженные данные от несанкционированного восстановления.
При включённом шифровании рекомендуется уменьшить значение memory-revoking-threshold, чтобы компенсировать дополнительную задержку, связанную с шифрованием и дешифрованием данных при сбросе.
Поддерживаемые операции (Supported operations)
Не все операции в Trino поддерживают сброс на диск, и каждая из них обрабатывает его по-своему.
В настоящее время этот механизм реализован для следующих типов операций:
JOIN’ы
Во время выполнения операции JOIN одна из объединяемых таблиц сохраняется в памяти.
Эта таблица называется build table (строящаяся таблица).
Строки из другой таблицы (probe side) проходят <strong>потоком</strong> и передаются дальше, если они соответствуют строкам из build-таблицы.
Наиболее ресурсоёмкая по памяти часть JOIN — это именно build-таблица.
Когда параметр task.concurrency больше единицы, build-таблица разбивается на несколько партиций.
Количество партиций равно значению этого параметра.
Если build-таблица разбита на партиции, механизм сброса на диск может уменьшить пиковое использование памяти во время JOIN.
Когда запрос приближается к лимиту памяти, часть партиций build-таблицы сбрасывается на диск вместе со строками из другой таблицы, которые попадают в те же партиции.
Количество сброшенных партиций напрямую влияет на объём требуемого дискового пространства.
После этого сброшенные партиции читаются обратно по одной, чтобы завершить выполнение JOIN.
Благодаря этому механизму пиковое использование памяти оператором JOIN может быть уменьшено до размера наибольшей партиции build-таблицы.
При отсутствии перекоса данных (data skew) это примерно 1 / task.concurrency от общего размера build-таблицы.
Агрегации (Aggregations)
Агрегатные функции выполняют операции над группой значений и возвращают одно результирующее значение.
Если количество групп велико, то может потребоваться значительный объём памяти.
При включённом сбросе на диск, если памяти не хватает, промежуточные накопленные результаты агрегирования записываются на диск.
Позже они загружаются обратно и объединяются, используя меньше памяти.
Сортировка (ORDER BY)
При сортировке большого объёма данных также может понадобиться много памяти.
Если включён сброс на диск для ORDER BY, и памяти недостаточно, промежуточные отсортированные результаты записываются на диск.
Позже они читаются обратно и объединяются с меньшими затратами памяти.
Оконные функции (Window functions)
Оконные функции выполняют операцию над “окном” строк и возвращают одно значение для каждой строки.
Если окно содержит много строк, потребление памяти может быть значительным.
Когда включён сброс на диск для оконных функций, и памяти не хватает, промежуточные результаты записываются на диск, а затем подгружаются и объединяются, когда память снова доступна.
Однако есть ограничение: сброс не работает во всех случаях, например, если одно окно слишком велико и не может быть разбито на части.
Exchange (Обмен данными)
Термины, связанные с Exchange, описывают, как данные передаются между worker-узлами.
Передача может быть локальной или удалённой.
LocalExchange [exchange_type]
Передаёт данные локально, внутри одного worker’а, между различными стадиями выполнения запроса. Значение exchange_type может быть одним из типов обмена, описанных ниже.
RemoteExchange [exchange_type]
Передаёт данные между разными worker-узлами на разных этапах выполнения запроса.
Значение exchange_type также может быть одним из описанных ниже.
Типы логического Exchange (Logical Exchange types)
| Тип | Описание |
|---|---|
| GATHER | Один узел собирает результаты со всех остальных. Например, финальная стадия SELECT собирает данные со всех узлов и записывает результат (например, в Amazon S3). |
| REPARTITION | Распределяет строки между воркерами в соответствии с схемой партиционирования, требуемой для следующего оператора. |
| REPLICATE | Копирует данные строк на все воркеры. |
Scanning (Сканирование данных)
Эти термины описывают, как выполняется чтение данных во время запроса:
| Тип | Описание |
|---|---|
| TableScan | Сканирует исходные данные таблицы и применяет partition pruning (отсечение ненужных партиций) на основе фильтра. |
| ScanFilter | Сканирует исходные данные и применяет partition pruning на основе фильтра, а также дополнительные фильтры, которые нельзя применить на уровне партиций. |
| ScanFilterProject | Сначала сканирует данные и применяет фильтры (как в ScanFilter), затем изменяет структуру (layout) выходных данных для оптимизации последующих этапов выполнения. |
Fault-tolerant execution (Отказоустойчивое выполнение)
YouTube: The architecture behind fault tolerant execution | Starburst Virtual Events
По умолчанию, если узлу Trino не хватает ресурсов для выполнения задачи или он выходит из строя во время выполнения запроса, запрос завершается с ошибкой и должен быть запущен вручную повторно. Чем дольше выполняется запрос, тем выше вероятность того, что он столкнётся с подобными сбоями.
Отказоустойчивое выполнение или Fault-tolerant execution — это механизм в Trino, который позволяет кластеру смягчать последствия ошибок выполнения запросов путём повторного запуска запросов или отдельных задач при сбоях. При включённом отказоустойчивом выполнении промежуточные данные обмена (exchange data) сохраняются на диск и могут быть повторно использованы другим рабочим узлом в случае его отказа или любой другой ошибки во время выполнения запроса.
Примечание
Отказоустойчивость не применяется к неправильным запросам или другим ошибкам пользователя. Например, Trino не будет тратить ресурсы на повторный запуск запроса, который завершается ошибкой из-за невозможности разобрать SQL.
Пошаговое руководство по настройке кластера Trino с отказоустойчивым выполнением для повышения устойчивости обработки запросов можно найти в разделе Improve query processing resilience.
Конфигурация
Отказоустойчивое выполнение отключено по умолчанию. Чтобы включить эту функцию, установите конфигурационное свойство retry-policy в значение QUERY или TASK в зависимости от желаемой политики повторных попыток.
|
1 |
retry-policy=QUERY |
Предупреждение
Установка retry-policy может привести к сбоям запросов при использовании коннекторов, которые не поддерживают отказоустойчивое выполнение. В таких случаях возникает ошибка: “This connector does not support query retries”.
Поддержка отказоустойчивого выполнения SQL-операторов зависит от конкретного коннектора. Следующие коннекторы поддерживают отказоустойчивое выполнение:
- BigQuery connector
- Delta Lake connector
- Hive connector
- Iceberg connector
- MariaDB connector
- MongoDB connector
- MySQL connector
- Oracle connector
- PostgreSQL connector
- Redshift connector
- SQL Server connector
Следующие конфигурационные свойства (Fault-tolerant execution configuration properties) управляют поведением отказоустойчивого выполнения в кластере Trino:
|
Property name |
Description |
Default |
retry-policy |
Настраивает, что именно повторяется в случае сбоя: QUERY для повторного запуска всего запроса или TASK для индивидуального повторного запуска задач при их сбое. Смотрите раздел о политике повторов для дополнительной информации. Используйте эквивалентное свойство сессии retry_policy только на кластерах, настроенных для отказоустойчивого выполнения, и обычно только для деактивации с помощью NONE, поскольку переключение между режимами на кластере не тестируется. |
NONE |
retry-policy.allowed |
Список политик повторов, разрешённых для настройки на данном кластере. Это свойство используется, чтобы предотвратить установку пользователем политики повторов, которая не предназначена для использования на конкретном кластере. | NONE, QUERY, TASK |
exchange.deduplication-buffer-size |
Размер данных в буфере, расположенном в памяти координатора, который используется отказоустойчивым выполнением для хранения выходных данных стадий запроса. Если этот буфер заполняется во время выполнения запроса, запрос завершится ошибкой “Exchange manager must be configured for the failure recovery capabilities to be fully functional”, если только не настроен exchange manager. | 32MB |
fault-tolerant-execution.exchange-encryption-enabled |
Включает шифрование данных, записываемых при спуллинге. Установка этого свойства в значение false не рекомендуется, если Trino обрабатывает конфиденциальные данные. |
true |
Retry policy
Свойство конфигурации retry-policy или свойство сессии retry_policy определяет, будет ли Trino повторно выполнять весь запрос или отдельные задачи запроса в случае сбоя.
QUERY
Политика повторов QUERY указывает Trino автоматически повторно выполнять запрос в случае ошибки на рабочем узле. Политика QUERY рекомендуется, когда большая часть нагрузки кластера Trino состоит из множества небольших запросов.
По умолчанию Trino не реализует отказоустойчивость для запросов, чей результирующий набор превышает 32MB, например, для операторов SELECT, которые возвращают очень большой набор данных пользователю. Этот предел можно увеличить, изменив свойство конфигурации exchange.deduplication-buffer-size на значение больше стандартных 32MB, но это приводит к увеличению использования памяти на координаторе.
Чтобы включить отказоустойчивое выполнение для запросов с более крупным набором результатов, настоятельно рекомендуется настроить exchange manager, который использует внешнее хранилище для данных спуллинга, что позволяет хранить выгруженные данные за пределами размера буфера в памяти.
TASK
Политика повторов TASK указывает Trino повторно выполнять отдельные задачи запроса в случае сбоя. Чтобы использовать политику TASK, необходимо настроить exchange manager. Эта политика рекомендуется при выполнении больших пакетных запросов, поскольку кластер может более эффективно повторно выполнять небольшие задачи внутри запроса, а не повторять весь запрос.
Когда кластер настроен на использование политики TASK, некоторые связанные свойства конфигурации получают значения по умолчанию, соответствующие лучшим практикам для отказоустойчивого кластера. Однако это автоматическое изменение не влияет на кластеры, где эти свойства были настроены вручную. Если какие-либо из следующих свойств настроены в файле config.properties на кластере с политикой TASK, настоятельно рекомендуется установить для свойства управления запросами task.low-memory-killer.policy значение total-reservation-on-blocked-nodes, иначе запросы могут потребовать ручного завершения, если кластер исчерпает память.
Note
Политика TASK лучше всего подходит для больших пакетных запросов, но она может приводить к увеличению задержки для коротких запросов, выполняемых в большом количестве. Как лучшая практика, рекомендуется запускать выделенный кластер с политикой TASK для больших пакетных запросов, отдельно от другого кластера, который обрабатывает короткие запросы.
Encryption
Trino шифрует данные перед спуллингом в хранилище. Это предотвращает доступ к данным запроса для кого-либо, кроме кластера Trino, который их записал, включая администраторов системы хранения. Новый ключ шифрования случайным образом генерируется для каждого запроса, и ключ удаляется после завершения запроса.
Advanced configuration
Вы можете дополнительно настроить отказоустойчивое выполнение с помощью следующих свойств конфигурации. Значения по умолчанию для этих свойств должны подходить для большинства развертываний, но вы можете изменить их в целях тестирования или устранения неполадок.
Retry limits
Следующие свойства конфигурации контролируют пороги, при которых запросы или задачи перестают повторяться в случае повторяющихся сбоев:
| Property name | Description | Default value | Retry policy |
|---|---|---|---|
query-retry-attempts |
Максимальное количество попыток, которые Trino может выполнить для повторного запуска запроса, прежде чем объявить запрос неудавшимся. | 4 | Only QUERY |
task-retry-attempts-per-task |
Максимальное количество попыток, которые Trino может выполнить для повторного запуска одной задачи, прежде чем объявить запрос неудавшимся. | 4 | Only TASK |
retry-initial-delay |
Минимальное время, которое должен ожидать сбойный запрос или задача перед повторной попыткой. Может быть переопределено с помощью свойства сессии retry_initial_delay. |
10s | QUERY and TASK |
retry-max-delay |
Максимальное время ожидания перед повторной попыткой для сбойного запроса или задачи. Время ожидания увеличивается после каждого последующего сбоя. Может быть переопределено с помощью свойства сессии retry_max_delay. |
1m | QUERY and TASK |
retry-delay-scale-factor |
Множитель, на который увеличивается задержка повторной попытки при каждом сбое запроса или задачи. Может быть переопределено свойством сессии retry_delay_scale_factor. |
2.0 | QUERY and TASK |
Размер задач (Task sizing)
При политике повторных попыток TASK важно управлять объемом данных, обрабатываемых в каждой задаче. Если задачи слишком маленькие, то координация задач может занимать больше времени и ресурсов, чем выполнение самой задачи. Если задачи слишком большие, одна задача может требовать больше ресурсов, чем доступно на одном узле, и тем самым не позволит запросу завершиться.
Trino поддерживает ограниченную автоматическую настройку размера задач. Если во время выполнения задач с отказоустойчивостью возникают проблемы, можно настроить следующие параметры конфигурации для ручного управления размером задач. Эти параметры конфигурации применяются только при политике повторов TASK.
| Property name | Description | Default value |
|---|---|---|
fault-tolerant-execution-standard-split-size |
Стандартный размер данных split, обрабатываемых задачами, которые читают данные из исходных таблиц. Значение интерпретируется с учётом веса split. Если каталог указывает, что создаваемые split являются «легче» или «тяжелее» стандартных, количество split, обрабатываемых задачей, корректируется соответственно. Можно переопределить в текущей сессии с помощью параметра fault_tolerant_execution_standard_split_size. |
64MB |
fault-tolerant-execution-max-task-split-count |
Максимальное количество split, обрабатываемых одной задачей. Значение не учитывает вес split и служит защитой на случай, если каталог сообщает неправильный вес split. Можно переопределить в текущей сессии с помощью параметра fault_tolerant_execution_max_task_split_count. |
2048 |
fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-growth-period |
Количество задач, создаваемых для любого этапа чтения (non-writer) при произвольном распределении, прежде чем увеличивается размер задачи. | 64 |
fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-growth-factor |
Коэффициент роста для адаптивного увеличения размера задач-чтения при произвольном распределении. Нижний предел — 1.0. При каждом увеличении новый целевой размер задачи = старый размер × этот коэффициент. | 1.26 |
fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-min |
Начальный/минимальный целевой размер входных данных для задач-чтения при произвольном распределении. | 512MB |
fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-max |
Максимальный целевой размер входных данных для задач-чтения при произвольном распределении. | 50GB |
fault-tolerant-execution-arbitrary-distribution-write-task-target-size-growth-period |
Количество задач, создаваемых для любого этапа записи (writer) при произвольном распределении, прежде чем увеличивается размер задачи. | 64 |
fault-tolerant-execution-arbitrary-distribution-write-task-target-size-growth-factor |
Коэффициент роста для адаптивного увеличения размера задач-записи при произвольном распределении. Нижний предел — 1.0. Новый размер = старый × коэффициент. | 1.26 |
fault-tolerant-execution-arbitrary-distribution-write-task-target-size-min |
Начальный/минимальный целевой размер входных данных для задач-записи при произвольном распределении. | 4GB |
fault-tolerant-execution-arbitrary-distribution-write-task-target-size-max |
Максимальный целевой размер входных данных для задач-записи при произвольном распределении. | 50GB |
fault-tolerant-execution-hash-distribution-compute-task-target-size |
Целевой размер входных данных для задач-чтения при хеш-распределении. | 512MB |
fault-tolerant-execution-hash-distribution-write-task-target-size |
Целевой размер входных данных для задач-записи при хеш-распределении. | 4GB |
fault-tolerant-execution-hash-distribution-write-task-target-max-count |
Мягкая верхняя граница количества задач-записи на этапе хеш-распределения. | 2000 |
Назначение узлов (Node allocation)
При политике повторов TASK узлы назначаются задачам на основе доступной памяти и предполагаемого использования памяти.
Если задача завершается неудачей из-за превышения доступной памяти на узле, она перезапускается с запросом выделить под неё весь узел.
Начальная оценка требований задачи по памяти является статической и задаётся параметром конфигурации fault-tolerant-execution-task-memory.
Этот параметр используется только при политике повторов TASK.
| Property name | Description | Default value |
|---|---|---|
| fault-tolerant-execution-task-memory | Начальная оценка объёма данных в памяти, требуемых задачей, используемая для распределения задач по узлам (bin-packing). Может быть переопределена в текущей сессии с помощью параметра fault_tolerant_execution_task_memory. |
5GB |
Прочая настройка
Следующий параметр может использоваться для управления отказоустойчивым выполнением:
| Property name | Description | Default value | Retry policy |
|---|---|---|---|
| fault-tolerant-execution-task-descriptor-storage-max-memory | Максимальный объём памяти для хранения дескрипторов задач на координаторе для отказоустойчивых запросов. Дополнительная память требуется, чтобы позволить переназначение задач при сбоях. | (Размер heap JVM * 0.15) | Only TASK |
| fault-tolerant-execution-max-partition-count | Максимальное количество партиций для распределённых join и агрегаций. Аналогично свойству query.max-hash-partition-count. Не рекомендуется увеличивать значение выше 50, поскольку это может привести к нестабильности и снижению производительности. Может быть переопределено с помощью параметра fault_tolerant_execution_max_partition_count. |
50 | Only TASK |
| fault-tolerant-execution-min-partition-count | Минимальное количество партиций для распределённых join и агрегаций. Аналогично query.min-hash-partition-count. Может быть переопределено параметром fault_tolerant_execution_min_partition_count. |
4 | Only TASK |
| fault-tolerant-execution-min-partition-count-for-write | Минимальное количество партиций для распределённых join и агрегаций в запросах записи. Аналог query.min-hash-partition-count-for-write. Может быть переопределено параметром fault_tolerant_execution_min_partition_count_for_write. |
50 | Only TASK |
| max-tasks-waiting-for-node-per-query | Максимальное количество задач, которые могут ожидать выделения узла для одного запроса, прежде чем планирование других задач этого запроса будет приостановлено. | 50 | Only TASK |
Менеджер обмена (Exchange manager)
Буферизация обмена (exchange spooling) отвечает за хранение и управление сброшенными (spooled) данными для отказоустойчивого выполнения.
Вы можете настроить менеджер обмена, основанный на файловой системе, который сохраняет сброшенные данные в указанном месте, таком как AWS S3 и совместимые с S3 системы, Azure Blob Storage, Google Cloud Storage, Alluxio или HDFS.
Конфигурация
Чтобы настроить менеджер обмена, создайте новый конфигурационный файл etc/exchange-manager.properties на координаторе и всех рабочих узлах.
В этом файле установите параметр конфигурации exchange-manager.name в значение filesystem или hdfs, а затем задайте дополнительные параметры конфигурации в зависимости от используемого решения для хранения данных.
Следующая таблица перечисляет доступные параметры конфигурации для файла exchange-manager.properties, их значения по умолчанию и файловые системы, для которых может быть настроен каждый параметр:
| Property name | Description | Default value | Supported filesystem |
|---|---|---|---|
| exchange.base-directories | Разделённый запятыми список URI-локаций, которые менеджер обмена использует для хранения данных спулинга. | — | Любая |
| exchange.sink-buffer-pool-min-size | Минимальный размер пула буферов для приёмника (sink) обмена. Чем больше размер пула, тем выше параллелизм записи и использование памяти. | 10 | Любая |
| exchange.sink-buffers-per-partition | Количество буферов на партицию в пуле буферов. Чем больше размер пула, тем выше параллелизм записи и использование памяти. | 2 | Любая |
| exchange.sink-max-file-size | Максимальный размер данных файлов, записываемых приёмниками обмена. | 1GB | Любая |
| exchange.source-concurrent-readers | Количество параллельных читателей для чтения из spooling-хранилища. Большее число увеличивает параллелизм чтения и использование памяти. | 4 | Любая |
| exchange.s3.aws-access-key | AWS access key. Требуется для подключения к AWS S3 и GCS, можно игнорировать для других S3-совместимых систем. | — | AWS S3, GCS |
| exchange.s3.aws-secret-key | AWS secret key. Требуется для подключения к AWS S3 и GCS, можно игнорировать для других S3-совместимых систем. | — | AWS S3, GCS |
| exchange.s3.iam-role | IAM-роль, которую необходимо принять. | — | AWS S3, GCS |
| exchange.s3.external-id | External ID для политики доверия IAM-роли. | — | AWS S3, GCS |
| exchange.s3.region | Регион S3-бакета. | — | AWS S3, GCS |
| exchange.s3.endpoint | Endpoint S3-хранилища при использовании S3-совместимой системы, не относящейся к AWS. Для AWS S3 можно игнорировать, если только политика бакета не требует HTTPS. При необходимости TLS установить https-endpoint, например: https://s3.us-east-1.amazonaws.com. Для GCS указать: https://storage.googleapis.com. | — | Любое S3-совместимое хранилище |
| exchange.s3.max-error-retries | Максимальное количество повторов запроса клиентом S3 менеджера обмена. | 10 | Любое S3-совместимое хранилище |
| exchange.s3.path-style-access | Включает использование path-style доступа для всех запросов к S3. | false | Любое S3-совместимое хранилище |
| exchange.s3.upload.part-size | Размер части (part) для multipart-загрузки S3. | 5MB | Любое S3-совместимое хранилище |
| exchange.gcs.json-key-file-path | Путь к JSON-файлу, содержащему ключ сервисного аккаунта Google Cloud Platform. Не указывать вместе с exchange.gcs.json-key. |
— | GCS |
| exchange.gcs.json-key | Ключ сервисного аккаунта GCP в формате JSON. Не указывать вместе с exchange.gcs.json-key-file-path. |
— | GCS |
| exchange.azure.endpoint | Endpoint Azure Blob, используемый для доступа к контейнеру спулинга. Не указывать вместе с exchange.azure.connection-string. |
— | Azure Blob Storage |
| exchange.azure.connection-string | Строка подключения, используемая для доступа к контейнеру спулинга. Не указывать вместе с exchange.azure.endpoint. |
— | Azure Blob Storage |
| exchange.azure.block-size | Размер блока данных для параллельной загрузки Azure block blob. | 4MB | Azure Blob Storage |
| exchange.azure.max-error-retries | Максимальное количество повторов запроса Azure-клиентом менеджера обмена. | 10 | Azure Blob Storage |
| exchange.alluxio.block-size | Размер блока данных для Alluxio-хранилища. | 4MB | Alluxio |
| exchange.alluxio.site-file-path | Путь к файлу конфигурации Alluxio (например, /etc/alluxio-site.properties). Файл должен существовать на всех узлах кластера Trino. Следуйте документации по конфигурации клиента Alluxio. |
— | Alluxio |
| exchange.hdfs.block-size | Размер блока данных для HDFS. | 4MB | HDFS |
| exchange.hdfs.skip-directory-scheme-validation | Пропустить проверку схемы директории для поддержки HDFS-совместимых файловых систем. | false | HDFS |
| hdfs.config.resources | Разделённый запятыми список путей к файлам конфигурации HDFS, например /etc/hdfs-site.xml. Файлы должны существовать на всех узлах кластера Trino. |
— | HDFS |
Чтобы снизить общую нагрузку на ввод-вывод у менеджера обмена, значение конфигурационного параметра exchange.compression-codec по умолчанию установлено в LZ4. Кроме того, сжатие и распаковка файлов выполняются автоматически, а некоторые детали могут быть сконфигурированы.
Также рекомендуется настроить правило жизненного цикла (lifecycle rule) для бакета, чтобы автоматически удалять заброшенные объекты в случае сбоя узла.
AWS S3
Следующий пример файла exchange-manager.properties задаёт бакет AWS S3 в качестве места хранения данных спулинга. Обратите внимание, что место назначения не обязательно должно находиться в AWS — это может быть любая S3-совместимая система хранения. Хотя менеджер обмена рассчитан на поддержку S3-совместимых систем, только AWS S3 и MinIO протестированы на совместимость. Для других систем необходимо проводить собственное тестирование и консультироваться с поставщиком.
|
1 2 3 4 5 |
exchange-manager.name=filesystem exchange.base-directories=s3://exchange-spooling-bucket exchange.s3.region=us-west-1 exchange.s3.aws-access-key=example-access-key exchange.s3.aws-secret-key=example-secret-key |
Можно настроить несколько S3-бакетов для распределения данных спулинга, уменьшая I/O-нагрузку на один бакет. Если запрос завершается ошибкой:
“software.amazon.awssdk.services.s3.model.S3Exception: Please reduce your request rate”
это означает, что рабочая нагрузка является I/O-интенсивной, и вам следует указать несколько бакетов в exchange.base-directories, чтобы распределить нагрузку:
|
1 |
exchange.base-directories=s3://exchange-spooling-bucket-1,s3://exchange-spooling-bucket-2 |
Azure Blob Storage
Следующий пример файла exchange-manager.properties задаёт контейнер Azure Blob Storage в качестве места хранения данных спулинга. Нужно использовать именно Azure Blob Storage, а не Azure Data Lake Storage или другую иерархическую систему хранения в Azure.
Google Cloud Storage
Чтобы включить спулинг обмена в GCS в Trino, измените endpoint запросов на URI https://storage.googleapis.com
и настройте AWS access/secret keys, используя HMAC-ключи GCS. Если вы развернули Trino в GCP, необходимо либо создать сервисный аккаунт с доступом к вашему бакету спулинга, либо указать путь к файлу учётных данных GCS.
Следующий пример exchange-manager.properties задаёт бакет GCS в качестве места хранения данных спулинга:
|
1 2 3 4 5 6 7 |
exchange-manager.name=filesystem exchange.base-directories=gs://exchange-spooling-bucket exchange.s3.region=us-west-1 exchange.s3.aws-access-key=example-access-key exchange.s3.aws-secret-key=example-secret-key exchange.s3.endpoint=https://storage.googleapis.com exchange.gcs.json-key-file-path=/path/to/gcs_keyfile.json |
Alluxio
Следующий пример exchange-manager.properties задаёт Alluxio в качестве места хранения данных спулинга:
|
1 2 3 |
exchange-manager.name=alluxio exchange.base-directories=alluxio://alluxio-master:19998/exchange-spooling-directory exchange.alluxio.site-file-path=/path/to/alluxio-site.properties |
HDFS
Следующий пример конфигурации exchange-manager.properties указывает HDFS как место для spooling-хранилища:
|
1 2 3 |
exchange-manager.name=hdfs exchange.base-directories=hadoop-master:9000/exchange-spooling-directory hdfs.config.resources=/usr/lib/hadoop/etc/hadoop/core-site.xml |
Когда вы хотите использовать совместимую с Hadoop файловую систему в качестве spooling-хранилища, вам нужно включить параметр exchange.hdfs.skip-directory-scheme-validation в exchange-manager.properties, если вы настраиваете exchange.base-directories с использованием специфичной схемы (не hdfs).
Также могут понадобиться следующие шаги:
- Настроить реализацию
AbstractFileSystemвcore-site.xml. - Добавить соответствующие клиентские JAR-файлы в каталог
${Trino_HOME}/plugin/exchange-hdfsна всех узлах кластера Trino. - Локальное файловое хранилище
Следующий пример конфигурации exchange-manager.properties указывает локальный каталог /tmp/trino-exchange-manager как spooling-хранилище:
Примечание:
Использование локальной файловой системы для exchange рекомендуется только в автономных (standalone), непроизводственных кластерах.
Локальный каталог можно использовать в распределённом кластере только если он является общим и доступен со всех узлов.
|
1 2 |
exchange-manager.name=filesystem exchange.base-directories=/tmp/trino-exchange-manager |
Адаптивные оптимизации плана
Режим fault-tolerant execution предоставляет несколько адаптивных оптимизаций плана, которые динамически изменяют план выполнения запроса на основе статистики во время выполнения.
Повышение устойчивости обработки запросов
Вы можете настроить Trino так, чтобы он был более устойчивым к сбоям во время обработки запросов, включив режим fault-tolerant execution. Это позволяет Trino выполнять более крупные запросы, такие как пакетные операции, без падения запроса при сбоях рабочих узлов.
Когда этот режим включён, кластер Trino буферизует данные, используемые рабочими узлами во время обработки запроса. Если обработка на рабочем узле по какой-либо причине завершается неудачей — например, из-за сетевого сбоя или нехватки ресурсов — координатор переназначает выполнение неудавшегося фрагмента работы другому рабочему узлу. Это позволяет продолжать выполнение запроса, используя буферизованные данные.
Архитектура
Узел-координатор использует настроенный сервис exchange manager, который буферизует данные во время обработки запроса во внешнем хранилище, например в S3-бакете. Рабочие узлы отправляют данные в буфер во время выполнения своих задач.
Рекомендации и особенности
Кластер с fault-tolerant execution лучше всего подходит для больших пакетных запросов. Пользователи могут столкнуться с задержками или похожими эффектами при большом количестве коротких запросов. Поэтому рекомендуется использовать отдельный кластер с включённым fault-tolerant режимом для пакетных операций, отдельно от кластера, работающего с высокой нагрузкой по количеству запросов.
Каталоги, использующие следующие коннекторы, поддерживают fault-tolerant выполнение операций чтения и записи:
- Delta Lake connector
- Hive connector
- Iceberg connector
- MySQL connector
- PostgreSQL connector
- SQL Server connector
Каталоги, использующие другие коннекторы, поддерживают fault-tolerant только для операций чтения. При включенном fault-tolerant режиме операции записи на таких каталогах будут завершаться ошибкой.
Exchange-менеджер может отправлять большие объёмы данных в хранилище exchange, что приводит к высокой нагрузке на I/O. Чтобы сбалансировать нагрузку, вы можете настроить несколько хранилищ для exchange.
Конфигурация
Ниже приведены шаги настройки кластера Trino с fault-tolerant execution, используя S3-совместимое хранилище для exchange.
Настройте S3-бакет, который будет использоваться как exchange-хранилище. В примере используется AWS S3, но в документации также описаны другие варианты. Можно использовать несколько бакетов.
Для каждого бакета в AWS получите следующую информацию:
- S3 URI бакета, например:
s3://exchange-spooling-bucket - Регион бакета, например:
us-west-1 - AWS access key и secret key для доступа
Для Kubernetes-развёртывания Trino добавьте в Helm chart следующую конфигурацию exchange-менеджера в секции server.exchangeManager и additionalExchangeManagerProperties, используя данные о S3:
|
1 2 3 4 5 6 7 8 9 |
server: exchangeManager: name=filesystem base-directories=s3://exchange-spooling-bucket-1,s3://exchange-spooling-bucket-2 additionalExchangeManagerProperties: exchange.s3.region=us-west-1 exchange.s3.aws-access-key=example-access-key exchange.s3.aws-secret-key=example-secret-key |
В установках вне Kubernetes те же свойства должны быть указаны в файле exchange-manager.properties на координаторе и всех рабочих узлах.
Добавьте следующую конфигурацию для fault-tolerant execution в секцию additionalConfigProperties Helm chart:
|
1 2 |
additionalConfigProperties: retry-policy=TASK |
В установках вне Kubernetes это свойство должно быть добавлено в файл config.properties на координаторе и рабочих узлах.
Разверните кластер повторно или перезапустите его (для non-Kubernetes установок).
Теперь ваш кластер Trino настроен для fault-tolerant выполнения запросов. Если выполнение запроса обычно прерывалось бы из-за сбоя, fault-tolerant execution возобновит обработку, чтобы обеспечить успешное завершение запроса.
Task properties
task.concurrency
- Тип: integer (целое число)
- Ограничения: значение должно быть степенью двойки
- Значение по умолчанию: количество физических CPU на узле,
с минимальным значением 2 и максимальным 32. В режиме Fault-tolerant execution (устойчивого выполнения) по умолчанию — 8. - Сессионное свойство: task_concurrency
Определяет стандартный уровень локальной параллельности для операторов, таких как JOIN и AGGREGATION. Это значение следует увеличивать или уменьшать в зависимости от количества параллельно выполняющихся запросов и загрузки ресурсов worker-узлов.
Низкие значения подходят для кластеров, где одновременно выполняется много запросов,
так как кластер уже загружен, и увеличение параллелизма приведёт к замедлению из-за переключения контекста и прочих накладных расходов.
Высокие значения лучше для кластеров, где выполняется только один или несколько запросов одновременно.
task.http-response-threads
- Тип: integer (целое число)
- Минимальное значение: 1
- Значение по умолчанию: 100
Определяет максимальное количество потоков, которые могут быть созданы для обработки HTTP-ответов. Потоки создаются по мере необходимости и удаляются, когда простаивают, поэтому большое значение не создаёт избыточной нагрузки, если запросов немного.
Большее количество потоков может быть полезно в кластерах с большим числом параллельных запросов или в кластерах с сотнями или тысячами worker-узлов.
task.http-timeout-threads
- Тип: integer (целое число)
- Минимальное значение: 1
- Значение по умолчанию: 3
Количество потоков, используемых для обработки тайм-аутов при формировании HTTP-ответов.
Это значение следует увеличить, если все потоки часто находятся в использовании.
Нагрузку можно отслеживать через объект JMX:trino.server:name=AsyncHttpExecutionMBean:TimeoutExecutor
Если значение ActiveCount всегда равно PoolSize, значит, нужно увеличить число потоков.
task.info-update-interval
- Тип: duration (время, длительность)
- Минимальное значение: 1 мс
- Максимальное значение: 10 с
- Значение по умолчанию: 3 с
Определяет, как часто обновляется информация о задачах (task information), которая используется при планировании выполнения.
Большее значение уменьшает нагрузку на CPU координатора (coordinator), но может привести к менее оптимальному распределению сплитов (split scheduling).
task.max-drivers-per-task
- Тип: integer (целое число)
- Минимальное значение: 1
- Значение по умолчанию: 2 147 483 647 (максимум для int)
Определяет максимальное количество драйверов (drivers), которые могут выполняться в задаче (task) одновременно. Установка ограничения снижает вероятность того, что задача создаст слишком много драйверов, что может улучшить производительность при параллельном выполнении нескольких запросов.
Однако слишком малое значение может привести к неэффективному использованию ресурсов,
если одновременно выполняется слишком мало запросов.
task.max-partial-aggregation-memory
- Тип: data size (объём данных)
- Значение по умолчанию: 16 MB
Определяет максимальный размер промежуточных (partial) результатов агрегирования при распределённых агрегациях. Увеличение этого значения позволяет хранить больше групп локально перед их сбросом, что снижает сетевой трафик и нагрузку на CPU, но увеличивает использование памяти.
task.max-worker-threads
- Тип: integer (целое число)
- Значение по умолчанию: (количество CPU на узле × 2)
Устанавливает количество потоков, используемых воркерами (workers) для обработки сплитов (splits). Если загрузка CPU на воркере низкая, и все потоки используются, увеличение этого параметра может повысить пропускную способность. Однако это приводит к увеличению использования heap-памяти, а слишком высокое значение может снизить производительность из-за частого переключения контекста.
Количество активных потоков можно увидеть в свойстве RunningSplits объекта JMX:trino.execution.executor:name=TaskExecutor.RunningSplits
task.min-drivers
- Тип: integer (целое число)
- Значение по умолчанию: (
task.max-worker-threads× 2)
Определяет целевое количество выполняющихся leaf splits (листовых сплитов) на воркере. Это минимальное значение, так как каждая leaf-задача гарантированно выполняет как минимум 3 сплита. Нелистовые задачи также выполняются, чтобы избежать взаимоблокировок (deadlocks).
Меньшее значение может улучшить отзывчивость для новых задач, но может привести к недоиспользованию ресурсов. Большее значение повышает загрузку ресурсов, но требует больше памяти.
task.min-drivers-per-task
- Тип: integer (целое число)
- Минимальное значение: 1
- Значение по умолчанию: 3
Определяет минимальное количество драйверов (drivers), которые гарантированно выполняются одновременно для одной задачи (task), если у неё есть оставшиеся сплиты для обработки.
task.min-writer-count
- Тип: integer (целое число)
- Значение по умолчанию: 1
- Сессионное свойство:
task_min_writer_count
Определяет количество параллельных потоков записи (writer threads) на одного воркера для одного запроса, если не используется предпочтительное партиционирование (preferred partitioning) и масштабирование записывающих потоков (task writer scaling).
Увеличение этого значения может ускорить запись, особенно если запрос не ограничен I/O и может эффективно использовать дополнительный CPU для параллельной записи.
Однако некоторые коннекторы могут упираться в CPU при записи (например, из-за сжатия).
Слишком высокое значение может перегрузить кластер, особенно если движок вставляет данные в партиционированную таблицу без preferred partitioning — в этом случае каждый поток записи может писать во все партиции, что способно вызвать ошибку Out of memory, так как запись в партицию требует выделения памяти под буферизацию.
Exchange properties
Обмены (Exchanges) передают данные между узлами Trino на разных стадиях выполнения запроса.
Настройка этих параметров может помочь устранить проблемы коммуникации между узлами
или улучшить использование сетевых ресурсов.
Дополнительно можно настроить использование HTTP-клиентов обмена (exchange HTTP client).
exchange.client-threads
- Тип: integer (целое число)
- Минимальное значение: 1
- Значение по умолчанию: 25
Определяет количество потоков, используемых exchange-клиентами для получения данных с других узлов Trino.
Большее значение может повысить производительность на крупных кластерах или при высокой степени параллельности (high concurrency). Однако слишком высокое значение может, наоборот, снизить производительность из-за частых переключений контекста и увеличенного потребления памяти.
exchange.concurrent-request-multiplier
- Тип: integer (целое число)
- Минимальное значение: 1
- Значение по умолчанию: 3
Этот множитель определяет, сколько одновременных запросов может выполняться в зависимости от доступной памяти буфера (buffer memory).
Максимальное количество запросов вычисляется эвристически, на основе количества клиентов, которые могут поместиться в доступное место буфера, умноженного на этот множитель.
Пример:
Если exchange.max-buffer-size = 32 MB, уже занято 20 MB, а средний размер запроса — 2 MB, то максимальное количество клиентов = multiplier * ((32MB - 20MB) / 2MB) = multiplier * 6.
Настройка этого параметра изменяет поведение этой эвристики — увеличение значения может повысить параллельность и улучшить сетевую загрузку.
exchange.compression-codec
- Тип: string (строка)
- Допустимые значения: NONE, LZ4, ZSTD
- Значение по умолчанию: NONE
Определяет кодек сжатия, который используется при сжатии и распаковке файлов во время обмена данными между узлами или при взаимодействии с хранилищем обмена (exchange storage) в режиме Fault-tolerant execution.
exchange.data-integrity-verification
- Тип: string (строка)
- Допустимые значения: NONE, ABORT, RETRY
- Значение по умолчанию: ABORT
Определяет поведение системы при проблемах с целостностью данных. По умолчанию режим ABORT приводит к прерыванию запроса, если во время встроенной проверки обнаружены ошибки целостности данных.
NONE— отключает проверку целостности.ABORT— останавливает выполнение запроса при обнаружении ошибок.RETRY— при обнаружении ошибки повторяет попытку обмена данными.
exchange.max-buffer-size
- Тип: data size (объём данных)
- Значение по умолчанию: 32MB
Определяет размер буфера в клиенте обмена (exchange client), который хранит данные, полученные с других узлов, до их обработки.
Больший размер буфера может повысить пропускную способность сети в крупных кластерах и сократить время выполнения запроса, но при этом уменьшает объём памяти, доступный для других задач.
exchange.max-response-size
- Тип: data size (объём данных)
- Минимальное значение: 1MB
- Значение по умолчанию: 16MB
Устанавливает максимальный размер ответа, возвращаемого при обмене данными между узлами. Ответ помещается в буфер клиента обмена, который делится между всеми параллельными запросами.
Увеличение значения может улучшить сетевую пропускную способность при высокой сетевой задержке (latency).
Уменьшение значения может улучшить производительность на крупных кластерах, так как снижает дисбаланс нагрузки (skew) — буфер сможет одновременно обрабатывать больше задач, вместо того чтобы удерживать большие объемы данных от меньшего числа задач.
sink.max-buffer-size
- Тип: data size (объём данных)
- Значение по умолчанию: 32MB
Размер выходного буфера для данных задачи (task), которые ожидают, пока их заберут вышестоящие задачи (upstream tasks). Если вывод задачи разделён по хэшу (hash partitioned), этот буфер общий для всех потребителей этих разделов.
Увеличение значения может повысить сетевую пропускную способность при передаче данных между стадиями запроса — особенно если высокая сетевой задержка (latency) или в кластере много узлов.
sink.max-broadcast-buffer-size
- Тип: data size (объём данных)
- Значение по умолчанию: 200MB
Размер буфера широковещательной (broadcast) передачи для данных задачи, которые ждут, пока их заберут вышестоящие задачи. Этот буфер используется для хранения и передачи данных со стороны build в операциях реплицированных (broadcast) join’ов.
Если буфер слишком мал, это может ограничить масштабирование задач со стороны probe в join-операциях, когда в кластер добавляются новые узлы.
Resource management properties
query.max-cpu-time
- Тип: duration (длительность)
- Значение по умолчанию:
1_000_000_000d
Максимальное количество CPU-времени, которое запрос может использовать во всём кластере. Если запрос превышает этот лимит, он принудительно завершается.
query.max-memory-per-node
- Тип: heap size (объём памяти кучи)
- Значение по умолчанию: 30% от максимального размера кучи (heap) на узле
Максимальный объём пользовательской памяти, которую запрос может использовать на одном рабочем узле (worker).
Пользовательская память — это память, выделяемая во время выполнения запроса для объектов, которые напрямую связаны с самим запросом.
Примеры:
- хэш-таблицы, создаваемые во время выполнения;
- память, используемая при сортировке и агрегациях.
Если использование памяти для запроса на любом узле достигает этого лимита — запрос завершается.
Предупреждение:
Сумма значений query.max-memory-per-node и memory.heap-headroom-per-node должна быть меньше максимального размера кучи JVM на узле.
Примечание:
Не применяется для запросов с включёнными повторными попытками задач (retry-policy=TASK).
query.max-memory
- Тип: data size (объём данных)
- Значение по умолчанию: 20GB
Максимальный объём пользовательской памяти, которую запрос может использовать во всём кластере. Это память, выделяемая для операций, связанных с конкретным запросом, например, хэш-таблицы, сортировки, буферы агрегаций и т.д.
Если использование памяти запросом на всех узлах вместе превышает этот лимит — запрос завершается.
Предупреждение:query.max-total-memory должно быть больше, чем query.max-memory.
Примечание:
Не применяется для запросов с retry-policy=TASK.
query.max-total-memory
- Тип: data size (объём данных)
- Значение по умолчанию: (
query.max-memory * 2)
Максимальный объём всей памяти, которую запрос может использовать в кластере, включая revocable memory — то есть память, которую система может отобрать при нехватке ресурсов.
Если общий объём выделенной памяти превышает этот лимит — запрос завершается.
Предупреждение:query.max-total-memory должно быть больше, чем query.max-memory.
Примечание:
Не применяется для запросов с retry-policy=TASK.
memory.heap-headroom-per-node
- Тип: heap size (объём памяти кучи)
- Значение по умолчанию: 30% от максимального размера кучи JVM на узле
Это объём памяти, который резервируется в куче JVM в качестве запаса (headroom) для операций, не отслеживаемых самим Trino.
Предупреждение:
Сумма query.max-memory-per-node и memory.heap-headroom-per-node должна быть меньше максимального размера кучи JVM на узле.
exchange.deduplication-buffer-size
- Тип: data size (объём данных)
- Значение по умолчанию: 32MB
Размер буфера, используемого для временных данных (spooled data) во время Fault-tolerant execution — режима выполнения с устойчивостью к сбоям.
Query management properties
query.client.timeout
- Тип: duration
- Значение по умолчанию: 5m
Определяет, как долго кластер может работать без связи с клиентским приложением (например, CLI), прежде чем сочтёт запрос «заброшенным» и отменит его выполнение.
query.execution-policy
- Тип: string
- Значение по умолчанию: phased
- Сессионное свойство:
execution_policy
Определяет алгоритм организации выполнения всех стадий (stages) запроса.
Доступны следующие политики выполнения:
phased— выполняет стадии последовательно, чтобы избежать блокировок из-за зависимостей между стадиями. Этот режим обеспечивает максимальное использование ресурсов кластера и минимальное общее время выполнения запроса.all-at-once— запускает все стадии запроса одновременно. В начале это даёт высокую загрузку ресурсов, но из-за зависимостей между стадиями часть работы простаивает, что увеличивает время ожидания и общее время выполнения запроса.
query.determine-partition-count-for-write-enabled
- Тип: boolean
- Значение по умолчанию: false
- Сессионное свойство:
determine_partition_count_for_write_enabled
Включает возможность определять количество партиций для операций записи на основе объёма данных, прочитанных и обработанных запросом.
query.max-hash-partition-count
- Тип: integer
- Значение по умолчанию: 100
- Сессионное свойство:
max_hash_partition_count
Задает максимальное количество партиций, используемых при выполнении распределённых операций — таких как JOIN, AGGREGATION, partitioned window functions и других.
query.min-hash-partition-count
- Тип: integer
- Значение по умолчанию: 4
- Сессионное свойство: min_hash_partition_count
Минимальное количество партиций, используемых при выполнении распределённых операций, таких как JOIN, AGGREGATION, partitioned window functions и других.
query.min-hash-partition-count-for-write
- Тип: integer
- Значение по умолчанию: 50
- Сессионное свойство:
min_hash_partition_count_for_write
Минимальное количество партиций, используемых при выполнении распределённых операций в записывающих запросах (INSERT, CREATE TABLE AS SELECT, EXECUTE и т. д.), например JOIN, AGGREGATION или оконных функций с партиционированием.
query.max-writer-task-count
- Тип: integer
- Значение по умолчанию: 100
- Сессионное свойство:
max_writer_task_count
Максимальное количество задач (tasks), участвующих в записи данных во время выполнения запросов INSERT, CREATE TABLE AS SELECT и EXECUTE.
Это ограничение применяется только, если включена одна из опций:
redistribute-writes,илиscale-writers.
query.low-memory-killer.policy
- Тип: string
- Значение по умолчанию:
total-reservation-on-blocked-nodes
Определяет поведение кластера при нехватке памяти, когда необходимо принудительно завершить один или несколько запросов.
Поддерживаемые значения:
none— не завершать запросы при нехватке памяти.total-reservation— завершить запрос, который использует больше всего памяти в данный момент.total-reservation-on-blocked-nodes— завершить запрос, который использует больше всего памяти именно на узлах, где закончилась память.
💡 Примечание:
Данный параметр применяется только для запросов, у которых отключены повторные попытки на уровне задач (retry-policy установлено в NONE или QUERY).
task.low-memory-killer.policy
- Тип: string
- Значение по умолчанию: total-reservation-on-blocked-nodes
Определяет поведение системы при нехватке памяти — каким образом Trino будет завершать выполняющиеся задачи (tasks).
Поддерживаемые значения:
none— не завершать задачи при нехватке памяти.total-reservation-on-blocked-nodes— завершать задачи, которые принадлежат запросам с включёнными повторными попытками на уровне задач (task retries) и которые используют больше всего памяти на узлах, где память закончилась.least-waste— завершать задачи, принадлежащие запросам с включёнными task retries, которые потребляют значительное количество памяти на узлах с нехваткой памяти, но избегать завершения задач, которые выполняются уже долго, чтобы не тратить зря проделанную работу.
Примечание:
Применяется только для запросов с включёнными повторными попытками на уровне задач (retry-policy=TASK).
query.max-execution-time
- Тип: duration
- Значение по умолчанию: 100d
- Сессионное свойство:
query_max_execution_time
Максимально допустимое время активного выполнения запроса в кластере (без учёта анализа, планирования или ожидания в очереди). После превышения этого времени запрос будет автоматически завершён.
query.max-length
- Тип: integer
- Значение по умолчанию: 1,000,000
- Максимальное значение: 1,000,000,000
Максимальное количество символов в тексте SQL-запроса. Если запрос превышает этот предел, он не будет обработан, и Trino вернёт ошибку QUERY_TEXT_TOO_LARGE.
query.max-planning-time
- Тип: duration
- Значение по умолчанию: 10m
- Сессионное свойство:
query_max_planning_time
Максимально допустимое время, которое может быть потрачено на планирование выполнения запроса. После истечения этого времени координатор попытается остановить запрос. Однако некоторые операции на этапе планирования могут быть неотменяемыми, поэтому завершение не всегда происходит мгновенно.
query.max-run-time
- Тип: длительность (duration)
- Значение по умолчанию: 100d
- Параметр сессии:
query_max_run_time
Максимальное время, в течение которого запрос может выполняться в кластере, прежде чем будет принудительно завершён. Это время включает этапы анализа и планирования, а также ожидание в очереди — по сути, это максимальное «время жизни» запроса с момента его создания.
query.max-scan-physical-bytes
- Тип: размер данных (data size)
- Параметр сессии:
query_max_scan_physical_bytes
Максимальное количество байт, которые могут быть просканированы во время выполнения запроса. Когда этот лимит достигается, выполнение запроса останавливается, чтобы предотвратить чрезмерное использование ресурсов.
query.max-write-physical-size
- Тип: размер данных (data size)
- Параметр сессии:
query_max_write_physical_size
Максимальный физический объём данных, который может быть записан во время выполнения запроса. При достижении лимита запрос завершается, чтобы избежать избыточной нагрузки на систему.
query.max-stage-count
- Тип: целое число (integer)
- Значение по умолчанию: 150
- Минимальное значение: 1
Максимальное количество стадий (stages), которые может сгенерировать один запрос. Если запрос создаёт больше стадий, чем разрешено, он завершается с ошибкой QUERY_HAS_TOO_MANY_STAGES.
Предупреждение:
Если установить слишком большое значение, это может вызвать нестабильность кластера — запросы с большим числом стадий могут привести к ошибкам других запросов с сообщением:
REMOTE_TASK_ERROR и Max requests queued per destination exceeded for HttpDestination …
query.max-history
- Тип: целое число (integer)
- Значение по умолчанию: 100
Максимальное количество запросов, сохраняемых в истории запросов для статистики и отображения в веб-интерфейсе. Когда лимит достигнут, старые запросы удаляются.
Чтобы хранить информацию о запросах дольше и в большем объёме, необходимо использовать прослушиватель событий (event listener), который записывает события во внешнюю систему.
query.min-expire-age
- Тип: длительность (duration)
- Значение по умолчанию: 15m
Минимальный возраст запроса в истории перед его удалением. Просроченные запросы удаляются из буфера истории и больше не отображаются в веб-интерфейсе.</p>
<p>Чтобы сохранять события запросов и подробную информацию о них во внешней системе, также используется
event listener.
query.remote-task.enable-adaptive-request-size
- Тип: логический (boolean)
- Значение по умолчанию: true
- Параметр сессии:
remote_task_adaptive_update_request_size_enabled
Включает динамическое разделение запросов, отправляемых задачами на сервер. Это помогает избежать ошибок «out of memory» при работе с большими схемами. Настройки по умолчанию оптимизированы для типичных сценариев и должны изменяться только опытными пользователями, работающими с очень крупными таблицами.
query.remote-task.guaranteed-splits-per-task
- Тип: целое число (integer)
- Значение по умолчанию: 3
- Параметр сессии:
remote_task_guaranteed_splits_per_request
Минимальное количество splits (подзадач), которые должны быть назначены каждой удалённой задаче, чтобы гарантировать минимальный объём работы для выполнения. Требует, чтобы параметр query.remote-task.enable-adaptive-request-size был включён.
query.remote-task.max-error-duration
- Тип: длительность (duration)
- Значение по умолчанию: 1m
Максимальное время ожидания связи между координатором и удалённой задачей. Если координатор не получает обновления от задачи в течение этого времени, задача считается неудавшейся (failed).
query.remote-task.max-request-size
- Тип: размер данных (data size)
- Значение по умолчанию: 8MB
- Параметр сессии:
remote_task_max_request_size
Максимальный размер одного запроса, который может быть выполнен удалённой задачей. Требует включённого параметра query.remote-task.enable-adaptive-request-size.
query.remote-task.request-size-headroom
- Тип: размер данных (data size)
- Значение по умолчанию: 2MB
- Параметр сессии:
remote_task_request_size_headroom
Определяет запас (headroom) по памяти, который выделяется сверх размера данных запроса. Также требует включённого параметра query.remote-task.enable-adaptive-request-size.
query.info-url-template
- Тип: строка (string)
- Значение по умолчанию: URL страницы информации о запросе на координаторе
Позволяет перенаправлять клиентов на альтернативное местоположение для получения информации о запросах. URL должен содержать плейсхолдер идентификатора запроса — ${QUERY_ID}.
Пример:
|
1 |
https://example.com/query/${QUERY_ID} |
Во время выполнения ${QUERY_ID} заменяется фактическим идентификатором запроса.
retry-policy
- Тип: строка (string)
- Значение по умолчанию: NONE
Определяет политику повторного выполнения (retry policy) для Fault-tolerant execution — отказоустойчивого выполнения.
Поддерживаемые значения:
NONE— отключает отказоустойчивое выполнение.TASK— повторно выполняет отдельные задачи (tasks) внутри запроса при сбое. Требует настройки exchange manager.QUERY— повторно выполняет весь запрос целиком при сбое.
Optimizer properties
optimizer.dictionary-aggregation
- Тип: boolean
- Значение по умолчанию: false
- Сессионное свойство: dictionary_aggregation
Включает оптимизацию для агрегаций, выполняемых над словарями (dictionaries).
optimizer.optimize-metadata-queries
- Тип: boolean
- Значение по умолчанию: false
- Сессионное свойство: optimize_metadata_queries
Включает оптимизацию некоторых агрегатных запросов с использованием значений, хранящихся в метаданных.
Это позволяет Trino выполнять простые запросы за постоянное время.
На данный момент оптимизация применяется к функциям max, min и approx_distinct по ключам партиций и другим агрегатам, не зависящим от количества строк (включая DISTINCT-агрегации).
Использование этого параметра может существенно ускорить выполнение некоторых запросов.
⚠️ Важно: возможны некорректные результаты, если коннектор возвращает ключи партиций, для которых нет строк.
Например, коннектор Hive может вернуть пустые партиции, созданные другими системами (Trino такие не создает).
optimizer.distinct-aggregations-strategy
- Тип: string
- Допустимые значения:
AUTOMATIC, MARK_DISTINCT, SINGLE_STEP, PRE_AGGREGATE, SPLIT_TO_SUBQUERIES - Значение по умолчанию: AUTOMATIC
- Сессионное свойство: distinct_aggregations_strategy
Определяет стратегию обработки нескольких DISTINCT-агрегаций.
Режимы работы:
SINGLE_STEP— вычисляет DISTINCT-агрегации за один шаг без предварительной агрегации.
Может работать медленно, если количество уникальных ключей группировки невелико.MARK_DISTINCT— использует механизм MarkDistinct для нескольких DISTINCT-агрегаций или их комбинаций с обычными агрегатами.PRE_AGGREGATE— выполняет DISTINCT-агрегации с комбинацией этапов агрегации и предагрегации.SPLIT_TO_SUBQUERIES— разбивает входные данные агрегации на независимые подзапросы, где каждый подзапрос вычисляет одну DISTINCT-агрегацию, повышая параллелизм.AUTOMATIC— Trino автоматически выбирает подходящую стратегию.
По умолчанию предпочтительна стратегия SINGLE_STEP, но если уровень параллелизма ограничен (из-за малого числа ключей группировки), Trino выбирает другую стратегию на основе статистики данных.
optimizer.push-aggregation-through-outer-join
- Тип: boolean
- Значение по умолчанию: true
- Сессионное свойство: push_aggregation_through_outer_join
Когда агрегирующая функция находится над внешним соединением (outer join), и все столбцы из внешней стороны соединения присутствуют в группировке, Trino может переместить агрегацию ниже соединения.
Эта оптимизация особенно полезна для коррелированных скалярных подзапросов, которые Trino переписывает как агрегации над внешним соединением.
Пример:
|
1 2 3 4 5 6 |
SELECT * FROM item i WHERE i.i_current_price > ( SELECT AVG(j.i_current_price) FROM item j WHERE i.i_category = j.i_category ); |
Включение этой оптимизации может существенно ускорить запросы,
так как уменьшает объем данных, обрабатываемых оператором JOIN.
Однако для запросов с очень избирательными соединениями она может, наоборот, немного замедлить выполнение.
optimizer.push-table-write-through-union
- Тип: boolean
- Значение по умолчанию: true
- Сессионное свойство: push_table_write_through_union
Позволяет параллелизировать запись данных при использовании UNION ALL в запросах, которые создают или заполняют таблицы.
Эта оптимизация ускоряет запись результирующих таблиц,
поскольку устраняет необходимость дополнительной синхронизации при объединении результатов.
Включение параметра полезно, когда скорость записи ещё не достигла предела производительности,
но на перегруженных системах может, наоборот, замедлить выполнение запросов.
optimizer.push-filter-into-values-max-row-count
- Тип: integer
- Значение по умолчанию: 100
- Минимальное значение: 0
- Сессионное свойство: push_filter_into_values_max_row_count
Определяет порог количества строк в операторе VALUES,
ниже которого планировщик может выполнить фильтрацию напрямую над VALUES,
чтобы оптимизировать план запроса.
optimizer.join-reordering-strategy
- Тип: string
- Допустимые значения:
AUTOMATIC,ELIMINATE_CROSS_JOINS,NONE - Значение по умолчанию: AUTOMATIC
- Сессионное свойство:
join_reordering_strategy
Определяет стратегию перестановки (reordering) соединений (JOIN).
Режимы работы:
NONE— сохраняет порядок таблиц, указанный в запросе.ELIMINATE_CROSS_JOINS— переставляет соединения, чтобы избежать кросс-соединений (CROSS JOIN), где это возможно, при этом максимально сохраняет исходный порядок таблиц.AUTOMATIC— перебирает все возможные варианты порядка соединений и использует стоимостную модель (cost-based estimation), чтобы выбрать порядок с наименьшей вычислительной стоимостью.
Если статистика недоступна или стоимость не может быть вычислена, Trino автоматически переключается на стратегию ELIMINATE_CROSS_JOINS.
optimizer.max-reordered-joins
- Тип: integer
- Значение по умолчанию: 8
- Сессионное свойство: max_reordered_joins
Определяет максимальное количество соединений, которые оптимизатор может переставить одновременно,
если используется стоимостная стратегия (cost-based).
⚠️ Предупреждение:
Количество возможных комбинаций соединений растёт факториально по мере увеличения числа таблиц.
Поэтому повышение этого значения может серьёзно замедлить выполнение запросов и планирование.
optimizer.optimize-duplicate-insensitive-joins
- Тип: boolean
- Значение по умолчанию: true
- Сессионное свойство: optimize_duplicate_insensitive_joins
Уменьшает количество строк, возвращаемых при выполнении JOIN,
если оптимизатор обнаруживает, что дублирующиеся строки в результате можно безопасно пропустить.
Это может повысить производительность без изменения итоговых данных.
optimizer.use-exact-partitioning
- Тип: boolean
- Значение по умолчанию: false
- Сессионное свойство: use_exact_partitioning
Указывает, что данные должны быть перераспределены (repartitioned),
если схема партиционирования предыдущего этапа не в точности совпадает с той,
которую ожидает следующий этап выполнения запроса.
Полезно для обеспечения корректного распределения данных,
но может увеличить затраты на шифлинг (data shuffling) и повлиять на производительность.
optimizer.use-table-scan-node-partitioning
- Тип: boolean
- Значение по умолчанию: true
- Сессионное свойство: use_table_scan_node_partitioning
Использует разделение таблицы на узлы (node partitioning), предоставленное коннектором, при чтении таблиц.
Например, в Hive это соответствует buckets (корзинам) таблицы.
Если включено (true) и выполняется минимальное соотношение партиций к задачам,
каждая партиция таблицы читается отдельным воркером.
Минимальное соотношение задаётся параметром
optimizer.table-scan-node-partitioning-min-bucket-to-task-ratio.
Назначение партиций распределяется между воркерами для параллельной обработки.
Использование node partitioning при чтении таблицы может ускорить выполнение запросов,
так как уменьшает сложность выполнения (например, может не потребоваться перестановка данных по всему кластеру при агрегациях).
Однако, если количество партиций меньше числа воркеров,
уровень параллелизма может снизиться.
optimizer.table-scan-node-partitioning-min-bucket-to-task-ratio
- Тип: double
- Значение по умолчанию: 0.5
- Сессионное свойство: table_scan_node_partitioning_min_bucket_to_task_ratio
Определяет минимальное соотношение «количество бакетов / количество задач»,
необходимое для использования node partitioning при чтении таблицы.
Если число бакетов в таблице мало по сравнению с числом воркеров,
Trino распределяет чтение таблицы по всем воркерам для повышения параллелизма.
optimizer.colocated-joins-enabled
- Тип: boolean
- Значение по умолчанию: true
- Сессионное свойство: colocated_join
Использует колоцированные соединения (co-located joins),
если обе стороны соединения (JOIN) имеют одинаковое партиционирование таблицы по ключам соединения,
и выполняются условия, указанные в optimizer.use-table-scan-node-partitioning.
Например, при соединении таблиц Hive с одинаковыми схемами бакетизации,
Trino может выполнить соединение без обмена данными между воркерами,
что значительно повышает производительность запросов.
optimizer.filter-conjunction-independence-factor
- Тип: double
- Значение по умолчанию: 0.75
- Минимальное значение: 0
- Максимальное значение: 1
- Сессионное свойство: filter_conjunction_independence_factor
Определяет степень предположения независимости при оценке селективности (избирательности) сочетания нескольких предикатов (условий) в фильтре WHERE.
🔹 Чем ниже значение, тем более консервативной будет оценка — оптимизатор будет считать, что между столбцами предикатов существует большая корреляция.
🔹 Значение 0 означает, что оптимизатор предполагает полную зависимость столбцов, и селективность всего фильтра определяется наиболее селективным условием.
optimizer.join-multi-clause-independence-factor
- Тип: double
- Значение по умолчанию: 0.25
- Минимальное значение: 0
- Максимальное значение: 1
- Сессионное свойство: join_multi_clause_independence_factor
Задаёт степень предположения независимости условий при оценке результата соединений (JOIN) с несколькими условиями.
🔹 Более низкие значения дают консервативную оценку, предполагая сильную корреляцию между столбцами в условиях соединения.
🔹 При значении 0 оптимизатор считает, что столбцы в условиях соединения полностью коррелированы,
и селективность соединения определяется наиболее селективным условием.
optimizer.non-estimatable-predicate-approximation.enabled
- Тип: boolean
- Значение по умолчанию: true
- Сессионное свойство: non_estimatable_predicate_approximation_enabled
Включает аппроксимацию (приблизительную оценку) количества строк,
возвращаемых фильтрами, чья стоимость не может быть точно оценена,
даже при наличии полной статистики.
Это позволяет оптимизатору строить более эффективные планы выполнения,
даже при наличии фильтров, для которых ранее не удавалось выполнить оценку.
optimizer.join-partitioned-build-min-row-count
- Тип: integer
- Значение по умолчанию: 1000000
- Минимальное значение: 0
- Сессионное свойство: join_partitioned_build_min_row_count
Минимальное количество строк на «build»-стороне соединения (JOIN), при котором используется поисковое соединение с разбиением на партиции (partitioned join lookup).
Если количество строк на build-стороне оценивается как меньше указанного порога, используется однопоточный поиск для повышения производительности соединения.
🔹 Значение 0 отключает эту оптимизацию.
optimizer.min-input-size-per-task
- Тип: размер данных (data size)
- Значение по умолчанию: 5GB
- Минимальное значение: 0MB
- Сессионное свойство: min_input_size_per_task
Минимальный объём входных данных, необходимый на одну задачу (task).
Эта настройка помогает оптимизатору определить количество хэш-разбиений (hash partitions) для операций JOIN и AGGREGATION.
🔹 Ограничение количества разбиений для небольших запросов повышает конкурентность в больших кластерах, где одновременно выполняются множество малых запросов.
🔹 Оценённое значение всегда находится в пределах между параметрами min_hash_partition_count и max_hash_partition_count.
🔹 Значение 0MB отключает оптимизацию.
optimizer.min-input-rows-per-task
- Тип: integer
- Значение по умолчанию: 10000000
- Минимальное значение: 0
- Сессионное свойство: min_input_rows_per_task
Минимальное количество входных строк, необходимое на одну задачу (task).
Подобно предыдущему параметру, используется оптимизатором для определения количества хэш-разбиений при соединениях и агрегациях.
🔹 Снижение количества разбиений для малых запросов повышает общую эффективность и параллелизм при высокой загрузке кластера.
🔹 Значение 0 отключает оптимизацию.
optimizer.use-cost-based-partitioning
- Тип: boolean
- Значение по умолчанию: true
- Сессионное свойство: use_cost_based_partitioning
При включении оптимизатор использует оценку стоимости выполнения (cost-based optimization),
чтобы определить, требуется ли повторное разбиение (repartitioning) данных после уже выполненного этапа разбиения.
Это позволяет избежать ненужных перераспределений данных и повысить общую производительность выполнения запроса.


















Leave a Reply