Replication jobs
This page describes the options for creating a replication job.
Replication jobs copy change data capture (CDC) data from enabled databases into one or more target schemas in Snowflake. The source data is ingested once into a staging table, which each replication group then reads from to update its target. Although the groups share the source data, each group can be configured individually, with its own interval and options, and writes to a separate schema. This means you create a single entity, and Upsolver automatically generates a job for each replication group and manages the freshness of the data in each target based on the commit interval that you specify.
Prior to creating your job, ensure you have created the target schemas in your Snowflake database, or the jobs will fail.
Please note that Upsolver creates the target tables in Snowflake in upper case.
Syntax
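Upsolver defines the exact grammar; the sketch below is assembled from the options documented on this page, so the clause order and placeholders are illustrative rather than authoritative.

```sql
CREATE REPLICATION JOB <job_identifier>
    [ COMMENT = '<description>' ]
    [ COMPUTE_CLUSTER = <cluster_identifier> ]
    INTERMEDIATE_STORAGE_CONNECTION = <connection_identifier>
    INTERMEDIATE_STORAGE_LOCATION = '<storage_location>'
    [ SNAPSHOT_PARALLELISM = <integer> ]
    [ <source_options> ]
FROM <source_connection_identifier>
WITH REPLICATION GROUP <group_name>
    { INCLUDED_TABLES_REGEX = ('<regex>', ...) [ EXCLUDED_COLUMNS_REGEX = ('<regex>', ...) ]
    | INCLUDED_SCHEMA_DEFINITION = '<expression>' [ EXCLUDED_SCHEMA_DEFINITION = '<expression>' ] }
    REPLICATION_TARGET = <target_connection_identifier>
    TARGET_SCHEMA_NAME_EXPRESSION = <expression>
    TARGET_TABLE_NAME_EXPRESSION = <expression>
    [ COLUMN_TRANSFORMATIONS = ( <column> = <expression>, ... ) ]
    [ WRITE_MODE = { MERGE | APPEND } ]
    [ LOGICAL_DELETE_COLUMN = '<column_name>' ]
    [ COMMIT_INTERVAL = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]
    [ UPSOLVER_EVENT_TIME_COLUMN = '<column_name>' ]
[ WITH REPLICATION GROUP <group_name_2> ... ]
;
```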
Job identifier
This is the name of your job, which should match the following format:
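For illustration only (the job name below is hypothetical):

```sql
CREATE REPLICATION JOB replicate_sales_cdc_to_snowflake
```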
Job options
The following job options can be applied to all data sources:
COMMENT — editable
Type: text
(Optional) A description or comment regarding this job.
COMPUTE_CLUSTER — editable
Type: identifier
Default: The sole cluster in your environment
(Optional) The compute cluster to run this job.
This option can only be omitted when there is just one cluster in your environment.
If you have more than one compute cluster, you must use this option to specify which one to run the job on.
INTERMEDIATE_STORAGE_CONNECTION
Type: text
The name of the connection you are using to stage the data ingested from your CDC source.
INTERMEDIATE_STORAGE_LOCATION
Type: text
The name of the location you are using to stage the data ingested from your CDC source.
SNAPSHOT_PARALLELISM
Type: int
Default: 1
(Optional) Configures how many snapshots are performed concurrently. The more snapshots that run concurrently, the sooner the tables are snapshotted and begin streaming; however, running more snapshots in parallel increases the load on the source database.
Source options
Visit the pages below for source-specific options:
Replication group options
A replication group must include INCLUDED_TABLES_REGEX (and optionally EXCLUDED_COLUMNS_REGEX) or INCLUDED_SCHEMA_DEFINITION (and optionally EXCLUDED_SCHEMA_DEFINITION).
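The two forms are sketched below; the group names are illustrative, the filter values reuse the examples given later on this page, and the remaining options of each group are omitted for brevity.

```sql
-- Form 1: regex-based selection
WITH REPLICATION GROUP replicate_prod_tables
    INCLUDED_TABLES_REGEX = ('dbo.prod_.*')
    EXCLUDED_COLUMNS_REGEX = ('dbo.*.address_')
    ...

-- Form 2: schema-definition-based selection
WITH REPLICATION GROUP replicate_sales_schema
    INCLUDED_SCHEMA_DEFINITION = 'sales.*'
    EXCLUDED_SCHEMA_DEFINITION = 'sales.audit_log'
    ...
```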
The following replication group options apply to all data sources.
INCLUDED_TABLES_REGEX
Values: ('regexFilter1', 'regexFilter2')
One or more regular expressions that define which tables should be included in the replication.
For example, ('dbo.prod_.*') includes all tables in the dbo schema prefixed with prod_.
EXCLUDED_COLUMNS_REGEX
Values: ('regexFilter1', 'regexFilter2')
(Optional) Can only be included when INCLUDED_TABLES_REGEX is supplied. One or more regular expressions that define which columns should be excluded from the replication.
For example, ('dbo.*.address_') excludes all columns prefixed with address_ across all tables in the dbo schema.
INCLUDED_SCHEMA_DEFINITION
Type: text
One or more expressions that define which tables should be included in the replication.
For example, *, schema.*, schema.table.*, or schema.table.column.
EXCLUDED_SCHEMA_DEFINITION
Type: text
(Optional) Can only be included when INCLUDED_SCHEMA_DEFINITION is supplied. One or more expressions that determine which tables should be excluded from the replication.
For example, schema.*, schema.table.
COLUMN_TRANSFORMATIONS
Values: ( <column> = <expression>, ...)
(Optional) Use this option to perform data transformations during ingestion when changes must be applied before the data lands in your target. When ingesting into the data lake, it is recommended that you apply only essential transformations, such as protecting PII, because it is easier to make amendments or corrections later if the data remains in its raw state; use a transformation job to apply other modifications. As a general rule, transform only data that must be modified before it reaches the target.
However, transformations provide the flexibility to shape your data before it lands in the target. You can use all the functions and operators supported by Upsolver to create calculated fields within your ingestion job. New columns can be added to your target, and existing column data can be transformed. You can perform actions such as converting data types, formatting string values, and concatenating columns to create a new column.
If you need to mask sensitive or personally identifiable information (PII) prior to loading into your staging tables or when performing direct ingestion into your target destination, you can use hashing functions to prevent data from being exposed downstream.
See the Functions and operators reference for a full list of options that you can apply.
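For example, a minimal sketch that hashes an email column during replication; the column name is a placeholder, and MD5 stands in for whichever hashing function you choose from the reference above.

```sql
COLUMN_TRANSFORMATIONS = (email = MD5(email))
```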
COMMIT_INTERVAL
Type: <integer> { MINUTE[S] | HOUR[S] | DAY[S] }
Default: 1 MINUTE
(Optional) Defines how often the job loads and commits data to the target in a direct ingestion job. The interval must divide evenly into the number of hours in a day, for example, 2 HOURS is valid, but 5 HOURS is not.
LOGICAL_DELETE_COLUMN
Type: text
(Optional) By default, Upsolver permanently deletes rows from the target. To logically delete rows instead, specify the name of the column in which Upsolver will mark them as deleted.
Only valid when WRITE_MODE = MERGE.
REPLICATION_TARGET
Type: text
Values: <connection_name>
Specify the name of the target connection that the data is written to. The connection must be created before you create your job.
TARGET_SCHEMA_NAME_EXPRESSION
Type: expression
Specify the name of the target schema in Snowflake that the replication group writes to. The following inputs are supported: $database_name, $schema_name, $full_table_name, and $table_name. If you are ingesting from MongoDB, you can also use the $collection_name input.
As well as using the exact name of the schema, for example, TARGET_SCHEMA_NAME_EXPRESSION = 'SALES', you can also write a CASE statement to define the target schema, for example:
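A minimal sketch, in which the table and schema names are placeholders:

```sql
TARGET_SCHEMA_NAME_EXPRESSION = CASE
    WHEN $table_name = 'invoices' THEN 'FINANCE'
    ELSE 'SALES'
END
```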
Similarly, you can use the name of the database to define the target schema as follows:
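Again as an illustration (the database and schema names are placeholders):

```sql
TARGET_SCHEMA_NAME_EXPRESSION = CASE
    WHEN $database_name = 'sales_db' THEN 'SALES'
    ELSE 'GENERAL'
END
```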
TARGET_TABLE_NAME_EXPRESSION
Type: expression
Define how you want Upsolver to name the target tables in Snowflake. The following inputs are supported: $database_name, $schema_name, $full_table_name, and $table_name. If you are ingesting from MongoDB, you can also use the $collection_name input.
Optionally, you can include a prefix for the table name. Use this in conjunction with $full_table_name, $table_name, or $collection_name to create the target table using the name of the source table or collection. For example, to create the table in Snowflake with the prefix sales_, followed by the name of the source table, use the expression:
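A sketch of that expression, assuming standard string concatenation with ||:

```sql
TARGET_TABLE_NAME_EXPRESSION = 'sales_' || $table_name
```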
Furthermore, you can include a CASE statement to determine the name of the target table based on the source database:
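For example (the database names and prefixes are placeholders, and || concatenation is assumed):

```sql
TARGET_TABLE_NAME_EXPRESSION = CASE
    WHEN $database_name = 'emea_db' THEN 'emea_' || $table_name
    ELSE 'global_' || $table_name
END
```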
UPSOLVER_EVENT_TIME_COLUMN
Type: text
Values: <column_name>
(Optional) Upsolver uses distributed locking technology to keep events strongly ordered. The ingestion timestamp that reflects this order is stored in a dedicated column, named UPSOLVER_EVENT_TIME by default unless you specify a different column name here.
WRITE_MODE
Values: { MERGE | APPEND }
Defines whether the job should merge the data into the target or append it. Specifying MERGE instructs Upsolver to create a MERGE job, whereas APPEND creates an INSERT job.
If the WRITE_MODE is set to MERGE, you must include a LOGICAL_DELETE_COLUMN.
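Putting the options together, the following end-to-end sketch shows one possible job with two replication groups. Every connection, cluster, bucket, schema, and filter name is a placeholder, and the source options you need will depend on your CDC database.

```sql
CREATE REPLICATION JOB replicate_orders_to_snowflake
    COMMENT = 'Replicate CDC data into Snowflake'
    COMPUTE_CLUSTER = "my_cluster"
    INTERMEDIATE_STORAGE_CONNECTION = my_s3_connection
    INTERMEDIATE_STORAGE_LOCATION = 's3://my-bucket/replication-staging/'
    SNAPSHOT_PARALLELISM = 2
FROM my_postgres_connection
WITH REPLICATION GROUP fast_lane
    INCLUDED_TABLES_REGEX = ('public.orders_.*')
    EXCLUDED_COLUMNS_REGEX = ('public.*.credit_card_.*')
    COLUMN_TRANSFORMATIONS = (email = MD5(email))
    REPLICATION_TARGET = my_snowflake_connection
    TARGET_SCHEMA_NAME_EXPRESSION = 'ORDERS'
    TARGET_TABLE_NAME_EXPRESSION = $table_name
    WRITE_MODE = MERGE
    LOGICAL_DELETE_COLUMN = 'is_deleted'
    COMMIT_INTERVAL = 1 MINUTE
WITH REPLICATION GROUP slow_lane
    INCLUDED_SCHEMA_DEFINITION = 'analytics.*'
    REPLICATION_TARGET = my_snowflake_connection
    TARGET_SCHEMA_NAME_EXPRESSION = 'ANALYTICS'
    TARGET_TABLE_NAME_EXPRESSION = 'daily_' || $table_name
    WRITE_MODE = APPEND
    COMMIT_INTERVAL = 1 DAY
;
```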