Starburst Kafka connector#
The Starburst Kafka connector included in Starburst Enterprise platform (SEP) is an extended version of the Kafka connector with identical configuration and usage. It includes the following features:
TLS/SSL authentication to Kafka
Kerberos support including TLS/SSL support
Encoder and decoder for protobuf messages
Note
The additional features of the connector require a valid Starburst Enterprise license, unless otherwise noted.
Configuration#
The connector configuration is identical to the configuration for the base Kafka connector.
A minimal configuration uses the connector.name kafka, and adds configuration for nodes and table names as shown in the following snippet:
connector.name=kafka
kafka.table-names=table1,table2
kafka.nodes=host1:port,host2:port
TLS/SSL authentication for schema registry access#
The connector supports table schema and schema registry usage, and adds TLS/SSL support to access the schema registry.
Typically your schema registry is secured with TLS/SSL, and therefore accessed securely with the HTTPS protocol. The connector supports the 2-way authentication used by the protocol, if you enable the HTTPS protocol in your catalog properties file:
kafka.confluent-schema-registry.security-protocol=HTTPS
If your TLS certificates on the schema registry and on SEP are signed by a certificate authority, they are recognized as such, and no further configuration is necessary.
If you use a custom certificate, you have to configure the truststore and keystore to use on SEP after adding the relevant certificates to these files. After creating these files, you have to place them on your cluster nodes and configure the relevant properties:
Property name |
Description |
---|---|
|
Location of the truststore file. Absolute path, or relative path to
|
|
Password to the truststore file. |
|
The file format of truststore key, |
|
Location of the keystore file. Absolute path, or relative path to
|
|
Password to the keystore file. |
|
The file format of keystore key. |
|
Password of the private key stored in the keystore file. |
You can use the secrets support to avoid plain text password values in the catalog file.
Security#
The connector includes a number of security-related features, detailed in the following sections.
TLS/SSL authentication#
With TLS/SSL authentication, the connector authenticates with the Kafka server/broker, also called 2-way authentication. Add the following configuration to your catalog file to use TLS/SSL:
kafka.security-protocol=SSL
You need to set the following required configuration properties:
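Property name | Description |
---|---|
kafka.ssl.truststore.location | Location of the truststore file. |
kafka.ssl.truststore.password | Password to the truststore file. |
kafka.ssl.keystore.location | Location of the keystore file. |
kafka.ssl.keystore.password | Password to the keystore file. |
kafka.ssl.key.password | Password of the private key stored in the keystore file. |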
In addition, you can set the following optional properties:
Property name |
Description |
---|---|
|
The endpoint identification algorithm used by SEP to validate the
server host name. The default value is |
The following snippet shows a full example configuration with the SSL security protocol:
connector.name=kafka
...
kafka.security-protocol=SSL
kafka.ssl.truststore.location=/etc/secrets/kafka.broker.truststore.jks
kafka.ssl.truststore.password=truststore_password
kafka.ssl.keystore.location=/etc/secrets/kafka.broker.keystore.jks
kafka.ssl.keystore.password=keystore_password
kafka.ssl.key.password=private_key_password
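Before deploying a catalog file, it can help to check that none of the required SSL properties is missing. The following Python script is a minimal sketch and not part of SEP. The helper names parse_properties and missing_ssl_properties are hypothetical, the parser handles only simple key=value lines, and the property list is taken from the kafka.ssl.* entries in the preceding snippet.

```python
# Hypothetical helper script, not part of SEP: reports which of the
# kafka.ssl.* properties from the example snippet are absent or empty.

REQUIRED_SSL_PROPERTIES = [
    "kafka.ssl.truststore.location",
    "kafka.ssl.truststore.password",
    "kafka.ssl.keystore.location",
    "kafka.ssl.keystore.password",
    "kafka.ssl.key.password",
]


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    properties = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        properties[key.strip()] = value.strip()
    return properties


def missing_ssl_properties(properties):
    """Return required SSL properties that are absent or empty.

    Returns an empty list when kafka.security-protocol is not SSL.
    """
    if properties.get("kafka.security-protocol") != "SSL":
        return []
    return [name for name in REQUIRED_SSL_PROPERTIES if not properties.get(name)]


if __name__ == "__main__":
    catalog = """
    connector.name=kafka
    kafka.security-protocol=SSL
    kafka.ssl.truststore.location=/etc/secrets/kafka.broker.truststore.jks
    kafka.ssl.truststore.password=truststore_password
    """
    for name in missing_ssl_properties(parse_properties(catalog)):
        print("missing:", name)
```

Run against the sample text, the script reports the three keystore-related properties as missing.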
SASL authentication#
With SASL authentication, the connector authenticates with the Kafka server using one of the following supported SASL mechanisms:
GSSAPI (Kerberos)
OAUTHBEARER (OAuth2)
SASL authentication can be enabled for both the PLAINTEXT and SSL protocols by setting kafka.security-protocol to SASL_PLAINTEXT or SASL_SSL.
Example configuration of GSSAPI (Kerberos) authentication over TLS/SSL:
kafka.security-protocol=SASL_SSL
kafka.sasl.mechanism=GSSAPI
Note
If a SASL authentication mechanism is enabled, SSL client authentication (2-way authentication) is disabled, but the client still verifies the server certificate (1-way authentication).
Kerberos authentication#
The GSSAPI SASL mechanism can be used to authenticate the Kafka connector using the Kerberos service. Add the following configuration to your catalog properties file to use the GSSAPI (Kerberos) SASL mechanism:
kafka.security-protocol=SASL_PLAINTEXT
kafka.sasl.mechanism=GSSAPI
Set the following required configuration properties:
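Property name | Description |
---|---|
kafka.kerberos.client.principal | Kafka Kerberos client principal. |
kafka.kerberos.client.keytab | Kafka Kerberos client keytab location. |
kafka.kerberos.config | Kerberos service file location. Typically |
kafka.kerberos.service-name | The Kerberos principal name of Kafka service. |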
Example configuration of Kerberos authentication using GSSAPI with SASL:
connector.name=kafka
...
kafka.security-protocol=SASL_PLAINTEXT
kafka.sasl.mechanism=GSSAPI
kafka.kerberos.client.principal=kafka/broker1.your.org@YOUR.ORG
kafka.kerberos.client.keytab=/etc/secrets/kafka_client.keytab
kafka.kerberos.config=/etc/krb5.conf
kafka.kerberos.service-name=kafka
OAuth2 authentication#
The OAUTHBEARER SASL mechanism can be used to authenticate the Kafka connector using an access token obtained from an OAuth 2.0-compliant authorization server. Only the client credentials flow is supported.
Add the following configuration to your catalog properties file to use the OAUTHBEARER SASL mechanism:
kafka.security-protocol=SASL_PLAINTEXT
kafka.sasl.mechanism=OAUTHBEARER
Set the following required configuration properties:
Property Name |
Description |
---|---|
|
The token URL of an OAuth 2.0 compliant authorization server. |
|
ID of the Kafka connector OAuth2 client. |
|
Secret for the client. |
If the authorization server uses SSL with a self-signed certificate, set the following additional properties to use a custom truststore when validating the certificate:
Property Name |
Description |
---|---|
|
Location of the SSL truststore file used to verify the OAuth2 authorization server certificate. |
|
Password to the truststore file. |
|
Type of the truststore file. Supported values are: |
Protobuf encoder#
The Protobuf encoder serializes rows to DynamicMessages as defined by the proto file.
Note
The Protobuf schema is encoded with the table column values in each Kafka message.
The dataSchema must be defined in the table definition file to use the Protobuf encoder. It points to the location of the proto file for the key or message.
Proto files can be retrieved via HTTP or HTTPS from a remote server with the following syntax:
"dataSchema": "http://example.org/schema/schema.proto"
Local files need to be available on all nodes and use an absolute path in the syntax, for example:
"dataSchema": "/usr/local/schema/schema.proto"
The following field attributes are supported:
name - Name of the column in the SEP table.
type - SEP type of the column.
mapping - Slash-separated list of field names to select a field from the Protobuf schema. If the field specified in mapping does not exist in the original Protobuf schema, a write operation fails.
The following table lists the supported SEP types that can be used in type for the equivalent Protobuf field type.
SEP type |
Allowed Protobuf types |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Protobuf type with |
|
|
|
|
Example Protobuf field definition in a table definition file for a Kafka message:
{
  "tableName": "your-table-name",
  "schemaName": "your-schema-name",
  "topicName": "your-topic-name",
  "key": { "..." },
  "message": {
    "dataFormat": "protobuf",
    "dataSchema": "/message_schema.proto",
    "fields": [
      {
        "name": "field1",
        "type": "BIGINT",
        "mapping": "field1"
      },
      {
        "name": "field2",
        "type": "VARCHAR",
        "mapping": "field2"
      },
      {
        "name": "field3",
        "type": "BOOLEAN",
        "mapping": "field3"
      }
    ]
  }
}
Example Protobuf schema definition for the preceding table definition:
syntax = "proto3";

message schema {
  uint64 field1 = 1;
  string field2 = 2;
  bool field3 = 3;
}
Example insert query for the preceding table definition:
INSERT INTO example_protobuf_table (field1, field2, field3)
VALUES (123456789, 'example text', FALSE);
Protobuf decoder#
The Protobuf decoder converts the bytes representing a message or key in Protobuf format based on a schema.
For a key or message that uses the protobuf decoder, the dataSchema must be defined. It points to the location of a valid proto file for the message to be decoded. This location can be a remote web server, dataSchema: 'http://example.org/schema/schema.proto', or a local file, dataSchema: '/usr/local/schema/schema.proto'. The decoder fails if the location is not accessible from the coordinator.
For fields, the following attributes are supported:
name - Name of the column in the SEP table.
type - SEP type of the column.
mapping - Slash-separated list of field names to select a field from the Protobuf schema. If the field specified in mapping does not exist in the original proto file, a read operation returns NULL.
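To illustrate how a slash-separated mapping selects a nested field, the following Python sketch walks a decoded message represented as nested dictionaries. It is an illustration only, not the connector's implementation. The function resolve_mapping and the sample message are hypothetical, and None stands in for the NULL returned on read.

```python
def resolve_mapping(message, mapping):
    """Follow a slash-separated mapping through nested dicts.

    Returns None when any path segment is missing, mirroring the
    NULL that a read operation returns for an unknown field.
    """
    current = message
    for segment in mapping.split("/"):
        if not isinstance(current, dict) or segment not in current:
            return None
        current = current[segment]
    return current


if __name__ == "__main__":
    # Hypothetical decoded message with one nested sub-message.
    decoded = {"user": {"address": {"city": "Boston"}}, "id": 42}
    print(resolve_mapping(decoded, "user/address/city"))
    print(resolve_mapping(decoded, "id"))
    print(resolve_mapping(decoded, "user/phone"))
```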
The following table lists the supported SEP types that can be used in type for the equivalent Protobuf field types.
SEP type |
Allowed Protobuf types |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Protobuf data type with |
|
|
|
|
Protobuf schema evolution#
The Protobuf decoder supports the schema evolution feature with backward compatibility. With backward compatibility, a newer schema can be used to read Protobuf serialized data created with an older schema. Any change in the Proto file must also be reflected in the topic definition file.
The schema evolution behavior is as follows:
Column added in new schema: Data created with an older schema produces a default value when the table is using the new schema.
Column removed in new schema: Data created with an older schema no longer outputs the data from the column that was removed.
Column is renamed in the new schema: This is equivalent to removing the column and adding a new one, and data created with an older schema produces a default value when the table is using the new schema.
Changing type of column in the new schema: If the type coercion is supported by Protobuf, then the conversion happens. An error is thrown for incompatible types.
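The added, removed, and renamed column cases can be illustrated with a small Python sketch. It models a stored row as a dictionary and a schema as a mapping from column name to default value. The function read_with_schema and the sample schema are hypothetical and only mimic the behavior described above, without calling any Protobuf library. The defaults 0 and the empty string follow the proto3 conventions for numeric and string fields.

```python
def read_with_schema(stored_row, new_schema):
    """Project a row written with an older schema onto a newer schema.

    new_schema maps each column name to the default used when the
    stored row has no value for it. Columns absent from new_schema
    are dropped from the result.
    """
    return {
        column: stored_row.get(column, default)
        for column, default in new_schema.items()
    }


if __name__ == "__main__":
    old_row = {"field1": 123456789, "field2": "example text", "field3": False}

    # field3 is removed and field4 is added. A rename of field3 to
    # field4 behaves the same way: the old value is not carried over.
    new_schema = {"field1": 0, "field2": "", "field4": ""}
    print(read_with_schema(old_row, new_schema))
```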
Protobuf limitations#
Protobuf-specific types like any, oneof are not supported.
Protobuf Timestamp has nanosecond precision, but SEP supports decoding and encoding at microsecond precision.
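The precision limit means that the sub-microsecond digits of a Protobuf Timestamp cannot be represented in SEP. The following Python sketch gives a rough picture of the effect. The function name is hypothetical, and truncation rather than rounding is an assumption, since it is not stated here which of the two the connector applies.

```python
def nanos_to_micros(seconds, nanos):
    """Convert a Timestamp (seconds, nanos) pair to epoch microseconds.

    Assumption: the sub-microsecond remainder is truncated.
    """
    return seconds * 1_000_000 + nanos // 1_000


if __name__ == "__main__":
    # 1 second and 123,456,789 ns: the trailing 789 ns are lost.
    print(nanos_to_micros(1, 123_456_789))
```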