LogoLogo
OverviewQuickstartsHow To GuidesReferenceArticlesSupport
Reference
Reference
  • Reference
  • ✨Learning Paths
    • Amazon Kinesis
    • Amazon S3
    • Apache Kafka
    • Confluent Cloud
    • Microsoft SQL Server
    • MongoDB
    • MySQL
    • PostgreSQL
  • SQL COMMANDS
    • Clusters
      • CREATE CLUSTER
      • ALTER CLUSTER
      • ROLL CLUSTER
      • STOP/START CLUSTER
      • DROP CLUSTER
    • Connections
      • CREATE CONNECTION
        • Amazon Kinesis
        • Amazon Redshift
        • Amazon S3
        • Apache Kafka
        • AWS Glue Data Catalog
        • ClickHouse
        • Confluent Cloud
        • Elasticsearch
        • Hive Metastore
        • Microsoft SQL Server
        • MongoDB
        • MySQL
        • Polaris Catalog
        • Iceberg REST catalogs
        • PostgreSQL
        • Snowflake
      • ALTER CONNECTION
      • DROP CONNECTION
      • CDC Connections with SSL
    • Jobs
      • CREATE JOB
        • Ingestion
          • Amazon Kinesis
          • Amazon S3
          • Apache Kafka
          • Confluent Kafka
          • Content Types
          • Microsoft SQL Server
          • MongoDB
          • MySQL
          • PostgreSQL
        • Replication
          • Microsoft SQL Server
          • MongoDB
          • MySQL
          • PostgreSQL
        • Transformation
          • INSERT
            • MAP_COLUMNS_BY_NAME
            • UNNEST
          • MERGE
          • SELECT
          • Job Options
            • Amazon Redshift
            • Amazon S3
            • Apache Iceberg
            • ClickHouse
            • Data Lake Tables
            • Elasticsearch
            • PostgreSQL
            • Snowflake
        • Monitoring
          • Amazon CloudWatch
          • Datadog
          • Dynatrace
      • ALTER JOB
      • PAUSE / RESUME JOB
      • DROP JOB
    • Materialized Views
      • CREATE MATERIALIZED VIEW
      • ALTER MATERIALIZED VIEW
      • DROP MATERIALIZED VIEW
      • Join with a Materialized View
    • Tables
      • CREATE TABLE
      • ALTER TABLE
      • DROP TABLE
    • Iceberg Tables
      • Upsolver Managed Tables
        • CREATE ICEBERG TABLE
        • ALTER ICEBERG TABLE
        • DROP ICEBERG TABLE
        • OPTIMIZE ICEBERG TABLE
      • External Iceberg Tables
        • CREATE EXTERNAL ICEBERG TABLE
        • DROP EXTERNAL ICEBERG TABLE
      • Mirror Iceberg Tables
        • CREATE MIRROR
        • ALTER MIRROR
        • PAUSE / RESUME MIRROR
        • DROP MIRROR
  • FUNCTIONS & OPERATORS
    • Data Types
    • Functions
      • Aggregate
        • APPROX_COUNT_DISTINCT
        • APPROX_COUNT_DISTINCT_EACH
        • AVG
        • AVG_EACH
        • AVG_TIME_SERIES
        • COLLECT_SET
        • COLLECT_SET_EACH
        • COUNT
        • COUNT(*)
        • COUNT(DISTINCT ...)
        • COUNT_EACH
        • COUNT_IF
        • DYNAMIC_SESSIONS
        • FIRST
        • FIRST_ARRAY
        • FIRST_EACH
        • FIRST_TIME_SERIES
        • LAST
        • LAST_ARRAY
        • LAST_EACH
        • LAST_K
        • LAST_K_EACH
        • LAST_TIME_SERIES
        • MAX
        • MAX_BY
        • MAX_EACH
        • MAX_TIME_SERIES
        • MIN
        • MIN_BY
        • MIN_EACH
        • MIN_TIME_SERIES
        • SESSION_COUNT
        • STD_DEV
        • STD_DEV_EACH
        • STRING_MAX
        • STRING_MAX_EACH
        • STRING_MIN_EACH
        • SUM
        • SUM_EACH
        • SUM_TIME_SERIES
        • WEIGHTED_AVERAGE
      • Array
        • ARRAY_DISTINCT
        • ARRAY_JOIN
        • ARRAY_MAX
        • ARRAY_MIN
        • ARRAY_SORT
        • ARRAY_SORT_DESC
        • ARRAY_SUM
        • COUNT_VALUES_IF
        • COUNT_VALUES
        • ELEMENT_AT
        • FIRST_ELEMENT
        • LAST_ELEMENT
        • VALUE_INDEX_IN_ARRAY
        • VALUE_INDEX_IN_ROW
      • Comparison
        • GREATEST
        • LEAST
      • Conditional
        • COALESCE
        • IF_ELSE
        • NULL_IF
      • Date & Time
        • ADD_TIME_ZONE_OFFSET
        • DATE
        • DATE_ADD
        • DATE_DIFF
        • DATE_TRUNC
        • DAY
        • DAY_OF_WEEK
        • DAY_OF_YEAR
        • EXTRACT_TIMESTAMP
        • EXTRACT
        • FORMAT_DATETIME
        • FROM_ISO8601_DATE
        • FROM_UNIXTIME
        • HOUR
        • MILLISECOND
        • MINUTE
        • MONTH
        • QUARTER
        • RUN_END_TIME
        • RUN_START_TIME
        • SECOND
        • SUBTRACT_TIME_ZONE_OFFSET
        • TO_UNIX_EPOCH_MILLIS
        • TO_UNIX_EPOCH_SECONDS
        • TO_UNIXTIME
        • WEEK
        • YEAR_OF_WEEK
        • YEAR
      • Filter
        • IS_DUPLICATE
        • NOT
      • Interval
        • PARSE_DURATION
      • Mathematical
        • ABS
        • CBRT
        • CEIL
        • CEILING
        • DEGREES
        • EXP
        • FLOOR
        • GET_SHARD_NUMBER
        • LN
        • LOG
        • LOG2
        • LOG10
        • MOD
        • MODULO
        • POW
        • POWER
        • RADIANS
        • RAND
        • RANDOM
        • RECIPROCAL
        • ROUND
        • SIGN
        • SORT_VALUES
        • SQRT
        • TRUNCATE
      • Regular Expressions
        • REGEXP_EXTRACT
        • REGEXP_EXTRACT_ALL
        • REGEXP_LIKE
        • REGEX_MATCH_POSITION
        • REGEX_NAMED_GROUPS
        • REGEXP_REPLACE
      • Spatial
        • ST_DISTANCE
        • ST_WGS84_DISTANCE
        • WKT_SPATIAL_CONTAINS
        • WKT_SPATIAL_INTERSECT
      • String
        • BASE64_DECODE
        • BASE64_TO_HEX
        • BYTES_SUBSTRING
        • CONCAT
        • DATE
        • JOIN_ARRAYS
        • LENGTH
        • LOWER
        • LPAD
        • LTRIM
        • MD5
        • PARSE_DATETIME
        • REPLACE
        • REVERSE
        • RPAD
        • RTRIM
        • SHA1
        • SHA3_512
        • SHA256
        • SHA512
        • SORT_VALUES
        • SPLIT
        • SPLIT_TO_RECORD
        • STRING_FORMAT
        • STRIP_MARGIN
        • STRIP_PREFIX
        • STRIP_SUFFIX
        • STRPOS
        • SUBSTR
        • SUBSTRING
        • TRANSLATE
        • TRIM_CHARS
        • TRIM
        • UPPER
        • UUID_GENERATOR
        • XX_HASH
      • Structural
        • FROM_KEY_VALUE
        • GET_RANGE
        • JOIN_ALL_BY_KEY
        • JSON_PATH
        • JSON_TO_RECORD
        • MAP_WITH_INDEX
        • QUERY_STRING_TO_RECORD
        • RECORD_TO_JSON
        • SORT_BY
        • TO_ARRAY
        • ZIP_WITH_INDEX
        • ZIP
      • Trigonometric
        • COS
        • SIN
        • TAN
        • TANH
      • Type Conversion
        • CAST
        • CHR
        • DECIMAL_TO_HEX
        • HEX_TO_DECIMAL
        • TO_BIGINT
        • TO_DOUBLE
        • TO_STRING
      • URL
        • TOP_PRIVATE_DOMAIN
        • URL_DECODE
        • URL_ENCODE
        • URL_PARSER
    • Operators
      • Comparison
      • Conditional
        • CASE
      • Logical
      • Mathematical
      • String
  • MONITORING
    • Clusters
    • Datasets
      • Ingested Data
        • Column
      • Lineage
      • Data Violations
      • Statistics
      • Maintenance
        • Compactions
        • Expire Snapshots
        • Orphan Files
      • Columns
      • Partitions
      • Properties
    • Job Status
      • Stream & File Sources
        • Monitoring
        • Graphs
        • Lineage
        • Settings
      • CDC Sources
        • Monitoring
        • Replication Settings
        • Job Settings
    • System Catalog
      • Information Schema
        • Clusters
        • Columns
        • Connections
        • Jobs
        • Mirrors
        • Tables
        • Users
        • Views
      • Insights
        • job_output_column_stats
        • dataset_column_stats
      • Monitoring
        • CDC Status
        • Clusters
        • Expectations
        • Jobs
        • Partition Statistics
        • Recent Compactions
        • Running Queries
        • Table Statistics
      • Task Executions Table
  • GENERAL
    • Common SQL Syntax
    • View Entity Syntax
    • Keyboard Shortcuts
Powered by GitBook
On this page
  • Syntax
  • Connection options
  • Minimum example
  • Full example
  1. SQL COMMANDS
  2. Connections
  3. CREATE CONNECTION

Apache Kafka

Last updated 11 months ago

To ingest data from your Apache Kafka topic into a table within Upsolver, you must first create a connection that provides the appropriate credentials to access your topic.

Syntax

CREATE KAFKA CONNECTION <connection_identifier> 
    HOST[S] = { '<host>' | ('<host>' [, ...]) }
    [ CONSUMER_PROPERTIES = '<properties>' ] 
    [ VERSION = { CURRENT | LEGACY } ]  
    [ REQUIRE_STATIC_IP = { TRUE | FALSE } ]
    [ SSL = { TRUE | FALSE } ]
    [ TOPIC_DISPLAY_FILTER[S] = { '<topic_name>' | ('<topic_name>' [, ...]) } ]
    [ SASL_USERNAME = '<username/api_key>' ]
    [ SASL_PASSWORD = '<password/api_secret>' ]    
    [ COMMENT = '<comment>' ]

Jump to

Connection options

HOST[S]

Type: text | list

A single host or a list of Kafka hosts in the format of hostname:port.

CONSUMER_PROPERTIES — editable

Type: text_area

(Optional) Extra properties to configure for the consumer.

VERSION

Values: { CURRENT | LEGACY }

Default: CURRENT

(Optional) Legacy is for anything before 0.10.

REQUIRE_STATIC_IP — editable

Type: Boolean

Default: true

(Optional) With Upsolver clusters, you can configure how many elastic IPs it should allocate and use within that cluster.

If the cluster running the job has at least one elastic IP set and REQUIRE_STATIC_IP is enabled, then the job runs on a server that has an elastic IP associated with it

SSL — editable

Type: Boolean

Default: false

(Optional) If enabled, SSL is used to connect.

Please contact Upsolver to ensure your CA certificate is supported.

TOPIC_DISPLAY_FILTER[S] — editable

Type: text | list

(Optional) A single topic or the list of topics to show. If left empty, all topics are visible.

SASL_USERNAME

Type: text

(Optional) Configure the SASL username to be used to authenticate with Kafka.

SASL_PASSWORD

Type: text

(Optional) Configure the SASL password to be used to authenticate with Kafka.

security.protocol = SASL_SSL
sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule   
    required username = "API_KEY"   password = "SECRET";
ssl.endpoint.identification.algorithm = https
sasl.mechanism = PLAIN

COMMENT — editable

Type: text

(Optional) A description or comment regarding this connection.

Minimum example

CREATE KAFKA CONNECTION my_kafka_connection
    HOSTS = ('foo:9092', 'bar:9092');

Full example

CREATE KAFKA CONNECTION my_kafka_connection
    HOSTS = ('foo:9092', 'bar:9092')
    SASL_USERNAME = 'API_KEY'
    SASL_PASSWORD = 'SECRET'
    VERSION = CURRENT
    REQUIRE_STATIC_IP = true
    SSL = false
    TOPIC_DISPLAY_FILTERS = ('topic1', 'topic2')
    COMMENT = 'My new Kafka connection';

See:

When this setting is enabled along with the following configuration will get added to the consumer/producer properties of the client created:

For other authentication settings, you may configure the directly.

Consumer configurations
HOST[S]
CONSUMER_PROPERTIES
VERSION
REQUIRE_STATIC_IP
SSL
TOPIC_DISPLAY_FILTER[S]
SASL_USERNAME
SASL_PASSWORD
COMMENT
SASL_USERNAME
CONSUMER_PROPERTIES