PostgreSQL CDC database replication

This article describes how to use Upsolver to replicate a PostgreSQL database.

Supported Versions

Debezium’s PostgreSQL connector captures row-level changes in the schemas of a PostgreSQL database. PostgreSQL database versions 10, 11, 12 and 13 are supported. This includes managed DB instances hosted by AWS RDS and Aurora.

Prerequisites

Database configuration

CDC uses logical replication to read change events from the WAL (Write-Ahead Log), so make sure logical replication is enabled.

You can check the replication method of your database by running:

SHOW wal_level;

This query should return "logical". If it returns any other value, change the replication method:

  • For self-hosted Postgres:

ALTER SYSTEM SET wal_level = logical;

  • For RDS / Aurora Postgres: set the "rds.logical_replication" parameter in the instance's parameter group (via the AWS RDS console) to "1".

The database must be restarted for this change to take effect.

Permissions

In order for Upsolver to read the WAL and the initial state of the database, the CDC data source requires:

  1. A user with replication and SELECT permissions

  2. A Postgres publication for the replicated tables (see Creating a publication below)

Creating a CDC user

We recommend creating a separate user for the CDC process and granting that user replication and SELECT permissions. If you are using an AWS RDS Postgres instance, see the instructions below on how to create a CDC user. If you are using a self-hosted instance, refer to Debezium's documentation.

Replication permission can be granted using:

  • For self-hosted Postgres:

ALTER ROLE <user> WITH REPLICATION;
  • For RDS / Aurora Postgres:

GRANT rds_replication TO <user>;

SELECT permissions can be granted either on the entire schema or on each table individually:

 GRANT SELECT ON ALL TABLES IN SCHEMA <schema_name> TO <user>; -- For all tables in a schema
 GRANT SELECT ON <table_name> TO <user>; -- For a specific table

More information about permissions can be found in Debezium's documentation.
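Putting these together, a minimal sketch of creating a dedicated CDC user on a self-hosted instance might look like the following. The role name upsolver_cdc, the password placeholder, and the schema name public are illustrative assumptions; adapt them to your environment (on RDS / Aurora, grant rds_replication instead of ALTER ROLE ... WITH REPLICATION):

-- Illustrative sketch only: dedicated CDC user on self-hosted Postgres.
-- Role name, password, and schema are placeholders.
CREATE ROLE upsolver_cdc WITH LOGIN PASSWORD '<strong_password>';
ALTER ROLE upsolver_cdc WITH REPLICATION;
GRANT USAGE ON SCHEMA public TO upsolver_cdc;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO upsolver_cdc;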

Creating a publication

Upsolver requires a publication of the replicated tables in order to subscribe to change events. The publication can cover either all tables or specific tables.

If the database contains tables that do not have primary keys, adding those tables to the publication will cause Postgres to block all updates and deletes on them. This can be prevented in one of the following ways:

  • Manage the publication tables manually and don't include such tables in it.

  • Add a primary key to the table.

  • Change the REPLICA IDENTITY of that table to FULL or INDEX. More information about this can be found in the Postgres documentation.

Creating a publication on all tables is the simplest method and automatically includes tables created in the future, but it can lead to issues if tables without primary keys are created without a REPLICA IDENTITY configured. Creating a publication on specific tables will not automatically include newly created tables; for instructions on how to create and manage a publication for specific tables, see the Postgres documentation.

A publication of all tables can be created by a superuser using the following command:

CREATE PUBLICATION upsolver FOR ALL TABLES
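If you prefer a publication limited to specific tables, the commands might look roughly like the following. The table names (orders, customers, events_without_pk) and the schema placeholder are purely illustrative:

-- Illustrative sketch only: publication restricted to specific tables.
CREATE PUBLICATION upsolver FOR TABLE <schema_name>.orders, <schema_name>.customers;

-- For a table without a primary key, set REPLICA IDENTITY FULL so that
-- updates and deletes can still be replicated (the full old row is logged).
ALTER TABLE <schema_name>.events_without_pk REPLICA IDENTITY FULL;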

Setting up a Heartbeat Table

The WAL in a Postgres instance is shared among all tables and databases on that instance. When at least one replication slot exists, Postgres waits until all replication slots confirm that a certain WAL event has been consumed before deleting it. Consequently, a replication slot that doesn't confirm any event for a long time can cause the WAL to grow excessively.

Upsolver is able to confirm an event only when receiving one. This means that when a data source doesn't contain any frequently changing table, Upsolver might not get an opportunity to confirm any events, thereby leading to high disk usage (by the WAL).

To prevent excessive growth, heartbeats can be placed in the replication stream so that Upsolver can periodically confirm events. A heartbeat table is used to set up these heartbeats. Once Upsolver is configured to use the heartbeat table, heartbeats are tracked and confirmed by the Upsolver data source.
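If you want to keep an eye on how much WAL a replication slot is retaining (for example, to spot a slot that has stopped confirming events), a query along the following lines can help. pg_current_wal_lsn and pg_wal_lsn_diff are standard PostgreSQL 10+ functions:

-- Approximate WAL retained by each replication slot.
SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;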

Follow these steps to create a heartbeat table:

  1. Create a heartbeat table:

CREATE TABLE IF NOT EXISTS <schema_name>.<table_name> (key int primary key, value timestamp)

2. Grant the necessary permissions for Upsolver to be able to use the heartbeat table:

GRANT INSERT, UPDATE, SELECT ON TABLE <table_name> TO <user>

3. If the publication used in the data source was created for specific tables (not all tables), add the heartbeat table to the publication. See the Postgres documentation.

4. Add the heartbeat table to the data source in the data source creation wizard or in the data source properties.
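For step 3, assuming the publication is named upsolver as in the earlier example, adding the heartbeat table to it would look roughly like this (the schema and table names are placeholders):

-- Illustrative sketch only: include the heartbeat table in a specific-tables publication.
ALTER PUBLICATION upsolver ADD TABLE <schema_name>.<heartbeat_table_name>;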

Creating a PostgreSQL CDC Data Source with Amazon Athena Output

Upsolver can be used to replicate a PostgreSQL database into an Athena data lake. It does this by ingesting the PostgreSQL data and WAL events, converting them into an Athena output, and loading the results into the data lake.

NOTE: When processing a PostgreSQL data source:

⚠️ Do not stop the data source.

⚠️ Do not stop the cluster.

See the Cleanup section below for more details.

Follow these steps to create a PostgreSQL CDC data source:

1. Click on DATA SOURCES and click NEW in the upper right-hand corner.

2. Select PostgreSQL from the Change Data Capture group.

3. Enter the connection string for your PostgreSQL database. Make sure to use the correct port, user name, and password. Click on TEST CONNECTION and wait for the green check mark to appear.

4. (Optional) Add the Publication Name and Heartbeat Table Name.

5. (Optional) Choose the schema(s) and table(s) that you want to read from. The list supports regular expressions. You may also exclude columns from your tables.

Replicating the database

6. To replicate the database, start by providing the destination information. This will replicate the database automatically to the target data lake. TABLE PREFIX will show up as part of your destination table name; for example, if you input staging_ for a table named orders, the underlying table name will be staging_orders.

7. Give a name to your data source and data output. These will be the names of the entities in Upsolver. Click on CONTINUE.

At this point the MONITORING window will open and, a few minutes later, you will be able to see your data on Athena.
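To sanity-check the replicated data, you can query one of the output tables directly in Athena. A minimal example, reusing the staging_ prefix and orders table from the earlier step (the Glue database name is a placeholder):

-- Illustrative Athena query; adjust the database and table names to your setup.
SELECT * FROM <glue_database>.staging_orders LIMIT 10;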

Cleanup

In order to ingest events from a Postgres database, Upsolver creates a replication slot for every CDC data source. In Postgres, when events are not being ingested from a replication slot, they keep accumulating in the database. This can lead to high disk consumption and put the database at risk.

When you remove a CDC data source, be certain that its replication slot has also been deleted from your database. Failing to do so might lead to high disk consumption.

To stop the CDC process:

  1. Copy the data source ID from your data source URL and save it for steps 3 and 4.

2. Delete the data source.

3. Make sure the replication slot was deleted from the database:

 SELECT * FROM pg_replication_slots

Look for a replication slot named upsolver_<data_source_id>, where <data_source_id> is the ID you copied in step 1.

4. If the replication slot wasn't deleted, delete it manually:

SELECT pg_drop_replication_slot('upsolver_<data_source_id>')

5. If you created a Heartbeat Table and want to delete it, use:

DROP TABLE <schema_name>.<table_name>