Apache Kafka

To ingest data from your Apache Kafka topic into a table within Upsolver, you must first create a connection that provides the appropriate credentials to access your topic.

Syntax

CREATE KAFKA CONNECTION <connection_identifier> 
    HOST[S] = { '<host>' | ('<host>' [, ...]) }
    [ CONSUMER_PROPERTIES = '<properties>' ] 
    [ VERSION = { CURRENT | LEGACY } ]  
    [ REQUIRE_STATIC_IP = { TRUE | FALSE } ]
    [ SSL = { TRUE | FALSE } ]
    [ TOPIC_DISPLAY_FILTER[S] = { '<topic_name>' | ('<topic_name>' [, ...]) } ]
    [ SASL_USERNAME = '<username/api_key>' ]
    [ SASL_PASSWORD = '<password/api_secret>' ]    
    [ COMMENT = '<comment>' ]

Jump to

Connection options

HOST[S]

Type: text | list

A single host or a list of Kafka hosts in the format of hostname:port.

CONSUMER_PROPERTIES — editable

Type: text_area

(Optional) Extra properties to configure for the consumer.

See: Consumer configurations

VERSION

Values: { CURRENT | LEGACY }

Default: CURRENT

(Optional) Legacy is for anything before 0.10.

REQUIRE_STATIC_IP — editable

Type: Boolean

Default: true

(Optional) With Upsolver clusters, you can configure how many elastic IPs it should allocate and use within that cluster.

If the cluster running the job has at least one elastic IP set and REQUIRE_STATIC_IP is enabled, then the job runs on a server that has an elastic IP associated with it

SSL — editable

Type: Boolean

Default: false

(Optional) If enabled, SSL is used to connect.

Please contact Upsolver to ensure your CA certificate is supported.

TOPIC_DISPLAY_FILTER[S] — editable

Type: text | list

(Optional) A single topic or the list of topics to show. If left empty, all topics are visible.

SASL_USERNAME

Type: text

(Optional) Configure the SASL username to be used to authenticate with Kafka.

SASL_PASSWORD

Type: text

(Optional) Configure the SASL password to be used to authenticate with Kafka.

When this setting is enabled along with SASL_USERNAME the following configuration will get added to the consumer/producer properties of the client created:

security.protocol = SASL_SSL
sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule   
    required username = "API_KEY"   password = "SECRET";
ssl.endpoint.identification.algorithm = https
sasl.mechanism = PLAIN

For other authentication settings, you may configure the CONSUMER_PROPERTIES directly.

COMMENT — editable

Type: text

(Optional) A description or comment regarding this connection.

Minimum example

CREATE KAFKA CONNECTION my_kafka_connection
    HOSTS = ('foo:9092', 'bar:9092');

Full example

CREATE KAFKA CONNECTION my_kafka_connection
    HOSTS = ('foo:9092', 'bar:9092')
    SASL_USERNAME = 'API_KEY'
    SASL_PASSWORD = 'SECRET'
    VERSION = CURRENT
    REQUIRE_STATIC_IP = true
    SSL = false
    TOPIC_DISPLAY_FILTERS = ('topic1', 'topic2')
    COMMENT = 'My new Kafka connection';

Last updated