Apache Kafka

This article describes how to create a connection to Apache Kafka using a SQL command.
To ingest data from your Apache Kafka topic into a table within SQLake, you must first create a connection that provides the appropriate credentials to access your topic.

Syntax

CREATE KAFKA CONNECTION
<connection_identifier>
HOST[S] = { '<host>' | ('<host>' [, ...]) }
[ CONSUMER_PROPERTIES = '<properties>' ]
[ VERSION = { CURRENT | LEGACY } ]
[ REQUIRE_STATIC_IP = { TRUE | FALSE } ]
[ SSL = { TRUE | FALSE } ]
[ TOPIC_DISPLAY_FILTER[S] = { '<topic_name>' | ('<topic_name>' [, ...]) } ]
[ COMMENT = '<comment>' ]

Connection options

HOST[S]

Type: text | list
A single Kafka host or a list of Kafka hosts, each in the format hostname:port.
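
As a minimal sketch of the single-host form, the HOST keyword from the syntax section takes one quoted hostname:port value. The connection name and the host here are placeholders chosen for illustration, not values from a real cluster.

```sql
-- Hypothetical connection name and placeholder host; replace with your broker address.
CREATE KAFKA CONNECTION my_single_host_connection
HOST = 'foo:9092';
```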

CONSUMER_PROPERTIES — editable

Type: text_area
(Optional) Extra properties to configure for the consumer.

VERSION

Values: { CURRENT | LEGACY }
Default: CURRENT
(Optional) Use LEGACY for Kafka versions earlier than 0.10.
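
For a cluster that still runs a Kafka version earlier than 0.10, the connection might look like the following sketch. The connection name and hosts are hypothetical placeholders.

```sql
-- Assumes the brokers run a Kafka version earlier than 0.10.
-- Connection name and hosts are placeholders.
CREATE KAFKA CONNECTION my_legacy_kafka_connection
HOSTS = ('foo:9092', 'bar:9092')
VERSION = LEGACY;
```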

REQUIRE_STATIC_IP — editable

Type: boolean
Default: true
(Optional) Upsolver clusters can be configured with the number of elastic IPs to allocate and use within the cluster.
If the cluster running the job has at least one elastic IP set and REQUIRE_STATIC_IP is enabled, the job runs on a server that has an elastic IP associated with it.

SSL — editable

Type: boolean
Default: false
(Optional) If enabled, SSL is used to connect.
Please contact Upsolver to ensure your CA certificate is supported.
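
A sketch of a connection with SSL turned on, using only the options listed in the syntax section. The connection name and host are hypothetical, and the example assumes the broker's CA certificate is already supported.

```sql
-- Hypothetical name and placeholder host.
-- Assumes the broker's CA certificate is supported by Upsolver.
CREATE KAFKA CONNECTION my_ssl_kafka_connection
HOSTS = ('foo:9092')
SSL = TRUE
COMMENT = 'Kafka connection over SSL';
```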

TOPIC_DISPLAY_FILTER[S] — editable

Type: text | list
(Optional) A single topic or a list of topics to show. If left empty, all topics are visible.
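
To limit the visible topics to one, the singular TOPIC_DISPLAY_FILTER form from the syntax section can take a single quoted topic name. In this sketch the connection name, hosts, and topic name are placeholders.

```sql
-- Hypothetical connection that shows only one topic.
-- 'orders' is a placeholder topic name.
CREATE KAFKA CONNECTION my_filtered_kafka_connection
HOSTS = ('foo:9092', 'bar:9092')
TOPIC_DISPLAY_FILTER = 'orders';
```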

COMMENT — editable

Type: text
(Optional) A description or comment regarding this connection.

Minimum example

CREATE KAFKA CONNECTION my_kafka_connection
HOSTS = ('foo:9092', 'bar:9092');

Full example

CREATE KAFKA CONNECTION my_kafka_connection
HOSTS = ('foo:9092', 'bar:9092')
CONSUMER_PROPERTIES = 'bootstrap.servers = HOST:PORT
security.protocol = SASL_SSL
sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username = "API_KEY" password = "SECRET";
ssl.endpoint.identification.algorithm = https
sasl.mechanism = PLAIN'
VERSION = CURRENT
REQUIRE_STATIC_IP = true
SSL = false
TOPIC_DISPLAY_FILTERS = ('topic1', 'topic2')
COMMENT = 'My new Kafka connection';