Confluent Cloud
This page describes how to create and maintain connections to your Confluent Cloud cluster.
To consume events from Kafka topics hosted in Confluent Cloud within Upsolver, you must first create a connection. The connection defines how Upsolver connects and authenticates to your Confluent Kafka cluster.
In its simplest form, a Confluent connection can be created as follows:
CREATE CONFLUENT CONNECTION my_kafka_connection
HOSTS = ('foo:9092', 'bar:9092')
SASL_USERNAME = 'API_KEY'
SASL_PASSWORD = 'SECRET'
COMMENT = 'My new Confluent Cloud connection';
As the example shows, you need to provide your Confluent API credentials (here, the API key as SASL_USERNAME and the secret as SASL_PASSWORD) to read from your Confluent cluster.
The following example creates a Confluent connection but sets additional options, such as CONSUMER_PROPERTIES:
CREATE CONFLUENT CONNECTION my_confluent_connection
HOSTS = ('foo:9092', 'bar:9092')
CONSUMER_PROPERTIES = 'bootstrap.servers = HOST:PORT
security.protocol = SASL_SSL
sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username = "API_KEY" password = "SECRET";
ssl.endpoint.identification.algorithm = https
sasl.mechanism = PLAIN'
KAFKA_VERSION = CURRENT
REQUIRE_STATIC_IP = true
SSL = false
TOPIC_DISPLAY_FILTERS = ('topic1', 'topic2')
COMMENT = 'My new Kafka connection';
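The CONSUMER_PROPERTIES value is a single quoted string holding one `key = value` pair per line, which is easy to get wrong by hand. As a rough illustration only, the Python sketch below assembles such a string from a dictionary. The helper name `render_consumer_properties` is hypothetical and not part of Upsolver; the `key = value` layout simply mirrors the example above, and `HOST:PORT`, `API_KEY`, and `SECRET` are the same placeholders used there.

```python
def render_consumer_properties(properties: dict[str, str]) -> str:
    """Join properties into one 'key = value' line per entry, in insertion order."""
    lines = []
    for key, value in properties.items():
        # A single quote would end the surrounding SQL string literal early.
        if "'" in key or "'" in value:
            raise ValueError(f"single quote not allowed in property {key!r}")
        lines.append(f"{key} = {value}")
    return "\n".join(lines)


# Placeholder values copied from the example above; replace them with your own.
jaas = (
    "org.apache.kafka.common.security.plain.PlainLoginModule required "
    'username = "API_KEY" password = "SECRET";'
)
props = {
    "bootstrap.servers": "HOST:PORT",
    "security.protocol": "SASL_SSL",
    "sasl.jaas.config": jaas,
    "ssl.endpoint.identification.algorithm": "https",
    "sasl.mechanism": "PLAIN",
}
consumer_properties = render_consumer_properties(props)

# Print the option exactly as it would appear inside the CREATE statement.
print(f"CONSUMER_PROPERTIES = '{consumer_properties}'")
```

The script only produces text to paste into a SQL worksheet; it does not connect to Upsolver or Confluent.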
For the full list of connection options with syntax and detailed descriptions, see Confluent connection with SQL.
After you have created your connection, you are ready to move on to the next step of building your data pipeline. See reading your data into Upsolver with an ingestion job for more information.
Some connection options are mutable, meaning that you can run a SQL command to alter an existing Confluent connection rather than creating a new one.
For example, take a simple Confluent connection created as follows:
CREATE CONFLUENT CONNECTION my_confluent_connection
HOSTS = ('foo:9092', 'bar:9092');
If you need to update the consumer properties, you can run the following command rather than creating a new connection:
ALTER CONFLUENT CONNECTION my_confluent_connection
SET CONSUMER_PROPERTIES = 'bootstrap.servers = HOST:PORT
security.protocol = SASL_SSL
sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username = "API_KEY" password = "SECRET";
ssl.endpoint.identification.algorithm = https
sasl.mechanism = PLAIN';
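Because the ALTER statement has the same shape every time, it can be generated whenever the properties change, for example when an API key is rotated. The following Python sketch renders the statement shown above from a connection name and a properties string. `render_alter_statement` is a hypothetical helper, and the identifier check is a conservative assumption of this sketch rather than Upsolver's documented naming rules.

```python
import re


def render_alter_statement(connection_name: str, consumer_properties: str) -> str:
    """Render an ALTER CONFLUENT CONNECTION statement that sets CONSUMER_PROPERTIES."""
    # Conservative identifier check (an assumption, not Upsolver's naming rules).
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", connection_name):
        raise ValueError(f"unexpected connection name: {connection_name!r}")
    # A single quote would end the SQL string literal early.
    if "'" in consumer_properties:
        raise ValueError("consumer properties must not contain single quotes")
    return (
        f"ALTER CONFLUENT CONNECTION {connection_name}\n"
        f"SET CONSUMER_PROPERTIES = '{consumer_properties}';"
    )


# Shortened placeholder properties; use your full property list in practice.
statement = render_alter_statement(
    "my_confluent_connection",
    "security.protocol = SASL_SSL\nsasl.mechanism = PLAIN",
)
print(statement)
```

The output is plain SQL text to run in Upsolver yourself; nothing here connects to or modifies a cluster.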
Note that some options, such as VERSION, cannot be altered once the connection has been created. To check which specific connection options are mutable, please refer to the Confluent connection with SQL guide.
If you no longer need a connection, you can easily drop it with the following SQL command:
DROP CONNECTION my_confluent_connection;
Note, however, that a connection cannot be dropped while existing tables or jobs depend on it.