Snowflake
This page describes the job options for writing your data to Snowflake.
Syntax
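A general sketch of the shape of a Snowflake output job, based on the options described on this page (names in angle brackets are placeholders, and the exact clause order should be checked against the transformation jobs reference):

```sql
CREATE SYNC JOB <job_name>
    [ ADD_MISSING_COLUMNS = { true | false } ]
    [ COMMIT_INTERVAL = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]
    [ CREATE_TABLE_IF_MISSING = { true | false } ]
AS INSERT INTO SNOWFLAKE <connection_name>.<schema_name>.<table_name>
    MAP_COLUMNS_BY_NAME
    SELECT <columns>
    FROM <source_table>
    WHERE <time filter>;
```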
Job options
ADD_MISSING_COLUMNS
Type: Boolean
Default: false
(Optional) When true, columns that don't exist in the target table are added automatically when they are encountered. When false, you cannot use SELECT * within the SELECT statement of your transformation job.
COMMIT_INTERVAL
Type: <integer> { MINUTE[S] | HOUR[S] | DAY[S] }
Default: RUN_INTERVAL
(Optional) Defines how often the job commits to Snowflake. If empty, the RUN_INTERVAL value is used. The COMMIT_INTERVAL value must be greater than, and divisible by, RUN_INTERVAL.
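For illustration, a job that runs every minute but commits to Snowflake every five minutes might look like the following sketch (connection, catalog, and table names are illustrative):

```sql
CREATE SYNC JOB load_orders_to_snowflake
    RUN_INTERVAL = 1 MINUTE
    COMMIT_INTERVAL = 5 MINUTES  -- greater than, and divisible by, RUN_INTERVAL
AS INSERT INTO SNOWFLAKE my_snowflake_connection.demo.orders
    MAP_COLUMNS_BY_NAME
    SELECT * FROM default_glue_catalog.demo.orders_staging
    WHERE time_filter();
```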
CREATE_TABLE_IF_MISSING
Type: Boolean
Default: false
(Optional) When true, the Snowflake table is created automatically; otherwise, the table must already exist.
CUSTOM_INSERT_EXPRESSIONS
Type: array[(column, expression)]
Default: ()
(Optional) Configures a list of custom expression transformations to apply to the value of each column when inserting unmatched (new) rows. Note that this option is only used in merge jobs.
Note: You can use {} as a placeholder for the select statement's mapped value.
Example:
The following example sets the INSERT_TIME column of the target table to the value of CURRENT_TIMESTAMP() and adds 1 to the value of some_value when inserting new rows.
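A sketch of how this could be expressed in a merge job (connection, table, and key names are illustrative):

```sql
CREATE SYNC JOB insert_expressions_example
    CUSTOM_INSERT_EXPRESSIONS = (INSERT_TIME = 'CURRENT_TIMESTAMP()', some_value = '{} + 1')
AS MERGE INTO SNOWFLAKE my_snowflake_connection.demo.target_table AS target
    USING (
        SELECT customer_id, some_value
        FROM default_glue_catalog.demo.staging_table
        WHERE time_filter()
    ) AS source
    ON target.customer_id = source.customer_id
    WHEN MATCHED THEN REPLACE
    WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;
```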
CUSTOM_UPDATE_EXPRESSIONS
Type: array[(column, expression)]
Default: ()
(Optional) Configures a list of custom expression transformations to apply to the value of each column when updating matched rows. Note that this option is only used in merge jobs.
Note: You can use {} as a placeholder for the select statement's mapped value.
Example:
The following example sets the UPDATE_TIME column of the target table to the value of CURRENT_TIMESTAMP() and adds 1 to the value of some_value when updating existing rows.
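A sketch of the corresponding option in a merge job (connection, table, and key names are illustrative):

```sql
CREATE SYNC JOB update_expressions_example
    CUSTOM_UPDATE_EXPRESSIONS = (UPDATE_TIME = 'CURRENT_TIMESTAMP()', some_value = '{} + 1')
AS MERGE INTO SNOWFLAKE my_snowflake_connection.demo.target_table AS target
    USING (
        SELECT customer_id, some_value
        FROM default_glue_catalog.demo.staging_table
        WHERE time_filter()
    ) AS source
    ON target.customer_id = source.customer_id
    WHEN MATCHED THEN REPLACE
    WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;
```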
KEEP_EXISTING_VALUES_WHEN_NULL
Type: Boolean
Default: false
(Optional) If enabled, updates to the table preserve the previous non-null value. This option is useful if your update events only contain values for modified columns. It works by coalescing the new value with the existing value: if the new value is null, the previous value is preserved. This means that updating values to null is not supported.
Note: If a non-null value and a null value are set in the same commit, the non-null value will not be kept. Only values that already exist in the table are preserved.
Replicate database data (CDC) into Snowflake
Upsolver enables you to manage the replication of your database data (CDC) using a powerful template that automatically generates a job for each table in your publication. Using a staging table partitioned by $full_table_name, $schema_name, and $table_name, Upsolver extracts the schema and table names to pass as variables into the template and create a job for each table.
The example below creates a staging table in the data lake with the required partitions. Note that a time column must be included in the partition, so $event_date is also specified:
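A sketch of such a staging table, with no columns defined and the partitioning described above (catalog and table names are illustrative):

```sql
CREATE TABLE default_glue_catalog.upsolver_samples.orders_raw_data()
    PARTITIONED BY $full_table_name, $schema_name, $table_name, $event_date;
```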
No columns have been defined, enabling Upsolver to automatically create them based on the table and column specification of the publication.
For more information, please see CREATE TABLE.
You can then write a job to ingest the data, using the TABLE_INCLUDE_LIST and COLUMN_EXCLUDE_LIST options to control which data to include:
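A sketch of such an ingestion job for a PostgreSQL publication (the connection, publication, and excluded column names are illustrative; see the Ingestion jobs reference for the authoritative option placement):

```sql
CREATE SYNC JOB ingest_orders_tables
    PUBLICATION_NAME = 'orders_publication'
    TABLE_INCLUDE_LIST = ('orders.customers', 'orders.items', 'orders.categories', 'orders.invoices')
    COLUMN_EXCLUDE_LIST = ('orders.customers.credit_card')
AS COPY FROM POSTGRES my_postgres_connection
INTO default_glue_catalog.upsolver_samples.orders_raw_data;
```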
The above example specifies that the customers, items, categories, and invoices tables, all in the orders schema, are ingested. A regular expression could also be used to specify the list of objects.
For a full list of available job options and information on configuring the options applicable to your job, please refer to Ingestion jobs.
Upsolver takes a snapshot of the data in the published tables and thereafter uses the log to discover changes to apply to the target.
The following template uses the schema and column names to generate the sub-jobs:
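A sketch of such a template, using 73HS8C3A as the unique value in place of <GUID>; the connection and staging table names are illustrative, and the pseudo-column expressions follow the pattern of Upsolver's CDC template rather than being a verbatim copy:

```sql
CREATE SYNC JOB sync_tables_to_snowflake
    CREATE_TABLE_IF_MISSING = true
    ADD_MISSING_COLUMNS = true
AS MERGE INTO SNOWFLAKE my_snowflake_connection.{UPSOLVER_SCHEMA_NAME_73HS8C3A}.{UPSOLVER_TABLE_NAME_73HS8C3A} AS target
    USING (
        SELECT *,
               $primary_key AS UPSOLVER_PRIMARY_KEY,
               $is_delete AS UPSOLVER_IS_DELETE_73HS8C3A,
               $schema_name AS UPSOLVER_SCHEMA_NAME_73HS8C3A,
               $table_name AS UPSOLVER_TABLE_NAME_73HS8C3A
        FROM default_glue_catalog.upsolver_samples.orders_raw_data
        WHERE time_filter()
    ) AS source
    ON source.UPSOLVER_PRIMARY_KEY = target.UPSOLVER_PRIMARY_KEY
    WHEN MATCHED AND UPSOLVER_IS_DELETE_73HS8C3A THEN DELETE
    WHEN MATCHED THEN REPLACE
    WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;
```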
In this example, the template defines that a job will be created for each table in the replication; in this case, the four tables that were specified. In the MERGE INTO SNOWFLAKE statement, the {UPSOLVER_SCHEMA_NAME_<GUID>}.{UPSOLVER_TABLE_NAME_<GUID>} placeholders act as parameters for the schema and table names detected in the staging table. When writing your script, replace <GUID> with any unique name, e.g. {UPSOLVER_SCHEMA_NAME_73HS8C3A}.{UPSOLVER_TABLE_NAME_73HS8C3A}.
Upsolver uses schema evolution to manage DDL changes that affect the replicated tables. For example, if a new table is detected, Upsolver automatically creates a new job and the target table in Snowflake. It also handles any new columns that appear, alongside changes in type; if a column that was previously an integer column changes to a string, the job won't break or require developer intervention as Upsolver manages the amendment.
After generating your jobs, you can use the system tables to monitor their performance. The sub-jobs are easy to identify in the system.monitoring.jobs table, as they are prefixed with virtual_.
The following example returns the top 10 sub-jobs from the system.monitoring.jobs table:
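A sketch of such a query; the virtual_ prefix follows the description above, while the job_name column is an assumption about the system table's schema:

```sql
SELECT *
FROM system.monitoring.jobs
WHERE job_name LIKE 'virtual_%'
LIMIT 10;
```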
For in-depth information and a step-by-step guide to using this feature, please see the how-to guide Replicate your database data (CDC) into your warehouse - PostgreSQL to Snowflake.
Managing schema evolution
When a change occurs to the schema of your data source, Upsolver automatically detects the change during ingestion and dynamically applies it to the target. This change could be the addition or removal of a column, or a change in data type for a column.
In a transformation job, you can use SELECT *
to specify that all columns are included in the pipeline. Upsolver performs schema discovery based on the metadata and continues to dynamically monitor changes as data is ingested.
Records are parsed to extract a mapping of column names and infer the column type according to the value detected in each record. Therefore, a column type is determined on the first encounter and adjusted as more values are parsed.
For example, consider the following job:
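A sketch of such a job, using SELECT * so that all discovered columns flow through to the target (connection, catalog, and table names are illustrative):

```sql
CREATE SYNC JOB transform_orders
    CREATE_TABLE_IF_MISSING = true
    ADD_MISSING_COLUMNS = true
AS INSERT INTO SNOWFLAKE my_snowflake_connection.demo.orders
    MAP_COLUMNS_BY_NAME
    SELECT * FROM default_glue_catalog.demo.orders_staging
    WHERE time_filter();
```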
The job runs, and a record is ingested with the following structure:
{"details": "text"}
In this record, Upsolver discovers a column named details with a string type. Columns and rows are continuously added to the target table and, as the data is loaded, a different data type is discovered in the details column:
{"details": 1}
Now, the job has found the same column name (details), with two different types - a string (VARCHAR) and a number (BIGINT) - and it needs to provide a coherent, updated details record.
Conflict resolution
How does Upsolver resolve this apparent conflict? This is where the evolution happens: under the hood, Upsolver allows multiple types to exist for each column and applies type-casting where possible. The new type is added as a separate column, DETAILS_BIGINT, and where the encountered value can be cast to a number, Upsolver applies the cast to populate this column for the record. If no such cast is supported by the target platform, e.g. Snowflake, a NULL value is used instead.
Special casting
Each data platform has its own methods for handling data types. For Snowflake, Upsolver uses the following mapping to perform casting that is otherwise not supported, where the value is denoted as $v:

Source Type | Target Type | Expression
---|---|---
BIGINT | TIMESTAMP | TRY_CAST($v / 1000 AS TIMESTAMP)
TIMESTAMP | BIGINT | EXTRACT(EPOCH_MILLISECONDS FROM $v)
BIGINT | DATE | TRY_CAST(TRY_CAST($v * 86400 AS VARCHAR) AS DATE)
DATE | BIGINT | TRY_CAST(EXTRACT(EPOCH_SECOND FROM $v) / 86400 AS BIGINT)