LogoLogo
OverviewQuickstartsHow To GuidesReferenceArticlesSupport
Reference
Reference
  • Reference
  • ✨Learning Paths
    • Amazon Kinesis
    • Amazon S3
    • Apache Kafka
    • Confluent Cloud
    • Microsoft SQL Server
    • MongoDB
    • MySQL
    • PostgreSQL
  • SQL COMMANDS
    • Clusters
      • CREATE CLUSTER
      • ALTER CLUSTER
      • ROLL CLUSTER
      • STOP/START CLUSTER
      • DROP CLUSTER
    • Connections
      • CREATE CONNECTION
        • Amazon Kinesis
        • Amazon Redshift
        • Amazon S3
        • Apache Kafka
        • AWS Glue Data Catalog
        • ClickHouse
        • Confluent Cloud
        • Elasticsearch
        • Hive Metastore
        • Microsoft SQL Server
        • MongoDB
        • MySQL
        • Polaris Catalog
        • Iceberg REST catalogs
        • PostgreSQL
        • Snowflake
      • ALTER CONNECTION
      • DROP CONNECTION
      • CDC Connections with SSL
    • Jobs
      • CREATE JOB
        • Ingestion
          • Amazon Kinesis
          • Amazon S3
          • Apache Kafka
          • Confluent Kafka
          • Content Types
          • Microsoft SQL Server
          • MongoDB
          • MySQL
          • PostgreSQL
        • Replication
          • Microsoft SQL Server
          • MongoDB
          • MySQL
          • PostgreSQL
        • Transformation
          • INSERT
            • MAP_COLUMNS_BY_NAME
            • UNNEST
          • MERGE
          • SELECT
          • Job Options
            • Amazon Redshift
            • Amazon S3
            • Apache Iceberg
            • ClickHouse
            • Data Lake Tables
            • Elasticsearch
            • PostgreSQL
            • Snowflake
        • Monitoring
          • Amazon CloudWatch
          • Datadog
          • Dynatrace
      • ALTER JOB
      • PAUSE / RESUME JOB
      • DROP JOB
    • Materialized Views
      • CREATE MATERIALIZED VIEW
      • ALTER MATERIALIZED VIEW
      • DROP MATERIALIZED VIEW
      • Join with a Materialized View
    • Tables
      • CREATE TABLE
      • ALTER TABLE
      • DROP TABLE
    • Iceberg Tables
      • Upsolver Managed Tables
        • CREATE ICEBERG TABLE
        • ALTER ICEBERG TABLE
        • DROP ICEBERG TABLE
        • OPTIMIZE ICEBERG TABLE
      • External Iceberg Tables
        • CREATE EXTERNAL ICEBERG TABLE
        • DROP EXTERNAL ICEBERG TABLE
      • Mirror Iceberg Tables
        • CREATE MIRROR
        • ALTER MIRROR
        • PAUSE / RESUME MIRROR
        • DROP MIRROR
  • FUNCTIONS & OPERATORS
    • Data Types
    • Functions
      • Aggregate
        • APPROX_COUNT_DISTINCT
        • APPROX_COUNT_DISTINCT_EACH
        • AVG
        • AVG_EACH
        • AVG_TIME_SERIES
        • COLLECT_SET
        • COLLECT_SET_EACH
        • COUNT
        • COUNT(*)
        • COUNT(DISTINCT ...)
        • COUNT_EACH
        • COUNT_IF
        • DYNAMIC_SESSIONS
        • FIRST
        • FIRST_ARRAY
        • FIRST_EACH
        • FIRST_TIME_SERIES
        • LAST
        • LAST_ARRAY
        • LAST_EACH
        • LAST_K
        • LAST_K_EACH
        • LAST_TIME_SERIES
        • MAX
        • MAX_BY
        • MAX_EACH
        • MAX_TIME_SERIES
        • MIN
        • MIN_BY
        • MIN_EACH
        • MIN_TIME_SERIES
        • SESSION_COUNT
        • STD_DEV
        • STD_DEV_EACH
        • STRING_MAX
        • STRING_MAX_EACH
        • STRING_MIN_EACH
        • SUM
        • SUM_EACH
        • SUM_TIME_SERIES
        • WEIGHTED_AVERAGE
      • Array
        • ARRAY_DISTINCT
        • ARRAY_JOIN
        • ARRAY_MAX
        • ARRAY_MIN
        • ARRAY_SORT
        • ARRAY_SORT_DESC
        • ARRAY_SUM
        • COUNT_VALUES_IF
        • COUNT_VALUES
        • ELEMENT_AT
        • FIRST_ELEMENT
        • LAST_ELEMENT
        • VALUE_INDEX_IN_ARRAY
        • VALUE_INDEX_IN_ROW
      • Comparison
        • GREATEST
        • LEAST
      • Conditional
        • COALESCE
        • IF_ELSE
        • NULL_IF
      • Date & Time
        • ADD_TIME_ZONE_OFFSET
        • DATE
        • DATE_ADD
        • DATE_DIFF
        • DATE_TRUNC
        • DAY
        • DAY_OF_WEEK
        • DAY_OF_YEAR
        • EXTRACT_TIMESTAMP
        • EXTRACT
        • FORMAT_DATETIME
        • FROM_ISO8601_DATE
        • FROM_UNIXTIME
        • HOUR
        • MILLISECOND
        • MINUTE
        • MONTH
        • QUARTER
        • RUN_END_TIME
        • RUN_START_TIME
        • SECOND
        • SUBTRACT_TIME_ZONE_OFFSET
        • TO_UNIX_EPOCH_MILLIS
        • TO_UNIX_EPOCH_SECONDS
        • TO_UNIXTIME
        • WEEK
        • YEAR_OF_WEEK
        • YEAR
      • Filter
        • IS_DUPLICATE
        • NOT
      • Interval
        • PARSE_DURATION
      • Mathematical
        • ABS
        • CBRT
        • CEIL
        • CEILING
        • DEGREES
        • EXP
        • FLOOR
        • GET_SHARD_NUMBER
        • LN
        • LOG
        • LOG2
        • LOG10
        • MOD
        • MODULO
        • POW
        • POWER
        • RADIANS
        • RAND
        • RANDOM
        • RECIPROCAL
        • ROUND
        • SIGN
        • SORT_VALUES
        • SQRT
        • TRUNCATE
      • Regular Expressions
        • REGEXP_EXTRACT
        • REGEXP_EXTRACT_ALL
        • REGEXP_LIKE
        • REGEX_MATCH_POSITION
        • REGEX_NAMED_GROUPS
        • REGEXP_REPLACE
      • Spatial
        • ST_DISTANCE
        • ST_WGS84_DISTANCE
        • WKT_SPATIAL_CONTAINS
        • WKT_SPATIAL_INTERSECT
      • String
        • BASE64_DECODE
        • BASE64_TO_HEX
        • BYTES_SUBSTRING
        • CONCAT
        • DATE
        • JOIN_ARRAYS
        • LENGTH
        • LOWER
        • LPAD
        • LTRIM
        • MD5
        • PARSE_DATETIME
        • REPLACE
        • REVERSE
        • RPAD
        • RTRIM
        • SHA1
        • SHA3_512
        • SHA256
        • SHA512
        • SORT_VALUES
        • SPLIT
        • SPLIT_TO_RECORD
        • STRING_FORMAT
        • STRIP_MARGIN
        • STRIP_PREFIX
        • STRIP_SUFFIX
        • STRPOS
        • SUBSTR
        • SUBSTRING
        • TRANSLATE
        • TRIM_CHARS
        • TRIM
        • UPPER
        • UUID_GENERATOR
        • XX_HASH
      • Structural
        • FROM_KEY_VALUE
        • GET_RANGE
        • JOIN_ALL_BY_KEY
        • JSON_PATH
        • JSON_TO_RECORD
        • MAP_WITH_INDEX
        • QUERY_STRING_TO_RECORD
        • RECORD_TO_JSON
        • SORT_BY
        • TO_ARRAY
        • ZIP_WITH_INDEX
        • ZIP
      • Trigonometric
        • COS
        • SIN
        • TAN
        • TANH
      • Type Conversion
        • CAST
        • CHR
        • DECIMAL_TO_HEX
        • HEX_TO_DECIMAL
        • TO_BIGINT
        • TO_DOUBLE
        • TO_STRING
      • URL
        • TOP_PRIVATE_DOMAIN
        • URL_DECODE
        • URL_ENCODE
        • URL_PARSER
    • Operators
      • Comparison
      • Conditional
        • CASE
      • Logical
      • Mathematical
      • String
  • MONITORING
    • Clusters
    • Datasets
      • Ingested Data
        • Column
      • Lineage
      • Data Violations
      • Statistics
      • Maintenance
        • Compactions
        • Expire Snapshots
        • Orphan Files
      • Columns
      • Partitions
      • Properties
    • Job Status
      • Stream & File Sources
        • Monitoring
        • Graphs
        • Lineage
        • Settings
      • CDC Sources
        • Monitoring
        • Replication Settings
        • Job Settings
    • System Catalog
      • Information Schema
        • Clusters
        • Columns
        • Connections
        • Jobs
        • Mirrors
        • Tables
        • Users
        • Views
      • Insights
        • job_output_column_stats
        • dataset_column_stats
      • Monitoring
        • CDC Status
        • Clusters
        • Expectations
        • Jobs
        • Partition Statistics
        • Recent Compactions
        • Running Queries
        • Table Statistics
      • Task Executions Table
  • GENERAL
    • Common SQL Syntax
    • View Entity Syntax
    • Keyboard Shortcuts
Powered by GitBook
On this page
  • Syntax
  • Job options
  • Source options
  • Examples
  • Ingest data into the data lake
  • Ingest data directly into Snowflake
  1. SQL COMMANDS
  2. Jobs
  3. CREATE JOB
  4. Ingestion

Amazon Kinesis

Last updated 11 months ago

Syntax

CREATE [SYNC] JOB <job_name>
    [{ job_options }]
AS COPY FROM KINESIS <connection_identifier>
    [{ source_options }]
INTO { <table_identifier> 
         | SNOWFLAKE <connection_name>.<schema_name>.<table_name> }
 [ WITH EXPECTATION <exp_name> EXPECT <sql_predicate> ON VIOLATION { DROP | WARN } ];

Jump to

Job options

[ COLUMN_TRANSFORMATIONS = (<column> = <expression>, ...) ]
[ COMMENT = '<comment>' ]
[ COMMIT_INTERVAL = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]
[ COMPUTE_CLUSTER = <cluster_identifier> ]
[ COMPRESSION = { AUTO 
                | GZIP 
                | SNAPPY 
                | LZO 
                | NONE 
                | SNAPPY_UNFRAMED 
                | KCL } ]
[ CONTENT_TYPE = { AUTO 
                 | CSV 
                 | JSON 
                 | PARQUET 
                 | TSV 
                 | AVRO 
                 | AVRO_SCHEMA_REGISTRY 
                 | FIXED_WIDTH 
                 | REGEX 
                 | SPLIT_LINES 
                 | ORC 
                 | XML } ]
[ DEDUPLICATE_WITH = ( {COLUMNS = (<col>, ...)  | COLUMN = <col>}, WINDOW = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]                
[ END_AT = { NOW | <timestamp> } ]  
[ EXCLUDE_COLUMNS = ( <col>, ...) ]
[ READER_SHARDS = <integer> ]
[ RUN_PARALLELISM = <integer> ]
[ SKIP_ALL_VALIDATIONS = { TRUE | FALSE } ]  
[ SKIP_VALIDATIONS = ('MISSING_STREAM') ]
[ START_FROM = { NOW | BEGINNING | <timestamp> } ]
[ STORE_RAW_DATA = { TRUE | FALSE } ]

Jump to

Amazon Kinesis job options:

General job options:

See also:

COMMIT_INTERVAL

Type: <integer> { MINUTE[S] | HOUR[S] | DAY[S] }

(Optional) Defines how often the job will commit to Snowflake in a direct ingestion job. If empty, the WRITE_INTERVAL value will be used. The COMMIT_INTERVAL value must be bigger and divisible by WRITE_INTERVAL.

COMPRESSION

Values: { AUTO | GZIP | SNAPPY | LZO | NONE | SNAPPY_UNFRAMED | KCL }

Default: AUTO

(Optional) The compression of the source.

CONTENT_TYPE — editable

Values: { AUTO | CSV | JSON | PARQUET | TSV | AVRO | AVRO_SCHEMA_REGISTRY | FIXED_WIDTH | REGEX | SPLIT_LINES | ORC | XML }

Default: AUTO

(Optional) The file format of the content being read.

Note that AUTO only works when reading Avro, JSON, or Parquet.

DEDUPLICATE_WITH

Values: ( {COLUMNS = (, ...) | COLUMN = }, WINDOW = { MINUTE[S] | HOUR[S] | DAY[S] } )

(Optional) You can use DEDUPLICATE_WITH to prevent duplicate rows arriving in your target. One or more columns can be supplied in the column list to act as a key so that all events within the timeframe specified in the WINDOW value are deduplicated.

For example, if you have a third-party application that sends the same event multiple times a day, you can define one or more columns as the key and set the timeframe to be 1 DAY. Upsolver will exclude all duplicate events that arrive within the day, ensuring your target only receives unique events.

Note that if you have multiple jobs writing to a table in your lake, duplicate rows can be generated, even when you include this option.

READER_SHARDS — editable

Type: integer

Default: 1

(Optional) Determines how many readers are used in parallel to read the stream.

This number does not need to equal your number of shards in Kinesis.

A recommended value would be to increase it by 1 for every 70 MB/s sent to your stream.

RUN_PARALLELISM — editable

Type: integer

Default: 1

(Optional) The number of parser jobs to run in parallel per minute.

SKIP_ALL_VALIDATIONS

Type: Boolean

Default: false

(Optional) If data is expected to arrive in the source at a later point in time, set this value to true.

This option instructs Upsolver to ignore all validations to allow you to create a job that reads from a source that currently has no data.

SKIP_VALIDATIONS

Value: MISSING_STREAM

(Optional) Use this option if data is expected to arrive in the source at a later point in time.

This option tells Upsolver to ignore specific validations to allow you to create a job that reads from a source that currently has no data.

START_FROM

Values: { NOW | BEGINNING | <timestamp> }

Default: BEGINNING

(Optional) Configures the time to start ingesting data. Files dated before the specified time are ignored.

STORE_RAW_DATA

Type: Boolean

Default: false

(Optional) When true, an additional copy of the data is stored in its original format.

Source options

STREAM = '<stream_name>'

STREAM

Type: text

The stream to read from.

Examples

Ingest data into the data lake

In the following example, a new sync job is created to ingest streaming data from Kinesis into a table in the data lake. The START_FROM option defines that all data will be ingested, and the CONTENT_TYPE option informs the job that the data is stored in JSON format.

CREATE SYNC JOB stage_kinesis_data
    START_FROM = BEGINNING
    CONTENT_TYPE = JSON
AS COPY FROM KINESIS your_kinesis_connection
    STREAM = 'sample-stream'
INTO default_glue_catalog.upsolver_samples.kinesis_data_staging; 

Ingest data directly into Snowflake

An ingestion job can load data directly from Kinesis into Snowflake, bypassing the need to write additional transformation jobs to load the data into the target table.

The example below creates a job to ingest data directly into Snowflake and uses job options and expectations to perform transformations and apply data quality conditions.

The EXCLUDE_COLUMNS option tells the job to ignore the customer.email column, as the email value has been hashed into a new column named hashed_email using the COLUMN_TRANSFORMATIONS option. To protect personal information, customer.email is masked before it reaches the target table in Snowflake, and therefore the raw column data should be excluded.

Furthermore, duplicate events are managed using the DEDUPLICATE_WITH option. Any duplicate events that arrive within a four-hour interval, are deduplicated on orderid. The target table is expecting all orders to arrive with a value for customer.address.address1, so a check is added to flag how many rows are loaded without this value. A further warning has been added to count the number of rows that do not have a taxrate matching the expected value of 0.12.

CREATE SYNC JOB ingest_kinesis_to_snowflake
    COMMENT = 'Ingest orders to Snowflake'
    START_FROM = BEGINNING
    CONTENT_TYPE = JSON
    EXCLUDE_COLUMNS = ('customer.email') 
    COLUMN_TRANSFORMATIONS = (hashed_email = MD5(customer.email))       
    DEDUPLICATE_WITH = (COLUMNS = (orderid), WINDOW = 4 hours)
    COMMIT_INTERVAL = 5 MINUTES
AS COPY FROM KINESIS my_kinesis_connection 
    STREAM = 'orders'  
INTO SNOWFLAKE my_snowflake_connection.demo.orders_transformed
    WITH EXPECTATION exp_add1_not_null 
        EXPECT customer.address.address1 IS NOT NULL 
        ON VIOLATION WARN
    WITH EXPECTATION exp_taxrate 
        EXPECT taxrate = 0.12 
        ON VIOLATION WARN;

Default:

To configure additional options for certain content types, see the options.

After ingesting data into the data lake, you can create a job to load the data into your target.

Content Types
transformation
Job options
Source options
COMMIT_INTERVAL
COMPRESSION
CONTENT_TYPE
DEDUPLICATE_WITH
READER_SHARDS
RUN_PARALLELISM
SKIP_ALL_VALIDATIONS
SKIP_VALIDATIONS
START_FROM
STORE_RAW_DATA
WRITE_INTERVAL
COLUMN_TRANSFORMATIONS
COMMENT
COMPUTE_CLUSTER
END_AT
EXCLUDE_COLUMNS
WITH EXPECTATION