Starburst Delta Lake connector#
The Delta Lake connector allows querying data stored in Delta Lake format, including Databricks Delta Lake. It can natively read the Delta transaction log and thus detect when external systems change data.
Requirements#
To connect to Databricks Delta Lake, you need:
Delta Lake 0.7.3 or higher.
Deployments using AWS, HDFS, and Azure Storage are fully supported. Using Google Cloud Storage is not supported.
Network access from the coordinator and workers to the Delta Lake storage.
Access to the Hive metastore service (HMS) of Delta Lake or a separate HMS.
Network access to the HMS from the coordinator and workers. Port 9083 is the default port for the Thrift protocol used by the HMS.
A valid Starburst Enterprise license.
Configuration#
The connector requires a Hive metastore for table metadata and supports the same
metastore configuration properties as the Hive connector. At a minimum, hive.metastore.uri
must be configured.
The connector recognizes Delta tables created in the metastore by the Databricks runtime. Any non-Delta tables present in the metastore are not visible to the connector.
To configure the Delta Lake connector, create a catalog file, for example etc/catalog/delta.properties, that references the delta-lake connector. Update the hive.metastore.uri property with the URI of your Hive metastore Thrift service:
connector.name=delta-lake
hive.metastore.uri=thrift://example.net:9083
If you are using AWS Glue as your Hive metastore, set hive.metastore to glue:
connector.name=delta-lake
hive.metastore=glue
The Delta Lake connector reuses certain functionality from the Hive connector, including the Thrift metastore and Glue configuration properties detailed in the Hive connector documentation.
To configure access to S3 and S3-compatible storage, consult the Amazon S3 section of the Hive connector documentation; for Azure storage, consult the Azure storage documentation.
Configuration properties#
The following configuration properties all use reasonable, tested default values. Typical usage does not require you to configure them.
| Property name | Description | Default value |
| --- | --- | --- |
| delta.metadata.cache-ttl | Frequency of checks for metadata updates, equivalent to transactions, to update the metadata cache. | 5 min |
| delta.metadata.live-files.cache-size | Amount of memory allocated for caching information about files. Must be specified as a data size value, such as 64MB. | Calculated to 10% of the maximum memory allocated to the coordinator JVM. |
| delta.max-partitions-per-writer | Maximum number of partitions per writer. | 100 |
| delta.hide-non-delta-lake-tables | Hide information about tables that are not managed by Delta Lake. Hiding only applies to tables with metadata managed in a Glue catalog; it does not apply when using a Hive metastore service. | false |
| | Determines whether the connector checks that the location contains a valid transaction log entry when creating a new table. | true |
| delta.enable-non-concurrent-writes | Enable write support for all supported file systems. Take note of the warning about concurrency and checkpoints. | false |
| delta.default-checkpoint-writing-interval | Default number of table writes between transaction log checkpoint entries. If the value is set to N, checkpoints are written after every Nth statement performing table writes. The value can be overridden for a specific table with the checkpoint_interval table property. | 10 |
| delta.hive-catalog-name | Name of the catalog to which SELECT queries are redirected when a Hive table is detected. | |
The following table describes performance tuning catalog properties for the connector.
Warning
Performance tuning configuration properties are considered expert-level features. Altering these properties from their default values is likely to cause instability and performance degradation. We strongly suggest that you use them only to address non-trivial performance issues, and that you keep a backup of the original values if you change them.
| Property name | Description | Default value |
| --- | --- | --- |
| | Sets the number of transactions to act as a threshold. Once it is reached, the connector initiates compaction of the underlying files and the delta files. A higher compaction threshold means reading less data from the underlying data source, but higher memory and network consumption. | 100 |
| delta.max-outstanding-splits | The target number of buffered splits for each table scan in a query, before the scheduler tries to pause. | 1000 |
| delta.max-splits-per-second | Sets the maximum number of splits used per second to access underlying storage. Reduce this number if your limit is routinely exceeded, based on your file system limits. This is set to the absolute maximum value by default, which results in Trino maximizing the parallelization of data access. Attempting to set it higher results in Trino not being able to start. | Integer.MAX_VALUE |
| delta.max-initial-splits | For each query, the coordinator first assigns file sections to read at the max-initial-split-size until the max-initial-splits have been assigned. | 200 |
| delta.max-initial-split-size | Sets the initial data size for a single read section assigned to a worker, until max-initial-splits have been processed. | 32MB |
| delta.max-split-size | Sets the largest size for a single read section assigned to a worker after max-initial-splits have been processed. You must specify units. You can also use the corresponding catalog session property max_split_size. | 64MB |
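For example, split sizing can be adjusted for a single session instead of in the catalog configuration. This is a minimal sketch, assuming the catalog is named delta and that the corresponding session property is named max_split_size:
SET SESSION delta.max_split_size = '128MB';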
The following table describes catalog session properties supported by the Delta Lake connector to configure processing of Parquet files.
| Property name | Description |
| --- | --- |
| parquet_max_read_block_size | The maximum block size used when reading Parquet files. |
| parquet_writer_block_size | The maximum block size created by the Parquet writer. |
| parquet_writer_page_size | The maximum page size created by the Parquet writer. |
| | Enables the experimental, native Parquet writer. |
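These session properties are set with SET SESSION. This is a minimal sketch, assuming the catalog is named delta and using an illustrative value for parquet_max_read_block_size:
SET SESSION delta.parquet_max_read_block_size = '8MB';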
SQL support#
The connector provides read and write access to data and metadata in Delta Lake. In addition to the globally available and read operation statements, the connector supports the following features:
Data management, see also Updating data
CREATE SCHEMA, see also Creating schemas
CREATE TABLE, see also Creating tables
Security operations, see also SQL security
Special columns#
In addition to the defined columns, the Delta Lake connector automatically exposes metadata in a number of hidden columns in each table. You can use these columns in your SQL statements like any other column; for example, they can be selected directly or used in conditional statements.
$path: Full file system path name of the file for this row.
$file_modified_time: Date and time of the last modification of the file for this row.
$file_size: Size of the file for this row.
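For example, you can query the hidden columns to inspect the files backing a table. This is a minimal sketch, assuming a hypothetical table delta.default.my_table:
SELECT "$path", "$file_modified_time", "$file_size"
FROM delta.default.my_table
LIMIT 10;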
Creating schemas#
The connector supports creating metastore schemas, optionally with a location specified.
You can create a schema without a location with the base CREATE SCHEMA statement. Tables in this schema must have a location included when you create them. The data files for these tables are not removed if the table is dropped:
CREATE SCHEMA delta.my_schema;
Optionally, a location can be specified. Tables in this schema do not need to have a location. By default they are located in a subdirectory under the schema location. The data files for tables in this schema using the default location are cleaned up if the table is dropped:
CREATE SCHEMA delta.my_schema WITH (location = 's3://my-bucket/a/path');
Creating tables#
When Delta tables exist in storage, but not in the metastore, SEP can be used to register them:
CREATE TABLE delta.default.my_table (
dummy bigint
)
WITH (
location = '...'
)
Columns listed in the DDL, such as dummy in the above example, are ignored. The table schema is read from the transaction log instead. If the schema is changed by an external system, SEP automatically uses the new schema.
When creating a new table, the connector automatically writes the initial transaction log entries and registers the table in the metastore. As a result, any Databricks engine can write to the table:
CREATE TABLE delta.default.new_table (id bigint, address varchar);
The connector also supports creating tables using the CREATE TABLE AS syntax. There are two table properties available for use in table creation. A set of partition columns can be provided using the partitioned_by table property, and the checkpoint entry interval can be provided using the checkpoint_interval table property:
CREATE TABLE delta.default.my_partitioned_table
WITH (
location = '...',
partitioned_by = ARRAY['regionkey'],
checkpoint_interval = 5
)
AS SELECT name, comment, regionkey FROM tpch.tiny.nation;
Updating data#
You can use the connector to INSERT, DELETE, and UPDATE data in Delta Lake tables, as shown in the example after the following list.
Write operations are supported for tables stored on the following systems:
Azure ADLS Gen2
Writes to the Azure ADLS Gen2 file system are enabled by default. SEP detects write collisions on ADLS Gen2 when writing from multiple SEP clusters, or from any number of open source Delta Lake clusters.
S3 compatible storage
Writes to AWS S3 must be enabled with the delta.enable-non-concurrent-writes property. Writes to S3 can safely be made from multiple SEP clusters; however, write collisions are not detected when writing concurrently from other Delta Lake engines. You must make sure that no concurrent data modifications are run to avoid data corruption.
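For example, the following statements modify the new_table created in the Creating tables section; the id and address values are illustrative:
INSERT INTO delta.default.new_table VALUES (1, '123 Main St');
UPDATE delta.default.new_table SET address = '500 Market St' WHERE id = 1;
DELETE FROM delta.default.new_table WHERE id = 1;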
SQL security#
You must set the hive.security property in your catalog properties file to sql-standard in order to use SQL security operation statements. See SQL standard based authorization for more information.
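With sql-standard security enabled, access can then be managed with standard SQL statements. This is a minimal sketch, assuming a hypothetical analyst role and a user named alice:
CREATE ROLE analyst;
GRANT SELECT ON delta.default.new_table TO ROLE analyst;
GRANT analyst TO USER alice;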
Performance#
The connector includes a number of performance improvements, detailed in the following sections:
Support for write partitioning.
Table statistics#
You can use ANALYZE statements in SEP to populate the table statistics in Delta Lake. The cost-based optimizer then uses these statistics to improve query performance.
Extended statistics enable a broader set of optimizations, including join reordering. This feature is enabled by default.
Each ANALYZE statement updates the table statistics incrementally, so only the data changed since the last ANALYZE is counted. The table statistics are not automatically updated by write operations such as INSERT, UPDATE, and DELETE. You must manually run ANALYZE again to update the table statistics.
To collect statistics for a table, execute the following statement:
ANALYZE table_schema.table_name;
To gain the most benefit from cost-based optimizations, run periodic ANALYZE
statements on every large table that is frequently queried.
Fine tuning#
The files_modified_after property is useful if you want to run the initial ANALYZE statement on a table that wasn’t previously analyzed. You can use it to limit the amount of data used to generate the table statistics:
ANALYZE my_table WITH(files_modified_after = TIMESTAMP '2021-08-23 16:43:01.321 Z')
As a result, only files newer than the specified time stamp are used in the analysis.
You can also specify a set or subset of columns to analyze using the columns property:
ANALYZE my_table WITH(columns = ARRAY['nationkey', 'regionkey'])
To run ANALYZE with columns more than once, the next ANALYZE must run on the same set or a subset of the original columns used. To broaden the set of columns, drop the statistics and reanalyze the table.
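For example, to broaden the columns from the earlier example to also include name, drop the extended statistics with the procedure described in the next section and analyze again. This is a minimal sketch, assuming the catalog is named delta and the table is in my_schema:
CALL delta.system.drop_extended_stats('my_schema', 'my_table');
ANALYZE delta.my_schema.my_table WITH(columns = ARRAY['nationkey', 'regionkey', 'name']);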
Disable and drop extended statistics#
You can disable extended statistics with the catalog configuration property delta.extended-statistics.enabled set to false. Alternatively, you can disable it for a session with the catalog session property extended_statistics_enabled set to false.
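For example, to disable extended statistics for the current session, assuming the catalog is named delta:
SET SESSION delta.extended_statistics_enabled = false;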
If a table is changed with many delete and update operations, calling ANALYZE does not result in accurate statistics. To correct the statistics, you have to drop the extended statistics and analyze the table again.
Use the system.drop_extended_stats procedure in the catalog to drop the extended statistics for a specified table in a specified schema:
CALL delta_catalog.system.drop_extended_stats('my_schema', 'my_table')
Data layout optimization#
You can send a request to the Databricks Spark cluster to optimize a table’s data layout using the invoke_optimize procedure:
CALL delta.system.invoke_optimize('my_schema', 'my_table')
Alternatively you can use named parameters:
CALL delta.system.invoke_optimize(
schema_name => 'my_schema',
table_name => 'my_table')
Optionally you can pass a list of columns to use for Z-ordering:
CALL delta.system.invoke_optimize(
schema_name => 'my_schema',
table_name => 'my_table',
z_order => ARRAY['col1', 'col2'])
You must configure a JDBC connection to your Databricks Spark cluster in your catalog properties file to call the invoke_optimize procedure:
| Property name | Description |
| --- | --- |
| | JDBC URL to connect to the Databricks Spark cluster. |
| | User name for the JDBC connection. |
| | Password for the JDBC connection. |
More information about OPTIMIZE is available in the Databricks documentation.
Memory usage#
The Delta Lake connector is memory intensive and the amount of required memory grows with the size of Delta Lake transaction logs of any accessed tables. It is important to take that into account when provisioning the coordinator.
You can decrease memory usage by keeping the number of active data files in the table low, by regularly running OPTIMIZE and VACUUM in Delta Lake.
When using the Delta Lake connector, you need to monitor memory usage on the coordinator. Specifically, monitor JVM heap utilization using standard tools as part of routine operation of the cluster.
A good proxy for memory usage is the utilization of the Delta Lake caches. It is exposed by the connector with the plugin.deltalake.transactionlog:name=<catalog-name>,type=transactionlogaccess JMX bean.
You can access it with any standard monitoring software with JMX support, or use the JMX connector with the following query:
SELECT * FROM jmx.current."*.plugin.deltalake.transactionlog:name=<catalog-name>,type=transactionlogaccess"
Following is an example result:
datafilemetadatacachestats.hitrate | 0.97
datafilemetadatacachestats.missrate | 0.03
datafilemetadatacachestats.requestcount | 3232
metadatacachestats.hitrate | 0.98
metadatacachestats.missrate | 0.02
metadatacachestats.requestcount | 6783
node | sep-master
object_name | com.starburstdata.presto.plugin.deltalake.transactionlog:type=TransactionLogAccess,name=delta
In a healthy system, both datafilemetadatacachestats.hitrate and metadatacachestats.hitrate are close to 1.0.
Accelerated Parquet reader#
The connector includes a Parquet reader with numerous performance improvements for reading data files using the Parquet format.
The reader is enabled by default. You can disable it, and fall back to the default Trino reader, with the following setting in your catalog properties file:
delta.accelerated-parquet-reader.enabled=false
Alternatively, you can disable it for a specific user session with the catalog session property accelerated_parquet_reader_enabled.
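For example, assuming the catalog is named delta:
SET SESSION delta.accelerated_parquet_reader_enabled = false;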
Starburst Cached Views#
The connector supports table scan redirection to improve performance and reduce load on the data source.
Security#
The connector includes a number of security-related features, detailed in the following sections.
Authorization#
The connector supports standard Hive security for authorization. For configuration properties, see the ‘Authorization’ section in Hive connector security configuration.