Confluent Kafka
This page describes the job options for ingesting data from Confluent.
CREATE [SYNC] JOB <job_name>
    [{ job_options }]
AS COPY FROM CONFLUENT <connection_identifier>
    [{ source_options }]
INTO { <table_identifier>
     | SNOWFLAKE <connection_name>.<schema_name>.<table_name> }
[ WITH EXPECTATION <exp_name> EXPECT <sql_predicate> ON VIOLATION { DROP | WARN } ];
[ COLUMN_TRANSFORMATIONS = (<column> = <expression>, ...) ]
[ COMMENT = '<comment>' ]
[ COMMIT_INTERVAL = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]
[ COMPRESSION = { AUTO
| GZIP
| SNAPPY
| LZO
| NONE
| SNAPPY_UNFRAMED
| KCL } ]
[ COMPUTE_CLUSTER = <cluster_identifier> ]
[ CONSUMER_PROPERTIES = '<properties>' ]
[ CONTENT_TYPE = { AUTO
| CSV
| JSON
| PARQUET
| TSV
| AVRO
| AVRO_SCHEMA_REGISTRY
| FIXED_WIDTH
| REGEX
| SPLIT_LINES
| ORC
| XML } ]
[ DEDUPLICATE_WITH = ( {COLUMNS = (<col>, ...) | COLUMN = <col>}, WINDOW = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ) ]
[ END_AT = { NOW | <timestamp> } ]
[ EXCLUDE_COLUMNS = ( <col>, ...) ]
[ READER_SHARDS = <integer> ]
[ RUN_PARALLELISM = <integer> ]
[ SKIP_ALL_VALIDATIONS = { TRUE | FALSE } ]
[ SKIP_VALIDATIONS = ('MISSING_TOPIC') ]
[ START_FROM = { NOW | BEGINNING | <timestamp> } ]
[ STORE_RAW_DATA = { TRUE | FALSE } ]
Confluent job options:
COMMIT_INTERVAL
Type:
<integer> { MINUTE[S] | HOUR[S] | DAY[S] }
Default:
1 MINUTE
(Optional) Defines how often the job loads and commits data to the target in a direct ingestion job. The interval must divide evenly into the hours of a day; for example, 8 HOURS is valid, but 5 HOURS is not, because 5 does not divide evenly into 24.
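As a minimal sketch only, assuming a Confluent connection named my_confluent_connection and a Snowflake connection named my_snowflake_connection (all object names here are placeholders), a commit interval that divides a day evenly could be set as follows:
CREATE SYNC JOB load_orders_every_4_hours
    COMMIT_INTERVAL = 4 HOURS
    CONTENT_TYPE = JSON
AS COPY FROM CONFLUENT my_confluent_connection TOPIC = 'orders'
INTO SNOWFLAKE my_snowflake_connection.demo.orders;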
COMPRESSION
Values:
{ AUTO | GZIP | SNAPPY | LZO | NONE | SNAPPY_UNFRAMED | KCL }
Default:
AUTO
(Optional) The compression format of the source.
CONSUMER_PROPERTIES
Type:
text_area
(Optional) Additional properties to use when configuring the consumer. These settings override any matching settings in the Confluent connection.
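The following is a sketch only, assuming a Confluent Cloud cluster secured with SASL/PLAIN; the property values, API key, secret, and object names are illustrative placeholders, not values from this page:
CREATE SYNC JOB ingest_with_custom_consumer
    CONTENT_TYPE = JSON
    CONSUMER_PROPERTIES = '
        security.protocol=SASL_SSL
        sasl.mechanism=PLAIN
        sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="MY_API_KEY" password="MY_SECRET";'
AS COPY FROM CONFLUENT my_confluent_connection TOPIC = 'orders'
INTO default_glue_catalog.upsolver_samples.orders_raw_data;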
CONTENT_TYPE
Values:
{ AUTO | CSV | JSON | PARQUET | TSV | AVRO | AVRO_SCHEMA_REGISTRY | FIXED_WIDTH | REGEX | SPLIT_LINES | ORC | XML }
Default:
AUTO
(Optional) The file format of the content being read. Note that AUTO only works when reading Avro, JSON, or Parquet.
DEDUPLICATE_WITH
Values:
( {COLUMNS = (<col>, ...) | COLUMN = <col>}, WINDOW = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } )
(Optional) You can use DEDUPLICATE_WITH to prevent duplicate rows from arriving in your target. One or more columns can be supplied in the column list to act as a key, so that all events within the timeframe specified in the WINDOW value are deduplicated.
For example, if you have a third-party application that sends the same event multiple times a day, you can define one or more columns as the key and set the timeframe to 1 DAY. Upsolver will exclude all duplicate events that arrive within the day, ensuring your target only receives unique events.
Note that if you have multiple jobs writing to a table in your lake, duplicate rows can be generated, even when you include this option.
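Following the 1 DAY example above, a minimal sketch (the event_id column, connection, topic, and table names are illustrative placeholders):
CREATE SYNC JOB dedupe_daily_events
    DEDUPLICATE_WITH = (COLUMNS = (event_id), WINDOW = 1 DAY)
    CONTENT_TYPE = JSON
AS COPY FROM CONFLUENT my_confluent_connection TOPIC = 'events'
INTO default_glue_catalog.upsolver_samples.events_deduped;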
READER_SHARDS
Type:
integer
Default:
1
(Optional) Determines how many readers are used in parallel to read the stream. This number does not need to equal the number of partitions in Confluent. A recommended approach is to increase it by 1 for every 70 MB/s sent to your topic (see the sketch after these options).
RUN_PARALLELISM
Type:
integer
Default:
1
(Optional) The number of parser jobs to run in parallel per minute.
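As a sizing sketch only: for a topic receiving roughly 200 MB/s, the 70 MB/s guideline above suggests around three reader shards. The RUN_PARALLELISM value and all object names below are illustrative assumptions, not recommendations from this page:
CREATE SYNC JOB ingest_high_volume_topic
    READER_SHARDS = 3
    RUN_PARALLELISM = 3
    CONTENT_TYPE = JSON
AS COPY FROM CONFLUENT my_confluent_connection TOPIC = 'clickstream'
INTO default_glue_catalog.upsolver_samples.clickstream_raw_data;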
SKIP_ALL_VALIDATIONS
Type:
Boolean
Default:
false
(Optional) If data is expected to arrive in the source at a later point in time, set this value to true. This option instructs Upsolver to ignore all validations, allowing you to create a job that reads from a source that currently has no data.
SKIP_VALIDATIONS
Value:
MISSING_TOPIC
(Optional) Use this option if data is expected to arrive in the source at a later point in time. This option tells Upsolver to ignore specific validations to allow you to create a job that reads from a source that currently has no data.
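For example, a sketch of a job created before its topic exists (the topic, connection, and table names are placeholders): skipping the MISSING_TOPIC validation lets the statement succeed, and ingestion starts once data arrives.
CREATE SYNC JOB ingest_future_topic
    SKIP_VALIDATIONS = ('MISSING_TOPIC')
    CONTENT_TYPE = JSON
AS COPY FROM CONFLUENT my_confluent_connection TOPIC = 'orders_expected_later'
INTO default_glue_catalog.upsolver_samples.orders_raw_data;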
START_FROM
Values:
{ NOW | BEGINNING }
Default:
BEGINNING
(Optional) Configures the time from which to start ingesting data. Data that arrived before the specified time is ignored.
STORE_RAW_DATA
Type:
Boolean
Default:
false
(Optional) When true, an additional copy of the data is stored in its original format.
Source options:
TOPIC = '<topic_name>'
Type:
text
The topic to read from.
The following example creates a SYNC job that copies data from Confluent to a staging table in the data lake. The START_FROM option, set to BEGINNING, instructs the job to copy all available data, and CONTENT_TYPE specifies that the data is in JSON format.
CREATE SYNC JOB "extract data from kafka"
    COMMENT = 'Load raw orders data from Kafka topic to a staging table'
    START_FROM = BEGINNING
    CONTENT_TYPE = JSON
AS COPY FROM KAFKA upsolver_kafka_samples
    TOPIC = 'orders'
INTO default_glue_catalog.upsolver_samples.orders_raw_data;
Create a SYNC job to ingest data from Confluent to a staging table in the data lake.
An ingestion job can load data directly into your target Snowflake database, performing transformations and ensuring data quality in the process. Direct ingestion removes the need for intermediate staging tables while the available job options provide data observability.
The example below creates a job to ingest the data stream directly from Confluent into Snowflake. The EXCLUDE_COLUMNS option instructs the job to ignore the customer.firstname, customer.lastname, and customer.email columns, so they are not ingested into Snowflake. However, using the COLUMN_TRANSFORMATIONS option, the customer.email column is masked with the MD5 function into the new column hashed_email. This enables the target system to know that an email address exists without personally identifying the customer.
To guarantee the uniqueness of rows in the target table, the DEDUPLICATE_WITH option checks for duplicate rows, based on orderid, that arrive within two hours of each other. Any duplicates are discarded from the stream and don't pollute the destination. Furthermore, the expectation exp_orderid checks for rows that don't have an orderid. If an orderid is NULL, the affected row is dropped from the ingestion stream.
The target table expects the shipping country to be USA for all orders. This is monitored using the exp_shipping expectation, which detects and warns of any rows with a shipping country that does not match USA. The violation action is set to WARN, so rows are loaded into the target even when the warning is triggered. Dropped rows, along with rows that trigger a warning, are counted and recorded in the system tables.
CREATE SYNC JOB ingest_kafka_to_snowflake
    COMMENT = 'Load orders into Snowflake'
    CONTENT_TYPE = JSON
    COMPUTE_CLUSTER = "Default Compute (Free)"
    EXCLUDE_COLUMNS = ('customer.firstname', 'customer.lastname', 'customer.email')
    DEDUPLICATE_WITH = (COLUMNS = (orderid), WINDOW = 2 HOURS)
    COLUMN_TRANSFORMATIONS = (hashed_email = MD5(customer.email))
    COMMIT_INTERVAL = 5 MINUTES
AS COPY FROM KAFKA upsolver_kafka_samples TOPIC = 'orders'
INTO SNOWFLAKE my_snowflake_connection.demo.orders_transformed
WITH EXPECTATION exp_orderid EXPECT orderid IS NOT NULL ON VIOLATION DROP
WITH EXPECTATION exp_shipping EXPECT customer.address.country = 'USA' ON VIOLATION WARN;
Ingest data directly from Confluent to Snowflake, perform transformations, and apply expectations.