Kudu connector#

The Kudu connector lets you query, insert, and delete data in Apache Kudu.

SEP includes additional enterprise features that are built on top of the existing Trino connector functionality. For more information on connector key feature differences between Trino and SEP, see the connectors feature matrix.

Requirements#

To connect to Kudu, you need:

  • Kudu version 1.13.0 or higher.

  • Network access from the Trino coordinator and workers to Kudu. Port 7051 is the default port.

  • A valid Starburst Enterprise license.

Configuration#

To configure the Kudu connector, create a catalog properties file that specifies the Kudu connector by setting the connector.name to kudu.

For example, to access a database as the example catalog, create the file etc/catalog/example.properties. Replace the connection properties as appropriate for your setup:

connector.name=kudu
kudu.client.master-addresses=example.com:7051

Configuration properties#

The following table describes catalog configuration properties for the connector:

Kudu configuration properties#

| Property name | Description | Default |
| --- | --- | --- |
| kudu.authentication.type | Kudu authentication mechanism type. See also Kerberos authentication. | NONE |
| kudu.client.master-addresses | Comma-separated list of Kudu master addresses. At least one is required. Supported formats: example.com, example.com:7051, 192.0.2.1, 192.0.2.1:7051, [2001:db8::1], [2001:db8::1]:7051, 2001:db8::1 | |
| kudu.schema-emulation.enabled | Kudu schema emulation. Kudu does not support schemas, but the connector can emulate them. By default, this feature is disabled, and all tables belong to the default schema. See the Kudu documentation for more information. | false |
| kudu.schema-emulation.prefix | Prefix to use for schema emulation. To use this property, the kudu.schema-emulation.enabled property must be set to true. The standard prefix is presto::. An empty prefix is also supported. | |
| kudu.client.default-admin-operation-timeout | Default timeout used for administrative operations such as createTable and deleteTable. | 30s |
| kudu.client.default-operation-timeout | Default timeout used for user operations. | 30s |
| kudu.client.disable-statistics | Disable the Kudu client's collection of statistics. | false |
| kudu.allow-local-scheduling | Assign Kudu splits to the replica host if the worker and Kudu share the same cluster. | false |
| kudu.scanner.batch-size | Maximum data size returned per batch by the scanner. | 1MB |
| kudu.scanner.keepalive-interval | The interval for sending keep-alive requests to the tablet server to prevent scanner timeouts. | 15s |
| kudu.scanner.scan-request-timeout | Maximum duration for each scan request to a server. | 30s |

Querying Kudu#

Apache Kudu does not support schemas, that is, namespaces for tables. The connector can emulate schemas through table naming conventions.

Default behavior#

By default, schema emulation is disabled. When disabled, all Kudu tables are part of the default schema.

For example, a Kudu table named orders can be queried in SEP with SELECT * FROM example.default.orders. If the session catalog is set to example and the schema is set to default, you can use SELECT * FROM orders.

Table names in Kudu can contain any characters. In SEP, names that contain special characters must be enclosed in double quotes. For example, to query a Kudu table named special.table! use the following command:

SELECT * FROM example.default."special.table!"

Example:

Create a users table in the default schema:

CREATE TABLE example.default.users (
  user_id int WITH (primary_key = true),
  first_name VARCHAR,
  last_name VARCHAR
) WITH (
  partition_by_hash_columns = ARRAY['user_id'],
  partition_by_hash_buckets = 2
);

When creating a Kudu table, you must specify the primary key and the hash or range partitioning. Column encoding and compression are optional. For details, see the section for CREATE TABLE.

Describe the table:

DESCRIBE example.default.users;
   Column   |  Type   |                      Extra                      | Comment
------------+---------+-------------------------------------------------+---------
 user_id    | integer | primary_key, encoding=auto, compression=default |
 first_name | varchar | nullable, encoding=auto, compression=default    |
 last_name  | varchar | nullable, encoding=auto, compression=default    |
(3 rows)

Insert some data:

INSERT INTO example.default.users VALUES (1, 'Donald', 'Duck'), (2, 'Mickey', 'Mouse');

Select the inserted data:

SELECT * FROM example.default.users;

Schema emulation#

If schema emulation is enabled in your catalog properties file, tables are mapped to schemas according to the following conventions.

  • With kudu.schema-emulation.enabled=true and kudu.schema-emulation.prefix=, the mapping works like:

    | Kudu table name | SEP qualified name |
    | --- | --- |
    | orders | kudu.default.orders |
    | part1.part2 | kudu.part1.part2 |
    | x.y.z | kudu.x."y.z" |

    As schemas are not directly supported by Kudu, a special table named $schemas is created for managing the schemas.

  • With kudu.schema-emulation.enabled=true and kudu.schema-emulation.prefix=presto::, the mapping works like:

    | Kudu table name | SEP qualified name |
    | --- | --- |
    | orders | kudu.default.orders |
    | part1.part2 | kudu.default."part1.part2" |
    | x.y.z | kudu.default."x.y.z" |
    | presto::part1.part2 | kudu.part1.part2 |
    | presto::x.y.z | kudu.x."y.z" |

    As schemas are not directly supported by Kudu, a special table named presto::$schemas is created for managing the schemas.
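As an illustration of the second convention, the following sketch assumes a catalog named kudu that is configured with kudu.schema-emulation.enabled=true and kudu.schema-emulation.prefix=presto::. The schema name sales, the table name orders, and its columns are hypothetical and chosen only for this example.

```sql
-- Assumes schema emulation is enabled with the prefix presto::.
-- The schema "sales" and the table "orders" are hypothetical names.
CREATE SCHEMA kudu.sales;

CREATE TABLE kudu.sales.orders (
  order_id BIGINT WITH (primary_key = true),
  status VARCHAR
) WITH (
  partition_by_hash_columns = ARRAY['order_id'],
  partition_by_hash_buckets = 2
);

-- Following the mapping above, the table is expected to be stored
-- in Kudu under the name presto::sales.orders.
SELECT * FROM kudu.sales.orders;
```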

Type mapping#

Because Trino and Kudu each support types that the other does not, this connector modifies some types when reading or writing data. Data types may not map the same way in both directions between Trino and the data source. Refer to the following sections for type mapping in each direction.

Kudu type to Trino type mapping#

The connector maps Kudu types to the corresponding Trino types following this table:

Kudu type to Trino type mapping#

| Kudu type | Trino type |
| --- | --- |
| BOOL | BOOLEAN |
| INT8 | TINYINT |
| INT16 | SMALLINT |
| INT32 | INTEGER |
| INT64 | BIGINT |
| FLOAT | REAL |
| DOUBLE | DOUBLE |
| DECIMAL(p,s) | DECIMAL(p,s) |
| STRING | VARCHAR |
| BINARY | VARBINARY |
| DATE | DATE |
| UNIXTIME_MICROS | TIMESTAMP(3) |

No other types are supported.

Trino type to Kudu type mapping#

The connector maps Trino types to the corresponding Kudu types following this table:

Trino type to Kudu type mapping#

| Trino type | Kudu type | Notes |
| --- | --- | --- |
| BOOLEAN | BOOL | |
| TINYINT | INT8 | |
| SMALLINT | INT16 | |
| INTEGER | INT32 | |
| BIGINT | INT64 | |
| REAL | FLOAT | |
| DOUBLE | DOUBLE | |
| DECIMAL(p,s) | DECIMAL(p,s) | Only supported for Kudu server 1.7.0 or higher |
| VARCHAR | STRING | The optional maximum length is lost |
| VARBINARY | BINARY | |
| DATE | DATE | |
| TIMESTAMP(3) | UNIXTIME_MICROS | µs resolution in Kudu column is reduced to ms resolution |

No other types are supported.

SQL support#

The connector provides read and write access to data and metadata in Kudu. In addition to the globally available and read operation statements, the connector supports the following features:

INSERT#

INSERT INTO ... VALUES and INSERT INTO ... SELECT behave like UPSERT.
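A minimal sketch of this behavior, assuming the example.default.users table from the Default behavior section, where user_id is the primary key. The inserted values are illustrative only.

```sql
-- First insert creates the row with primary key 1.
INSERT INTO example.default.users VALUES (1, 'Donald', 'Duck');

-- Second insert reuses primary key 1. Because INSERT behaves like UPSERT,
-- the existing row is expected to be updated rather than duplicated.
INSERT INTO example.default.users VALUES (1, 'Daisy', 'Duck');

-- Expected to return a single row for user_id = 1.
SELECT * FROM example.default.users WHERE user_id = 1;
```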

SQL DELETE#

If a WHERE clause is specified, the DELETE operation only works if the predicate in the clause can be fully pushed down to the data source.
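The following sketch contrasts two predicates, again assuming the example.default.users table from the Default behavior section. Which predicates the connector can push down is not listed here, so the comments state assumptions rather than guarantees.

```sql
-- A simple comparison on a primary key column.
-- Assumption: this predicate can be fully pushed down to Kudu.
DELETE FROM example.default.users WHERE user_id = 2;

-- A predicate that wraps a column in a function call.
-- Assumption: this may not be fully pushed down, in which case
-- the DELETE operation fails.
DELETE FROM example.default.users WHERE lower(first_name) = 'mickey';
```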

CREATE SCHEMA#

For CREATE SCHEMA to work, schema emulation must be enabled. See the Schema emulation section.

DROP SCHEMA#

For DROP SCHEMA to work, schema emulation must be enabled. See the Schema emulation section.

CREATE TABLE#

To create a Kudu table, you need to provide the columns and their types. In addition, Kudu needs information about partitioning. You can optionally provide column encoding and compression information.

Example:

CREATE TABLE user_events (
  user_id INTEGER WITH (primary_key = true),
  event_name VARCHAR WITH (primary_key = true),
  message VARCHAR,
  details VARCHAR WITH (nullable = true, encoding = 'plain')
) WITH (
  partition_by_hash_columns = ARRAY['user_id'],
  partition_by_hash_buckets = 5,
  number_of_replicas = 3
);

The primary key consists of user_id and event_name. The table is partitioned into five partitions by hash values of the column user_id, and the number_of_replicas is explicitly set to 3.

The primary key columns must always be the first columns of the column list. All columns used in partitions must be part of the primary key.

The table property number_of_replicas is optional. It defines the number of tablet replicas, and must be an odd number. If it is not specified, the default replication factor from the Kudu master configuration is used.

Kudu supports the following types of partitioning:

  • Hash partitioning: Distribute rows by hash value into one of many buckets.

  • Range partitioning: Distribute rows using a totally-ordered range partition key. The concrete range partitions must be created explicitly.

  • Multi-level partitioning: A table must have at least one partitioning, either hash or range. It can have at most one range partitioning, but multiple hash partitioning ‘levels’.

For more information, see Partitioning design.

ALTER TABLE#

Adding a column to an existing table uses the SQL statement ALTER TABLE ADD COLUMN. You can specify the same column properties that you can when you create a table.

ALTER TABLE example_table ADD COLUMN extraInfo VARCHAR WITH (nullable = true, encoding = 'plain')

See also Column properties.

  • ALTER TABLE RENAME COLUMN is only allowed if the column is not part of the primary key.

  • ALTER TABLE DROP COLUMN is only allowed if the column is not part of the primary key.
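As a sketch of both statements, the following example renames and then drops the nullable column added in the ADD COLUMN example above. The new name extra_info is hypothetical.

```sql
-- extrainfo is not part of the primary key, so both statements are allowed.
-- Unquoted identifiers are case-insensitive, so extraInfo resolves to extrainfo.
ALTER TABLE example_table RENAME COLUMN extrainfo TO extra_info;

ALTER TABLE example_table DROP COLUMN extra_info;
```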

Procedures#

Partitioning design#

A table must have at least one partitioning, either hash or range. It can have at most one range partitioning, but multiple hash partitioning levels. For more details, see the Apache Kudu documentation.

If you create a Kudu table in SEP, the partitioning design is given by several table properties as described in the following sections.

Hash partitioning#

You can define the first hash partition group with two table properties:

The partition_by_hash_columns property defines the columns belonging to the partition group, and partition_by_hash_buckets defines the number of partitions to split the hash value range into. All partition columns must be part of the primary key.

Example:

CREATE TABLE example_table (
  col1 VARCHAR WITH (primary_key=true),
  col2 VARCHAR WITH (primary_key=true),
  ...
) WITH (
  partition_by_hash_columns = ARRAY['col1', 'col2'],
  partition_by_hash_buckets = 4
)

This defines a hash partitioning with the columns col1 and col2 distributed over 4 partitions.

To define two separate hash partition groups, also use the second pair of table properties named partition_by_second_hash_columns and partition_by_second_hash_buckets.

Example:

CREATE TABLE example_table (
  col1 VARCHAR WITH (primary_key=true),
  col2 VARCHAR WITH (primary_key=true),
  ...
) WITH (
  partition_by_hash_columns = ARRAY['col1'],
  partition_by_hash_buckets = 2,
  partition_by_second_hash_columns = ARRAY['col2'],
  partition_by_second_hash_buckets = 3
)

This defines a two-level hash partitioning, with the first hash partition group over the column col1 distributed over 2 buckets, and the second hash partition group over the column col2 distributed over 3 buckets. As a result, the table has 2 x 3 = 6 partitions.

Range partitioning#

You can provide at most one range partitioning in Kudu. The columns are defined with the table property partition_by_range_columns. The ranges themselves are given in the table property range_partitions. Alternatively, the procedures example.system.add_range_partition and example.system.drop_range_partition can be used to manage range partitions for existing tables, where example is the catalog name.

Example:

CREATE TABLE events (
  rack VARCHAR WITH (primary_key=true),
  machine VARCHAR WITH (primary_key=true),
  event_time TIMESTAMP WITH (primary_key=true),
  ...
) WITH (
  partition_by_hash_columns = ARRAY['rack'],
  partition_by_hash_buckets = 2,
  partition_by_second_hash_columns = ARRAY['machine'],
  partition_by_second_hash_buckets = 3,
  partition_by_range_columns = ARRAY['event_time'],
  range_partitions = '[{"lower": null, "upper": "2018-01-01T00:00:00"},
                       {"lower": "2018-01-01T00:00:00", "upper": null}]'
)

This defines a three-level partitioning with two hash partition groups and one range partitioning on the event_time column. Two range partitions are created with a split at "2018-01-01T00:00:00".

Managing range partitions#

For existing tables, there are procedures to add and drop a range partition.

  • Adding a range partition:

    CALL example.system.add_range_partition(<schema>, <table>, <range_partition_as_json_string>)

  • Dropping a range partition:

    CALL example.system.drop_range_partition(<schema>, <table>, <range_partition_as_json_string>)

Both procedures take the following arguments:

  • <schema>: schema of the table

  • <table>: table name

  • <range_partition_as_json_string>: lower and upper bound of the range partition as a JSON string in the form '{"lower": <value>, "upper": <value>}', or if the range partition has multiple columns: '{"lower": [<value_col1>,...], "upper": [<value_col1>,...]}'. The concrete literals for the lower and upper bound values depend on the column types.

Example:

| Trino data type | JSON string example |
| --- | --- |
| BIGINT | '{"lower": 0, "upper": 1000000}' |
| SMALLINT | '{"lower": 10, "upper": null}' |
| VARCHAR | '{"lower": "A", "upper": "M"}' |
| TIMESTAMP | '{"lower": "2018-02-01T00:00:00.000", "upper": "2018-02-01T12:00:00.000"}' |
| BOOLEAN | '{"lower": false, "upper": true}' |
| VARBINARY | values encoded as base64 strings |

To specify an unbounded bound, use the value null.

Example:

CALL example.system.add_range_partition('example_schema', 'events', '{"lower": "2018-01-01", "upper": "2018-06-01"}')

This adds a range partition for the table events in the schema example_schema with the lower bound 2018-01-01, more exactly 2018-01-01T00:00:00.000, and the upper bound 2018-06-01.

Use the SQL statement SHOW CREATE TABLE to query the existing range partitions. For more information, see Table properties.
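The reverse operation can be sketched as follows, using the same hypothetical events table in example_schema. It is assumed here that the JSON bounds passed to drop_range_partition must match the bounds of an existing partition.

```sql
-- Drop the range partition added in the previous example.
-- Assumption: the bounds must match those of an existing partition.
CALL example.system.drop_range_partition('example_schema', 'events', '{"lower": "2018-01-01", "upper": "2018-06-01"}');

-- Inspect the remaining range partitions in the range_partitions table property.
SHOW CREATE TABLE example.example_schema.events;
```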

Table properties#

Table properties supply or set metadata for the underlying tables. Table properties are passed to the connector using a WITH clause:

CREATE TABLE events (
  serialno VARCHAR WITH (primary_key = true),
  event_time TIMESTAMP WITH (primary_key = true),
  message VARCHAR
) WITH (
  partition_by_hash_columns = ARRAY['serialno'],
  partition_by_hash_buckets = 4,
  partition_by_range_columns = ARRAY['event_time'],
  range_partitions = '[{"lower": null, "upper": "2017-01-01T00:00:00"},
                       {"lower": "2017-01-01T00:00:00", "upper": "2017-07-01T00:00:00"},
                       {"lower": "2017-07-01T00:00:00", "upper": "2018-01-01T00:00:00"}]'
);

The example creates a table with a hash partition on column serialno with 4 buckets and range partitioning on column event_time. Additionally, three range partitions are created:

  1. For all event_time values before the year 2017. The lower bound null means the partition is unbounded below.

  2. For the first half of the year 2017.

  3. For the second half of the year 2017.

Any attempt to add rows with event_time of year 2018 or greater fails, as no partition is defined.
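To accept rows for 2018, a matching range partition has to exist first. The following sketch assumes the events table above was created in the default schema of a catalog named example. The inserted values are hypothetical.

```sql
-- Add a range partition covering the first half of 2018.
CALL example.system.add_range_partition('default', 'events', '{"lower": "2018-01-01T00:00:00", "upper": "2018-07-01T00:00:00"}');

-- A row with an event_time inside the new range can now be inserted.
INSERT INTO example.default.events
VALUES ('sn-001', TIMESTAMP '2018-03-15 10:30:00', 'hypothetical event');
```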

| Property name | Description | Default |
| --- | --- | --- |
| range_partitions | Specify the concrete range partitions to be created. | |
| partition_design | The range partition definition used. Used with range_partitions. | |
| partition_by_range_columns | The columns defined in one range partition. See also Range partitioning. | |
| number_of_replicas | Defines the number of tablet replicas. Must be an odd number. See also CREATE TABLE. | |

Column properties#

The connector supports the following column properties.

| Property name | Type | Description |
| --- | --- | --- |
| primary_key | BOOLEAN | If true, the column belongs to the primary key columns. The Kudu primary key enforces a uniqueness constraint. Inserting a second row with the same primary key results in updating the existing row ('UPSERT'). See the Kudu documentation. |
| nullable | BOOLEAN | If true, the value can be null. Primary key columns cannot be nullable. |
| encoding | VARCHAR | The column encoding can help to save storage space and to improve query performance. Kudu uses an auto encoding depending on the column type if not specified. Valid values are: 'auto', 'plain', 'bitshuffle', 'runlength', 'prefix', 'dictionary', 'group_varint'. See the Kudu documentation. |
| compression | VARCHAR | The encoded column values can be compressed. Kudu uses a default compression if not specified. Valid values are: 'default', 'no', 'lz4', 'snappy', 'zlib'. See the Kudu documentation. |

Example:

CREATE TABLE example_table (
  name VARCHAR WITH (primary_key = true, encoding = 'dictionary', compression = 'snappy'),
  index BIGINT WITH (nullable = true, encoding = 'runlength', compression = 'lz4'),
  comment VARCHAR WITH (nullable = true, encoding = 'plain', compression = 'default'),
   ...
) WITH (...);

Performance#

The connector includes a number of performance features, detailed in the following sections.

Managed statistics#

The connector supports Managed statistics for performance optimizations in query planning. Use ANALYZE to collect statistics for a given table. See ANALYZE for details and examples.
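A minimal sketch, assuming the example.default.users table from the Default behavior section:

```sql
-- Collect statistics for the table.
ANALYZE example.default.users;

-- Review the statistics that are available to the query planner.
SHOW STATS FOR example.default.users;
```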

Security#

The connector includes a number of security-related features, detailed in the following sections.

Kerberos authentication#

The connector supports Kerberos authentication. To configure Kerberos authentication, add the following catalog configuration properties to the catalog properties file:

kudu.authentication.type = KERBEROS
kudu.authentication.client.principal = example@example.com
kudu.authentication.client.keytab = etc/kerberos/example.keytab
kudu.authentication.config = /path/to/example/krb5.conf
# If Kudu is running with a custom SPN, configure it here. Optional; defaults to "kudu".
kudu.authentication.server.principal.primary = kudu

Limitations#

Only lowercase table and column names in Kudu are supported.