Amazon Kinesis
Syntax
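A minimal sketch of the statement shape, assuming Upsolver's general CREATE JOB ... COPY FROM form; the job and source options are documented below, and all identifiers are placeholders:

    CREATE SYNC JOB <job_name>
        [{ job_option = <value> } ...]
    AS COPY FROM KINESIS <connection_name>
        STREAM = '<stream_name>'
    INTO <target_table>;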
Job options
COMMIT_INTERVAL
Type: <integer> { MINUTE[S] | HOUR[S] | DAY[S] }
Default: WRITE_INTERVAL
(Optional) Defines how often the job commits to Snowflake in a direct ingestion job. If empty, the WRITE_INTERVAL value is used. The COMMIT_INTERVAL value must be greater than the WRITE_INTERVAL value and divisible by it.
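For example, a hypothetical pairing that satisfies this constraint, assuming a write interval of 5 minutes:

    WRITE_INTERVAL = 5 MINUTES
    COMMIT_INTERVAL = 15 MINUTES  -- a multiple of the 5-minute write interval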
COMPRESSION
Values: { AUTO | GZIP | SNAPPY | LZO | NONE | SNAPPY_UNFRAMED | KCL }
Default: AUTO
(Optional) The compression of the source.
CONTENT_TYPE — editable
Values: { AUTO | CSV | JSON | PARQUET | TSV | AVRO | AVRO_SCHEMA_REGISTRY | FIXED_WIDTH | REGEX | SPLIT_LINES | ORC | XML }
Default: AUTO
(Optional) The file format of the content being read.
Note that AUTO only works when reading Avro, JSON, or Parquet.
To configure additional options for certain content types, see the Content Types options.
DEDUPLICATE_WITH
Values: ( {COLUMNS = (<column>, ...) | COLUMN = <column>}, WINDOW = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } )
(Optional) You can use DEDUPLICATE_WITH to prevent duplicate rows from arriving in your target. One or more columns can be supplied in the column list to act as a key, so that all events within the timeframe specified in the WINDOW value are deduplicated.
For example, if you have a third-party application that sends the same event multiple times a day, you can define one or more columns as the key and set the timeframe to 1 DAY. Upsolver will exclude all duplicate events that arrive within the day, ensuring your target only receives unique events. A sketch of this scenario follows below.
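As a sketch of that scenario, assuming the events carry a hypothetical event_id field to use as the key:

    DEDUPLICATE_WITH = (COLUMNS = (event_id), WINDOW = 1 DAY)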
Note that if you have multiple jobs writing to a table in your lake, duplicate rows can be generated, even when you include this option.
READER_SHARDS — editable
Type: integer
Default: 1
(Optional) Determines how many readers are used in parallel to read the stream.
This number does not need to equal the number of shards in your Kinesis stream. As a rule of thumb, increase this value by 1 for every 70 MB/s sent to your stream.
RUN_PARALLELISM — editable
Type: integer
Default: 1
(Optional) The number of parser jobs to run in parallel per minute.
SKIP_ALL_VALIDATIONS
Type: Boolean
Default: false
(Optional) If data is expected to arrive in the source at a later point in time, set this value to true.
This option instructs Upsolver to ignore all validations to allow you to create a job that reads from a source that currently has no data.
SKIP_VALIDATIONS
Value: MISSING_STREAM
(Optional) Use this option if data is expected to arrive in the source at a later point in time.
This option tells Upsolver to ignore specific validations to allow you to create a job that reads from a source that currently has no data.
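For example, to create the job before the stream exists (the quoting and parentheses here follow the list-style value syntax used for other Upsolver options and are an assumption):

    SKIP_VALIDATIONS = ('MISSING_STREAM')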
START_FROM
Values: { NOW | BEGINNING | <timestamp> }
Default: BEGINNING
(Optional) Configures the time from which to start ingesting data. Events dated before the specified time are ignored.
STORE_RAW_DATA
Type: Boolean
Default: false
(Optional) When true, an additional copy of the data is stored in its original format.
Source options
STREAM
Type: text
The stream to read from.
Examples
Ingest data into the data lake
In the following example, a new sync job is created to ingest streaming data from Kinesis into a table in the data lake. The START_FROM option defines that all data will be ingested, and the CONTENT_TYPE option informs the job that the data is stored in JSON format.
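A sketch of such a job, assuming a Kinesis connection named my_kinesis_connection, a stream named orders, and a target table in the default Glue catalog (all placeholders):

    CREATE SYNC JOB ingest_kinesis_to_lake
        START_FROM = BEGINNING
        CONTENT_TYPE = JSON
    AS COPY FROM KINESIS my_kinesis_connection
        STREAM = 'orders'
    INTO default_glue_catalog.demo_db.orders_raw;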
After ingesting data into the data lake, you can create a transformation job to load the data into your target.
Ingest data directly into Snowflake
An ingestion job can load data directly from Kinesis into Snowflake, bypassing the need to write additional transformation jobs to load the data into the target table.
The example below creates a job to ingest data directly into Snowflake and uses job options and expectations to perform transformations and apply data quality conditions.
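A sketch of such a job is shown below; the connection, stream, and table names are placeholders, and the exact transformation and expectation syntax should be checked against your Upsolver version:

    CREATE SYNC JOB ingest_kinesis_to_snowflake
        START_FROM = BEGINNING
        CONTENT_TYPE = JSON
        COLUMN_TRANSFORMATIONS = (hashed_email = MD5(customer.email))
        EXCLUDE_COLUMNS = ('customer.email')
        DEDUPLICATE_WITH = (COLUMNS = (orderid), WINDOW = 4 HOURS)
    AS COPY FROM KINESIS my_kinesis_connection
        STREAM = 'orders'
    INTO SNOWFLAKE my_snowflake_connection.demo.orders
    WITH EXPECTATION exp_address_present
        EXPECT customer.address.address1 IS NOT NULL ON VIOLATION WARN
    WITH EXPECTATION exp_standard_taxrate
        EXPECT taxrate = 0.12 ON VIOLATION WARN;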
The EXCLUDE_COLUMNS option tells the job to ignore the customer.email column, as the email value has been hashed into a new column named hashed_email using the COLUMN_TRANSFORMATIONS option. To protect personal information, customer.email is masked before it reaches the target table in Snowflake, and therefore the raw column data should be excluded.
Furthermore, duplicate events are managed using the DEDUPLICATE_WITH option: any duplicate events that arrive within a four-hour interval are deduplicated on orderid. The target table expects all orders to arrive with a value for customer.address.address1, so a check is added to flag how many rows are loaded without this value. A further warning has been added to count the number of rows that do not have a taxrate matching the expected value of 0.12.