Starburst Kafka connector#

The Starburst Kafka connector included in Starburst Enterprise platform (SEP) is an extended version of the Kafka connector with identical configuration and usage. It includes the following features:

  • TLS/SSL authentication to Kafka

  • Kerberos support including TLS/SSL support

  • Encoder and decoder for Protobuf messages

Note

The additional features of the connector require a valid Starburst Enterprise license, unless otherwise noted.

Configuration#

The connector configuration is identical to the configuration for the base Kafka connector.

A minimal configuration sets connector.name to kafka, and adds configuration for nodes and table names, as shown in the following snippet:

connector.name=kafka
kafka.table-names=table1,table2
kafka.nodes=host1:port,host2:port

TLS/SSL authentication for schema registry access#

The connector supports table schema and schema registry usage, and adds TLS/SSL support to access the schema registry.

Typically your schema registry is secured with TLS/SSL, and is therefore accessed securely with the HTTPS protocol. The connector supports the 2-way authentication used by the protocol if you enable HTTPS in your catalog properties file:

kafka.confluent-schema-registry.security-protocol=HTTPS

If the TLS certificates on the schema registry and on SEP are signed by a certificate authority that is recognized by default, no further configuration is necessary.

If you use custom certificates, configure a truststore and keystore for SEP to use after adding the relevant certificates to these files. After creating these files, place them on your cluster nodes and configure the relevant properties:

Truststore and keystore properties#

kafka.confluent-schema-registry.ssl.truststore.location
    Location of the truststore file. Absolute path, or relative path to etc.

kafka.confluent-schema-registry.ssl.truststore.password
    Password to the truststore file.

kafka.confluent-schema-registry.ssl.truststore.type
    File format of the truststore, JKS or PKCS12.

kafka.confluent-schema-registry.ssl.keystore.location
    Location of the keystore file. Absolute path, or relative path to etc.

kafka.confluent-schema-registry.ssl.keystore.password
    Password to the keystore file.

kafka.confluent-schema-registry.ssl.keystore.type
    File format of the keystore, JKS or PKCS12.

kafka.confluent-schema-registry.ssl.key.password
    Password of the private key stored in the keystore file.

You can use secrets support to avoid plain-text password values in the catalog file.
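For example, the following catalog snippet is a sketch of schema registry access with a custom truststore and keystore. The file locations and environment variable names are placeholders; it assumes the files are distributed to /etc/secrets on every node, and that the passwords are provided as environment variables through secrets support:

kafka.confluent-schema-registry.security-protocol=HTTPS
kafka.confluent-schema-registry.ssl.truststore.location=/etc/secrets/schema-registry.truststore.jks
kafka.confluent-schema-registry.ssl.truststore.password=${ENV:SR_TRUSTSTORE_PASSWORD}
kafka.confluent-schema-registry.ssl.truststore.type=JKS
kafka.confluent-schema-registry.ssl.keystore.location=/etc/secrets/schema-registry.keystore.jks
kafka.confluent-schema-registry.ssl.keystore.password=${ENV:SR_KEYSTORE_PASSWORD}
kafka.confluent-schema-registry.ssl.keystore.type=JKS
kafka.confluent-schema-registry.ssl.key.password=${ENV:SR_KEY_PASSWORD}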

Security#

The connector includes a number of security-related features, detailed in the following sections.

TLS/SSL authentication#

With TLS/SSL authentication, the connector and the Kafka server/broker authenticate each other, also called 2-way authentication. Add the following configuration to your catalog file to use TLS/SSL:

kafka.security-protocol=SSL

You need to set the following required configuration properties:

Required settings#

kafka.ssl.truststore.location
    Location of the truststore file.

kafka.ssl.truststore.password
    Password to the truststore file.

kafka.ssl.keystore.location
    Location of the keystore file.

kafka.ssl.keystore.password
    Password to the keystore file.

kafka.ssl.key.password
    Password of the private key stored in the keystore file.

In addition, you can set the following optional properties:

Optional setting#

kafka.endpoint-identification-algorithm
    The endpoint identification algorithm used by SEP to validate the server host name. The default value is HTTPS. SEP verifies that the broker host name matches the host name in the broker’s certificate. To disable server host name verification, use disabled.
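For example, the following sketch disables server host name verification, as may be necessary when broker certificates do not contain their host names:

kafka.security-protocol=SSL
kafka.endpoint-identification-algorithm=disabled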

You can see a full example configuration with SSL security protocol in the following snippet:

connector.name=kafka
...
kafka.security-protocol=SSL
kafka.ssl.truststore.location=/etc/secrets/kafka.broker.truststore.jks
kafka.ssl.truststore.password=truststore_password
kafka.ssl.keystore.location=/etc/secrets/kafka.broker.keystore.jks
kafka.ssl.keystore.password=keystore_password
kafka.ssl.key.password=private_key_password

SASL authentication#

With SASL authentication, the connector authenticates with the Kafka server using one of the following supported SASL mechanisms:

  • GSSAPI (Kerberos)

  • OAUTHBEARER (OAuth2)

SASL authentication can be enabled for both PLAINTEXT and SSL protocols by setting kafka.security-protocol to SASL_PLAINTEXT or SASL_SSL, respectively.

Example configuration of GSSAPI (Kerberos) authentication over TLS/SSL:

kafka.security-protocol=SASL_SSL
kafka.sasl.mechanism=GSSAPI

Note

If a SASL authentication mechanism is enabled, SSL client authentication (2-way authentication) is disabled, but the client still verifies the server certificate (1-way authentication).
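As a sketch, and assuming the kafka.ssl.truststore.* properties from the TLS/SSL authentication section also take effect with SASL_SSL, a configuration for 1-way authentication only needs a truststore, and the keystore properties can be omitted. The file location and environment variable are placeholders:

kafka.security-protocol=SASL_SSL
kafka.sasl.mechanism=GSSAPI
kafka.ssl.truststore.location=/etc/secrets/kafka.broker.truststore.jks
kafka.ssl.truststore.password=${ENV:KAFKA_TRUSTSTORE_PASSWORD}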

Kerberos authentication#

The GSSAPI SASL mechanism can be used to authenticate the Kafka connector using the Kerberos service. Add the following configuration to your catalog properties file to use the GSSAPI (Kerberos) SASL mechanism:

kafka.security-protocol=SASL_PLAINTEXT
kafka.sasl.mechanism=GSSAPI

Set the following required configuration properties:

Required settings#

kafka.kerberos.client.principal
    Kafka Kerberos client principal.

kafka.kerberos.client.keytab
    Kafka Kerberos client keytab location.

kafka.kerberos.config
    Kerberos service file location. Typically /etc/krb5.conf.

kafka.kerberos.service-name
    The Kerberos principal name of the Kafka service.

Example configuration of Kerberos authentication using GSSAPI with SASL:

connector.name=kafka
...
kafka.security-protocol=SASL_PLAINTEXT
kafka.sasl.mechanism=GSSAPI
kafka.kerberos.client.principal=kafka/broker1.your.org@YOUR.ORG
kafka.kerberos.client.keytab=/etc/secrets/kafka_client.keytab
kafka.kerberos.config=/etc/krb5.conf
kafka.kerberos.service-name=kafka

OAuth2 authentication#

The OAUTHBEARER SASL mechanism can be used to authenticate the Kafka connector using an access token obtained from an OAuth 2.0 compliant authorization server. Only the client credentials flow is supported.

Add the following configuration to your catalog properties file to use the OAUTHBEARER SASL mechanism:

kafka.security-protocol=SASL_PLAINTEXT
kafka.sasl.mechanism=OAUTHBEARER

Set the following required configuration properties:

Required settings#

kafka.sasl.oauth2.token-url
    The token URL of an OAuth 2.0 compliant authorization server.

kafka.sasl.oauth2.client-id
    ID of the Kafka connector OAuth2 client.

kafka.sasl.oauth2.client-secret
    Secret for the client.
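A minimal sketch of an OAUTHBEARER configuration follows. The token URL, client ID, and environment variable name are placeholders for your authorization server details:

connector.name=kafka
...
kafka.security-protocol=SASL_SSL
kafka.sasl.mechanism=OAUTHBEARER
kafka.sasl.oauth2.token-url=https://authz.example.org/oauth2/token
kafka.sasl.oauth2.client-id=kafka-connector-client
kafka.sasl.oauth2.client-secret=${ENV:KAFKA_OAUTH2_CLIENT_SECRET}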

If the authorization server uses SSL with a self-signed certificate, set the following additional properties to use a custom truststore when validating the certificate:

Additional settings#

kafka.sasl.oauth2.ssl.truststore.path
    Location of the SSL truststore file used to verify the OAuth2 authorization server certificate.

kafka.sasl.oauth2.ssl.truststore.password
    Password to the truststore file.

kafka.sasl.oauth2.ssl.truststore.type
    Type of the truststore file. Supported values are JKS and PKCS12.
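For example, the following sketch adds a custom truststore for a self-signed authorization server certificate; the file location and environment variable are placeholders:

kafka.sasl.oauth2.ssl.truststore.path=/etc/secrets/authz-server.truststore.jks
kafka.sasl.oauth2.ssl.truststore.password=${ENV:OAUTH2_TRUSTSTORE_PASSWORD}
kafka.sasl.oauth2.ssl.truststore.type=JKS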

Protobuf encoder#

The Protobuf encoder serializes rows to Protobuf DynamicMessage instances, as defined by the proto file.

Note

The Protobuf schema is encoded with the table column values in each Kafka message.

The dataSchema must be defined in the table definition file to use the Protobuf encoder. It points to the location of the proto file for the key or message.

Proto files can be retrieved via HTTP or HTTPS from a remote server with the following syntax:

"dataSchema": "http://example.org/schema/schema.proto"

Local files need to be available on all nodes and use an absolute path in the syntax, for example:

"dataSchema": "/usr/local/schema/schema.proto"

The following field attributes are supported:

  • name - Name of the column in the SEP table.

  • type - SEP type of column.

  • mapping - slash-separated list of field names to select a field from the Protobuf schema. If the field specified in mapping does not exist in the original Protobuf schema, then a write operation fails.

The following table lists supported SEP types, which can be used in type for the equivalent Protobuf field type.

SEP to Protobuf type mapping for encoding#

SEP type                Allowed Protobuf types
BOOLEAN                 bool
INTEGER                 int32, uint32, sint32, fixed32, sfixed32
BIGINT                  int64, uint64, sint64, fixed64, sfixed64
DOUBLE                  double
REAL                    float
VARCHAR / VARCHAR(x)    string
VARBINARY               bytes
ROW                     Message
ARRAY                   Protobuf type with repeated field
MAP                     Map
TIMESTAMP               Timestamp, predefined in proto file

Example Protobuf field definition in a table definition file for a Kafka message:

{
  "tableName": "your-table-name",
  "schemaName": "your-schema-name",
  "topicName": "your-topic-name",
  "key": { "..." },
  "message":
  {
    "dataFormat": "protobuf",
    "dataSchema": "/message_schema.proto",
    "fields":
    [
      {
        "name": "field1",
        "type": "BIGINT",
        "mapping": "field1"
      },
      {
        "name": "field2",
        "type": "VARCHAR",
        "mapping": "field2"
      },
      {
        "name": "field3",
        "type": "BOOLEAN",
        "mapping": "field3"
      }
    ]
  }
}

Example Protobuf schema definition for the preceding table definition:

syntax = "proto3";

message schema {
  uint64 field1 = 1;
  string field2 = 2;
  bool field3 = 3;
}

Example insert query for the preceding table definition:

INSERT INTO example_protobuf_table (field1, field2, field3)
  VALUES (123456789, 'example text', FALSE);

Protobuf decoder#

The Protobuf decoder converts the bytes representing a Protobuf-formatted message or key into table columns, based on a schema.

To use the Protobuf decoder for a key or message, the dataSchema must be defined. It points to the location of a valid proto file for the message that needs to be decoded. This location can be a remote web server, "dataSchema": "http://example.org/schema/schema.proto", or a local file, "dataSchema": "/usr/local/schema/schema.proto". The decoder fails if the location is not accessible from the coordinator.

For fields, the following attributes are supported:

  • name - Name of the column in the SEP table.

  • type - SEP type of column.

  • mapping - slash-separated list of field names to select a field from the Protobuf schema. If the field specified in mapping does not exist in the original proto file, then a read operation returns NULL.

The following table lists supported SEP types, which can be used in type for the equivalent Protobuf field types.

SEP to Protobuf type mapping for decoding#

SEP type                Allowed Protobuf types
BOOLEAN                 bool
INTEGER                 int32, uint32, sint32, fixed32, sfixed32
BIGINT                  int64, uint64, sint64, fixed64, sfixed64
DOUBLE                  double
REAL                    float
VARCHAR / VARCHAR(x)    string
VARBINARY               bytes
ROW                     Message
ARRAY                   Protobuf type with repeated field
MAP                     map
TIMESTAMP               Timestamp, predefined in proto file

Protobuf schema evolution#

The Protobuf decoder supports schema evolution with backward compatibility: a newer schema can be used to read Protobuf serialized data created with an older schema. Any change in the proto file must also be reflected in the topic definition file.

The schema evolution behavior is as follows:

  • Column added in new schema: Data created with an older schema produces a default value when the table is using the new schema.

  • Column removed in new schema: Data created with an older schema no longer outputs the data from the column that was removed.

  • Column renamed in the new schema: This is equivalent to removing the column and adding a new one, so data created with an older schema produces a default value when the table is using the new schema.

  • Column type changed in the new schema: If the type coercion is supported by Protobuf, the conversion happens. An error is thrown for incompatible types.

Protobuf limitations#

  • Protobuf-specific types such as any and oneof are not supported.

  • Protobuf Timestamp has nanosecond precision, but SEP supports decoding and encoding at microsecond precision.