Apache Kafka
This article describes how to create a connection to Apache Kafka using a SQL command.
To ingest data from your Apache Kafka topic into a table within SQLake, you must first create a connection that provides the appropriate credentials to access your topic.
CREATE KAFKA CONNECTION
<connection_identifier>
HOST[S] = { '<host>' | ('<host>' [, ...]) }
[ CONSUMER_PROPERTIES = '<properties>' ]
[ VERSION = { CURRENT | LEGACY } ]
[ REQUIRE_STATIC_IP = { TRUE | FALSE } ]
[ SSL = { TRUE | FALSE } ]
[ TOPIC_DISPLAY_FILTER[S] = { '<topic_name>' | ('<topic_name>' [, ...]) } ]
[ COMMENT = '<comment>' ]
Type:
text | list
A single host or a list of Kafka hosts in the format of
hostname:port
.Type:
text_area
(Optional) Extra properties to configure for the consumer.
Values:
{ CURRENT | LEGACY }
Default:
CURRENT
(Optional) Legacy is for anything before 0.10.
Type:
boolean
Default:
true
(Optional) With Upsolver clusters, you can configure how many elastic IPs it should allocate and use within that cluster.
If the cluster running the job has at least one elastic IP set and
REQUIRE_STATIC_IP
is enabled, then the job runs on a server that has an elastic IP associated with itType:
boolean
Default:
false
(Optional) If enabled, SSL is used to connect.
Please contact Upsolver to ensure your CA certificate is supported.
Type:
text | list
(Optional) A single topic or the list of topics to show. If left empty, all topics are visible.
Type:
text
(Optional) A description or comment regarding this connection.
CREATE KAFKA CONNECTION my_kafka_connection
HOSTS = ('foo:9092', 'bar:9092');
CREATE KAFKA CONNECTION my_kafka_connection
HOSTS = ('foo:9092', 'bar:9092')
CONSUMER_PROPERTIES = 'bootstrap.servers = HOST:PORT
security.protocol = SASL_SSL
sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username = "API_KEY" password = "SECRET";
ssl.endpoint.identification.algorithm = https
sasl.mechanism = PLAIN'
VERSION = CURRENT
REQUIRE_STATIC_IP = true
SSL = false
TOPIC_DISPLAY_FILTERS = ('topic1', 'topic2')
COMMENT = 'My new Kafka connection';
Last modified 14d ago