Оптимизация запросов в Trino. Обзор функциональности и настроек

Данная статья — это перевод некоторых разделов документации по Trino 478.

Contents

Материалы по теме Trino

Trino Glossary

Для начала рассмотрим ряд терминов по Trino.

  • Predicate Pushdown (проталкивание фильтров): Trino пытается “протолкнуть” фильтры (WHERE, JOIN условия) из SQL-запроса как можно ближе к источнику данных (Iceberg, Parquet, ORC и т.д.), чтобы уменьшить объём прочитанных данных. Работает на уровне manifest-файлов (min/max statistics), data files (file-level stats), иногда даже на уровне Parquet row groups (если включен deep filtering).
  • Partition Pruning (отсечение партиций): Если таблица Iceberg партиционирована, Trino анализирует выражения фильтра (WHERE, JOIN ON) и отсекает целые партиции, которые точно не могут удовлетворить условию.
  • Dynamic filtering (Динамические фильтры) — это оптимизация, при которой фильтры, полученные во время выполнения из одной части запроса (например, из результата join), автоматически применяются к другой части запроса, чтобы уменьшить объём данных, считываемых из источника. То есть используются значения для фильтрации, которые получаются динамически в процессе выполнения, а не на этапе планирования.
  • Optimizer — это компонент, который берёт разобранный SQL (из Parser), строит логический план выполнения (операции: scan, filter, join, aggregate и т.д.), и далее преобразует его в физический план, который реально будет выполняться на worker-узлах. Задача optimizer — это минимизировать время и стоимость запроса, выбирая лучший способ выполнения (порядок join’ов, pushdown, partition pruning и т.д.).
  • Cost-based Optimization (CBO) — это механизм планировщика, который на основе статистики таблиц (например, объёма данных, распределения значений и т.д.) оценивает «стоимость» различных вариантов выполнения запроса (например, порядок join’ов, стратегия распределения данных) и выбирает тот план, который, по оценке, будет наиболее эффективным (то есть дешевым).
  • Распределенное соединение (Join distribution) — стратегия выполнения операции соединения в распределенной среде:
    • PARTITIONED JOIN (Партиционированное соединение)«каждый делает свою часть работы»: обе таблицы разбиваются (partitioned) по ключу соединения (например, customer_id), и каждый worker Trino получает только свою часть данных, чтобы соединить локально.
    • BROADCAST JOIN (Широковещательное соединение)«раздать маленькое всем»: если одна таблица (например, B) маленькая, Trino просто рассылает (broadcast) её копию на все узлы. Каждый worker соединяет свою часть большой таблицы A с полной копией B.
  • [Legacy] Spill в Trino позволяет выгружать промежуточные результаты операций, интенсивно использующих память (таких как агрегации, соединения и сортировки), из оперативной памяти на локальный диск для предотвращения сбоев запросов из-за нехватки памяти. Система отслеживает использование памяти и, при превышении заданного порога, перемещает данные на диск, продолжая обработку с меньшим потреблением памяти.
  • Trino Rule — это правило оптимизации, то есть отдельное “мини-правило”, которое говорит оптимизатору: «Если ты видишь вот такую конструкцию в плане запроса — перепиши её вот так, потому что это будет быстрее / проще / эффективнее.»
  • Trino exchange — это механизм, отвечающий за передачу промежуточных данных между различными рабочими узлами (worker’ами) во время выполнения распределенного запроса. Этот процесс необходим, чтобы операторы (например, JOIN, AGGREGATION) могли корректно и эффективно работать с данными, распределенными по разным серверам кластера.
  • Fragment — это самостоятельный кусок плана запроса, который выполняется на одном или нескольких worker-узлах параллельно. Каждый fragment — это логически независимая часть вычислений (например, чтение данных, join, агрегация), между которыми Trino передаёт результаты по сети. Проще говоря, Trino делит запрос на фрагменты, чтобы выполнять их распределённо и эффективно: один fragment готовит данные, другой их обрабатывает, а coordinator собирает итог.
  • Build Side (Сторона построения) — Данные с этой стороны используются для создания структуры данных в памяти, которая обеспечивает быстрый поиск — обычно хеш-таблицы (hash table).
  • Probe Side (Сторона проверки/сканирования) — Данные с этой стороны используются для проверки соответствий в уже построенной хеш-таблице. Система потоково (строка за строкой) сканирует «probe side» таблицу. Для каждой строки вычисляется хеш-ключ, и выполняется поиск в хеш-таблице, созданной на «build side», для нахождения соответствующих строк.
  • Split — это минимальная единица данных для обработки, которую worker-узел читает и выполняет локально. Каждый split обычно соответствует части файла или нескольких файлов (например, блок Parquet или ORC) и включает информацию о том, где находятся эти данные и какие колонки нужно читать. Trino делит таблицу на множество split-ов, чтобы распараллелить выполнение запроса: каждый worker обрабатывает один или несколько split-ов одновременно, что позволяет эффективно использовать кластер и ускорять чтение больших объёмов данных.
  • Федеративные запросы в Trino — возможность выполнять один SQL-запрос, который объединяет данные из нескольких разных источников данных (например, PostgreSQL, S3, Kafka и т.д.) без необходимости перемещать их в одно хранилище или выполнять сложный ETL-процесс. Trino подключается к источникам, получает необходимые данные, а затем обрабатывает их параллельно и возвращает результат.
  • Координатор (Coordinator) — сервер Trino, который управляет входящими SQL-запросами, планирует выполнение запросов и распределяет задачи между рабочими узлами (workers).
  • Рабочий узел (Worker) — сервер Trino, который выполняет задачи, назначенные координатором, и обрабатывает данные.
  • Плагин (Plugin) — набор кода, реализующий SPI (Service Provider Interface) Trino для добавления новых коннекторов, типов данных, функций SQL и других возможностей.
  • Fault tolerance executor — это механизм, который обеспечивает отказоустойчивое выполнение запросов, позволяя кластеру автоматически повторять выполнение запросов или отдельных задач при сбоях. При включенном режиме fault-tolerant execution промежуточные данные сохраняются в указанном пространстве (hdfs, s3) и могут быть использованы другими воркерами для повторного выполнения, что снижает вероятность провала запроса из-за сбоев и не требует ручного перезапуска.

Splits в Trino

Split — часть данных таблицы источника, которая может быть отсканирована независимо.

За выбор размера сплита отвечают параметры hive.max-initial-split-size и hive.max-split-size.

Основные термины и концепции Trino

Чтобы понять работу Trino, сначала нужно разобраться с основными терминами и концепциями, которые используются в его документации.

Хотя базовые запросы и операторы SQL понять просто, чтобы эффективно использовать Trino, важно знать, как работают такие понятия, как stages (стадии) и splits (фрагменты).

Если вы администратор или разработчик Trino, вам также нужно понимать, как стадии преобразуются в tasks (задачи) и как задачи состоят из drivers (драйверов), обрабатывающих данные.

Этот раздел даёт чёткие определения ключевых концепций, встречающихся в документации Trino, начиная от самых общих до самых конкретных.

Примечание:
За дополнительной информацией можно обратиться к книге Trino: The Definitive Guide и исследовательской работе Presto_SQL_on_Everything.pdf, в которых подробно описываются архитектура и концепции Trino.

Архитектура (Architecture)

Trino — это распределённый движок выполнения запросов, который обрабатывает данные параллельно на множестве серверов.

Существует два типа серверов Trino:

  • координаторы (coordinators)
  • воркеры (workers)

Кластер (Cluster)

Кластер Trino состоит из нескольких узлов (nodes) — одного координатора и одного или нескольких воркеров.

Пользователи подключаются к координатору через свой SQL-клиент. Координатор взаимодействует с воркерами, а оба типа узлов получают доступ к источникам данных, настроенным в каталогах (catalogs).

Обработка каждого запроса — это состояние процесса (stateful operation). Координатор распределяет нагрузку и выполняет её параллельно на всех воркерах.

Каждый узел запускает Trino в одном экземпляре JVM, а обработка внутри узла дополнительно распараллеливается через потоки (threads).

Узел (Node)

Любой сервер Trino в кластере считается узлом.

Технически, узел — это Java-процесс, запущенный с программой Trino, но на практике под узлом часто подразумевают физическую машину, поскольку рекомендуется запускать только один процесс Trino на сервер.

Координатор (Coordinator)

Координатор Trino — это сервер, отвечающий за:

  • разбор SQL-запросов (parsing),
  • построение плана выполнения (planning),
  • управление воркерами.

Он является «мозгом» системы Trino и тем узлом, к которому подключается клиент для выполнения запросов.

Каждая установка Trino должна содержать один координатор и как минимум один воркер.
Для целей разработки или тестирования можно настроить один сервер, выполняющий обе роли одновременно.

Координатор:

  • отслеживает активность воркеров,
  • координирует выполнение запроса,
  • создаёт логическую модель запроса, состоящую из стадий (stages),
  • преобразует их в набор связанных задач (tasks), которые выполняются на кластере воркеров.

Координатор взаимодействует с воркерами и клиентами через REST API.

Воркер (Worker)

Воркер Trino — это сервер, который выполняет задачи (tasks) и обрабатывает данные.

Он:

  • извлекает данные из коннекторов,
  • обменивается промежуточными результатами с другими воркерами,
  • передаёт результаты координатору, который собирает их и возвращает клиенту.

Когда воркер запускается, он регистрируется на сервере обнаружения (discovery server), встроенном в координатор.

После этого координатор может использовать воркер для выполнения задач.

Воркеры, как и координатор, обмениваются данными через REST API.

Клиент (Client)

Клиенты позволяют подключаться к Trino, отправлять SQL-запросы и получать результаты.
Клиенты могут обращаться ко всем настроенным источникам данных через каталоги (catalogs).

Клиенты представляют собой полнофункциональные приложения, драйверы или библиотеки, которые позволяют подключаться к Trino как из стандартных инструментов (например, SQL IDE или BI-систем), так и из кастомных приложений или скриптов.

Клиентские приложения включают CLI-инструменты, десктопные программы, веб-приложения и SaaS-решения с такими возможностями, как:

  • интерактивное написание SQL-запросов в редакторах,
  • визуальное построение запросов,
  • выполнение запросов и отображение результатов,
  • построение визуализаций (графики, диаграммы),
  • создание отчётов и дашбордов.

Если клиентское приложение поддерживает другие языки запросов или использует графический интерфейс для построения запроса, оно должно транслировать запрос в SQL, который поддерживает Trino.

Плагин (Plugin)

Trino использует плагинную архитектуру, которая позволяет расширять функциональность движка и интегрироваться с различными источниками данных и внешними системами.

Источник данных (Data source)

Trino — это движок выполнения запросов, который может работать с различными источниками данных, включая:

  • data lakes и lakehouses,
  • реляционные СУБД,
  • key-value хранилища,
  • и множество других типов систем хранения данных.

Источники данных предоставляют Trino данные для выполнения запросов.

Чтобы получить доступ к ним, необходимо настроить каталог (catalog), указав соответствующий коннектор (connector) для конкретного источника данных.

После этого можно использовать любой поддерживаемый клиент для выполнения SQL-запросов к этим источникам с использованием функционала Trino и возможностей клиента.

В документации Trino часто встречаются термины connector, catalog, schema и table — это ключевые понятия модели Trino, описывающие представление внешних источников данных.

Коннектор (Connector)

Коннектор — это адаптер, который позволяет Trino работать с конкретным источником данных — например, с data lake (через Hadoop/Hive или Apache Iceberg) или с реляционной БД (например, PostgreSQL).

По сути, коннектор можно рассматривать как драйвер для базы данных.

Он реализует Service Provider Interface (SPI) Trino, предоставляя стандартный API для взаимодействия движка с источником данных.

Trino включает множество встроенных коннекторов, включая:

  • Коннекторы для data lakes и lakehouses: Delta Lake, Hive, Hudi, Iceberg.
  • Коннекторы для реляционных СУБД: MySQL, PostgreSQL, Oracle, SQL Server.
  • Коннекторы для других систем: Cassandra, ClickHouse, OpenSearch, Pinot, Prometheus, SingleStore, Snowflake.
  • Вспомогательные коннекторы: JMX, System, TPC-H и другие.

Каждый каталог использует конкретный коннектор.

В конфигурационном файле каталога всегда присутствует обязательное свойство connector.name, которое определяет, какой коннектор используется.

Каталог (Catalog)

Каталог Trino — это набор конфигурационных свойств, необходимых для подключения к определённому источнику данных.

Каталог включает:

  • выбранный коннектор,
  • параметры подключения (URL, учётные данные и т. д.).

Каталоги описываются в файлах свойств (properties), которые хранятся в конфигурационном каталоге Trino (etc/catalog/).
Имя файла определяет имя каталога. Например, файл etc/example.properties создаёт каталог с именем example.

Вы можете настроить несколько каталогов — с разными или одинаковыми коннекторами — для доступа к различным источникам данных.

Например:

  • два каталога с Hive-коннектором для подключения к двум разным data lakes,
  • один каталог с Hive-коннектором для data lake и другой с Iceberg-коннектором для lakehouse,
  • несколько каталогов с PostgreSQL-коннектором для подключения к разным БД.

Каталог содержит одну или несколько схем (schemas), которые в свою очередь содержат объекты — таблицы, представления (views), материализованные представления и т. д.

Любой объект в Trino (например, таблица) имеет полное имя (fully-qualified name), начинающееся с имени каталога.

Например:
example.test_data.test — таблица test в схеме test_data каталога example.

Схема (Schema)

Схема — это способ организации таблиц внутри каталога.

Каталог и схема вместе определяют набор таблиц и других объектов, доступных для выполнения запросов.

При работе с Hive или реляционными СУБД (например, MySQL) схема в Trino напрямую соответствует схеме в целевой базе данных.
Для других типов коннекторов схемы могут определяться иначе, в зависимости от логики источника данных.

Таблица (Table)

Таблица — это набор неупорядоченных строк (rows), организованных в именованные столбцы (columns) с определёнными типами данных.
Это полностью соответствует понятию таблицы в любой реляционной базе данных.

Маппинг типов (type mapping) между исходными данными и типами Trino определяется коннектором, и может различаться в зависимости от коннектора.

Модель выполнения запросов (Query execution model)

Trino выполняет SQL-операторы (statements) и преобразует их в запросы (queries), которые обрабатываются распределённым кластером, состоящим из координатора и воркеров.

Оператор (Statement)

Trino выполняет SQL-операторы, совместимые со стандартом ANSI SQL.

Когда в документации Trino упоминается statement, имеется в виду именно оператор в смысле стандарта ANSI SQL — то есть конструкция, состоящая из клауз, выражений и предикатов.

Можно задаться вопросом, зачем различать понятия statement и query.
Это важно, потому что в Trino statement — это просто текст SQL-запроса, который пользователь отправляет системе.

Когда Trino выполняет оператор, он создаёт query — запрос, то есть внутреннее представление оператора с планом выполнения, который затем распределяется по узлам кластера Trino (воркерам).

Запрос (Query)

Когда Trino разбирает оператор (parses a statement), он преобразует его в запрос (query) и создаёт распределённый план выполнения (distributed query plan).
Этот план реализуется в виде набора связанных стадий (stages), выполняющихся на воркерах Trino.

Когда вы запрашиваете информацию о выполняющемся запросе, Trino возвращает снимок (snapshot) состояния всех компонентов, участвующих в обработке и формировании результирующего набора данных.

Разница между statement и query проста:

  • Statement — это SQL-текст, отправленный в Trino.
  • Query — это структура и набор компонентов (стадий, задач, разбиений и т.д.), созданных для выполнения этого SQL-запроса.

Таким образом, query включает в себя стадии (stages), задачи (tasks), разбиения (splits), коннекторы (connectors) и другие элементы, совместно обеспечивающие выполнение оператора.

Стадия (Stage)

Trino выполняет запрос, разбивая его на иерархию стадий.

Например, если Trino нужно агрегировать данные из миллиарда строк в Hive, он создаёт:

  • корневую стадию (root stage) — для агрегирования итоговых данных,
  • и несколько дочерних стадий, реализующих отдельные части распределённого плана.

Иерархия стадий напоминает дерево, где:

  • корневая стадия агрегирует результаты других стадий,
  • а каждая стадия описывает часть плана выполнения запроса.

Важно: сами стадии не выполняются на воркерах — они лишь представляют логическую модель выполнения, которую использует координатор.

Задача (Task)

Стадии не выполняются напрямую на воркерах.

Чтобы понять, как они исполняются, нужно знать, что каждая стадия реализуется как набор задач (tasks), распределённых между воркерами Trino.

Задачи — это “рабочие лошади” Trino.

Распределённый план запроса разбивается на стадии, которые затем транслируются в задачи.
Задачи обрабатывают данные (splits) и передают результаты дальше.

Каждая задача:

  • имеет входные и выходные потоки данных,
  • может выполняться параллельно с другими задачами,
  • включает драйверы (drivers), которые реализуют низкоуровневое выполнение.

Разбиение (Split)

Задачи работают с разбиениями (splits) — это фрагменты большого набора данных.

На нижних уровнях распределённого плана стадии получают данные из коннекторов через сплиты.

На высших уровнях — промежуточные стадии получают данные из других стадий (через обмены — exchanges).

Когда Trino планирует выполнение запроса:

  • Координатор запрашивает у коннектора список доступных сплитов для таблицы.
  • Затем он распределяет сплиты по задачам,
  • и отслеживает, какие воркеры обрабатывают какие сплиты.

Драйвер (Driver)

Каждая задача содержит один или несколько параллельных драйверов.

Драйвер — это последовательность операторов (operators), которые:

  • принимают данные,
  • применяют преобразования,
  • и выдают результат, который передаётся другой задаче на следующей стадии.

Можно считать драйвер физической реализацией набора операторов в памяти.
Это наименьшая единица параллелизма в архитектуре Trino.
Каждый драйвер имеет один вход и один выход.

Оператор (Operator)

Оператор — это компонент, который принимает, преобразует и производит данные.

Примеры:

  • Table scan operator — извлекает данные из коннектора, создавая поток строк.
  • Filter operator — получает данные и применяет предикат (условие), возвращая только подходящие строки.

Операторы объединяются в драйверы, образуя цепочку обработки данных.

Обмен (Exchange)

Exchanges (обмены) обеспечивают передачу данных между узлами Trino при выполнении разных стадий запроса.

Задачи записывают результаты в выходные буферы (output buffers).

Другие задачи потребляют эти данные через exchange clients.

Таким образом, обмены обеспечивают связь между стадиями и позволяют Trino выполнять сложные запросы распределённо и параллельно.

Установка Trino + Iceberg + Rest Catalog + Minio

Для того, чтобы поиграться с инструментом, можно запустить в Docker — настройки в GitHub https://github.com/ivanshamaev/trino-iceberg-minio

Статистика таблиц (Table statistics)

Trino поддерживает оптимизации запросов, основанные на статистике. Чтобы запрос мог использовать эти оптимизации, Trino должен иметь статистическую информацию о таблицах, участвующих в этом запросе.

Статистика таблиц представляет собой оценки характеристик хранимых данных. Эти оценки предоставляются планировщику запросов коннекторами и позволяют повышать производительность при обработке запросов.

Доступная статистика

В Trino доступны следующие виды статистики:

Для таблицы:

  • количество строк (row count): общее количество строк в таблице

Для каждого столбца в таблице:

  • размер данных (data size): объём данных, который необходимо прочитать
  • доля null-значений (nulls fraction): доля значений null
  • количество уникальных значений (distinct value count): число различных значений
  • минимальное значение (low value): наименьшее значение в столбце
  • максимальное значение (high value): наибольшее значение в столбце

Набор доступных статистических данных для конкретного запроса зависит от используемого коннектора и может различаться для разных таблиц. Например, коннектор Hive в настоящее время не предоставляет статистику по размеру данных.

Статистику таблиц можно просмотреть через SQL-интерфейс Trino с помощью команды SHOW STATS.

В зависимости от поддержки со стороны коннектора, статистика таблиц обновляется Trino при выполнении операторов управления данными, таких как INSERT, UPDATE или DELETE. Например, коннекторы Delta Lake, Hive и Iceberg поддерживают управление статистикой таблиц непосредственно из Trino.

Также можно инициировать сбор статистики с помощью команды ANALYZE. Это необходимо в случаях, когда другие системы изменяют данные без участия Trino, и, следовательно, статистика, отслеживаемая Trino, становится неактуальной. Другие коннекторы полагаются на базовый источник данных для управления статистикой таблиц или вовсе не поддерживают использование статистики таблиц.

Cost in EXPLAIN

Во время планирования стоимость, связанная с каждым узлом плана, вычисляется на основе статистики таблиц, участвующих в запросе. Эта рассчитанная стоимость выводится как часть результата выполнения оператора EXPLAIN.

Информация о стоимости отображается в дереве плана в формате {rows: XX (XX), cpu: XX, memory: XX, network: XX}. Параметр rows указывает ожидаемое количество строк, которое будет выведено каждым узлом плана во время выполнения. Значение в скобках после количества строк обозначает ожидаемый размер данных, выводимых узлом плана, в байтах. Остальные параметры показывают оценочное количество ресурсов CPU, памяти и сети, которые будут использованы при выполнении данного узла плана. Эти значения не представляют собой реальные единицы измерения, а служат для сравнения относительных затрат между различными узлами плана, что позволяет оптимизатору выбрать наилучший план выполнения запроса. Если какое-либо значение неизвестно, выводится символ ?.

Пример:

Результат:

Обычно для каждого узла плана выводится только одно значение стоимости. Однако когда оператор Scan совмещён с операторами Filter и/или Project, выводится несколько структур стоимости, каждая из которых соответствует отдельной логической части объединённого оператора. Например, для оператора ScanFilterProject отображаются три структуры стоимости, соответствующие, соответственно, частям Scan, Filter и Project.

Оценочная стоимость также выводится в EXPLAIN ANALYZE, дополнительно к фактическим статистическим данным о времени выполнения.

Cost-based optimizations

Trino поддерживает несколько оптимизаций, основанных на стоимости, описанных ниже.


Типы Joins в Trino (вводная часть перед рассмотрением оптимизаций)

Trino имеет так называемый оптимизатор на основе стоимости (Cost-Based Optimizer, CBO). Он анализирует статистику по таблицам (размер, распределение данных и т. д.) и автоматически решает, какой алгоритм соединения будет самым эффективным для данного запроса.

Broadcast Join

Это работает, когда одна из таблиц (меньшая) достаточно маленькая, чтобы поместиться в оперативную память на всех рабочих узлах.

  • «Строим» хэш-таблицу: Trino берет маленькую таблицу (называемую build side) и создает из нее специальную, очень быструю таблицу для поиска — хэш-таблицу.
  • «Рассылаем» ее: Эта хэш-таблица рассылается на каждый рабочий узел, где будет выполняться соединение.
  • «Проверяем»: Затем, другая, большая таблица (называемая probe side) построчно сканируется на каждом узле. Для каждой строки из большой таблицы происходит очень быстрый поиск совпадения в локальной хэш-таблице.
  • Соединяем: При нахождении совпадения, строки объединяются.

Когда это используется: Когда одна таблица значительно меньше другой. Это самый быстрый способ соединения, так как не нужно передавать много данных по сети между узлами.

Probe — Build schema

Partitioned Join

Этот метод применяется, когда обе таблицы слишком велики, чтобы их можно было целиком хранить в памяти каждого узла.

  • «Перемешиваем» данные: Trino берет обе большие таблицы и распределяет их между рабочими узлами, используя хэш-функцию от ключевого столбца соединения. Например, все строки, у которых join_key имеет хэш 1, отправляются на первый узел, с хэшем 2 — на второй и так далее.
  • «Соединяем локально»: После того как данные распределены, каждый рабочий узел получает только часть обеих таблиц. Теперь каждый узел может выполнить локальное объединение (как в broadcast join) со своей частью данных, так как все потенциальные совпадения теперь находятся на одном и том же узле.
  • Собираем результат: Конечный результат собирается со всех узлов.

Когда это используется: Когда обе таблицы большие. Это медленнее, чем broadcast join, из-за необходимости перемешивать данные по сети, но позволяет работать с очень большими объемами, которые не помещаются в память одного узла.


Главная идея: Probe side всегда обрабатывается потоком, строка за строкой, чтобы избежать полной загрузки большого объёма данных в оперативную память. Trino не хранит эти данные у себя, а работает с ними напрямую из источника.


Перебор соединений (Join enumeration)

Порядок, в котором выполняются соединения (joins) в запросе, может существенно влиять на его производительность. Основным фактором, оказывающим наибольшее влияние на производительность, является объём данных, обрабатываемых и передаваемых по сети. Если соединение, генерирующее большой объём данных, выполняется на ранней стадии выполнения запроса, то последующие этапы вынуждены обрабатывать эти большие объёмы дольше, чем это необходимо, что увеличивает время выполнения и потребление ресурсов.

С помощью перебора соединений на основе стоимости Trino использует статистику таблиц, предоставляемую коннекторами, для оценки стоимости различных порядков соединений и автоматически выбирает порядок с минимальной рассчитанной стоимостью.

Стратегия перебора соединений управляется параметром сессии join_reordering_strategy, при этом параметр конфигурации optimizer.join-reordering-strategy задаёт значение по умолчанию.

Возможные значения:

  • AUTOMATIC (по умолчанию) — включает полностью автоматический перебор соединений.
  • ELIMINATE_CROSS_JOINS — устраняет ненужные кросс-соединения.
  • NONE — чисто синтаксический порядок соединений.

Если используется AUTOMATIC и статистика недоступна или стоимость не может быть рассчитана по какой-либо причине, Trino применяет стратегию ELIMINATE_CROSS_JOINS.

Выбор стратегии распределения соединений (Join distribution selection)

Trino использует алгоритм соединения на основе хешей (hash join). Для каждого оператора соединения создаётся хеш-таблица на основе одного из входов соединения — так называемой build side. Второй вход, probe side, затем итерируется, и для каждой строки выполняется поиск совпадающих записей в хеш-таблице.

Существует два типа распределения соединений:

  • Partitioned (распределённое): каждый узел, участвующий в запросе, строит хеш-таблицу только на части данных.
  • Broadcast (рассылочное): каждый узел строит хеш-таблицу по всем данным, которые реплицируются на каждый узел.

Каждый тип имеет свои преимущества и недостатки. Partitioned — Распределённые соединения требуют перераспределения обеих таблиц с использованием хеша по ключу соединения. Такие соединения могут быть медленнее, чем рассылочные, но позволяют выполнять значительно более крупные соединения. Broadcast — Рассылочные соединения быстрее, если сторона build значительно меньше стороны probe. Однако Broadcast JOINs требуют, чтобы таблицы на стороне build после фильтрации помещались в память каждого узла, тогда как распределённые соединения требуют, чтобы данные помещались только в совокупную распределённую память всех узлов.

При выборе распределения соединений, основанном на стоимости, Trino автоматически определяет, использовать ли Partitioned или Broadcast соединение. При переборе соединений на основе стоимости Trino также автоматически выбирает, какая сторона будет probe, а какая build.

Стратегия распределения соединений управляется параметром сессии join_distribution_type, а параметр конфигурации join-distribution-type задаёт значение по умолчанию.

Допустимые значения:

  • AUTOMATIC (по умолчанию) — тип распределения соединения выбирается автоматически для каждого соединения.
  • BROADCAST — используется рассылочное соединение для всех случаев.
  • PARTITIONED — используется распределённое соединение для всех случаев.

Ограничение размера реплицируемой таблицы

Тип распределения соединения выбирается автоматически, когда стратегия перебора соединений установлена в AUTOMATIC или когда тип распределения соединения установлен в AUTOMATIC. В обоих случаях можно ограничить максимальный размер реплицируемой таблицы с помощью параметра конфигурации join-max-broadcast-table-size или параметра сессии join_max_broadcast_table_size. Это позволяет повысить параллелизм выполнения в кластере и предотвратить неудачные планы в случае, если оптимизатор на основе стоимости неверно оценивает размер соединяемых таблиц.

По умолчанию максимальный размер реплицируемой таблицы ограничен 100 МБ.

Синтаксический порядок соединений

Если оптимизация на основе стоимости не используется, Trino применяет синтаксический порядок соединений по умолчанию. Хотя формальных способов оптимизации запросов в этом режиме нет, можно воспользоваться особенностями реализации соединений в Trino для повышения производительности.

Trino использует соединения в памяти (in-memory hash joins). При обработке выражения JOIN Trino загружает самую правую таблицу соединения в память в качестве build side, затем потоково читает следующую справа таблицу в качестве probe side для выполнения соединения. Если запрос содержит несколько соединений, результат первого соединения остаётся в памяти как build side, а следующая таблица справа используется как probe side, и так далее для последующих соединений. Если порядок соединений усложнён, например, за счёт скобок, задающих приоритеты соединений, Trino может выполнять несколько нижестоящих соединений одновременно, но каждая стадия следует той же логике, включая финальное объединение результатов.

Из-за такого поведения оптимально синтаксически упорядочивать соединения в SQL-запросах от самых больших таблиц к самым маленьким, что позволяет минимизировать использование памяти.

Пример:


Предупреждение:
Такой способ оптимизации не является функцией Trino. Это побочный эффект реализации соединений, поэтому данное поведение может измениться без уведомления.


Реализации коннекторов

Чтобы оптимизатор Trino мог использовать стратегии, основанные на стоимости, реализация коннектора должна предоставлять статистику таблиц.

Trino может передавать выполнение запросов, либо отдельных частей запросов, во внешний источник данных. Это означает, что определённое предикатное условие, агрегирующая функция или другая операция передаются для выполнения непосредственно в базовую базу данных или систему хранения.

Результаты такого pushdown-а дают следующие преимущества:

  • Повышение общей производительности запросов
  • Снижение сетевого трафика между Trino и источником данных
  • Снижение нагрузки на удалённый источник данных

Эти преимущества часто приводят к заметному сокращению затрат.

Поддержка pushdown-а является специфичной для каждого коннектора и соответствующей базы данных или системы хранения.

Проброс предикатов (Predicate pushdown)

Predicate pushdown оптимизирует фильтрацию строк. Он использует вычисленный фильтр, как правило, сформированный из условия в операторе WHERE, чтобы исключить ненужные строки. Обработка фильтра передаётся коннектором в источник данных, где и выполняется.

Если проброс предиката для конкретного условия выполнен успешно, в плане выполнения (EXPLAIN) запроса не будет операции ScanFilterProject для этого условия.

Проброс проекций (Projection pushdown)

Projection pushdown оптимизирует фильтрацию по колонкам. Он использует столбцы, указанные в предложении SELECT и других частях запроса, чтобы ограничить доступ только к этим столбцам. Обработка передаётся коннектором в источник данных, и источник данных считывает и возвращает только необходимые столбцы.

Если проброс проекций выполнен успешно, план выполнения (EXPLAIN) запроса обращается только к соответствующим колонкам в Layout операции TableScan.

Проброс разыменования (Dereference pushdown)

Projection pushdown и dereference pushdown оба ограничивают доступ только к нужным столбцам, однако dereference pushdown является более избирательным. Он ограничивает доступ, позволяя считывать только конкретные поля внутри верхнего или вложенного типа данных ROW.

Например, если в таблице коннектора Hive есть столбец типа ROW с несколькими полями, и запрос обращается только к одному полю, dereference pushdown позволяет файловому ридеру считать только это одно поле внутри строки. То же самое применяется к полям вложенных структур внутри верхнего уровня ROW. Это может привести к существенной экономии объёма данных, считываемых из системы хранения.

Проброс агрегации (Aggregation pushdown)

Проброс агрегации может выполняться при соблюдении следующих условий:

  • Если коннектор в целом поддерживает проброс агрегации.
  • Если коннектор поддерживает проброс конкретной функции или функций.
  • Если структура запроса допускает выполнение проброса.

Проверить, выполняется ли проброс для конкретного запроса, можно с помощью анализа плана выполнения (EXPLAIN). Если агрегирующая функция была успешно передана коннектору, в плане выполнения не будет показан оператор Aggregate. В плане будут отображаться только операции, выполняемые непосредственно Trino.

Например, мы загрузили набор данных TPC-H в базу данных PostgreSQL и затем выполнили запрос к нему через коннектор PostgreSQL:

Получить план выполнения можно, добавив в начало запроса ключевое слово EXPLAIN:

План выполнения этого запроса не содержит оператора Aggregate с функцией count, так как эта операция выполняется коннектором. Функция count(*) отображается как часть оператора TableScan коннектора PostgreSQL. Это подтверждает, что проброс был выполнен успешно.

Существует ряд факторов, которые могут помешать выполнению проброса:

  • добавление дополнительных условий в запрос;
  • использование другой агрегирующей функции, которую нельзя пробросить в коннектор;
  • использование коннектора, не поддерживающего проброс для конкретной функции.

В этом случае в плане выполнения будет показана операция Aggregate, выполняемая Trino. Это чёткий признак того, что проброс в удалённый источник данных не выполняется, и агрегирование обрабатывается самим Trino.

Ограничения

Проброс агрегации не поддерживает ряд более сложных конструкций:

  • сложные группировки, такие как ROLLUP, CUBE или GROUPING SETS;
  • выражения внутри вызова агрегирующей функции, например sum(a * b);
  • приведение типов (coercions), например sum(integer_column);
  • агрегирования с сортировкой;
  • агрегирования с фильтром.

Проброс соединений (Join pushdown)

Проброс соединений позволяет коннектору передавать операцию соединения таблиц (JOIN) в нижележащий источник данных. Это может значительно повысить производительность и позволяет Trino обрабатывать оставшуюся часть запроса на меньшем объёме данных.

Поддержка проброса соединений зависит от конкретного источника данных, а значит — и от используемого коннектора.

Тем не менее, существуют общие условия, необходимые для выполнения проброса соединения:

  • все предикаты (условия), участвующие в соединении, должны поддерживать проброс;
  • все таблицы, участвующие в соединении, должны находиться в одном каталоге.

Проверить, выполняется ли проброс соединения, можно с помощью плана выполнения запроса (EXPLAIN). Если соединение было проброшено в источник данных, в плане не будет показан оператор Join:

Ниже приведён пример плана, полученного при выполнении запроса через коннектор PostgreSQL к данным TPC-H в базе PostgreSQL. Он не содержит оператора Join, что означает успешный проброс соединения:

Обычно проброс соединений полезен, однако в некоторых случаях он может привести к увеличению количества строк (например, при соединении больших таблиц), что может повлиять на производительность.

Проброс ограничения (Limit pushdown)

Операторы LIMIT или FETCH FIRST уменьшают количество возвращаемых записей. Проброс ограничения позволяет коннектору передавать обработку таких запросов в источник данных.

Это может улучшить производительность и значительно снизить объём данных, передаваемых из источника в Trino.

Такие запросы включают конструкции:

или

Реализация и поддержка зависят от конкретного коннектора, поскольку разные источники данных обладают разными возможностями.

Проброс Top-N (Top-N pushdown)

Комбинация LIMIT/FETCH FIRST с оператором ORDER BY создаёт запрос, который выбирает небольшое количество строк из большого отсортированного набора данных.
Так как порядок строк имеет значение, оптимизация таких запросов отличается от обычного проброса LIMIT.

Проброс такого запроса называется Top-N pushdown, поскольку он возвращает первые N строк. Эта оптимизация позволяет передать выполнение сортировки и выборки верхних N записей источнику данных, что значительно снижает нагрузку на Trino.

Такие запросы включают части:

или

Поддержка и реализация зависят от коннектора, так как разные источники данных используют разный синтаксис и методы выполнения.

Пример 1. Проброс Top-N в PostgreSQL

План выполнения (EXPLAIN):

Результат:

Здесь видно, что операция сортировки и ограничения выполняется на стороне PostgreSQL — признак успешного Top-N pushdown.

Пример 2. Отсутствие Top-N pushdown в коннекторе TPC-H

План выполнения:

В данном случае операция TopN[5 by (custkey ASC NULLS LAST)] выполняется самим Trino, а не источником данных, что означает отсутствие Top-N pushdown.

Примечание:

В отличие от примера с PostgreSQL, в плане запроса TPC-H присутствует оператор TopN в Fragment 0 — это явный признак того, что проброс Top-N не выполняется.

Адаптивные оптимизации планов (Adaptive plan optimizations)

Trino предоставляет несколько адаптивных оптимизаций планов выполнения, которые динамически изменяют план запроса на основе статистики, собираемой во время выполнения.
Эти оптимизации доступны только при включённом режиме отказоустойчивого выполнения (Fault-tolerant execution).

Чтобы отключить все адаптивные оптимизации планов, необходимо установить конфигурационное свойство
fault-tolerant-execution-adaptive-query-planning-enabled=false.

Эквивалентное свойство для сессии — fault_tolerant_execution_adaptive_query_planning_enabled.

Адаптивное переупорядочивание распределённых соединений (Adaptive reordering of partitioned joins)

По умолчанию Trino включает адаптивное переупорядочивание соединений с распределением по разделам (partitioned joins).
Эта оптимизация позволяет Trino динамически менять порядок входных данных для соединений на основе фактических размеров сторон build и probe во время выполнения запроса.

Это особенно полезно, когда статистика таблиц недоступна заранее — Trino может принимать более эффективные решения о порядке соединений, основываясь на реальной информации, собранной во время выполнения.

Чтобы отключить эту оптимизацию, установите конфигурационное свойство
fault-tolerant-execution-adaptive-join-reordering-enabled=false.

Эквивалентное свойство для сессии — fault_tolerant_execution_adaptive_join_reordering_enabled.

Trino Iceberg connector

Apache Iceberg — это открытый табличный формат для работы с большими аналитическими наборами данных.

Коннектор Iceberg позволяет выполнять запросы к данным, хранящимся в файлах, записанных в формате Iceberg, как определено в Iceberg Table Spec.
Коннектор поддерживает спецификации таблиц Apache Iceberg версий 1 и 2.

Состояние таблицы хранится в метаданных. Каждое изменение состояния таблицы создаёт новый файл метаданных и заменяет старый атомарной операцией.

Файл метаданных таблицы содержит информацию о схеме таблицы, конфигурации партиционирования, пользовательских свойствах, а также снимках (snapshots) содержимого таблицы.

Данные Iceberg хранятся в файлах форматов Parquet, ORC или Avro — в зависимости от свойства format, заданного при определении таблицы.

Iceberg был разработан для устранения известных проблем масштабируемости Hive. Hive хранит метаданные таблиц в metastore, который работает поверх реляционной базы данных (например, MySQL). Hive отслеживает пути к партициям в metastore, но не управляет отдельными файлами данных. При выполнении запроса через коннектор Hive в Trino сначала запрашиваются пути к партициям из metastore, затем вызывается файловая система для получения списка всех файлов данных в каждой партиции, после чего считываются метаданные из каждого файла.

В отличие от этого, Iceberg хранит пути к файлам данных в самих файлах метаданных, поэтому при выполнении запросов обращается к файловой системе только для чтения нужных файлов данных.

Требования (Requirements)

Для использования Iceberg необходимы следующие условия:

  • Сетевой доступ от координатора Trino и рабочих узлов (workers) к распределённому объектному хранилищу.
  • Доступ к одному из поддерживаемых сервисов каталога: Hive metastore (HMS), AWS Glue catalog, JDBC catalog, REST catalog, Nessie server или Snowflake catalog.
  • Файлы данных, сохранённые в одном из поддерживаемых форматов — Parquet (по умолчанию), ORC или Avro — и размещённые в поддерживаемой файловой системе.

Чтобы использовать Hive Thrift Metastore, необходимо задать параметр конфигурации
hive.metastore=thrift и дополнительно указать подробности с помощью следующих свойств конфигурации:

Property name Description Default
hive.metastore.uri URIs Hive metastore для подключения по протоколу Thrift. Если указан список URI через запятую, используется первый URI, остальные — резервные. Обязательное свойство. Пример: thrift://192.0.2.3:9083 или thrift://192.0.2.3:9083,thrift://192.0.2.4:9083
hive.metastore.username Имя пользователя, используемое Trino для доступа к Hive metastore
hive.metastore.authentication.type Тип аутентификации Hive metastore. Возможные значения: NONE или KERBEROS NONE
hive.metastore.thrift.client.connect-timeout Время ожидания соединения для клиента metastore 10s
hive.metastore.thrift.client.read-timeout Время ожидания чтения сокета для клиента metastore 10s
hive.metastore.thrift.impersonation.enabled Включить имитацию пользователя (end user impersonation) Hive metastore
hive.metastore.thrift.use-spark-table-statistics-fallback Использовать статистику таблиц, сгенерированную Apache Spark, если статистика Hive недоступна true
hive.metastore.thrift.delegation-token.cache-ttl Время жизни кэша делегирующих токенов для metastore 1h
hive.metastore.thrift.delegation-token.cache-maximum-size Максимальный размер кэша делегирующих токенов 1000
hive.metastore.thrift.client.ssl.enabled Использовать SSL при подключении к metastore false
hive.metastore.thrift.client.ssl.key Путь к приватному ключу и сертификату клиента (key store)
hive.metastore.thrift.client.ssl.key-password Пароль для приватного ключа
hive.metastore.thrift.client.ssl.trust-certificate Путь к цепочке сертификатов сервера (trust store). Обязателен при включенном SSL
hive.metastore.thrift.client.ssl.trust-certificate-password Пароль для trust store
hive.metastore.service.principal Kerberos principal сервиса Hive metastore
hive.metastore.client.principal Kerberos principal, используемый Trino для подключения к Hive metastore
hive.metastore.client.keytab Путь к keytab клиента Hive metastore
hive.metastore.thrift.delete-files-on-drop Активное удаление файлов для управляемых таблиц при операциях drop table/partition, если metastore не удаляет файлы false
hive.metastore.thrift.assume-canonical-partition-keys Разрешить метастору предполагать, что значения колонок партиций можно конвертировать в строки. Улучшает производительность фильтров по партициям. Колонки типа TIMESTAMP не канонизируются false
hive.metastore.thrift.client.socks-proxy SOCKS-прокси для Thrift Hive metastore
hive.metastore.thrift.client.max-retries Максимальное число повторных попыток запросов к metastore 9
hive.metastore.thrift.client.backoff-scale-factor Множитель для увеличения задержки повторных попыток запросов 2.0
hive.metastore.thrift.client.max-retry-time Общее максимально допустимое время для повторных попыток запроса к metastore 30s
hive.metastore.thrift.client.min-backoff-delay Минимальная задержка между повторными попытками запроса 1s
hive.metastore.thrift.client.max-backoff-delay Максимальная задержка между повторными попытками запроса 1s
hive.metastore.thrift.txn-lock-max-wait Максимальное время ожидания блокировки транзакции Hive 10m
hive.metastore.thrift.catalog-name «Имя каталога Hive metastore» — концепция абстракции в Hive для подключения к независимым каталогам. По умолчанию: hive. Если оставить пустым, будет использоваться каталог по умолчанию hive

Конфигурационные свойства Hive-каталога, специфичные для Iceberg

При использовании Hive-каталога коннектор Iceberg поддерживает те же общие параметры конфигурации Thrift Metastore, что и описанные ранее, с добавлением следующего свойства:

iceberg.hive-catalog.locking-enabled — фиксация изменений (commit) таблиц с использованием блокировок Hive. Значение по умолчанию = true.

Установка iceberg.hive-catalog.locking-enabled=false приведёт к фиксации изменений таблиц без использования блокировок Hive. Это свойство следует устанавливать в false только если выполняются все нижеперечисленные условия:

  • HIVE-26882 доступен на сервере Hive Metastore. Требуется версия 2.3.10, 4.0.0-beta-1 или новее.
  • HIVE-28121 доступен на сервере Hive Metastore, если он работает с MySQL или MariaDB. Требуется версия 2.3.10, 4.1.0, 4.0.1 или новее.

Все остальные каталоги, выполняющие фиксацию изменений в те же таблицы, что и данный каталог, также должны работать на Iceberg версии 1.3 или выше и иметь отключённые блокировки Hive при фиксации.

Общая конфигурация

Чтобы настроить коннектор Iceberg, необходимо создать файл свойств каталога etc/catalog/example.properties, который ссылается на коннектор Iceberg.

Каталог Hive Metastore является реализацией по умолчанию.

Необходимо выбрать и настроить одну из поддерживаемых файловых систем.

Замените свойство конфигурации fs.x.enabled на соответствующую настройку для требуемой файловой системы.

Также доступны другие типы каталогов метаданных, указанные в разделе требований данной темы. Каждый тип метастора имеет собственные специфические параметры конфигурации, а также общие параметры конфигурации метастора.

Следующие параметры конфигурации Iceberg являются независимыми от того, какая реализация каталога используется:

Property name Description Default
iceberg.catalog.type Определяет тип метастора. Возможные значения: hive_metastore, glue, jdbc, rest, nessie, snowflake hive_metastore
iceberg.file-format Определяет формат файлов хранения данных Iceberg. Возможные значения: PARQUET, ORC, AVRO PARQUET
iceberg.compression-codec Кодек сжатия при записи файлов. Возможные значения: NONE, SNAPPY, LZ4, ZSTD, GZIP ZSTD
iceberg.use-file-size-from-metadata Использовать размеры файлов из метаданных вместо файловой системы. Только как временное решение для версии < 0.11.0 Iceberg true
iceberg.max-partitions-per-writer Максимальное число партиций, обрабатываемых одним writer. Эквивалентное свойство сессии: max_partitions_per_writer 100
iceberg.target-max-file-size Целевой максимальный размер записываемых файлов; фактический размер может быть больше 1GB
iceberg.unique-table-location Использовать случайные, уникальные местоположения таблиц true
iceberg.dynamic-filtering.wait-timeout Максимальное время ожидания завершения динамических фильтров при генерации сплитов 1s
iceberg.delete-schema-locations-fallback Удалять ли местоположения схем, если Trino не может определить наличие внешних файлов false
iceberg.minimum-assigned-split-weight Минимальный вес, назначаемый каждому сплиту (0, 1]. Низкое значение улучшает производительность для таблиц с маленькими файлами 0.05
iceberg.table-statistics-enabled Включение статистики таблиц. Эквивалентное свойство сессии: statistics_enabled. Используется для cost-based оптимизаций true
iceberg.extended-statistics.enabled Включение сбора расширенной статистики с ANALYZE. Эквивалентное свойство сессии: extended_statistics_enabled true
iceberg.extended-statistics.collect-on-write Включение сбора расширенной статистики при записи. Эквивалентное свойство сессии: collect_extended_statistics_on_write true
iceberg.projection-pushdown-enabled Включение pushdown проекций true
iceberg.hive-catalog-name Каталог для переадресации при ссылке на Hive таблицу
iceberg.register-table-procedure.enabled Разрешить вызов процедуры register_table пользователем false
iceberg.add-files-procedure.enabled Разрешить вызов процедуры add_files пользователем false
iceberg.query-partition-filter-required Принудительно требовать фильтр по партициям для запросов к схемам, указанным в iceberg.query-partition-filter-required-schemas. Эквивалентное свойство сессии: query_partition_filter_required false
iceberg.query-partition-filter-required-schemas Список схем, для которых Trino применяет фильтр по ключам партиций. Эквивалентное свойство сессии: query_partition_filter_required_schemas. Используется, если iceberg.query-partition-filter-required=true []
iceberg.incremental-refresh-enabled Включение инкрементного обновления materialized view. Эквивалентное свойство сессии: incremental_refresh_enabled true
iceberg.metadata-cache.enabled Включение in-memory кэша метаданных на координаторе. Не используется при fs.cache.enabled=true true
iceberg.object-store-layout.enabled Включение object store file layout Iceberg. Добавляет детерминированный хэш после пути записи false
iceberg.expire-snapshots.min-retention Минимальный период хранения для команды expire_snapshots. Эквивалентное свойство сессии: expire_snapshots_min_retention 7d
iceberg.remove-orphan-files.min-retention Минимальный период хранения для команды remove_orphan_files. Эквивалентное свойство сессии: remove_orphan_files_min_retention 7d
iceberg.idle-writer-min-file-size Минимальный объем данных, записанных одним writer, чтобы считаться idle и закрытым движком. Эквивалентное свойство сессии: idle_writer_min_file_size 16MB
iceberg.sorted-writing-enabled Включение сортированной записи в таблицы с заданным порядком сортировки. Эквивалентное свойство сессии: sorted_writing_enabled true
iceberg.sorted-writing.local-staging-path Локальная директория для staging при записи сортированных таблиц. Поддерживается ${USER} для разных пользователей. Без настройки используется целевое хранилище
iceberg.allowed-extra-properties Список дополнительных свойств, разрешённых для Iceberg таблиц. * — разрешить все []
iceberg.split-manager-threads Количество потоков для генерации сплитов В 2 раза больше числа процессоров на координаторе
iceberg.metadata.parallelism Количество потоков для получения метаданных. В данный момент параллелизируется только загрузка таблиц 8
iceberg.file-delete-threads Количество потоков для удаления файлов при выполнении expire_snapshots В 2 раза больше числа процессоров на координаторе
iceberg.bucket-execution Включение bucket-aware выполнения. Использует физическую информацию о бакетах для оптимизации запросов и уменьшения обмена данными true

Hive connector

Коннектор Hive в Trino позволяет выполнять запросы к данным, хранящимся в хранилище данных Apache Hive.

Hive представляет собой комбинацию трёх основных компонентов:

  • Файлы данных в различных форматах — обычно хранятся в Hadoop Distributed File System (HDFS) или в объектных хранилищах, таких как Amazon S3.
  • Метаданные, описывающие, как файлы данных сопоставляются со схемами и таблицами. Эти метаданные хранятся в базе данных (например, MySQL) и доступны через службу Hive Metastore (HMS).
  • Язык запросов HiveQL, который выполняется в распределённой среде вычислений, такой как MapReduce или Tez.

Trino использует только первые два компонента — данные и метаданные. Trino не использует HiveQL и не зависит от среды выполнения Hive.

Требования

Чтобы использовать Hive-коннектор, необходимо:

  • Служба Hive Metastore (HMS) или совместимая реализация (например, AWS Glue).
  • Настроенная файловая система, поддерживаемая Trino, указанная в конфигурационном файле каталога (catalog).

Сетевой доступ ко всем нужным системам:

  • координатор и все worker-узлы должны иметь доступ к Hive Metastore и системе хранения данных;
  • доступ к Hive Metastore по протоколу Thrift по умолчанию осуществляется через порт 9083.

Файлы данных должны быть в поддерживаемом формате.
Формат можно задать через свойство таблицы format и соответствующие параметры.

Поддерживаемые форматы файлов

  • ORC
  • Parquet
  • Avro
  • Поддерживаемые SerDe (сериализаторы/десериализаторы)

Для сериализуемых форматов допустимы только определённые SerDe:

  • RCText — RCFile с использованием ColumnarSerDe
  • RCBinary — RCFile с использованием LazyBinaryColumnarSerDe
  • SequenceFile (Text)org.apache.hadoop.io.Text
  • SequenceFile (BytesWritable) — Использует com.twitter.elephantbird.hive.serde.ProtobufDeserializer для десериализации protobuf-записей
  • CSVorg.apache.hadoop.hive.serde2.OpenCSVSerde
  • JSONorg.apache.hive.hcatalog.data.JsonSerDe
  • OPENX_JSONorg.openx.data.jsonserde.JsonSerDe (OpenX JSON SerDe) — подробнее см. реализацию Trino в исходном коде
  • TextFile — Обычный текстовый формат
  • ESRIcom.esri.hadoop.hive.serde.EsriJsonSerDe — SerDe для данных ESRI JSON

Хочешь, я переведу следующую часть про конфигурацию Hive-коннектора (например, hive.config.properties и параметры hive.metastore.uri, hive.s3.*)?

Общая конфигурация Hive-коннектора

В таблице документации Trino приведён список общих параметров конфигурации для коннектора Hive. Помимо них, существуют дополнительные наборы параметров, описанные в других разделах документации (например, для настройки S3, GCS, Azure Blob, метастора, ORC/Parquet и др.).

Имя параметра Описание Значение по умолчанию
hive.recursive-directories Включает чтение данных из поддиректорий в путях таблиц или партиций. Если выключено, поддиректории игнорируются. Аналогично свойству hive.mapred.supports.subdirectories в Hive. false
hive.ignore-absent-partitions Игнорировать партиции, если путь в файловой системе не существует, вместо ошибки. Это может привести к пропуску ожидаемых данных. false
hive.storage-format Формат файлов по умолчанию при создании новых таблиц. ORC
hive.orc.use-column-names Доступ к колонкам ORC по именам. По умолчанию доступ осуществляется по порядковому номеру. Эквивалент свойства сессии orc_use_column_names. false
hive.parquet.use-column-names Доступ к колонкам Parquet по именам. Если false, используется порядок колонок. Эквивалент parquet_use_column_names. true
hive.parquet.time-zone Часовой пояс для чтения и записи Parquet. По умолчанию часовой пояс JVM
hive.compression-codec Кодек сжатия при записи файлов. Возможные значения: NONE, SNAPPY, LZ4, ZSTD, GZIP. GZIP
hive.force-local-scheduling Принудительное выполнение сплитов на том же узле, где работает DataNode, обслуживающий данные. Полезно, если Trino установлен рядом с каждым DataNode. false
hive.respect-table-format Следует ли использовать существующий формат таблицы при записи новых партиций. true
hive.immutable-partitions Разрешена ли вставка данных в существующие партиции. Если true, то hive.insert-existing-partitions-behavior=APPEND запрещено. false
hive.insert-existing-partitions-behavior Что происходит при вставке данных в существующую партицию: • APPEND — добавляет данные • OVERWRITE — перезаписывает • ERROR — запрещает изменение APPEND
hive.target-max-file-size Максимальный желаемый размер новых файлов. 1GB
hive.create-empty-bucket-files Создавать ли пустые файлы для бакетов без данных. false
hive.validate-bucketing Проверка корректности распределения данных по бакетам при чтении. true
hive.partition-statistics-sample-size Количество партиций, анализируемых при вычислении статистики таблицы. 100
hive.max-partitions-per-writers Максимальное число партиций на одного writer’а. 100
hive.max-partitions-for-eager-load Максимальное число партиций, которые координатор может заранее загрузить для одной таблицы. 100,000
hive.max-partitions-per-scan Максимальное число партиций для одного сканирования таблицы. 1,000,000
hive.non-managed-table-writes-enabled Разрешить запись в внешние (non-managed) таблицы. false
hive.non-managed-table-creates-enabled Разрешить создание внешних (non-managed) таблиц. true
hive.collect-column-statistics-on-write Автоматически собирать статистику по колонкам при записи данных. true
hive.file-status-cache-tables Кэшировать список файлов для указанных таблиц. Примеры: fruit.apple,fruit.orange — только для этих таблиц fruit.*,vegetable.* — для всех таблиц в схемах fruit и vegetable * — для всех таблиц во всех схемах
hive.file-status-cache.excluded-tables Исключения из кэширования (противоположно предыдущему параметру).
hive.file-status-cache.max-retained-size Максимальный размер данных, удерживаемых в кэше. 1GB
hive.file-status-cache-expire-time Время жизни кэшированных данных. 1m
hive.per-transaction-file-status-cache.max-retained-size Максимальный объём кэша для всех активных транзакций. 100MB
hive.rcfile.time-zone Часовой пояс для двоичных timestamp в RCFile. Для Hive 3.1+ должно быть UTC. Часовой пояс JVM
hive.timestamp-precision Точность столбцов типа TIMESTAMP: MILLISECONDS, MICROSECONDS, NANOSECONDS. MILLISECONDS
hive.temporary-staging-directory-enabled Использовать ли временную директорию для промежуточных файлов при записи. true
hive.temporary-staging-directory-path Путь к временной директории для операций записи. Можно использовать ${USER} для подстановки имени пользователя. /tmp/presto-${USER}
hive.hive-views.enabled Включить поддержку Hive Views. false
hive.hive-views.legacy-translation Использовать устаревший механизм перевода Hive Views. false
hive.parallel-partitioned-bucketed-writes Повысить параллелизм при записи в партиционированные и бакетированные таблицы. true
hive.query-partition-filter-required Требовать наличие фильтра по партициям в запросе. false
hive.query-partition-filter-required-schemas Список схем, для которых обязательно использование фильтра по партициям (работает только если вышеуказанный параметр включён). []
hive.table-statistics-enabled Включить сбор статистики таблиц (используется для CBO). true
hive.auto-purge Устанавливает значение свойства auto_purge для управляемых таблиц по умолчанию. false
hive.partition-projection-enabled Включает поддержку partition projection (Athena-style). true
hive.s3-glacier-filter Фильтрация S3-объектов по классу хранения: • READ_ALL — читать все • READ_NON_GLACIER — только не Glacier • READ_NON_GLACIER_AND_RESTORED — не Glacier и восстановленные READ_ALL
hive.max-partition-drops-per-query Максимальное количество партиций, которые можно удалить за один запрос. 100,000
hive.metastore.partition-batch-size.max Максимальное число партиций, обрабатываемых за один батч. 100
hive.single-statement-writes Включает автокоммит для всех операций записи (запрещает многооперационные транзакции). false
hive.metadata.parallelism Количество потоков для загрузки метаданных (например, таблиц). 8
hive.protobuf.descriptors.location Путь к каталогу, где хранятся бинарные дескрипторы Protocol Buffers для таблиц с форматом ProtobufDeserializer.
hive.protobuf.descriptors.cache.max-size Максимальный размер кэша дескрипторов Protocol Buffers. 64
hive.protobuf.descriptors.cache.refresh-interval Как часто перезагружать дескрипторы Protocol Buffers с диска. 1d

Поддержка отказоустойчивого выполнения

Коннектор поддерживает отказоустойчивое выполнение (Fault-tolerant execution) при обработке запросов. Поддерживаются как операции чтения, так и записи с любой политикой повторных попыток (retry policy).

Конфигурация доступа к файловым системам

Коннектор поддерживает доступ к следующим файловым системам:

  • Azure Storage — поддержка файловой системы Azure Storage.
  • Google Cloud Storage — поддержка файловой системы Google Cloud Storage.
  • S3 — поддержка файловой системы Amazon S3.
  • HDFS — поддержка файловой системы Hadoop Distributed File System.

Необходимо включить и настроить доступ к конкретной файловой системе. Поддержка устаревших конфигураций не рекомендуется и будет удалена.

Сопоставление типов (Type mapping)

Коннектор читает и записывает данные в поддерживаемых форматах файлов Avro, ORC и Parquet в соответствии со спецификацией Iceberg.

Поскольку Trino и Iceberg поддерживают разные наборы типов данных, коннектор выполняет преобразование некоторых типов при чтении или записи данных. Типы данных могут не совпадать полностью в обоих направлениях между Trino и источником данных. Подробные сведения о сопоставлении типов в каждом направлении приведены в следующих разделах.

Спецификация Iceberg определяет поддерживаемые типы данных и их сопоставление с форматами хранения в файлах Avro, ORC и Parquet:

  • Iceberg → Avro
  • Iceberg → ORC
  • Iceberg → Parquet

Сопоставление типов Iceberg с типами Trino

Коннектор сопоставляет типы Iceberg с соответствующими типами Trino в соответствии со следующей таблицей:

Iceberg type Trino type
BOOLEAN BOOLEAN
INT INTEGER
LONG BIGINT
FLOAT REAL
DOUBLE DOUBLE
DECIMAL(p,s) DECIMAL(p,s)
DATE DATE
TIME TIME(6)
TIMESTAMP TIMESTAMP(6)
TIMESTAMPTZ TIMESTAMP(6) WITH TIME ZONE
STRING VARCHAR
UUID UUID
BINARY VARBINARY
FIXED(L) VARBINARY
STRUCT(…) ROW(…)
LIST(e) ARRAY(e)
MAP(k,v) MAP(k,v)

Функции

Функции доступны в системной схеме каждого каталога. Их можно вызывать в SQL-запросах. Например, следующий фрагмент кода демонстрирует, как выполнить функцию system.bucket в каталоге Iceberg:

bucket

Эта функция предоставляет доступ к трансформации bucket из Iceberg, позволяя пользователям определить, в какой бакет (bucket) попадает конкретное значение. Функция принимает два аргумента: значение партиции и количество бакетов.

Поддерживаемые типы для первого аргумента функции:

  • TINYINT
  • SMALLINT
  • INTEGER
  • BIGINT
  • VARCHAR
  • VARBINARY
  • DATE
  • TIMESTAMP
  • TIMESTAMP WITH TIME ZONE

Например, если нужно определить, к какому номеру бакета будет отнесена конкретная строка, можно выполнить следующий запрос:

Эту функцию также можно использовать в выражении WHERE, чтобы выполнять операции только для определённого бакета:

Управление данными Trino и Iceberg

Функциональность управления данными включает поддержку операторов INSERT, UPDATE, DELETE, TRUNCATE и MERGE.

Удаление по партициям

Для партиционированных таблиц коннектор Iceberg поддерживает удаление целых партиций, если выражение WHERE содержит фильтры только по колонкам партиционирования с идентичной трансформацией (identity transform), которые могут соответствовать целым партициям.

Например, для таблицы, определённой в разделе «Партиционированные таблицы», следующий SQL-запрос удаляет все партиции, где значение столбца country равно ‘US’:

Удаление партиций выполняется, если выражение WHERE удовлетворяет этим условиям.

Удаление на уровне строк

Таблицы, использующие вторую версию (v2) спецификации Iceberg, поддерживают удаление отдельных строк с помощью записи position delete-файлов.

Управление схемами и таблицами

Функциональность управления схемами и таблицами включает поддержку следующих операций:

  • CREATE SCHEMA
  • DROP SCHEMA
  • ALTER SCHEMA
  • CREATE TABLE
  • CREATE TABLE AS
  • DROP TABLE
  • ALTER TABLE
  • COMMENT

Эволюция схемы

Iceberg поддерживает эволюцию схемы, включая безопасное добавление, удаление и переименование колонок, в том числе во вложенных структурах.

Iceberg поддерживает изменение типов колонок только в случае операций расширения (widening):

  • INTEGER → BIGINT
  • REAL → DOUBLE
  • DECIMAL(p,s) → DECIMAL(p2,s), если p2 > p (масштаб при этом изменяться не может)

Также может быть изменена схема партиционирования, при этом коннектор сохраняет возможность выполнять запросы к данным, созданным до изменения партиционирования.

ALTER TABLE EXECUTE

Коннектор поддерживает следующие команды, используемые с оператором ALTER TABLE EXECUTE.

optimize

Команда optimize используется для переписывания содержимого указанной таблицы таким образом, чтобы данные были объединены в меньшее количество, но более крупных файлов. Если таблица партиционирована, операция компактации данных выполняется отдельно для каждой партиции, выбранной для оптимизации. Эта операция повышает производительность при чтении.

Все файлы, размер которых меньше заданного параметра file_size_threshold (пороговое значение по умолчанию — 100 МБ), объединяются в случае, если выполняется одно из следующих условий для каждой партиции:

  • более одного файла данных доступно для объединения;
  • присутствует хотя бы один файл данных, к которому прикреплены файлы удаления (delete files).

Следующий запрос объединяет файлы в таблице, размер которых менее 128 мегабайт:

Можно использовать предложение WHERE с колонками, по которым осуществляется партиционирование таблицы, чтобы ограничить оптимизацию определёнными партициями:

Можно задать более сложное выражение WHERE, чтобы сузить область действия процедуры оптимизации. В следующем примере значения с типом timestamp приводятся к типу date, и используется сравнение, чтобы оптимизировать только те партиции, которые содержат данные начиная с 2022 года:

Можно также использовать выражение WHERE с метаданными колонок, чтобы отфильтровать, какие файлы будут оптимизированы:

optimize_manifests

Перезаписывает файлы манифестов, группируя их по колонкам партиционирования. Это может быть полезно для оптимизации планирования сканирования, когда существует множество мелких файлов манифестов или когда в запросах на чтение используются фильтры по партициям, но сами файлы манифестов не сгруппированы по партициям. Параметр таблицы Iceberg commit.manifest.target-size-bytes управляет максимальным размером файлов манифестов, создаваемых этой процедурой.

Процедура optimize_manifests выполняется следующим образом:

expire_snapshots

Команда expire_snapshots удаляет все снапшоты, а также связанные с ними метаданные и файлы данных. Регулярное выполнение очистки снапшотов рекомендуется для удаления файлов данных, которые больше не используются, и для поддержания компактного размера метаданных таблицы. Процедура затрагивает все снапшоты, которые старше периода времени, заданного параметром retention_threshold.

Процедура expire_snapshots выполняется следующим образом:

Значение параметра retention_threshold должно быть больше или равно значению iceberg.expire-snapshots.min-retention, заданному в каталоге, иначе выполнение процедуры завершится с ошибкой следующего вида:

  • Retention specified (1.00d) is shorter than the minimum retention configured in the system (7.00d).
    Значение по умолчанию для этого свойства — 7d.

remove_orphan_files

Команда remove_orphan_files удаляет все файлы из каталога данных таблицы, которые не связаны с файлами метаданных и которые старше значения, заданного параметром retention_threshold. Регулярное удаление «осиротевших» файлов рекомендуется для поддержания контролируемого размера каталога данных таблицы.

Процедура remove_orphan_files выполняется следующим образом:

  • Значение параметра retention_threshold должно быть больше или равно значению iceberg.remove-orphan-files.min-retention, указанному в каталоге, иначе выполнение процедуры завершится с ошибкой следующего вида:

Retention specified (1.00d) is shorter than the minimum retention configured in the system (7.00d).
Значение по умолчанию для этого свойства — 7d.

Результат выполнения запроса содержит следующие метрики:

Имя свойства Описание
processed_manifests_count Количество файлов манифестов, прочитанных процедурой remove_orphan_files.
active_files_count Количество файлов, принадлежащих снапшотам, которые ещё не были удалены (не истекли).
scanned_files_count Количество файлов, просканированных из файловой системы.
deleted_files_count Количество файлов, удалённых процедурой remove_orphan_files.

Основные алгоритмы соединения в Trino

Trino применяет классический алгоритм хеш‑соединения (hash join) с распределением данных по узлам, и выбирает конкретную стратегию распределения соединения в зависимости от статистики и конфигурации.

Основные режимы:

  • Broadcast join — одна из сторон соединения (обычно «меньшая» таблица, build‑сторона) транслируется (broadcast) на все узлы, которые обрабатывают другую таблицу (probe‑сторона). Каждая нода строит хеш‑таблицу по build‑стороне, затем probe‑сторона сканируется и сравнивается с этой хеш‑таблицей. Преимущество: минимум сетевого обмена данных, если build‑сторона маленькая. Недостаток: build‑сторона должна помещаться в память каждой ноды.
  • Partitioned (hash‑distributed) join — обе стороны соединения распределяются (shuffle) по узлам по хешу ключа соединения. То есть строки обеих таблиц хэшируются по ключу соединения и направляются на узлы таким образом, что строки с одинаковыми ключами «встречаются» на одном узле. Затем на каждом узле строится хеш‑таблица и выполняется соединение. Этот режим требует больше сетевого обмена (shuffle) и может быть дороже, но позволяет соединять большие таблицы, когда broadcast невозможен.
  • Automatic (cost‑based выбор) — если свойство join_distribution_type установлено в AUTOMATIC, Trino на основании статистики (row counts, размер таблиц, распределение, доступность статистики) выбирает между broadcast и partitioned.

Также Trino применяет динамическую фильтрацию (dynamic filtering / dynamic partition pruning), которая помогает уменьшить объём данных, участвующих в соединении, за счёт использования фильтров, полученных на build‑стороне, и применяемых к probe‑стороне.

Что такое build‑сторона и probe‑сторона

Когда Trino выполняет hash join, процесс выглядит так:

  • Build‑сторона – это таблица (или её часть), по которой строится хеш‑таблица в памяти. Эта таблица «кладётся» в RAM для последующего быстрого поиска.
  • Probe‑сторона – это таблица, которая итерируется построчно, и для каждой строки выполняется поиск совпадений в хеш‑таблице, построенной по build‑стороне. Таблица probe side в Trino не хранится где-то постоянно, а потоково обрабатывается (то есть не загружается в память целиком). 
  • Важно: для broadcast join build‑сторона должна помещаться в память всех узлов кластера. Если она слишком большая, это может привести к проблемам с памятью.
  • Для partitioned join build‑сторона распределяется по узлам по хешу ключа, что снижает требования к памяти на отдельном узле, но увеличивает сетевой трафик (shuffle).

Что такое OOM

OOM (Out Of Memory) – это ситуация, когда процесс не может выделить память для выполнения операции и падает.

В контексте Trino и hash join OOM возникает, если:

  • build‑сторона слишком большая для оперативной памяти узлов,
  • probe‑сторона слишком большая и одновременно удерживается в памяти много промежуточных данных,
  • большое количество промежуточных результатов или shuffle операций.

Как это работает с Iceberg + Hive + HDFS

Когда Trino работает с Iceberg таблицами (например, хранящимися на HDFS или объектном хранилище) и/или через Hive‑метастор, совокупность форматов и оптимизаций влияет на выбор алгоритма соединения и его эффективность. Вот ключевые моменты:

  • Iceberg таблицы поддерживают богатую метаинформацию: файлы данных, manifest‑лист, верх/низ границ значений в файлах, статистику по колонкам, информацию о партициях.
  • Благодаря этой метаинформации Trino может ограничивать ( prune ) число файлов и разделов, которые участвуют в соединении: например, если build‑сторона фильтруется, dynamic filtering может уменьшить probe‑сторону. С меньшим входным объёмом соединения легче сделать broadcast.
  • При подключении через Hive‑коннектор (или Iceberg‑таблицы, которые используют Hive метастор) Trino поддерживает динамическое разделение (dynamic partition pruning) и bucket‑pruning, что помогает уменьшать ввод данных перед соединением.
  • Если таблицы большие и статистика доступна (через Iceberg метаданные или Hive статистику), Trino может выбрать типа broadcast если «меньшая» сторона после фильтров мал, либо partitioned если нет.

Рекомендованные алгоритмы соединения для Iceberg в Trino

Исходя из архитектуры и ограничений, можно сформулировать рекомендации:

  • Если одна из таблиц значительно меньше другой (после применения фильтров, dynamic pruning и т.д.), стоит использовать broadcast join — это быстрее, меньше сетевого обмена.
  • Если обе таблицы большие или распределены, и build‑сторона не помещается в память на узлах, использовать partitioned join.
  • Учитывать метаинформацию Iceberg (файлы, статистику) — чем лучше статистика, тем точнее выбор алгоритма и меньше данные, участвующие в соединении.
  • Использовать dynamic filtering и partition pruning — особенно важно для Iceberg: метаданные позволяют заранее отсеивать лишние файлы, уменьшив объём входных данных для соединения.
  • Настраивать параметры Trino — например join_max_broadcast_table_size, join_distribution_type, join_reordering_strategy, и др.

Ограничения и нюансы

Даже если таблица маленькая, если статистика недоступна или сильно распределена, Trino может выбрать partitioned join в режиме AUTOMATIC.

Broadcast требует, чтобы build‑сторона (после фильтров) помещалась в память всех узлов — если нет, узлы могут страдать от OOM или медленной работы.

Partitioned join требует сильной сети (shuffle), и если данные сильно скошены (skew) по ключам соединения, производительность может быть низкой.

При работе с Iceberg таблицами хранение в объектном хранилище (например S3) или HDFS может влиять на задержки получения файлов и метаданных, что может замедлить фазу подготовки данных к соединению.

Не все оптимизации (например dynamic filtering) могут поддерживаться одинаково для всех коннекторов и всех форматов файлов. Нужно проверять, поддерживает ли ваш коннектор (Iceberg + Hive) нужные функции.

Возможный план оптимизации запроса Trino к Iceberg

Рекомендации по улучшению запроса

  • Пересмотреть join‑план:
    • Убедиться, что меньшая таблица идёт в качестве build‑стороны.
    • Для больших таблиц рассмотреть partitioned join вместо broadcast join.
  • Использовать dynamic filtering / partition pruning:
    • Для Iceberg таблиц это особенно эффективно – фильтруются лишние файлы и партиции ещё до join, уменьшая размер probe‑стороны.
  • Разделять JOIN на этапы:
    • Если три таблицы очень большие, попробовать выполнить join двух таблиц, сохранить результат (CTE или временную таблицу), затем join с третьей.
  • Настроить Trino параметры:
    • join_max_broadcast_table_size – уменьшить лимит для broadcast join, чтобы большие таблицы не помещались в память.
    • query.max-memory и query.max-memory-per-node – увеличить лимит памяти на узел, если инфраструктура позволяет.
    • spill-to-disk-enabled – включить сброс промежуточных данных на диск (для partitioned join). LEGACY
  • Проверить skew:
    • Если есть скошенные ключи join (одни ключи встречаются очень часто), Trino можно настроить для балансировки данных или добавить фильтры на уровне подзапросов.
  • Использовать ANALYZE / статистику:
    • Для Iceberg это ANALYZE – оптимизатор будет точнее выбирать порядок join и build/probe.
  • Минимизировать количество колонок:
    • Использовать projection pushdown – считывать только нужные колонки из Iceberg таблиц, чтобы уменьшить размер в памяти.

Обзор Explain и Explain Analyze

EXPLAIN

Синтаксис

Где option может содержать:

Explain показывает логический или распределённый план выполнения оператора, либо проверяет оператор на корректность.

По умолчанию отображается распределённый план DISTRIBUTED. Каждый фрагмент (fragment) распределённого плана выполняется одним или несколькими узлами Trino. Разделение на фрагменты отражает обмен данными (exchange) между узлами Trino. Тип фрагмента (Fragment type) определяет, как именно фрагмент выполняется узлами Trino и как данные распределяются между фрагментами:

  • SINGLE: Фрагмент выполняется на одном узле.
  • HASH: Фрагмент выполняется на фиксированном количестве узлов, при этом входные данные распределяются с помощью хеш-функции.
  • ROUND_ROBIN: Фрагмент выполняется на фиксированном количестве узлов, при этом входные данные распределяются по принципу кругового распределения (round-robin).
  • BROADCAST: Фрагмент выполняется на фиксированном количестве узлов, при этом входные данные рассылаются (транслируются) на все узлы.
  • SOURCE: Фрагмент выполняется на тех узлах, где находятся входные splits, к которым осуществляется доступ.

EXPLAIN ANALYZE

Синтаксис

Выполняет оператор и показывает распределённый план его выполнения вместе со стоимостью (cost) каждой операции.

Опция VERBOSE выводит более подробную информацию и низкоуровневую статистику; для её понимания может потребоваться знание внутренних механизмов и деталей реализации Trino.

Примечание:
Статистика может быть не полностью точной, особенно для запросов, которые выполняются очень быстро.

Session properties

Синтаксис

Устанавливает значение свойства сессии или свойства сессии конкретного каталога.

Свойство сессии — это параметр конфигурации, который пользователь может временно изменить на время текущего подключения к кластеру Trino.
Многие параметры конфигурации имеют соответствующее свойство сессии, принимающее те же значения, что и конфигурационное свойство.

Существует два типа свойств сессии:

  • Системные свойства сессии (System session properties) — применяются ко всему кластеру. Большинство свойств сессии являются системными, если не указано иное.
  • Свойства сессии каталога (Catalog session properties) — это свойства, определённые коннектором, которые можно установить отдельно для каждого каталога. Такие свойства задаются с префиксом имени каталога, например: catalogname.property_name.

Свойства сессии привязаны к текущей сессии, поэтому пользователь может иметь несколько подключений к одному кластеру, и в каждом из них значения свойств могут отличаться.
После завершения сессии — при отключении или создании новой — все изменения, внесённые в свойства сессии, теряются.

Сброс на диск (Spill to disk)

В случае операций, требующих большого объёма памяти, Trino позволяет выгружать промежуточные результаты на диск. Цель этого механизма — обеспечить выполнение запросов, которым требуется больше памяти, чем разрешено лимитами на запрос или на узел.

Механизм похож на подкачку страниц (page swapping) на уровне операционной системы, однако реализован на уровне приложения, чтобы учитывать специфические потребности Trino.

Свойства, связанные с механизмом сброса на диск, описаны в разделе Spilling properties.

Предупреждение:
Функциональность сброса на диск является устаревшей (legacy) в Trino. Вместо неё рекомендуется использовать Fault-tolerant execution (устойчивое к сбоям выполнение) с политикой повторного выполнения задач (task retry policy) и настроенным Exchange manager.

Управление памятью и сброс

По умолчанию Trino завершает запросы, если потребление памяти во время их выполнения превышает значения параметров query_max_memory или query_max_memory_per_node.
Этот механизм обеспечивает справедливое распределение памяти между запросами и предотвращает взаимные блокировки (deadlocks), вызванные нехваткой памяти. Он эффективен, когда в кластере выполняется много небольших запросов, но приводит к принудительной остановке крупных запросов, выходящих за лимиты.

Чтобы избежать этой неэффективности, была введена концепция revocable memory (отзываемой памяти).
Запрос может использовать память, которая не учитывается в лимитах, однако менеджер памяти может отозвать её в любой момент. Когда память отзывается, исполнитель запроса (query runner) сбрасывает промежуточные данные из памяти на диск и продолжает их обработку позже.

На практике, если кластер простаивает и доступна вся память, ресурсоёмкий запрос может использовать всю память кластера.
Если же свободной памяти мало, тот же запрос может быть вынужден использовать диск для хранения промежуточных данных.
Запрос, вынужденный сбрасывать данные на диск, может выполняться в десятки раз дольше, чем запрос, полностью работающий в памяти.

Важно понимать, что включение функции сброса на диск не гарантирует успешного выполнения всех запросов, потребляющих много памяти. Возможна ситуация, когда Trino не сможет разбить промежуточные данные на достаточно маленькие части, и при попытке их загрузки снова произойдёт ошибка Out of memory.

Дисковое пространство для сброса

Сброс промежуточных результатов на диск и последующее их чтение — это дорогостоящие операции ввода-вывода (IO). Поэтому запросы, использующие сброс, часто ограничиваются скоростью диска.
Для повышения производительности рекомендуется указать несколько путей на разных локальных устройствах для хранения сброса (параметр spiller-spill-path в Spilling properties).

Системный диск не следует использовать для сброса, особенно если на нём работает JVM или записываются логи. Это может привести к нестабильности кластера. Также рекомендуется отслеживать загрузку (saturation) дисков, используемых для сброса.

Trino рассматривает пути сброса как независимые диски (JBOD), поэтому использовать RAID для этих целей не требуется.

Сжатие при сбросе

Если включено сжатие сброса с помощью свойства spill-compression-codec, то страницы, выгружаемые на диск, сжимаются перед записью.
Это снижает объём операций ввода-вывода, но увеличивает нагрузку на CPU из-за необходимости сжатия и распаковки данных.

Шифрование при сбросе

Если включено шифрование сброса (параметр spill-encryption-enabled в Spilling properties), содержимое сброса шифруется с помощью случайно сгенерированного ключа, уникального для каждого файла сброса.
Это повышает нагрузку на процессор и снижает скорость операций записи, но защищает выгруженные данные от несанкционированного восстановления.

При включённом шифровании рекомендуется уменьшить значение memory-revoking-threshold, чтобы компенсировать дополнительную задержку, связанную с шифрованием и дешифрованием данных при сбросе.

Поддерживаемые операции (Supported operations)

Не все операции в Trino поддерживают сброс на диск, и каждая из них обрабатывает его по-своему.
В настоящее время этот механизм реализован для следующих типов операций:

JOIN’ы

Во время выполнения операции JOIN одна из объединяемых таблиц сохраняется в памяти.
Эта таблица называется build table (строящаяся таблица).
Строки из другой таблицы (probe side) проходят <strong>потоком</strong> и передаются дальше, если они соответствуют строкам из build-таблицы.
Наиболее ресурсоёмкая по памяти часть JOIN — это именно build-таблица.

Когда параметр task.concurrency больше единицы, build-таблица разбивается на несколько партиций.
Количество партиций равно значению этого параметра.

Если build-таблица разбита на партиции, механизм сброса на диск может уменьшить пиковое использование памяти во время JOIN.
Когда запрос приближается к лимиту памяти, часть партиций build-таблицы сбрасывается на диск вместе со строками из другой таблицы, которые попадают в те же партиции.
Количество сброшенных партиций напрямую влияет на объём требуемого дискового пространства.

После этого сброшенные партиции читаются обратно по одной, чтобы завершить выполнение JOIN.

Благодаря этому механизму пиковое использование памяти оператором JOIN может быть уменьшено до размера наибольшей партиции build-таблицы.
При отсутствии перекоса данных (data skew) это примерно 1 / task.concurrency от общего размера build-таблицы.

Агрегации (Aggregations)

Агрегатные функции выполняют операции над группой значений и возвращают одно результирующее значение.
Если количество групп велико, то может потребоваться значительный объём памяти.
При включённом сбросе на диск, если памяти не хватает, промежуточные накопленные результаты агрегирования записываются на диск.
Позже они загружаются обратно и объединяются, используя меньше памяти.

Сортировка (ORDER BY)

При сортировке большого объёма данных также может понадобиться много памяти.
Если включён сброс на диск для ORDER BY, и памяти недостаточно, промежуточные отсортированные результаты записываются на диск.
Позже они читаются обратно и объединяются с меньшими затратами памяти.

Оконные функции (Window functions)

Оконные функции выполняют операцию над “окном” строк и возвращают одно значение для каждой строки.
Если окно содержит много строк, потребление памяти может быть значительным.
Когда включён сброс на диск для оконных функций, и памяти не хватает, промежуточные результаты записываются на диск, а затем подгружаются и объединяются, когда память снова доступна.

Однако есть ограничение: сброс не работает во всех случаях, например, если одно окно слишком велико и не может быть разбито на части.

Exchange (Обмен данными)

Термины, связанные с Exchange, описывают, как данные передаются между worker-узлами.
Передача может быть локальной или удалённой.

LocalExchange [exchange_type]

Передаёт данные локально, внутри одного worker’а, между различными стадиями выполнения запроса. Значение exchange_type может быть одним из типов обмена, описанных ниже.

RemoteExchange [exchange_type]

Передаёт данные между разными worker-узлами на разных этапах выполнения запроса.
Значение exchange_type также может быть одним из описанных ниже.

Типы логического Exchange (Logical Exchange types)

Тип Описание
GATHER Один узел собирает результаты со всех остальных. Например, финальная стадия SELECT собирает данные со всех узлов и записывает результат (например, в Amazon S3).
REPARTITION Распределяет строки между воркерами в соответствии с схемой партиционирования, требуемой для следующего оператора.
REPLICATE Копирует данные строк на все воркеры.

Scanning (Сканирование данных)

Эти термины описывают, как выполняется чтение данных во время запроса:

Тип Описание
TableScan Сканирует исходные данные таблицы и применяет partition pruning (отсечение ненужных партиций) на основе фильтра.
ScanFilter Сканирует исходные данные и применяет partition pruning на основе фильтра, а также дополнительные фильтры, которые нельзя применить на уровне партиций.
ScanFilterProject Сначала сканирует данные и применяет фильтры (как в ScanFilter), затем изменяет структуру (layout) выходных данных для оптимизации последующих этапов выполнения.

Fault-tolerant execution (Отказоустойчивое выполнение)

YouTube: The architecture behind fault tolerant execution | Starburst Virtual Events


По умолчанию, если узлу Trino не хватает ресурсов для выполнения задачи или он выходит из строя во время выполнения запроса, запрос завершается с ошибкой и должен быть запущен вручную повторно. Чем дольше выполняется запрос, тем выше вероятность того, что он столкнётся с подобными сбоями.

Отказоустойчивое выполнение или Fault-tolerant execution — это механизм в Trino, который позволяет кластеру смягчать последствия ошибок выполнения запросов путём повторного запуска запросов или отдельных задач при сбоях. При включённом отказоустойчивом выполнении промежуточные данные обмена (exchange data) сохраняются на диск и могут быть повторно использованы другим рабочим узлом в случае его отказа или любой другой ошибки во время выполнения запроса.

Примечание

Отказоустойчивость не применяется к неправильным запросам или другим ошибкам пользователя. Например, Trino не будет тратить ресурсы на повторный запуск запроса, который завершается ошибкой из-за невозможности разобрать SQL.

Пошаговое руководство по настройке кластера Trino с отказоустойчивым выполнением для повышения устойчивости обработки запросов можно найти в разделе Improve query processing resilience.

Конфигурация

Отказоустойчивое выполнение отключено по умолчанию. Чтобы включить эту функцию, установите конфигурационное свойство retry-policy в значение QUERY или TASK в зависимости от желаемой политики повторных попыток.


Предупреждение

Установка retry-policy может привести к сбоям запросов при использовании коннекторов, которые не поддерживают отказоустойчивое выполнение. В таких случаях возникает ошибка: “This connector does not support query retries”.


Поддержка отказоустойчивого выполнения SQL-операторов зависит от конкретного коннектора. Следующие коннекторы поддерживают отказоустойчивое выполнение:

  • BigQuery connector
  • Delta Lake connector
  • Hive connector
  • Iceberg connector
  • MariaDB connector
  • MongoDB connector
  • MySQL connector
  • Oracle connector
  • PostgreSQL connector
  • Redshift connector
  • SQL Server connector

Следующие конфигурационные свойства (Fault-tolerant execution configuration properties) управляют поведением отказоустойчивого выполнения в кластере Trino:

Property name

Description

Default

retry-policy Настраивает, что именно повторяется в случае сбоя: QUERY для повторного запуска всего запроса или TASK для индивидуального повторного запуска задач при их сбое. Смотрите раздел о политике повторов для дополнительной информации. Используйте эквивалентное свойство сессии retry_policy только на кластерах, настроенных для отказоустойчивого выполнения, и обычно только для деактивации с помощью NONE, поскольку переключение между режимами на кластере не тестируется. NONE
retry-policy.allowed Список политик повторов, разрешённых для настройки на данном кластере. Это свойство используется, чтобы предотвратить установку пользователем политики повторов, которая не предназначена для использования на конкретном кластере. NONE, QUERY, TASK
exchange.deduplication-buffer-size Размер данных в буфере, расположенном в памяти координатора, который используется отказоустойчивым выполнением для хранения выходных данных стадий запроса. Если этот буфер заполняется во время выполнения запроса, запрос завершится ошибкой “Exchange manager must be configured for the failure recovery capabilities to be fully functional”, если только не настроен exchange manager. 32MB
fault-tolerant-execution.exchange-encryption-enabled Включает шифрование данных, записываемых при спуллинге. Установка этого свойства в значение false не рекомендуется, если Trino обрабатывает конфиденциальные данные. true

Retry policy

Свойство конфигурации retry-policy или свойство сессии retry_policy определяет, будет ли Trino повторно выполнять весь запрос или отдельные задачи запроса в случае сбоя.

QUERY

Политика повторов QUERY указывает Trino автоматически повторно выполнять запрос в случае ошибки на рабочем узле. Политика QUERY рекомендуется, когда большая часть нагрузки кластера Trino состоит из множества небольших запросов.

По умолчанию Trino не реализует отказоустойчивость для запросов, чей результирующий набор превышает 32MB, например, для операторов SELECT, которые возвращают очень большой набор данных пользователю. Этот предел можно увеличить, изменив свойство конфигурации exchange.deduplication-buffer-size на значение больше стандартных 32MB, но это приводит к увеличению использования памяти на координаторе.

Чтобы включить отказоустойчивое выполнение для запросов с более крупным набором результатов, настоятельно рекомендуется настроить exchange manager, который использует внешнее хранилище для данных спуллинга, что позволяет хранить выгруженные данные за пределами размера буфера в памяти.

TASK

Политика повторов TASK указывает Trino повторно выполнять отдельные задачи запроса в случае сбоя. Чтобы использовать политику TASK, необходимо настроить exchange manager. Эта политика рекомендуется при выполнении больших пакетных запросов, поскольку кластер может более эффективно повторно выполнять небольшие задачи внутри запроса, а не повторять весь запрос.

Когда кластер настроен на использование политики TASK, некоторые связанные свойства конфигурации получают значения по умолчанию, соответствующие лучшим практикам для отказоустойчивого кластера. Однако это автоматическое изменение не влияет на кластеры, где эти свойства были настроены вручную. Если какие-либо из следующих свойств настроены в файле config.properties на кластере с политикой TASK, настоятельно рекомендуется установить для свойства управления запросами task.low-memory-killer.policy значение total-reservation-on-blocked-nodes, иначе запросы могут потребовать ручного завершения, если кластер исчерпает память.

Note
Политика TASK лучше всего подходит для больших пакетных запросов, но она может приводить к увеличению задержки для коротких запросов, выполняемых в большом количестве. Как лучшая практика, рекомендуется запускать выделенный кластер с политикой TASK для больших пакетных запросов, отдельно от другого кластера, который обрабатывает короткие запросы.

Encryption

Trino шифрует данные перед спуллингом в хранилище. Это предотвращает доступ к данным запроса для кого-либо, кроме кластера Trino, который их записал, включая администраторов системы хранения. Новый ключ шифрования случайным образом генерируется для каждого запроса, и ключ удаляется после завершения запроса.

Advanced configuration

Вы можете дополнительно настроить отказоустойчивое выполнение с помощью следующих свойств конфигурации. Значения по умолчанию для этих свойств должны подходить для большинства развертываний, но вы можете изменить их в целях тестирования или устранения неполадок.

Retry limits

Следующие свойства конфигурации контролируют пороги, при которых запросы или задачи перестают повторяться в случае повторяющихся сбоев:

Property name Description Default value Retry policy
query-retry-attempts Максимальное количество попыток, которые Trino может выполнить для повторного запуска запроса, прежде чем объявить запрос неудавшимся. 4 Only QUERY
task-retry-attempts-per-task Максимальное количество попыток, которые Trino может выполнить для повторного запуска одной задачи, прежде чем объявить запрос неудавшимся. 4 Only TASK
retry-initial-delay Минимальное время, которое должен ожидать сбойный запрос или задача перед повторной попыткой. Может быть переопределено с помощью свойства сессии retry_initial_delay. 10s QUERY and TASK
retry-max-delay Максимальное время ожидания перед повторной попыткой для сбойного запроса или задачи. Время ожидания увеличивается после каждого последующего сбоя. Может быть переопределено с помощью свойства сессии retry_max_delay. 1m QUERY and TASK
retry-delay-scale-factor Множитель, на который увеличивается задержка повторной попытки при каждом сбое запроса или задачи. Может быть переопределено свойством сессии retry_delay_scale_factor. 2.0 QUERY and TASK

Размер задач (Task sizing)

При политике повторных попыток TASK важно управлять объемом данных, обрабатываемых в каждой задаче. Если задачи слишком маленькие, то координация задач может занимать больше времени и ресурсов, чем выполнение самой задачи. Если задачи слишком большие, одна задача может требовать больше ресурсов, чем доступно на одном узле, и тем самым не позволит запросу завершиться.

Trino поддерживает ограниченную автоматическую настройку размера задач. Если во время выполнения задач с отказоустойчивостью возникают проблемы, можно настроить следующие параметры конфигурации для ручного управления размером задач. Эти параметры конфигурации применяются только при политике повторов TASK.

Property name Description Default value
fault-tolerant-execution-standard-split-size Стандартный размер данных split, обрабатываемых задачами, которые читают данные из исходных таблиц. Значение интерпретируется с учётом веса split. Если каталог указывает, что создаваемые split являются «легче» или «тяжелее» стандартных, количество split, обрабатываемых задачей, корректируется соответственно. Можно переопределить в текущей сессии с помощью параметра fault_tolerant_execution_standard_split_size. 64MB
fault-tolerant-execution-max-task-split-count Максимальное количество split, обрабатываемых одной задачей. Значение не учитывает вес split и служит защитой на случай, если каталог сообщает неправильный вес split. Можно переопределить в текущей сессии с помощью параметра fault_tolerant_execution_max_task_split_count. 2048
fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-growth-period Количество задач, создаваемых для любого этапа чтения (non-writer) при произвольном распределении, прежде чем увеличивается размер задачи. 64
fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-growth-factor Коэффициент роста для адаптивного увеличения размера задач-чтения при произвольном распределении. Нижний предел — 1.0. При каждом увеличении новый целевой размер задачи = старый размер × этот коэффициент. 1.26
fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-min Начальный/минимальный целевой размер входных данных для задач-чтения при произвольном распределении. 512MB
fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-max Максимальный целевой размер входных данных для задач-чтения при произвольном распределении. 50GB
fault-tolerant-execution-arbitrary-distribution-write-task-target-size-growth-period Количество задач, создаваемых для любого этапа записи (writer) при произвольном распределении, прежде чем увеличивается размер задачи. 64
fault-tolerant-execution-arbitrary-distribution-write-task-target-size-growth-factor Коэффициент роста для адаптивного увеличения размера задач-записи при произвольном распределении. Нижний предел — 1.0. Новый размер = старый × коэффициент. 1.26
fault-tolerant-execution-arbitrary-distribution-write-task-target-size-min Начальный/минимальный целевой размер входных данных для задач-записи при произвольном распределении. 4GB
fault-tolerant-execution-arbitrary-distribution-write-task-target-size-max Максимальный целевой размер входных данных для задач-записи при произвольном распределении. 50GB
fault-tolerant-execution-hash-distribution-compute-task-target-size Целевой размер входных данных для задач-чтения при хеш-распределении. 512MB
fault-tolerant-execution-hash-distribution-write-task-target-size Целевой размер входных данных для задач-записи при хеш-распределении. 4GB
fault-tolerant-execution-hash-distribution-write-task-target-max-count Мягкая верхняя граница количества задач-записи на этапе хеш-распределения. 2000

Назначение узлов (Node allocation)

При политике повторов TASK узлы назначаются задачам на основе доступной памяти и предполагаемого использования памяти.

Если задача завершается неудачей из-за превышения доступной памяти на узле, она перезапускается с запросом выделить под неё весь узел.

Начальная оценка требований задачи по памяти является статической и задаётся параметром конфигурации fault-tolerant-execution-task-memory.
Этот параметр используется только при политике повторов TASK.

Property name Description Default value
fault-tolerant-execution-task-memory Начальная оценка объёма данных в памяти, требуемых задачей, используемая для распределения задач по узлам (bin-packing). Может быть переопределена в текущей сессии с помощью параметра fault_tolerant_execution_task_memory. 5GB

Прочая настройка

Следующий параметр может использоваться для управления отказоустойчивым выполнением:

Property name Description Default value Retry policy
fault-tolerant-execution-task-descriptor-storage-max-memory Максимальный объём памяти для хранения дескрипторов задач на координаторе для отказоустойчивых запросов. Дополнительная память требуется, чтобы позволить переназначение задач при сбоях. (Размер heap JVM * 0.15) Only TASK
fault-tolerant-execution-max-partition-count Максимальное количество партиций для распределённых join и агрегаций. Аналогично свойству query.max-hash-partition-count. Не рекомендуется увеличивать значение выше 50, поскольку это может привести к нестабильности и снижению производительности. Может быть переопределено с помощью параметра fault_tolerant_execution_max_partition_count. 50 Only TASK
fault-tolerant-execution-min-partition-count Минимальное количество партиций для распределённых join и агрегаций. Аналогично query.min-hash-partition-count. Может быть переопределено параметром fault_tolerant_execution_min_partition_count. 4 Only TASK
fault-tolerant-execution-min-partition-count-for-write Минимальное количество партиций для распределённых join и агрегаций в запросах записи. Аналог query.min-hash-partition-count-for-write. Может быть переопределено параметром fault_tolerant_execution_min_partition_count_for_write. 50 Only TASK
max-tasks-waiting-for-node-per-query Максимальное количество задач, которые могут ожидать выделения узла для одного запроса, прежде чем планирование других задач этого запроса будет приостановлено. 50 Only TASK

Менеджер обмена (Exchange manager)

Буферизация обмена (exchange spooling) отвечает за хранение и управление сброшенными (spooled) данными для отказоустойчивого выполнения.

Вы можете настроить менеджер обмена, основанный на файловой системе, который сохраняет сброшенные данные в указанном месте, таком как AWS S3 и совместимые с S3 системы, Azure Blob Storage, Google Cloud Storage, Alluxio или HDFS.

Конфигурация

Чтобы настроить менеджер обмена, создайте новый конфигурационный файл etc/exchange-manager.properties на координаторе и всех рабочих узлах.
В этом файле установите параметр конфигурации exchange-manager.name в значение filesystem или hdfs, а затем задайте дополнительные параметры конфигурации в зависимости от используемого решения для хранения данных.

Следующая таблица перечисляет доступные параметры конфигурации для файла exchange-manager.properties, их значения по умолчанию и файловые системы, для которых может быть настроен каждый параметр:

Property name Description Default value Supported filesystem
exchange.base-directories Разделённый запятыми список URI-локаций, которые менеджер обмена использует для хранения данных спулинга. Любая
exchange.sink-buffer-pool-min-size Минимальный размер пула буферов для приёмника (sink) обмена. Чем больше размер пула, тем выше параллелизм записи и использование памяти. 10 Любая
exchange.sink-buffers-per-partition Количество буферов на партицию в пуле буферов. Чем больше размер пула, тем выше параллелизм записи и использование памяти. 2 Любая
exchange.sink-max-file-size Максимальный размер данных файлов, записываемых приёмниками обмена. 1GB Любая
exchange.source-concurrent-readers Количество параллельных читателей для чтения из spooling-хранилища. Большее число увеличивает параллелизм чтения и использование памяти. 4 Любая
exchange.s3.aws-access-key AWS access key. Требуется для подключения к AWS S3 и GCS, можно игнорировать для других S3-совместимых систем. AWS S3, GCS
exchange.s3.aws-secret-key AWS secret key. Требуется для подключения к AWS S3 и GCS, можно игнорировать для других S3-совместимых систем. AWS S3, GCS
exchange.s3.iam-role IAM-роль, которую необходимо принять. AWS S3, GCS
exchange.s3.external-id External ID для политики доверия IAM-роли. AWS S3, GCS
exchange.s3.region Регион S3-бакета. AWS S3, GCS
exchange.s3.endpoint Endpoint S3-хранилища при использовании S3-совместимой системы, не относящейся к AWS. Для AWS S3 можно игнорировать, если только политика бакета не требует HTTPS. При необходимости TLS установить https-endpoint, например: https://s3.us-east-1.amazonaws.com. Для GCS указать: https://storage.googleapis.com. Любое S3-совместимое хранилище
exchange.s3.max-error-retries Максимальное количество повторов запроса клиентом S3 менеджера обмена. 10 Любое S3-совместимое хранилище
exchange.s3.path-style-access Включает использование path-style доступа для всех запросов к S3. false Любое S3-совместимое хранилище
exchange.s3.upload.part-size Размер части (part) для multipart-загрузки S3. 5MB Любое S3-совместимое хранилище
exchange.gcs.json-key-file-path Путь к JSON-файлу, содержащему ключ сервисного аккаунта Google Cloud Platform. Не указывать вместе с exchange.gcs.json-key. GCS
exchange.gcs.json-key Ключ сервисного аккаунта GCP в формате JSON. Не указывать вместе с exchange.gcs.json-key-file-path. GCS
exchange.azure.endpoint Endpoint Azure Blob, используемый для доступа к контейнеру спулинга. Не указывать вместе с exchange.azure.connection-string. Azure Blob Storage
exchange.azure.connection-string Строка подключения, используемая для доступа к контейнеру спулинга. Не указывать вместе с exchange.azure.endpoint. Azure Blob Storage
exchange.azure.block-size Размер блока данных для параллельной загрузки Azure block blob. 4MB Azure Blob Storage
exchange.azure.max-error-retries Максимальное количество повторов запроса Azure-клиентом менеджера обмена. 10 Azure Blob Storage
exchange.alluxio.block-size Размер блока данных для Alluxio-хранилища. 4MB Alluxio
exchange.alluxio.site-file-path Путь к файлу конфигурации Alluxio (например, /etc/alluxio-site.properties). Файл должен существовать на всех узлах кластера Trino. Следуйте документации по конфигурации клиента Alluxio. Alluxio
exchange.hdfs.block-size Размер блока данных для HDFS. 4MB HDFS
exchange.hdfs.skip-directory-scheme-validation Пропустить проверку схемы директории для поддержки HDFS-совместимых файловых систем. false HDFS
hdfs.config.resources Разделённый запятыми список путей к файлам конфигурации HDFS, например /etc/hdfs-site.xml. Файлы должны существовать на всех узлах кластера Trino. HDFS

Чтобы снизить общую нагрузку на ввод-вывод у менеджера обмена, значение конфигурационного параметра exchange.compression-codec по умолчанию установлено в LZ4. Кроме того, сжатие и распаковка файлов выполняются автоматически, а некоторые детали могут быть сконфигурированы.

Также рекомендуется настроить правило жизненного цикла (lifecycle rule) для бакета, чтобы автоматически удалять заброшенные объекты в случае сбоя узла.

AWS S3

Следующий пример файла exchange-manager.properties задаёт бакет AWS S3 в качестве места хранения данных спулинга. Обратите внимание, что место назначения не обязательно должно находиться в AWS — это может быть любая S3-совместимая система хранения. Хотя менеджер обмена рассчитан на поддержку S3-совместимых систем, только AWS S3 и MinIO протестированы на совместимость. Для других систем необходимо проводить собственное тестирование и консультироваться с поставщиком.

Можно настроить несколько S3-бакетов для распределения данных спулинга, уменьшая I/O-нагрузку на один бакет. Если запрос завершается ошибкой:

“software.amazon.awssdk.services.s3.model.S3Exception: Please reduce your request rate”

это означает, что рабочая нагрузка является I/O-интенсивной, и вам следует указать несколько бакетов в exchange.base-directories, чтобы распределить нагрузку:

Azure Blob Storage

Следующий пример файла exchange-manager.properties задаёт контейнер Azure Blob Storage в качестве места хранения данных спулинга. Нужно использовать именно Azure Blob Storage, а не Azure Data Lake Storage или другую иерархическую систему хранения в Azure.

Google Cloud Storage

Чтобы включить спулинг обмена в GCS в Trino, измените endpoint запросов на URI https://storage.googleapis.com
и настройте AWS access/secret keys, используя HMAC-ключи GCS. Если вы развернули Trino в GCP, необходимо либо создать сервисный аккаунт с доступом к вашему бакету спулинга, либо указать путь к файлу учётных данных GCS.

Следующий пример exchange-manager.properties задаёт бакет GCS в качестве места хранения данных спулинга:

Alluxio

Следующий пример exchange-manager.properties задаёт Alluxio в качестве места хранения данных спулинга:

HDFS

Следующий пример конфигурации exchange-manager.properties указывает HDFS как место для spooling-хранилища:

Когда вы хотите использовать совместимую с Hadoop файловую систему в качестве spooling-хранилища, вам нужно включить параметр exchange.hdfs.skip-directory-scheme-validation в exchange-manager.properties, если вы настраиваете exchange.base-directories с использованием специфичной схемы (не hdfs).

Также могут понадобиться следующие шаги:

  • Настроить реализацию AbstractFileSystem в core-site.xml.
  • Добавить соответствующие клиентские JAR-файлы в каталог ${Trino_HOME}/plugin/exchange-hdfs на всех узлах кластера Trino.
  • Локальное файловое хранилище

Следующий пример конфигурации exchange-manager.properties указывает локальный каталог /tmp/trino-exchange-manager как spooling-хранилище:

Примечание:
Использование локальной файловой системы для exchange рекомендуется только в автономных (standalone), непроизводственных кластерах.
Локальный каталог можно использовать в распределённом кластере только если он является общим и доступен со всех узлов.

Адаптивные оптимизации плана

Режим fault-tolerant execution предоставляет несколько адаптивных оптимизаций плана, которые динамически изменяют план выполнения запроса на основе статистики во время выполнения.

Повышение устойчивости обработки запросов

Вы можете настроить Trino так, чтобы он был более устойчивым к сбоям во время обработки запросов, включив режим fault-tolerant execution. Это позволяет Trino выполнять более крупные запросы, такие как пакетные операции, без падения запроса при сбоях рабочих узлов.

Когда этот режим включён, кластер Trino буферизует данные, используемые рабочими узлами во время обработки запроса. Если обработка на рабочем узле по какой-либо причине завершается неудачей — например, из-за сетевого сбоя или нехватки ресурсов — координатор переназначает выполнение неудавшегося фрагмента работы другому рабочему узлу. Это позволяет продолжать выполнение запроса, используя буферизованные данные.

Архитектура

Узел-координатор использует настроенный сервис exchange manager, который буферизует данные во время обработки запроса во внешнем хранилище, например в S3-бакете. Рабочие узлы отправляют данные в буфер во время выполнения своих задач.

Рекомендации и особенности

Кластер с fault-tolerant execution лучше всего подходит для больших пакетных запросов. Пользователи могут столкнуться с задержками или похожими эффектами при большом количестве коротких запросов. Поэтому рекомендуется использовать отдельный кластер с включённым fault-tolerant режимом для пакетных операций, отдельно от кластера, работающего с высокой нагрузкой по количеству запросов.

Каталоги, использующие следующие коннекторы, поддерживают fault-tolerant выполнение операций чтения и записи:

  • Delta Lake connector
  • Hive connector
  • Iceberg connector
  • MySQL connector
  • PostgreSQL connector
  • SQL Server connector

Каталоги, использующие другие коннекторы, поддерживают fault-tolerant только для операций чтения. При включенном fault-tolerant режиме операции записи на таких каталогах будут завершаться ошибкой.

Exchange-менеджер может отправлять большие объёмы данных в хранилище exchange, что приводит к высокой нагрузке на I/O. Чтобы сбалансировать нагрузку, вы можете настроить несколько хранилищ для exchange.

Конфигурация

Ниже приведены шаги настройки кластера Trino с fault-tolerant execution, используя S3-совместимое хранилище для exchange.

Настройте S3-бакет, который будет использоваться как exchange-хранилище. В примере используется AWS S3, но в документации также описаны другие варианты. Можно использовать несколько бакетов.

Для каждого бакета в AWS получите следующую информацию:

  • S3 URI бакета, например: s3://exchange-spooling-bucket
  • Регион бакета, например: us-west-1
  • AWS access key и secret key для доступа

Для Kubernetes-развёртывания Trino добавьте в Helm chart следующую конфигурацию exchange-менеджера в секции server.exchangeManager и additionalExchangeManagerProperties, используя данные о S3:

В установках вне Kubernetes те же свойства должны быть указаны в файле exchange-manager.properties на координаторе и всех рабочих узлах.

Добавьте следующую конфигурацию для fault-tolerant execution в секцию additionalConfigProperties Helm chart:

В установках вне Kubernetes это свойство должно быть добавлено в файл config.properties на координаторе и рабочих узлах.

Разверните кластер повторно или перезапустите его (для non-Kubernetes установок).

Теперь ваш кластер Trino настроен для fault-tolerant выполнения запросов. Если выполнение запроса обычно прерывалось бы из-за сбоя, fault-tolerant execution возобновит обработку, чтобы обеспечить успешное завершение запроса.

Task properties

task.concurrency

  • Тип: integer (целое число)
  • Ограничения: значение должно быть степенью двойки
  • Значение по умолчанию: количество физических CPU на узле,
    с минимальным значением 2 и максимальным 32. В режиме Fault-tolerant execution (устойчивого выполнения) по умолчанию — 8.
  • Сессионное свойство: task_concurrency

Определяет стандартный уровень локальной параллельности для операторов, таких как JOIN и AGGREGATION. Это значение следует увеличивать или уменьшать в зависимости от количества параллельно выполняющихся запросов и загрузки ресурсов worker-узлов.

Низкие значения подходят для кластеров, где одновременно выполняется много запросов,
так как кластер уже загружен, и увеличение параллелизма приведёт к замедлению из-за переключения контекста и прочих накладных расходов.

Высокие значения лучше для кластеров, где выполняется только один или несколько запросов одновременно.

task.http-response-threads

  • Тип: integer (целое число)
  • Минимальное значение: 1
  • Значение по умолчанию: 100

Определяет максимальное количество потоков, которые могут быть созданы для обработки HTTP-ответов. Потоки создаются по мере необходимости и удаляются, когда простаивают, поэтому большое значение не создаёт избыточной нагрузки, если запросов немного.

Большее количество потоков может быть полезно в кластерах с большим числом параллельных запросов или в кластерах с сотнями или тысячами worker-узлов.

task.http-timeout-threads

  • Тип: integer (целое число)
  • Минимальное значение: 1
  • Значение по умолчанию: 3

Количество потоков, используемых для обработки тайм-аутов при формировании HTTP-ответов.
Это значение следует увеличить, если все потоки часто находятся в использовании.
Нагрузку можно отслеживать через объект JMX:
trino.server:name=AsyncHttpExecutionMBean:TimeoutExecutor
Если значение ActiveCount всегда равно PoolSize, значит, нужно увеличить число потоков.

task.info-update-interval

  • Тип: duration (время, длительность)
  • Минимальное значение: 1 мс
  • Максимальное значение: 10 с
  • Значение по умолчанию: 3 с

Определяет, как часто обновляется информация о задачах (task information), которая используется при планировании выполнения.
Большее значение уменьшает нагрузку на CPU координатора (coordinator), но может привести к менее оптимальному распределению сплитов (split scheduling).

task.max-drivers-per-task

  • Тип: integer (целое число)
  • Минимальное значение: 1
  • Значение по умолчанию: 2 147 483 647 (максимум для int)

Определяет максимальное количество драйверов (drivers), которые могут выполняться в задаче (task) одновременно. Установка ограничения снижает вероятность того, что задача создаст слишком много драйверов, что может улучшить производительность при параллельном выполнении нескольких запросов.

Однако слишком малое значение может привести к неэффективному использованию ресурсов,
если одновременно выполняется слишком мало запросов.

task.max-partial-aggregation-memory

  • Тип: data size (объём данных)
  • Значение по умолчанию: 16 MB

Определяет максимальный размер промежуточных (partial) результатов агрегирования при распределённых агрегациях. Увеличение этого значения позволяет хранить больше групп локально перед их сбросом, что снижает сетевой трафик и нагрузку на CPU, но увеличивает использование памяти.

task.max-worker-threads

  • Тип: integer (целое число)
  • Значение по умолчанию: (количество CPU на узле × 2)

Устанавливает количество потоков, используемых воркерами (workers) для обработки сплитов (splits). Если загрузка CPU на воркере низкая, и все потоки используются, увеличение этого параметра может повысить пропускную способность. Однако это приводит к увеличению использования heap-памяти, а слишком высокое значение может снизить производительность из-за частого переключения контекста.

Количество активных потоков можно увидеть в свойстве RunningSplits объекта JMX:
trino.execution.executor:name=TaskExecutor.RunningSplits

task.min-drivers

  • Тип: integer (целое число)
  • Значение по умолчанию: (task.max-worker-threads × 2)

Определяет целевое количество выполняющихся leaf splits (листовых сплитов) на воркере. Это минимальное значение, так как каждая leaf-задача гарантированно выполняет как минимум 3 сплита. Нелистовые задачи также выполняются, чтобы избежать взаимоблокировок (deadlocks).

Меньшее значение может улучшить отзывчивость для новых задач, но может привести к недоиспользованию ресурсов. Большее значение повышает загрузку ресурсов, но требует больше памяти.

task.min-drivers-per-task

  • Тип: integer (целое число)
  • Минимальное значение: 1
  • Значение по умолчанию: 3

Определяет минимальное количество драйверов (drivers), которые гарантированно выполняются одновременно для одной задачи (task), если у неё есть оставшиеся сплиты для обработки.

task.min-writer-count

  • Тип: integer (целое число)
  • Значение по умолчанию: 1
  • Сессионное свойство: task_min_writer_count

Определяет количество параллельных потоков записи (writer threads) на одного воркера для одного запроса, если не используется предпочтительное партиционирование (preferred partitioning) и масштабирование записывающих потоков (task writer scaling).

Увеличение этого значения может ускорить запись, особенно если запрос не ограничен I/O и может эффективно использовать дополнительный CPU для параллельной записи.

Однако некоторые коннекторы могут упираться в CPU при записи (например, из-за сжатия).
Слишком высокое значение может перегрузить кластер, особенно если движок вставляет данные в партиционированную таблицу без preferred partitioning — в этом случае каждый поток записи может писать во все партиции, что способно вызвать ошибку Out of memory, так как запись в партицию требует выделения памяти под буферизацию.

Exchange properties

Обмены (Exchanges) передают данные между узлами Trino на разных стадиях выполнения запроса.
Настройка этих параметров может помочь устранить проблемы коммуникации между узлами
или улучшить использование сетевых ресурсов.

Дополнительно можно настроить использование HTTP-клиентов обмена (exchange HTTP client).

exchange.client-threads

  • Тип: integer (целое число)
  • Минимальное значение: 1
  • Значение по умолчанию: 25

Определяет количество потоков, используемых exchange-клиентами для получения данных с других узлов Trino.
Большее значение может повысить производительность на крупных кластерах или при высокой степени параллельности (high concurrency). Однако слишком высокое значение может, наоборот, снизить производительность из-за частых переключений контекста и увеличенного потребления памяти.

exchange.concurrent-request-multiplier

  • Тип: integer (целое число)
  • Минимальное значение: 1
  • Значение по умолчанию: 3

Этот множитель определяет, сколько одновременных запросов может выполняться в зависимости от доступной памяти буфера (buffer memory).
Максимальное количество запросов вычисляется эвристически, на основе количества клиентов, которые могут поместиться в доступное место буфера, умноженного на этот множитель.

Пример:
Если exchange.max-buffer-size = 32 MB, уже занято 20 MB, а средний размер запроса — 2 MB, то максимальное количество клиентов = multiplier * ((32MB - 20MB) / 2MB) = multiplier * 6.

Настройка этого параметра изменяет поведение этой эвристики — увеличение значения может повысить параллельность и улучшить сетевую загрузку.

exchange.compression-codec

  • Тип: string (строка)
  • Допустимые значения: NONE, LZ4, ZSTD
  • Значение по умолчанию: NONE

Определяет кодек сжатия, который используется при сжатии и распаковке файлов во время обмена данными между узлами или при взаимодействии с хранилищем обмена (exchange storage) в режиме Fault-tolerant execution.

exchange.data-integrity-verification

  • Тип: string (строка)
  • Допустимые значения: NONE, ABORT, RETRY
  • Значение по умолчанию: ABORT

Определяет поведение системы при проблемах с целостностью данных. По умолчанию режим ABORT приводит к прерыванию запроса, если во время встроенной проверки обнаружены ошибки целостности данных.

  • NONE — отключает проверку целостности.
  • ABORT — останавливает выполнение запроса при обнаружении ошибок.
  • RETRY — при обнаружении ошибки повторяет попытку обмена данными.

exchange.max-buffer-size

  • Тип: data size (объём данных)
  • Значение по умолчанию: 32MB

Определяет размер буфера в клиенте обмена (exchange client), который хранит данные, полученные с других узлов, до их обработки.

Больший размер буфера может повысить пропускную способность сети в крупных кластерах и сократить время выполнения запроса, но при этом уменьшает объём памяти, доступный для других задач.

exchange.max-response-size

  • Тип: data size (объём данных)
  • Минимальное значение: 1MB
  • Значение по умолчанию: 16MB

Устанавливает максимальный размер ответа, возвращаемого при обмене данными между узлами. Ответ помещается в буфер клиента обмена, который делится между всеми параллельными запросами.

Увеличение значения может улучшить сетевую пропускную способность при высокой сетевой задержке (latency).

Уменьшение значения может улучшить производительность на крупных кластерах, так как снижает дисбаланс нагрузки (skew) — буфер сможет одновременно обрабатывать больше задач, вместо того чтобы удерживать большие объемы данных от меньшего числа задач.

sink.max-buffer-size

  • Тип: data size (объём данных)
  • Значение по умолчанию: 32MB

Размер выходного буфера для данных задачи (task), которые ожидают, пока их заберут вышестоящие задачи (upstream tasks). Если вывод задачи разделён по хэшу (hash partitioned), этот буфер общий для всех потребителей этих разделов.

Увеличение значения может повысить сетевую пропускную способность при передаче данных между стадиями запроса — особенно если высокая сетевой задержка (latency) или в кластере много узлов.

sink.max-broadcast-buffer-size

  • Тип: data size (объём данных)
  • Значение по умолчанию: 200MB

Размер буфера широковещательной (broadcast) передачи для данных задачи, которые ждут, пока их заберут вышестоящие задачи. Этот буфер используется для хранения и передачи данных со стороны build в операциях реплицированных (broadcast) join’ов.

Если буфер слишком мал, это может ограничить масштабирование задач со стороны probe в join-операциях, когда в кластер добавляются новые узлы.

Resource management properties

query.max-cpu-time

  • Тип: duration (длительность)
  • Значение по умолчанию: 1_000_000_000d

Максимальное количество CPU-времени, которое запрос может использовать во всём кластере. Если запрос превышает этот лимит, он принудительно завершается.

query.max-memory-per-node

  • Тип: heap size (объём памяти кучи)
  • Значение по умолчанию: 30% от максимального размера кучи (heap) на узле

Максимальный объём пользовательской памяти, которую запрос может использовать на одном рабочем узле (worker).

Пользовательская память — это память, выделяемая во время выполнения запроса для объектов, которые напрямую связаны с самим запросом.

Примеры:

  • хэш-таблицы, создаваемые во время выполнения;
  • память, используемая при сортировке и агрегациях.

Если использование памяти для запроса на любом узле достигает этого лимита — запрос завершается.

Предупреждение:
Сумма значений query.max-memory-per-node и memory.heap-headroom-per-node должна быть меньше максимального размера кучи JVM на узле.

Примечание:
Не применяется для запросов с включёнными повторными попытками задач (retry-policy=TASK).

query.max-memory

  • Тип: data size (объём данных)
  • Значение по умолчанию: 20GB

Максимальный объём пользовательской памяти, которую запрос может использовать во всём кластере. Это память, выделяемая для операций, связанных с конкретным запросом, например, хэш-таблицы, сортировки, буферы агрегаций и т.д.

Если использование памяти запросом на всех узлах вместе превышает этот лимит — запрос завершается.

Предупреждение:
query.max-total-memory должно быть больше, чем query.max-memory.

Примечание:
Не применяется для запросов с retry-policy=TASK.

query.max-total-memory

  • Тип: data size (объём данных)
  • Значение по умолчанию: (query.max-memory * 2)

Максимальный объём всей памяти, которую запрос может использовать в кластере, включая revocable memory — то есть память, которую система может отобрать при нехватке ресурсов.

Если общий объём выделенной памяти превышает этот лимит — запрос завершается.

Предупреждение:
query.max-total-memory должно быть больше, чем query.max-memory.

Примечание:
Не применяется для запросов с retry-policy=TASK.

memory.heap-headroom-per-node

  • Тип: heap size (объём памяти кучи)
  • Значение по умолчанию: 30% от максимального размера кучи JVM на узле

Это объём памяти, который резервируется в куче JVM в качестве запаса (headroom) для операций, не отслеживаемых самим Trino.

Предупреждение:
Сумма query.max-memory-per-node и memory.heap-headroom-per-node должна быть меньше максимального размера кучи JVM на узле.

exchange.deduplication-buffer-size

  • Тип: data size (объём данных)
  • Значение по умолчанию: 32MB

Размер буфера, используемого для временных данных (spooled data) во время Fault-tolerant execution — режима выполнения с устойчивостью к сбоям.

Query management properties

query.client.timeout

  • Тип: duration
  • Значение по умолчанию: 5m

Определяет, как долго кластер может работать без связи с клиентским приложением (например, CLI), прежде чем сочтёт запрос «заброшенным» и отменит его выполнение.

query.execution-policy

  • Тип: string
  • Значение по умолчанию: phased
  • Сессионное свойство: execution_policy

Определяет алгоритм организации выполнения всех стадий (stages) запроса.

Доступны следующие политики выполнения:

  • phased — выполняет стадии последовательно, чтобы избежать блокировок из-за зависимостей между стадиями. Этот режим обеспечивает максимальное использование ресурсов кластера и минимальное общее время выполнения запроса.
  • all-at-once — запускает все стадии запроса одновременно. В начале это даёт высокую загрузку ресурсов, но из-за зависимостей между стадиями часть работы простаивает, что увеличивает время ожидания и общее время выполнения запроса.

query.determine-partition-count-for-write-enabled

  • Тип: boolean
  • Значение по умолчанию: false
  • Сессионное свойство: determine_partition_count_for_write_enabled

Включает возможность определять количество партиций для операций записи на основе объёма данных, прочитанных и обработанных запросом.

query.max-hash-partition-count

  • Тип: integer
  • Значение по умолчанию: 100
  • Сессионное свойство: max_hash_partition_count

Задает максимальное количество партиций, используемых при выполнении распределённых операций — таких как JOIN, AGGREGATION, partitioned window functions и других.

query.min-hash-partition-count

  • Тип: integer
  • Значение по умолчанию: 4
  • Сессионное свойство: min_hash_partition_count

Минимальное количество партиций, используемых при выполнении распределённых операций, таких как JOIN, AGGREGATION, partitioned window functions и других.

query.min-hash-partition-count-for-write

  • Тип: integer
  • Значение по умолчанию: 50
  • Сессионное свойство: min_hash_partition_count_for_write

Минимальное количество партиций, используемых при выполнении распределённых операций в записывающих запросах (INSERT, CREATE TABLE AS SELECT, EXECUTE и т. д.), например JOIN, AGGREGATION или оконных функций с партиционированием.

query.max-writer-task-count

  • Тип: integer
  • Значение по умолчанию: 100
  • Сессионное свойство: max_writer_task_count

Максимальное количество задач (tasks), участвующих в записи данных во время выполнения запросов INSERT, CREATE TABLE AS SELECT и EXECUTE.

Это ограничение применяется только, если включена одна из опций:

  • redistribute-writes, или
  • scale-writers.

query.low-memory-killer.policy

  • Тип: string
  • Значение по умолчанию: total-reservation-on-blocked-nodes

Определяет поведение кластера при нехватке памяти, когда необходимо принудительно завершить один или несколько запросов.

Поддерживаемые значения:

  • none — не завершать запросы при нехватке памяти.
  • total-reservation — завершить запрос, который использует больше всего памяти в данный момент.
  • total-reservation-on-blocked-nodes — завершить запрос, который использует больше всего памяти именно на узлах, где закончилась память.

💡 Примечание:
Данный параметр применяется только для запросов, у которых отключены повторные попытки на уровне задач (retry-policy установлено в NONE или QUERY).

task.low-memory-killer.policy

  • Тип: string
  • Значение по умолчанию: total-reservation-on-blocked-nodes

Определяет поведение системы при нехватке памяти — каким образом Trino будет завершать выполняющиеся задачи (tasks).

Поддерживаемые значения:

  • none — не завершать задачи при нехватке памяти.
  • total-reservation-on-blocked-nodes — завершать задачи, которые принадлежат запросам с включёнными повторными попытками на уровне задач (task retries) и которые используют больше всего памяти на узлах, где память закончилась.
  • least-waste — завершать задачи, принадлежащие запросам с включёнными task retries, которые потребляют значительное количество памяти на узлах с нехваткой памяти, но избегать завершения задач, которые выполняются уже долго, чтобы не тратить зря проделанную работу.

Примечание:
Применяется только для запросов с включёнными повторными попытками на уровне задач (retry-policy=TASK).

query.max-execution-time

  • Тип: duration
  • Значение по умолчанию: 100d
  • Сессионное свойство: query_max_execution_time

Максимально допустимое время активного выполнения запроса в кластере (без учёта анализа, планирования или ожидания в очереди). После превышения этого времени запрос будет автоматически завершён.

query.max-length

  • Тип: integer
  • Значение по умолчанию: 1,000,000
  • Максимальное значение: 1,000,000,000

Максимальное количество символов в тексте SQL-запроса. Если запрос превышает этот предел, он не будет обработан, и Trino вернёт ошибку QUERY_TEXT_TOO_LARGE.

query.max-planning-time

  • Тип: duration
  • Значение по умолчанию: 10m
  • Сессионное свойство: query_max_planning_time

Максимально допустимое время, которое может быть потрачено на планирование выполнения запроса. После истечения этого времени координатор попытается остановить запрос. Однако некоторые операции на этапе планирования могут быть неотменяемыми, поэтому завершение не всегда происходит мгновенно.

query.max-run-time

  • Тип: длительность (duration)
  • Значение по умолчанию: 100d
  • Параметр сессии: query_max_run_time

Максимальное время, в течение которого запрос может выполняться в кластере, прежде чем будет принудительно завершён. Это время включает этапы анализа и планирования, а также ожидание в очереди — по сути, это максимальное «время жизни» запроса с момента его создания.

query.max-scan-physical-bytes

  • Тип: размер данных (data size)
  • Параметр сессии: query_max_scan_physical_bytes

Максимальное количество байт, которые могут быть просканированы во время выполнения запроса. Когда этот лимит достигается, выполнение запроса останавливается, чтобы предотвратить чрезмерное использование ресурсов.

query.max-write-physical-size

  • Тип: размер данных (data size)
  • Параметр сессии: query_max_write_physical_size

Максимальный физический объём данных, который может быть записан во время выполнения запроса. При достижении лимита запрос завершается, чтобы избежать избыточной нагрузки на систему.

query.max-stage-count

  • Тип: целое число (integer)
  • Значение по умолчанию: 150
  • Минимальное значение: 1

Максимальное количество стадий (stages), которые может сгенерировать один запрос. Если запрос создаёт больше стадий, чем разрешено, он завершается с ошибкой QUERY_HAS_TOO_MANY_STAGES.

Предупреждение:
Если установить слишком большое значение, это может вызвать нестабильность кластера — запросы с большим числом стадий могут привести к ошибкам других запросов с сообщением:
REMOTE_TASK_ERROR и Max requests queued per destination exceeded for HttpDestination

query.max-history

  • Тип: целое число (integer)
  • Значение по умолчанию: 100

Максимальное количество запросов, сохраняемых в истории запросов для статистики и отображения в веб-интерфейсе. Когда лимит достигнут, старые запросы удаляются.

Чтобы хранить информацию о запросах дольше и в большем объёме, необходимо использовать прослушиватель событий (event listener), который записывает события во внешнюю систему.

query.min-expire-age

  • Тип: длительность (duration)
  • Значение по умолчанию: 15m

Минимальный возраст запроса в истории перед его удалением. Просроченные запросы удаляются из буфера истории и больше не отображаются в веб-интерфейсе.&lt;/p>

<p>Чтобы сохранять события запросов и подробную информацию о них во внешней системе, также используется

event listener.

query.remote-task.enable-adaptive-request-size

  • Тип: логический (boolean)
  • Значение по умолчанию: true
  • Параметр сессии: remote_task_adaptive_update_request_size_enabled

Включает динамическое разделение запросов, отправляемых задачами на сервер. Это помогает избежать ошибок «out of memory» при работе с большими схемами. Настройки по умолчанию оптимизированы для типичных сценариев и должны изменяться только опытными пользователями, работающими с очень крупными таблицами.

query.remote-task.guaranteed-splits-per-task

  • Тип: целое число (integer)
  • Значение по умолчанию: 3
  • Параметр сессии: remote_task_guaranteed_splits_per_request

Минимальное количество splits (подзадач), которые должны быть назначены каждой удалённой задаче, чтобы гарантировать минимальный объём работы для выполнения. Требует, чтобы параметр query.remote-task.enable-adaptive-request-size был включён.

query.remote-task.max-error-duration

  • Тип: длительность (duration)
  • Значение по умолчанию: 1m

Максимальное время ожидания связи между координатором и удалённой задачей. Если координатор не получает обновления от задачи в течение этого времени, задача считается неудавшейся (failed).

query.remote-task.max-request-size

  • Тип: размер данных (data size)
  • Значение по умолчанию: 8MB
  • Параметр сессии: remote_task_max_request_size

Максимальный размер одного запроса, который может быть выполнен удалённой задачей. Требует включённого параметра query.remote-task.enable-adaptive-request-size.

query.remote-task.request-size-headroom

  • Тип: размер данных (data size)
  • Значение по умолчанию: 2MB
  • Параметр сессии: remote_task_request_size_headroom

Определяет запас (headroom) по памяти, который выделяется сверх размера данных запроса. Также требует включённого параметра query.remote-task.enable-adaptive-request-size.

query.info-url-template

  • Тип: строка (string)
  • Значение по умолчанию: URL страницы информации о запросе на координаторе

Позволяет перенаправлять клиентов на альтернативное местоположение для получения информации о запросах. URL должен содержать плейсхолдер идентификатора запроса — ${QUERY_ID}.

Пример:

Во время выполнения ${QUERY_ID} заменяется фактическим идентификатором запроса.

retry-policy

  • Тип: строка (string)
  • Значение по умолчанию: NONE

Определяет политику повторного выполнения (retry policy) для Fault-tolerant execution — отказоустойчивого выполнения.

Поддерживаемые значения:

  • NONE — отключает отказоустойчивое выполнение.
  • TASK — повторно выполняет отдельные задачи (tasks) внутри запроса при сбое. Требует настройки exchange manager.
  • QUERY — повторно выполняет весь запрос целиком при сбое.

Optimizer properties

optimizer.dictionary-aggregation

  • Тип: boolean
  • Значение по умолчанию: false
  • Сессионное свойство: dictionary_aggregation

Включает оптимизацию для агрегаций, выполняемых над словарями (dictionaries).

optimizer.optimize-metadata-queries

  • Тип: boolean
  • Значение по умолчанию: false
  • Сессионное свойство: optimize_metadata_queries

Включает оптимизацию некоторых агрегатных запросов с использованием значений, хранящихся в метаданных.
Это позволяет Trino выполнять простые запросы за постоянное время.
На данный момент оптимизация применяется к функциям max, min и approx_distinct по ключам партиций и другим агрегатам, не зависящим от количества строк (включая DISTINCT-агрегации).
Использование этого параметра может существенно ускорить выполнение некоторых запросов.

⚠️ Важно: возможны некорректные результаты, если коннектор возвращает ключи партиций, для которых нет строк.
Например, коннектор Hive может вернуть пустые партиции, созданные другими системами (Trino такие не создает).

optimizer.distinct-aggregations-strategy

  • Тип: string
  • Допустимые значения:
    AUTOMATIC, MARK_DISTINCT, SINGLE_STEP, PRE_AGGREGATE, SPLIT_TO_SUBQUERIES
  • Значение по умолчанию: AUTOMATIC
  • Сессионное свойство: distinct_aggregations_strategy

Определяет стратегию обработки нескольких DISTINCT-агрегаций.

Режимы работы:

  • SINGLE_STEP — вычисляет DISTINCT-агрегации за один шаг без предварительной агрегации.
    Может работать медленно, если количество уникальных ключей группировки невелико.
  • MARK_DISTINCT — использует механизм MarkDistinct для нескольких DISTINCT-агрегаций или их комбинаций с обычными агрегатами.
  • PRE_AGGREGATE — выполняет DISTINCT-агрегации с комбинацией этапов агрегации и предагрегации.
  • SPLIT_TO_SUBQUERIES — разбивает входные данные агрегации на независимые подзапросы, где каждый подзапрос вычисляет одну DISTINCT-агрегацию, повышая параллелизм.
  • AUTOMATIC — Trino автоматически выбирает подходящую стратегию.

По умолчанию предпочтительна стратегия SINGLE_STEP, но если уровень параллелизма ограничен (из-за малого числа ключей группировки), Trino выбирает другую стратегию на основе статистики данных.

optimizer.push-aggregation-through-outer-join

  • Тип: boolean
  • Значение по умолчанию: true
  • Сессионное свойство: push_aggregation_through_outer_join

Когда агрегирующая функция находится над внешним соединением (outer join), и все столбцы из внешней стороны соединения присутствуют в группировке, Trino может переместить агрегацию ниже соединения.

Эта оптимизация особенно полезна для коррелированных скалярных подзапросов, которые Trino переписывает как агрегации над внешним соединением.

Пример:

Включение этой оптимизации может существенно ускорить запросы,
так как уменьшает объем данных, обрабатываемых оператором JOIN.
Однако для запросов с очень избирательными соединениями она может, наоборот, немного замедлить выполнение.

optimizer.push-table-write-through-union

  • Тип: boolean
  • Значение по умолчанию: true
  • Сессионное свойство: push_table_write_through_union

Позволяет параллелизировать запись данных при использовании UNION ALL в запросах, которые создают или заполняют таблицы.

Эта оптимизация ускоряет запись результирующих таблиц,
поскольку устраняет необходимость дополнительной синхронизации при объединении результатов.

Включение параметра полезно, когда скорость записи ещё не достигла предела производительности,
но на перегруженных системах может, наоборот, замедлить выполнение запросов.

optimizer.push-filter-into-values-max-row-count

  • Тип: integer
  • Значение по умолчанию: 100
  • Минимальное значение: 0
  • Сессионное свойство: push_filter_into_values_max_row_count

Определяет порог количества строк в операторе VALUES,
ниже которого планировщик может выполнить фильтрацию напрямую над VALUES,
чтобы оптимизировать план запроса.

optimizer.join-reordering-strategy

  • Тип: string
  • Допустимые значения: AUTOMATIC, ELIMINATE_CROSS_JOINS, NONE
  • Значение по умолчанию: AUTOMATIC
  • Сессионное свойство: join_reordering_strategy

Определяет стратегию перестановки (reordering) соединений (JOIN).

Режимы работы:

  • NONE — сохраняет порядок таблиц, указанный в запросе.
  • ELIMINATE_CROSS_JOINS — переставляет соединения, чтобы избежать кросс-соединений (CROSS JOIN), где это возможно, при этом максимально сохраняет исходный порядок таблиц.
  • AUTOMATIC — перебирает все возможные варианты порядка соединений и использует стоимостную модель (cost-based estimation), чтобы выбрать порядок с наименьшей вычислительной стоимостью.
    Если статистика недоступна или стоимость не может быть вычислена, Trino автоматически переключается на стратегию ELIMINATE_CROSS_JOINS.

optimizer.max-reordered-joins

  • Тип: integer
  • Значение по умолчанию: 8
  • Сессионное свойство: max_reordered_joins

Определяет максимальное количество соединений, которые оптимизатор может переставить одновременно,
если используется стоимостная стратегия (cost-based).

⚠️ Предупреждение:
Количество возможных комбинаций соединений растёт факториально по мере увеличения числа таблиц.
Поэтому повышение этого значения может серьёзно замедлить выполнение запросов и планирование.

optimizer.optimize-duplicate-insensitive-joins

  • Тип: boolean
  • Значение по умолчанию: true
  • Сессионное свойство: optimize_duplicate_insensitive_joins

Уменьшает количество строк, возвращаемых при выполнении JOIN,
если оптимизатор обнаруживает, что дублирующиеся строки в результате можно безопасно пропустить.
Это может повысить производительность без изменения итоговых данных.

optimizer.use-exact-partitioning

  • Тип: boolean
  • Значение по умолчанию: false
  • Сессионное свойство: use_exact_partitioning

Указывает, что данные должны быть перераспределены (repartitioned),
если схема партиционирования предыдущего этапа не в точности совпадает с той,
которую ожидает следующий этап выполнения запроса.

Полезно для обеспечения корректного распределения данных,
но может увеличить затраты на шифлинг (data shuffling) и повлиять на производительность.

optimizer.use-table-scan-node-partitioning

  • Тип: boolean
  • Значение по умолчанию: true
  • Сессионное свойство: use_table_scan_node_partitioning

Использует разделение таблицы на узлы (node partitioning), предоставленное коннектором, при чтении таблиц.
Например, в Hive это соответствует buckets (корзинам) таблицы.

Если включено (true) и выполняется минимальное соотношение партиций к задачам,
каждая партиция таблицы читается отдельным воркером.
Минимальное соотношение задаётся параметром
optimizer.table-scan-node-partitioning-min-bucket-to-task-ratio.

Назначение партиций распределяется между воркерами для параллельной обработки.
Использование node partitioning при чтении таблицы может ускорить выполнение запросов,
так как уменьшает сложность выполнения (например, может не потребоваться перестановка данных по всему кластеру при агрегациях).

Однако, если количество партиций меньше числа воркеров,
уровень параллелизма может снизиться.

optimizer.table-scan-node-partitioning-min-bucket-to-task-ratio

  • Тип: double
  • Значение по умолчанию: 0.5
  • Сессионное свойство: table_scan_node_partitioning_min_bucket_to_task_ratio

Определяет минимальное соотношение «количество бакетов / количество задач»,
необходимое для использования node partitioning при чтении таблицы.

Если число бакетов в таблице мало по сравнению с числом воркеров,
Trino распределяет чтение таблицы по всем воркерам для повышения параллелизма.

optimizer.colocated-joins-enabled

  • Тип: boolean
  • Значение по умолчанию: true
  • Сессионное свойство: colocated_join

Использует колоцированные соединения (co-located joins),
если обе стороны соединения (JOIN) имеют одинаковое партиционирование таблицы по ключам соединения,
и выполняются условия, указанные в optimizer.use-table-scan-node-partitioning.

Например, при соединении таблиц Hive с одинаковыми схемами бакетизации,
Trino может выполнить соединение без обмена данными между воркерами,
что значительно повышает производительность запросов.

optimizer.filter-conjunction-independence-factor

  • Тип: double
  • Значение по умолчанию: 0.75
  • Минимальное значение: 0
  • Максимальное значение: 1
  • Сессионное свойство: filter_conjunction_independence_factor

Определяет степень предположения независимости при оценке селективности (избирательности) сочетания нескольких предикатов (условий) в фильтре WHERE.

🔹 Чем ниже значение, тем более консервативной будет оценка — оптимизатор будет считать, что между столбцами предикатов существует большая корреляция.
🔹 Значение 0 означает, что оптимизатор предполагает полную зависимость столбцов, и селективность всего фильтра определяется наиболее селективным условием.

optimizer.join-multi-clause-independence-factor

  • Тип: double
  • Значение по умолчанию: 0.25
  • Минимальное значение: 0
  • Максимальное значение: 1
  • Сессионное свойство: join_multi_clause_independence_factor

Задаёт степень предположения независимости условий при оценке результата соединений (JOIN) с несколькими условиями.

🔹 Более низкие значения дают консервативную оценку, предполагая сильную корреляцию между столбцами в условиях соединения.
🔹 При значении 0 оптимизатор считает, что столбцы в условиях соединения полностью коррелированы,
и селективность соединения определяется наиболее селективным условием.

optimizer.non-estimatable-predicate-approximation.enabled

  • Тип: boolean
  • Значение по умолчанию: true
  • Сессионное свойство: non_estimatable_predicate_approximation_enabled

Включает аппроксимацию (приблизительную оценку) количества строк,
возвращаемых фильтрами, чья стоимость не может быть точно оценена,
даже при наличии полной статистики.

Это позволяет оптимизатору строить более эффективные планы выполнения,
даже при наличии фильтров, для которых ранее не удавалось выполнить оценку.

optimizer.join-partitioned-build-min-row-count

  • Тип: integer
  • Значение по умолчанию: 1000000
  • Минимальное значение: 0
  • Сессионное свойство: join_partitioned_build_min_row_count

Минимальное количество строк на «build»-стороне соединения (JOIN), при котором используется поисковое соединение с разбиением на партиции (partitioned join lookup).
Если количество строк на build-стороне оценивается как меньше указанного порога, используется однопоточный поиск для повышения производительности соединения.

🔹 Значение 0 отключает эту оптимизацию.

optimizer.min-input-size-per-task

  • Тип: размер данных (data size)
  • Значение по умолчанию: 5GB
  • Минимальное значение: 0MB
  • Сессионное свойство: min_input_size_per_task

Минимальный объём входных данных, необходимый на одну задачу (task).
Эта настройка помогает оптимизатору определить количество хэш-разбиений (hash partitions) для операций JOIN и AGGREGATION.

🔹 Ограничение количества разбиений для небольших запросов повышает конкурентность в больших кластерах, где одновременно выполняются множество малых запросов.
🔹 Оценённое значение всегда находится в пределах между параметрами min_hash_partition_count и max_hash_partition_count.
🔹 Значение 0MB отключает оптимизацию.

optimizer.min-input-rows-per-task

  • Тип: integer
  • Значение по умолчанию: 10000000
  • Минимальное значение: 0
  • Сессионное свойство: min_input_rows_per_task

Минимальное количество входных строк, необходимое на одну задачу (task).
Подобно предыдущему параметру, используется оптимизатором для определения количества хэш-разбиений при соединениях и агрегациях.

🔹 Снижение количества разбиений для малых запросов повышает общую эффективность и параллелизм при высокой загрузке кластера.
🔹 Значение 0 отключает оптимизацию.

optimizer.use-cost-based-partitioning

  • Тип: boolean
  • Значение по умолчанию: true
  • Сессионное свойство: use_cost_based_partitioning

При включении оптимизатор использует оценку стоимости выполнения (cost-based optimization),
чтобы определить, требуется ли повторное разбиение (repartitioning) данных после уже выполненного этапа разбиения.

Это позволяет избежать ненужных перераспределений данных и повысить общую производительность выполнения запроса.

Подпишись на телеграм канал Data Engineering Инжиниринг данных
Иван Шамаев
Более 14 лет опыта в ИТ. Разрабатывал аналитические решения и визуализации данных, строил витрины данных, автоматизировал выгрузку и обработку данных, автоматизировал data pipelines в Airflow. Разработка рассылок, финансовой отчетности, кастомизация Superset.
0
Оставьте комментарий! Напишите, что думаете по поводу статьи.x