Apache Kafka

Syntax

CREATE [SYNC] JOB <job_name>
    [{ job_options }]
    AS COPY FROM KAFKA <connection_identifier>
         [{ source_options }]
    INTO { <table_identifier> 
         | SNOWFLAKE <connection_name>.<schema_name>.<table_name> } ;
 [ WITH EXPECTATION <exp_name> EXPECT <sql_predicate> ON VIOLATION { DROP | WARN } ]; 

Jump to

Job options

[ COLUMN_TRANSFORMATIONS = (<column> = <expression>, ...) ]
[ COMMENT = '<comment>' ]
[ COMMIT_INTERVAL = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]
[ COMPRESSION = { AUTO 
                | GZIP 
                | SNAPPY 
                | LZO 
                | NONE 
                | SNAPPY_UNFRAMED 
                | KCL } ]
[ COMPUTE_CLUSTER = <cluster_identifier> ]
[ CONSUMER_PROPERTIES = '<properties>' ]
[ CONTENT_TYPE = { AUTO 
                 | CSV 
                 | JSON 
                 | PARQUET 
                 | TSV 
                 | AVRO 
                 | AVRO_SCHEMA_REGISTRY 
                 | FIXED_WIDTH 
                 | REGEX 
                 | SPLIT_LINES 
                 | ORC 
                 | XML } ]
[ DEDUPLICATE_WITH = ( {COLUMNS = (<col>, ...)  | COLUMN = <col>}, WINDOW = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]    
[ END_AT = { NOW | <timestamp> } ]
[ EXCLUDE_COLUMNS = ( <col>, ...) ]
[ READER_SHARDS = <integer> ]
[ RUN_PARALLELISM = <integer> ]
[ SKIP_ALL_VALIDATIONS = { TRUE | FALSE } ]
[ SKIP_VALIDATIONS = ('MISSING_TOPIC') ]
[ START_FROM = { NOW | BEGINNING | <timestamp> } ]
[ STORE_RAW_DATA = { TRUE | FALSE } ]

Jump to

Apache Kafka job options:

General job options:

See also:

COMMIT_INTERVAL

Type: <integer> { MINUTE[S] | HOUR[S] | DAY[S] }

Default: WRITE_INTERVAL

(Optional) Defines how often the job will commit to Snowflake in a direct ingestion job. If empty, the WRITE_INTERVAL value will be used. The COMMIT_INTERVAL value must be bigger and divisible by WRITE_INTERVAL.

COMPRESSION

Values: { AUTO | GZIP | SNAPPY | LZO | NONE | SNAPPY_UNFRAMED | KCL }

Default: AUTO

(Optional) The compression format of the source.

CONSUMER_PROPERTIES — editable

Type: text_area

(Optional) Additional properties to use when configuring the consumer. This overrides any settings in the Apache Kafka connection.

CONTENT_TYPE — editable

Values: { AUTO | CSV | JSON | PARQUET | TSV | AVRO | AVRO_SCHEMA_REGISTRY | FIXED_WIDTH | REGEX | SPLIT_LINES | ORC | XML }

Default: AUTO

(Optional) The file format of the content being read.

Note that AUTO only works when reading Avro, JSON, or Parquet, and that Upsolver does not currently support Protobuf.

To configure additional options for particular content types, see the Content Types options.

DEDUPLICATE_WITH

Values: ( {COLUMNS = (, ...) | COLUMN = }, WINDOW = { MINUTE[S] | HOUR[S] | DAY[S] } )

(Optional) You can use DEDUPLICATE_WITH to prevent duplicate rows arriving in your target. One or more columns can be supplied in the column list to act as a key so that all events within the timeframe specified in the WINDOW value are deduplicated.

For example, if you have a third-party application that sends the same event multiple times a day, you can define one or more columns as the key and set the timeframe to be 1 DAY. Upsolver will exclude all duplicate events that arrive within the day, ensuring your target only receives unique events.

Note that if you have multiple jobs writing to a table in your lake, duplicate rows can be generated, even when you include this option.

READER_SHARDS — editable

Type: integer

Default: 1

(Optional) Determines how many readers are used in parallel to read the stream.

This number does not need to equal your number of partitions in Apache Kafka.

A recommended value would be to increase it by 1 for every 70 MB/s sent to your topic.

RUN_PARALLELISM — editable

Type: integer

Default: 1

(Optional) The number of parser jobs to run in parallel per minute.

SKIP_ALL_VALIDATIONS

Type: Boolean

Default: false

(Optional) If data is expected to arrive in the source at a later point in time, set this value to true.

This option instructs Upsolver to ignore all validations to allow you to create a job that reads from a source that currently has no data.

SKIP_VALIDATIONS

Value: MISSING_TOPIC

(Optional) Use this option if data is expected to arrive in the source at a later point in time.

This option tells Upsolver to ignore specific validations to allow you to create a job that reads from a source that currently has no data.

START_FROM

Values: { NOW | BEGINNING }

Default: BEGINNING

(Optional) Configures the time from which to start ingesting data. Files before the specified time are ignored.

STORE_RAW_DATA

Type: Boolean

Default: false

(Optional) When true, an additional copy of the data is stored in its original format

Source options

TOPIC = '<topic_name>'

TOPIC

Type: text

The topic to read from.

Examples

Ingest data into the data lake

The following example creates a SYNC job that copies data from Kafka to a staging table in the data lake. The START_FROM option instructs the job to copy all data using BEGINNING, and CONTENT_TYPE specifies that the data is in JSON format.

CREATE SYNC JOB "extract data from kafka"
    COMMENT = 'Load raw orders data from Kafka topic to a staging table'
    START_FROM = BEGINNING
    CONTENT_TYPE = JSON
AS COPY FROM KAFKA upsolver_kafka_samples 
    TOPIC = 'orders' 
INTO default_glue_catalog.upsolver_samples.orders_raw_data;

Ingest data directly into Snowflake

An ingestion job can load data directly into your target Snowflake database, performing transformations and ensuring data quality in the process. Direct ingestion removes the need for intermediate staging tables while the available job options provide data observability.

The example below creates a job to ingest the data stream directly from Kafka into Snowflake. The EXCLUDE_COLUMNS option instructs the job to ignore the customer.firstname, customer.lastname, and customer.email columns, and they are not ingested into Snowflake. However, using the COLUMN_TRANSFORMATIONS option, the customer.email column is masked using the MD5 function into the new column, hashed_email. This enables the target system to know that an email address exists without personally identifying the customer.

To guarantee the uniqueness of rows in the target table, the DEDUPLICATE_WITH option checks for duplicate rows, based on orderid, that arrive within two hours of each other. Any duplicates are discarded from the stream and don't pollute the destination. Furthermore, the expectation exp_orderid checks for rows that don't have an orderid. If an orderid is NULL, the affected row is dropped from the ingestion stream.

The target table expects the shipping country to be USA for all orders. This is monitored using the exp_shipping expectation, which detects and warns of any rows that have a shipping country that does not match USA. The violation action is set to WARN if this condition is violated, so rows are loaded into the target even if the warning is triggered. Dropped rows, along with rows that trigger a warning, are counted and recorded in the system tables.

CREATE SYNC JOB ingest_kafka_to_snowflake
   COMMENT = 'Load orders into Snowflake'
   CONTENT_TYPE = JSON
   COMPUTE_CLUSTER = "Default Compute (Free)"
   EXCLUDE_COLUMNS = ('customer.firstname', 'customer.lastname', 'customer.email') 
   DEDUPLICATE_WITH = (COLUMNS = (orderid), WINDOW = 2 hours)
   COLUMN_TRANSFORMATIONS = (hashed_email = MD5(customer.email)) 
   COMMIT_INTERVAL = 5 MINUTES
AS COPY FROM KAFKA upsolver_kafka_samples 
   TOPIC = 'orders' 
INTO SNOWFLAKE my_snowflake_connection.demo.orders_transformed
  WITH EXPECTATION exp_orderid 
     EXPECT orderid IS NOT NULL 
     ON VIOLATION DROP
  WITH EXPECTATION exp_shipping 
     EXPECT customer.address.country = 'USA' 
     ON VIOLATION WARN;

Last updated