PostgreSQL CDC database replication

This article describes how to use Upsolver to replicate a PostgeSQL database.

Supported Versions

Debezium’s PostgreSQL connector captures row-level changes in the schemas of a PostgreSQL database. PostgreSQL database versions 10, 11, 12 and 13 are supported. This includes managed DB instances hosted by AWS RDS and Aurora.

Prerequisites

Database configuration

CDC uses logical replication to get change events from the WAL (Write-Ahead Log). So, you should make sure logical replication is enabled.

You can check the replication method of your database by running:

SHOW wal_level;

This query should return "logical". If it returns any other value please change the replication method:

  • For self hosted Postgres:

ALTER SYSTEM SET wal_level = logical;
  • For RDS / Aurora Postgres: set the “logical_replication” parameter in AWS RDS console to "1".

The database must be restarted for this change to take effect.

Permissions

In order for Upsolver to read the WAL and the initial state of the database, the CDC data source requires:

  1. A user with replication and select permissions

  2. A Postgres publication for the replicated tables

Creating a CDC user

If you are using an AWS RDS Postgres instance please see the instructions below on how to create a CDC user. If you are using a self hosted instance please refer to Debezium's documentation.

We recommend creating a separate user for the CDC process, and grant that user replication and select permissions.

Replication permission can be granted using:

  • For self hosted Postgres:

ALTER ROLE <user> WITH REPLICATION;
  • For RDS / Aurora Postgres:

GRANT rds_replication TO <user>;

SELECT permissions can be granted either by giving permission to the entire schema, or, for each table individually:

 GRANT SELECT ON ALL TABLES IN SCHEMA <schema_name> TO <user>; // For all tables
 GRANT SELECT ON <table_name> TO <user>; // For a specific table

Creating a publication

Upsolver requires the publication of replicated tables in order to be able to subscribe to change events. The publication can be either on all tables or on specific tables.

If the database contains tables that do not have Primary Keys, adding those tables to the publication will cause Postgres to block all updates and deletes for that table. This can be prevented in one of the following ways:

  • Manage the publication tables manually and don't include such tables in it.

  • Change the REPLICA IDENTITY of that table to FULL or INDEX. More information about this can be found in Postgres documentation.

  • Add a primary key to the table.

Creating a publication of all tables is the simplest method and will automatically include all tables that will be created in the future but can lead to issues if tables without primary keys are created without having a REPLICA IDENTITY configured.

A publication of all tables can be created by a superuser using the following command:

CREATE PUBLICATION upsolver FOR ALL TABLES

Creating a publication on specific tables will not automatically include newly created tables. For instructions on how to create and manage publication for specific tables see the Postgres documentation.

More information about permissions can be found in Debezium documentation.

Setting up a Heartbeat Table

The WAL in a Postgres instance is shared among all tables and databases on that instance. When at least one replication slot exists, Postges will wait until all replication slots confirm that a certain WAL event has been consumed before deleting it. Consequently, a replication slot that doesn't confirm any event for a long time might lead to the WAL growing excessively.

Upsolver is able to confirm an event only when receiving one. This means that when a data source doesn't contain any frequently changing table, Upsolver might not get an opportunity to confirm any events, thereby leading to high disk usage (by the WAL).

To prevent excessive growth, heartbeats can be placed in the replication stream so that Upsolver can, periodically, confirm events. A Heartbeat Table is used to set up the heartbeats. Once Upsolver is set up to use the heartbeat table, heartbeats will be tracked and confirmed by the Upsolver data source.

Follow these steps to create a heartbeat table:

  1. Create a heartbeat table:

CREATE TABLE IF NOT EXISTS <schema_name>.<table_name> (key int primary key, value timestamp)

2. Grant the necessary permissions for Upsolver to be able to use the heartbeat table:

GRANT INSERT, UPDATE, SELECT on table <table_name> to <user>

3. If the publication used in the data source was created for specific tables (not all tables), add the heartbeat table to the publication. See the Postgres documentation.

4. Add the heartbeat table to the data source in the data source creation wizard or in the data source properties.

Creating a PostgreSQL CDC Data Source with Amazon Athena Output

Upsolver can be used to replicate a PostgreSQL database onto an Athena data lake. It does this by ingesting PostgreSQL data and WALs, converting them to Athena Output and loading the results on an Athena data lake.

NOTE: When processing a PostgreSQL data source:

⚠️ Do not stop the data source.

⚠️ Do not stop the cluster.

See the Cleanup section below for more details.

Follow these steps to create a PostgreSQL CDC data source:

1. Click on DATA SOURCES and click on NEW on the upper right hand corner.

2. SELECT PostgreSQL from the Change Data Capture group.

3. Enter your connection string for PostgreSQL database. Make sure to use the right port, user name and password. Click on TEST CONNECTION and wait for the green check mark to appear.

3. (Optional) Add the Publication Name and Heartbeat Table Name.

4. (Optional) Choose the schema(s) and table(s) that you want to read from. The list supports regular expression. You may also exclude columns from your tables.

Replicating the database

5. To Replicate the database, start by providing destination information. This will replicate the database automatically to the target data lake. TABLE PREFIX will show up as part of your destination table name. For example, if you input staging_ for a table named orders. The underlying table name will be staging_orders.

6. Give a name to your data source and data output. This will be the name of the entity in Upsolver. Click on CONTINUE.

At this point the MONITORING window will open and, a few minutes later, you will be able to see your data on Athena.

Cleanup

In order to be able to ingest events from a Postgres database, Upsolver creates a replication slot for every CDC data source. In Postgres, when events are not being ingested from the replication slot they will keep accumulating in the database. This might lead to high disk consumption and risk the database.

Be certain the replication slot has been deleted from your database. Failing to do that might lead to high disk consumption.

To stop the CDC process:

  1. Copy the data source ID from your data source URL and save it for step 3 and 4.

2. Delete the data source.

3. Make sure the replication slot was deleted from the database:

 SELECT * FROM pg_replication_slots

Look for a replication slot named uspolver_<data_source_id> where <data_source_id> is the id you copied in step 1.

4. If the replication slot wasn't deleted, please delete it manually:

SELECT pg_drop_replication_slot('uspolver_<data_source_id>')

5. If you created a Heartbeat Table and want to delete it, use:

DROP TABLE <schema_name>.<table_name>

Last updated