Starburst Galaxy

  • Starburst Galaxy Home
  •   Get started
  •   Global features
  • Help center
  • Release notes
  • Feature release types

  • Starburst Galaxy UI
  •   Query
  •   Catalogs
  •   Catalog explorer
  •   Data products
  •   Clusters
  • Partner connect
  •   Admin
  •   Access control
  •   Cloud settings

  • Administration
  •   Security
  •   Single sign-on
  •   Troubleshooting
  • Galaxy status

  • Reference
  •   Python
  • API
  •   SQL
  •   Tutorials
  • Delta Lake table format #

    Great Lakes connectivity abstracts the details of using different table formats and file types when using object storage catalogs.

    This page describes the features specific to the Delta Lake table format when used with Great Lakes connectivity.

    Specify Delta Lake format #

    Delta Lake connectivity allows querying data stored in Delta Lake tables, including Databricks Delta Lake. Delta Lake tables are automatically detected and read based on information in the storage environment’s associated metadata.

    For an object storage catalog that specifies Delta Lake as its default table format, no special syntax is required. A simple CREATE TABLE statement creates a Delta Lake format table.

    There is no format parameter for Delta Lake. To create a Delta Lake table when the default format is not Delta Lake, add type='delta' as a table property in your CREATE statement. For example:

    CREATE TABLE galaxy_delta.default.my_partitioned_table
    WITH (
      type = 'delta',
      location = 's3://my-bucket/at/this/path',
      partitioned_by = ARRAY['regionkey'],
      checkpoint_interval = 5
    )
    AS
      SELECT name, comment, regionkey
      FROM tpch.tiny.nation;
    

    Table properties #

    As part of a CREATE TABLE statement, append further comma-separated table properties as needed.

    Table property Description
    partitioned_by Specifies table partitioning. If a table is partitioned by column regionkey, this property is set with partitioned_by=ARRAY['regionkey'].
    location Specifies the file system location URI for the table.
    checkpoint_interval Specifies the checkpoint interval in seconds.
    change_data_feed_enabled Enables storing change data feed entries.

    Delta Lake metadata table #

    Great Lakes connectivity exposes several metadata tables for the Delta Lake table format. These metadata tables contain information about the internal structure of the Delta Lake table. Query each metadata table by appending the metadata table name to the table_name:

    SELECT * FROM catalog_name.schema_name."table_name$history";
    

    $history #

    The $history table provides a log of the metadata changes made on the table.

    Retrieve the changelog of the Delta Lake table table_name by using the following query statement:

    SELECT * FROM catalog_name.schema_name."table_name$history";
    

    The following table describes the table columns of the $history table query output:

    $history columns
    Name Type Description
    version BIGINT The version of the table corresponding to the operation.
    timestamp TIMESTAMP(3) WITH TIME ZONE The time when the table version became active.
    user_id VARCHAR The identifier for the user that performed the operation.
    user_name VARCHAR The username for the user that performed the operation.
    operation VARCHAR The name of the operation performed on the table.
    operation_parameters map(VARCHAR, VARCHAR) Parameters of the operation.
    cluster_id VARCHAR The ID of the cluster that ran the operation.
    read_version BIGINT The version of the table which was read in order to perform the operation.
    isolation_level VARCHAR The level of isolation used to perform the operation.
    is_blind_append BOOLEAN Whether or not the operation appended data.

    Table procedures #

    Use the CALL statement to perform data manipulation or administrative tasks. Procedures must include a qualified catalog name. For example, for a catalog named galaxy_delta:

    CALL galaxy_delta.system.example_procedure();
    

    The following Delta Lake table procedures are available:

    drop_extended_stats #

    system.drop_extended_stats('schema_name', 'table_name');
    

    Drops the extended statistics for a specified table in a specified schema.

    register_table #

    system.register_table('schema_name', 'table_name', 's3://bucket-name/lakehouse/data/table_name_location');
    

    Allows the caller to register an existing Delta Lake table in the metastore, using its existing transaction logs and data files.

    unregister_table #

    system.unregister_table('schema_name', 'table_name');
    

    Allows the caller to unregister an existing Delta Lake table from the metastore without deleting the data.

    vacuum #

    system.vacuum('schema_name', 'table_name', 'retention_period');
    

    Specify retention_period with standard SQL time period values such as '7d'.

    The vacuum procedure removes all old files that are not in the transaction log. Additionally, it removes files that are not necessary for read table snapshots newer than the current time minus the retention period defined by the retention period parameter.

    Users with INSERT and DELETE permissions on a table can run vacuum as follows:

    All parameters are required and must be presented in the following order:

    1. Schema name
    2. Table name
    3. Retention period

    ALTER TABLE EXECUTE #

    The ALTER TABLE EXECUTE statement followed by a command and parameters modifies the table according to the specified procedure and parameters. For Delta Lake tables, ALTER TABLE EXECUTE supports the optimize command.

    Use the => operator for passing named parameter values. The left side is the name of the parameter and the right side is the value being passed:

    ALTER TABLE galaxy_delta.schema_name.table_name EXECUTE optimize(file_size_threshold => '10MB');
    

    optimize #

    The optimize procedure improves read performance by rewriting the content of a specific table to reduce the number of files. This process increases file size. If the table is partitioned, the data compaction acts separately on each partition selected for optimization.

    All files with a size below the optional file_size_threshold parameter are merged. The default value is 100MB.

    ALTER TABLE galaxy_delta.schema_name.table_name EXECUTE optimize;
    

    The following statement merges files in a table that are under the 10MB file size threshold:

    ALTER TABLE galaxy_delta.schema_name.table_name EXECUTE optimize(file_size_threshold => '10MB');
    

    Optimize specific partitions using a WHERE clause with the columns used as partition_key value.

    ALTER TABLE galaxy_delta.schema_name.table_name EXECUTE optimize
    WHERE partition_key = 1;
    

    Table functions #

    Your active role set must have the following privileges:

    In the navigation menu, assign the following privileges:

    Create table privileges:

    If some of the following steps are not possible after you select the role to change, that role may already have the Create table privilege.

    1. Click Access control > Roles and privileges.
    2. Click the name of the role you want to change.
    3. Click Privileges > Add privilege > Table.
    4. Click Schema.
    5. From the drop-down menu, select the catalog of interest.
    6. In the next section, either select All schemas or from the drop-down menu, select the name of the schema of interest.
    7. Select Allow.
    8. In What can they do, select Create table.
    9. Click Add privilege.

    Select create table privilege

    table_changes function privilege:

    1. Click Access control > Roles and privileges.
    2. Click the name of the role you want to change.
    3. Click Privileges > Add privilege > Function.
    4. From the drop-down menu, select the catalog of interest.
    5. Select Execute table_changes table function.
    6. Click Add privilege.

    Select create table privilege

    For more details, read about assigning privileges to roles.

    table_changes #

    The table_changes function allows reading Change Data Feed (CDF) entries to expose row-level changes between two versions of a Delta Lake table.

    When the change_data_feed_enabled table property is set to true on a specific Delta Lake table, Great Lakes connectivity records change events for all data changes made on the table after change_data_feed_enabled was enabled. This includes the row data and metadata, which indicates whether the specified row was inserted, deleted, or updated.

    The following example displays the changes made on the table:

    SELECT * FROM TABLE( catalog_name.system.table_changes( 'schema_name','table_name', since_version => 0 ));
    

    The optional since_version value is the version from which changes are shown. If a value is not provided, the function produces change events starting from when the table was created. The default value is 0.

    In addition to returning the columns present in the table, the table_changesfunction returns the following values for each change event:

    Name Description
    _change_type Gives the type of change that occurred. Possible values are insert, delete, update_preimage , and update_postimage.
    _commit_version Shows the table version for which the change occurred.
    _commit_timestamp Represents the timestamp for the commit in which the specified change happened.

    The commit versions can also be retrieved from the $history metadata table.

    The following SQL statements are examples of using the table_change function:

    -- CREATE TABLE with type Delta and Change Data Feed enabled:
    CREATE TABLE catalog_name.schema_name.table_name (page_url VARCHAR, domain VARCHAR, views INTEGER)
      WITH (type='delta', change_data_feed_enabled = true);
    
    -- INSERT data:
    INSERT INTO catalog_name.schema_name.table_name
      VALUES ('url1', 'domain1', 1), ('url2', 'domain2', 2), ('url3', 'domain1', 3);
    
    INSERT INTO catalog_name.schema_name.table_name
      VALUES ('url4', 'domain1', 400), ('url5', 'domain2', 500), ('url6', 'domain3', 2);
    
    -- UPDATE and modify data:
    UPDATE catalog_name.schema_name.table_name SET domain = 'domain4'
      WHERE views = 2;
    
    -- SELECT and view table changes:
    SELECT * FROM TABLE( system.table_changes('test_schema', 'pages', 1))
      ORDER BY _commit_version ASC;
    

    The following output from the previous SQL statements shows what changes happened in which version:

    page_url domain views _commit_type _commit_version _commit_timestamp
    url4 domain1 400 insert 2 2023-03-10T21:22:23.000+0000
    url5 domain2 500 insert 2 2023-03-10T21:22:23.000+0000
    url6 domain3 2 insert 2 2023-03-10T21:22:23.000+0000
    url2 domain2 2 update_preimage 3 2023-03-10T22:23:24.000+0000
    url2 domain4 2 update_postimage 3 2023-03-10T22:23:24.000+0000
    url6 domain3 2 update_preimage 3 2023-03-10T22:23:24.000+0000
    url6 domain4 2 update_postimage 3 2023-03-10T22:23:24.000+0000

    Session properties #

    A session property temporarily modifies an internal Starburst Galaxy setting for the duration of the current connection session to the cluster. Use the SET SESSION statement followed by the property name, then a value such as true or false to modify the property:

    SET SESSION catalog_name.session_property = expression;
    

    Use the SHOW SESSION statement to view all current session properties. For additional information, read about the SET SESSION, and RESET SESSION SQL statements.

    Catalog session properties are internal session properties that can be set on a per-catalog basis. These properties must be set separately for each catalog by including the catalog name before the property name, for example, catalog_name.property_name.

    Session properties are linked to the current session, which lets you have multiple connections to a cluster that each have different values for the same session properties. Once a session ends, either by disconnecting or creating a new session, any changes made to session properties during the previous session are lost.

    The following sections describe the session properties supported by Delta Lake table type:

    compression_codec #

    SET SESSION catalog_name.compression_codec = 'SNAPPY';
    

    The compression codec is used when writing new data files. The possible values are:

    • NONE
    • SNAPPY
    • ZSTD
    • GZIP

    The default value is SNAPPY.

    dynamic_filtering_wait_timeout #

    SET SESSION catalog_name.dynamic_filtering_wait_timeout = '10s';
    

    The duration to wait for completion of dynamic filtering during split generation. Dynamic filtering optimizations significantly improve the performance of queries with selective joins by avoiding reading of data that would be filtered by the join conditions. In the previous example, the dynamic_filtering_wait_timeout is set to 10 seconds.

    extended_statistics_enabled #

    SET SESSION catalog_name.extended_statistics_enabled = true;
    

    Enables statistics collection with the ANALYZE statement and the use of extended statistics. The default value is true. The statistics_enabled session property must be set to true. Otherwise, the collection of any statistics is disabled.

    Use the ANALYZE statement to populate data size and number of distinct values (NDV) extended table statistics in Delta Lake. The minimum value, maximum value, value count, and null value count statistics are computed on the fly out of the transaction log of the Delta Lake table.

    extended_statistics_collect_on_write #

    SET SESSION catalog_name.extended_statistics_collect_on_write = true;
    

    Enable collection of extended statistics for write operations. The default value is true. The statistics_enabled session property must be set to true. Otherwise, the collection of any statistics is disabled.

    legacy_create_table_with_existing_location_enabled #

    SET SESSION catalog_name.legacy_create_table_with_existing_location_enabled = true;
    

    Enable using the CREATE TABLE statement to register an existing table. The default value is true.

    max_initial_split_size #

    SET SESSION catalog_name.max_initial_split_size = '32MB';
    

    Sets the initial data size for a single read section assigned to a worker. The default value is 32MB.

    max_split_size #

    SET SESSION catalog_name.max_split_size = '64MB';
    

    Sets the largest data size for a single read section assigned to a worker. The default value is 64MB.

    parquet_max_read_block_size #

    SET SESSION catalog_name.parquet_max_read_block_size = '16MB';
    

    The maximum block size used when reading Parquet files. The default value is 16MB.

    parquet_max_read_block_row_count #

    SET SESSION catalog_name.parquet_max_read_block_row_count = 8192;
    

    Sets the maximum number of rows read in a batch. The default value is 8192.

    parquet_native_snappy_decompressor_enabled #

    SET SESSION catalog_name.parquet_native_snappy_decompressor_enabled = true;
    

    Enable using native SNAPPY library for faster decompression of Parquet files. The default value is true.

    parquet_native_zstd_decompressor_enabled #

    SET SESSION catalog_name.parquet_native_zstd_decompressor_enabled = true;
    

    Enable using native ZSTD library for faster decompression of Parquet files. The default value is true.

    parquet_optimized_nested_reader_enabled #

    SET SESSION catalog_name.parquet_optimized_nested_reader_enabled = true;
    

    Specifies whether batched column readers are used when reading ARRAY, MAP, and ROW types from Parquet files for improved performance. Set this property to false to disable the optimized Parquet reader for structural data types. The default value is true.

    parquet_optimized_reader_enabled #

    SET SESSION catalog_name.parquet_optimized_reader_enabled = true;
    

    Specifies whether batched column readers are used when reading Parquet files for improved performance. The default value is true.

    parquet_use_column_index #

    SET SESSION catalog_name.parquet_use_column_index = true;
    

    Skip reading Parquet pages by using Parquet column indices. The default value is true.

    parquet_vectorized_decoding_enabled #

    SET SESSION catalog_name.parquet_use_column_names = true;
    

    Enable the use of the Java Vector API for faster decoding of Parquet files. The default value is true.

    parquet_writer_block_size #

    SET SESSION catalog_name.parquet_writer_block_size = '128MB';
    

    The maximum block size created by the Parquet writer. The default value is 128MB.

    parquet_writer_page_size #

    SET SESSION catalog_name.parquet_writer_page_size = '1MB';
    

    The maximum page size created by the Parquet writer. The default value is 1MB.

    parquet_writer_batch_size #

    SET SESSION catalog_name.parquet_writer_batch_size = 10000;
    

    Maximum number of rows processed by the Parquet writer in a batch. The default value is 10000.

    projection_pushdown_enabled #

    SET SESSION catalog_name.projection_pushdown_enabled = true;
    

    Read only projected fields, from row columns while performing SELECT queries. The default value is true.

    statistics_enabled #

    SET SESSION catalog_name.statistics_enabled = true;
    

    Enables table statistics for performance improvements. The default value is true. If the value is set to false the following session properties are disabled:

    target_max_file_size #

    SET SESSION catalog_name.target_max_file_size = '1GB';
    

    Target maximum size of written files. The default value is 1GB.

    timestamp_precision #

    SET SESSION catalog_name.timestamp_precision = 'MILLISECONDS';
    

    Specifies the precision for the timestamp columns in Hive tables. The possible values are:

    • MILLISECONDS
    • MICROSECONDS
    • NANOSECONDS

    The default value is MILLISECONDS.

    vacuum_min_retention #

    SET SESSION catalog_name.vacuum_min_retention = '7d';
    

    The minimum retention threshold for the files taken into account for removal by the vacuum procedure. The default value is 7d.

    Delta Lake SQL support #

    When using the Delta Lake table format with Great Lakes connectivity, the general SQL support details apply.