Replicate CDC Data into Snowflake
This guide takes you through the steps to ingest your data from your CDC database and write it to Snowflake.
This guide uses PostgreSQL as the example source for the CDC data. However, the principles are the same for MySQL.
One script, multiple jobs
Replicating your operational database to a BI solution for analysis can be a time-intensive process. Upsolver provides the ability to write one job template that generates a dedicated job for each table within your replication. This time-saving feature is helpful when you have a large number of tables, removing the need to manually write and manage a job for each table.
Using the write-ahead log (WAL) in PostgreSQL - or the binlog for a MySQL source - Upsolver ingests changed data from your published database into a staging table in the data lake. Jobs are then executed at a specified interval to write the data to the target Snowflake database, ready for analysis and reporting. Each table has its own job, or sub-job, within the main job, which Upsolver manages.
Snowflake schema evolution
Upsolver can use the template to create each table in your destination Snowflake database if it doesn't already exist. Then, using schema evolution, Upsolver automatically manages changes to tables and columns so that user intervention is not required every time a structural change is made in the source database. If a new column appears in the schema, Upsolver adds the column to the target table. When a new table is created in the source database, Upsolver automatically creates a new job to manage the change.
Furthermore, Upsolver handles changes in data type, preventing a job from breaking when an unexpected value fails to write into an existing column with a conflicting type.
Monitor the jobs
Additionally, you can use the system tables to monitor your new jobs to ensure data is flowing as expected. The system tables expose metadata to enable you to monitor performance and deal with any unexpected issues.
Prerequisites
Ensure you enable CDC for your PostgreSQL database prior to following this guide.
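For a self-managed PostgreSQL instance, enabling CDC typically means switching the server to logical replication. The following is a minimal sketch, assuming you have superuser access and can restart the server; managed services such as Amazon RDS expose this as a parameter group setting instead:

```sql
-- Enable logical decoding so the WAL records the row-level changes that CDC consumes.
-- A server restart is required for the new setting to take effect.
ALTER SYSTEM SET wal_level = 'logical';

-- Confirm the setting after the restart.
SHOW wal_level;
```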
The steps for creating the sub-jobs for your replication are as follows:
Connect to PostgreSQL
Create a staging table to store the CDC data
Create a job to ingest the CDC data into your staging table
Connect to Snowflake
Create a job to merge into Snowflake and generate a sub-job for each target table
Monitor the replication progress
Step 1
Connect to PostgreSQL
The first step is to connect to the database from which you want to ingest your CDC data. You will need the connection string to your PostgreSQL database, and the username and password.
Here's the code:
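The following is a minimal sketch of the connection script; the connection name my_postgres_connection and the host, database, user, and password values are placeholders to replace with your own details:

```sql
-- Create a connection to the source PostgreSQL database.
CREATE POSTGRES CONNECTION my_postgres_connection
    CONNECTION_STRING = 'jdbc:postgresql://<hostname>:5432/<database_name>'
    USER_NAME = '<username>'
    PASSWORD = '<password>';
```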
Step 2
Create a staging table to store the CDC data
After connecting to your source PostgreSQL database, the next step is to create a table in the data lake for staging the CDC data.
Here's the code to create the staging table:
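Below is a sketch of the staging table definition using the options described in this step; the catalog and schema names (default_glue_catalog.upsolver_samples) are example values, so adjust them to your environment:

```sql
-- Create a data lake table to stage the CDC events for every table in the replication.
-- Only system columns are defined; Upsolver derives the data columns and their types
-- from the source tables automatically.
CREATE TABLE default_glue_catalog.upsolver_samples.postgres_to_snowflake_staging(
      $full_table_name string,
      $schema_name string,
      $table_name string,
      $primary_key string
    )
    PARTITIONED BY $full_table_name, $schema_name, $table_name, $event_date
    DISABLE_COMPACTION = true
    TABLE_DATA_RETENTION = 7 DAYS
    RETENTION_DATE_PARTITION = $event_date
    EXPOSE_IN_CATALOG = false;
```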
Let's understand what this code does.
A table named postgres_to_snowflake_staging is created to store the CDC changes that will be ingested from the PostgreSQL publication. Aside from the system columns, no columns have been defined, as Upsolver will create these automatically, deriving the names and types from the tables in the replication. The table will be a union of all tables and columns in the replication that is specified in the script in Step 3.
In order for Upsolver to generate a sub-job based on each source table, the staging table must be partitioned by $full_table_name, $schema_name, and $table_name. In this example, $event_date is also included in the partition, but it doesn't need to be defined as it is created automatically by Upsolver. Furthermore, a $primary_key column is specified as this will be used in the MERGE statement in the main job that will be created in Step 5, but it is not required to form the partition.
Looking at the table options, the DISABLE_COMPACTION option is set to true in order to instruct Upsolver to disable the compaction process for this table. The data does not need to be retained in the staging table for longer than a week, so the TABLE_DATA_RETENTION option has been set to 7 days. Upsolver will delete rows that pass the retention point, using the $event_date as the time marker as specified in RETENTION_DATE_PARTITION.
Lastly, EXPOSE_IN_CATALOG is set to false, meaning the table won't be available to the AWS Glue Data Catalog and can't be queried. The table is a union of all tables in the replication, so it is highly likely that there will be too many columns for querying to be supported, even if this was something you wanted to do.
Learn More
To learn more about table options and data retention, please see CREATE TABLE.
Step 3
Create a job to ingest the CDC data
Create your publication in PostgreSQL if you've not already done so; a minimal example is sketched below. Then, run the ingestion script that follows to create a job to copy the changed data into your staging table.
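As a reference, a publication covering the four example tables in the orders schema could be created in PostgreSQL as follows; the publication name orderspublication matches the one used in the ingestion job below:

```sql
-- Run against the source PostgreSQL database.
-- Creates a publication covering the tables to be replicated.
CREATE PUBLICATION orderspublication
    FOR TABLE orders.customers, orders.items, orders.categories, orders.invoices;
```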
Here's the code to create an ingestion job:
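The sketch below uses the connection and staging table names from the earlier steps, together with the source options described next; treat the object names as placeholders for your own environment:

```sql
-- Snapshot the published tables and then stream ongoing changes into the staging table.
CREATE SYNC JOB copy_postgresql_to_staging
    PUBLICATION_NAME = 'orderspublication'
AS COPY FROM POSTGRES my_postgres_connection
    -- Ingest only these four tables from the orders schema.
    TABLE_INCLUDE_LIST = ('orders.customers', 'orders.items', 'orders.categories', 'orders.invoices')
    -- Exclude sensitive columns from the customers table.
    COLUMN_EXCLUDE_LIST = ('orders.customers.email', 'orders.customers.phone')
INTO default_glue_catalog.upsolver_samples.postgres_to_snowflake_staging;
```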
Let's take a look at what this code does.
This code creates an ingestion job named copy_postgresql_to_staging that will copy the changed data from PostgreSQL into the staging table postgres_to_snowflake_staging that was created in Step 2.
In this example, PUBLICATION_NAME tells the job to use the orderspublication. Upsolver will take a snapshot of each table and then continue to update the data using the change log. You can specify which tables to ingest using the TABLE_INCLUDE_LIST source option. In this example, four tables, customers, items, categories, and invoices - all in the orders schema - have been selected for inclusion. You can also use a regular expression to select your source tables. The COLUMN_EXCLUDE_LIST tells Upsolver to ignore two columns in the customers table: email and phone. These will be excluded from the ingestion and won't appear in the staging table.
Step 4
Connect to Snowflake
The next step is to create a connection to the target Snowflake database. You will need your Snowflake connection string, and the username and password.
Here's the code to connect to Snowflake:
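Here is a minimal sketch; the connection name my_snowflake_connection, account URL, database, and credentials are placeholders:

```sql
-- Create a connection to the target Snowflake database.
CREATE SNOWFLAKE CONNECTION my_snowflake_connection
    CONNECTION_STRING = 'jdbc:snowflake://<account>.snowflakecomputing.com/?db=<database_name>'
    USER_NAME = '<username>'
    PASSWORD = '<password>';
```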
Step 5
Create a job to merge into Snowflake and create the target tables
Next, create the main job that will generate a dedicated job for each table in the replication. This job doesn't run; rather, it acts as the template for the sub-jobs.
Here's the code to generate a sub-job for each table in your replication:
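The following sketch combines the job options and placeholders described below; the connection name, the example GUID 73HS8C3A, and the time-range filter are illustrative assumptions rather than required values:

```sql
-- Template job: Upsolver generates a sub-job for each table found in the staging table.
CREATE SYNC JOB postgres_to_snowflake_output
    RUN_INTERVAL = 5 MINUTES
    CREATE_TABLE_IF_MISSING = true
    ADD_MISSING_COLUMNS = true
AS MERGE INTO SNOWFLAKE my_snowflake_connection.orders.{UPSOLVER_TABLE_NAME_73HS8C3A} AS target
USING (
    SELECT *,
           -- Flags rows deleted in the source so they can be removed from the target.
           $is_delete AS UPSOLVER_IS_DELETE_73HS8C3A,
           -- Uniquely identifies each row for insert, update, or delete.
           $primary_key AS UPSOLVER_PRIMARY_KEY,
           -- Ingestion timestamp that records the order of source events.
           $event_time AS UPSOLVER_EVENT_TIME,
           -- Identifies the source table so it can be matched to the target table.
           $table_name AS UPSOLVER_TABLE_NAME_73HS8C3A
    FROM default_glue_catalog.upsolver_samples.postgres_to_snowflake_staging
    WHERE $event_time BETWEEN run_start_time() AND run_end_time()
) AS source
ON source.UPSOLVER_PRIMARY_KEY = target.UPSOLVER_PRIMARY_KEY
WHEN MATCHED AND UPSOLVER_IS_DELETE_73HS8C3A THEN DELETE
WHEN MATCHED THEN REPLACE
WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;
```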
Let's understand what this code does.
This script is the template for creating a sub-job for each table in the replication. As the main job, postgres_to_snowflake_output will not execute; however, each sub-job that it generates will run and apply the changes (inserts, updates, and deletes) from the staging table to the destination table in Snowflake.
The three job options defined in the code will be applied to each sub-job. Firstly, the RUN_INTERVAL determines that the job execution window will be every 5 MINUTES. Next, CREATE_TABLE_IF_MISSING is set to true to enable the destination table to be created in the Snowflake database if it doesn't already exist. This is very useful when you have anything more than a handful of tables to manage. Further to this, ADD_MISSING_COLUMNS is set to true, ensuring that new columns added to the source schema in the future will also be added to the corresponding table in Snowflake.
You will notice that the MERGE INTO line includes a placeholder for the target table name: {UPSOLVER_TABLE_NAME_<GUID>}. This placeholder tells Upsolver to use each table name from the staging table as the target in the Snowflake database. In this example, all tables are in the orders schema, but optionally you can provide a placeholder for the schema name as well, in the form {UPSOLVER_SCHEMA_NAME_<GUID>}.{UPSOLVER_TABLE_NAME_<GUID>}. You will need to replace <GUID> with a unique name that is not used elsewhere. The name can be a mix of letters and numbers, e.g. {UPSOLVER_SCHEMA_NAME_73HS8C3A}.{UPSOLVER_TABLE_NAME_73HS8C3A}.
Looking at the SELECT statement, the $is_delete flag is only used within the MERGE statement to check for deleted rows that need to be removed from the target table. The columns UPSOLVER_PRIMARY_KEY and UPSOLVER_EVENT_TIME are created in each target table in Snowflake.
Upsolver requires the UPSOLVER_PRIMARY_KEY column to uniquely identify each row for insert, update, or delete, and the UPSOLVER_EVENT_TIME column is an ingestion timestamp used by Upsolver to record the order of source events. The column UPSOLVER_TABLE_NAME_<GUID> is required to identify the name of each table in order to match between source and target.
Step 6
Monitor the replication progress
You can easily monitor your jobs and sub-jobs with Upsolver. On the main menu on the left-hand side, click Jobs to open the All Jobs window.
Click on the ingestion job that you created in Step 3. When your job has finished initializing, you will see a list of the tables in your replication with a status of Streaming.
Scroll down the page to see more in-depth details about the job. Toggle the All button to display all metrics for your job. These metrics provide an overview of the data that is ingested from PostgreSQL and alert you to any errors occurring in the job that you should be aware of.
Click Back to All Jobs at the top of the page to return to All Jobs. Notice that you have one job listing for the main job you created in Step 5. Click on the job to open the metrics for all the sub-jobs beneath: you will see a job for each table in your publication.
Furthermore, you can view the full metadata for each sub-job using the system monitoring tables. When a sub-job is created, Upsolver prefixes the job GUID with the word virtual_, so you can quickly identify them in the system tables. On the main menu on the left-hand side, click Worksheets, then click New, then select Blank Worksheet. This opens a new window you can use to query the system tables.
Here's the code to view the sub-jobs:
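The query below is a sketch that assumes the jobs system table is exposed as system.monitoring.jobs and that the job identifier column is named job_id; adjust these names to match your environment. It filters on the virtual_ prefix mentioned above:

```sql
-- List the sub-jobs generated from the main job template.
SELECT *
FROM system.monitoring.jobs
WHERE job_id LIKE 'virtual_%';
```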
In the results, you will see that each job shares the name of the main job with the schema and table name appended in brackets. You can adapt the above script to search for specific jobs based on the table and schema name.
Conclusion
In this guide, you learned how to create a template job for your replicated database in order to generate a sub-job for each target table in your Snowflake database. You saw how easy it is to write one script to auto-create a job for all the tables in your publication, as well as create each table in the target database.
Furthermore, you learned how Upsolver uses schema evolution to automatically synchronize schema changes from the source database with the target tables in Snowflake. Then, you learned how to monitor your jobs using the system tables.
Try it yourself
To create sub-jobs for your CDC data:
Connect to PostgreSQL
Create a staging table to store the CDC data
Create a job to ingest the CDC data into your staging table
Connect to Snowflake
Create a job to merge into Snowflake and generate a sub-job for each target table
Monitor the replication progress