Apache Kafka

This article describes how to create a connection to Apache Kafka using a SQL command.
To ingest data from your Apache Kafka topic into a table within SQLake, you must first create a connection that provides the appropriate credentials to access your topic.

Syntax

CREATE KAFKA CONNECTION
<connection_identifier>
HOST[S] = { '<host>' | ('<host>' [, ...]) }
[ CONSUMER_PROPERTIES = '<properties>' ]
[ KAFKA_VERSION = { CURRENT | LEGACY } ]
[ REQUIRE_STATIC_IP = { TRUE | FALSE } ]
[ SSL = { TRUE | FALSE } ]
[ TOPIC_DISPLAY_FILTER[S] = { '<topic_name>' | ('<topic_name>' [, ...]) } ]
[ SASL_USERNAME = '<username/api_key>' ]
[ SASL_PASSWORD = '<password/api_secret>' ]
[ COMMENT = '<comment>' ]

Connection options

HOST[S]

Type: text | list
A single Kafka host, or a list of Kafka hosts, each in the format hostname:port.
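
As a small illustration of the single-host form, the sketch below uses HOST with one quoted hostname:port value instead of a parenthesized list. The connection name and broker address are placeholders chosen for this example.

```sql
-- Hypothetical connection name and broker address; replace with your own.
CREATE KAFKA CONNECTION my_single_host_connection
HOST = 'broker1.example.com:9092';
```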

CONSUMER_PROPERTIES — editable

Type: text_area
(Optional) Extra properties to configure for the consumer.

KAFKA_VERSION

Values: { CURRENT | LEGACY }
Default: CURRENT
(Optional) Use LEGACY for Kafka versions earlier than 0.10.

REQUIRE_STATIC_IP — editable

Type: Boolean
Default: true
(Optional) An Upsolver cluster can be configured with the number of elastic IPs it should allocate and use.
If the cluster running the job has at least one elastic IP set and REQUIRE_STATIC_IP is enabled, the job runs on a server that has an elastic IP associated with it.

SSL — editable

Type: Boolean
Default: false
(Optional) If enabled, SSL is used to connect.
Please contact Upsolver to ensure your CA certificate is supported.

TOPIC_DISPLAY_FILTER[S] — editable

Type: text | list
(Optional) A single topic or the list of topics to show. If left empty, all topics are visible.
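
For instance, a connection that should surface only one topic could use the singular form with a single quoted name. This is a sketch based on the syntax above; the connection name and the topic name 'orders' are hypothetical.

```sql
-- Hypothetical names; only the topic 'orders' would be shown for this connection.
CREATE KAFKA CONNECTION my_filtered_connection
HOSTS = ('foo:9092', 'bar:9092')
TOPIC_DISPLAY_FILTER = 'orders';
```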

SASL_USERNAME

Type: text
(Optional) Configure the SASL username to be used to authenticate with Kafka.

SASL_PASSWORD

Type: text
(Optional) Configure the SASL password to be used to authenticate with Kafka.
When both SASL_USERNAME and SASL_PASSWORD are set, the following configuration is added to the consumer/producer properties of the client that is created:
security.protocol = SASL_SSL
sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username = "API_KEY" password = "SECRET";
ssl.endpoint.identification.algorithm = https
sasl.mechanism = PLAIN
For other authentication settings, you may configure the CONSUMER_PROPERTIES directly.
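
As a rough sketch of that approach, the statement below sets SASL/SCRAM through CONSUMER_PROPERTIES instead of SASL_USERNAME and SASL_PASSWORD. The property keys are standard Apache Kafka client settings. The connection name, hosts, and credential placeholders are hypothetical, and the multi-line key=value layout inside the quoted string is an assumption about how the properties text is written, so check it against your environment before relying on it.

```sql
-- Sketch only: connection name, hosts, and credentials are placeholders.
-- Assumes CONSUMER_PROPERTIES accepts one key=value pair per line.
CREATE KAFKA CONNECTION my_scram_connection
HOSTS = ('foo:9092', 'bar:9092')
CONSUMER_PROPERTIES = '
  security.protocol=SASL_SSL
  sasl.mechanism=SCRAM-SHA-512
  sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";';
```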

COMMENT — editable

Type: text
(Optional) A description or comment regarding this connection.

Minimum example

CREATE KAFKA CONNECTION my_kafka_connection
HOSTS = ('foo:9092', 'bar:9092');

Full example

CREATE KAFKA CONNECTION my_kafka_connection
HOSTS = ('foo:9092', 'bar:9092')
SASL_USERNAME = 'API_KEY'
SASL_PASSWORD = 'SECRET'
KAFKA_VERSION = CURRENT
REQUIRE_STATIC_IP = true
SSL = false
TOPIC_DISPLAY_FILTERS = ('topic1', 'topic2')
COMMENT = 'My new Kafka connection';