Confluent Kafka

Syntax

CREATE [SYNC] JOB <job_name>
    [{ job_options }]
AS COPY FROM CONFLUENT <connection_identifier>
    [{ source_options }]
INTO { <table_identifier>
       | SNOWFLAKE <connection_name>.<schema_name>.<table_name> }
    [ WITH EXPECTATION <exp_name> EXPECT <sql_predicate> ON VIOLATION { DROP | WARN } ];

Job options

[ COLUMN_TRANSFORMATIONS = (<column> = <expression>, ...) ]
[ COMMENT = '<comment>' ]
[ COMMIT_INTERVAL = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]
[ COMPRESSION = { AUTO 
                | GZIP 
                | SNAPPY 
                | LZO 
                | NONE 
                | SNAPPY_UNFRAMED 
                | KCL } ]
[ COMPUTE_CLUSTER = <cluster_identifier> ]
[ CONSUMER_PROPERTIES = '<properties>' ]
[ CONTENT_TYPE = { AUTO 
                 | CSV 
                 | JSON 
                 | PARQUET 
                 | TSV 
                 | AVRO 
                 | AVRO_SCHEMA_REGISTRY 
                 | FIXED_WIDTH 
                 | REGEX 
                 | SPLIT_LINES 
                 | ORC 
                 | XML } ]
[ DEDUPLICATE_WITH = ( {COLUMNS = (<col>, ...) | COLUMN = <col>}, WINDOW = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ) ]
[ END_AT = { NOW | <timestamp> } ]
[ EXCLUDE_COLUMNS = ( <col>, ...) ]
[ READER_SHARDS = <integer> ]
[ RUN_PARALLELISM = <integer> ]
[ SKIP_ALL_VALIDATIONS = { TRUE | FALSE } ]
[ SKIP_VALIDATIONS = ('MISSING_TOPIC') ]
[ START_FROM = { NOW | BEGINNING | <timestamp> } ]
[ STORE_RAW_DATA = { TRUE | FALSE } ]
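
For instance, a minimal sketch of a job that combines a few of these options. It reuses the upsolver_kafka_samples connection, orders topic, and staging table from the Examples section; the job name is illustrative, and max.partition.fetch.bytes is shown only as an example of a standard Kafka consumer property that you might pass through CONSUMER_PROPERTIES.

CREATE SYNC JOB stage_orders_with_overrides
    COMMENT = 'Illustrative job combining compression and consumer property overrides'
    CONTENT_TYPE = JSON
    COMPRESSION = AUTO
    CONSUMER_PROPERTIES = 'max.partition.fetch.bytes = 1048576'
AS COPY FROM CONFLUENT upsolver_kafka_samples
    TOPIC = 'orders'
INTO default_glue_catalog.upsolver_samples.orders_raw_data;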

READER_SHARDS — editable

Type: integer

Default: 1

(Optional) Determines how many readers are used in parallel to read the stream.

This number does not need to equal your number of partitions in Confluent.

As a guideline, increase it by 1 for every 70 MB/s sent to your topic.
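
As an illustration of that sizing guideline, a topic receiving on the order of 140 MB/s would call for READER_SHARDS = 2. A minimal sketch, reusing the connection, topic, and table from the Examples section with an illustrative job name:

CREATE SYNC JOB stage_high_volume_orders
    COMMENT = 'Topic receives roughly 140 MB/s, so two reader shards are used'
    CONTENT_TYPE = JSON
    READER_SHARDS = 2
AS COPY FROM CONFLUENT upsolver_kafka_samples
    TOPIC = 'orders'
INTO default_glue_catalog.upsolver_samples.orders_raw_data;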

RUN_PARALLELISM — editable

Type: integer

Default: 1

(Optional) The number of parser jobs to run in parallel per minute.

SKIP_ALL_VALIDATIONS

Type: Boolean

Default: false

(Optional) If data is expected to arrive in the source at a later point in time, set this value to true.

This option instructs Upsolver to ignore all validations to allow you to create a job that reads from a source that currently has no data.

SKIP_VALIDATIONS

Value: MISSING_TOPIC

(Optional) Use this option if data is expected to arrive in the source at a later point in time.

This option tells Upsolver to ignore specific validations to allow you to create a job that reads from a source that currently has no data.
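
For example, a minimal sketch of a job created against a topic that has no data yet; the orders_backfill topic, target table, and job name are illustrative:

CREATE SYNC JOB ingest_orders_backfill
    COMMENT = 'The topic will only start receiving data at a later point in time'
    CONTENT_TYPE = JSON
    SKIP_VALIDATIONS = ('MISSING_TOPIC')
    START_FROM = NOW
AS COPY FROM CONFLUENT upsolver_kafka_samples
    TOPIC = 'orders_backfill'
INTO default_glue_catalog.upsolver_samples.orders_backfill_raw_data;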

START_FROM

Values: { NOW | BEGINNING }

Default: BEGINNING

(Optional) Configures the time from which to start ingesting data. Events before the specified time are ignored.

STORE_RAW_DATA

Type: Boolean

Default: false

(Optional) When true, an additional copy of the data is stored in its original format.
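
For example, a minimal sketch of a staging job that also keeps an additional copy of the incoming events in their original format; the job name is illustrative:

CREATE SYNC JOB stage_orders_keep_raw
    COMMENT = 'Retain an additional copy of the data in its original format'
    CONTENT_TYPE = JSON
    STORE_RAW_DATA = TRUE
AS COPY FROM CONFLUENT upsolver_kafka_samples
    TOPIC = 'orders'
INTO default_glue_catalog.upsolver_samples.orders_raw_data;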

Source options

TOPIC = '<topic_name>'

TOPIC

Type: text

The topic to read from.

Examples

Ingest data into the data lake

The following example creates a SYNC job that copies data from Confluent to a staging table in the data lake. The START_FROM option is set to BEGINNING, instructing the job to copy all existing data in the topic, and CONTENT_TYPE specifies that the data is in JSON format.

CREATE SYNC JOB "extract data from Confluent Kafka"
    COMMENT = 'Load raw orders data from Kafka topic to a staging table'
    START_FROM = BEGINNING
    CONTENT_TYPE = JSON
AS COPY FROM CONFLUENT upsolver_kafka_samples 
    TOPIC = 'orders' 
INTO default_glue_catalog.upsolver_samples.orders_raw_data;

Ingest data directly into Snowflake

An ingestion job can load data directly into your target Snowflake database, performing transformations and ensuring data quality in the process. Direct ingestion removes the need for intermediate staging tables, while job options such as expectations give you visibility into the quality of the ingested data.

The example below creates a job to ingest the data stream directly from Confluent into Snowflake. The EXCLUDE_COLUMNS option instructs the job to ignore the customer.firstname, customer.lastname, and customer.email columns, so they are not ingested into Snowflake. However, using the COLUMN_TRANSFORMATIONS option, the customer.email value is hashed with the MD5 function into a new column, hashed_email. This enables the target system to know that an email address exists without personally identifying the customer.

To guarantee the uniqueness of rows in the target table, the DEDUPLICATE_WITH option checks for duplicate rows, based on orderid, that arrive within two hours of each other. Any duplicates are discarded from the stream and don't pollute the destination. Furthermore, the expectation exp_orderid checks for rows that don't have an orderid. If an orderid is NULL, the affected row is dropped from the ingestion stream.

The target table expects the shipping country to be USA for all orders. This is monitored using the exp_shipping expectation, which detects and warns of any rows that have a shipping country that does not match USA. The violation action is set to WARN if this condition is violated, so rows are loaded into the target even if the warning is triggered. Dropped rows, along with rows that trigger a warning, are counted and recorded in the expectations system tables.

CREATE SYNC JOB ingest_kafka_to_snowflake
   COMMENT = 'Load orders into Snowflake'
   CONTENT_TYPE = JSON
   COMPUTE_CLUSTER = "Default Compute (Free)"
   EXCLUDE_COLUMNS = ('customer.firstname', 'customer.lastname', 'customer.email') 
   DEDUPLICATE_WITH = (COLUMNS = (orderid), WINDOW = 2 HOURS)
   COLUMN_TRANSFORMATIONS = (hashed_email = MD5(customer.email)) 
   COMMIT_INTERVAL = 5 MINUTES
AS COPY FROM CONFLUENT upsolver_kafka_samples 
   TOPIC = 'orders' 
INTO SNOWFLAKE my_snowflake_connection.demo.orders_transformed
  WITH EXPECTATION exp_orderid 
     EXPECT orderid IS NOT NULL 
     ON VIOLATION DROP
  WITH EXPECTATION exp_shipping 
     EXPECT customer.address.country = 'USA' 
     ON VIOLATION WARN;
