Snowflake
This page describes the job options for writing your data to Snowflake.
[ ADD_MISSING_COLUMNS = { TRUE | FALSE } ]
[ AGGREGATION_PARALLELISM = <integer> ]
[ COMMENT = '<comment>' ]
[ COMMIT_INTERVAL = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]
[ COMPUTE_CLUSTER = <cluster_identifier> ]
[ CREATE_TABLE_IF_MISSING = { TRUE | FALSE } ]
[ CUSTOM_INSERT_EXPRESSIONS = (<column_name> = '<expression>', ...) ]
[ CUSTOM_UPDATE_EXPRESSIONS = (<column_name> = '<expression>', ...) ]
[ END_AT = { NOW | timestamp } ]
[ KEEP_EXISTING_VALUES_WHEN_NULL = { TRUE | FALSE } ]
[ RUN_INTERVAL = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]
[ RUN_PARALLELISM = <integer> ]
[ START_FROM = { NOW | BEGINNING | timestamp } ]
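Before looking at the individual options, here is a minimal sketch of an INSERT job that uses a few of them. The connection, schema, and target table names (my_snowflake_connection, DEMO.ORDERS) are illustrative placeholders, and the staging table is the sample table used elsewhere on this page.
CREATE SYNC JOB load_orders_to_snowflake
   RUN_INTERVAL = 5 MINUTES
   CREATE_TABLE_IF_MISSING = true
   ADD_MISSING_COLUMNS = true
   COMMENT = 'Load sample orders into Snowflake'
AS INSERT INTO SNOWFLAKE my_snowflake_connection.DEMO.ORDERS MAP_COLUMNS_BY_NAME
   SELECT *
   FROM default_glue_catalog.upsolver_samples.orders
   WHERE time_filter();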
The Snowflake-specific job options are described below. The remaining options shown in the syntax above are general job options that are common to output jobs.
ADD_MISSING_COLUMNS
Type: Boolean
Default: false
(Optional) When true, columns that don't exist in the target table are added automatically when encountered.
COMMIT_INTERVAL
Type: <integer> { MINUTE[S] | HOUR[S] | DAY[S] }
(Optional) Defines how often the job commits to Snowflake. If empty, the RUN_INTERVAL value is used. The COMMIT_INTERVAL value must be greater than and divisible by the RUN_INTERVAL value.
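For example, a job that runs every minute could commit to Snowflake every 5 minutes, since 5 MINUTES is both larger than and divisible by 1 MINUTE. The job, connection, and table names below are illustrative:
CREATE SYNC JOB insert_orders_with_batched_commits
   RUN_INTERVAL = 1 MINUTE
   COMMIT_INTERVAL = 5 MINUTES
AS INSERT INTO SNOWFLAKE my_snowflake_connection.DEMO.ORDERS MAP_COLUMNS_BY_NAME
   SELECT *
   FROM default_glue_catalog.upsolver_samples.orders
   WHERE time_filter();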
CREATE_TABLE_IF_MISSING
Type: Boolean
Default: false
(Optional) When true, the Snowflake table is created automatically; otherwise, the table must already exist.
CUSTOM_INSERT_EXPRESSIONS
Type: array[(column, expression)]
Default: ()
(Optional) Configures a list of custom expression transformations to apply to the value of each column when inserting unmatched (new) rows. Note that this option is only used in merge jobs.
Note: You can use {} as a placeholder for the select statement's mapped value.
Example:
The following example sets the INSERT_TIME column of the target table to the value of CURRENT_TIMESTAMP() and adds 1 to the value of some_value when inserting new rows.
CREATE SYNC JOB my_merge_job
START_FROM = BEGINNING
RUN_INTERVAL = 1 MINUTE
CUSTOM_INSERT_EXPRESSIONS = (INSERT_TIME = 'CURRENT_TIMESTAMP()', MY_VALUE = '{} + 1')
AS MERGE INTO SNOWFLAKE snow.PUBLIC.MY_TABLE AS target
USING (SELECT id AS ID, some_value as MY_VALUE
FROM glue.staging.my_staging_table
WHERE time_filter()) source
ON (target.ID = source.ID)
WHEN MATCHED THEN REPLACE
WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;
CUSTOM_UPDATE_EXPRESSIONS
Type: array[(column, expression)]
Default: ()
(Optional) Configures a list of custom expression transformations to apply to the value of each column when updating matched rows. Note that this option is only used in merge jobs.
Note: You can use {} as a placeholder for the mapped value from the select statement.
Example:
The following example sets the UPDATE_TIME column of the target table to the value of CURRENT_TIMESTAMP() and adds 1 to the value of some_value when updating existing rows.
CREATE SYNC JOB my_merge_job
START_FROM = BEGINNING
RUN_INTERVAL = 1 MINUTE
CUSTOM_UPDATE_EXPRESSIONS = (UPDATE_TIME = 'CURRENT_TIMESTAMP()', MY_VALUE = '{} + 1')
AS MERGE INTO SNOWFLAKE snow.PUBLIC.MY_TABLE AS target
USING (SELECT id AS ID, some_value as MY_VALUE
FROM glue.staging.my_staging_table
WHERE time_filter()) source
ON (target.ID = source.ID)
WHEN MATCHED THEN REPLACE
WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;
KEEP_EXISTING_VALUES_WHEN_NULL
Type: Boolean
Default: false
(Optional) If enabled, updates to the table preserve the previous non-null value. This option is useful if your update events only contain values for the modified columns. It works by coalescing the new value with the existing value: if the new value is null, the previous value is preserved, which means that updating a value to null is not supported.
Note: If a non-null value and a null value are set for the same column within the same commit, the non-null value will not be kept. Only values that already exist in the table are preserved.
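As a sketch of how this option might be used, the following merge job assumes a staging table, default_glue_catalog.upsolver_samples.customer_updates, that receives partial update events in which unchanged columns arrive as null; the connection, table, and column names are illustrative.
CREATE SYNC JOB merge_partial_customer_updates
   RUN_INTERVAL = 1 MINUTE
   KEEP_EXISTING_VALUES_WHEN_NULL = true
AS MERGE INTO SNOWFLAKE my_snowflake_connection.DEMO.CUSTOMERS AS target
USING (SELECT customer_id AS ID, email AS EMAIL, phone AS PHONE
       FROM default_glue_catalog.upsolver_samples.customer_updates
       WHERE time_filter()) source
ON (target.ID = source.ID)
WHEN MATCHED THEN REPLACE
WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;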
Upsolver enables you to manage the replication of your database data (CDC) using a powerful template that automatically generates a job for each table in your publication. Using a staging table partitioned by $full_table_name, $schema_name, and $table_name, Upsolver extracts the schema and table names and passes them as variables into the template to create a job for each table.
The example below creates a staging table in the data lake with the required partitions. Note that a time column must be included in the partition, so $event_date is also specified:
-- Create a staging table for the changed data
CREATE TABLE default_glue_catalog.upsolver_samples.postgres_to_snowflake_staging(
$full_table_name STRING,
$schema_name STRING,
$table_name STRING,
$primary_key STRING
)
PARTITION BY $full_table_name, $schema_name, $table_name, $event_date
DISABLE_COMPACTION = true
TABLE_DATA_RETENTION = 7 DAYS
RETENTION_DATE_PARTITION = $event_date
EXPOSE_IN_CATALOG = false;
No columns have been defined, enabling Upsolver to automatically create them based on the table and column specification of the publication.
You can then write a job to ingest the data, using the TABLE_INCLUDE_LIST and COLUMN_EXCLUDE_LIST options to control which data is included:
-- Create a job to copy the changes from the source database into the staging table
CREATE SYNC JOB copy_postgresql_to_staging
PUBLICATION_NAME = 'orderspublication'
AS COPY FROM POSTGRES my_postgres_connection
TABLE_INCLUDE_LIST = ('orders.customers|orders.items|orders.categories|orders.invoices')
COLUMN_EXCLUDE_LIST = ('orders.customers.email', 'orders.customers.phone')
INTO default_glue_catalog.upsolver_samples.postgres_to_snowflake_staging;
The above example specifies that the customers, items, categories, and invoices tables, all in the orders schema, are ingested. A regular expression could also be used to specify the list of objects.
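For instance, a pattern along the lines of the following could match every table in the orders schema. This is a sketch that reuses the connection, publication, and staging table from the example above; check the Ingestion jobs reference for the exact pattern syntax supported for your source.
-- Create a job that copies all tables in the orders schema into the staging table
CREATE SYNC JOB copy_all_orders_tables_to_staging
   PUBLICATION_NAME = 'orderspublication'
AS COPY FROM POSTGRES my_postgres_connection
   TABLE_INCLUDE_LIST = ('orders\..*')
   COLUMN_EXCLUDE_LIST = ('orders.customers.email', 'orders.customers.phone')
INTO default_glue_catalog.upsolver_samples.postgres_to_snowflake_staging;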
For a full list of available job options and information on configuring the options applicable to your job, please refer to Ingestion jobs.
Upsolver takes a snapshot of the data in the published tables and thereafter uses the log to discover changes to apply to the target.
The following template uses the schema and column names to generate the sub-jobs:
-- Create a main job that generates a sub-job for each table
CREATE SYNC JOB postgres_to_snowflake_output
RUN_INTERVAL = 5 MINUTES
CREATE_TABLE_IF_MISSING = true
ADD_MISSING_COLUMNS = true
AS MERGE INTO SNOWFLAKE my_snowflake_connection.{UPSOLVER_SCHEMA_NAME_<GUID>}.{UPSOLVER_TABLE_NAME_<GUID>} AS target
USING (
SELECT *,
$is_delete::BOOLEAN AS UPSOLVER_IS_DELETE_<GUID>,
COALESCE($primary_key::STRING, UUID_GENERATOR('')) AS UPSOLVER_PRIMARY_KEY,
$event_time::STRING AS UPSOLVER_EVENT_TIME,
UPPER($table_name) AS UPSOLVER_TABLE_NAME_<GUID>
FROM default_glue_catalog.upsolver_samples.postgres_to_snowflake_staging
WHERE time_filter()
) AS source
ON source.UPSOLVER_PRIMARY_KEY = target.UPSOLVER_PRIMARY_KEY
WHEN MATCHED AND UPSOLVER_IS_DELETE_<GUID> THEN DELETE
WHEN MATCHED THEN REPLACE
WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;
In this example, the template defines that a job will be created for each table in the replication, in this case, the four tables that were specified. In the MERGE INTO SNOWFLAKE statement, the {UPSOLVER_SCHEMA_NAME_<GUID>}.{UPSOLVER_TABLE_NAME_<GUID>} placeholders act as parameters for the schema and table names detected in the staging table. When writing your script, replace <GUID> with any unique name, e.g. {UPSOLVER_SCHEMA_NAME_73HS8C3A}.{UPSOLVER_TABLE_NAME_73HS8C3A}.
Upsolver uses schema evolution to manage DDL changes that affect the replicated tables. For example, if a new table is detected, Upsolver automatically creates a new job and the target table in Snowflake. It also handles any new columns that appear, alongside changes in type: if a column that was previously an integer changes to a string, the job won't break or require developer intervention, as Upsolver manages the amendment.
After generating your jobs, you can use the system tables to monitor their performance. The sub-jobs are easy to identify in the system.monitoring.jobs table, as they are prefixed with virtual_.
The following example returns the first 10 sub-jobs from the system.monitoring.jobs table:
-- Return a list of sub-jobs from the system monitoring table
SELECT *
FROM system.monitoring.jobs
WHERE SUBSTRING(job_id, 1, 8) = 'virtual_'
LIMIT 10;
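You can also check that a sub-job was generated for each table in the publication, for example by counting them:
-- Count the sub-jobs generated by the template
SELECT COUNT(*)
FROM system.monitoring.jobs
WHERE SUBSTRING(job_id, 1, 8) = 'virtual_';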
For in-depth information and a step-by-step guide to using this feature, please see the how-to guide Replicate your database data (CDC) into your warehouse - PostgreSQL to Snowflake.
When a change occurs to the schema of your data source, Upsolver automatically detects the change during ingestion and dynamically applies it to the target. This change could be the addition or removal of a column, or a change in data type for a column.
In a transformation job, you can use SELECT * to specify that all columns are included in the pipeline. Upsolver performs schema discovery based on the metadata and continues to dynamically monitor changes as data is ingested.
Records are parsed to extract a mapping of column names and infer each column's type from the values detected in the records. A column's type is therefore determined on first encounter and adjusted as more values are parsed.
For example, consider the following job:
CREATE SYNC JOB snowflake_dynamic_schema_evolution
START_FROM = BEGINNING
RUN_INTERVAL = 5 MINUTES
CREATE_TABLE_IF_MISSING = true
AS INSERT INTO SNOWFLAKE snowflake_connection.orders.order_history MAP_COLUMNS_BY_NAME
SELECT *
FROM default_glue_catalog.upsolver_samples.orders
WHERE time_filter();
The job runs, and a record is ingested with the following structure:
{"details": "text"}
In this record, Upsolver discovers a column named details with a string type. Columns and rows are continuously added to the target table and, as the data is loaded, a different data type is discovered in the details column:
{"details": 1}
Now, the job has found the same column name (details), with two different types - a string (VARCHAR) and a number (BIGINT) - and it needs to provide a coherent, updated details record.
How does Upsolver resolve this apparent conflict? This is where the evolution happens: under the hood, Upsolver allows multiple types to exist for each column and applies type-casting where possible. A new column is added for the new type, in this case DETAILS_BIGINT, and where the value encountered can be cast to a number, Upsolver applies the cast to populate that column for the record. If no such cast is supported by the target platform, e.g. Snowflake, a NULL value is used instead.
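As a hypothetical illustration only, after both of the records above have been loaded, the target table could end up looking something like the following, with the original string value unable to populate the new numeric column:
| DETAILS (VARCHAR) | DETAILS_BIGINT (BIGINT) |
| --- | --- |
| text | NULL |
| 1 | 1 |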
Each data platform has its own methods for handling data types. For Snowflake, Upsolver uses the following mapping to perform casts that are otherwise not supported, where the value is denoted as v:
| Source Type | Target Type | Expression |
| --- | --- | --- |
| BIGINT | TIMESTAMP | TRY_CAST($v / 1000 AS TIMESTAMP) |
| TIMESTAMP | BIGINT | EXTRACT(EPOCH_MILLISECONDS FROM $v) |
| BIGINT | DATE | TRY_CAST(TRY_CAST($v * 86400 AS VARCHAR) AS DATE) |
| DATE | BIGINT | TRY_CAST(EXTRACT(EPOCH_SECOND FROM $v) / 86400 AS BIGINT) |
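As a rough, standalone illustration of the first mapping (using a standard Snowflake function rather than Upsolver's internal expression), an epoch-milliseconds value such as 1700000000000 resolves to a timestamp as follows:
-- Illustrative only: epoch milliseconds divided by 1000 and interpreted as seconds
SELECT TO_TIMESTAMP(1700000000000 / 1000);  -- 2023-11-14 22:13:20.000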