Confluent Kafka
Syntax
Jump to
Job options
Jump to
Confluent job options:
General job options:
Default: 1
(Optional) Determines how many readers are used in parallel to read the stream.
This number does not need to equal your number of partitions in Confluent.
A recommended value would be to increase it by 1
for every 70 MB/s sent to your topic.
RUN_PARALLELISM
— editable
RUN_PARALLELISM
— editableType: integer
Default: 1
(Optional) The number of parser jobs to run in parallel per minute.
SKIP_ALL_VALIDATIONS
SKIP_ALL_VALIDATIONS
Type: Boolean
Default: false
(Optional) If data is expected to arrive in the source at a later point in time, set this value to true
.
This option instructs Upsolver to ignore all validations to allow you to create a job that reads from a source that currently has no data.
SKIP_VALIDATIONS
SKIP_VALIDATIONS
Value: MISSING_TOPIC
(Optional) Use this option if data is expected to arrive in the source at a later point in time.
This option tells Upsolver to ignore specific validations to allow you to create a job that reads from a source that currently has no data.
START_FROM
START_FROM
Values: { NOW | BEGINNING }
Default: BEGINNING
(Optional) Configures the time from which to start ingesting data. Files before the specified time are ignored.
STORE_RAW_DATA
STORE_RAW_DATA
Type: Boolean
Default: false
(Optional) When true
, an additional copy of the data is stored in its original format.
Source options
TOPIC
TOPIC
Type: text
The topic to read from.
Examples
Ingest data into the data lake
The following example creates a SYNC
job that copies data from Confluent to a staging table in the data lake. The START_FROM
option instructs the job to copy all data using BEGINNING
, and CONTENT_TYPE
specifies that the data is in JSON
format.
Ingest data directly into Snowflake
An ingestion job can load data directly into your target Snowflake database, performing transformations and ensuring data quality in the process. Direct ingestion removes the need for intermediate staging tables while the available job options provide data observability.
The example below creates a job to ingest the data stream directly from Confluent into Snowflake. The EXCLUDE_COLUMNS
option instructs the job to ignore the customer.firstname, customer.lastname, and customer.email columns, and they are not ingested into Snowflake. However, using the COLUMN_TRANSFORMATIONS
option, the customer.email column is masked using the MD5
function into the new column, hashed_email. This enables the target system to know that an email address exists without personally identifying the customer.
To guarantee the uniqueness of rows in the target table, the DEDUPLICATE_WITH
option checks for duplicate rows, based on orderid, that arrive within two hours of each other. Any duplicates are discarded from the stream and don't pollute the destination. Furthermore, the expectation exp_orderid checks for rows that don't have an orderid. If an orderid is NULL
, the affected row is dropped from the ingestion stream.
The target table expects the shipping country to be USA for all orders. This is monitored using the exp_shipping expectation, which detects and warns of any rows that have a shipping country that does not match USA. The violation action is set to WARN
if this condition is violated, so rows are loaded into the target even if the warning is triggered. Dropped rows, along with rows that trigger a warning, are counted and recorded in the system tables.
Last updated