
Confluent Kafka

This page describes how to ingest your data from Confluent Kafka.

Last updated 11 months ago

Prerequisites

Ensure that you have a connection with the correct permissions to read from your cluster.

Additionally, if you are ingesting to the data lake, you need a metastore connection that can be used to create a staging table as well as a corresponding storage connection that can be used to store your table's underlying files.
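The jobs on this page read from Confluent with COPY FROM KAFKA. The sketch below shows what such a connection might look like, under a few assumptions. It assumes Confluent Cloud is reached through a Kafka connection that authenticates with SASL_SSL and an API key/secret pair. The connection name `my_confluent_connection`, the bootstrap host, and the credentials are hypothetical placeholders. Check the `HOSTS` and `CONSUMER_PROPERTIES` options against the Confluent connector reference before using this.

```sql
-- Hypothetical connection: replace the host, API key, and secret with your own values.
-- The CONSUMER_PROPERTIES entries are standard Kafka client settings for
-- Confluent Cloud (SASL_SSL transport with the PLAIN mechanism).
CREATE KAFKA CONNECTION my_confluent_connection
    HOSTS = ('pkc-xxxxx.us-east-1.aws.confluent.cloud:9092')
    CONSUMER_PROPERTIES = '
        bootstrap.servers=pkc-xxxxx.us-east-1.aws.confluent.cloud:9092
        security.protocol=SASL_SSL
        sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<API_KEY>" password="<API_SECRET>";
        ssl.endpoint.identification.algorithm=https
        sasl.mechanism=PLAIN';
```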

Create a job that reads from Confluent

You can create a job to ingest your data from Confluent into a staging table in the data lake or ingest directly into your target.


Ingest to the data lake

After completing the prerequisites, you can create your staging tables. The example below creates a table without defining columns or data types because Upsolver infers them automatically. You can still define columns explicitly if required:

CREATE TABLE default_glue_catalog.upsolver_samples.orders_raw_data()
    PARTITIONED BY $event_date;

Upsolver recommends partitioning by the system column $event_date or another date column within the data in order to optimize your query performance.
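If you prefer to declare columns yourself, you can partition by a date column from your data instead of $event_date. The following is a minimal sketch. The column names `order_id` and `order_date` and their types are hypothetical and should be adjusted to match your topic's payload.

```sql
-- Hypothetical columns: declare them explicitly instead of relying on inference,
-- and partition by a date column that exists in the data.
CREATE TABLE default_glue_catalog.upsolver_samples.orders_raw_data (
    order_id STRING,
    order_date DATE
)
    PARTITIONED BY order_date;
```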

Next, you can create an ingestion job as follows:

CREATE SYNC JOB "extract data from kafka"
    COMMENT = 'Load raw orders data from Kafka topic to a staging table'
    START_FROM = BEGINNING
    CONTENT_TYPE = JSON
AS COPY FROM KAFKA upsolver_kafka_samples 
    TOPIC = 'orders' 
INTO default_glue_catalog.upsolver_samples.orders_raw_data;

Note that multiple ingestion jobs can write to the same table, so the final table contains a UNION ALL of all data copied into it. Duplicate rows are therefore not removed, and the table's column list may expand if new columns are detected.

This may not be your intended behavior, so ensure you are writing to the correct table before running your job.
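The following sketch illustrates the situation described in the note above. It adds a second job that writes to the same staging table as "extract data from kafka". The job name and the `orders_archive` topic are hypothetical. If both jobs run, the staging table receives the rows from both topics.

```sql
-- Hypothetical second job targeting the same table as "extract data from kafka".
-- Rows from both jobs are appended (UNION ALL): duplicate rows are kept, and
-- new columns found in 'orders_archive' expand the table's column list.
CREATE SYNC JOB "extract archived orders from kafka"
    START_FROM = BEGINNING
    CONTENT_TYPE = JSON
AS COPY FROM KAFKA upsolver_kafka_samples
    TOPIC = 'orders_archive'
INTO default_glue_catalog.upsolver_samples.orders_raw_data;
```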

The example above only uses a small subset of all job options available when reading from Confluent. Depending on your use case, there may be other options you want to configure. For instance, you may want to specify the compression of your source data rather than have it be auto-detected.
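For example, the sketch below is a variant of the earlier job that sets the source compression explicitly rather than letting it be auto-detected. Create it instead of the original job, not alongside it, because both use the same job name. `GZIP` is assumed here to be one of the accepted COMPRESSION values; confirm the supported values on the jobs page.

```sql
-- Same ingestion job as above, with the source compression declared explicitly.
CREATE SYNC JOB "extract data from kafka"
    COMMENT = 'Load raw orders data from Kafka topic to a staging table'
    START_FROM = BEGINNING
    CONTENT_TYPE = JSON
    COMPRESSION = GZIP  -- assumed value; the default is auto-detection
AS COPY FROM KAFKA upsolver_kafka_samples
    TOPIC = 'orders'
INTO default_glue_catalog.upsolver_samples.orders_raw_data;
```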

Ingest directly to the target

Directly ingesting your data enables you to copy it straight into the target system, bypassing the need for a staging table. The syntax and job options are identical to ingesting into a staging table; only the target connector differs:

CREATE SYNC JOB ingest_kafka_to_snowflake
    COMMENT = 'Load raw orders data from Kafka topic to Snowflake'
    START_FROM = BEGINNING
    CONTENT_TYPE = JSON
AS COPY FROM KAFKA upsolver_kafka_samples 
    TOPIC = 'orders' 
INTO SNOWFLAKE my_snowflake_connection.demo.orders_transformed;

Job options

Transformations can be applied to your ingestion job to correct issues, exclude columns, or mask data before it lands in the target. Furthermore, you can use expectations to define data quality rules on your data stream and take appropriate action.
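The sketch below combines masking, column exclusion, and an expectation in one job. It assumes the COLUMN_TRANSFORMATIONS, EXCLUDE_COLUMNS, and WITH EXPECTATION ... ON VIOLATION syntax described on the jobs page, so verify it there before use. The column names `customer.email`, `hashed_email`, and `orderid` and the expectation name `exp_orderid_not_null` are hypothetical.

```sql
-- Hypothetical column names; adjust them to the fields in your topic.
CREATE SYNC JOB "extract data from kafka"
    COMMENT = 'Load raw orders data from Kafka topic to a staging table'
    START_FROM = BEGINNING
    CONTENT_TYPE = JSON
    -- Mask PII: write an MD5 hash of the email into a new column...
    COLUMN_TRANSFORMATIONS = (hashed_email = MD5(customer.email))
    -- ...and keep the original email column out of the target.
    EXCLUDE_COLUMNS = ('customer.email')
AS COPY FROM KAFKA upsolver_kafka_samples
    TOPIC = 'orders'
INTO default_glue_catalog.upsolver_samples.orders_raw_data
-- Data quality rule: drop any event that arrives without an order ID.
WITH EXPECTATION exp_orderid_not_null EXPECT orderid IS NOT NULL ON VIOLATION DROP;
```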

Alter a job that reads from Confluent

Some job options are considered mutable, enabling you to run a SQL command to alter an existing ingestion job rather than create a new job. The job options apply equally to jobs that ingest into the data lake or directly to the target, and the syntax to alter a job is identical.

For example, take the job we created earlier:

CREATE SYNC JOB "extract data from kafka"
    COMMENT = 'Load raw orders data from Kafka topic to a staging table'
    START_FROM = BEGINNING
    CONTENT_TYPE = JSON
AS COPY FROM KAFKA upsolver_kafka_samples 
    TOPIC = 'orders' 
INTO default_glue_catalog.upsolver_samples.orders_raw_data;

To keep the job as is and change only the cluster that runs it, execute the following command:

ALTER JOB "extract data from kafka"
    SET COMPUTE_CLUSTER = my_new_cluster;

Note that some options, such as COMPRESSION, cannot be altered once the job has been created.

Drop a job that reads from Confluent

If you no longer need a job, you can easily drop it using the following SQL command. This applies to jobs that ingest into the data lake and directly into the target:

DROP JOB my_confluent_ingestion_job;

Learn More

To learn about the available job options, see the jobs page, which describes each option in detail and includes examples.

To check which job options are mutable, see .
