Replication jobs

This page describes the options for creating a replication job.

Replication jobs copy change data capture (CDC) data from enabled databases into one or more target schemas in Snowflake. The source data is ingested once into a staging table, and each replication group then reads from that table to update its target. Although the groups share the same source data, each one can be configured with its own interval and options and can write to a separate schema. You therefore create a single entity, and Upsolver automatically generates a job for each replication group and manages the freshness of the data in each target according to the commit interval you specify.

Before creating your job, ensure that the target schemas exist in your Snowflake database; otherwise, the jobs will fail.

Note that Upsolver creates the target tables in Snowflake with upper-case names.

Syntax

CREATE REPLICATION JOB <job_identifier>
    [{ job_options }]  
FROM <source_connection_name> 
    [{ source_options }]
WITH REPLICATION GROUP <group_name> 
    [{ replication_group_options }]
[ WITH REPLICATION GROUP ... ];

Job identifier

This is the name of your job, which should match the following format:

identifier = "([^"]|"")*"|[A-Za-z_][A-Za-z0-9_]*;

Job options

The following job options can be applied to all data sources:

[ COMMENT = '<comment>' ]
[ COMPUTE_CLUSTER = <cluster_identifier> ]
[ INTERMEDIATE_STORAGE_CONNECTION = <connection_identifier> ]
[ INTERMEDIATE_STORAGE_LOCATION = '<storage_location>' ]
[ SNAPSHOT_PARALLELISM = <int> ]


COMMENT — editable

Type: text

(Optional) A description or comment regarding this job.

COMPUTE_CLUSTER — editable

Type: identifier

Default: The sole cluster in your environment

(Optional) The compute cluster to run this job.

This option can only be omitted when there is only one cluster in your environment.

If you have more than one compute cluster, use this option to specify which one runs the job.

INTERMEDIATE_STORAGE_CONNECTION

Type: text

The name of the connection you are using to stage the data ingested from your CDC source.

INTERMEDIATE_STORAGE_LOCATION

Type: text

The name of the location you are using to stage the data ingested from your CDC source.

SNAPSHOT_PARALLELISM

Type: int

Default: 1

(Optional) Configures how many snapshots are performed concurrently. The more snapshots that run concurrently, the sooner the tables begin streaming. However, running more snapshots in parallel increases the load on the source database.
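As an illustration, the job options above might be combined as in the sketch below. Every name in it (the job, cluster, connections, bucket path, and group) is hypothetical, the value formats shown for INTERMEDIATE_STORAGE_CONNECTION and INTERMEDIATE_STORAGE_LOCATION are assumptions, and source options are omitted because they depend on your data source.

```sql
-- Hypothetical names throughout; adjust to your environment.
CREATE REPLICATION JOB orders_replication
    COMMENT = 'Replicate order tables to Snowflake'
    COMPUTE_CLUSTER = "Default Compute"
    INTERMEDIATE_STORAGE_CONNECTION = my_s3_connection        -- assumed: an existing storage connection
    INTERMEDIATE_STORAGE_LOCATION = 's3://my-bucket/staging/' -- assumed: a path that connection can write to
    SNAPSHOT_PARALLELISM = 2                                  -- run two snapshots at a time
FROM my_source_connection
WITH REPLICATION GROUP orders_group
    INCLUDED_TABLES_REGEX = ('dbo.prod_.*')
    REPLICATION_TARGET = my_snowflake_connection
    TARGET_SCHEMA_NAME_EXPRESSION = 'ORDERS'
    TARGET_TABLE_NAME_EXPRESSION = 'prod_' || $table_name::STRING
    WRITE_MODE = APPEND;
```

Raising SNAPSHOT_PARALLELISM above the default of 1 trades a faster initial load for more load on the source database, so a small value is a reasonable starting point.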

Source options

Source options vary by data source. Refer to the documentation page for your source for the options it supports.

Replication group options

A replication group must include INCLUDED_TABLES_REGEX (and optionally EXCLUDED_COLUMNS_REGEX) or INCLUDED_SCHEMA_DEFINITION (and optionally EXCLUDED_SCHEMA_DEFINITION).

The following replication group options apply to all data sources.

{
    INCLUDED_TABLES_REGEX = ('<included_tables_regex>'[, ...])
    [ EXCLUDED_COLUMNS_REGEX = ('<excluded_columns_regex>'[, ...]) ]
        |
    INCLUDED_SCHEMA_DEFINITION = (<included_table_pattern>)
    [ EXCLUDED_SCHEMA_DEFINITION = (<excluded_table_pattern>) ]
}
[ COLUMN_TRANSFORMATIONS = (<column_name> = <expression> [, ...]) ]
[ COMMIT_INTERVAL = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]
[ LOGICAL_DELETE_COLUMN = <column_identifier> ]                     
[ REPLICATION_TARGET = <target_connection_identifier> ]
[ TARGET_SCHEMA_NAME_EXPRESSION = '<schema_name>' ]
[ TARGET_TABLE_NAME_EXPRESSION = '<table_prefix_name>' || $table_name ]
[ UPSOLVER_EVENT_TIME_COLUMN = <column_identifier> ] 
[ WRITE_MODE = { MERGE | APPEND } ]


INCLUDED_TABLES_REGEX

Values: ('regexFilter1', 'regexFilter2')

One or more regular expressions that define which tables should be included in the replication.

For example, ('dbo.prod_.*') includes all tables in the dbo schema prefixed with prod_.

EXCLUDED_COLUMNS_REGEX

Values: ('regexFilter1', 'regexFilter2')

(Optional) Can only be included when INCLUDED_TABLES_REGEX is supplied. One or more regular expressions to define which columns should be excluded from the replication.

For example, ('dbo.*.address_') excludes every column whose name is prefixed with address_ from all tables in the dbo schema.
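The two regex options can be combined in one replication group, as in the fragment below. This is a sketch only: the group name and the ref_ prefix are hypothetical, and the remaining group options are left out. Note that in standard regular expression syntax an unescaped dot matches any character, so a stricter pattern would escape the schema separator (for example dbo\.prod_.*); whether backslash escapes are accepted inside these string literals is an assumption to verify in your environment.

```sql
-- Fragment of a CREATE REPLICATION JOB statement; other group options omitted.
WITH REPLICATION GROUP prod_tables
    -- Replicate dbo tables whose names start with prod_ or ref_ ...
    INCLUDED_TABLES_REGEX = ('dbo.prod_.*', 'dbo.ref_.*')
    -- ... but leave out the columns matched by the address_ pattern.
    EXCLUDED_COLUMNS_REGEX = ('dbo.*.address_')
```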

INCLUDED_SCHEMA_DEFINITION

Type: text

One or more expressions that define which tables should be included in the replication.

For example, *, schema.*, schema.table.*, or schema.table.column.

EXCLUDED_SCHEMA_DEFINITION

Type: text

(Optional) Can only be included when INCLUDED_SCHEMA_DEFINITION is supplied. One or more expressions to determine which tables should be excluded from the replication.

For example, schema.*, schema.table.

COLUMN_TRANSFORMATIONS

Values: ( <column> = <expression>, ...)

(Optional) Use this option to apply transformations during ingestion when data must be modified before it lands in your target. When ingesting into the data lake, it is recommended that you apply only essential transformations, such as protecting PII. Data that remains in its raw state is easier to amend or correct at a later date, and you can use a transformation job to apply other modifications. As a general rule, therefore, transform only the data that must be modified before it reaches the target.

However, transformations provide the flexibility to shape your data before it lands in the target. You can use all the functions and operators supported by Upsolver to create calculated fields within your ingestion job. New columns can be added to your target, and existing column data can be transformed. You can perform actions such as converting data types, formatting string values, and concatenating columns to create a new column.

If you need to mask sensitive or personally identifiable information (PII) prior to loading into your staging tables or when performing direct ingestion into your target destination, you can use hashing functions to prevent data from being exposed downstream.

See the Functions and operators reference for a full list of options that you can apply.
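The fragment below sketches how a hashing function and a concatenation might be applied in a replication group. It assumes that MD5 is among the hashing functions listed in the Functions and operators reference, and the group, table, and column names (customers, email, first_name, last_name, full_name) are hypothetical.

```sql
-- Fragment of a CREATE REPLICATION JOB statement; other group options omitted.
WITH REPLICATION GROUP customers_group
    INCLUDED_TABLES_REGEX = ('dbo.customers')
    COLUMN_TRANSFORMATIONS = (
        email = MD5(email),                           -- replace the existing column with its hash
        full_name = first_name || ' ' || last_name    -- add a new column built from two others
    )
```

Hashing the column in place means the raw value never reaches the target, which is the kind of essential transformation this option is recommended for.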

COMMIT_INTERVAL

Type: <integer> { MINUTE[S] | HOUR[S] | DAY[S] }

Default: 1 MINUTE

(Optional) Defines how often the job loads and commits data to the target in a direct ingestion job. The interval must be a whole number that divides evenly into a 24-hour day.

LOGICAL_DELETE_COLUMN

Type: text

(Optional) By default, Upsolver permanently deletes rows from the target. To delete rows logically instead, specify the name of the column in which Upsolver marks them as deleted.

Only valid when WRITE_MODE = MERGE.

REPLICATION_TARGET

Type: text

Values: <connection_name>

Specify the name of the target connection to write the data to. The connection must be created before you create your job.

TARGET_SCHEMA_NAME_EXPRESSION

Type: expression

Specify the name of the target schema in Snowflake that the replication group is writing to. The following inputs are supported: $database_name, $schema_name, $full_table_name and $table_name. If you are ingesting from MongoDB, you can also use the $collection_name input.

In addition to the exact name of the schema, for example TARGET_SCHEMA_NAME_EXPRESSION = 'SALES', you can write a CASE expression to define the target schema, for example:

TARGET_SCHEMA_NAME_EXPRESSION = 
    CASE $schema_name 
        WHEN 'sales' THEN 'SALES' 
        ELSE 'ARCHIVE' 
    END

Similarly, you can use the name of the database to define the target schema as follows:

TARGET_SCHEMA_NAME_EXPRESSION = 
    CASE $database_name 
        WHEN 'Suppliers' THEN 'SUPPLIERS' 
        ELSE 'DISTRIBUTORS' 
    END

TARGET_TABLE_NAME_EXPRESSION

Type: expression

Define how you want Upsolver to name the target tables in Snowflake. The following inputs are supported: $database_name, $schema_name, $full_table_name and $table_name. If you are ingesting from MongoDB, you can also use the $collection_name input.

Optionally, you can include a prefix for the table name. Use in conjunction with $full_table_name, $table_name, or $collection_name to create the target table using the name of the source table or collection. For example, to create the table in Snowflake with the prefix sales_, followed by the name of the source table, use the expression:

TARGET_TABLE_NAME_EXPRESSION = 'sales_' || $table_name::STRING

You can also use a CASE expression to determine the name of the target table based on the source database:

TARGET_TABLE_NAME_EXPRESSION = 
    CASE $database_name 
        WHEN 'Suppliers' THEN 'supp_' || $table_name::STRING 
        ELSE 'dist_' || $table_name::STRING 
    END

UPSOLVER_EVENT_TIME_COLUMN

Type: text

Values: <column_name>

(Optional) Upsolver uses distributed locking technology to keep events strongly ordered. The ingestion timestamp that reflects this order is stored in a dedicated column, which is named UPSOLVER_EVENT_TIME unless you specify a different name here.

WRITE_MODE

Values: { MERGE | APPEND }

Defines whether the job should merge the data into the target, or append it. Specifying MERGE instructs Upsolver to create a MERGE job, whereas APPEND creates an INSERT job.

If the WRITE_MODE is set to MERGE, you must include a LOGICAL_DELETE_COLUMN.
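To show how several replication groups share one source while writing with different modes and intervals, the sketch below defines a MERGE group and an APPEND group in a single job. All names (job, cluster, connections, bucket path, groups, schemas, and the is_deleted and change_time columns) are hypothetical, the value formats for the intermediate storage options are assumptions, and source options are omitted.

```sql
-- All names below are hypothetical; source options are omitted.
CREATE REPLICATION JOB sales_replication
    COMPUTE_CLUSTER = "Default Compute"
    INTERMEDIATE_STORAGE_CONNECTION = my_s3_connection
    INTERMEDIATE_STORAGE_LOCATION = 's3://my-bucket/staging/'
FROM my_source_connection
-- Group 1: merge changes every minute and flag deleted rows
-- in a column instead of removing them.
WITH REPLICATION GROUP sales_merge
    INCLUDED_TABLES_REGEX = ('dbo.prod_.*')
    COMMIT_INTERVAL = 1 MINUTE
    LOGICAL_DELETE_COLUMN = is_deleted
    REPLICATION_TARGET = my_snowflake_connection
    TARGET_SCHEMA_NAME_EXPRESSION = 'SALES'
    TARGET_TABLE_NAME_EXPRESSION = 'sales_' || $table_name::STRING
    WRITE_MODE = MERGE
-- Group 2: append the same tables to a separate schema, committed hourly.
WITH REPLICATION GROUP sales_history
    INCLUDED_TABLES_REGEX = ('dbo.prod_.*')
    COMMIT_INTERVAL = 1 HOUR
    REPLICATION_TARGET = my_snowflake_connection
    TARGET_SCHEMA_NAME_EXPRESSION = 'SALES_HISTORY'
    TARGET_TABLE_NAME_EXPRESSION = 'hist_' || $table_name::STRING
    UPSOLVER_EVENT_TIME_COLUMN = change_time
    WRITE_MODE = APPEND;
```

Both target schemas (SALES and SALES_HISTORY in this sketch) would need to exist in Snowflake before the job is created.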
