Snowflake

Job options

[ ADD_MISSING_COLUMNS = { TRUE | FALSE } ]
[ AGGREGATION_PARALLELISM = <integer> ]
[ COMMENT = '<comment>' ]
[ COMMIT_INTERVAL = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]
[ COMPUTE_CLUSTER = <cluster_identifier> ]
[ CREATE_TABLE_IF_MISSING = { TRUE | FALSE } ]
[ CUSTOM_INSERT_EXPRESSIONS = (<column_name> = '<expression>', ...) ]
[ CUSTOM_UPDATE_EXPRESSIONS = (<column_name> = '<expression>', ...) ]
[ END_AT = { NOW | <timestamp> } ]
[ KEEP_EXISTING_VALUES_WHEN_NULL = { TRUE | FALSE } ]
[ RUN_INTERVAL = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]
[ RUN_PARALLELISM = <integer> ]
[ START_FROM = { NOW | BEGINNING | <timestamp> } ]


ADD_MISSING_COLUMNS

Type: Boolean

Default: false

(Optional) When true, columns that don't exist in the target table are added automatically when encountered.

When false, you cannot use SELECT * within the SELECT statement of your transformation job.
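For illustration, a job sketch that lets new source columns flow through to the target (the job name and the connection, schema, and table names are placeholders borrowed from the examples below):

```sql
-- Hypothetical job: with ADD_MISSING_COLUMNS = TRUE, SELECT * is allowed and any
-- column not yet present in snow.PUBLIC.MY_TABLE is added when first encountered.
CREATE SYNC JOB add_columns_job
    START_FROM = BEGINNING
    RUN_INTERVAL = 1 MINUTE
    ADD_MISSING_COLUMNS = TRUE
AS INSERT INTO SNOWFLAKE snow.PUBLIC.MY_TABLE MAP_COLUMNS_BY_NAME
    SELECT *
    FROM glue.staging.my_staging_table
    WHERE $event_time BETWEEN run_start_time() AND run_end_time();
```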

ON_COLUMN_TYPE_MISMATCH

Type: String

Default: None

Possible values: Add Column, None

This option is applicable only if ADD_MISSING_COLUMNS=true. It determines how to handle cases where the datatype of the source data does not match the datatype of the corresponding column in the target table.

If set to Add Column, Upsolver will attempt to cast the incoming data to the original column's datatype. If the cast fails, a new column with the format <originalColumnName>_newDataType will be created, and the mismatched data will be written to this new column. For example, if the CLIENT_ID column is a number in the target table and a VARCHAR arrives in that column, a new column called CLIENT_ID_VARCHAR will be added for the string data. The original column will continue to be populated if the data can be cast successfully.

If set to None, no new columns will be added, and only valid casts will be written to the original column.

AGGREGATION_PARALLELISM — editable

Type: integer

Default: 1

(Optional) Only supported when the query contains aggregations. Formerly known as "output sharding."
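As a sketch (job and table names are hypothetical), the option is only valid when paired with an aggregating query:

```sql
-- Hypothetical aggregation job: AGGREGATION_PARALLELISM = 4 shards the
-- aggregation output across 4 parallel writers. Valid only because the
-- query below contains an aggregation (SUM with GROUP BY).
CREATE SYNC JOB daily_totals_job
    RUN_INTERVAL = 1 MINUTE
    AGGREGATION_PARALLELISM = 4
AS INSERT INTO SNOWFLAKE snow.PUBLIC.MY_TABLE MAP_COLUMNS_BY_NAME
    SELECT id AS ID, SUM(some_value) AS TOTAL_VALUE
    FROM glue.staging.my_staging_table
    WHERE $event_time BETWEEN run_start_time() AND run_end_time()
    GROUP BY id;
```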

COMMIT_INTERVAL

Type: <integer> { MINUTE[S] | HOUR[S] | DAY[S] }

Default: RUN_INTERVAL

(Optional) Defines how often the job will commit to Snowflake. If empty, the RUN_INTERVAL value will be used.

The COMMIT_INTERVAL value must be greater than, and divisible by, RUN_INTERVAL.
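For example, a hypothetical job (names are placeholders) that processes data every minute but commits to Snowflake in 5-minute batches; 5 MINUTES satisfies the constraint because it is greater than and divisible by 1 MINUTE:

```sql
-- Hypothetical job: each minute of data is processed on schedule, but writes
-- are committed to Snowflake once every 5 minutes.
CREATE SYNC JOB batched_commit_job
    RUN_INTERVAL = 1 MINUTE
    COMMIT_INTERVAL = 5 MINUTES
AS INSERT INTO SNOWFLAKE snow.PUBLIC.MY_TABLE MAP_COLUMNS_BY_NAME
    SELECT id AS ID, some_value AS MY_VALUE
    FROM glue.staging.my_staging_table
    WHERE $event_time BETWEEN run_start_time() AND run_end_time();
```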

CREATE_TABLE_IF_MISSING

Type: Boolean

Default: false

(Optional) When true, the Snowflake table is created automatically if it does not exist; otherwise, the table must already exist before the job runs.
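A sketch of a job that bootstraps its own target table (all object names are hypothetical):

```sql
-- Hypothetical job: snow.PUBLIC.MY_TABLE does not need to be created in
-- Snowflake beforehand; it is created automatically on the first write.
CREATE SYNC JOB bootstrap_table_job
    RUN_INTERVAL = 1 MINUTE
    CREATE_TABLE_IF_MISSING = TRUE
AS INSERT INTO SNOWFLAKE snow.PUBLIC.MY_TABLE MAP_COLUMNS_BY_NAME
    SELECT id AS ID, some_value AS MY_VALUE
    FROM glue.staging.my_staging_table
    WHERE $event_time BETWEEN run_start_time() AND run_end_time();
```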

CUSTOM_INSERT_EXPRESSIONS

Type: array[(column, expression)]

Default: ()

(Optional) Configure a list of custom expression transformations to apply to the value of each column when inserting unmatched (new) rows. Note this is only used in MERGE jobs.

Note:

  1. You can use {} as a placeholder for the SELECT statement's mapped value.

  2. The expressions will not create the target columns in the table. The columns should either exist in the target table or be mapped in the SELECT clause, for example SELECT timestamp '1970-01-01 00:00:00' as INSERT_TIME.

Example:

The following example will set the INSERT_TIME column of the target table to the value of CURRENT_TIMESTAMP() and will add 1 to the value of some_value when inserting new rows:

CREATE SYNC JOB my_merge_job
    START_FROM = BEGINNING
    RUN_INTERVAL = 1 MINUTE
    CUSTOM_INSERT_EXPRESSIONS = (INSERT_TIME = 'CURRENT_TIMESTAMP()', 
                                MY_VALUE = '{} + 1')
AS MERGE INTO SNOWFLAKE snow.PUBLIC.MY_TABLE AS target
    USING (SELECT id AS ID, some_value AS MY_VALUE
           FROM glue.staging.my_staging_table
           WHERE $event_time BETWEEN run_start_time() AND run_end_time()) AS source
    ON (target.ID = source.ID)
    WHEN MATCHED THEN REPLACE
    WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;

CUSTOM_UPDATE_EXPRESSIONS

Type: array[(column, expression)]

Default: ()

(Optional) Configure a list of custom expression transformations to apply to the value of each column when updating matched rows. Note this is only used in MERGE jobs.

Notes:

  1. You can use {} as a placeholder for the mapped value from the SELECT statement or {current_value} as a placeholder for the existing value in the table.

  2. The expressions will not create the target columns in the table. The columns should either exist in the target table or be mapped in the SELECT clause, for example SELECT timestamp '1970-01-01 00:00:00' as UPDATE_TIME.

Example:

The following example will set the UPDATE_TIME column of the target table to the value of CURRENT_TIMESTAMP() and will add 1 to the value of some_value when updating existing rows:

CREATE SYNC JOB my_merge_job
    START_FROM = BEGINNING
    RUN_INTERVAL = 1 MINUTE
    CUSTOM_UPDATE_EXPRESSIONS = (UPDATE_TIME = 'CURRENT_TIMESTAMP()',
                                 MY_VALUE = '{} + 1',
                                 INSERT_TIME = '{current_value}')
AS MERGE INTO SNOWFLAKE snow.PUBLIC.MY_TABLE AS target
    USING (SELECT id AS ID, some_value AS MY_VALUE
           FROM glue.staging.my_staging_table
           WHERE $event_time BETWEEN run_start_time() AND run_end_time()) AS source
    ON (target.ID = source.ID)
    WHEN MATCHED THEN REPLACE
    WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;

KEEP_EXISTING_VALUES_WHEN_NULL

Type: Boolean

Default: false

(Optional) If enabled, updates to the table preserve the previous non-null value. This option is useful if your update events only contain values for modified columns. This works by coalescing the new value with the existing value. If the new value is null the previous value will be preserved. This means that updating values to null is not supported.
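A sketch of a MERGE job using this option (object names are hypothetical, reusing the naming from the examples above):

```sql
-- Hypothetical MERGE job: when source.MY_VALUE is NULL for a matched row,
-- the existing target value is kept, equivalent to COALESCE(new, current).
-- This supports partial-update events that only carry modified columns.
CREATE SYNC JOB partial_update_job
    RUN_INTERVAL = 1 MINUTE
    KEEP_EXISTING_VALUES_WHEN_NULL = TRUE
AS MERGE INTO SNOWFLAKE snow.PUBLIC.MY_TABLE AS target
    USING (SELECT id AS ID, some_value AS MY_VALUE
           FROM glue.staging.my_staging_table
           WHERE $event_time BETWEEN run_start_time() AND run_end_time()) AS source
    ON (target.ID = source.ID)
    WHEN MATCHED THEN REPLACE
    WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;
```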

RUN_PARALLELISM — editable

Type: integer

Default: 1

(Optional) Controls how many jobs run in parallel to process a single minute of data from the source table.

Increasing this can lower the end-to-end latency if you have lots of data per minute.
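For example, a hypothetical high-volume job (names are placeholders) that splits each minute of source data across four parallel workers:

```sql
-- Hypothetical job: 4 workers each process a share of every minute of
-- source data, reducing end-to-end latency on high-volume streams.
CREATE SYNC JOB high_volume_job
    RUN_INTERVAL = 1 MINUTE
    RUN_PARALLELISM = 4
AS INSERT INTO SNOWFLAKE snow.PUBLIC.MY_TABLE MAP_COLUMNS_BY_NAME
    SELECT id AS ID, some_value AS MY_VALUE
    FROM glue.staging.my_staging_table
    WHERE $event_time BETWEEN run_start_time() AND run_end_time();
```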
