Replicate CDC Data into Snowflake
This guide takes you through the steps to ingest your data from your CDC database and write it to Snowflake.
This guide uses PostgreSQL as the example source for the CDC data. However, the principles are the same for MySQL.
Replicating your operational database to a BI solution for analysis can be a time-intensive process. Upsolver provides the ability to write one job template that generates a dedicated job for each table within your replication. This time-saving feature is helpful when you have a large number of tables, removing the need to manually write and manage a job for each table.
Using the write-ahead log (WAL) in PostgreSQL - or the binlog for a MySQL source - Upsolver ingests changed data from your published database into a staging table in the data lake. Jobs are then executed at a specified interval to write the data to the target Snowflake database, ready for analysis and reporting. Each table has its own job, or sub-job, within the main job, which Upsolver manages.
Upsolver can use the template to create each table in your destination Snowflake database if it doesn't already exist. Then, using schema evolution, Upsolver automatically manages changes to tables and columns so that user intervention is not required every time a structural change is made in the source database. If a new column appears in the schema, Upsolver will add the column to the target table. When a new table is created in the source database, Upsolver automatically creates a new job to manage the change.
Furthermore, Upsolver handles changes in data type, preventing a job from breaking when an unexpected value arrives that conflicts with the type of an existing column.
Additionally, you can use the system tables to monitor your new jobs to ensure data is flowing as expected. The system tables expose metadata to enable you to monitor performance and deal with any unexpected issues.
Ensure you have completed the prerequisites prior to following this guide.
The steps for creating the sub-jobs for your replication are as follows:
Connect to PostgreSQL
Create a staging table to store the CDC data
Create a job to ingest the CDC data into your staging table
Connect to Snowflake
Create a job to merge into Snowflake and generate a sub-job for each target table
Monitor the replication progress
The first step is to connect to the database from which you want to ingest your CDC data. You will need the connection string to your PostgreSQL database, and the username and password.
Here's the code:
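The statement below is a minimal sketch, assuming Upsolver's CREATE POSTGRES CONNECTION syntax; the connection name upsolver_postgres and the values in angle brackets are placeholders to replace with your own details.

```sql
-- Sketch: connect to the source PostgreSQL database.
-- Replace the placeholders with your own host, database, and credentials.
CREATE POSTGRES CONNECTION upsolver_postgres
    CONNECTION_STRING = 'jdbc:postgresql://<hostname>:5432/<database_name>'
    USER_NAME = '<username>'
    PASSWORD = '<password>';
```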
After connecting to your source PostgreSQL database, the next step is to create a table in the data lake for staging the CDC data.
Here's the code to create the staging table:
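The statement below is a sketch of the staging table definition, built from the system columns, partitioning, and table options described in this guide; the default_glue_catalog.upsolver_samples location is a placeholder for your own metastore database, and the exact option syntax should be checked against the CREATE TABLE reference.

```sql
-- Sketch: staging table for the CDC data.
-- Only system columns are defined; Upsolver derives the rest from the source tables.
CREATE TABLE default_glue_catalog.upsolver_samples.postgres_to_snowflake_staging(
    $full_table_name STRING,
    $schema_name STRING,
    $table_name STRING,
    $primary_key STRING
)
PARTITIONED BY $full_table_name, $schema_name, $table_name, $event_date
DISABLE_COMPACTION = true
TABLE_DATA_RETENTION = 7 DAYS
RETENTION_DATE_PARTITION = $event_date
EXPOSE_IN_CATALOG = false;
```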
Let's understand what this code does.
Looking at the table options, the DISABLE_COMPACTION option is set to true in order to instruct Upsolver to disable the compaction process for this table. The data does not need to be retained in the staging table for longer than a week, so the TABLE_DATA_RETENTION option has been set to 7 days. Upsolver will delete rows that pass the retention point, using $event_date as the time marker, as specified in RETENTION_DATE_PARTITION.

Lastly, EXPOSE_IN_CATALOG is set to false, meaning the table won't be available to the AWS Glue Data Catalog and can't be queried. The table is a union of all tables in the replication, so it is highly likely that there will be too many columns for querying to be supported, even if this was something you wanted to do.
Here's the code to create an ingestion job:
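The job below is a sketch based on the source options discussed next; it assumes the upsolver_postgres connection and the staging table created above, and the exact placement of the options should be checked against the COPY FROM reference.

```sql
-- Sketch: ingest changed data from the publication into the staging table.
CREATE SYNC JOB copy_postgresql_to_staging
    PUBLICATION_NAME = 'orderspublication'
AS COPY FROM POSTGRES upsolver_postgres
    TABLE_INCLUDE_LIST = ('orders.customers', 'orders.items', 'orders.categories', 'orders.invoices')
    COLUMN_EXCLUDE_LIST = ('orders.customers.email', 'orders.customers.phone')
INTO default_glue_catalog.upsolver_samples.postgres_to_snowflake_staging;
```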
Let's take a look at what this code does.
In this example, PUBLICATION_NAME tells the job to use the publication named orderspublication. Upsolver will take a snapshot of each table and then continue to update the data using the change log. You can specify which tables to ingest using the TABLE_INCLUDE_LIST source option. In this example, four tables (customers, items, categories, and invoices), all in the orders schema, have been selected for inclusion. You can also use a regular expression to select your source tables. The COLUMN_EXCLUDE_LIST option tells Upsolver to ignore two columns in the customers table: email and phone. These will be excluded from the ingestion and won't appear in the staging table.
The next step is to create a connection to the target Snowflake database. You will need your Snowflake connection string, and the username and password.
Here's the code to connect to Snowflake:
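The statement below is a sketch, assuming Upsolver's CREATE SNOWFLAKE CONNECTION syntax; the connection name my_snowflake_connection and the values in angle brackets are placeholders.

```sql
-- Sketch: connect to the target Snowflake database.
-- Replace the placeholders with your own account, database, warehouse, and credentials.
CREATE SNOWFLAKE CONNECTION my_snowflake_connection
    CONNECTION_STRING = 'jdbc:snowflake://<account>.snowflakecomputing.com/?db=<db_name>&warehouse=<warehouse_name>&role=<role_name>'
    USER_NAME = '<username>'
    PASSWORD = '<password>';
```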
Next, create the main job that will generate a dedicated job for each table in the replication. This job doesn't run; rather, it acts as the template for the sub-jobs.
Here's the code to generate a sub-job for each table in your replication:
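The template below is a sketch assembled from the job options, placeholders, and columns explained in the rest of this section; the my_snowflake_connection connection, the ORDERS target schema, the 73HS8C3A GUID, and the time-window filter are illustrative assumptions, and the exact MERGE clauses should be checked against the current reference.

```sql
-- Sketch: template job that generates one sub-job per replicated table.
CREATE SYNC JOB postgres_to_snowflake_output
    RUN_INTERVAL = 5 MINUTES
    CREATE_TABLE_IF_MISSING = true
    ADD_MISSING_COLUMNS = true
AS MERGE INTO SNOWFLAKE "my_snowflake_connection"."ORDERS"."{UPSOLVER_TABLE_NAME_73HS8C3A}" AS target
USING (
    SELECT *,
        $primary_key       AS UPSOLVER_PRIMARY_KEY,         -- uniquely identifies each row
        $event_time        AS UPSOLVER_EVENT_TIME,          -- ingestion timestamp, preserves event order
        $is_delete         AS UPSOLVER_IS_DELETE,           -- marks rows deleted in the source
        UPPER($table_name) AS UPSOLVER_TABLE_NAME_73HS8C3A  -- matches each row to its target table
    FROM default_glue_catalog.upsolver_samples.postgres_to_snowflake_staging
    WHERE $event_time BETWEEN run_start_time() AND run_end_time()
) AS source
ON source.UPSOLVER_PRIMARY_KEY = target.UPSOLVER_PRIMARY_KEY
WHEN MATCHED AND UPSOLVER_IS_DELETE THEN DELETE
WHEN MATCHED THEN REPLACE
WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;
```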
Let's understand what this code does.
This script is the template for creating a sub-job for each table in the replication. As the main job, postgres_to_snowflake_output will not execute; however, each sub-job that it generates will run and apply the changes (inserts, updates, and deletes) from the staging table to the destination table in Snowflake.
The three job options defined in the code will be applied to each sub-job. Firstly, RUN_INTERVAL determines that the job execution window will be every 5 MINUTES. Next, CREATE_TABLE_IF_MISSING is set to true to enable the destination table to be created in the Snowflake database if it doesn't already exist. This is very useful when you have anything more than a handful of tables to manage. Further to this, ADD_MISSING_COLUMNS is set to true, ensuring that new columns added to the source schema in the future will also be added to the corresponding table in Snowflake.
You will notice that the MERGE INTO line includes a placeholder for the target table name: {UPSOLVER_TABLE_NAME_<GUID>}. This placeholder tells Upsolver to use each table name from the staging table as the target in the Snowflake database. In this example, all tables are in the orders schema, but you can optionally provide a placeholder for the schema name as well, as shown in the sketch below. You will need to replace <GUID> with a unique name that is not used elsewhere. The name can be a mix of letters and numbers, e.g. {UPSOLVER_SCHEMA_NAME_73HS8C3A}.{UPSOLVER_TABLE_NAME_73HS8C3A}.
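With a placeholder for both the schema and the table name, the target line of the MERGE INTO statement might look roughly like this (a sketch; the my_snowflake_connection name is an assumption and identifier quoting may vary):

```sql
-- Sketch: derive both the target schema and table names from the staging data.
MERGE INTO SNOWFLAKE "my_snowflake_connection"."{UPSOLVER_SCHEMA_NAME_73HS8C3A}"."{UPSOLVER_TABLE_NAME_73HS8C3A}" AS target
```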
Looking at the SELECT statement, the $is_delete flag is only used within the MERGE statement to check for deleted rows that need to be removed from the target table. The columns UPSOLVER_PRIMARY_KEY and UPSOLVER_EVENT_TIME are created in each target table in Snowflake. Upsolver requires the UPSOLVER_PRIMARY_KEY column to uniquely identify each row for insert, update, or delete, and the UPSOLVER_EVENT_TIME column is an ingestion timestamp used by Upsolver to record the order of source events. The column UPSOLVER_TABLE_NAME_<GUID> is required to identify the name of each table in order to match between source and target.
You can easily monitor your jobs and sub-jobs with Upsolver. On the main menu on the left-hand side, click Jobs to open the All Jobs window.
Scroll down the page to see more in-depth details about the job. Toggle the All button to display all metrics for your job. These metrics provide an overview of the data that is ingested from PostgreSQL and alert you to any errors occurring in the job that you should be aware of.
Furthermore, you can view the full metadata for each sub-job using the system monitoring tables. When a sub-job is created, Upsolver prefixes the job GUID with the word virtual_, so you can quickly identify them in the system tables. On the main menu on the left-hand side, click Worksheets, then click New, then select Blank Worksheet. This opens a new window you can use to query the system tables.
Here's the code to view the sub-jobs:
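The query below is a sketch; the system.monitoring.jobs table path and the job_id column are assumptions based on the virtual_ prefix described above, so adjust them to match the system tables in your environment.

```sql
-- Sketch: list the generated sub-jobs, identified by the virtual_ prefix on the job ID.
SELECT *
FROM system.monitoring.jobs
WHERE job_id LIKE 'virtual_%';
```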
In the results, you will see that each job shares the name of the main job with the schema and table name appended in brackets. You can adapt the above script to search for specific jobs based on the table and schema name.
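For example, a hypothetical variation that narrows the results to the sub-job handling the orders.customers table might look like this (the job_name column is an assumption):

```sql
-- Sketch: find the sub-job for a specific schema and table.
SELECT *
FROM system.monitoring.jobs
WHERE job_id LIKE 'virtual_%'
  AND job_name LIKE '%(orders.customers)%';
```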
In this guide, you learned how to create a template job for your replicated database in order to generate a sub-job for each target table. You saw how easy it is to write one script to auto-create a job for all the tables in your publication, as well as create each table in the target database.
Furthermore, you learned how Upsolver uses schema evolution to automatically synchronize schema changes from the source database with the target tables in Snowflake. Then, you learned how to monitor your jobs using the system tables.
To create sub-jobs for your CDC data:
Connect to PostgreSQL
Create a staging table to store the CDC data
Create a job to ingest the CDC data into your staging table
Connect to Snowflake
Create a job to merge into Snowflake and generate a sub-job for each target table
Monitor the replication progress
A table named postgres_to_snowflake_staging is created to store the CDC changes that will be ingested from the PostgreSQL publication. Aside from the system columns, no columns have been defined, as Upsolver will create these automatically, deriving the names and types from the tables in the replication. The table will be a union of all tables and columns in the replication specified in the ingestion job script.
In order for Upsolver to generate a sub-job based on each source table, the staging table must be partitioned by $full_table_name, $schema_name, and $table_name. In this example, $event_date is also included in the partition, but it doesn't need to be defined as it is created automatically by Upsolver. Furthermore, a $primary_key column is specified, as this will be used in the MERGE statement in the main job created later in this guide, but it is not required to form the partition.
To learn more about table options and data retention, please see the SQL command reference.
Create your publication in PostgreSQL if you've not already done so. Then, run the ingestion script shown earlier to create a job to copy the changed data into your staging table.
This code creates an ingestion job named copy_postgresql_to_staging that will copy the changed data from PostgreSQL into the staging table postgres_to_snowflake_staging that was created earlier.
For more information about job and source options, and for detailed examples, please see the SQL command reference.
Click on the ingestion job that you created earlier. When your job has finished initializing, you will see a list of the tables in your replication with a status of Streaming.
Click Back to All Jobs at the top of the page to return to the jobs list. Notice that you have one job listing for the main job you created. Click on the job to open the metrics for all the sub-jobs beneath: you will see a job for each table in your publication.