Delta Lake table format #
Great Lakes connectivity abstracts the details of using different table formats and file types when using object storage catalogs.
This page describes the features specific to the Delta Lake table format when used with Great Lakes connectivity.
Specify Delta Lake format #
Delta Lake connectivity allows querying data stored in Delta Lake tables, including Databricks Delta Lake. Delta Lake tables are automatically detected and read based on information in the storage environment’s associated metadata.
For an object storage catalog that specifies Delta Lake as its default table format, no special syntax is required. A simple CREATE TABLE statement creates a Delta Lake format table. There is no format parameter for Delta Lake. To create a Delta Lake table when the default format is not Delta Lake, add type='delta' as a table property in your CREATE statement. For example:
CREATE TABLE galaxy_delta.default.my_partitioned_table
WITH (
type = 'delta',
location = 's3://my-bucket/at/this/path',
partitioned_by = ARRAY['regionkey'],
checkpoint_interval = 5
)
AS
SELECT name, comment, regionkey
FROM tpch.tiny.nation;
Table properties #
As part of a CREATE TABLE statement, append further comma-separated table properties as needed.

| Table property | Description |
|---|---|
| partitioned_by | Specifies table partitioning. If a table is partitioned by column regionkey, this property is set with partitioned_by=ARRAY['regionkey']. |
| location | Specifies the file system location URI for the table. |
| checkpoint_interval | Specifies the checkpoint interval in seconds. |
| change_data_feed_enabled | Enables storing change data feed entries. |
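For example, the following CREATE TABLE statement combines several of these properties to create a partitioned table with the change data feed enabled. This is a sketch only; the catalog, schema, table, column, and bucket names are placeholders:
CREATE TABLE galaxy_delta.default.my_cdf_table (
    event_id BIGINT,
    event_type VARCHAR,
    regionkey BIGINT
)
WITH (
    type = 'delta',
    location = 's3://my-bucket/at/another/path',
    partitioned_by = ARRAY['regionkey'],
    change_data_feed_enabled = true
);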
Delta Lake metadata table #
Great Lakes connectivity exposes several metadata tables for the Delta Lake table format. These metadata tables contain information about the internal structure of the Delta Lake table. Query each metadata table by appending the metadata table name to the table_name:
SELECT * FROM catalog_name.schema_name."table_name$history";
$history #
The $history table provides a log of the metadata changes made on the table. Retrieve the changelog of the Delta Lake table table_name by using the following query statement:
SELECT * FROM catalog_name.schema_name."table_name$history";
The following table describes the table columns of the $history table query output:
| Name | Type | Description |
|---|---|---|
| version | BIGINT | The version of the table corresponding to the operation. |
| timestamp | TIMESTAMP(3) WITH TIME ZONE | The time when the table version became active. |
| user_id | VARCHAR | The identifier for the user that performed the operation. |
| user_name | VARCHAR | The username for the user that performed the operation. |
| operation | VARCHAR | The name of the operation performed on the table. |
| operation_parameters | map(VARCHAR, VARCHAR) | Parameters of the operation. |
| cluster_id | VARCHAR | The ID of the cluster that ran the operation. |
| read_version | BIGINT | The version of the table which was read in order to perform the operation. |
| isolation_level | VARCHAR | The level of isolation used to perform the operation. |
| is_blind_append | BOOLEAN | Whether or not the operation appended data. |
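For example, the following query (a sketch; the catalog, schema, and table names are placeholders) returns the ten most recent operations recorded for a table:
SELECT version, timestamp, operation, operation_parameters
FROM catalog_name.schema_name."table_name$history"
ORDER BY version DESC
LIMIT 10;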
Table procedures #
Use the CALL statement to perform data manipulation or administrative tasks. Procedures must include a qualified catalog name. For example, for a catalog named galaxy_delta:
CALL galaxy_delta.system.example_procedure();
The following Delta Lake table procedures are available:
drop_extended_stats #
system.drop_extended_stats('schema_name', 'table_name');
Drops the extended statistics for a specified table in a specified schema.
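For example, using the galaxy_delta catalog from the earlier example, the fully qualified call looks like the following; the schema and table names are placeholders:
CALL galaxy_delta.system.drop_extended_stats('schema_name', 'table_name');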
register_table #
system.register_table('schema_name', 'table_name', 's3://bucket-name/lakehouse/data/table_name_location');
Allows the caller to register an existing Delta Lake table in the metastore, using its existing transaction logs and data files.
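For example, the following call registers a table whose transaction logs and data files already exist at a placeholder S3 location:
CALL galaxy_delta.system.register_table('schema_name', 'table_name', 's3://bucket-name/lakehouse/data/table_name_location');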
unregister_table #
system.unregister_table('schema_name', 'table_name');
Allows the caller to unregister an existing Delta Lake table from the metastore without deleting the data.
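For example, the following call removes the same table from the metastore while leaving its transaction logs and data files in place:
CALL galaxy_delta.system.unregister_table('schema_name', 'table_name');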
vacuum #
system.vacuum('schema_name', 'table_name', 'retention_period');
Specify retention_period with standard SQL time period values such as '7d'.

The vacuum procedure removes all old files that are not in the transaction log. Additionally, it removes files that are not necessary for reading table snapshots newer than the current time minus the retention period defined by the retention_period parameter.

Users with INSERT and DELETE permissions on a table can run vacuum, as shown in the example after the following parameter list.
All parameters are required and must be presented in the following order:
- Schema name
- Table name
- Retention period
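For example, the following call (the catalog, schema, and table names are placeholders) removes files that are no longer needed for snapshots newer than seven days:
CALL galaxy_delta.system.vacuum('schema_name', 'table_name', '7d');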
ALTER TABLE EXECUTE #
The ALTER TABLE EXECUTE statement followed by a command and parameters modifies the table according to the specified procedure and parameters. For Delta Lake tables, ALTER TABLE EXECUTE supports the optimize command.

Use the => operator for passing named parameter values. The left side is the name of the parameter and the right side is the value being passed:
ALTER TABLE galaxy_delta.schema_name.table_name EXECUTE optimize(file_size_threshold => '10MB');
optimize #
The optimize procedure improves read performance by rewriting the content of a specific table to reduce the number of files. This process increases file size. If the table is partitioned, the data compaction acts separately on each partition selected for optimization.

All files with a size below the optional file_size_threshold parameter are merged. The default value is 100MB.
ALTER TABLE galaxy_delta.schema_name.table_name EXECUTE optimize;
The following statement merges files in a table that are under the 10MB file size threshold:
ALTER TABLE galaxy_delta.schema_name.table_name EXECUTE optimize(file_size_threshold => '10MB');
Optimize specific partitions by using a WHERE clause on the partitioning columns, such as partition_key in the following example:
ALTER TABLE galaxy_delta.schema_name.table_name EXECUTE optimize
WHERE partition_key = 1;
Table functions #
Your active role set must have the following privileges:
- Create a table for a schema.
- Execute the table_changes table function.
In the navigation menu, assign the following privileges:
Create table privileges:
If some of the following steps are not possible after you select the role to change, that role may already have the Create table privilege.
- Click Access control > Roles and privileges.
- Click the name of the role you want to change.
- Click Privileges > Add privilege > Table.
- Click Schema.
- From the drop-down menu, select the catalog of interest.
- In the next section, either select All schemas or from the drop-down menu, select the name of the schema of interest.
- Select Allow.
- In What can they do, select Create table.
- Click Add privilege.
table_changes function privilege:
- Click Access control > Roles and privileges.
- Click the name of the role you want to change.
- Click Privileges > Add privilege > Function.
- From the drop-down menu, select the catalog of interest.
- Select Execute table_changes table function.
- Click Add privilege.
For more details, read about assigning privileges to roles.
table_changes #
The table_changes function allows reading Change Data Feed (CDF) entries to expose row-level changes between two versions of a Delta Lake table.

When the change_data_feed_enabled table property is set to true on a specific Delta Lake table, Great Lakes connectivity records change events for all data changes made on the table after change_data_feed_enabled was enabled. This includes the row data and metadata, which indicates whether the specified row was inserted, deleted, or updated.
The following example displays the changes made on the table:
SELECT * FROM TABLE( catalog_name.system.table_changes( 'schema_name','table_name', since_version => 0 ));
The optional since_version value is the version from which changes are shown. If a value is not provided, the function produces change events starting from when the table was created. The default value is 0.

In addition to returning the columns present in the table, the table_changes function returns the following values for each change event:
| Name | Description |
|---|---|
| _change_type | Gives the type of change that occurred. Possible values are insert, delete, update_preimage, and update_postimage. |
| _commit_version | Shows the table version for which the change occurred. |
| _commit_timestamp | Represents the timestamp for the commit in which the specified change happened. |
The commit versions can also be retrieved from the $history metadata table.

The following SQL statements are examples of using the table_changes function:
-- CREATE TABLE with type Delta and Change Data Feed enabled:
CREATE TABLE catalog_name.schema_name.table_name (page_url VARCHAR, domain VARCHAR, views INTEGER)
WITH (type='delta', change_data_feed_enabled = true);
-- INSERT data:
INSERT INTO catalog_name.schema_name.table_name
VALUES ('url1', 'domain1', 1), ('url2', 'domain2', 2), ('url3', 'domain1', 3);
INSERT INTO catalog_name.schema_name.table_name
VALUES ('url4', 'domain1', 400), ('url5', 'domain2', 500), ('url6', 'domain3', 2);
-- UPDATE and modify data:
UPDATE catalog_name.schema_name.table_name SET domain = 'domain4'
WHERE views = 2;
-- SELECT and view table changes:
SELECT * FROM TABLE(catalog_name.system.table_changes('schema_name', 'table_name', 1))
ORDER BY _commit_version ASC;
The following output from the previous SQL statements shows what changes happened in which version:
| page_url | domain | views | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|---|
| url4 | domain1 | 400 | insert | 2 | 2023-03-10T21:22:23.000+0000 |
| url5 | domain2 | 500 | insert | 2 | 2023-03-10T21:22:23.000+0000 |
| url6 | domain3 | 2 | insert | 2 | 2023-03-10T21:22:23.000+0000 |
| url2 | domain2 | 2 | update_preimage | 3 | 2023-03-10T22:23:24.000+0000 |
| url2 | domain4 | 2 | update_postimage | 3 | 2023-03-10T22:23:24.000+0000 |
| url6 | domain3 | 2 | update_preimage | 3 | 2023-03-10T22:23:24.000+0000 |
| url6 | domain4 | 2 | update_postimage | 3 | 2023-03-10T22:23:24.000+0000 |
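To inspect only the final state of updated rows, filter the change events on the _change_type column. The following sketch reuses the placeholder names from the previous example:
SELECT page_url, domain, views, _commit_version
FROM TABLE(catalog_name.system.table_changes('schema_name', 'table_name', 1))
WHERE _change_type = 'update_postimage'
ORDER BY _commit_version ASC;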
Session properties #
A session property temporarily modifies an internal Starburst Galaxy setting for the duration of the current connection session to the cluster. Use the SET SESSION statement followed by the property name, then a value such as true or false to modify the property:
SET SESSION catalog_name.session_property = expression;
Use the SHOW SESSION statement to view all current session properties. For additional information, read about the SET SESSION and RESET SESSION SQL statements.
Catalog session properties are internal session properties that can be set on a per-catalog basis. These properties must be set separately for each catalog by including the catalog name before the property name, for example, catalog_name.property_name.
Session properties are linked to the current session, which lets you have multiple connections to a cluster that each have different values for the same session properties. Once a session ends, either by disconnecting or creating a new session, any changes made to session properties during the previous session are lost.
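For example, a catalog session property can be inspected, changed, and restored to its default within a single session as follows; catalog_name is a placeholder:
SHOW SESSION LIKE 'catalog_name.compression_codec';
SET SESSION catalog_name.compression_codec = 'ZSTD';
RESET SESSION catalog_name.compression_codec;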
The following sections describe the session properties supported by the Delta Lake table format:
compression_codec #
SET SESSION catalog_name.compression_codec = 'SNAPPY';
The compression codec to use when writing new data files. The possible values are:
- NONE
- SNAPPY
- ZSTD
- GZIP

The default value is SNAPPY.
dynamic_filtering_wait_timeout #
SET SESSION catalog_name.dynamic_filtering_wait_timeout = '10s';
The duration to wait for completion of dynamic filtering during split generation. Dynamic filtering optimizations significantly improve the performance of queries with selective joins by avoiding reading of data that would be filtered by the join conditions. In the previous example, the dynamic_filtering_wait_timeout is set to 10 seconds.
extended_statistics_enabled #
SET SESSION catalog_name.extended_statistics_enabled = true;
Enables statistics collection with the ANALYZE statement and the use of extended statistics. The default value is true. The statistics_enabled session property must be set to true. Otherwise, the collection of any statistics is disabled.

Use the ANALYZE statement to populate data size and number of distinct values (NDV) extended table statistics in Delta Lake. The minimum value, maximum value, value count, and null value count statistics are computed on the fly out of the transaction log of the Delta Lake table.
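For example, the following statement (the catalog, schema, and table names are placeholders) collects extended statistics for a table:
ANALYZE catalog_name.schema_name.table_name;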
extended_statistics_collect_on_write #
SET SESSION catalog_name.extended_statistics_collect_on_write = true;
Enable collection of extended statistics for write operations. The default value is true. The statistics_enabled session property must be set to true. Otherwise, the collection of any statistics is disabled.
legacy_create_table_with_existing_location_enabled #
SET SESSION catalog_name.legacy_create_table_with_existing_location_enabled = true;
Enable using the CREATE TABLE statement to register an existing table. The default value is true.
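With this behavior enabled, a CREATE TABLE statement that points at a location already containing a Delta Lake transaction log registers that table instead of creating a new one. The following is a sketch only, with a placeholder S3 path and a dummy column included to satisfy the CREATE TABLE syntax; the register_table procedure described earlier is the alternative approach:
CREATE TABLE galaxy_delta.schema_name.existing_table (dummy BIGINT)
WITH (type = 'delta', location = 's3://my-bucket/existing/delta/table/path');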
max_initial_split_size #
SET SESSION catalog_name.max_initial_split_size = '32MB';
Sets the initial data size for a single read section assigned to a worker. The default value is 32MB.
max_split_size #
SET SESSION catalog_name.max_split_size = '64MB';
Sets the largest data size for a single read section assigned to a worker. The default value is 64MB.
parquet_max_read_block_size #
SET SESSION catalog_name.parquet_max_read_block_size = '16MB';
The maximum block size used when reading Parquet files. The default value is 16MB.
parquet_max_read_block_row_count #
SET SESSION catalog_name.parquet_max_read_block_row_count = 8192;
Sets the maximum number of rows read in a batch. The default value is 8192.
parquet_native_snappy_decompressor_enabled #
SET SESSION catalog_name.parquet_native_snappy_decompressor_enabled = true;
Enable using the native SNAPPY library for faster decompression of Parquet files. The default value is true.
parquet_native_zstd_decompressor_enabled #
SET SESSION catalog_name.parquet_native_zstd_decompressor_enabled = true;
Enable using the native ZSTD library for faster decompression of Parquet files. The default value is true.
parquet_optimized_nested_reader_enabled #
SET SESSION catalog_name.parquet_optimized_nested_reader_enabled = true;
Specifies whether batched column readers are used when reading ARRAY, MAP, and ROW types from Parquet files for improved performance. Set this property to false to disable the optimized Parquet reader for structural data types. The default value is true.
parquet_optimized_reader_enabled #
SET SESSION catalog_name.parquet_optimized_reader_enabled = true;
Specifies whether batched column readers are used when reading Parquet files for improved performance. The default value is true.
parquet_use_column_index #
SET SESSION catalog_name.parquet_use_column_index = true;
Skip reading Parquet pages by using Parquet column indices. The default value is true.
parquet_vectorized_decoding_enabled #
SET SESSION catalog_name.parquet_vectorized_decoding_enabled = true;
Enable the use of the Java Vector API for faster decoding of Parquet files. The default value is true.
parquet_writer_block_size #
SET SESSION catalog_name.parquet_writer_block_size = '128MB';
The maximum block size created by the Parquet writer. The default value is 128MB.
parquet_writer_page_size #
SET SESSION catalog_name.parquet_writer_page_size = '1MB';
The maximum page size created by the Parquet writer. The default value is 1MB.
parquet_writer_batch_size #
SET SESSION catalog_name.parquet_writer_batch_size = 10000;
Maximum number of rows processed by the Parquet writer in a batch. The default value is 10000.
projection_pushdown_enabled #
SET SESSION catalog_name.projection_pushdown_enabled = true;
Read only projected fields from row columns while performing SELECT queries. The default value is true.
statistics_enabled #
SET SESSION catalog_name.statistics_enabled = true;
Enables table statistics for performance improvements. The default value is true. If the value is set to false, the following session properties are disabled:
- extended_statistics_enabled
- extended_statistics_collect_on_write
target_max_file_size #
SET SESSION catalog_name.target_max_file_size = '1GB';
Target maximum size of written files. The default value is 1GB.
timestamp_precision #
SET SESSION catalog_name.timestamp_precision = 'MILLISECONDS';
Specifies the precision for the timestamp columns in Hive tables. The possible values are:
- MILLISECONDS
- MICROSECONDS
- NANOSECONDS

The default value is MILLISECONDS.
vacuum_min_retention #
SET SESSION catalog_name.vacuum_min_retention = '7d';
The minimum retention threshold for the files taken into account for removal by the vacuum procedure. The default value is 7d.
Delta Lake SQL support #
When using the Delta Lake table format with Great Lakes connectivity, the general SQL support details apply.