Replicate CDC data to multiple targets - PostgreSQL to Snowflake

This guide will show you how to use replication jobs to copy your CDC data to multiple schemas in Snowflake.

One script, multiple targets

Replication jobs enable you to copy data from your source database and replicate it to multiple targets, reducing the need to write a job for each schema into which you want to write your data.

These jobs use replication groups: you define the source and ingest the data once, then create a replication group with its own requirements for each target. Although the groups share the same publication, each operates independently; when you define a replication job that includes multiple targets, Upsolver automatically creates a sub-job for each target.

In this guide, you will discover how to create a replication job that writes to two different schemas in Snowflake: one that is used for production reporting, and the other for development. Each group will share the source data, but write to the target at different intervals and perform different manipulations on the data.

Monitor your jobs

Furthermore, you will learn how to use the system tables to monitor your new jobs to ensure data is streaming as anticipated. An important component of job creation and administration, system tables expose metadata, enabling you to monitor performance and handle issues.

Prerequisites

Create a publication

Upsolver requires a publication on the replicated tables in order to subscribe to change events. The publication can cover either all tables or specific tables. A superuser can create a publication on all tables using the following command:

CREATE PUBLICATION upsolver FOR ALL TABLES

Creating a publication on specific tables will not automatically include newly created tables. For instructions on how to create and manage publications for specific tables, see the PostgreSQL documentation.
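For illustration, a publication limited to specific tables can be created with standard PostgreSQL syntax along the following lines (the table names are examples only and should be replaced with your own):

-- Example only: create a publication covering specific tables
CREATE PUBLICATION upsolver FOR TABLE sales.orders, sales.customers;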

Create a heartbeat table

Follow these steps to create a heartbeat table:

  1. Create a heartbeat table:

CREATE TABLE IF NOT EXISTS <schema_name>.<table_name> (key int primary key, value timestamp)

  2. Grant the necessary permissions for Upsolver to use the heartbeat table:

GRANT INSERT, UPDATE, SELECT ON TABLE <table_name> TO <user>

  3. If the publication used in the data source was created for specific tables (not all tables), add the heartbeat table to the publication. See the PostgreSQL documentation.
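For example, assuming the publication is named upsolver and the heartbeat table was created in the sales schema, the standard PostgreSQL command is:

-- Example only: add the heartbeat table to an existing publication
ALTER PUBLICATION upsolver ADD TABLE sales.heartbeat;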

  4. Add the heartbeat table to the data source in the data source creation wizard or in the data source properties.

Follow this guide for detailed instructions on configuring CDC: Prepare PostgreSQL for CDC.

The steps for creating the job for your replication and performing ongoing monitoring are as follows:

  1. Connect to PostgreSQL

  2. Connect to Snowflake

  3. Create a replication job to ingest data to multiple targets

  4. View the snapshot status

  5. Monitor your jobs using system tables

Step 1

Connect to PostgreSQL

Having created your publication in PostgreSQL, the first step is to connect to the database from which you want to ingest your CDC data. You will need the connection string to your PostgreSQL database, and your username and password.

Here's the code to connect to PostgreSQL:

-- Connect to your source PostgreSQL database
CREATE POSTGRES CONNECTION my_postgres_connection
CONNECTION_STRING = 'jdbc:postgresql://cdc2.c02eyuedjkh9.eu-west-1.rds.amazonaws.com:5432/Sales'
USER_NAME = 'your username'
PASSWORD = 'your password';

Step 2

Connect to Snowflake

Next, create a connection to your target database. You will need your Snowflake connection string, and your username and password.

Here's the code to connect to Snowflake:

-- Create a connection to your target Snowflake database
CREATE SNOWFLAKE CONNECTION my_snowflake_connection
CONNECTION_STRING = 'jdbc:snowflake://snowflakedemo.us-east-1.snowflakecomputing.com?db=REPORTING_DB&warehouse=REPORTING_WH&role=ADMIN'
USER_NAME = 'your username'
PASSWORD = 'your password';

Step 3

Create a replication job to ingest data to multiple targets

Ensure you have created the target schemas in your Snowflake database that you will be using in your replication job. This example uses the ORDERS_PROD and ORDERS_DEV schemas as targets.
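If the schemas do not yet exist, they can be created in Snowflake with statements along these lines (a sketch assuming the REPORTING_DB database from the connection string above):

-- Example only: create the target schemas in Snowflake
CREATE SCHEMA IF NOT EXISTS REPORTING_DB.ORDERS_PROD;
CREATE SCHEMA IF NOT EXISTS REPORTING_DB.ORDERS_DEV;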

The next step is to create a replication job that copies the CDC data from the Sales database in PostgreSQL to the ORDERS_PROD and ORDERS_DEV schemas in the REPORTING_DB database in Snowflake.

Here's the code to create the replication job:

CREATE REPLICATION JOB postgres_replication_to_snowflake
  COMMENT = 'Replicate Postgres CDC data to Snowflake groups'
  COMPUTE_CLUSTER = "Default Compute (Free)"
  INTERMEDIATE_STORAGE_CONNECTION = s3_connection
  INTERMEDIATE_STORAGE_LOCATION = 's3://upsolver-integration-tests/test/'  
FROM my_postgres_connection 
  PUBLICATION_NAME = 'sales_publication' 
  HEARTBEAT_TABLE = 'sales.heartbeat'
WITH REPLICATION GROUP replicate_to_snowflake_prod 
(
  INCLUDED_TABLES_REGEX = ('(prod)\..*') -- include all tables in the prod schema
  EXCLUDED_COLUMNS_REGEX = ('.*\.address') -- exclude address columns 
  LOGICAL_DELETE_COLUMN = "is_deleted"
  REPLICATION_TARGET = my_snowflake_connection
  TARGET_SCHEMA_NAME_EXPRESSION = 'ORDERS_PROD' 
  TARGET_TABLE_NAME_EXPRESSION = $table_name  
  WRITE_INTERVAL = 5 MINUTES
  WRITE_MODE = MERGE 
)
WITH REPLICATION GROUP replicate_to_snowflake_dev 
(
  INCLUDED_TABLES_REGEX = ('(prod)\..*')  -- include all tables in the prod schema
  REPLICATION_TARGET = my_snowflake_connection
  TARGET_SCHEMA_NAME_EXPRESSION = 'ORDERS_DEV' 
  TARGET_TABLE_NAME_EXPRESSION = 'history_' || $table_name    
  WRITE_INTERVAL = 4 HOURS
  WRITE_MODE = APPEND 
);

Let's understand what this job does.

If you have previously created ingestion and transformation jobs in Upsolver, you may notice that this code uses a job type of REPLICATION: this keyword instructs Upsolver to create a sub-job for each replication group.

In this example, we create a replication job named postgres_replication_to_snowflake, with an optional COMMENT to describe what this job does. The COMPUTE_CLUSTER specifies which cluster should run the job if you have more than one cluster, otherwise, the default cluster is used.

The job includes two source options: the PUBLICATION_NAME option defines the publication to use, which should already be created on your PostgreSQL database. In this example, we use a publication named sales_publication. Furthermore, the HEARTBEAT_TABLE option specifies the heartbeat table to use, in this case, sales.heartbeat. The options specified here are shared by both replication groups.

Upsolver will create a sub-job for each replication group, enabling each group to have a different job configuration. The ORDERS_PROD schema requires fresh data, so the WRITE_INTERVAL has been set to 5 MINUTES, whereas the ORDERS_DEV schema is not critical, so the data is updated every 4 HOURS.

Furthermore, the production group uses the MERGE write mode. By default, Upsolver permanently deletes rows from the target; the LOGICAL_DELETE_COLUMN option adds an is_deleted column so that rows are marked as deleted rather than physically removed. The development group uses the APPEND write mode.

Both groups replicate all tables in the prod schema, as specified in the INCLUDED_TABLES_REGEX option. However, all address columns are excluded from the production target in the EXCLUDED_COLUMNS_REGEX option to remove personally identifiable information (PII).

In the development group, the TARGET_TABLE_NAME_EXPRESSION option includes the prefix value of history_. This means that Upsolver will create each target table using the name of the source table with this prefix, so it is clear that the target tables in this group are used for development.

If you wanted to extend this code, you could add a third replication group for a UAT schema, for example.
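As a sketch, such a group could be appended before the closing semicolon of the job statement; the ORDERS_UAT schema name, write interval, and other options below are illustrative only:

WITH REPLICATION GROUP replicate_to_snowflake_uat
(
  INCLUDED_TABLES_REGEX = ('(prod)\..*')  -- include all tables in the prod schema
  REPLICATION_TARGET = my_snowflake_connection
  TARGET_SCHEMA_NAME_EXPRESSION = 'ORDERS_UAT'
  TARGET_TABLE_NAME_EXPRESSION = $table_name
  WRITE_INTERVAL = 2 HOURS
  WRITE_MODE = MERGE
)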

For more information, see the Replication jobs SQL command reference.

Step 4

View the snapshot status

When you create your job, Upsolver immediately begins snapshotting the tables in your source database. You can monitor the snapshotting progress and observe the number of rows ingested as follows:

  1. After executing the CREATE REPLICATION JOB statement, click Jobs from the main menu in Upsolver, and you will see your new job listed. If you have many existing jobs, you can click Filters to search for your job, or click Date Created to order by most recent and see your new job. In this example, Upsolver creates the job postgres_replication_to_snowflake. This is the master job for the replication, and the From-To icon confirms that data streams from PostgreSQL to Snowflake.

  2. Click on the job name to open the Jobs page. Notice that Upsolver has created a sub-job for each replication group in the job, each named in the format <job name> - <replication group name> output job. There is also a copy job that performs the replication:

    • postgres_replication_to_snowflake copy job

    • postgres_replication_to_snowflake - replicate_to_snowflake_prod output job

    • postgres_replication_to_snowflake - replicate_to_snowflake_dev output job

  3. Click on the copy job to open the job status page. This job ingests the CDC data from PostgreSQL into a staging table in the Upsolver lake. Here you can monitor progress as Upsolver takes a snapshot of each table that was included in the job. The status of each table shows as Pending, Snapshotting, or Streaming. It may take a few minutes for this page to update when you first open it while Upsolver collects the monitoring information.

  4. Located under the tables group are the job metrics, with the Summary view displayed by default. This displays Rows Pending Processing and Rows written (completed executions) to provide insight into the data ingestion process. Use the display toggle to change the view from Summary to All if you want to see all metrics. Click a metric to expand the information. You can then copy and paste the code from the See All Events section into a worksheet and run it at any time without needing to open the job page.

  5. Click the Job Details button. This opens the Information screen to display the SQL syntax that created the job, the job Options and Details, and the Snapshot Status for each CDC table.

  6. Click the close icon to return to the job status page, then click Back to return to the Jobs page.

  7. You can then click into each of the sub-jobs and explore the job metrics.

Step 5

Monitor your jobs using system tables

The system tables provide information about your jobs to enable you to monitor progress, and diagnose issues. To view information about your replication job, query the jobs system table.

Here's the code:

SELECT * 
FROM system.information_schema.jobs
WHERE name LIKE 'postgres_replication_to_snowflake%';

This returns information about the master job, the copy job that performs the replication, and the sub-job created for each replication group. The job_type column indicates the role of each job:

  • replication: the master job

  • copy: the copy job that performs the CDC replication into the staging table

  • merge: the WRITE_MODE option for the replication group is MERGE

  • insert: the WRITE_MODE option for the replication group is APPEND
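For example, to list just the job names alongside their types, you can narrow the query to the columns referenced above:

SELECT name, job_type
FROM system.information_schema.jobs
WHERE name LIKE 'postgres_replication_to_snowflake%'
ORDER BY job_type;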

The cdc_status system table lists each table in your replication copy job along with its status, e.g. Streaming. If you included a heartbeat table in your job, it is also listed.

Here's the code to view the CDC status of each table:

SELECT * 
FROM system.monitoring.cdc_status 
WHERE job_name LIKE 'postgres_replication_to_snowflake%';

Conclusion

In this guide you learned how to replicate your change data capture tables from PostgreSQL into multiple target schemas in your Snowflake database. You saw how easy it is to create a single job with multiple replication groups that support different options in the target schema. Then you learned how to view the snapshotting process and ensure your CDC data was replicating as expected, and you queried the system tables to perform ongoing job monitoring.

Try it yourself

To ingest your CDC data from PostgreSQL into multiple targets in Snowflake:

  1. Connect to your CDC-enabled PostgreSQL database

  2. Connect to your target Snowflake database

  3. Create a replication job with a replication group for each target schema

  4. Monitor the snapshotting process and observe data ingestion

  5. Monitor your jobs using the job status metrics
