Confluent Cloud

This page describes how to create and maintain connections to your Confluent Cloud cluster.

In order to consume events from your Kafka topics hosted in Confluent Cloud within Upsolver, you need to first create a connection. The connection defines how to connect and authenticate to your Confluent Kafka cluster.

Create a Confluent connection

Simple example

A Confluent connection can be created as simply as follows:

CREATE CONFLUENT CONNECTION my_confluent_connection
    HOSTS = ('foo:9092', 'bar:9092')
    SASL_USERNAME = 'API_KEY'
    SASL_PASSWORD = 'SECRET'
    COMMENT = 'My new Confluent Cloud connection';

You'll need to provide your Confluent API credentials to read from your Confluent cluster.

Full example

The following example creates a Confluent connection but sets additional options, such as CONSUMER_PROPERTIES:

CREATE CONFLUENT CONNECTION my_confluent_connection
    HOSTS = ('foo:9092', 'bar:9092')
    CONSUMER_PROPERTIES = 
      'bootstrap.servers = HOST:PORT
      security.protocol = SASL_SSL
      sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule   
               required username = "API_KEY"   password = "SECRET";
      ssl.endpoint.identification.algorithm = https
      sasl.mechanism = PLAIN'
    VERSION = CURRENT
    REQUIRE_STATIC_IP = true
    SSL = false
    TOPIC_DISPLAY_FILTERS = ('topic1', 'topic2')
    COMMENT = 'My new Confluent connection';

Alter a Confluent connection

A number of connection options are considered mutable, meaning that in some cases, you can run a SQL command to alter an existing Confluent connection rather than creating a new one.

For example, take the simple Confluent connection we created previously:

CREATE CONFLUENT CONNECTION my_confluent_connection
    HOSTS = ('foo:9092', 'bar:9092');

If you needed to update the consumer properties, rather than creating a new connection, you can run the following command:

ALTER CONFLUENT CONNECTION my_confluent_connection
    SET CONSUMER_PROPERTIES = 
       'bootstrap.servers = HOST:PORT
       security.protocol = SASL_SSL
       sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule   
             required username = "API_KEY"   password = "SECRET";
       ssl.endpoint.identification.algorithm = https
       sasl.mechanism = PLAIN'; 

Note that some options such as VERSION cannot be altered once the connection has been created.

Drop a Confluent connection

If you no longer need a connection, you can easily drop it with the following SQL command:

DROP CONNECTION my_confluent_connection; 

However, note that if existing tables or jobs are dependent upon the connection, they cannot be deleted.

Learn More

Last updated