Kafka connector#
This connector allows the use of Apache Kafka topics as tables in Starburst Enterprise. Each message is presented as a row in SEP.
Topics can be live: rows appear as data arrives and disappear as segments get dropped. This can result in strange behavior when accessing the same table multiple times in a single query, such as when performing a self join.
For a detailed example, see the Kafka connector tutorial.
SEP includes additional enterprise features that are built on top of the existing Trino connector functionality. For more information on connector key feature differences between Trino and SEP, see the connectors feature matrix.
Requirements#
To connect to Kafka, you need:
Kafka broker version 0.10.0 or higher.
Network access from the SEP coordinator and workers to the Kafka nodes. Port 9092 is the default port.
A valid Starburst Enterprise license.
When using the Protobuf decoder with the Confluent table description supplier, the following additional steps must be taken:
Copy the kafka-protobuf-provider and kafka-protobuf-types JAR files from Confluent for Confluent version 7.3.1 to the Kafka connector plugin directory (<install directory>/plugin/kafka) on all nodes in the cluster. The plugin directory depends on the Local installation method.
By copying those JARs and using them, you agree to the terms of the Confluent Community License Agreement under which Confluent makes them available.
These steps are not required if you are not using the Protobuf decoder with the Confluent table description supplier.
Configuration#
To configure the Kafka connector, create a catalog properties file that specifies the Kafka connector by setting the connector.name to kafka.
For example, to access a database as the example catalog, create the file etc/catalog/example.properties. Replace the connection properties as appropriate for your setup.
When using specialized authentication methods, you must specify additional Kafka client properties in order to access your Kafka cluster. To do so, add the kafka.config.resources property to reference your Kafka config files. Note that configs can be overwritten if defined explicitly in kafka.properties:
connector.name=kafka
kafka.table-names=table1,table2
kafka.nodes=host1:port,host2:port
kafka.config.resources=/etc/kafka-configuration.properties
Multiple Kafka servers#
You can have as many catalogs as you need. If you have additional Kafka clusters, configure another catalog.
To add another catalog, add a new properties file to etc/catalog. For example, if you name the property file sales.properties, SEP creates a catalog named sales.
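Catalogs configured this way can be combined in a single query. The following sketch assumes hypothetical example and sales catalogs that expose orders and customers tables; it joins data from two Kafka clusters:
-- Join topics from two different Kafka clusters by referencing both catalogs
SELECT o.order_id, c.region
FROM example.default.orders AS o
JOIN sales.default.customers AS c
  ON o.customer_id = c.customer_id;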
Log levels#
Kafka consumer logging can be verbose and pollute SEP logs. To lower the log level, add the following to etc/log.properties:
org.apache.kafka=WARN
General configuration properties#
The following table describes general catalog configuration properties for the connector:
Property name | Description | Default
---|---|---
kafka.buffer-size | Size of the internal data buffer for reading data from Kafka. The data buffer must be able to hold at least one message and ideally can hold many messages. There is one data buffer allocated per worker and data node. | 64kb
kafka.config.resources | A comma-separated list of Kafka client configuration files. These files must exist on the machines running SEP. Only specify this if absolutely necessary to access Kafka. Example: /etc/kafka-configuration.properties |
kafka.default-schema | Defines the schema which contains all tables that were defined without a qualifying schema name. | default
kafka.hide-internal-columns | Controls whether internal columns are part of the table schema or not. In addition to the data columns defined in a table description file, the connector maintains a number of additional columns for each table. If these columns are hidden, they can still be used in queries but do not show up in DESCRIBE <table-name> or SELECT *. | true
kafka.internal-column-prefix | Prefix for internal columns. | _
kafka.messages-per-split | Number of messages that are processed by each SEP split. | 100000
kafka.nodes | A comma-separated list of hostname:port pairs for the Kafka data nodes. This property is required. |
kafka.protobuf-any-support-enabled | Enable support for encoding Protobuf any types to JSON. | false
kafka.security-protocol | Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. | PLAINTEXT
kafka.ssl.endpoint-identification-algorithm | The endpoint identification algorithm used by clients to validate server host name for connecting to the Kafka cluster. Use disabled to disable server host name verification. | https
kafka.ssl.key.password | Password for the private key in the keystore file used for connecting to the Kafka cluster. This is only required for clients if two-way authentication is configured. |
kafka.ssl.keystore.location | Location of the keystore file used for connecting to the Kafka cluster. |
kafka.ssl.keystore.password | Password for the keystore file used for connecting to the Kafka cluster. This property is required when using kafka.ssl.keystore.location. |
kafka.ssl.keystore.type | File format of the keystore file. Valid values are JKS and PKCS12. | JKS
kafka.ssl.truststore.location | Location of the truststore file used for connecting to the Kafka cluster. |
kafka.ssl.truststore.password | Password for the truststore file used for connecting to the Kafka cluster. This property is required when using kafka.ssl.truststore.location. |
kafka.ssl.truststore.type | File format of the truststore file. Valid values are JKS and PKCS12. | JKS
kafka.timestamp-upper-bound-force-push-down-enabled | The upper bound predicate on _timestamp is pushed down only for topics using LogAppendTime mode. For topics using CreateTime mode, set this property to true to enable the push down. | false
Internal columns#
The internal column prefix can be configured using the kafka.internal-column-prefix configuration property and defaults to _. A different prefix affects the internal column names as described in the following sections. For example, a value of internal_ changes the partition ID column name from _partition_id to internal_partition_id.
For each defined table, the connector maintains the following columns:
Column name | Type | Description
---|---|---
_partition_id | BIGINT | ID of the Kafka partition which contains this row.
_partition_offset | BIGINT | Offset within the Kafka partition for this row.
_segment_start | BIGINT | Lowest offset in the segment (inclusive) which contains this row. This offset is partition specific.
_segment_end | BIGINT | Highest offset in the segment (exclusive) which contains this row. The offset is partition specific. This is the same value as _segment_start of the next segment (if it exists).
_segment_count | BIGINT | Running count for the current row within the segment. For an uncompacted topic, _segment_start + _segment_count is equal to _partition_offset.
_message_corrupt | BOOLEAN | True if the decoder could not decode the message for this row. When true, data columns mapped from the message should be treated as invalid.
_message | VARCHAR | Message bytes as a UTF-8 encoded string. This is only useful for a text topic.
_message_length | BIGINT | Number of bytes in the message.
_headers | map(VARCHAR, array(VARBINARY)) | Headers of the message where values with the same key are grouped as an array.
_key_corrupt | BOOLEAN | True if the key decoder could not decode the key for this row. When true, data columns mapped from the key should be treated as invalid.
_key | VARCHAR | Key bytes as a UTF-8 encoded string. This is only useful for textual keys.
_key_length | BIGINT | Number of bytes in the key.
_timestamp | TIMESTAMP | Message timestamp.
For tables without a table definition file, the _key_corrupt and _message_corrupt columns will always be false.
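As an example of using the internal columns, the following query (with hypothetical catalog, schema, and table names) inspects how recent messages are distributed across partitions:
-- Count messages per partition received in the last hour,
-- using the hidden _partition_id and _timestamp columns
SELECT
  _partition_id,
  count(*) AS message_count,
  max(_timestamp) AS latest_message_time
FROM example.default.purchases
WHERE _timestamp > localtimestamp - INTERVAL '1' HOUR
GROUP BY _partition_id;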
Table schema and schema registry usage#
The table schema for the messages can be supplied to the connector with a configuration file or a schema registry. It also provides a mechanism for the connector to discover tables.
You must configure the supplier with the kafka.table-description-supplier property, setting it to FILE or CONFLUENT. Each table description supplier has a separate set of configuration properties.
Refer to the following sections for more information. The FILE table description supplier is the default, and the value is case insensitive.
File table description supplier#
To use the file-based table description supplier, set kafka.table-description-supplier to FILE. In addition, you must configure kafka.table-names and kafka.table-description-dir as described in the following sections:
kafka.table-names#
Comma-separated list of all tables provided by this catalog. A table name can be unqualified (simple name), and is placed into the default schema, or it can be qualified with a schema name (<schema-name>.<table-name>).
For each table defined here, a table description file may exist. If no table description file exists, the table name is used as the topic name on Kafka, and no data columns are mapped into the table. The table still contains all internal columns.
This property is required. At least one table must be defined.
kafka.table-description-dir#
References a folder within the Trino deployment that holds one or more JSON files ending with .json that contain table description files.
This property is optional; the default is etc/kafka.
Table definition files#
Kafka maintains topics only as byte messages and leaves it to producers and consumers to define how a message should be interpreted. For SEP, this data must be mapped into columns to allow queries against the data.
Note
For textual topics that contain JSON data, it is entirely possible to not use any table definition files, but instead use the Trino JSON functions and operators to parse the _message column, which contains the bytes mapped into a UTF-8 string. This is cumbersome and makes it difficult to write SQL queries. This only works when reading data.
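For example, a query along the following lines reads JSON attributes directly from the raw message without a table definition file. The catalog, topic, and JSON field names are hypothetical:
-- Parse the UTF-8 encoded message bytes as JSON and extract individual fields
SELECT
  json_extract_scalar(_message, '$.user_id') AS user_id,
  json_extract_scalar(_message, '$.amount') AS amount
FROM example.default.purchases
LIMIT 10;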
A table definition file consists of a JSON definition for a table. The name of the file can be arbitrary but must end in .json. Place the file in the directory configured with the kafka.table-description-dir property. The table definition file must be accessible from all SEP nodes.
{
"tableName": ...,
"schemaName": ...,
"topicName": ...,
"key": {
"dataFormat": ...,
"fields": [
...
]
},
"message": {
"dataFormat": ...,
"fields": [
...
]
}
}
Field | Required | Type | Description
---|---|---|---
tableName | required | string | SEP table name defined by this file.
schemaName | optional | string | Schema containing the table. If omitted, the default schema name is used.
topicName | required | string | Kafka topic that is mapped.
key | optional | JSON object | Field definitions for data columns mapped to the message key.
message | optional | JSON object | Field definitions for data columns mapped to the message itself.
Key and message in Kafka#
Starting with Kafka 0.8, each message in a topic can have an optional key. A table definition file contains sections for both key and message to map the data onto table columns.
Each of the key and message fields in the table definition is a JSON object that must contain two fields:
Field | Required | Type | Description
---|---|---|---
dataFormat | required | string | Selects the decoder for this group of fields.
fields | required | JSON array | A list of field definitions. Each field definition creates a new column in the SEP table.
Each field definition is a JSON object:
{
"name": ...,
"type": ...,
"dataFormat": ...,
"mapping": ...,
"formatHint": ...,
"hidden": ...,
"comment": ...
}
Field | Required | Type | Description
---|---|---|---
name | required | string | Name of the column in the SEP table.
type | required | string | SEP type of the column.
dataFormat | optional | string | Selects the column decoder for this field. Defaults to the default decoder for this row data format and column type.
dataSchema | optional | string | The path or URL where the Avro schema resides. Used only for Avro decoder.
mapping | optional | string | Mapping information for the column. This is decoder specific, see below.
formatHint | optional | string | Sets a column-specific format hint to the column decoder.
hidden | optional | boolean | Hides the column from DESCRIBE <table name> and SELECT *. Defaults to false.
comment | optional | string | Adds a column comment, which is shown with DESCRIBE <table name>.
There is no limit on the number of field definitions for either key or message.
Confluent table description supplier#
The Confluent table description supplier uses the Confluent Schema Registry to discover table definitions. It is only tested to work with the Confluent Schema Registry.
The benefits of using the Confluent table description supplier over the file table description supplier are:
New tables can be defined without a cluster restart.
Schema updates are detected automatically.
There is no need to define tables manually.
Some Protobuf specific types like oneof and any are supported and mapped to JSON.
When using Protobuf decoder with the Confluent table description supplier, some additional steps are necessary. For details, refer to Requirements.
To use the schema registry, set the kafka.table-description-supplier property to CONFLUENT. You must also configure the additional properties in the following table:
Note
Inserts are not supported. The only data format supported is AVRO.
Property name | Description | Default value
---|---|---
kafka.confluent-schema-registry-url | Comma-separated list of URL addresses for the Confluent schema registry. For example, http://example.net:8081 |
kafka.confluent-schema-registry-client-cache-size | The maximum number of subjects that can be stored in the local cache. The cache stores the schemas locally by subjectId, and is provided by the Confluent CachingSchemaRegistryClient. | 1000
kafka.empty-field-strategy | Avro allows empty struct fields, but this is not allowed in SEP. There are three strategies for handling empty struct fields: IGNORE, FAIL, and MARK. This can also be modified with the empty_field_strategy session property. | IGNORE
kafka.confluent-subjects-cache-refresh-interval | The interval used for refreshing the list of subjects and the definition of the schema for the subject in the subject's cache. | 1s
Confluent subject to table name mapping#
The subject naming strategy determines how a subject is resolved from the table name.
The default strategy is the TopicNameStrategy, where the key subject is defined as <topic-name>-key and the value subject is defined as <topic-name>-value. If other strategies are used, there is no way to determine the subject name beforehand, so it must be specified manually in the table name.
To manually specify the key and value subjects, append them to the topic name, for example: <topic name>&key-subject=<key subject>&value-subject=<value subject>. Both the key-subject and value-subject parameters are optional. If neither is specified, then the default TopicNameStrategy is used to resolve the subject name via the topic name. Note that a case insensitive match is performed, because identifiers cannot contain upper case characters.
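For example, a query with manually specified subjects might look like the following sketch, where the catalog, topic, and subject names are hypothetical and written in lower case:
-- The quoted table name carries the topic name and the optional subject parameters
SELECT *
FROM example.default."orders&key-subject=orders_key_v1&value-subject=orders_value_v1";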
Protobuf-specific type handling in Confluent table description supplier#
When using the Confluent table description supplier, the following Protobuf specific types are supported in addition to the normally supported types:
oneof#
Protobuf schemas containing oneof fields are mapped to a JSON field in Trino.
For example, given the following Protobuf schema:
syntax = "proto3";
message schema {
oneof test_oneof_column {
string string_column = 1;
uint32 integer_column = 2;
uint64 long_column = 3;
double double_column = 4;
float float_column = 5;
bool boolean_column = 6;
}
}
The corresponding Trino row is a JSON field test_oneof_column containing a JSON object with a single key. The value of the key matches the name of the oneof type that is present.
In the previous example, if the Protobuf message has the test_oneof_column containing string_column set to a value Trino, then the corresponding Trino row includes a column named test_oneof_column with the value JSON '{"string_column": "Trino"}'.
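A query can then read the populated oneof member with the JSON functions, for example (the catalog, schema, and table names are hypothetical):
-- Extract the value when string_column is the member that is set
SELECT
  json_extract_scalar(test_oneof_column, '$.string_column') AS string_value
FROM example.default.example_topic
WHERE json_extract_scalar(test_oneof_column, '$.string_column') IS NOT NULL;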
Kafka inserts#
The Kafka connector supports the use of INSERT statements to write data to a Kafka topic. Table column data is mapped to Kafka messages as defined in the table definition file. There are five supported data formats for key and message encoding:
These data formats each have an encoder that maps column values into bytes to be sent to a Kafka topic.
SEP supports at-least-once delivery for Kafka producers. This means that messages are guaranteed to be sent to Kafka topics at least once. If a producer acknowledgement times out, or if the producer receives an error, it might retry sending the message. This could result in a duplicate message being sent to the Kafka topic.
The Kafka connector does not allow the user to define which partition will be used as the target for a message. If a message includes a key, the producer will use a hash algorithm to choose the target partition for the message. The same key will always be assigned the same partition.
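Because delivery is at-least-once, readers that require exactly one copy of each logical record can deduplicate at query time. The following sketch keeps only the latest copy per business key, assuming a hypothetical orders table with an order_id column:
-- Keep the most recently written copy of each order_id,
-- using the internal _partition_offset column to order duplicates
SELECT *
FROM (
  SELECT
    t.*,
    row_number() OVER (
      PARTITION BY order_id
      ORDER BY _partition_offset DESC
    ) AS row_num
  FROM example.default.orders AS t
)
WHERE row_num = 1;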
Type mapping#
Because Trino and Kafka each support types that the other does not, this connector maps some types when reading (decoding) or writing (encoding) data. Type mapping depends on the format (Raw, Avro, JSON, CSV).
Row encoding#
Encoding is required to allow writing data; it defines how table columns in Trino map to Kafka keys and message data.
The Kafka connector contains the following encoders:
raw encoder - Table columns are mapped to a Kafka message as raw bytes.
CSV encoder - Kafka message is formatted as a comma-separated value.
JSON encoder - Table columns are mapped to JSON fields.
Avro encoder - Table columns are mapped to Avro fields based on an Avro schema.
Protobuf encoder - Table columns are mapped to Protobuf fields based on a Protobuf schema.
Note
A table definition file must be defined for the encoder to work.
Raw encoder#
The raw encoder formats the table columns as raw bytes using the mapping information specified in the table definition file.
The following field attributes are supported:
dataFormat - Specifies the width of the column data type.
type - Trino data type.
mapping - Start and optional end position of bytes to convert (specified as start or start:end).
The dataFormat attribute selects the number of bytes converted. If absent, BYTE is assumed. All values are signed.
Supported values:
BYTE - one byte
SHORT - two bytes (big-endian)
INT - four bytes (big-endian)
LONG - eight bytes (big-endian)
FLOAT - four bytes (IEEE 754 format, big-endian)
DOUBLE - eight bytes (IEEE 754 format, big-endian)
The type attribute defines the Trino data type.
Different values of dataFormat are supported, depending on the Trino data type:
Trino data type | dataFormat
---|---
BIGINT | BYTE, SHORT, INT, LONG
INTEGER | BYTE, SHORT, INT
SMALLINT | BYTE, SHORT
TINYINT | BYTE
REAL | FLOAT
DOUBLE | FLOAT, DOUBLE
BOOLEAN | BYTE, SHORT, INT, LONG
VARCHAR / VARCHAR(x) | BYTE
No other types are supported.
The mapping attribute specifies the range of bytes in a key or message used for encoding.
Note
Both a start and end position must be defined for VARCHAR types. Otherwise, there is no way to know how many bytes the message contains. The raw format mapping information is static and cannot be dynamically changed to fit the variable width of some Trino data types.
If only a start position is given:
For fixed width types, the appropriate number of bytes are used for the specified dataFormat (see above).
If both a start and end position are given, then:
For fixed width types, the size must be equal to the number of bytes used by the specified dataFormat.
All bytes between start (inclusive) and end (exclusive) are used.
Note
All mappings must include a start position for encoding to work.
The encoding for numeric data types (BIGINT, INTEGER, SMALLINT, TINYINT, REAL, DOUBLE) is straightforward. All numeric types use big-endian. Floating point types use IEEE 754 format.
Example raw field definition in a table definition file for a Kafka message:
{
"tableName": "example_table_name",
"schemaName": "example_schema_name",
"topicName": "example_topic_name",
"key": { "..." },
"message": {
"dataFormat": "raw",
"fields": [
{
"name": "field1",
"type": "BIGINT",
"dataFormat": "LONG",
"mapping": "0"
},
{
"name": "field2",
"type": "INTEGER",
"dataFormat": "INT",
"mapping": "8"
},
{
"name": "field3",
"type": "SMALLINT",
"dataFormat": "LONG",
"mapping": "12"
},
{
"name": "field4",
"type": "VARCHAR(6)",
"dataFormat": "BYTE",
"mapping": "20:26"
}
]
}
}
Columns should be defined in the same order they are mapped. There can be no gaps or overlaps between column mappings. The width of the column as defined by the column mapping must be equivalent to the width of the dataFormat for all types except for variable width types.
Example insert query for the above table definition:
INSERT INTO example_raw_table (field1, field2, field3, field4)
VALUES (123456789, 123456, 1234, 'abcdef');
Note
The raw encoder requires the field size to be known ahead of time, including for variable width data types like VARCHAR. It also disallows inserting values that do not match the width defined in the table definition file. This is done to ensure correctness, as otherwise longer values are truncated, and shorter values are read back incorrectly due to an undefined padding character.
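One way to satisfy the declared width for a shorter value is to pad it explicitly before inserting, assuming a space is an acceptable padding character for your consumers. This sketch reuses the example_raw_table definition shown above:
-- Pad 'abc' to the declared VARCHAR(6) width before writing the raw bytes
INSERT INTO example_raw_table (field1, field2, field3, field4)
VALUES (123456789, 123456, 1234, CAST(rpad('abc', 6, ' ') AS VARCHAR(6)));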
CSV encoder#
The CSV encoder formats the values for each row as a line of comma-separated values (CSV) using UTF-8 encoding. The CSV line is formatted with a comma (,) as the column delimiter.
The type and mapping attributes must be defined for each field:
type - Trino data type
mapping - The integer index of the column in the CSV line (the first column is 0, the second is 1, and so on)
dataFormat and formatHint are not supported and must be omitted.
The following Trino data types are supported by the CSV encoder:
BIGINT
INTEGER
SMALLINT
TINYINT
DOUBLE
REAL
BOOLEAN
VARCHAR / VARCHAR(x)
No other types are supported.
Column values are converted to strings before they are formatted as a CSV line.
The following is an example CSV field definition in a table definition file for a Kafka message:
{
"tableName": "example_table_name",
"schemaName": "example_schema_name",
"topicName": "example_topic_name",
"key": { "..." },
"message": {
"dataFormat": "csv",
"fields": [
{
"name": "field1",
"type": "BIGINT",
"mapping": "0"
},
{
"name": "field2",
"type": "VARCHAR",
"mapping": "1"
},
{
"name": "field3",
"type": "BOOLEAN",
"mapping": "2"
}
]
}
}
Example insert query for the above table definition:
INSERT INTO example_csv_table (field1, field2, field3)
VALUES (123456789, 'example text', TRUE);
JSON encoder#
The JSON encoder maps table columns to JSON fields defined in the table definition file according to RFC 4627.
For fields, the following attributes are supported:
type - Trino data type of column.
mapping - A slash-separated list of field names to select a field from the JSON object.
dataFormat - Name of formatter. Required for temporal types.
formatHint - Pattern to format temporal data. Only use with the custom-date-time formatter.
The following Trino data types are supported by the JSON encoder:
BIGINT
INTEGER
SMALLINT
TINYINT
DOUBLE
REAL
BOOLEAN
VARCHAR
DATE
TIME
TIME WITH TIME ZONE
TIMESTAMP
TIMESTAMP WITH TIME ZONE
No other types are supported.
The following dataFormats are available for temporal data:
iso8601
rfc2822
custom-date-time - Formats temporal data according to the Joda Time pattern given by the formatHint field.
field.milliseconds-since-epoch
seconds-since-epoch
All temporal data in Kafka supports milliseconds precision.
The following table defines which temporal data types are supported by dataFormats:
Trino data type | Decoding rules
---|---
DATE | custom-date-time, iso8601
TIME | custom-date-time, iso8601, milliseconds-since-epoch, seconds-since-epoch
TIME WITH TIME ZONE | custom-date-time, iso8601
TIMESTAMP | custom-date-time, iso8601, rfc2822, milliseconds-since-epoch, seconds-since-epoch
TIMESTAMP WITH TIME ZONE | custom-date-time, iso8601, rfc2822, milliseconds-since-epoch, seconds-since-epoch
The following is an example JSON field definition in a table definition file for a Kafka message:
{
"tableName": "example_table_name",
"schemaName": "example_schema_name",
"topicName": "example_topic_name",
"key": { "..." },
"message": {
"dataFormat": "json",
"fields": [
{
"name": "field1",
"type": "BIGINT",
"mapping": "field1"
},
{
"name": "field2",
"type": "VARCHAR",
"mapping": "field2"
},
{
"name": "field3",
"type": "TIMESTAMP",
"dataFormat": "custom-date-time",
"formatHint": "yyyy-dd-MM HH:mm:ss.SSS",
"mapping": "field3"
}
]
}
}
The following shows an example insert query for the preceding table definition:
INSERT INTO example_json_table (field1, field2, field3)
VALUES (123456789, 'example text', TIMESTAMP '2020-07-15 01:02:03.456');
Avro encoder#
The Avro encoder serializes rows to Avro records as defined by the Avro schema. Trino does not support schemaless Avro encoding.
Note
The Avro schema is encoded with the table column values in each Kafka message.
The dataSchema must be defined in the table definition file to use the Avro encoder. It points to the location of the Avro schema file for the key or message.
Avro schema files can be retrieved via HTTP or HTTPS from a remote server with the syntax:
"dataSchema": "http://example.org/schema/avro_data.avsc"
Local files need to be available on all Trino nodes and use an absolute path in the syntax, for example:
"dataSchema": "/usr/local/schema/avro_data.avsc"
The following field attributes are supported:
name - Name of the column in the Trino table.
type - Trino data type of column.
mapping - A slash-separated list of field names to select a field from the Avro schema. If the field specified in mapping does not exist in the original Avro schema, then a write operation fails.
The following table lists supported Trino data types, which can be used in type for the equivalent Avro field type.
Trino data type | Avro data type
---|---
BIGINT | INT, LONG
REAL | FLOAT
DOUBLE | FLOAT, DOUBLE
BOOLEAN | BOOLEAN
VARCHAR / VARCHAR(x) | STRING
No other types are supported.
The following example shows an Avro field definition in a table definition file for a Kafka message:
{
"tableName": "example_table_name",
"schemaName": "example_schema_name",
"topicName": "example_topic_name",
"key": { "..." },
"message":
{
"dataFormat": "avro",
"dataSchema": "/avro_message_schema.avsc",
"fields":
[
{
"name": "field1",
"type": "BIGINT",
"mapping": "field1"
},
{
"name": "field2",
"type": "VARCHAR",
"mapping": "field2"
},
{
"name": "field3",
"type": "BOOLEAN",
"mapping": "field3"
}
]
}
}
In the following example, an Avro schema definition for the preceding table definition is shown:
{
"type" : "record",
"name" : "example_avro_message",
"namespace" : "io.trino.plugin.kafka",
"fields" :
[
{
"name":"field1",
"type":["null", "long"],
"default": null
},
{
"name": "field2",
"type":["null", "string"],
"default": null
},
{
"name":"field3",
"type":["null", "boolean"],
"default": null
}
],
"doc" : "A basic avro schema"
}
The following is an example insert query for the preceding table definition:
INSERT INTO example_avro_table (field1, field2, field3)
VALUES (123456789, 'example text', FALSE);
Protobuf encoder#
The Protobuf encoder serializes rows to Protobuf DynamicMessages as defined by the Protobuf schema.
Note
The Protobuf schema is encoded with the table column values in each Kafka message.
The dataSchema must be defined in the table definition file to use the Protobuf encoder. It points to the location of the proto file for the key or message.
Protobuf schema files can be retrieved via HTTP or HTTPS from a remote server with the syntax:
"dataSchema": "http://example.org/schema/schema.proto"
Local files need to be available on all Trino nodes and use an absolute path in the syntax, for example:
"dataSchema": "/usr/local/schema/schema.proto"
The following field attributes are supported:
name - Name of the column in the Trino table.
type - Trino type of column.
mapping - A slash-separated list of field names to select a field from the Protobuf schema. If the field specified in mapping does not exist in the original Protobuf schema, then a write operation fails.
The following table lists supported Trino data types, which can be used in type for the equivalent Protobuf field type.
Trino data type | Protobuf data type
---|---
BOOLEAN | bool
INTEGER | int32, uint32, sint32, fixed32, sfixed32
BIGINT | int64, uint64, sint64, fixed64, sfixed64
DOUBLE | double
REAL | float
VARCHAR / VARCHAR(x) | string
VARBINARY | bytes
ROW | Message
ARRAY | Protobuf type with repeated field
MAP | Map
TIMESTAMP | Timestamp, predefined in timestamp.proto
The following example shows a Protobuf field definition in a table definition file for a Kafka message:
{
"tableName": "example_table_name",
"schemaName": "example_schema_name",
"topicName": "example_topic_name",
"key": { "..." },
"message":
{
"dataFormat": "protobuf",
"dataSchema": "/message_schema.proto",
"fields":
[
{
"name": "field1",
"type": "BIGINT",
"mapping": "field1"
},
{
"name": "field2",
"type": "VARCHAR",
"mapping": "field2"
},
{
"name": "field3",
"type": "BOOLEAN",
"mapping": "field3"
}
]
}
}
In the following example, a Protobuf schema definition for the preceding table definition is shown:
syntax = "proto3";
message schema {
uint64 field1 = 1 ;
string field2 = 2;
bool field3 = 3;
}
The following is an example insert query for the preceding table definition:
INSERT INTO example_protobuf_table (field1, field2, field3)
VALUES (123456789, 'example text', FALSE);
Row decoding#
For key and message, a decoder is used to map message and key data onto table columns.
The Kafka connector contains the following decoders:
raw - Kafka message is not interpreted; ranges of raw message bytes are mapped to table columns.
csv - Kafka message is interpreted as a comma separated message, and fields are mapped to table columns.
json - Kafka message is parsed as JSON, and JSON fields are mapped to table columns.
avro - Kafka message is parsed based on an Avro schema, and Avro fields are mapped to table columns.
protobuf - Kafka message is parsed based on a Protobuf schema, and Protobuf fields are mapped to table columns.
Note
If no table definition file exists for a table, the dummy decoder is used, which does not expose any columns.
Raw decoder#
The raw decoder supports reading of raw byte-based values from a Kafka message or key, and converting them into Trino columns.
For fields, the following attributes are supported:
dataFormat - Selects the width of the data type converted.
type - Trino data type. See the table later in this document for a list of supported data types.
mapping - <start>[:<end>] - Start and end position of bytes to convert (optional).
The dataFormat attribute selects the number of bytes converted. If absent, BYTE is assumed. All values are signed.
Supported values are:
BYTE - one byte
SHORT - two bytes (big-endian)
INT - four bytes (big-endian)
LONG - eight bytes (big-endian)
FLOAT - four bytes (IEEE 754 format)
DOUBLE - eight bytes (IEEE 754 format)
The type attribute defines the Trino data type on which the value is mapped.
Depending on the Trino type assigned to a column, different values of dataFormat can be used:
Trino data type | Allowed dataFormat values
---|---
BIGINT | BYTE, SHORT, INT, LONG
INTEGER | BYTE, SHORT, INT
SMALLINT | BYTE, SHORT
TINYINT | BYTE
DOUBLE | DOUBLE, FLOAT
BOOLEAN | BYTE, SHORT, INT, LONG
VARCHAR / VARCHAR(x) | BYTE
No other types are supported.
The mapping attribute specifies the range of the bytes in a key or message used for decoding. It can be one or two numbers separated by a colon (<start>[:<end>]).
If only a start position is given:
For fixed width types, the column will use the appropriate number of bytes for the specified dataFormat (see above).
When a VARCHAR value is decoded, all bytes from the start position to the end of the message will be used.
If start and end position are given:
For fixed width types, the size must be equal to the number of bytes used by the specified dataFormat.
For VARCHAR, all bytes between start (inclusive) and end (exclusive) are used.
If no mapping attribute is specified, it is equivalent to setting the start position to 0 and leaving the end position undefined.
The decoding scheme of numeric data types (BIGINT, INTEGER, SMALLINT, TINYINT, DOUBLE) is straightforward. A sequence of bytes is read from the input message and decoded according to either:
big-endian encoding (for integer types)
IEEE 754 format (for DOUBLE)
The length of the decoded byte sequence is implied by the dataFormat.
For the VARCHAR data type, a sequence of bytes is interpreted according to UTF-8 encoding.
CSV decoder#
The CSV decoder converts the bytes representing a message or key into a string using UTF-8 encoding and then interprets the result as a line of comma-separated values (CSV).
For fields, the type and mapping attributes must be defined:
type - Trino data type. See the following table for a list of supported data types.
mapping - The index of the field in the CSV record.
The dataFormat and formatHint attributes are not supported and must be omitted.
The table below lists supported Trino types, which can be used in type, and the decoding scheme:
Trino data type | Decoding rules
---|---
BIGINT, INTEGER, SMALLINT, TINYINT | Decoded using Java Long.parseLong()
DOUBLE | Decoded using Java Double.parseDouble()
BOOLEAN | "true" character sequence maps to true; other character sequences map to false
VARCHAR / VARCHAR(x) | Used as is
No other types are supported.
JSON decoder#
The JSON decoder converts the bytes representing a message or key into JSON according to RFC 4627. Note that the message or key must convert into a JSON object, not an array or simple type.
For fields, the following attributes are supported:
type - Trino data type of column.
dataFormat - Field decoder to be used for column.
mapping - A slash-separated list of field names to select a field from the JSON object.
formatHint - Only for custom-date-time.
The JSON decoder supports multiple field decoders, with _default being used for standard table columns and a number of decoders for date- and time-based types.
The following table lists Trino data types, which can be used in type, and the matching field decoders, which can be specified via the dataFormat attribute.
Trino data type | Allowed dataFormat values
---|---
BIGINT, INTEGER, SMALLINT, TINYINT, DOUBLE, BOOLEAN, VARCHAR, VARCHAR(x) | Default field decoder (omitted dataFormat attribute)
DATE | custom-date-time, iso8601
TIME | custom-date-time, iso8601, milliseconds-since-epoch, seconds-since-epoch
TIME WITH TIME ZONE | custom-date-time, iso8601
TIMESTAMP | custom-date-time, iso8601, rfc2822, milliseconds-since-epoch, seconds-since-epoch
TIMESTAMP WITH TIME ZONE | custom-date-time, iso8601, rfc2822, milliseconds-since-epoch, seconds-since-epoch
No other types are supported.
Default field decoder#
This is the standard field decoder, supporting all the Trino physical data types. A field value is transformed under JSON conversion rules into boolean, long, double or string values. For non-date/time based columns, this decoder should be used.
Date and time decoders#
To convert values from JSON objects into Trino DATE, TIME, TIME WITH TIME ZONE, TIMESTAMP, or TIMESTAMP WITH TIME ZONE columns, special decoders must be selected using the dataFormat attribute of a field definition.
iso8601 - Text based, parses a text field as an ISO 8601 timestamp.
rfc2822 - Text based, parses a text field as an RFC 2822 timestamp.
custom-date-time - Text based, parses a text field according to the Joda format pattern specified via the formatHint attribute. The format pattern should conform to https://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html.
milliseconds-since-epoch - Number based, interprets a text or number as the number of milliseconds since the epoch.
seconds-since-epoch - Number based, interprets a text or number as the number of seconds since the epoch.
For TIMESTAMP WITH TIME ZONE and TIME WITH TIME ZONE data types, if timezone information is present in the decoded value, it is used as the Trino value. Otherwise, the result time zone is set to UTC.
Avro decoder#
The Avro decoder converts the bytes representing a message or key in Avro format based on a schema. The message must have the Avro schema embedded. Trino does not support schemaless Avro decoding.
For key/message, using the avro decoder, the dataSchema must be defined. This should point to the location of a valid Avro schema file of the message which needs to be decoded. This location can be a remote web server (for example, dataSchema: 'http://example.org/schema/avro_data.avsc') or the local file system (for example, dataSchema: '/usr/local/schema/avro_data.avsc'). The decoder fails if this location is not accessible from the Trino coordinator node.
For fields, the following attributes are supported:
name - Name of the column in the Trino table.
type - Trino data type of column.
mapping - A slash-separated list of field names to select a field from the Avro schema. If the field specified in mapping does not exist in the original Avro schema, then a read operation returns NULL.
The following table lists the supported Trino types which can be used in type for the equivalent Avro field types:
Trino data type | Allowed Avro data type
---|---
BIGINT | INT, LONG
DOUBLE | DOUBLE, FLOAT
BOOLEAN | BOOLEAN
VARCHAR / VARCHAR(x) | STRING
VARBINARY | FIXED, BYTES
ARRAY | ARRAY
MAP | MAP
No other types are supported.
Avro schema evolution#
The Avro decoder supports the schema evolution feature with backward compatibility. With backward compatibility, a newer schema can be used to read Avro data created with an older schema. Any change in the Avro schema must also be reflected in Trino's topic definition file. Newly added or renamed fields must have a default value in the Avro schema file.
The schema evolution behavior is as follows:
Column added in new schema: Data created with an older schema produces a default value when the table is using the new schema.
Column removed in new schema: Data created with an older schema no longer outputs the data from the column that was removed.
Column is renamed in the new schema: This is equivalent to removing the column and adding a new one, and data created with an older schema produces a default value when table is using the new schema.
Changing type of column in the new schema: If the type coercion is supported by Avro, then the conversion happens. An error is thrown for incompatible types.
Protobuf decoder#
The Protobuf decoder converts the bytes representing a message or key in Protobuf format based on a schema.
For key/message, using the protobuf decoder, the dataSchema must be defined. It points to the location of a valid proto file of the message which needs to be decoded. This location can be a remote web server, dataSchema: 'http://example.org/schema/schema.proto', or a local file, dataSchema: '/usr/local/schema/schema.proto'. The decoder fails if the location is not accessible from the coordinator.
For fields, the following attributes are supported:
name - Name of the column in the Trino table.
type - Trino data type of column.
mapping - A slash-separated list of field names to select a field from the Protobuf schema. If the field specified in mapping does not exist in the original proto file, then a read operation returns NULL.
The following table lists the supported Trino types which can be used in type for the equivalent Protobuf field types:
Trino data type | Allowed Protobuf data type
---|---
BOOLEAN | bool
INTEGER | int32, uint32, sint32, fixed32, sfixed32
BIGINT | int64, uint64, sint64, fixed64, sfixed64
DOUBLE | double
REAL | float
VARCHAR / VARCHAR(x) | string
VARBINARY | bytes
ROW | Message
ARRAY | Protobuf type with repeated field
MAP | Map
TIMESTAMP | Timestamp, predefined in timestamp.proto
JSON | oneof (only with the Confluent table description supplier), Any
any#
Message types with an Any field contain an arbitrary serialized message as bytes and a type URL to resolve that message's type with a scheme of file://, http://, or https://.
The connector reads the contents of the URL to create the type descriptor for the Any message and convert the message to JSON. This behavior is enabled by setting kafka.protobuf-any-support-enabled to true.
The descriptors for each distinct URL are cached for performance reasons and any modifications made to the type returned by the URL requires a restart of Trino.
For example, the following Protobuf schema defines MyMessage with three columns:
syntax = "proto3";
message MyMessage {
string stringColumn = 1;
uint32 integerColumn = 2;
uint64 longColumn = 3;
}
And a separate schema which uses an Any type which is a packed message of the above type and a valid URL:
syntax = "proto3";
import "google/protobuf/any.proto";
message schema {
google.protobuf.Any any_message = 1;
}
The corresponding Trino column is named any_message of type JSON containing a JSON-serialized representation of the Protobuf message:
{
"@type":"file:///path/to/schemas/MyMessage",
"longColumn":"493857959588286460",
"numberColumn":"ONE",
"stringColumn":"Trino"
}
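The JSON value can then be queried with the JSON functions, for example (the catalog, schema, and table names are hypothetical; the field names come from the schemas above):
-- Read individual fields of the packed Any message from the JSON column
SELECT
  json_extract_scalar(any_message, '$.stringColumn') AS string_column,
  json_extract_scalar(any_message, '$.longColumn') AS long_column
FROM example.default.example_topic;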
Protobuf schema evolution#
The Protobuf decoder supports the schema evolution feature with backward compatibility. With backward compatibility, a newer schema can be used to read Protobuf data created with an older schema. Any change in the Protobuf schema must also be reflected in the topic definition file.
The schema evolution behavior is as follows:
Column added in new schema: Data created with an older schema produces a default value when the table is using the new schema.
Column removed in new schema: Data created with an older schema no longer outputs the data from the column that was removed.
Column is renamed in the new schema: This is equivalent to removing the column and adding a new one, and data created with an older schema produces a default value when table is using the new schema.
Changing type of column in the new schema: If the type coercion is supported by Protobuf, then the conversion happens. An error is thrown for incompatible types.
Limitations
Protobuf Timestamp has nanosecond precision, but Trino supports decoding and encoding at microsecond precision.
SQL support#
The connector provides read and write access to data and metadata in SEP tables populated by Kafka topics. See Row decoding for more information.
In addition to the globally available and read operation statements, the connector supports the following features:
INSERT, encoded to a specified data format. See Kafka inserts.
Performance#
The connector includes a number of performance improvements, detailed in the following sections.
Parallelism#
The connector reads and writes message data from Kafka topics in parallel across workers. The size of data sets for this parallelization is configurable and can therefore be adapted to your specific needs.
For a detailed example, see the Kafka connector tutorial.
Starburst Cached Views#
The connector supports table scan redirection to improve performance and reduce load on the data source.
Security#
The connector includes a number of security-related features, detailed in the following sections.
Password credential pass-through#
The connector supports password credential pass-through. To enable it, edit the catalog properties file to include the authentication type:
kafka.authentication.type=PASSWORD_PASS_THROUGH
For more information about configurations and limitations, see Password credential pass-through.
TLS/SSL encryption#
By default, the connector communicates with the Kafka server using the PLAINTEXT protocol, which means that sent data is not encrypted.
To encrypt the communication between the connector and the server, change the kafka.security-protocol configuration property to:
SSL - to connect to the server without any authentication or when using SSL authentication (2-way SSL).
SASL_SSL - to connect to the server using SASL authentication.
In addition, you can set following optional configuration properties:
Property name | Description
---|---
kafka.ssl.truststore.location | Location of the truststore file.
kafka.ssl.truststore.password | Password to the truststore file.
kafka.ssl.endpoint-identification-algorithm | The endpoint identification algorithm used by SEP to validate the server host name. The default value is https.
You can see a full example configuration with SSL encryption in the following snippet:
connector.name=kafka
...
kafka.security-protocol=SSL
kafka.ssl.truststore.location=/etc/secrets/kafka.broker.truststore.jks
kafka.ssl.truststore.password=truststore_password
TLS/SSL authentication#
The connector supports TLS/SSL authentication to the Kafka server/broker, also called 2-way authentication.
To use TLS/SSL authentication, add the following configuration properties to your catalog configuration file.
kafka.security-protocol=SSL
You must set the following required configuration properties:
Property name | Description
---|---
kafka.ssl.keystore.location | Location of the keystore file.
kafka.ssl.keystore.password | Password to the keystore file.
kafka.ssl.key.password | Password of the private key stored in the keystore file.
The following snippet provides a full example configuration using the SSL authentication:
connector.name=kafka
...
kafka.security-protocol=SSL
kafka.ssl.keystore.location=/etc/secrets/kafka.broker.keystore.jks
kafka.ssl.keystore.password=keystore_password
kafka.ssl.key.password=private_key_password
SASL authentication#
The connector supports SASL authentication to the Kafka server using one of the supported authentication types in the following table:
Authentication type name | Corresponding Kafka SASL mechanism | Documentation
---|---|---
PASSWORD | PLAIN | Password authentication
PASSWORD_PASS_THROUGH | PLAIN | Password credential pass-through
KERBEROS | GSSAPI | Kerberos authentication
OAUTH2 | OAUTHBEARER | OAuth 2.0 authentication
OAUTH2_PASSTHROUGH | OAUTHBEARER | OAuth 2.0 token pass-through
SCRAM_SHA_256 | SCRAM-SHA-256 | SCRAM authentication
SCRAM_SHA_512 | SCRAM-SHA-512 | SCRAM authentication
SASL authentication can be enabled for both PLAINTEXT and SSL protocols by setting kafka.security-protocol to SASL_PLAINTEXT and SASL_SSL, respectively.
Example configuration of the Kerberos authentication over TLS/SSL:
kafka.security-protocol=SASL_SSL
kafka.authentication.type=KERBEROS
Note
If the SASL authentication type is enabled, then the SSL client authentication (2-way authentication) is disabled, but the client still verifies the server certificate (1-way authentication).
Password authentication#
Password authentication is simple username and password authentication using the SASL PLAIN authentication type.
Password authentication should only be used with SSL encryption enabled to ensure that the password is not sent without encryption.
Add the following configuration to your catalog properties file to use the password authentication:
kafka.security-protocol=SASL_SSL
kafka.authentication.type=PASSWORD
Set the following required configuration properties:
Property name | Description
---|---
kafka.authentication.username | User name for Kafka access.
kafka.authentication.password | Password for the user.
Kerberos authentication#
The Kerberos authentication uses the Kerberos service and the SASL GSSAPI authentication type to authenticate. Add the following configuration to your catalog properties file to use the Kerberos authentication type:
kafka.security-protocol=SASL_SSL
kafka.authentication.type=KERBEROS
Set the following required configuration properties:
Property Name | Description
---|---
kafka.authentication.client.principal | Kerberos client principal name.
kafka.authentication.client.keytab | Kerberos client keytab location.
kafka.authentication.config | Kerberos service file location, typically /etc/krb5.conf.
kafka.authentication.service-name | Kerberos principal name of the Kafka service.
Example configuration using the Kerberos authentication:
connector.name=kafka
...
kafka.security-protocol=SASL_SSL
kafka.authentication.type=KERBEROS
kafka.authentication.client.principal=kafka/broker1.your.org@YOUR.ORG
kafka.authentication.client.keytab=/etc/secrets/kafka_client.keytab
kafka.authentication.config=/etc/krb5.conf
kafka.authentication.service-name=kafka
OAuth 2.0 authentication#
The OAuth 2.0 authentication uses an access token obtained from an OAuth 2.0-compliant authorization server and the SASL OAUTHBEARER authentication type to authenticate the Kafka connector. Only the client credentials flow is currently supported.
Add the following configuration to your catalog properties file to use the OAuth 2.0 authentication:
kafka.security-protocol=SASL_SSL
kafka.authentication.type=OAUTH2
Set the following required configuration properties:
Property name | Description
---|---
kafka.authentication.oauth2.token-url | The token URL of an OAuth 2.0-compliant authorization server.
kafka.authentication.oauth2.client-id | ID of the Kafka connector OAuth2 client.
kafka.authentication.oauth2.client-secret | Secret for the client.
If the authorization server is using SSL with a self-signed certificate, set the additional properties to use a custom truststore while validating the certificate:
Property name |
Description |
---|---|
|
Location of the SSL truststore file used to verify the OAUTH2 authorization server certificate. |
|
Password to the truststore file. |
|
Type of the truststore file, supported values are: |
OAuth 2.0 token pass-through#
The Kafka connector supports OAuth 2.0 token pass-through.
Configure this option the same as OAuth 2.0 authentication, except for the additional settings described in this section.
Set the authentication type in the coordinator’s config properties file:
http-server.authentication.type=DELEGATED-OAUTH2
Additionally, enable OAUTH2_PASSTHROUGH in the catalog properties file using the Kafka connector:
In addition, the SASL mechanism must be enabled with kafka.security-protocol=SASL_SSL or kafka.security-protocol=SASL_PLAINTEXT, as described in the previous section.
SCRAM authentication#
Salted Challenge Response Authentication Mechanism (SCRAM), or SASL/SCRAM, is a family of SASL mechanisms that addresses the security concerns with traditional mechanisms that perform username/password authentication like PLAIN. Kafka supports SCRAM-SHA-256 and SCRAM-SHA-512. All examples below use SCRAM-SHA-256, but you can substitute the configuration for SCRAM-SHA-512 as needed.
Add the following configuration to your catalog properties file to use the SCRAM authentication:
kafka.security-protocol=SASL_SSL
kafka.authentication.type=SCRAM_SHA_256
Set the following required configuration properties:
Property name | Description
---|---
kafka.authentication.username | The user name.
kafka.authentication.password | The password.
Security for schema registry access#
The connector supports table schema and schema registry usage, and includes a number of security-related features, detailed in the following sections.
TLS/SSL authentication#
Typically, your schema registry is secured with TLS/SSL, and therefore accessed securely with the HTTPS protocol. The connector supports the 2-way authentication used by the protocol, if you enable the HTTPS protocol in your catalog properties file:
kafka.confluent-schema-registry.security-protocol=HTTPS
If your TLS certificates on the schema registry and on SEP are signed by a certificate authority, it is recognized as such, and no further configuration is necessary.
If you use a custom certificate, you must configure the truststore and keystore to use on SEP after adding the relevant certificates to these files. After creating these files, you must place them on your cluster nodes and configure the relevant properties:
Property name |
Description |
---|---|
|
Location of the truststore file. Absolute path or relative path to
|
|
Password to the truststore file. |
|
The file format of truststore key, |
|
Location of the keystore file. Absolute path or relative path to
|
|
Password to the keystore file. |
|
The file format of keystore key. |
|
Password of the private key stored in the keystore file. |
You can use the secrets support to avoid plain text password values in the catalog file.
Basic authentication#
The schema registry can be configured to require users to authenticate using a username and password via the basic HTTP authentication type. The connector supports the basic authentication used by the schema registry, if you enable the PASSWORD authentication type and relevant properties in your catalog properties file:
kafka.confluent-schema-registry.authentication.type=PASSWORD
kafka.confluent-schema-registry.authentication.username=examplename
kafka.confluent-schema-registry.authentication.password=examplepassword
Kerberos authentication#
The schema registry can be configured to use the Kerberos service and the SASL GSSAPI authentication type. Add the following configuration to your catalog properties file to use the Kerberos authentication type for the schema registry:
kafka.confluent-schema-registry.authentication.type=KERBEROS
kafka.confluent-schema-registry.authentication.client.principal=kafka/host.your.org@YOUR.ORG
kafka.confluent-schema-registry.authentication.client.keytab=/etc/secrets/kafka_client.keytab
kafka.confluent-schema-registry.authentication.config=/etc/krb5.conf
kafka.confluent-schema-registry.authentication.service-name=kafka
Cloudera schema registry#
To use the Cloudera (CDP) schema registry, the following properties must be set:
kafka.table-description-supplier=cloudera
kafka.cloudera-schema-registry-url=http://schema-registry.example.com:8081/api/v1
To configure Kerberos authentication with a CDP schema registry, you must include the following additional properties:
kafka.cloudera-schema-registry.authentication.type=KERBEROS
kafka.cloudera-schema-registry.authentication.client.principal=kafka/broker1.example.com@EXAMPLE.COM
kafka.cloudera-schema-registry.authentication.client.keytab=/etc/kafka/kerberos/broker1.keytab
kafka.cloudera-schema-registry.authentication.config=/etc/krb5.conf
kafka.cloudera-schema-registry.authentication.service-name=kafka
The full list of configuration properties is as follows:
Property name |
Description |
---|---|
|
URL for Cloudera Schema Registry, for example: http://example.com:8080/api/v1. |
|
Credential source for schema registry. Valid values are |
|
The interval at which the topic to subjects cache will be refreshed. |
|
How to handle struct types with no fields. Valid values are:
|
|
Kerberos client principal name. |
|
Kerberos client keytab location. |
|
Kerberos service file location, typically |
|
Kerberos principal name of the Kafka service. |
Warning
The connector only supports CDP Schema Registry Kafka messages serialized using protocol ID 3 (VERSION_ID_AS_INT_PROTOCOL). Attempting to use other protocols results in an error.