Snowflake

This page describes the job options for writing your data to Snowflake.

Syntax

[ ADD_MISSING_COLUMNS = { TRUE | FALSE } ]
[ AGGREGATION_PARALLELISM = <integer> ]
[ COMMENT = '<comment>' ]
[ COMMIT_INTERVAL = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]
[ COMPUTE_CLUSTER = <cluster_identifier> ]
[ CREATE_TABLE_IF_MISSING = { TRUE | FALSE } ]
[ CUSTOM_INSERT_EXPRESSIONS = (<column_name> = '<expression>', ...) ]
[ CUSTOM_UPDATE_EXPRESSIONS = (<column_name> = '<expression>', ...) ]
[ END_AT = { NOW | timestamp } ]
[ KEEP_EXISTING_VALUES_WHEN_NULL = { TRUE | FALSE } ]
[ RUN_INTERVAL = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]
[ RUN_PARALLELISM = <integer> ]
[ START_FROM = { NOW | BEGINNING | timestamp } ]

Job options

ADD_MISSING_COLUMNS

Type: Boolean

Default: false

(Optional) When true, columns that don't exist in the target table are added automatically when encountered.

When false, you cannot use SELECT * within the SELECT statement of your transformation job.
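
For instance, the following sketch relies on ADD_MISSING_COLUMNS = true so that SELECT * can propagate newly discovered source columns to the target table. The connection, schema, and table names are illustrative only:

-- Illustrative job: columns discovered in the source are added to the target table automatically
CREATE SYNC JOB add_missing_columns_example
    START_FROM = BEGINNING
    RUN_INTERVAL = 1 MINUTE
    ADD_MISSING_COLUMNS = true
AS INSERT INTO SNOWFLAKE my_snowflake_connection.DEMO.ORDERS MAP_COLUMNS_BY_NAME
    SELECT *
    FROM default_glue_catalog.upsolver_samples.orders
    WHERE time_filter();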

COMMIT_INTERVAL

Type: <integer> { MINUTE[S] | HOUR[S] | DAY[S] }

Default: RUN_INTERVAL

(Optional) Defines how often the job will commit to Snowflake. If empty, the RUN_INTERVAL value will be used.

The COMMIT_INTERVAL value must be greater than RUN_INTERVAL and divisible by it.
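
For example, in the following sketch the job processes data every minute but commits to Snowflake every 5 minutes; 5 MINUTES is both greater than and divisible by the 1 MINUTE run interval. The connection, table, and column names are illustrative only:

-- Illustrative job: commit to Snowflake less frequently than the job runs
CREATE SYNC JOB commit_interval_example
    START_FROM = BEGINNING
    RUN_INTERVAL = 1 MINUTE
    COMMIT_INTERVAL = 5 MINUTES
AS INSERT INTO SNOWFLAKE my_snowflake_connection.DEMO.ORDERS MAP_COLUMNS_BY_NAME
    SELECT order_id AS ORDER_ID, net_total AS NET_TOTAL
    FROM default_glue_catalog.upsolver_samples.orders
    WHERE time_filter();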

CREATE_TABLE_IF_MISSING

Type: Boolean

Default: false

(Optional) When true, the Snowflake table is created automatically; otherwise, the table must already exist.

CUSTOM_INSERT_EXPRESSIONS

Type: array[(column, expression)]

Default: ()

(Optional) Configure a list of custom expression transformations to apply to the value of each column when inserting unmatched (new) rows. Note this is only used in Merge Jobs.

Note: You can use {} as a placeholder for the select statement's mapped value.

Example:

The following example sets the INSERT_TIME column of the target table to the value of CURRENT_TIMESTAMP() and adds 1 to the value of some_value when inserting new rows.

CREATE SYNC JOB my_merge_job
    START_FROM = BEGINNING
    RUN_INTERVAL = 1 MINUTE
    CUSTOM_INSERT_EXPRESSIONS = (INSERT_TIME = 'CURRENT_TIMESTAMP()', MY_VALUE = '{} + 1')
 AS MERGE INTO SNOWFLAKE snow.PUBLIC.MY_TABLE AS target       
    USING (SELECT  id AS ID, some_value as MY_VALUE
	       FROM glue.staging.my_staging_table
	       WHERE time_filter()) source
     ON (target.ID = source.ID)
     WHEN MATCHED THEN REPLACE
     WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;

CUSTOM_UPDATE_EXPRESSIONS

Type: array[(column, expression)]

Default: ()

(Optional) Configure a list of custom expression transformations to apply to the value of each column when updating matched rows. Note this is only used in Merge Jobs.

Note: You can use {} as a placeholder for the mapped value from the select statement.

Example:

The following example sets the UPDATE_TIME column of the target table to the value of CURRENT_TIMESTAMP() and adds 1 to the value of some_value when updating existing rows.

CREATE SYNC JOB my_merge_job
    START_FROM = BEGINNING
    RUN_INTERVAL = 1 MINUTE
    CUSTOM_UPDATE_EXPRESSIONS = (UPDATE_TIME = 'CURRENT_TIMESTAMP()', MY_VALUE = '{} + 1')
 AS MERGE INTO SNOWFLAKE snow.PUBLIC.MY_TABLE AS target       
    USING (SELECT  id AS ID, some_value as MY_VALUE
	       FROM glue.staging.my_staging_table
	       WHERE time_filter()) source
     ON (target.ID = source.ID)
     WHEN MATCHED THEN REPLACE
     WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;

KEEP_EXISTING_VALUES_WHEN_NULL

Type: boolean

Default: false

(Optional) If enabled, updates to the table preserve the previous non-null value. This option is useful if your update events only contain values for modified columns. It works by coalescing the new value with the existing value: if the new value is null, the previous value is preserved. This means that updating a value to null is not supported.

Note: If a non-null value and a null value are set within the same commit, the non-null value will not be kept. Only values that already exist in the table are preserved.
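
For example, the following sketch enables the option on a merge job so that update events carrying only changed columns do not wipe out existing values. The connection, schema, table, and column names are illustrative only:

-- Illustrative merge job: null values in the source do not overwrite existing target values
CREATE SYNC JOB partial_updates_example
    START_FROM = BEGINNING
    RUN_INTERVAL = 1 MINUTE
    KEEP_EXISTING_VALUES_WHEN_NULL = true
AS MERGE INTO SNOWFLAKE snow.PUBLIC.CUSTOMERS AS target
    USING (SELECT id AS ID, email AS EMAIL, phone AS PHONE
           FROM glue.staging.customer_updates
           WHERE time_filter()) source
     ON (target.ID = source.ID)
     WHEN MATCHED THEN REPLACE
     WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;
-- If source.PHONE is null for a matched row, the existing target.PHONE value is preserved.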

Replicate database data (CDC) into Snowflake

Upsolver enables you to manage the replication of your database data (CDC) using a powerful template that automatically generates a job for each table in your publication. Using a staging table partitioned by $full_table_name, $schema_name, and $table_name, Upsolver extracts the schema and table names and passes them as variables into the template to create a job for each table.

The example below creates a staging table in the data lake with the required partition - note that a time column must be included in the partition and therefore the $event_date is also specified:

 -- Create a staging table for the changed data
CREATE TABLE default_glue_catalog.upsolver_samples.postgres_to_snowflake_staging(
    $full_table_name STRING, 
    $schema_name STRING,
    $table_name STRING, 
    $primary_key STRING
)
PARTITION BY $full_table_name, $schema_name, $table_name, $event_date
DISABLE_COMPACTION = true
TABLE_DATA_RETENTION = 7 DAYS
RETENTION_DATE_PARTITION = $event_date
EXPOSE_IN_CATALOG = false;

No columns have been defined, enabling Upsolver to automatically create them based on the table and column specification of the publication.

For more information, please see CREATE TABLE.

You can then write a job to ingest the data using the TABLE_INCLUDE_LIST and COLUMN_EXCLUDE_LIST to control which data to include:

-- Create a job to copy the changes from the source database into the staging table
CREATE SYNC JOB copy_postgresql_to_staging
    PUBLICATION_NAME = 'orderspublication'
AS COPY FROM POSTGRES my_postgres_connection
    TABLE_INCLUDE_LIST = ('orders.customers|orders.items|orders.categories|orders.invoices')
    COLUMN_EXCLUDE_LIST = ('orders.customers.email', 'orders.customers.phone')
INTO default_glue_catalog.upsolver_samples.postgres_to_snowflake_staging;

The above example specifies that the customers, items, categories, and invoices tables, all in the orders schema, are ingested. A regular expression could also be used to specify the list of objects.
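For instance, a pattern along the following lines could match every table in the orders schema. This sketch is illustrative only; check the Ingestion jobs reference for the exact regular expression support:

-- Illustrative job: a regular expression that matches all tables in the orders schema
CREATE SYNC JOB copy_postgresql_to_staging_regex
    PUBLICATION_NAME = 'orderspublication'
AS COPY FROM POSTGRES my_postgres_connection
    TABLE_INCLUDE_LIST = ('orders\..*')
INTO default_glue_catalog.upsolver_samples.postgres_to_snowflake_staging;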

For a full list of available job options and information on configuring the options applicable to your job, please refer to Ingestion jobs.

Upsolver takes a snapshot of the data in the published tables and thereafter uses the log to discover changes to apply to the target.

The following template uses the schema and column names to generate the sub-jobs:

-- Create a main job that generates a sub-job for each table
CREATE SYNC JOB postgres_to_snowflake_output
       RUN_INTERVAL = 5 MINUTES
       CREATE_TABLE_IF_MISSING = true
       ADD_MISSING_COLUMNS = true
AS MERGE INTO SNOWFLAKE my_snowflake_connection.{UPSOLVER_SCHEMA_NAME_<GUID>}.{UPSOLVER_TABLE_NAME_<GUID>} AS target
USING (
       SELECT *,       
       $is_delete::BOOLEAN AS UPSOLVER_IS_DELETE_<GUID>,
       COALESCE($primary_key::STRING, UUID_GENERATOR('')) AS UPSOLVER_PRIMARY_KEY,
       $event_time::STRING AS UPSOLVER_EVENT_TIME,
       UPPER($table_name) AS UPSOLVER_TABLE_NAME_<GUID>    
       FROM default_glue_catalog.upsolver_samples.postgres_to_snowflake_staging
       WHERE time_filter()
) AS source
ON source.UPSOLVER_PRIMARY_KEY = target.UPSOLVER_PRIMARY_KEY
WHEN MATCHED AND UPSOLVER_IS_DELETE_<GUID> THEN DELETE
WHEN MATCHED THEN REPLACE
WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;

In this example, the template defines that a job will be created for each table in the replication, in this case, the four tables that were specified. In the MERGE INTO SNOWFLAKE statement, the {UPSOLVER_SCHEMA_NAME_<GUID>}.{UPSOLVER_TABLE_NAME_<GUID>} placeholders act as parameters for the schema and table names detected in the staging table. When writing your script, replace <GUID> with any unique name, e.g. {UPSOLVER_SCHEMA_NAME_73HS8C3A}.{UPSOLVER_TABLE_NAME_73HS8C3A}.

Upsolver uses schema evolution to manage DDL changes that affect the replicated tables. For example, if a new table is detected, Upsolver automatically creates a new job and the target table in Snowflake. It also handles new columns that appear, as well as changes in type: if a column that was previously an integer changes to a string, the job won't break or require developer intervention, as Upsolver manages the change.

After generating your jobs, you can use the system tables to monitor the performance of your jobs. These are easy to identify in the system.monitoring.jobs table as they are prefixed with virtual_.

The following example returns up to 10 sub-jobs from the system.monitoring.jobs table:

-- Return a list of sub-jobs from the system monitoring table
SELECT * 
FROM system.monitoring.jobs
WHERE SUBSTRING(job_id, 1, 8) = 'virtual_'  
LIMIT 10;

For in-depth information and a step-by-step guide to using this feature, please see the how-to guide Replicate your database data (CDC) into your warehouse - PostgreSQL to Snowflake.

Managing schema evolution

When a change occurs to the schema of your data source, Upsolver automatically detects the change during ingestion and dynamically applies it to the target. This change could be the addition or removal of a column, or a change in data type for a column.

In a transformation job, you can use SELECT * to specify that all columns are included in the pipeline. Upsolver performs schema discovery based on the metadata and continues to dynamically monitor changes as data is ingested.

Records are parsed to extract a mapping of column names and infer the column type according to the value detected in each record. Therefore, a column type is determined on the first encounter and adjusted as more values are parsed.

For example, consider the following job:

CREATE SYNC JOB snowflake_dynamic_schema_evolution
  START_FROM = BEGINNING
  RUN_INTERVAL = 5 MINUTES
  CREATE_TABLE_IF_MISSING = true
AS INSERT INTO SNOWFLAKE snowflake_connection.orders.order_history MAP_COLUMNS_BY_NAME
     SELECT *
     FROM default_glue_catalog.upsolver_samples.orders
     WHERE time_filter();

The job runs, and a record is ingested with the following structure:

{"details": "text"}

In this record, Upsolver discovers a column named details with a string type. Columns and rows are continuously added to the target table and, as the data is loaded, a different data type is discovered in the details column:

{"details": 1}

Now, the job has found the same column name (details), with two different types - a string (VARCHAR) and a number (BIGINT) - and it needs to provide a coherent, updated details record.

Conflict resolution

How does Upsolver resolve this apparent conflict? This is where the evolution happens: under the hood, Upsolver allows multiple types to exist for each column and applies type-casting where possible. The new type is added as an additional column, in this case DETAILS_BIGINT, and where the encountered value can be cast to a number, Upsolver applies the cast to populate that column for the record. If no such cast is supported by the target platform, e.g. Snowflake, a NULL value is used instead.
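
As a sketch of this behavior, the two sample records above could land in the evolved target table roughly as follows (illustrative, not actual Upsolver output):

-- {"details": "text"}  ->  DETAILS = 'text'   DETAILS_BIGINT = NULL   (the string cannot be cast to a number)
-- {"details": 1}       ->  DETAILS = '1'      DETAILS_BIGINT = 1      (the number is cast to both types)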

Special casting

Each data platform has its own methods for handling data types. For Snowflake, Upsolver uses the following mapping to perform casting that is otherwise not supported, where the value is denoted as $v:

Source Type | Target Type | Expression
BIGINT      | TIMESTAMP   | TRY_CAST($v / 1000 AS TIMESTAMP)
TIMESTAMP   | BIGINT      | EXTRACT(EPOCH_MILLISECONDS FROM $v)
BIGINT      | DATE        | TRY_CAST(TRY_CAST($v * 86400 AS VARCHAR) AS DATE)
DATE        | BIGINT      | TRY_CAST(EXTRACT(EPOCH_SECOND FROM $v) / 86400 AS BIGINT)
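
As an illustration, the TIMESTAMP-to-BIGINT mapping can be tried directly in a Snowflake worksheet; the timestamp literal below is an arbitrary example:

-- Extract epoch milliseconds from a timestamp, per the mapping above
SELECT EXTRACT(EPOCH_MILLISECONDS FROM '2023-11-14 22:13:20'::TIMESTAMP);
-- Returns 1700000000000 (milliseconds since the Unix epoch)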
