Amazon Kinesis

Syntax

CREATE [SYNC] JOB <job_name>
    [{ job_options }]
AS COPY FROM KINESIS <connection_identifier>
    [{ source_options }]
INTO { <table_identifier> 
         | SNOWFLAKE <connection_name>.<schema_name>.<table_name> }
 [ WITH EXPECTATION <exp_name> EXPECT <sql_predicate> ON VIOLATION { DROP | WARN } ];

Jump to

Job options

[ COLUMN_TRANSFORMATIONS = (<column> = <expression>, ...) ]
[ COMMENT = '<comment>' ]
[ COMMIT_INTERVAL = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]
[ COMPUTE_CLUSTER = <cluster_identifier> ]
[ COMPRESSION = { AUTO 
                | GZIP 
                | SNAPPY 
                | LZO 
                | NONE 
                | SNAPPY_UNFRAMED 
                | KCL } ]
[ CONTENT_TYPE = { AUTO 
                 | CSV 
                 | JSON 
                 | PARQUET 
                 | TSV 
                 | AVRO 
                 | AVRO_SCHEMA_REGISTRY 
                 | FIXED_WIDTH 
                 | REGEX 
                 | SPLIT_LINES 
                 | ORC 
                 | XML } ]
[ DEDUPLICATE_WITH = ( {COLUMNS = (<col>, ...)  | COLUMN = <col>}, WINDOW = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]                
[ END_AT = { NOW | <timestamp> } ]  
[ EXCLUDE_COLUMNS = ( <col>, ...) ]
[ READER_SHARDS = <integer> ]
[ RUN_PARALLELISM = <integer> ]
[ SKIP_ALL_VALIDATIONS = { TRUE | FALSE } ]  
[ SKIP_VALIDATIONS = ('MISSING_STREAM') ]
[ START_FROM = { NOW | BEGINNING | <timestamp> } ]
[ STORE_RAW_DATA = { TRUE | FALSE } ]

Jump to

Amazon Kinesis job options:

General job options:

  • COLUMN_TRANSFORMATIONS

  • COMMENT

  • COMPUTE_CLUSTER

  • END_AT

  • EXCLUDE_COLUMNS

See also:

COMMIT_INTERVAL

Type: <integer> { MINUTE[S] | HOUR[S] | DAY[S] }

Default: WRITE_INTERVAL

(Optional) Defines how often the job will commit to Snowflake in a direct ingestion job. If empty, the WRITE_INTERVAL value will be used. The COMMIT_INTERVAL value must be bigger and divisible by WRITE_INTERVAL.

COMPRESSION

Values: { AUTO | GZIP | SNAPPY | LZO | NONE | SNAPPY_UNFRAMED | KCL }

Default: AUTO

(Optional) The compression of the source.

CONTENT_TYPE — editable

Values: { AUTO | CSV | JSON | PARQUET | TSV | AVRO | AVRO_SCHEMA_REGISTRY | FIXED_WIDTH | REGEX | SPLIT_LINES | ORC | XML }

Default: AUTO

(Optional) The file format of the content being read.

Note that AUTO only works when reading Avro, JSON, or Parquet.

To configure additional options for certain content types, see the Content Types options.

DEDUPLICATE_WITH

Values: ( {COLUMNS = (, ...) | COLUMN = }, WINDOW = { MINUTE[S] | HOUR[S] | DAY[S] } )

(Optional) You can use DEDUPLICATE_WITH to prevent duplicate rows arriving in your target. One or more columns can be supplied in the column list to act as a key so that all events within the timeframe specified in the WINDOW value are deduplicated.

For example, if you have a third-party application that sends the same event multiple times a day, you can define one or more columns as the key and set the timeframe to be 1 DAY. Upsolver will exclude all duplicate events that arrive within the day, ensuring your target only receives unique events.

Note that if you have multiple jobs writing to a table in your lake, duplicate rows can be generated, even when you include this option.

READER_SHARDS — editable

Type: integer

Default: 1

(Optional) Determines how many readers are used in parallel to read the stream.

This number does not need to equal your number of shards in Kinesis.

A recommended value would be to increase it by 1 for every 70 MB/s sent to your stream.

RUN_PARALLELISM — editable

Type: integer

Default: 1

(Optional) The number of parser jobs to run in parallel per minute.

SKIP_ALL_VALIDATIONS

Type: Boolean

Default: false

(Optional) If data is expected to arrive in the source at a later point in time, set this value to true.

This option instructs Upsolver to ignore all validations to allow you to create a job that reads from a source that currently has no data.

SKIP_VALIDATIONS

Value: MISSING_STREAM

(Optional) Use this option if data is expected to arrive in the source at a later point in time.

This option tells Upsolver to ignore specific validations to allow you to create a job that reads from a source that currently has no data.

START_FROM

Values: { NOW | BEGINNING | <timestamp> }

Default: BEGINNING

(Optional) Configures the time to start ingesting data. Files dated before the specified time are ignored.

STORE_RAW_DATA

Type: Boolean

Default: false

(Optional) When true, an additional copy of the data is stored in its original format.

Source options

STREAM = '<stream_name>'

STREAM

Type: text

The stream to read from.

Examples

Ingest data into the data lake

In the following example, a new sync job is created to ingest streaming data from Kinesis into a table in the data lake. The START_FROM option defines that all data will be ingested, and the CONTENT_TYPE option informs the job that the data is stored in JSON format.

After ingesting data into the data lake, you can create a transformation job to load the data into your target.

CREATE SYNC JOB stage_kinesis_data
    START_FROM = BEGINNING
    CONTENT_TYPE = JSON
AS COPY FROM KINESIS your_kinesis_connection
    STREAM = 'sample-stream'
INTO default_glue_catalog.upsolver_samples.kinesis_data_staging; 

Ingest data directly into Snowflake

An ingestion job can load data directly from Kinesis into Snowflake, bypassing the need to write additional transformation jobs to load the data into the target table.

The example below creates a job to ingest data directly into Snowflake and uses job options and expectations to perform transformations and apply data quality conditions.

The EXCLUDE_COLUMNS option tells the job to ignore the customer.email column, as the email value has been hashed into a new column named hashed_email using the COLUMN_TRANSFORMATIONS option. To protect personal information, customer.email is masked before it reaches the target table in Snowflake, and therefore the raw column data should be excluded.

Furthermore, duplicate events are managed using the DEDUPLICATE_WITH option. Any duplicate events that arrive within a four-hour interval, are deduplicated on orderid. The target table is expecting all orders to arrive with a value for customer.address.address1, so a check is added to flag how many rows are loaded without this value. A further warning has been added to count the number of rows that do not have a taxrate matching the expected value of 0.12.

CREATE SYNC JOB ingest_kinesis_to_snowflake
    COMMENT = 'Ingest orders to Snowflake'
    START_FROM = BEGINNING
    CONTENT_TYPE = JSON
    EXCLUDE_COLUMNS = ('customer.email') 
    COLUMN_TRANSFORMATIONS = (hashed_email = MD5(customer.email))       
    DEDUPLICATE_WITH = (COLUMNS = (orderid), WINDOW = 4 hours)
    COMMIT_INTERVAL = 5 MINUTES
AS COPY FROM KINESIS my_kinesis_connection 
    STREAM = 'orders'  
INTO SNOWFLAKE my_snowflake_connection.demo.orders_transformed
    WITH EXPECTATION exp_add1_not_null EXPECT customer.address.address1 IS NOT NULL ON VIOLATION WARN
    WITH EXPECTATION exp_taxrate EXPECT taxrate = 0.12 ON VIOLATION WARN;

Last updated