Starburst Galaxy

  • Iceberg table format #

    Great Lakes connectivity abstracts the details of using different table formats and file types when using object storage catalogs.

    This page describes the features specific to the Iceberg table format when used with Great Lakes connectivity.

    Specify Iceberg format #

    Apache Iceberg is an open table format for huge analytic datasets. Starburst Galaxy allows querying data stored in files written in the Iceberg format, as defined in the Iceberg Table Spec. Iceberg tables are automatically detected and read based on information in the storage environment’s associated metadata.

    For an object storage catalog that specifies Iceberg as its default table format, no special syntax is required. A simple CREATE TABLE statement creates an Iceberg format table.

    To create an Iceberg table when the default format is not Iceberg, add type='iceberg' as a table property in your CREATE statement. For example:

    CREATE TABLE customer (
        name varchar,
        address varchar)
    WITH (type='iceberg');
    

    Specify the file format of table data files with the additional format table property. Valid values include parquet and orc; the default is parquet:

    WITH (type='iceberg',
          format='parquet');
    

    Iceberg table properties #

    In your CREATE TABLE statements, append further comma-separated table properties as needed:

    | Table property | Description |
    | --- | --- |
    | format | Specifies the format of table data files: PARQUET, ORC, or AVRO. Defaults to PARQUET. |
    | partitioning | Specifies table partitioning. If a table is partitioned by columns c1 and c2, this property setting is partitioning=ARRAY['c1', 'c2']. |
    | location | Specifies the file system location URI for the table. |
    | format_version | Specifies the format version of the Iceberg specification, either 1 or 2. For example, format_version=1. |
    | orc_bloom_filter_columns | Comma-separated list of columns to use for the ORC bloom filter. Improves the performance of queries using equality and IN predicates when reading ORC files. Requires ORC format. Defaults to []. |
    | orc_bloom_filter_fpp | The false positive probability of the ORC bloom filter. Requires ORC format. Defaults to 0.05. |
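
    As an illustration of combining several of these properties, the following sketch creates a partitioned ORC table. The table name, column names, and property values are hypothetical and only show the syntax; adjust them to your data.

    ```sql
    -- Hypothetical table: names and values are illustrative only.
    CREATE TABLE customer_orders (
        order_id bigint,
        region varchar,
        order_date date)
    WITH (
        type = 'iceberg',
        format = 'ORC',
        format_version = 2,
        partitioning = ARRAY['region']);
    ```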

    Iceberg metadata tables #

    Metadata tables reveal information about the internal structure of an Iceberg table. Each Iceberg table has the following metadata tables:

    | Metadata table | Description |
    | --- | --- |
    | $properties | Provides general configuration information including key-value tags. |
    | $history | Provides a log of the metadata changes made to the table. |
    | $snapshots | Provides a detailed view of snapshots of the table. |
    | $manifests | Provides a detailed view of the manifests corresponding to the snapshots of the table. |
    | $partitions | Provides a detailed view of the partitions of the table. |
    | $files | Provides a detailed view of the data files in the current snapshot of the table. |
    | $refs | Provides information about the references of a table, including branches and tags. |

    Query a metadata table by appending its name, including the $ separator, to the table name. Enclose the combined name in double quotes, because $ is not valid in an unquoted identifier:

    SELECT * FROM "customer$history";
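
    Selecting specific columns is often more useful than SELECT *. As a sketch, assuming the customer table from the earlier example and the $snapshots column names exposed by the underlying Trino Iceberg connector (an assumption, since this page does not list them), the following query lists snapshots from newest to oldest:

    ```sql
    -- Column names are assumed from the Trino Iceberg connector.
    SELECT snapshot_id, parent_id, operation, committed_at
    FROM "customer$snapshots"
    ORDER BY committed_at DESC;
    ```

    The snapshot_id values returned are the identifiers accepted by rollback_to_snapshot and by time travel queries.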
    

    Table procedures #

    Use the CALL statement to perform data manipulation or administrative tasks. Procedures must include a qualified catalog name. For example, for a catalog named examplecatalog:

    CALL examplecatalog.system.example_procedure();
    

    The following Iceberg table procedures are available:

    register_table #

    system.register_table('schema_name', 'table_name','s3://bucket-name/lakehouse/data/table_name_location');
    

    Allows the caller to register an existing Iceberg table in the metastore, using its existing metadata and data files.

    The table_location must be a complete URI, as the previous example shows.

    Optionally provide a metadata file name as a fourth argument to register the table with specific metadata. Use this to register the table in a specific table state, or when Starburst Galaxy cannot automatically determine which metadata version to use:

    system.register_table('schema_name', 'table_name', 's3://bucket-name/lakehouse/data/table_name_location', '00003-409702ba-4735-4645-8f14-09537cc0b2c8.metadata.json');
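
    A complete invocation might look like the following sketch. It assumes a catalog named examplecatalog and the named parameters schema_name, table_name, table_location, and metadata_file_name from the underlying Trino Iceberg connector. Only table_location is named on this page, so verify the other names in your environment.

    ```sql
    -- Parameter names other than table_location are assumptions.
    CALL examplecatalog.system.register_table(
        schema_name => 'schema_name',
        table_name => 'table_name',
        table_location => 's3://bucket-name/lakehouse/data/table_name_location',
        metadata_file_name => '00003-409702ba-4735-4645-8f14-09537cc0b2c8.metadata.json');
    ```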
    

    unregister_table #

    system.unregister_table('schema_name', 'table_name');
    

    Allows the caller to unregister an existing Iceberg table from the metastore without deleting the data.

    rollback_to_snapshot #

    system.rollback_to_snapshot('schema_name', 'table_name', 8954597067493422955);
    

    Allows the caller to roll back the state of the table to a previous snapshot ID.
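
    To find a snapshot ID to roll back to, you can query the $history metadata table first. The following sketch assumes a catalog named examplecatalog and the $history column names from the Trino Iceberg connector. The snapshot ID is the placeholder value used above, not a real identifier.

    ```sql
    -- List snapshots in the order they became current.
    SELECT made_current_at, snapshot_id, is_current_ancestor
    FROM examplecatalog.schema_name."table_name$history"
    ORDER BY made_current_at;

    -- Roll back to the chosen snapshot.
    CALL examplecatalog.system.rollback_to_snapshot('schema_name', 'table_name', 8954597067493422955);
    ```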

    ALTER TABLE EXECUTE #

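    The ALTER TABLE EXECUTE statement, followed by a command and parameters, modifies the table according to the specified command. For Iceberg tables, ALTER TABLE EXECUTE supports the following commands:

    • optimize
    • drop_extended_stats
    • expire_snapshots
    • remove_orphan_files
    • add_files_from_table and add_files, described under Add files to Iceberg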

    Use the => operator for passing named parameter values. The left side is the name of the parameter and the right side is the value being passed:

    ALTER TABLE catalog_name.schema_name.table_name EXECUTE optimize(file_size_threshold => '10MB');
    

    optimize #

    The optimize procedure improves read performance by rewriting the content of a specific table to reduce the number of files. This process increases file size. If the table is partitioned, the data compaction acts separately on each partition selected for optimization.

    All files with a size below the optional file_size_threshold parameter are merged. The default value is 100MB.

    ALTER TABLE catalog_name.schema_name.table_name EXECUTE optimize;
    

    The following statement merges files in a table that are under the 10MB file size threshold:

    ALTER TABLE catalog_name.schema_name.table_name EXECUTE optimize(file_size_threshold => '10MB');
    

    Optimize specific partitions by adding a WHERE clause that filters on the partition key columns:

    ALTER TABLE partitioned_table_name EXECUTE optimize
    WHERE partition_key = 1;
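
    One way to judge whether optimize is worthwhile, and to confirm its effect afterwards, is to count the data files listed in the $files metadata table. This is a sketch that assumes the file_size_in_bytes column name from the Trino Iceberg connector:

    ```sql
    -- Run before and after optimize and compare the results.
    SELECT count(*) AS file_count,
           avg(file_size_in_bytes) AS avg_file_size_bytes
    FROM catalog_name.schema_name."table_name$files";
    ```

    A lower file_count with a higher average size after the run indicates that small files were merged.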
    

    drop_extended_stats #

    ALTER TABLE catalog_name.schema_name.table_name EXECUTE drop_extended_stats;
    

    Removes all extended statistics information from the table.

    expire_snapshots #

    The expire_snapshots procedure removes snapshots that are older than the time period configured with the retention_threshold parameter, along with all related metadata and data files. Regularly expiring snapshots is recommended to delete data files that are no longer needed, and to keep the size of table metadata small. The following example sets the threshold to nine days:

    ALTER TABLE catalog_name.schema_name.table_name EXECUTE expire_snapshots(retention_threshold => '9d');
    

    The value for retention_threshold must be greater than or equal to the minimum retention, which is set by the expire_snapshots_min_retention session property and defaults to 7d. Otherwise, the procedure fails with a message similar to:

    Retention specified (1d) is shorter than the minimum retention configured in the system (7d).

    The default value of retention_threshold is 7d.
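
    If a shorter retention period is needed, the minimum can be lowered for the current session with the expire_snapshots_min_retention session property listed under Session properties. The following is a sketch, assuming session properties are set with the catalog name as a prefix. Keep in mind that expiring recent snapshots limits how far back time travel and rollback can reach.

    ```sql
    -- Lower the minimum retention for this session only.
    SET SESSION catalog_name.expire_snapshots_min_retention = '1d';

    -- Expire snapshots older than one day.
    ALTER TABLE catalog_name.schema_name.table_name EXECUTE expire_snapshots(retention_threshold => '1d');
    ```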

    remove_orphan_files #

    The remove_orphan_files procedure removes files from a table’s data directory that are not referenced by metadata files and that are older than the value of the retention_threshold parameter. Deleting orphaned files periodically is recommended to keep the size of a table’s data directory under control. The following example sets retention_threshold to nine days:

    ALTER TABLE catalog_name.schema_name.table_name EXECUTE remove_orphan_files(retention_threshold => '9d');
    

    The value for retention_threshold must be greater than or equal to the minimum retention, which is set by the remove_orphan_files_min_retention session property and defaults to 7d. Otherwise, the procedure fails with a message similar to:

    Retention specified (1d) is shorter than the minimum retention configured in the system (7d).

    The default value of retention_threshold is 7d.

    Migrate Hive to Iceberg #

    Migration from Hive to Iceberg table format is supported.

    The migration process uses the ALTER TABLE SET PROPERTIES syntax. For more information, see the Hive table format page.
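
    As a rough sketch of what the migration statement might look like, assuming the type table property accepts the value 'ICEBERG' in the same way as the CREATE TABLE examples on this page (confirm the exact syntax on the Hive table format page before running it):

    ```sql
    -- Assumed syntax: switches the table format of an existing Hive table.
    ALTER TABLE catalog_name.schema_name.table_name
    SET PROPERTIES type = 'ICEBERG';
    ```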

    Add files to Iceberg #

    Great Lakes connectivity supports adding files from Hive tables or locations to existing Iceberg format tables. The procedures add_files_from_table and add_files create metadata without copying any data or generating table statistics.

    The following procedures only support Parquet, ORC, and Avro file formats.

    Use the following ALTER TABLE EXECUTE statement to add files from the Hive table format to the Iceberg table format:

    ALTER TABLE example_catalog.schema_name.iceberg_table_name
    EXECUTE add_files_from_table(
        schema_name => 'schema_name',
        table_name => 'hive_table_name')
    

    The procedure adds the files to the target table, specified after ALTER TABLE, and loads them from the source table specified with the required parameters schema_name and table_name. The source table must be accessible in the same catalog as the target table and must use the Hive format. The target table must use the Iceberg format.

    Use a partition_filter argument to add files from specified partitions. The following example adds files from a partition where the region is ASIA and country is JAPAN:

    ALTER TABLE example_catalog.schema_name.iceberg_table_name
    EXECUTE add_files_from_table(
        schema_name => 'schema_name',
        table_name => 'hive_table_name',
        partition_filter => map(ARRAY['region', 'country'], ARRAY['ASIA', 'JAPAN']))
    

    To migrate a Hive table that contains subdirectories you can provide a recursive_directory argument:

    ALTER TABLE example_catalog.schema_name.iceberg_table_name
    EXECUTE add_files_from_table(
        schema_name => 'schema_name',
        table_name => 'hive_table_name',
        recursive_directory => 'true')
    

    The default value of recursive_directory is fail, which causes the procedure to throw an exception if subdirectories are found. Set the value to true to add files from nested directories, or false to ignore them.

    The add_files procedure supports adding files to a target table, specified after ALTER TABLE. It loads the files from an object storage path specified with the required location parameter. The files must use the specified format, with ORC and PARQUET as valid values. The target Iceberg table must use the same format as the added files. The procedure does not validate file schemas for compatibility with the target Iceberg table. The location property is supported for partitioned tables.

    ALTER TABLE example_catalog.schema_name.iceberg_table_name
    EXECUTE add_files(
        location => 's3://my-bucket/a/path',
        format => 'ORC')
    

    To add files in subdirectories, you can provide a recursive_directory argument:

    ALTER TABLE example_catalog.schema_name.iceberg_table_name
    EXECUTE add_files(
        location => 's3://my-bucket/a/path',
        format => 'ORC',
        recursive_directory => 'true')
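
    Because add_files_from_table and add_files do not generate table statistics, you may want to collect them once the files are added. A minimal sketch, assuming the target table from the examples above and that ANALYZE and SHOW STATS behave as they do in Trino:

    ```sql
    -- Collect extended statistics, now covering the newly added files.
    ANALYZE example_catalog.schema_name.iceberg_table_name;

    -- Review the resulting statistics.
    SHOW STATS FOR example_catalog.schema_name.iceberg_table_name;
    ```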
    

    Session properties #

    See session properties on the Table formats page to understand how these properties are used.

    The following table describes the session properties supported by the Iceberg table type.

    | Session property | Description |
    | --- | --- |
    | collect_extended_statistics_on_write | Collect extended statistics while writing files. The default value is true. |
    | compression_codec | The compression codec used when writing new data files. The possible values are NONE, DEFAULT, SNAPPY, LZ4, ZSTD, and GZIP. The default value is DEFAULT. |
    | expire_snapshots_min_retention | The minimum retention period for the expire_snapshots procedure. The default value is 7d. |
    | extended_statistics_enabled | Enables statistics collection with the ANALYZE SQL statement and the use of extended statistics. The default value is true. The statistics_enabled session property must be set to true. Otherwise, the collection of any statistics is disabled. Use the ANALYZE statement to populate data size and number of distinct values (NDV) extended table statistics in Iceberg. The minimum value, maximum value, value count, and null value count statistics are computed on the fly from the transaction log of the Iceberg table. |
    | merge_manifests_on_write | Compact the manifest files when performing write operations. The default value is true. |
    | minimum_assigned_split_weight | The minimum assigned split weight when size-based split weighting is enabled. The default value is 0.05. |
    | orc_writer_max_dictionary_memory | The maximum dictionary memory. The default value is 16MB. |
    | orc_writer_max_stripe_rows | The maximum stripe row count. The default value is 10000000. |
    | orc_writer_max_stripe_size | The maximum stripe size. The default value is 64MB. |
    | orc_writer_min_stripe_size | The minimum stripe size. The default value is 32MB. |
    | orc_writer_validate_mode | The level of detail in ORC validation. The possible values are HASHED, DETAILED, and BOTH. The default value is BOTH. |
    | orc_writer_validate_percentage | The percentage of written files to validate by rereading them. The default value is 0. |
    | parquet_native_zstd_decompressor_enabled | Enable using the native ZSTD library for faster decompression of Parquet files. The default value is true. |
    | remove_orphan_files_min_retention | The minimum retention for the files taken into account for removal by the remove_orphan_files procedure. The default value is 7d. |
    | use_file_size_from_metadata | Use the file size stored in Iceberg metadata. The default value is true. |
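
    As a short sketch of how one of these properties could be set, the following statements change the compression codec for the current session, inspect the session properties, and then restore the default. The catalog name examplecatalog is a placeholder, and prefixing the property with the catalog name is an assumption based on standard Trino behavior.

    ```sql
    -- Write new data files with ZSTD compression for this session only.
    SET SESSION examplecatalog.compression_codec = 'ZSTD';

    -- Inspect the current values of the catalog's session properties.
    SHOW SESSION LIKE 'examplecatalog.%';

    -- Return to the default value.
    RESET SESSION examplecatalog.compression_codec;
    ```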

    Iceberg SQL support #

    When using the Iceberg table format with Great Lakes connectivity, the general SQL support details apply.

    Replacing tables #

    The catalog supports replacing an existing table as an atomic operation. Atomic table replacement creates a new snapshot with the new table definition and keeps the existing table history. See the $history metadata table.

    To replace a table, use CREATE OR REPLACE TABLE or CREATE OR REPLACE TABLE AS. For more information on creating tables, see CREATE TABLE and CREATE TABLE AS. These statements do not support changing table formats defined by the type table property.

    Earlier snapshots of the table can be queried through time travel.

    After replacement, the table definition is completely new and separate from the old table.

    For example, suppose a table example_table is first created with the following definition:

    CREATE TABLE example_table (
        a BIGINT,
        b DATE,
        c BIGINT)
    WITH (type = 'iceberg');
    

    Use the CREATE OR REPLACE TABLE AS statement to overwrite the table content:

    CREATE OR REPLACE TABLE example_table
    WITH (type = 'iceberg') AS
      SELECT * FROM (
        VALUES
          (1, DATE '2024-01-01', BIGINT '10'),
          (2, DATE '2024-01-02', BIGINT '20')
      ) AS t(a, b, c);
    

    Use the CREATE OR REPLACE TABLE statement to change the table definition:

    CREATE OR REPLACE TABLE example_table (
        a INT,
        b DATE,
        c BIGINT,
        d VARCHAR)
    WITH (type = 'iceberg');
    

    Time travel queries #

    The catalog offers the ability to query historical data. This allows you to query the table as it was when a previous snapshot of the table was taken, even if the data has since been modified or deleted.

    The historical data of the table can be retrieved by specifying the snapshot identifier corresponding to the version of the table to be retrieved:

    SELECT *
    FROM example.testdb.customer_orders FOR VERSION AS OF 8954597067493422955
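
    Time travel can also be combined with ordinary set operations. As an illustrative sketch, the following query returns the distinct rows that exist in the current table but did not exist in the given snapshot. The snapshot ID is the same placeholder value used above.

    ```sql
    -- Distinct rows present now that were not present in the snapshot.
    SELECT * FROM example.testdb.customer_orders
    EXCEPT
    SELECT * FROM example.testdb.customer_orders FOR VERSION AS OF 8954597067493422955;
    ```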
    

    Another way to retrieve historical data is to specify a point in time in the past, such as a day or a week ago. The query uses the latest snapshot of the table taken at or before the specified timestamp to provide the previous state of the table:

    SELECT *
    FROM example.testdb.customer_orders
      FOR TIMESTAMP AS OF TIMESTAMP '2022-03-23 09:59:29.803 Europe/Vienna'
    

    You can also specify a date in the past to select the snapshot of a table used in a query. For example, assuming that the session time zone is Europe/Vienna, the following queries are equivalent:

    SELECT *
    FROM example.testdb.customer_orders FOR TIMESTAMP AS OF DATE '2022-03-23'
    
    SELECT *
    FROM example.testdb.customer_orders
      FOR TIMESTAMP AS OF TIMESTAMP '2022-03-23 00:00:00'
    
    SELECT *
    FROM example.testdb.customer_orders
      FOR TIMESTAMP AS OF TIMESTAMP '2022-03-23 00:00:00.000 Europe/Vienna'
    

    Iceberg supports named references of snapshots via branches and tags. Time travel can be performed on branches and tags in the table.

    SELECT *
    FROM example.testdb.customer_orders FOR VERSION AS OF 'historical-tag';

    SELECT *
    FROM example.testdb.customer_orders FOR VERSION AS OF 'test-branch';
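
    To see which branch and tag names are available for time travel, you can query the $refs metadata table. This sketch assumes the name, type, and snapshot_id column names from the Trino Iceberg connector:

    ```sql
    -- List the named references (branches and tags) of the table.
    SELECT name, type, snapshot_id
    FROM example.testdb."customer_orders$refs";
    ```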