Replication

This page describes the options for creating a replication job.

Replication jobs copy change data capture (CDC) data from enabled databases into one or more target schemas in the following supported targets:

  • Amazon Redshift

  • AWS Glue Data Catalog

  • Polaris Catalog

  • Snowflake

The source data is ingested into a staging table that each replication group then reads from to update the target. While sharing the source data, replication groups can be individually configured using different intervals and options to write to a separate schema. Therefore you create one entity and Upsolver automatically generates a job for each replication group, managing the freshness of data in each target based on the commit interval that you specify.

Please note:

  • Target tables in Snowflake are created in upper case.

  • Target tables in Amazon Redshift are created in lower case.

Syntax

CREATE REPLICATION JOB <job_identifier>
    [{ job_options }]  
FROM <source_connection_name> 
    [{ source_options }]
WITH REPLICATION GROUP <group_name>
    [{ replication_group_options }]
[ WITH REPLICATION GROUP ... ];

Job identifier

This is the name of your job, which should match the following format:

identifier = "([^"]|"")*"|[A-Za-z_][A-Za-z0-9_]*;
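The format above can be checked before submitting a job. The following is a minimal Python sketch, assuming the pattern is applied to the whole name; the helper name `is_valid_identifier` is hypothetical and is not part of Upsolver.

```python
import re

# Pattern copied from the identifier format above (without the trailing ';').
IDENTIFIER_PATTERN = re.compile(r'"([^"]|"")*"|[A-Za-z_][A-Za-z0-9_]*')

def is_valid_identifier(name: str) -> bool:
    """Return True if the whole name matches the identifier format."""
    return IDENTIFIER_PATTERN.fullmatch(name) is not None

# An unquoted name: letters, digits, and underscores, not starting with a digit.
print(is_valid_identifier("replicate_orders_to_target"))
# A double-quoted name may contain spaces and other characters.
print(is_valid_identifier('"Default Compute (Free)"'))
# An unquoted name cannot start with a digit.
print(is_valid_identifier("1st_job"))
```

Inside a quoted identifier, a literal double quote is written as two double quotes, for example `"my ""special"" job"`.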

Job options

The following job options can be applied to all sources:

[ COMMENT = '<comment>' ]
[ COMPUTE_CLUSTER = <cluster_identifier> ]
[ INTERMEDIATE_STORAGE_CONNECTION = <connection_identifier> ]
[ INTERMEDIATE_STORAGE_LOCATION = '<storage_location>' ]
[ SNAPSHOT_PARALLELISM = <integer> ]

COMMENT — editable

Type: text

(Optional) A description or comment regarding this job.

COMPUTE_CLUSTER — editable

Type: identifier

Default: The sole cluster in your environment

(Optional) The compute cluster to run this job.

This option can only be omitted when there is only one cluster in your environment.

If you have more than one compute cluster, you must use this option to specify which one runs the job.

INTERMEDIATE_STORAGE_CONNECTION

Type: text

The name of the connection you are using to stage the data ingested from your CDC source.

INTERMEDIATE_STORAGE_LOCATION

Type: text

The name of the location you are using to stage the data ingested from your CDC source.

SNAPSHOT_PARALLELISM

Type: int

Default: 1

(Optional) Configures how many snapshots are performed concurrently. The more snapshots performed concurrently, the sooner the tables start streaming. However, running more snapshots in parallel increases the load on the source database.


Source options

The following source options apply to PostgreSQL:

[ HEARTBEAT_TABLE = '<heartbeat_name>' ]
[ PUBLICATION_NAME = '<publication_name>' ]

PostgreSQL source options:

HEARTBEAT_TABLE

Type: string

(Optional) If it is not set, no heartbeat table is used. Using a heartbeat table is recommended to avoid the replication slot growing indefinitely when no CDC events are captured for the subscribed tables.

PUBLICATION_NAME

Type: text

Adds a new publication to the current database. The publication name must be distinct from the name of any existing publication in the current database. DDL will be filtered.


Replication group options

A replication group must include INCLUDED_TABLES_REGEX (and optionally EXCLUDED_COLUMNS_REGEX) or INCLUDED_SCHEMA_DEFINITION (and optionally EXCLUDED_SCHEMA_DEFINITION).

The following replication group options apply to all data sources with the exception of the differences noted in the option descriptions below.

{    
    [ INCLUDED_TABLES_REGEX = ('<included_tables_regex>'[, ...]) 
    EXCLUDED_COLUMNS_REGEX = ('<excluded_columns_regex>'[, ...]) ] 
        | 
    [ INCLUDED_SCHEMA_DEFINITION = (<included_table_pattern>)
    EXCLUDED_SCHEMA_DEFINITION = (<excluded_table_pattern>) ] 
}
[ COLUMN_TRANSFORMATIONS = (<column_name> = <expression> [, ...]) ]
[ COMMIT_INTERVAL = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]
[ LOGICAL_DELETE_COLUMN = <column_identifier> ]     
[ OPERATION_TYPE_COLUMN = <column_identifier> ]   
[ PRIMARY_KEY_COLUMN = <column_identifier> ]                
[ REPLICATION_TARGET = <target_connection_identifier> ]
[ TARGET_SCHEMA_NAME_EXPRESSION = '<schema_name>' ]
[ TARGET_TABLE_NAME_EXPRESSION = '<table_prefix_name>' || $table_name ]
[ UPSOLVER_EVENT_TIME_COLUMN = <column_identifier> ] 
[ WRITE_MODE = { MERGE | APPEND } ]

COMMIT_INTERVAL

Type: <integer> { MINUTE[S] | HOUR[S] | DAY[S] }

Default: 1 MINUTE

(Optional) Defines how often the job loads and commits data to the target. The interval must divide evenly into the number of hours in a day.

INCLUDED_TABLES_REGEX

Values: ('regexFilter1', 'regexFilter2')

One or more regular expressions that define which tables should be included in the replication.

For example, ('dbo.prod_.*') includes all tables in the dbo schema prefixed with prod_.

EXCLUDED_COLUMNS_REGEX

Values: ('regexFilter1', 'regexFilter2')

(Optional) Can only be included when INCLUDED_TABLES_REGEX is supplied. One or more regular expressions to define which columns should be excluded from the replication.

For example, ('dbo.*.address_') excludes all columns prefixed with address_ in all tables in the dbo schema.
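To see how such filters select tables and columns, the following Python sketch applies the patterns from the example job on this page to a few made-up names. It assumes that tables are matched by their schema.table name, columns by their schema.table.column name, and that a pattern must match the whole name. These matching rules and the helper names are assumptions for illustration, not confirmed Upsolver behavior.

```python
import re

def select_tables(tables, included_regexes):
    """Keep tables whose qualified name fully matches any include pattern."""
    patterns = [re.compile(p) for p in included_regexes]
    return [t for t in tables if any(p.fullmatch(t) for p in patterns)]

def drop_columns(columns, excluded_regexes):
    """Remove columns whose qualified name fully matches any exclude pattern."""
    patterns = [re.compile(p) for p in excluded_regexes]
    return [c for c in columns if not any(p.fullmatch(c) for p in patterns)]

# Hypothetical source objects, used only for this illustration.
tables = ["orders.customers", "orders.items", "billing.invoices"]
columns = [
    "orders.customers.id",
    "orders.customers.creditcard",
    "orders.items.sku",
]

print(select_tables(tables, [r"orders\..*"]))
# ['orders.customers', 'orders.items']
print(drop_columns(columns, [r".*\.creditcard"]))
# ['orders.customers.id', 'orders.items.sku']
```

Escaping the dot as `\.` restricts the match to the literal schema separator; an unescaped dot matches any character.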

INCLUDED_SCHEMA_DEFINITION

Type: text

One or more expressions that define which tables should be included in the replication.

For example, *, schema.*, schema.table.*, or schema.table.column.

EXCLUDED_SCHEMA_DEFINITION

Type: text

(Optional) Can only be included when INCLUDED_SCHEMA_DEFINITION is supplied. One or more expressions to determine which tables should be excluded from the replication.

For example, schema.*, schema.table.

COLUMN_TRANSFORMATIONS

Values: ( <column> = <expression>, ...)

(Optional) Use this option to apply transformations during ingestion, before the data lands in your target. When ingesting into the data lake, it is recommended that you apply only essential transformations, such as protecting PII. Data that remains in its raw state is easier to amend or correct at a later date, and a transformation job can apply further modifications afterwards. As a general rule, transform only the data that must be modified before it reaches the target.

However, transformations provide the flexibility to shape your data before it lands in the target. You can use all the functions and operators supported by Upsolver to create calculated fields within your ingestion job. New columns can be added to your target, and existing column data can be transformed. You can perform actions such as converting data types, formatting string values, and concatenating columns to create a new column.

If you need to mask sensitive or personally identifiable information (PII) prior to loading into your staging tables or when performing direct ingestion into your target destination, you can use hashing functions to prevent data from being exposed downstream.

See the Functions and Operators references for a full list of options that you can apply.

LOGICAL_DELETE_COLUMN

Type: text

(Optional) By default, Upsolver permanently deletes rows from the target. To logically delete rows instead, specify the name of the column in which Upsolver will mark them as deleted.

The column will be named UPSOLVER_IS_DELETED unless specified otherwise.

This option is valid only when using WRITE_MODE = MERGE.

OPERATION_TYPE_COLUMN

Type: text

(Optional) When you choose to append data in the target, Upsolver creates a column to store the operation type that occurred on the source database, e.g. insert, update, delete.

By default, this column is named UPSOLVER_RECORD_TYPE. To rename this column, use the OPERATION_TYPE_COLUMN option and specify the column name.

This option is valid only when using WRITE_MODE = APPEND.

PRIMARY_KEY_COLUMN

Type: text

Values: <column_name>

(Optional) Defines the column name to be created and used to store the original primary key value of the source tables. If not specified, the column name UPSOLVER_PRIMARY_KEY will be used.

REPLICATION_TARGET

Type: text

Values: <connection_name>

Specify the name of the target connection to write the data to. The connection must be created before you create your job.

TARGET_SCHEMA_NAME_EXPRESSION

Type: expression

Specify the name of the target schema in Snowflake that the replication group is writing to. The following inputs are supported: $database_name, $schema_name, $full_table_name and $table_name. If you are ingesting from MongoDB, you can also use the $collection_name input.

As well as using the exact name of the schema, for example, TARGET_SCHEMA_NAME_EXPRESSION = 'SALES', you could also write a case statement to define the target schema, for example:

TARGET_SCHEMA_NAME_EXPRESSION = 
    CASE $schema_name 
        WHEN 'sales' THEN 'SALES' 
        ELSE 'ARCHIVE' 
    END

Similarly, you can use the name of the database to define the target schema as follows:

TARGET_SCHEMA_NAME_EXPRESSION = 
    CASE $database_name 
        WHEN 'Suppliers' THEN 'SUPPLIERS' 
        ELSE 'DISTRIBUTORS' 
    END

TARGET_TABLE_NAME_EXPRESSION

Type: expression

Define how you want Upsolver to name the target tables in Snowflake. The following inputs are supported: $database_name, $schema_name, $full_table_name and $table_name. If you are ingesting from MongoDB, you can also use the $collection_name input.

Optionally, you can include a prefix for the table name. Use in conjunction with $full_table_name, $table_name, or $collection_name to create the target table using the name of the source table or collection. For example, to create the table in Snowflake with the prefix sales_, followed by the name of the source table, use the expression:

TARGET_TABLE_NAME_EXPRESSION = 'sales_' || $table_name::STRING

Furthermore, you could include a CASE statement to determine the name of the target table based on the source database:

TARGET_TABLE_NAME_EXPRESSION = 
    CASE $database_name 
        WHEN 'Suppliers' THEN 'supp_' || $table_name::STRING 
        ELSE 'dist_' || $table_name::STRING 
    END
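To preview the names that the CASE expression above would produce, its logic can be mirrored outside Upsolver. This is a hedged Python sketch; `target_table_name` is a hypothetical helper, and the sample table name `parts` and database name `Retailers` are made up.

```python
def target_table_name(database_name: str, table_name: str) -> str:
    """Mirror the CASE expression: 'supp_' for Suppliers, 'dist_' otherwise."""
    prefix = "supp_" if database_name == "Suppliers" else "dist_"
    return prefix + table_name

print(target_table_name("Suppliers", "parts"))  # supp_parts
print(target_table_name("Retailers", "parts"))  # dist_parts
```

The sketch does not apply any target-specific casing, such as Snowflake creating target tables in upper case.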

UPSOLVER_EVENT_TIME_COLUMN

Type: text

Values: <column_name>

(Optional) Upsolver uses distributed locking technology to keep events strongly ordered. The ingestion timestamp is stored in a dedicated column reflecting the order, which is UPSOLVER_EVENT_TIME by default unless specified here.

WRITE_INTERVAL

Type: <integer> { MINUTE[S] | HOUR[S] | DAY[S] }

Determines how often the job writes the CDC data to the REPLICATION_TARGET specified within the group. Frequent writes provide up-to-date information but may be costly, especially for Snowflake.

Writes take place at the frequency defined by this interval, which must divide evenly into the number of hours in a day.

For example, you can set WRITE_INTERVAL to 2 hours (the job runs 12 times per day), but trying to set WRITE_INTERVAL to 5 hours would fail since 24 hours is not evenly divisible by 5.
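The divisibility rule can be checked with simple arithmetic. The Python sketch below covers intervals expressed in whole hours only; how Upsolver validates minute or day intervals is not stated here, so those units are left out. The function name `runs_per_day` is hypothetical.

```python
HOURS_PER_DAY = 24

def runs_per_day(interval_hours: int) -> int:
    """Return how many times per day a job runs for an hourly interval.

    Raises ValueError when 24 hours is not evenly divisible by the interval.
    """
    if interval_hours <= 0:
        raise ValueError("interval must be a positive number of hours")
    if HOURS_PER_DAY % interval_hours != 0:
        raise ValueError(
            f"24 hours is not evenly divisible by {interval_hours}"
        )
    return HOURS_PER_DAY // interval_hours

print(runs_per_day(2))  # 12

try:
    runs_per_day(5)
except ValueError as err:
    print(err)  # 24 hours is not evenly divisible by 5
```

Under this rule, the valid hourly intervals are 1, 2, 3, 4, 6, 8, 12, and 24.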

WRITE_MODE

Values: { MERGE | APPEND }

Defines whether the job should merge or append the data into the target. You have three options for writing data:

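  • Exact replication (write mode MERGE): Performs an exact replication in the target by using hard updates and deletes, i.e. data is overwritten and deleted in the target exactly as in the source.

  • Soft deletion (write mode MERGE): Rows deleted in the source are kept in the target and marked as deleted in the column set by the LOGICAL_DELETE_COLUMN option.

  • Append only (write mode APPEND): Each change is appended to the target, and the operation type is recorded in the column set by the OPERATION_TYPE_COLUMN option.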

Note

At present, Amazon Redshift targets only support the MERGE write mode.
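The difference between the write modes can be pictured by replaying a short list of change events. The following Python sketch is a conceptual model only, not Upsolver's implementation: the event tuples, the `id` key, and the True/False and operation-name values stored in the flag columns are assumptions. Only the default column names UPSOLVER_IS_DELETED and UPSOLVER_RECORD_TYPE come from this page.

```python
def apply_merge(events, logical_delete_column=None):
    """Replay CDC events into a dict keyed by primary key (MERGE)."""
    target = {}
    for operation, key, row in events:
        if operation == "delete":
            if logical_delete_column is None:
                target.pop(key, None)  # exact replication: hard delete
            elif key in target:
                target[key][logical_delete_column] = True  # soft deletion
        else:
            # Inserts and updates overwrite the row for this key.
            new_row = dict(row)
            if logical_delete_column is not None:
                new_row[logical_delete_column] = False
            target[key] = new_row
    return target

def apply_append(events, operation_type_column="UPSOLVER_RECORD_TYPE"):
    """Keep every event as its own row, tagged with the operation (APPEND)."""
    return [
        {"id": key, **(row or {}), operation_type_column: operation}
        for operation, key, row in events
    ]

# Hypothetical change events: (operation, primary key, row values).
events = [
    ("insert", 1, {"status": "new"}),
    ("update", 1, {"status": "paid"}),
    ("insert", 2, {"status": "new"}),
    ("delete", 2, None),
]

print(apply_merge(events))                         # only row 1 remains
print(apply_merge(events, "UPSOLVER_IS_DELETED"))  # row 2 kept and flagged
print(len(apply_append(events)))                   # 4
```

In this model, MERGE keeps one row per primary key, while APPEND keeps the full change history.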


Example

Replicate CDC data

This example applies to all source and target combinations, with the exception of the WRITE_MODE = APPEND replication group option, which is currently unavailable for Redshift (use MERGE instead):

CREATE REPLICATION JOB replicate_orders_to_target
  COMMENT = 'Replicate CDC data to multiple groups'
  COMPUTE_CLUSTER = "Default Compute (Free)"
  INTERMEDIATE_STORAGE_CONNECTION = s3_connection
  INTERMEDIATE_STORAGE_LOCATION = 's3://upsolver-integration-tests/test/' 
FROM my_database_connection 
// Add PostgreSQL source options here:
   // PUBLICATION_NAME = 'orders_publication' 
   // HEARTBEAT_TABLE = 'orders.heartbeat'
WITH REPLICATION GROUP replicate_to_prod 
  INCLUDED_TABLES_REGEX = ('orders\..*')
  EXCLUDED_COLUMNS_REGEX = ('.*\.creditcard') -- exclude creditcard columns 
  COMMIT_INTERVAL = 5 MINUTES
  REPLICATION_TARGET = my_target_connection
  TARGET_SCHEMA_NAME_EXPRESSION = 'ORDERS'
  TARGET_TABLE_NAME_EXPRESSION = $table_name
  UPSOLVER_EVENT_TIME_COLUMN = event_timestamp
  WRITE_MODE = MERGE 
    // The flag column to denote deletion in the source
    LOGICAL_DELETE_COLUMN = is_deleted
WITH REPLICATION GROUP replicate_to_dev 
  INCLUDED_TABLES_REGEX = ('orders\..*')
  COMMIT_INTERVAL = 1 HOUR
  REPLICATION_TARGET = my_target_connection
  TARGET_TABLE_NAME_EXPRESSION = 'history_' || $table_name
  TARGET_SCHEMA_NAME_EXPRESSION = 'ORDERS_DEV'
  UPSOLVER_EVENT_TIME_COLUMN = event_timestamp   
  WRITE_MODE = APPEND -- Use MERGE for Redshift targets 
    // The column that records the change type, e.g. insert, update, delete
    OPERATION_TYPE_COLUMN = operation_type;

This example creates a replication job named replicate_orders_to_target that writes to two replication groups:

  • replicate_to_prod for production

  • replicate_to_dev for development

To configure your database source, change the my_database_connection value to be the name of your connection. If you are using PostgreSQL, please include your additional source options.

To set your target, change the REPLICATION_TARGET value to the name of your Amazon Redshift, AWS Glue Data Catalog, Snowflake, or Polaris connection.

In our example above, each replication group has its own set of options that can be configured differently while streaming from the same data source:

  • The production replication group has a COMMIT_INTERVAL of 5 MINUTES to keep the target frequently updated, whereas the development group has an interval of 1 HOUR to update less often. Update frequency may have an impact on cloud compute costs when writing to targets such as Snowflake.

  • Furthermore, the production group will MERGE the data, using the column is_deleted to flag if a row is deleted in the source. The development group will APPEND the data, using the operation_type column specified in the OPERATION_TYPE_COLUMN option to audit whether the operation in the source database was an insert, update, or delete.

  • Both groups replicate all tables in the orders schema, as specified in the INCLUDED_TABLES_REGEX option. However, all creditcard columns are excluded from the production target by using the EXCLUDED_COLUMNS_REGEX option to remove PII.

  • In the development group, the TARGET_TABLE_NAME_EXPRESSION option includes the prefix value of history_. This means that Upsolver will create each target table using the prefix followed by the name of the source table, so it is clear that the target tables in this group are used for development purposes.
