Replication
This page describes the options for creating a replication job.
Replication jobs copy change data capture (CDC) data from enabled databases into one or more target schemas in the following supported targets:
Amazon Redshift
AWS Glue Data Catalog
Polaris Catalog
Snowflake
The source data is ingested into a staging table that each replication group then reads from to update the target. While sharing the same source data, each replication group can be configured individually, using different intervals and options, and can write to a separate schema. You therefore create one entity, and Upsolver automatically generates a job for each replication group, managing the freshness of data in each target based on the commit interval that you specify.
Please note:
Target tables in Snowflake are created in upper case.
Target tables in Amazon Redshift are created in lower case.
Syntax
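A hedged sketch of the overall statement shape, assembled from the options described on this page; angle brackets mark placeholders, and the exact grammar may vary:

```sql
CREATE REPLICATION JOB <job_identifier>
    [{ job_options }]
FROM <source_connection_name>
WITH REPLICATION GROUP <group_name>
    { replication_group_options }
[ WITH REPLICATION GROUP <another_group_name>
    { replication_group_options } ]
;
```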
Job identifier
This is the name of your job, which should match the following format:
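As a hedged illustration, a plain identifier such as the following is valid (the name itself is hypothetical):

```sql
replicate_orders_to_target
```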
Job options
The following job options can be applied to all sources:
COMMENT (editable)
Type: text
(Optional) A description or comment regarding this job.
COMPUTE_CLUSTER (editable)
Type: identifier
Default: The sole cluster in your environment
(Optional) The compute cluster to run this job.
This option can only be omitted when there is just one cluster in your environment; if you have more than one compute cluster, use this option to specify which one runs the job.
INTERMEDIATE_STORAGE_CONNECTION
Type: text
The name of the connection you are using to stage the data ingested from your CDC source.
INTERMEDIATE_STORAGE_LOCATION
Type: text
The name of the location you are using to stage the data ingested from your CDC source.
SNAPSHOT_PARALLELISM
Type: int
Default: 1
(Optional) Configures how many snapshots are performed concurrently. The more snapshots that run in parallel, the sooner the tables begin streaming; however, running more snapshots in parallel increases the load on the source database.
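For example, to run four snapshots at a time (a hedged sketch of the option syntax):

```sql
SNAPSHOT_PARALLELISM = 4
```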
Source options
The following source options apply to PostgreSQL:
HEARTBEAT_TABLE
Type: string
(Optional) The name of the heartbeat table. If not set, no heartbeat table is used. Using a heartbeat table is recommended to avoid the replication slot growing indefinitely when no CDC events are captured for the subscribed tables.
PUBLICATION_NAME
Type: text
Adds a new publication to the current database. The publication name must be distinct from the name of any existing publication in the current database. DDL will be filtered.
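For example (the publication name is hypothetical):

```sql
PUBLICATION_NAME = 'orders_publication'
```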
Replication group options
A replication group must include INCLUDED_TABLES_REGEX (and optionally EXCLUDED_COLUMNS_REGEX) or INCLUDED_SCHEMA_DEFINITION (and optionally EXCLUDED_SCHEMA_DEFINITION).
The following replication group options apply to all data sources, except where differences are noted in the option descriptions below.
COMMIT_INTERVAL
Type: <integer> { MINUTE[S] | HOUR[S] | DAY[S] }
Default: 1 MINUTE
(Optional) Defines how often the job will load and commit data to the target in a direct ingestion job. The number of hours in a day must be evenly divisible by this interval; for example, 2 HOURS is valid, but 5 HOURS is not.
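For example, to commit to the target every five minutes (a hedged sketch of the option syntax):

```sql
COMMIT_INTERVAL = 5 MINUTES
```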
INCLUDED_TABLES_REGEX
Values: ('regexFilter1', 'regexFilter2', ...)
One or more regular expressions that define which tables should be included in the replication.
For example, ('dbo.prod_.*') includes all tables in the dbo schema prefixed with prod_.
EXCLUDED_COLUMNS_REGEX
Values: ('regexFilter1', 'regexFilter2', ...)
(Optional) Can only be included when INCLUDED_TABLES_REGEX is supplied. One or more regular expressions that define which columns should be excluded from the replication.
For example, ('dbo.*.address_') excludes all columns in all tables prefixed with address_.
INCLUDED_SCHEMA_DEFINITION
Type: text
One or more expressions that define which tables should be included in the replication.
For example, *, schema.*, schema.table.*, or schema.table.column.
EXCLUDED_SCHEMA_DEFINITION
Type: text
(Optional) Can only be included when INCLUDED_SCHEMA_DEFINITION is supplied. One or more expressions that determine which tables should be excluded from the replication.
For example, schema.*, schema.table.
COLUMN_TRANSFORMATIONS
Values: ( <column> = <expression>, ...)
(Optional) Use this option to apply transformations to the data during ingestion, before it lands in your target. When ingesting into the data lake, it is recommended that you apply only essential transformations, such as protecting PII: keeping the data in its raw state makes it easier to amend or correct later, and a transformation job can apply further modifications downstream. As a general rule, you should only transform data that must be modified before it reaches the target.
However, transformations provide the flexibility to shape your data before it lands in the target. You can use all the functions and operators supported by Upsolver to create calculated fields within your ingestion job. New columns can be added to your target, and existing column data can be transformed. You can perform actions such as converting data types, formatting string values, and concatenating columns to create a new column.
If you need to mask sensitive or personally identifiable information (PII) prior to loading into your staging tables or when performing direct ingestion into your target destination, you can use hashing functions to prevent data from being exposed downstream.
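For example, to hash an email column before it lands in the target (a hedged sketch; the customer_email column is hypothetical and MD5 stands in for your preferred hashing function):

```sql
COLUMN_TRANSFORMATIONS = (customer_email = MD5(customer_email))
```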
LOGICAL_DELETE_COLUMN
Type: text
(Optional) By default, Upsolver permanently deletes rows from the target. To logically delete rows instead, specify the name of a column in which Upsolver will mark them as deleted.
The column will be named UPSOLVER_IS_DELETED unless specified otherwise.
This option is valid only when using WRITE_MODE = MERGE.
OPERATION_TYPE_COLUMN
Type: text
(Optional) When you choose to append data in the target, Upsolver creates a column to store the operation type that occurred on the source database, e.g. insert, update, delete.
By default, this column is named UPSOLVER_RECORD_TYPE. To rename this column, use the OPERATION_TYPE_COLUMN option and specify the column name.
This option is valid only when using WRITE_MODE = APPEND.
PRIMARY_KEY_COLUMN
Type: text
Values: <column_name>
(Optional) Defines the column name to be created and used to store the original primary key value of the source tables. If not specified, the column name UPSOLVER_PRIMARY_KEY will be used.
REPLICATION_TARGET
Type: text
Values: <connection_name>
Specify the name of the target connection to write the data to. The connection must be created before you create your job.
TARGET_SCHEMA_NAME_EXPRESSION
Type: expression
Specify the name of the target schema in Snowflake that the replication group is writing to. The following inputs are supported: $database_name, $schema_name, $full_table_name, and $table_name. If you are ingesting from MongoDB, you can also use the $collection_name input.
As well as using the exact name of the schema, for example, TARGET_SCHEMA_NAME_EXPRESSION = 'SALES', you can also write a CASE statement to define the target schema, for example:
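A hedged sketch (the schema names and the 'orders' value are hypothetical):

```sql
TARGET_SCHEMA_NAME_EXPRESSION = CASE
    WHEN $schema_name = 'orders' THEN 'SALES'
    ELSE 'GENERAL'
END
```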
Similarly, you can use the name of the database to define the target schema as follows:
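A hedged sketch along the same lines (the database names are hypothetical):

```sql
TARGET_SCHEMA_NAME_EXPRESSION = CASE
    WHEN $database_name = 'sales_db' THEN 'SALES'
    ELSE 'GENERAL'
END
```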
TARGET_TABLE_NAME_EXPRESSION
Type: expression
Define how you want Upsolver to name the target tables in Snowflake. The following inputs are supported: $database_name, $schema_name, $full_table_name, and $table_name. If you are ingesting from MongoDB, you can also use the $collection_name input.
Optionally, you can include a prefix for the table name. Use this in conjunction with $full_table_name, $table_name, or $collection_name to create the target table using the name of the source table or collection. For example, to create the table in Snowflake with the prefix sales_, followed by the name of the source table, use the expression:
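A hedged sketch, assuming standard || string concatenation is supported in the expression:

```sql
TARGET_TABLE_NAME_EXPRESSION = 'sales_' || $table_name
```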
Furthermore, you could include a CASE statement to determine the name of the target table based on the source database:
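A hedged sketch (the database names are hypothetical):

```sql
TARGET_TABLE_NAME_EXPRESSION = CASE
    WHEN $database_name = 'orders_db' THEN 'orders_' || $table_name
    ELSE $table_name
END
```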
UPSOLVER_EVENT_TIME_COLUMN
Type: text
Values: <column_name>
(Optional) Upsolver uses distributed locking technology to keep events strongly ordered. The ingestion timestamp is stored in a dedicated column reflecting that order, named UPSOLVER_EVENT_TIME by default unless specified here.
WRITE_INTERVAL
Type: <integer> { MINUTE[S] | HOUR[S] | DAY[S] }
Determines how often the job writes the CDC data to the REPLICATION_TARGET specified within the group. Frequent writes provide up-to-date information but may be costly, especially for Snowflake.
The writes take place over a set period of time defined by this interval, and the number of hours in a day must be evenly divisible by the interval.
For example, you can set WRITE_INTERVAL to 2 hours (the job runs 12 times per day), but trying to set WRITE_INTERVAL to 5 hours would fail, since 24 hours is not evenly divisible by 5.
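For example (a hedged sketch of the option syntax):

```sql
WRITE_INTERVAL = 2 HOURS
```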
WRITE_MODE
Values: { MERGE | APPEND }
Defines whether the job should merge or append the data into the target. You have three options for writing data:
Exact replication
MERGE
Performs an exact replication in the target by using hard updates and deletes, i.e. data is overwritten and deleted in the target exactly as in the source.
This option must include the UPSOLVER_EVENT_TIME_COLUMN so that Upsolver can retain the order of source events.
Soft deletion
MERGE
Applies a soft delete approach by using the LOGICAL_DELETE_COLUMN to flag when a row is deleted in the source database, without deleting it from the target. Updates are applied in the target per the source.
This option must also include the UPSOLVER_EVENT_TIME_COLUMN so that Upsolver can retain the order of source events.
Append only
APPEND
Changes are appended to the target, so no updates or deletes are applied. The OPERATION_TYPE_COLUMN records whether the operation was an insert, update, or delete.
This option must also include the UPSOLVER_EVENT_TIME_COLUMN so that Upsolver can retain the order of source events.
Note
At present, Amazon Redshift targets only support the MERGE write mode.
Example
Replicate CDC data
This example applies to all source and target combinations, with the exception of the WRITE_MODE = APPEND replication group option, which is currently unavailable for Redshift (use MERGE instead):
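The following is a hedged sketch of the job, assembled from the options described above; the connection names, storage location, and regex values are placeholders you should replace with your own:

```sql
CREATE REPLICATION JOB replicate_orders_to_target
    COMMENT = 'Replicate CDC data into prod and dev'
    INTERMEDIATE_STORAGE_CONNECTION = my_s3_connection
    INTERMEDIATE_STORAGE_LOCATION = 's3://my-bucket/staging/'
FROM my_database_connection
WITH REPLICATION GROUP replicate_to_prod
    INCLUDED_TABLES_REGEX = ('orders\..*')
    EXCLUDED_COLUMNS_REGEX = ('.*creditcard.*')
    COMMIT_INTERVAL = 5 MINUTES
    LOGICAL_DELETE_COLUMN = 'is_deleted'
    REPLICATION_TARGET = my_target_connection
    TARGET_SCHEMA_NAME_EXPRESSION = 'ORDERS'
    TARGET_TABLE_NAME_EXPRESSION = $table_name
    WRITE_MODE = MERGE
WITH REPLICATION GROUP replicate_to_dev
    INCLUDED_TABLES_REGEX = ('orders\..*')
    COMMIT_INTERVAL = 1 HOUR
    OPERATION_TYPE_COLUMN = 'operation_type'
    REPLICATION_TARGET = my_target_connection
    TARGET_SCHEMA_NAME_EXPRESSION = 'ORDERS'
    TARGET_TABLE_NAME_EXPRESSION = 'history_' || $table_name
    WRITE_MODE = APPEND;
```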
This example creates a replication job named replicate_orders_to_target that writes to two replication groups:
replicate_to_prod for production
replicate_to_dev for development
To configure your database source, change the my_database_connection value to be the name of your connection. If you are using PostgreSQL, please include your additional source options.
To set your target, change the REPLICATION_TARGET value to the name of your Amazon Redshift, AWS Glue Data Catalog, Snowflake, or Polaris connection.
In our example above, each replication group has its own set of options that can be configured differently while streaming from the same data source:
The production replication group has a COMMIT_INTERVAL of 5 MINUTES to keep the target frequently updated, whereas the development group has an interval of 1 HOUR to update less often. Update frequency may have an impact on cloud compute costs when writing to targets such as Snowflake.
Furthermore, the production group will MERGE the data, using the column is_deleted to flag if a row is deleted in the source. The development group will APPEND the data, using the operation_type column specified in the OPERATION_TYPE_COLUMN option to audit whether the operation in the source database was an insert, update, or delete.
Both groups replicate all tables in the orders schema, as specified in the INCLUDED_TABLES_REGEX option. However, all creditcard columns are excluded from the production target by using the EXCLUDED_COLUMNS_REGEX option to remove PII.
In the development group, the TARGET_TABLE_NAME_EXPRESSION option includes the prefix value of history_. This means that Upsolver will create the target table using the name of the source table and the prefix, so it is clear that the target tables in this group are used for development purposes.