Amazon Kinesis
This page describes the job options for ingesting data from Amazon Kinesis.
CREATE [SYNC] JOB <job_name>
[{ job_options }]
AS COPY FROM KINESIS
<connection_identifier>
[{ source_options }]
INTO { <table_identifier>
| SNOWFLAKE <connection_name>.<schema_name>.<table_name> }
[ WITH EXPECTATION <exp_name> EXPECT <sql_predicate> ON VIOLATION { DROP | WARN } ];
[ COLUMN_TRANSFORMATIONS = (<column> = <expression>, ...) ]
[ COMMENT = '<comment>' ]
[ COMMIT_INTERVAL = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]
[ COMPUTE_CLUSTER = <cluster_identifier> ]
[ COMPRESSION = { AUTO
| GZIP
| SNAPPY
| LZO
| NONE
| SNAPPY_UNFRAMED
| KCL } ]
[ CONTENT_TYPE = { AUTO
| CSV
| JSON
| PARQUET
| TSV
| AVRO
| AVRO_SCHEMA_REGISTRY
| FIXED_WIDTH
| REGEX
| SPLIT_LINES
| ORC
| XML } ]
[ DEDUPLICATE_WITH = ( {COLUMNS = (<col>, ...) | COLUMN = <col>}, WINDOW = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ) ]
[ END_AT = { NOW | <timestamp> } ]
[ EXCLUDE_COLUMNS = ( <col>, ...) ]
[ READER_SHARDS = <integer> ]
[ RUN_PARALLELISM = <integer> ]
[ SKIP_ALL_VALIDATIONS = { TRUE | FALSE } ]
[ SKIP_VALIDATIONS = ('MISSING_STREAM') ]
[ START_FROM = { NOW | BEGINNING | <timestamp> } ]
[ STORE_RAW_DATA = { TRUE | FALSE } ]
Job options:
COMMIT_INTERVAL
Type:
<integer> { MINUTE[S] | HOUR[S] | DAY[S] }
Default:
1 MINUTE
(Optional) Defines how often the job will load and commit data to the target in a direct ingestion job. This interval must divide evenly into a day; for example, 4 HOURS is valid, but 5 HOURS is not.
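For example, a minimal sketch of a direct ingestion job that commits to Snowflake every five minutes (the connection, stream, and table names here are placeholders):
CREATE SYNC JOB commit_every_five_minutes
  COMMIT_INTERVAL = 5 MINUTES
AS COPY FROM KINESIS your_kinesis_connection
  STREAM = 'sample-stream'
INTO SNOWFLAKE my_snowflake_connection.demo.sample_events;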
COMPRESSION
Values:
{ AUTO | GZIP | SNAPPY | LZO | NONE | SNAPPY_UNFRAMED | KCL }
Default:
AUTO
(Optional) The compression of the source.
CONTENT_TYPE
Values:
{ AUTO | CSV | JSON | PARQUET | TSV | AVRO | AVRO_SCHEMA_REGISTRY | FIXED_WIDTH | REGEX | SPLIT_LINES | ORC | XML }
Default:
AUTO
(Optional) The file format of the content being read.
Note that AUTO only works when reading Avro, JSON, or Parquet.
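Because AUTO cannot detect CSV, a job reading a CSV stream should set the content type explicitly. A minimal sketch (the connection, stream, and table names here are placeholders):
CREATE SYNC JOB ingest_csv_stream
  CONTENT_TYPE = CSV
AS COPY FROM KINESIS your_kinesis_connection
  STREAM = 'csv-stream'
INTO default_glue_catalog.kinesis.csv_events_stg;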
DEDUPLICATE_WITH
Values:
( {COLUMNS = (<col>, ...) | COLUMN = <col>}, WINDOW = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } )
(Optional) You can use DEDUPLICATE_WITH to prevent duplicate rows from arriving in your target. One or more columns can be supplied in the column list to act as a key, so that all events within the timeframe specified in the WINDOW value are deduplicated.
For example, if you have a third-party application that sends the same event multiple times a day, you can define one or more columns as the key and set the timeframe to 1 DAY. Upsolver will exclude all duplicate events that arrive within the day, ensuring your target only receives unique events.
Note that if you have multiple jobs writing to a table in your lake, duplicate rows can be generated, even when you include this option.
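A minimal sketch of the daily deduplication scenario above, assuming each event carries an event_id key column (all names here are placeholders):
CREATE SYNC JOB dedup_daily_events
  CONTENT_TYPE = JSON
  DEDUPLICATE_WITH = (COLUMNS = (event_id), WINDOW = 1 DAY)
AS COPY FROM KINESIS your_kinesis_connection
  STREAM = 'sample-stream'
INTO default_glue_catalog.kinesis.unique_events_stg;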
READER_SHARDS
Type:
integer
Default:
1
(Optional) Determines how many readers are used in parallel to read the stream.
This number does not need to equal the number of shards in your Kinesis stream.
As a rule of thumb, increase it by 1 for every 70 MB/s sent to your stream.
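By that rule of thumb, a stream receiving roughly 210 MB/s would warrant three readers. A sketch (the connection, stream, and table names here are placeholders):
CREATE SYNC JOB ingest_high_volume_stream
  READER_SHARDS = 3
AS COPY FROM KINESIS your_kinesis_connection
  STREAM = 'high-volume-stream'
INTO default_glue_catalog.kinesis.high_volume_stg;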
RUN_PARALLELISM
Type:
integer
Default:
1
(Optional) The number of parser jobs to run in parallel per minute.
SKIP_ALL_VALIDATIONS
Type:
Boolean
Default:
false
(Optional) If data is expected to arrive in the source at a later point in time, set this value to true.
This option instructs Upsolver to ignore all validations, allowing you to create a job that reads from a source that currently has no data.
SKIP_VALIDATIONS
Value:
MISSING_STREAM
(Optional) Use this option if data is expected to arrive in the source at a later point in time.
This option tells Upsolver to ignore specific validations, allowing you to create a job that reads from a source that currently has no data.
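For instance, a sketch of a job defined before its stream exists, skipping only the missing-stream check (the connection, stream, and table names here are placeholders):
CREATE SYNC JOB ingest_future_stream
  SKIP_VALIDATIONS = ('MISSING_STREAM')
AS COPY FROM KINESIS your_kinesis_connection
  STREAM = 'stream-created-later'
INTO default_glue_catalog.kinesis.future_stream_stg;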
START_FROM
Values:
{ NOW | BEGINNING | <timestamp> }
Default:
BEGINNING
(Optional) Configures the time from which to start ingesting data. Events dated before the specified time are ignored.
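For example, a sketch that ignores historical events and ingests only data arriving from job creation onward (the connection, stream, and table names here are placeholders):
CREATE SYNC JOB ingest_new_events_only
  START_FROM = NOW
AS COPY FROM KINESIS your_kinesis_connection
  STREAM = 'sample-stream'
INTO default_glue_catalog.kinesis.new_events_stg;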
STORE_RAW_DATA
Type:
Boolean
Default:
false
(Optional) When true, an additional copy of the data is stored in its original format.
STREAM = '<stream_name>'
Type:
text
The stream to read from.
In the following example, a new sync job is created to ingest streaming data from Kinesis into a table in the data lake. The START_FROM option defines that all data will be ingested from the beginning of the stream, and the CONTENT_TYPE option informs the job that the data is stored in JSON format.
After ingesting data into the data lake, you can create a transformation job to load the data into your target; a sketch of such a job follows the example below.
CREATE SYNC JOB stage_kinesis_data
START_FROM = BEGINNING
CONTENT_TYPE = JSON
AS COPY FROM KINESIS your_kinesis_connection
STREAM = 'sample-stream'
INTO default_glue_catalog.kinesis.kinesis_data_stg;
Ingest data from Kinesis to a data lake table.
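As a follow-on, a minimal sketch of a transformation job that loads the staged data into a target table. The target table name, RUN_INTERVAL, and the $event_time filter with run_start_time() and run_end_time() shown here are assumptions, not part of this page:
CREATE SYNC JOB transform_kinesis_data
  RUN_INTERVAL = 1 MINUTE
AS INSERT INTO default_glue_catalog.kinesis.kinesis_data_tgt
  SELECT *
  FROM default_glue_catalog.kinesis.kinesis_data_stg
  WHERE $event_time BETWEEN run_start_time() AND run_end_time();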
An ingestion job can load data directly from Kinesis into Snowflake, bypassing the need to write additional transformation jobs to load the data into the target table.
The example below creates a job to ingest data directly into Snowflake and uses job options and expectations to perform transformations and apply data quality conditions.
The EXCLUDE_COLUMNS option tells the job to ignore the customer.email column, as the email value has been hashed into a new column named hashed_email using the COLUMN_TRANSFORMATIONS option. To protect personal information, customer.email is masked before it reaches the target table in Snowflake, so the raw column data should be excluded.
Furthermore, duplicate events are managed using the DEDUPLICATE_WITH option: any duplicate events that arrive within a four-hour window are deduplicated on orderid. The target table expects all orders to arrive with a value for customer.address.address1, so a check is added to flag how many rows are loaded without this value. A further warning counts the number of rows that do not have a taxrate matching the expected value of 0.12.
CREATE SYNC JOB ingest_kinesis_to_snowflake
COMMENT = 'Ingest orders to Snowflake'
START_FROM = BEGINNING
CONTENT_TYPE = JSON
EXCLUDE_COLUMNS = ('customer.email')
COLUMN_TRANSFORMATIONS = (hashed_email = MD5(customer.email))
DEDUPLICATE_WITH = (COLUMNS = (orderid), WINDOW = 4 hours)
COMMIT_INTERVAL = 5 MINUTES
AS COPY FROM KINESIS my_kinesis_connection STREAM = 'orders'
INTO SNOWFLAKE my_snowflake_connection.demo.orders_transformed
WITH EXPECTATION exp_add1_not_null EXPECT customer.address.address1 IS NOT NULL ON VIOLATION WARN
WITH EXPECTATION exp_taxrate EXPECT taxrate = 0.12 ON VIOLATION WARN;
Create a synchronized ingestion job to load data directly from Kinesis to Snowflake.