Replicate your database data (CDC) into your warehouse - PostgreSQL to Snowflake

This guide takes you through the steps to ingest change data capture (CDC) data from PostgreSQL and write it to Snowflake.

One script, multiple jobs

Replicating your operational database to a BI solution for analysis can be a time-intensive process. Upsolver provides the ability to write one job template that generates a dedicated job for each table within your replication. This time-saving feature is helpful when you have a large number of tables, removing the need to manually write and manage a job for each table.

Using the write-ahead log (WAL) in PostgreSQL - or the binlog for a MySQL source - Upsolver ingests changed data from your published database into a staging table in the data lake. Jobs are then executed at a specified interval to write the data to the target Snowflake database, ready for analysis and reporting. Each table has its own job, or sub-job, within the main job, which Upsolver manages.

Snowflake schema evolution

Upsolver can use the template to create each table in your destination Snowflake database if it doesn't already exist. Then, using schema evolution, Upsolver automatically manages changes to tables and columns so that user intervention is not required every time a structural change is made in the source database. If a new column appears in the schema, Upsolver adds the column to the target table. When a new table is created in the source database, Upsolver automatically creates a new job to manage the change.

Furthermore, Upsolver handles changes in data type, preventing a job from breaking when an unexpected value fails to write into an existing column with a conflicting type.

Monitor the jobs

Additionally, you can use the system tables to monitor your new jobs to ensure data is flowing as expected. The system tables expose metadata to enable you to monitor performance and deal with any unexpected issues.

Prerequisites

Ensure you have configured your PostgreSQL database for replication prior to following this guide. More information can be found in this guide: Prepare PostgreSQL for CDC.

The steps for creating the sub-jobs for your replication are as follows:

  1. Connect to PostgreSQL

  2. Create a staging table to store the CDC data

  3. Create a job to ingest the CDC data into your staging table

  4. Connect to Snowflake

  5. Create a job to merge into Snowflake and generate a sub-job for each target table

  6. Monitor the replication progress

Step 1

Connect to PostgreSQL

The first step is to connect to the database from which you want to ingest your CDC data. You will need the connection string to your PostgreSQL database, and the username and password.

Here's the code:

-- Connect to your source database
CREATE POSTGRES CONNECTION my_postgres_connection
CONNECTION_STRING = 'jdbc:postgresql://cdc2.c02eyuedjkh9.eu-west-1.rds.amazonaws.com:5432/auction'
USER_NAME = 'your username'
PASSWORD = 'your password';

Step 2

Create a staging table to store the CDC data

After connecting to your source PostgreSQL database, the next step is to create a table in the data lake for staging the CDC data.

Here's the code to create the staging table:

-- Create a staging table for the changed data
CREATE TABLE default_glue_catalog.upsolver_samples.postgres_to_snowflake_staging(
    $full_table_name STRING, 
    $schema_name STRING,
    $table_name STRING, 
    $primary_key STRING
)
PARTITION BY $full_table_name, $schema_name, $table_name, $event_date
DISABLE_COMPACTION = true
TABLE_DATA_RETENTION = 7 DAYS
RETENTION_DATE_PARTITION = $event_date
EXPOSE_IN_CATALOG = false;

Let's understand what this code does.

A table named postgres_to_snowflake_staging is created to store the CDC changes that will be ingested from the PostgreSQL publication. Aside from the system columns, no columns have been defined, as Upsolver will create these automatically, deriving the names and types from the tables in the replication. The table will be a union of all tables and columns in the replication that are specified in the script in Step 3.
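To picture what "a union of all tables and columns" means, here is a minimal Python sketch. It is not Upsolver code: the sample events, the column names, and the helper functions are all hypothetical, and it only illustrates how rows from tables with different columns can share one wide staging schema.

```python
# Hypothetical change events from two source tables with different columns.
events = [
    {"$full_table_name": "orders.customers", "customer_id": 1, "name": "Ada"},
    {"$full_table_name": "orders.items", "item_id": 10, "price": 4.5},
]


def union_columns(rows):
    """Return every column seen across the rows, in first-seen order."""
    columns = []
    for row in rows:
        for name in row:
            if name not in columns:
                columns.append(name)
    return columns


def widen(rows):
    """Pad each row with None so that all rows share the union of columns."""
    columns = union_columns(rows)
    return [{name: row.get(name) for name in columns} for row in rows]


staging = widen(events)
print(union_columns(events))
```

Each staging row carries only the values of its own source table; the columns that belong to other tables stay empty, which is why the staging table grows wide as more tables join the replication.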

In order for Upsolver to generate a sub-job based on each source table, the staging table must be partitioned by $full_table_name, $schema_name, and $table_name. In this example, $event_date is also included in the partition; it does not need to be declared as a column because Upsolver creates it automatically. A $primary_key column is also declared because the MERGE statement in the main job created in Step 5 uses it, but it is not required as a partition column.

Looking at the table options, the DISABLE_COMPACTION option is set to true in order to instruct Upsolver to disable the compaction process for this table. The data does not need to be retained in the staging table for longer than a week, so the TABLE_DATA_RETENTION option has been set to 7 days. Upsolver will delete rows that pass the retention point using the $event_date as the time marker as specified in RETENTION_DATE_PARTITION.
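The retention rule can be sketched in a few lines of Python. This is an illustration only, under the assumption that a partition expires once its event date is more than seven days before the current date; the function name, the sample dates, and the exact boundary handling are hypothetical and not taken from Upsolver.

```python
from datetime import date, timedelta

RETENTION = timedelta(days=7)  # mirrors TABLE_DATA_RETENTION = 7 DAYS


def expired_partitions(event_dates, today):
    """Return the event dates that fall outside the retention window."""
    cutoff = today - RETENTION
    # Assumption: a date exactly on the cutoff is still retained.
    return sorted(d for d in event_dates if d < cutoff)


today = date(2024, 1, 15)
partitions = [date(2024, 1, 5), date(2024, 1, 8), date(2024, 1, 14)]
print(expired_partitions(partitions, today))
```

Because the time marker is the $event_date partition, whole date partitions age out together rather than individual rows being inspected one by one.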

Lastly, EXPOSE_IN_CATALOG is set to false, meaning the table won't be available in the AWS Glue Data Catalog and can't be queried. The table is a union of all tables in the replication, so it is highly likely that there would be too many columns for querying to be supported, even if this were something you wanted to do.

For more information about table options and to learn about data retention, please see CREATE TABLE.

Step 3

Create a job to ingest the CDC data

Create your publication in PostgreSQL if you've not already done so. Read the Prerequisites for PostgreSQL article for more information. Then, run the following script to create a job to copy the changed data into your staging table.

Here's the code to create an ingestion job:

-- Create a job to copy the changes from the source database into the staging table
CREATE SYNC JOB copy_postgresql_to_staging
    PUBLICATION_NAME = 'orderspublication'
AS COPY FROM POSTGRES my_postgres_connection
    TABLE_INCLUDE_LIST =  ('orders.customers|orders.items|orders.categories|orders.invoices')
    COLUMN_EXCLUDE_LIST = ('orders.customers.email', 'orders.customers.phone')
INTO default_glue_catalog.upsolver_samples.postgres_to_snowflake_staging;

Let's take a look at what this code does.

This code creates an ingestion job named copy_postgresql_to_staging that will copy the changed data from PostgreSQL into the staging table postgres_to_snowflake_staging that was created in Step 2.

In this example, PUBLICATION_NAME tells the job to use the orderspublication. Upsolver will take a snapshot of each table and then continue to update the data using the change log. You can specify which tables to ingest using the TABLE_INCLUDE_LIST source option. In this example, four tables, customers, items, categories, and invoices - all in the orders schema - have been selected for inclusion. You can also use a regular expression to select your source tables. The COLUMN_EXCLUDE_LIST tells Upsolver to ignore two columns in the customers table: email and phone. These will be excluded from the ingestion and won't appear in the staging table.
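The effect of the two source options can be approximated in Python. The sketch below assumes that the include list is applied as a regular expression against the whole schema.table name and that excluded columns are identified by their schema.table.column name; both are assumptions for illustration, and the helper names are hypothetical.

```python
import re

# Values copied from the job definition above.
TABLE_INCLUDE = "orders.customers|orders.items|orders.categories|orders.invoices"
COLUMN_EXCLUDE = {"orders.customers.email", "orders.customers.phone"}


def table_selected(full_table_name, pattern=TABLE_INCLUDE):
    """True when the whole schema.table name matches the include pattern."""
    return re.fullmatch(pattern, full_table_name) is not None


def kept_columns(full_table_name, columns, excluded=COLUMN_EXCLUDE):
    """Drop the columns whose fully qualified name is in the exclude list."""
    return [c for c in columns if f"{full_table_name}.{c}" not in excluded]


print(table_selected("orders.customers"))   # selected
print(table_selected("orders.shipments"))   # not selected
print(kept_columns("orders.customers", ["id", "email", "phone", "name"]))
```

Note that in a regular expression an unescaped dot matches any character, so a stricter pattern would escape it, for example orders\.customers.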

For more information regarding job and source options, see the PostgreSQL ingestion jobs reference.

Step 4

Connect to Snowflake

The next step is to create a connection to the target Snowflake database. You will need your Snowflake connection string, and the username and password.

Here's the code to connect to Snowflake:

-- Create a connection to your target Snowflake database
CREATE SNOWFLAKE CONNECTION my_snowflake_connection
CONNECTION_STRING = 'jdbc:snowflake://snowflakedemo.us-east-1.snowflakecomputing.com?db=DEMO_DB&warehouse=DEMO_WH&role=ADMIN'
USER_NAME = 'your username'
PASSWORD = 'your password';

Step 5

Create a job to merge into Snowflake and create the target tables

Next, create the main job that will generate a dedicated job for each table in the replication. This job doesn't run itself; rather, it acts as the template for the sub-jobs.

Here's the code to generate a sub-job for each table in your replication:

-- Create a main job that generates a sub-job for each table
CREATE SYNC JOB postgres_to_snowflake_output
       RUN_INTERVAL = 5 MINUTES
       CREATE_TABLE_IF_MISSING = true
       ADD_MISSING_COLUMNS = true
AS MERGE INTO SNOWFLAKE my_snowflake_connection.orders.{UPSOLVER_TABLE_NAME_<GUID>} AS target
USING (
       SELECT *,       
       $is_delete::BOOLEAN AS UPSOLVER_IS_DELETE_<GUID>,
       COALESCE($primary_key::STRING, UUID_GENERATOR('')) AS UPSOLVER_PRIMARY_KEY,
       $event_time::STRING AS UPSOLVER_EVENT_TIME,
       UPPER($table_name) AS UPSOLVER_TABLE_NAME_<GUID>    
       FROM default_glue_catalog.upsolver_samples.postgres_to_snowflake_staging
       WHERE time_filter()
) AS source
ON source.UPSOLVER_PRIMARY_KEY = target.UPSOLVER_PRIMARY_KEY
WHEN MATCHED AND UPSOLVER_IS_DELETE_<GUID> THEN DELETE
WHEN MATCHED THEN REPLACE
WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;

Let's understand what this code does.

This script is the template for creating a sub-job for each table in the replication. As the main job, postgres_to_snowflake_output will not execute. However, each sub-job that it generates will run and apply the changes (inserts, updates, and deletes) from the staging table to the destination table in Snowflake.

The three job options defined in the code are applied to each sub-job. Firstly, RUN_INTERVAL sets the job execution window to every 5 minutes. Next, CREATE_TABLE_IF_MISSING is set to true so that the destination table is created in the Snowflake database if it does not already exist. This is very useful when you have anything more than a handful of tables to manage. Further to this, ADD_MISSING_COLUMNS is set to true, ensuring that new columns added to the source schema in the future will also be added to the corresponding table in Snowflake.
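Conceptually, ADD_MISSING_COLUMNS amounts to comparing the source columns with the target columns and adding whatever is absent. The Python sketch below shows that comparison only. It is not how Upsolver implements the option, and it assumes, for illustration, that column names are compared case-insensitively and created in upper case in the target; the function names and sample columns are hypothetical.

```python
def missing_columns(source_columns, target_columns):
    """Columns present in the source but absent from the target, in source order."""
    existing = {c.upper() for c in target_columns}
    return [c for c in source_columns if c.upper() not in existing]


def evolve(target_columns, source_columns):
    """Return the target column list after the missing columns are appended."""
    added = [c.upper() for c in missing_columns(source_columns, target_columns)]
    return list(target_columns) + added


source = ["id", "name", "loyalty_tier"]   # loyalty_tier was added at the source
target = ["ID", "NAME"]
print(missing_columns(source, target))
print(evolve(target, source))
```

Existing columns are left untouched, so the change is additive: rows written before the new column appeared simply have no value for it.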

You will notice that the MERGE INTO line includes a placeholder for the target table name: {UPSOLVER_TABLE_NAME_<GUID>}. This placeholder tells Upsolver to use each table name from the staging table as the target in the Snowflake database. In this example, all tables are in the orders schema, but optionally you can provide a placeholder for the schema name as follows:

MERGE INTO SNOWFLAKE my_snowflake_connection.{UPSOLVER_SCHEMA_NAME_<GUID>}.{UPSOLVER_TABLE_NAME_<GUID>}

You will need to replace <GUID> with a unique name that is not used elsewhere. The name can be a mix of letters and numbers, e.g. {UPSOLVER_SCHEMA_NAME_73HS8C3A}.{UPSOLVER_TABLE_NAME_73HS8C3A}.
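One way to think about the placeholder is as a routing key: every staging row resolves the template to a concrete target table. The following Python sketch illustrates that idea with the example identifier 73HS8C3A. The sample rows and the route function are hypothetical, and the sketch is not a description of Upsolver's internals.

```python
from collections import defaultdict

PLACEHOLDER = "{UPSOLVER_TABLE_NAME_73HS8C3A}"
TARGET_TEMPLATE = "orders." + PLACEHOLDER


def route(rows, template=TARGET_TEMPLATE):
    """Group staging rows by the target table that the template resolves to."""
    targets = defaultdict(list)
    for row in rows:
        # The job selects UPPER($table_name) as the placeholder value.
        table = template.replace(PLACEHOLDER, row["$table_name"].upper())
        targets[table].append(row)
    return dict(targets)


rows = [
    {"$table_name": "customers", "id": 1},
    {"$table_name": "items", "id": 10},
    {"$table_name": "customers", "id": 2},
]
print(sorted(route(rows)))
```

Each distinct resolved name corresponds to one target table, and therefore to one sub-job.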

Looking at the SELECT statement, the $is_delete flag is used only within the MERGE statement to identify deleted rows that need to be removed from the target table. The columns UPSOLVER_PRIMARY_KEY and UPSOLVER_EVENT_TIME are created in each target table in Snowflake.

Upsolver requires the UPSOLVER_PRIMARY_KEY column to uniquely identify each row for insert, update, or delete, and the UPSOLVER_EVENT_TIME column is an ingestion timestamp used by Upsolver to record the order of source events. The column UPSOLVER_TABLE_NAME_<GUID> is required to identify the name of each table in order to match between source and target.
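The three WHEN clauses of the MERGE statement can be modeled with a small Python function. This is a literal reading of the clauses for illustration, not Upsolver's implementation: the target table is a plain dictionary keyed by primary key, the event fields are hypothetical names, and events are assumed to be applied in event-time order.

```python
def merge(target, events):
    """Apply change events to `target`, a dict keyed by primary key."""
    for event in sorted(events, key=lambda e: e["event_time"]):
        key = event["primary_key"]
        matched = key in target
        if matched and event["is_delete"]:
            del target[key]                 # WHEN MATCHED AND is delete THEN DELETE
        elif matched:
            target[key] = event["row"]      # WHEN MATCHED THEN REPLACE
        else:
            # WHEN NOT MATCHED THEN INSERT. Read literally, an unmatched
            # delete event would also land here; this sketch avoids that case.
            target[key] = event["row"]
    return target


target = {"1": {"name": "Ada"}}
events = [
    {"primary_key": "1", "event_time": "2024-01-01T00:02", "is_delete": False, "row": {"name": "Ada L."}},
    {"primary_key": "2", "event_time": "2024-01-01T00:01", "is_delete": False, "row": {"name": "Bob"}},
    {"primary_key": "2", "event_time": "2024-01-01T00:03", "is_delete": True, "row": {}},
    {"primary_key": "3", "event_time": "2024-01-01T00:04", "is_delete": False, "row": {"name": "Cy"}},
]
print(merge(target, events))
```

The order of the clauses matters: the delete check comes first, so a matched row flagged as deleted is removed rather than replaced.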

Step 6

Monitor the replication progress

You can easily monitor your jobs and sub-jobs with Upsolver. On the main menu on the left-hand side, click Jobs to open the All Jobs window.

Click on the ingestion job that you created in Step 3. When your job has finished initializing, you will see a list of the tables in your replication with a status of Streaming.

Scroll down the page to see more in-depth details about the job. Toggle the All button to display all metrics for your job. These metrics provide an overview of the data that is ingested from PostgreSQL and alert you to any errors occurring in the job that you should be aware of.

Click Back to All Jobs at the top of the page to return to All Jobs. Notice that you have one job listing for the main job you created in Step 5. Click on the job to open the metrics for all the sub-jobs beneath: you will see a job for each table in your publication.

Furthermore, you can view the full metadata for each sub-job using the system monitoring tables. When a sub-job is created, Upsolver prefixes the job GUID with the word virtual_, so you can quickly identify them in the system tables. On the main menu on the left-hand side, click Worksheets, then click New, then select Blank Worksheet. This opens a new window you can use to query the system tables.

Here's the code to view the sub-jobs:

-- Return a list of sub-jobs from the system monitoring table
SELECT * 
FROM system.monitoring.jobs
WHERE SUBSTRING(job_id, 1, 8) = 'virtual_'  
LIMIT 10;

In the results, you will see that each job shares the name of the main job with the schema and table name appended in brackets. You can adapt the above script to search for specific jobs based on the table and schema name.
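If you export the query results, the same filtering can be done in Python. The sketch below works on hypothetical rows: the job_name field, the identifier values, and the exact name format (main job name followed by the schema and table in brackets) are assumptions based on the description above, not a documented schema of the system table.

```python
# Hypothetical rows shaped like results from the monitoring query.
jobs = [
    {"job_id": "virtual_1a2b", "job_name": "postgres_to_snowflake_output(orders.customers)"},
    {"job_id": "virtual_3c4d", "job_name": "postgres_to_snowflake_output(orders.items)"},
    {"job_id": "9f8e7d6c", "job_name": "copy_postgresql_to_staging"},
]


def sub_jobs(rows):
    """Keep only the rows whose job identifier carries the virtual_ prefix."""
    return [r for r in rows if r["job_id"].startswith("virtual_")]


def sub_jobs_for_table(rows, schema, table):
    """Keep the sub-jobs whose name ends with (schema.table)."""
    suffix = f"({schema}.{table})"
    return [r for r in sub_jobs(rows) if r["job_name"].endswith(suffix)]


print([r["job_id"] for r in sub_jobs(jobs)])
print([r["job_id"] for r in sub_jobs_for_table(jobs, "orders", "items")])
```

The prefix test does the same job as the SUBSTRING comparison in the query, since virtual_ is eight characters long.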

Conclusion

In this guide, you learned how to create a template job for your replicated database in order to generate a sub-job for each target table in Snowflake. You saw how easy it is to write one script to auto-create a job for all the tables in your publication, as well as create each table in the target database. Furthermore, you learned how Upsolver uses schema evolution to automatically synchronize schema changes from the source database with the target tables in Snowflake. Then, you saw how to monitor your jobs using the system tables.

Try it yourself

To create sub-jobs for your CDC data:

  1. Create a publication on your PostgreSQL database.

  2. Connect to your publication and create a staging table and job to ingest the changed data.

  3. Create a job to generate a sub-job for each table within the publication.

  4. Monitor your sub-jobs using the job-monitoring metrics.
