Hive connector#

The Hive connector lets you query data stored in an Apache Hive data warehouse. Hive is a combination of three components:
Data files in varying formats, that are typically stored in the Hadoop Distributed File System (HDFS) or in object storage systems such as Amazon S3.
Metadata about how the data files are mapped to schemas and tables. This metadata is stored in a database, such as MySQL, and is accessed via the Hive metastore service.
A query language called HiveQL. This query language is executed on a distributed computing framework such as MapReduce or Tez.
SEP only uses the first two components: the data and the metadata. It does not use HiveQL or any part of Hive’s execution environment.
SEP includes additional enterprise features that are built on top of the existing Trino connector functionality. For more information on connector key feature differences between Trino and SEP, see the connectors feature matrix.
Requirements#
The Hive connector requires a Hive metastore service (HMS), or a compatible implementation of the Hive metastore, such as AWS Glue.
You must select and configure a supported file system in your catalog configuration file.
The coordinator and all workers must have network access to the Hive metastore and the storage system. Hive metastore access with the Thrift protocol defaults to using port 9083.
Data files must be in a supported file format. File formats can be configured using the format table property and other format-specific properties:
In the case of serializable formats, only specific SerDes are allowed:
RCText - RCFile using ColumnarSerDe
RCBinary - RCFile using LazyBinaryColumnarSerDe
SequenceFile
CSV - using org.apache.hadoop.hive.serde2.OpenCSVSerde
JSON - using org.apache.hive.hcatalog.data.JsonSerDe
OPENX_JSON - OpenX JSON SerDe from org.openx.data.jsonserde.JsonSerDe. Find more details about the Trino implementation in the source repository. Starburst recommends using the OpenX JSON SerDe. Existing tables using these SerDes and all the associated SerDe properties are handled automatically.
TextFile
Additional features of the connector require a valid Starburst Enterprise license, unless otherwise noted.
Extensions#
The connector includes all the features and integrations detailed in the following section:
General configuration#
To configure the Hive connector, create a catalog properties file that specifies the Hive connector by setting the connector.name property to hive.
You must configure a metastore for metadata.
You must select and configure one of the supported file systems.
connector.name=hive
hive.metastore.uri=thrift://example.net:9083
fs.x.enabled=true
Replace the fs.x.enabled configuration property with the one for the desired file system.
If you are using AWS Glue as your metastore, you must instead set hive.metastore to glue:
connector.name=hive
hive.metastore=glue
Each metastore type has specific configuration properties along with General metastore configuration properties.
Multiple Hive clusters#
You can have as many catalogs as you need. If you have additional Hive clusters, configure another catalog.
To add another catalog, add a new properties file to etc/catalog. For example, if you name the property file sales.properties, SEP creates a catalog named sales.
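For example, a sales.properties catalog file for a second Hive cluster might look like the following sketch; the metastore host name is illustrative, and fs.x.enabled stands in for your chosen file system property as above:
connector.name=hive
hive.metastore.uri=thrift://sales-metastore.example.net:9083
fs.x.enabled=true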
General configuration properties#
The following table lists general configuration properties for the Hive connector. There are additional sets of configuration properties throughout the Hive connector documentation.
Property Name |
Description |
Default |
---|---|---|
|
Set the default value for the auto_purge table property for managed tables. See the Table properties for more information on auto_purge. |
|
|
Set to |
|
|
Enables automatic column level statistics collection on write. See Table statistics for details. |
|
|
The compression codec to use when writing files. Possible values are |
|
|
Should empty files be created for buckets that have no data? |
|
|
How long a cached directory listing is considered valid. |
|
|
Cache directory listing for specific tables. Examples:
|
|
|
List of tables to be excluded from caching. If used with the
|
|
|
Maximum retained size of cached file status entries. |
|
|
Force splits to be scheduled on the same node as the Hadoop DataNode process serving the split data. This is useful for installations where Trino is collocated with every DataNode. |
|
|
Path to configuration file for mapping usage with storage caching. |
|
|
Refresh period for mapping file with storage caching. |
|
|
Enable translation for Hive views. |
|
|
Use the legacy algorithm to translate Hive views. You can use
the |
|
|
Ignore partitions when the file system location does not exist rather than failing the query. This skips data that may be expected to be part of the table. |
|
|
Can new data be inserted into existing partitions? If |
|
|
What happens when data is inserted into an existing partition? Possible values are:
|
|
|
Maximum number of partitions to drop in a single query. |
100,000 |
|
The maximum number of partitions for a single table scan to load eagerly on the coordinator. Certain optimizations are not possible without eager loading. |
100,000 |
|
Maximum number of partitions for a single table scan. |
1,000,000 |
|
Maximum number of partitions per writer. |
100 |
|
Number of threads used for retrieving metadata. Currently, only table loading is parallelized. |
|
|
Enable creating non-managed (external) Hive tables. |
|
|
Enable writes to non-managed (external) Hive tables. |
|
|
Access ORC columns by name. By default, columns in ORC files are accessed by
their ordinal position in the Hive table definition. The equivalent catalog
session property is |
|
|
Improve parallelism of partitioned and bucketed table writes. When disabled, the number of writing threads is limited to the number of buckets. |
|
|
Enables Athena partition projection support |
|
|
Specifies the number of partitions to analyze when computing table statistics. |
100 |
|
Maximum retained size of all entries in per transaction file status cache. Retained size limit is shared across all running queries. |
|
|
Time zone for Parquet read and write. |
JVM default |
|
Access Parquet columns by name by default. Set this property to |
|
|
Set to |
|
|
Allow specifying the list of schemas for which SEP will enforce that
queries use a filter on partition keys for source tables. The list can be
specified using the |
|
|
Adjusts binary encoded timestamp values to a specific time zone. For Hive 3.1+, this must be set to UTC. |
JVM default |
|
Enable reading data from subdirectories of table or partition locations. If
disabled, subdirectories are ignored. This is equivalent to the
|
|
|
Should new partitions be written using the existing table format or the default SEP format? |
|
|
Filter S3 objects based on their storage class and restored status if applicable. Possible values are
|
|
|
Enables auto-commit for all writes. This can be used to disallow multi-statement write transactions. |
|
|
The default file format used when creating new tables. |
|
|
Enables Table statistics. The equivalent catalog session
property is |
|
|
Best effort maximum size of new files. |
|
|
Controls whether the temporary staging directory configured at
|
|
|
Controls the location of temporary staging directory that is used for write
operations. The |
|
|
Specifies the precision to use for Hive columns of type |
|
|
Enables validation that data is in the correct bucket when reading bucketed tables. |
|
|
Various properties for materialized view usage and configuration. |
Fault-tolerant execution support#
The connector supports Fault-tolerant execution of query processing. Read and write operations are both supported with any retry policy on non-transactional tables.
Read operations are supported with any retry policy on transactional tables.
Write operations and CREATE TABLE ... AS
operations are not supported with any
retry policy on transactional tables.
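Fault-tolerant execution is enabled cluster-wide rather than per catalog. As a hedged sketch, adding the following to config.properties enables task-level retries; note that a TASK retry policy additionally requires a configured exchange manager:
retry-policy=TASK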
File system access configuration#
The connector supports accessing the following file systems:
You must enable and configure the specific file system access. Legacy support is not recommended and will be removed.
SQL support#
The connector provides read access and write access to data and metadata in the configured object storage system and metadata stores:
Globally available statements; see also Globally available statements
Security operations: see also SQL standard-based authorization for object storage
Refer to the migration guide for practical advice on migrating from Hive to SEP.
The following sections provide Hive-specific information regarding SQL support.
Basic usage examples#
The examples shown here work on Google Cloud Storage by replacing s3://
with
gs://
.
Create a new Hive table named page_views
in the web
schema
that is stored using the ORC file format, partitioned by date and
country, and bucketed by user into 50
buckets. Note that Hive
requires the partition columns to be the last columns in the table:
CREATE TABLE example.web.page_views (
view_time TIMESTAMP,
user_id BIGINT,
page_url VARCHAR,
ds DATE,
country VARCHAR
)
WITH (
format = 'ORC',
partitioned_by = ARRAY['ds', 'country'],
bucketed_by = ARRAY['user_id'],
bucket_count = 50
)
Create a new Hive schema named web
that stores tables in an
S3 bucket named my-bucket
:
CREATE SCHEMA example.web
WITH (location = 's3://my-bucket/')
Drop a schema:
DROP SCHEMA example.web
Drop a partition from the page_views
table:
DELETE FROM example.web.page_views
WHERE ds = DATE '2016-08-09'
AND country = 'US'
Query the page_views
table:
SELECT * FROM example.web.page_views
List the partitions of the page_views
table:
SELECT * FROM example.web."page_views$partitions"
Create an external Hive table named request_logs
that points at
existing data in S3:
CREATE TABLE example.web.request_logs (
request_time TIMESTAMP,
url VARCHAR,
ip VARCHAR,
user_agent VARCHAR
)
WITH (
format = 'TEXTFILE',
external_location = 's3://my-bucket/data/logs/'
)
Collect statistics for the request_logs
table:
ANALYZE example.web.request_logs;
Drop the external table request_logs
. This only drops the metadata
for the table. The referenced data directory is not deleted:
DROP TABLE example.web.request_logs
CREATE TABLE AS can be used to create transactional tables in ORC format like this:
CREATE TABLE <name> WITH ( format='ORC', transactional=true ) AS <query>
Add an empty partition to the page_views
table:
CALL system.create_empty_partition(
schema_name => 'web',
table_name => 'page_views',
partition_columns => ARRAY['ds', 'country'],
partition_values => ARRAY['2016-08-09', 'US']);
Drop stats for a partition of the page_views
table:
CALL system.drop_stats(
schema_name => 'web',
table_name => 'page_views',
partition_values => ARRAY[ARRAY['2016-08-09', 'US']]);
Procedures#
Use the CALL statement to perform data manipulation or administrative tasks. Procedures must include a qualified catalog name. For example, if your Hive catalog is called web:
CALL web.system.example_procedure()
The following procedures are available:
system.create_empty_partition(schema_name, table_name, partition_columns, partition_values)
Create an empty partition in the specified table.
system.sync_partition_metadata(schema_name, table_name, mode, case_sensitive)
Check and update the partitions list in the metastore. There are three modes available:
ADD: add any partitions that exist on the file system, but not in the metastore.
DROP: drop any partitions that exist in the metastore, but not on the file system.
FULL: perform both ADD and DROP.
The case_sensitive argument is optional. The default value is true for compatibility with Hive's MSCK REPAIR TABLE behavior, which expects the partition column names in file system paths to use lowercase (e.g. col_x=SomeValue). Partitions on the file system not conforming to this convention are ignored, unless the argument is set to false.
system.drop_stats(schema_name, table_name, partition_values)
Drops statistics for a subset of partitions or the entire table. The partitions are specified as an array whose elements are arrays of partition values (similar to the partition_values argument in create_empty_partition). If the partition_values argument is omitted, stats are dropped for the entire table.
system.register_partition(schema_name, table_name, partition_columns, partition_values, location)
Registers an existing location as a new partition in the metastore for the specified table.
When the location argument is omitted, the partition location is constructed using partition_columns and partition_values.
For security reasons, the procedure is enabled only when hive.allow-register-partition-procedure is set to true.
system.unregister_partition(schema_name, table_name, partition_columns, partition_values)
Unregisters the given, existing partition in the metastore for the specified table. The partition data is not deleted.
system.flush_metadata_cache()
Flush all Hive metadata caches.
system.flush_metadata_cache(schema_name => ..., table_name => ...)
Flush Hive metadata cache entries connected with the selected table. The procedure requires named parameters to be passed.
system.flush_metadata_cache(schema_name => ..., table_name => ..., partition_columns => ARRAY[...], partition_values => ARRAY[...])
Flush Hive metadata cache entries connected with the selected partition. The procedure requires named parameters to be passed.
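For example, to reconcile the metastore with partition directories added to or removed from storage outside of SEP, you can run the following call against a catalog named example (the schema and table names are illustrative):
CALL example.system.sync_partition_metadata(
    schema_name => 'web',
    table_name => 'page_views',
    mode => 'FULL');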
Flush filesystem cache#
system.flush_filesystem_cache()
Flushes the filesystem cache of a specific table. By default, this procedure accepts a schema name and a table name as parameters. You can also flush the filesystem cache for specific partitions. For example, the following call flushes the filesystem cache of a specific partition of the MY_TABLE table:
CALL system.flush_filesystem_cache('TPCH_SCHEMA', 'MY_TABLE', ARRAY['col2'], ARRAY['group1']);
Data management#
The Data management functionality includes support for INSERT
,
UPDATE
, DELETE
, and MERGE
statements, with the exact support depending on
the storage system, file format, and metastore.
When connecting to a Hive metastore version 3.x, the Hive connector supports reading from and writing to insert-only and ACID tables, with full support for partitioning and bucketing.
DELETE applied to non-transactional tables is only supported if the
table is partitioned and the WHERE
clause matches entire partitions.
Transactional Hive tables with ORC format support “row-by-row” deletion, in
which the WHERE
clause may match arbitrary sets of rows.
UPDATE is only supported for transactional Hive tables with format ORC.
UPDATE
of partition or bucket columns is not supported.
MERGE is only supported for ACID tables.
ACID tables created with Hive Streaming Ingest are not supported.
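As a hedged illustration, assuming a transactional ORC table named example.web.page_views_acid exists, a row-level update could look like this:
UPDATE example.web.page_views_acid
SET page_url = lower(page_url)
WHERE country = 'US';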
Schema and table management#
The Hive connector supports querying and manipulating Hive tables and schemas (databases). While some uncommon operations must be performed using Hive directly, most operations can be performed using SEP.
Schema evolution#
Hive table partitions can differ from the current table schema. This occurs when the data types of columns of a table are changed from the data types of columns of preexisting partitions. The Hive connector supports this schema evolution by allowing the same conversions as Hive. The following table lists possible data type conversions.
Data type |
Converted to |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Any conversion failure results in null, which is the same behavior as Hive. For
example, converting the string 'foo'
to a number, or converting the string
'1234'
to a TINYINT
(which has a maximum value of 127
).
Avro schema evolution#
SEP supports querying and manipulating Hive tables with the Avro storage format, which have their schema set based on an Avro schema file or literal. SEP can also create tables by inferring the schema from a valid Avro schema file located locally, or remotely in HDFS or on a web server.
To specify that the Avro schema should be used for interpreting table data, use
the avro_schema_url
table property.
The schema can be placed in the local file system or remotely in the following locations:
HDFS (e.g. avro_schema_url = 'hdfs://user/avro/schema/avro_data.avsc')
S3 (e.g. avro_schema_url = 's3n:///schema_bucket/schema/avro_data.avsc')
A web server (e.g. avro_schema_url = 'http://example.org/schema/avro_data.avsc')
The URL, where the schema is located, must be accessible from the Hive metastore and SEP coordinator/worker nodes.
Alternatively, you can use the table property avro_schema_literal
to define
the Avro schema.
The table created in SEP using the avro_schema_url
or avro_schema_literal
property behaves the same way as a Hive table with avro.schema.url
or
avro.schema.literal
set.
Example:
CREATE TABLE example.avro.avro_data (
id BIGINT
)
WITH (
format = 'AVRO',
avro_schema_url = '/usr/local/avro_data.avsc'
)
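A table with an inline schema can be defined with avro_schema_literal instead; the following is a sketch only, and the schema JSON shown is illustrative:
CREATE TABLE example.avro.avro_data_literal (
  id BIGINT
)
WITH (
  format = 'AVRO',
  avro_schema_literal = '{"type":"record","name":"avro_data","fields":[{"name":"id","type":"long"}]}'
)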
The columns listed in the DDL (id in the avro_schema_url example above) are ignored if avro_schema_url is specified. The table schema matches the schema in the Avro schema file. Before any read operation, the Avro schema is accessed so the query result reflects any changes in schema. Thus SEP takes advantage of Avro's backward compatibility abilities.
If the schema of the table changes in the Avro schema file, the new schema can still be used to read old data. Newly added/renamed fields must have a default value in the Avro schema file.
The schema evolution behavior is as follows:
Column added in new schema: Data created with an older schema produces a default value when the table uses the new schema.
Column removed in new schema: Data created with an older schema no longer outputs the data from the column that was removed.
Column is renamed in the new schema: This is equivalent to removing the column and adding a new one, and data created with an older schema produces a default value when the table uses the new schema.
Changing type of column in the new schema: If the type coercion is supported by Avro or the Hive connector, then the conversion happens. An error is thrown for incompatible types.
Limitations#
The following operations are not supported when avro_schema_url is set:
CREATE TABLE AS is not supported.
Bucketing (bucketed_by) columns are not supported in CREATE TABLE.
ALTER TABLE commands modifying columns are not supported.
ALTER TABLE EXECUTE#
The connector supports the following commands for use with ALTER TABLE EXECUTE.
optimize#
The optimize
command is used for rewriting the content of the specified
table so that it is merged into fewer but larger files. If the table is
partitioned, the data compaction acts separately on each partition selected for
optimization. This operation improves read performance.
All files with a size below the optional file_size_threshold
parameter
(default value for the threshold is 100MB
) are merged:
ALTER TABLE test_table EXECUTE optimize
The following statement merges files in a table that are under 128 megabytes in size:
ALTER TABLE test_table EXECUTE optimize(file_size_threshold => '128MB')
You can use a WHERE
clause with the columns used to partition the table
to filter which partitions are optimized:
ALTER TABLE test_partitioned_table EXECUTE optimize
WHERE partition_key = 1
You can use a more complex WHERE
clause to narrow down the scope of the
optimize
procedure. The following example casts the timestamp values to
dates, and uses a comparison to only optimize partitions with data from the year
2022 or newer:
ALTER TABLE test_table EXECUTE optimize
WHERE CAST(timestamp_tz AS DATE) > DATE '2021-12-31'
The optimize
command is disabled by default, and can be enabled for a
catalog with the <catalog-name>.non_transactional_optimize_enabled
session property:
SET SESSION <catalog_name>.non_transactional_optimize_enabled=true
Warning
Because Hive tables are non-transactional, take note of the following possible outcomes:
If queries are run against tables that are currently being optimized, duplicate rows may be read.
In rare cases where exceptions occur during the
optimize
operation, a manual cleanup of the table directory is needed. In this situation, refer to the Trino logs and query failure messages to see which files must be deleted.
Table properties#
Table properties supply or set metadata for the underlying tables. This is key for CREATE TABLE AS statements. Table properties are passed to the connector using a WITH clause:
CREATE TABLE tablename
WITH (format='CSV',
csv_escape = '"')
Property name |
Description |
Default |
---|---|---|
|
Indicates to the configured metastore to perform a purge when a table or partition is deleted instead of a soft deletion using the trash. |
|
|
The URI pointing to Avro schema evolution for the table. |
|
|
The number of buckets to group data into. Only valid if used with
|
0 |
|
The bucketing column for the storage table. Only valid if used with
|
|
|
Specifies which Hive bucketing version to use. Valid values are |
|
|
The CSV escape character. Requires CSV format. |
|
|
The CSV quote character. Requires CSV format. |
|
|
The CSV separator character. Requires CSV format. You can use other
separators such as |
|
|
The URI for an external Hive table on S3, Azure Blob Storage, etc. See the Basic usage examples for more information. |
|
|
The table file format. Valid values include |
|
|
The serialization format for |
|
|
Comma separated list of columns to use for ORC bloom filter. It improves the
performance of queries using equality predicates, such as |
|
|
The ORC bloom filters false positive probability. Requires ORC format. |
0.05 |
|
The partitioning column for the storage table. The columns listed in the
|
|
|
Comma separated list of columns to use for Parquet bloom filter. It improves
the performance of queries using equality predicates, such as |
|
|
The number of footer lines to ignore when parsing the file for data. Requires TextFile or CSV format tables. |
|
|
The number of header lines to ignore when parsing the file for data. Requires TextFile or CSV format tables. |
|
|
The column to sort by to determine bucketing for row. Only valid if
|
|
|
Allows the use of custom field separators, such as ‘|’, for TextFile formatted tables. |
|
|
Allows the use of a custom escape character for TextFile formatted tables. |
|
|
Set this property to |
|
|
Enables partition projection for selected table. Mapped from AWS Athena table property projection.enabled. |
|
|
Ignore any partition projection properties stored in the metastore for the selected table. This is a Trino-only property which allows you to work around compatibility issues on a specific table, and if enabled, Trino ignores all other configuration options related to partition projection. |
|
|
Projected partition location template, such as |
|
|
Additional properties added to a Hive table. The properties are not used by
SEP, and are available in the |
Metadata tables#
The raw Hive table properties are available as a hidden table, containing a separate column per table property, with a single row containing the property values.
$properties
table#
The properties table name is composed of the table name with $properties appended. It exposes the parameters of the table in the metastore.
You can inspect the property names and values with a simple query:
SELECT * FROM example.web."page_views$properties";
stats_generated_via_stats_task | auto.purge | trino_query_id | trino_version | transactional
---------------------------------------------+------------+-----------------------------+---------------+---------------
workaround for potential lack of HIVE-12730 | false | 20230705_152456_00001_nfugi | 434 | false
$partitions
table#
The $partitions
table provides a list of all partition values
of a partitioned table.
The following example query returns all partition values from the
page_views
table in the web
schema of the example
catalog:
SELECT * FROM example.web."page_views$partitions";
day | country
------------+---------
2023-07-01 | POL
2023-07-02 | POL
2023-07-03 | POL
2023-03-01 | USA
2023-03-02 | USA
Column properties#
Property name |
Description |
Default |
---|---|---|
|
Defines the type of partition projection to use on this column. May be used
only on partition columns. Available types: |
|
|
Used with |
|
|
Used with |
|
|
Used with |
|
|
Used with |
|
|
Used with |
|
|
Used with |
Metadata columns#
In addition to the defined columns, the Hive connector automatically exposes metadata in a number of hidden columns in each table:
$bucket: Bucket number for this row
$path: Full file system path name of the file for this row
$file_modified_time: Date and time of the last modification of the file for this row
$file_size: Size of the file for this row
$partition: Partition name for this row
You can use these columns in your SQL statements like any other column. They can be selected directly, or used in conditional statements. For example, you can inspect the file size, location and partition for each record:
SELECT *, "$path", "$file_size", "$partition"
FROM example.web.page_views;
Retrieve all records that belong to files stored in the partition
ds=2016-08-09/country=US
:
SELECT *, "$path", "$file_size"
FROM example.web.page_views
WHERE "$partition" = 'ds=2016-08-09/country=US'
View management#
SEP allows reading from Hive materialized views, and can be configured to support reading Hive views.
Views#
By default, Hive views are executed with the DEFINER
security mode. Set the
hive.hive-views.run-as-invoker
catalog configuration property to true
to use
INVOKER
semantics.
To execute all views as INVOKER
, set both the
hive.trino-views.run-as-invoker
and the hive.hive-views.run-as-invoker
catalog configuration properties to true
.
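For example, to run both Trino and Hive views with INVOKER semantics, a catalog might contain the following properties, based on the configuration options named above:
hive.hive-views.enabled=true
hive.hive-views.run-as-invoker=true
hive.trino-views.run-as-invoker=true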
Athena views#
Note
Amazon Athena view support is a public preview feature. There are expected to be cases of syntax differences between Athena and SEP that cause view execution to fail. Contact Starburst Support with questions or feedback.
The connector supports querying views created in Amazon Athena. Athena views are
executed with INVOKER
security mode. If you are using AWS Lake
Formation, support for Athena views includes
cross-account access to views shared from other AWS catalogs.
Hive views#
Hive views are defined in HiveQL and stored in the Hive Metastore Service. They are analyzed to allow read access to the data.
The Hive connector includes support for reading Hive views with three different modes.
Disabled
Legacy
Experimental
If you need to use Hive views from Trino, compare results in Hive and Trino for each view definition to ensure they are identical. Use the experimental mode whenever possible. Avoid using the legacy mode. Leave Hive view support disabled if you are not accessing any Hive views from Trino.
You can configure the behavior in your catalog properties file.
By default, Hive views are executed with the RUN AS DEFINER
security mode. Set
the hive.hive-views.run-as-invoker
catalog configuration property to true
to use RUN AS INVOKER
semantics.
Disabled
The default behavior is to ignore Hive views. This means that your business logic and data encoded in the views is not available in Trino.
Legacy
A very simple implementation to execute Hive views, and therefore allow read
access to the data in Trino, can be enabled with hive.hive-views.enabled=true
and hive.hive-views.legacy-translation=true
.
For temporary usage of the legacy behavior for a specific catalog, you can set
the hive_views_legacy_translation
catalog session property
to true
.
This legacy behavior interprets any HiveQL query that defines a view as if it is written in SQL. It does not do any translation, but instead relies on the fact that HiveQL is very similar to SQL.
This works for very simple Hive views, but can lead to problems for more complex queries. For example, if a HiveQL function has an identical signature but different behaviors to the SQL version, the returned results may differ. In more extreme cases the queries might fail, or not even be able to be parsed and executed.
Experimental
The new behavior is better engineered and has the potential to become a lot more powerful than the legacy implementation. It can analyze, process, and rewrite Hive views and contained expressions and statements.
It supports the following Hive view functionality:
UNION [DISTINCT] and UNION ALL against Hive views
Nested GROUP BY clauses
current_user()
LATERAL VIEW OUTER EXPLODE
LATERAL VIEW [OUTER] EXPLODE on array of struct
LATERAL VIEW json_tuple
You can enable the experimental behavior with
hive.hive-views.enabled=true
. Remove the
hive.hive-views.legacy-translation
property or set it to false
to make
sure legacy is not enabled.
Keep in mind that numerous features are not yet implemented when experimenting with this feature. The following is an incomplete list of missing functionality:
HiveQL current_date, current_timestamp, and others
Hive function calls including translate(), window functions, and others
Common table expressions and simple case expressions
Honor timestamp precision setting
Support all Hive data types and correct mapping to Trino types
Ability to process custom UDFs
Materialized views#
Note
If you are a data consumer, read the Materialized views page for an introduction to using materialized views.
The connector supports Materialized view management, with the following requirements:
The cache service must be configured and running.
Catalogs must be configured to allow materialized views.
In the underlying system, each materialized view consists of a view definition and a storage table. The storage table name is stored as a materialized view property. The materialized data is stored in that storage table.
Note
If you are a data engineer or platform administrator, read our cache service introduction for an overview of setting up the cache service and using materialized views.
The CREATE MATERIALIZED VIEW statement specifies the query to define
the data for the materialized view, the refresh schedule, and other parameters
used by the cache service. The query can access any available catalog and
schema. Storage configuration for the materialized view must be supplied with
table properties in the WITH
statement. The optional
automatic refresh is also configured with properties set in the WITH
clause.
Note
The format
table property is not supported for materialized views created
with the Hive connector. These materialized views use the default file format
configured in the optional hive.storage-format
catalog configuration
property, which defaults to ORC
.
Once the storage tables are populated, the materialized view is created, and you can access it like a table using the name of the materialized view.
Materialized views are populated with data and refreshed manually with the REFRESH MATERIALIZED VIEW command, or by the Automated materialized view management.
Use the SHOW CREATE MATERIALIZED VIEW statement to view the
complete CREATE MATERIALIZED VIEW
statement for a materialized view,
including the properties in the WITH
clause.
Dropping a materialized view with DROP MATERIALIZED VIEW removes the definition and the storage table.
Configuration#
To enable materialized views you must:
Create a storage schema to contain the storage tables for materialized views.
Specify the required configuration properties in the catalog properties file for each desired catalog to enable materialized view creation and usage in that catalog.
Both are discussed in this section.
The following are the required catalog configuration properties for a deployment that connects to the cache service using HTTPS on the default port:
materialized-views.enabled=true
materialized-views.namespace=<your_namespace>
materialized-views.storage-schema=<your_storage_schema>
materialized-views.http-client.https.hostname-verification=false
materialized-views.http-client.trust-store-path=</etc/trino/keystore.jks>
materialized-views.http-client.trust-store-password=<keystore_password>
cache-service.uri=https://<cache-service-hostname>:8543
cache-service.user=<starburst-user>
cache-service.password=<starburst-password>
The following are the required catalog configuration properties for a deployment that connects to the cache service using an insecure HTTP connection on the default port:
materialized-views.enabled=true
materialized-views.namespace=<your_namespace>
materialized-views.storage-schema=<your_storage_schema>
cache-service.uri=http://<cache-service-hostname>:8180
The following table lists all available catalog configuration properties related to materialized views. Instructions for creating the required storage schema follow this table.
Property name |
Description |
Required |
---|---|---|
|
Set to |
true |
|
Specifies the catalog that contains the schema used to store the storage tables for the materialized views. Defaults to the catalog used for the materialized view itself. |
false |
|
Specifies the schema used to store the storage tables for the materialized views. Ensure that the proper access control exists on that schema to prevent users from directly accessing the storage tables. |
true |
|
Used by the cache service to create a fully-qualified name for the materialized views, and to identify which catalog is used to run the scheduled refresh queries. |
true |
|
Directs SEP to run as the user submitting the query when present in a
catalog and set to |
false |
|
Specifies default value for the run-as-invoker property. |
false |
|
The URI of the SEP cache service. |
true |
The storage schema must be defined in the catalog properties file.
materialized-views.storage-schema=views_cache_storage
If it does not exist yet, you must create it with a defined location:
CREATE SCHEMA example.views_cache_storage WITH (location = 's3a://<s3-bucket-name>/hivepostgres_views/views_cache_storage/');
In addition, the schema for the materialized view itself must exist. If it does not exist, you must create it:
CREATE SCHEMA example.views_schema WITH (location = 's3a://<s3-bucket-name>/hivepostgres_views/views_schemas');
With the cache service running, the catalog configured and the schemas defined,
you can proceed to create a materialized view. In this example, a materialized
view named example.example_schema.example_materialized_view
is created:
CREATE MATERIALIZED VIEW example.example_schema.example_materialized_view
WITH (
grace_period = '15m',
max_import_duration = '1m'
) AS
SELECT *
FROM example.public.example_table
WHERE example_field IN ( 'examplevalue1', 'examplevalue2' )
;
The query, specified after AS
, can be any valid query, including queries
accessing one or multiple other catalogs.
The properties for the view are stored in the cache service database, and the data in the configured storage schema, example.views_cache_storage in this example.
Once the materialized view has been created, you can query the data. If the data
is not yet cached, the query runs against the source data instead. You can force
the materialized view to cache at any time by running a REFRESH
statement:
REFRESH MATERIALIZED VIEW example.example_schema.example_materialized_view;
To query data in the materialized view, use a SELECT
statement
as you would for any other table:
SELECT * FROM example.example_schema.example_materialized_view;
You can also use the materialized view in more complex queries, just like any other table.
Access to a materialized view is much faster, since the data is readily available in the storage table and no computation is necessary. The data however does use storage in addition to the source data. Use the following query to determine the catalog, schema, and name of the storage table:
SELECT * FROM system.metadata.materialized_views WHERE name = 'viewname';
Then use the hidden properties of the storage table to compute an estimate for the used storage:
SELECT SUM(size) AS table_size, COUNT(*) AS num_files
FROM (
    SELECT DISTINCT
        "$path" AS file,
        "$file_size" AS size
    FROM <storage_catalog>.<storage_schema>.<storage_table>
);
Troubleshooting#
Whether your materialized views are refreshed manually or by the cache service
using WITH
clause parameters, refreshes may fail if columns are added or
renamed at the source. If this happens, drop the materialized view, and create
it again.
ALTER MATERIALIZED VIEW SET PROPERTIES
#
The extended Starburst Hive connector adds support for ALTER MATERIALIZED VIEW SET PROPERTIES statements.
ALTER MATERIALIZED VIEW
exampleMaterializedView
SET PROPERTIES
cron = '*/15 * * * *',
refresh_interval = DEFAULT;
When a materialized view property that impacts the storage layout or an
incremental refresh property is altered, a new storage table is created.
If a materialized view property that does not affect the storage layout is
changed, such as refresh_interval
or grace_period
, the existing
storage table continues to be used.
For example, the following ALTER MATERIALIZED VIEW
statement changes the
storage layout of exampleMaterializedView
by altering the partitioned_by
property, resulting in a new storage table for the materialized view:
ALTER MATERIALIZED VIEW
exampleMaterializedView
SET PROPERTIES
partitioned_by = ARRAY['regionkey'];
Automated materialized view management#
The connector uses the cache service to manage metadata and maintenance of materialized views, related storage tables, and other aspects.
When the CREATE MATERIALIZED VIEW
statement runs initially, the cache service picks up processing asynchronously after its configured refresh-interval and
delay, which defaults to two minutes. The
defined query is then run and used to populate the storage tables. The
processing time depends on the complexity of the query, the cluster performance,
and the performance of the storage.
Configuration properties are specified in the WITH
clause when creating a
materialized view:
Property name |
Description |
Default |
---|---|---|
|
Minimum duration between refreshes of the materialized view, for example
|
|
|
Unix cron expression specifying a schedule for regular refresh of the
materialized view, for example |
|
|
Maximum allowed execution time for the refresh of the materialized view to complete. Measured from the scheduled time to the completion of the query execution. If a refresh fails for exceeding the maximum duration, the cache service attempts a refresh at the next scheduled time. |
|
|
After a view’s TTL (Time to Live, calculated as
|
|
|
Column used during incremental refresh by the service to apply an
|
|
|
Namespace used by the cache service to create a fully qualified name for materialized views in a catalog. |
|
|
Validate access to tables and data referenced by materialized view as
invoker. If you want to set this to Warning If you enable |
|
In the following example, a materialized view named customer_total_return
in
example.example_schema
is created to automatically refresh daily at 2:30AM:
CREATE MATERIALIZED VIEW example.example_schema.customer_total_return
WITH (
grace_period = '5m',
max_import_duration = '30m',
cron = '30 2 * * *'
) AS
SELECT
sr_customer_sk ctr_customer_sk,
sr_store_sk ctr_store_sk,
sum(sr_return_amt) ctr_total_return
FROM
tpcds.sf1.store_returns,
tpcds.sf1.date_dim
WHERE ( (sr_returned_date_sk = d_date_sk) AND (d_year = 2000) )
GROUP BY sr_customer_sk, sr_store_sk
;
After a materialized view is refreshed, at the end of the effective grace period, any new query requests that arrive after the new refresh is complete are run against the new contents. Query requests created before a refresh is complete are run against the previously existing contents until the effective grace period for that table is over.
Troubleshooting#
When you first create a materialized view, it is helpful to be able to determine
the state of a refresh, particularly when a refresh has failed or the view data
appears to be stale. This can happen when the refresh takes longer than the
combined max_import_duration
and effective grace period. Materialized views
have their own metadata tables located in the default schema of the cache
service database that contain current state and
other information. Metadata tables are named as the materialized view name with
$imports
added to the end. For example, given a materialized view,
example.example_schema.customer_total_return
, run the following query to view the metadata for your materialized view:
SELECT * FROM example.example_schema."customer_total_return$imports"
Note
You must enclose <your_table_name>$imports
in quotes so that the query
parser handles the dollar sign correctly.
The resulting metadata table contains the following fields:
status - Scheduled, Running, Finished, Failed, Timeout
max_import_duration - The value originally set in the CREATE AS statement
start_time - As computed from the cron or refresh_interval
finish_time
row_count
error
The metadata for a refresh is maintained until the effective grace period passes.
Table functions#
The connector provides specific table functions to access Hive.
UNLOAD#
The UNLOAD
SQL statement is a pre-built table function within the system
schema that writes files directly to storage. The files that UNLOAD writes to storage correspond to what the input parameter selects.
SELECT * FROM TABLE(system.unload(
input => TABLE(...) [PARTITION BY col (, ...)],
location => '',
format => ''
[, existing_directory => '']
[, compression => '']
[, separator => '']
))
Note
The input
, location
, and format
parameters are required. The
existing_directory
, compression
, and separator
parameters are optional.
The input
parameter can accept either a table name or a SELECT
query. The
separator
parameter is applicable only when the format
argument is set to
CSV
or TEXTFILE
. See object storage file
formats for more details.
The existing_directory
parameter defaults to CHECK
and throws an exception
if the specified location does not exist. Set existing_directory
to IGNORE
to create a new directory. Both CHECK
and IGNORE
fail if the location is not
empty.
Roles do not have access to the UNLOAD
function by default. To enable role
access, users must have the UNLOAD
function privilege
on the catalog, as well as the corresponding location
privilege on the object storage location where the output
is written.
Examples#
To unload the orderkey
, custkey
, and orderstatus
columns from the orders
table to the location s3://mybucket/my/unload/location
in ORC
file format:
SELECT
*
FROM
TABLE (
system.unload (
input => TABLE (
SELECT
orderkey,
custkey,
orderstatus
FROM
tpch.sf1.orders
)
PARTITION BY
(orderstatus),
location => 's3://mybucket/my/unload/location',
format => 'ORC'
)
)
An example of UNLOAD
using input => TABLE(tpch.sf1.orders)
:
SELECT
*
FROM
TABLE (
system.unload (
input => TABLE (tpch.sf1.orders)
PARTITION BY
(orderstatus),
location => 's3://mybucket/my/unload/location',
format => 'ORC'
)
)
An example of UNLOAD
with multiple partitions:
SELECT
*
FROM
TABLE (
system.unload (
input => TABLE (
SELECT
orderkey,
custkey,
orderdate,
orderstatus
FROM
tpch.sf1.orders
)
PARTITION BY
(orderdate, orderstatus),
location => 's3://mybucket/my/unload/location',
format => 'TEXTFILE',
compression => 'GZIP',
separator => '|'
)
)
Supported parameters#
Supported format
parameters:
ORC
PARQUET
AVRO
RCBINARY
RCTEXT
SEQUENCEFILE
JSON
OPENX_JSON
TEXTFILE
CSV
Supported compression
parameters:
NONE (default)
SNAPPY
LZ4
ZSTD
GZIP
Supported existing_directory
parameters:
CHECK (default)
IGNORE
Limitations#
Each format has its own set of constraints. The CSV format supports only VARCHAR columns, and Avro files do not permit special characters in column names.
Performance#
The connector includes a number of performance improvements, detailed in the following sections.
Table statistics#
The Hive connector supports collecting and managing table statistics to improve query processing performance.
When writing data, the Hive connector always collects basic statistics
(numFiles
, numRows
, rawDataSize
, totalSize
) and by default will also
collect column level statistics:
Column type |
Collectible statistics |
---|---|
|
Number of nulls, number of distinct values, min/max values |
|
Number of nulls, number of distinct values, min/max values |
|
Number of nulls, number of distinct values, min/max values |
|
Number of nulls, number of distinct values, min/max values |
|
Number of nulls, number of distinct values, min/max values |
|
Number of nulls, number of distinct values, min/max values |
|
Number of nulls, number of distinct values, min/max values |
|
Number of nulls, number of distinct values, min/max values |
|
Number of nulls, number of distinct values, min/max values |
|
Number of nulls, number of distinct values |
|
Number of nulls, number of distinct values |
|
Number of nulls |
|
Number of nulls, number of true/false values |
Updating table and partition statistics#
If your queries are complex and include joining large data sets, running ANALYZE on tables/partitions may improve query performance by collecting statistical information about the data.
When analyzing a partitioned table, the partitions to analyze can be specified
via the optional partitions
property, which is an array containing the values
of the partition keys in the order they are declared in the table schema:
ANALYZE table_name WITH (
partitions = ARRAY[
ARRAY['p1_value1', 'p1_value2'],
ARRAY['p2_value1', 'p2_value2']])
This query collects statistics for two partitions with keys
p1_value1, p1_value2
and p2_value1, p2_value2
.
On wide tables, collecting statistics for all columns can be expensive and can have a detrimental effect on query planning. It is also typically unnecessary: statistics are only useful for specific columns, such as join keys, predicates, and grouping keys. You can specify a subset of columns to analyze via the optional columns property:
ANALYZE table_name WITH (
partitions = ARRAY[ARRAY['p2_value1', 'p2_value2']],
columns = ARRAY['col_1', 'col_2'])
This query collects statistics for columns col_1
and col_2
for the partition
with keys p2_value1, p2_value2
.
Note that if statistics were previously collected for all columns, they must be dropped before re-analyzing just a subset:
CALL system.drop_stats('schema_name', 'table_name')
You can also drop statistics for selected partitions only:
CALL system.drop_stats(
schema_name => 'schema',
table_name => 'table',
partition_values => ARRAY[ARRAY['p2_value1', 'p2_value2']])
Table redirection#
Trino offers the possibility to transparently redirect operations on an existing table to the appropriate catalog based on the format of the table and catalog configuration.
In the context of connectors that depend on a metastore service (for example, the Hive connector, Iceberg connector, and Delta Lake connector), the metastore (Hive metastore service, AWS Glue Data Catalog) can be used to accommodate tables with different table formats. Therefore, a metastore database can hold a variety of tables with different table formats.
As a concrete example, let’s use the following simple scenario which makes use of table redirection:
USE example.example_schema;
EXPLAIN SELECT * FROM example_table;
Query Plan
-------------------------------------------------------------------------
Fragment 0 [SOURCE]
...
Output[columnNames = [...]]
│ ...
└─ TableScan[table = another_catalog:example_schema:example_table]
...
The output of the EXPLAIN
statement points out the actual
catalog which is handling the SELECT
query over the table example_table
.
The table redirection functionality works also when using fully qualified names for the tables:
EXPLAIN SELECT * FROM example.example_schema.example_table;
Query Plan
-------------------------------------------------------------------------
Fragment 0 [SOURCE]
...
Output[columnNames = [...]]
│ ...
└─ TableScan[table = another_catalog:example_schema:example_table]
...
Trino offers table redirection support for the following operations:
Table read operations
Table write operations
Table management operations
Trino does not offer view redirection support.
The connector supports redirection from Hive tables to Iceberg, Delta Lake, and Hudi tables with the following catalog configuration properties:
hive.iceberg-catalog-name: Name of the catalog, configured with the Iceberg connector, to use for reading Iceberg tables.
hive.delta-lake-catalog-name: Name of the catalog, configured with the Delta Lake connector, to use for reading Delta Lake tables.
hive.hudi-catalog-name: Name of the catalog, configured with the Hudi connector, to use for reading Hudi tables.
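For example, a Hive catalog might point at companion catalogs like this; the catalog names iceberg, delta, and hudi are illustrative and must match catalogs configured in your deployment:
hive.iceberg-catalog-name=iceberg
hive.delta-lake-catalog-name=delta
hive.hudi-catalog-name=hudi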
File system cache#
The connector supports configuring and using file system caching.
Dynamic filtering#
The Hive connector supports the dynamic filtering optimization. Dynamic partition pruning is supported for partitioned tables stored in any file format for broadcast as well as partitioned joins. Dynamic bucket pruning is supported for bucketed tables stored in any file format for broadcast joins only.
For tables stored in ORC or Parquet file format, dynamic filters are also pushed into local table scan on worker nodes for broadcast joins. Dynamic filter predicates pushed into the ORC and Parquet readers are used to perform stripe or row-group pruning and save on disk I/O. Sorting the data within ORC or Parquet files by the columns used in join criteria significantly improves the effectiveness of stripe or row-group pruning. This is because grouping similar data within the same stripe or row-group greatly improves the selectivity of the min/max indexes maintained at stripe or row-group level.
Delaying execution for dynamic filters#
It can often be beneficial to wait for the collection of dynamic filters before starting a table scan. This extra wait time can potentially result in significant overall savings in query and CPU time, if dynamic filtering is able to reduce the amount of scanned data.
For the Hive connector, a table scan can be delayed for a configured amount of
time until the collection of dynamic filters by using the configuration property
hive.dynamic-filtering.wait-timeout
in the catalog file or the catalog session
property <hive-catalog>.dynamic_filtering_wait_timeout
.
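For example, to wait up to one minute for dynamic filters before scanning in a catalog named example, you could set the session property as follows (the duration shown is illustrative):
SET SESSION example.dynamic_filtering_wait_timeout = '1m';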
Dynamic row filtering#
Dynamic filtering, and specifically also dynamic row filtering, is enabled by default. Row filtering improves the effectiveness of dynamic filtering for a connector by using dynamic filters to remove unnecessary rows during a table scan. It is especially powerful for selective filters on columns that are not used for partitioning, bucketing, or when the values do not appear in any clustered order naturally.
As a result the amount of data read from storage and transferred across the network is further reduced. You get access to higher query performance and a reduced cost.
You can use the following properties to configure dynamic row filtering:
Property name |
Description |
---|---|
|
Toggle dynamic row filtering. Defaults to |
|
Control the threshold for the fraction of the selected rows from the
overall table above which dynamic row filters are not used. Defaults to
0.7. Catalog session property name is
|
|
Duration to wait for completion of dynamic row filtering. Defaults to 0. The default causes query processing to proceed without waiting for the dynamic row filter; it is collected asynchronously and used as soon as it becomes available. Catalog session property name is
|
Storage caching#
The connector supports the default storage caching. In addition, if HDFS Kerberos authentication is enabled in your catalog properties file with the following setting, caching takes the relevant permissions into account and operates accordingly:
hive.hdfs.authentication.type=KERBEROS
Additional configuration for Kerberos is required.
If HDFS Kerberos authentication is enabled, you can also enable user impersonation using:
hive.hdfs.impersonation.enabled=true
The service user assigned to SEP needs to be able to access data files in the underlying storage. Access permissions are checked against the impersonated user, but with caching in place, some read operations happen in the context of the system user.
Any access control defined with the integration of Apache Ranger or the Privacera platform is also enforced by the storage caching.
Auth-to-local user mapping#
The connector supports auth-to-local mapping of the impersonated username during
HDFS access. This requires enabling HDFS impersonation and setting the
hive.hdfs.auth-to-local.config-file
property to a path containing a mapping
file in the format described in auth-to-local translations
file. You can configure regular refresh of the
configuration file with hive.hdfs.auth-to-local.refresh-period
.
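A minimal sketch of this configuration, assuming a hypothetical mapping file path and refresh period, might look like this:
hive.hdfs.impersonation.enabled=true
hive.hdfs.auth-to-local.config-file=/etc/starburst/auth-to-local.json
hive.hdfs.auth-to-local.refresh-period=1h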
Starburst Cached Views#
The connector supports Starburst Cached Views and can therefore be configured for table scan redirection and materialized views to improve performance.
Performance tuning configuration properties#
The following table describes performance tuning properties for the Hive connector.
Warning
Performance tuning configuration properties are considered expert-level features. Altering these properties from their default values is likely to cause instability and performance degradation.
Property name |
Description |
Default value |
---|---|---|
|
The target number of buffered splits for each table scan in a query, before the scheduler tries to pause. |
|
|
The maximum size allowed for buffered splits for each table scan in a query, before the query fails. |
|
|
The maximum number of splits generated per second per table scan. This can be used to reduce the load on the storage system. By default, there is no limit, which results in SEP maximizing the parallelization of data access. |
|
|
For each table scan, the coordinator first assigns file sections of up to
|
|
|
The size of a single file section assigned to a worker until
|
|
|
The largest size of a single file section assigned to a worker. Smaller splits result in more parallelism and thus can decrease latency, but also have more overhead and increase load on the system. |
|
Security#
The connector includes a number of security-related features, detailed in the following sections.
The connector supports different means of authentication for the used file system and metastore.
Built-in access control#
If you have enabled built-in access control for SEP, you must add the following configuration to all Hive catalogs:
hive.security=starburst
Microsoft Entra ID (formerly Azure Active Directory) credential pass-through#
To use Microsoft Entra ID with credential pass-through, you must include the following configuration in the config.properties file:
http-server.authentication.oauth2.scopes=https://storage.azure.com/user_impersonation,openid
http-server.authentication.oauth2.additional-audiences=https://storage.azure.com
If you use Microsoft Entra ID as the identity provider when you integrate with Azure Storage, you can reuse the Entra ID token to access Azure Storage blobs.
To reuse the Entra ID token, set the following access property in the catalog properties file:
# If using the native Azure filesystem support, instead set
# azure.use-oauth-passthrough-token=true
hive.azure.abfs.oauth2.passthrough=true
Enable the OAuth 2.0 token pass-through authentication type with the following configuration in Config properties:
http-server.authentication.type=DELEGATED-OAUTH2
You must grant the Azure application user_impersonation API permissions for Azure Storage. For more information, see the Azure documentation.
To ensure blobs are accessible, read about Azure Storage access configuration:
Roles for Azure Storage
Access control model in ADLS Gen2
Managing ACLs in ADLS Gen2
For more information about DELEGATED-OAUTH2
, see
OAuth 2.0 token pass-through.
You can only set one type or group of access properties in a catalog properties file. Setting more than one prevents SEP from running properly. The valid access property combinations for the catalog properties file include the following:
hive.azure.abfs.oauth2.passthrough.
hive.azure.abfs-storage-account and hive.azure.abfs-access-key.
hive.azure.abfs.oauth.endpoint, hive.azure.abfs.oauth.client-id, and hive.azure.abfs.oauth.secret.
HDFS permissions#
Before running any CREATE TABLE
or CREATE TABLE ... AS
statements for Hive
tables in SEP, you need to check that the operating system user running the
SEP server has access to the Hive warehouse directory on HDFS.
The Hive warehouse directory is specified by the configuration variable
hive.metastore.warehouse.dir
in hive-site.xml
, and the default value is
/user/hive/warehouse
. If that is not the case, either add the following to
jvm.config
on all of the nodes: -DHADOOP_USER_NAME=USER
, where USER
is an
operating system user that has proper permissions for the Hive warehouse
directory, or start the SEP server as a user with similar permissions. The
hive
user generally works as USER
, since Hive is often started with the
hive
user. If you run into HDFS permissions problems on CREATE TABLE ... AS
,
remove /tmp/presto-*
on HDFS, fix the user as described above, then restart
all of the SEP servers.
LDAP user translation#
The connector supports LDAP-based user translation with an HMS metastore and/or an HDFS object storage system.
When using LDAP-based user translation, you must configure the appropriate prefix for the service you’re connecting to.
For an HMS metastore, the hive.metastore.thrift.impersonation.enabled catalog configuration property must be set to true. For an HDFS object storage system, the hive.hdfs.impersonation.enabled catalog configuration property must be set to true.
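Based on the properties named above, a catalog that enables both impersonation paths could contain the following sketch; the LDAP user translation properties themselves are documented separately:
hive.metastore.thrift.impersonation.enabled=true
hive.hdfs.impersonation.enabled=true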
Limitations#
For security reasons, the sys system catalog is not accessible.
Hive's timestamp with local zone data type is not supported in SEP. It is possible to read from a table having a column of this type, but the column itself will not be accessible. Writing to such a table is not supported.
Due to Hive issues HIVE-21002 and HIVE-22167, Trino does not correctly read TIMESTAMP values from Parquet, RCBinary, or Avro file formats created by Hive 3.1 or later. When reading from these file formats, Trino returns different results than Hive.
Trino does not support gathering table statistics for Hive transactional tables. You must use Hive to gather table statistics with the ANALYZE statement after table creation.
Reading ORC ACID tables created with Hive Streaming ingest is not supported.
Redirections are supported for Hive tables but not Hive views.
Materialized views and HMS/HDFS impersonation cannot be enabled for the same catalog.