PostgreSQL

This article describes how to ingest CDC data from your PostgreSQL source database.

Prerequisites for PostgreSQL

Database configuration

CDC uses logical replication to read change events from the Write-Ahead Log (WAL), so ensure that logical replication is enabled on your database in advance.

You can check the current WAL level of your database by running:

SHOW wal_level;

This query should return logical. If it returns any other value, change the WAL level:

  • For self-hosted PostgreSQL:

ALTER SYSTEM SET wal_level = logical;
  • For RDS / Aurora PostgreSQL: set the rds.logical_replication parameter to 1 in the instance's parameter group (via the AWS RDS console).

The database must be restarted for this change to take effect.

Permissions

In order for Upsolver to read the WAL and the initial state of the database, the CDC data source requires:

  1. A user with replication and select permissions

  2. A PostgreSQL publication for the replicated tables

Creating a CDC user

If you are using an AWS RDS PostgreSQL instance, please see the instructions below on how to create a CDC user. If you are using a self-hosted instance, please refer to Debezium's documentation.

Upsolver recommends creating a separate user for the CDC process and granting that user replication and select permissions.
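
For example, a dedicated user can be created as follows (upsolver_cdc is a placeholder name, not one required by Upsolver; substitute your own user name and a strong password):

CREATE ROLE upsolver_cdc WITH LOGIN PASSWORD '<password>';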

Replication permission can be granted as follows:

  • For self-hosted PostgreSQL:

ALTER ROLE <user> WITH REPLICATION;
  • For RDS / Aurora PostgreSQL:

GRANT rds_replication TO <user>;

SELECT permission can be granted either on all tables in the schema at once, or on each table individually:

GRANT SELECT ON ALL TABLES IN SCHEMA <schema_name> TO <user>; -- for all tables
GRANT SELECT ON <table_name> TO <user>;                       -- for a specific table
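
Note that GRANT SELECT ON ALL TABLES only covers tables that already exist. If new tables may be added to the schema later, you can optionally also set default privileges so that future tables are readable (this applies to tables subsequently created by the role that runs the command):

ALTER DEFAULT PRIVILEGES IN SCHEMA <schema_name> GRANT SELECT ON TABLES TO <user>;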

Creating a publication

Upsolver requires a publication covering the replicated tables in order to subscribe to their change events. The publication can include either all tables or specific tables.

If the database contains tables without primary keys, adding those tables to the publication will cause PostgreSQL to block all updates and deletes on those tables. This can be prevented in one of the following ways:

  • Manage the publication tables manually and don't include such tables.

  • Change the REPLICA IDENTITY of that table to FULL, or to USING INDEX with a suitable unique index, as shown in the example after this list. More information can be found in the PostgreSQL documentation.

  • Add a primary key to the table.
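
For example, to take the second option and replicate full row images for a table that has no primary key:

ALTER TABLE <schema_name>.<table_name> REPLICA IDENTITY FULL;

Bear in mind that FULL logs the entire old row to the WAL for every update and delete on that table, which increases WAL volume.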

Creating a publication of all tables is the simplest method and will automatically include all tables created in the future. However, this can lead to issues if tables without primary keys are created without having a REPLICA IDENTITY configured.

A publication of all tables can be created by a superuser using the following command:

CREATE PUBLICATION upsolver FOR ALL TABLES;

Creating a publication on specific tables will not automatically include newly created tables. For instructions on how to create and manage publications for specific tables, see the PostgreSQL documentation.
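
For example, a publication limited to specific tables can be created as follows (the table names are placeholders):

CREATE PUBLICATION upsolver FOR TABLE <schema_name>.<table_1>, <schema_name>.<table_2>;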

More information about permissions can be found in the Debezium documentation.

Setting up a Heartbeat Table

The WAL in a PostgreSQL instance is shared among all tables and databases on that instance. When at least one replication slot exists, PostgreSQL will wait until all replication slots confirm that a particular WAL event has been consumed before deleting it. Consequently, a replication slot that doesn't confirm any event for a long time might lead to the WAL growing excessively.
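
To see how much WAL each replication slot is currently retaining, you can run a query along the following lines (this uses standard PostgreSQL 10+ system views and functions, nothing Upsolver-specific):

SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;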

Upsolver can only confirm an event when it receives one. This means that if a data source doesn't contain any frequently changing tables, Upsolver might never get an opportunity to confirm events, leading to high disk usage as the WAL grows.

To prevent excessive growth, heartbeats can be placed in the replication stream so that Upsolver can periodically confirm events. A Heartbeat Table sets up the heartbeats. After Upsolver is set up to use the heartbeat table, heartbeats will be tracked and confirmed by the Upsolver data source.
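
Conceptually, the heartbeat mechanism works by periodically upserting a timestamp into a dedicated table, producing a WAL event that Upsolver can consume and confirm. Against the heartbeat table created in the steps below, such an upsert would look roughly like this (shown for illustration only; Upsolver issues it for you once the heartbeat table is configured):

INSERT INTO <schema_name>.<table_name> (key, value)
VALUES (1, now())
ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value;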

Follow these steps to create a heartbeat table:

  1. Create a heartbeat table:

CREATE TABLE IF NOT EXISTS <schema_name>.<table_name> (key INT PRIMARY KEY, value TIMESTAMP);

  2. Grant the necessary permissions for Upsolver to use the heartbeat table:

GRANT INSERT, UPDATE, SELECT ON TABLE <schema_name>.<table_name> TO <user>;

  3. If the publication used in the data source was created for specific tables (not all tables), add the heartbeat table to the publication, as shown in the example after these steps. See the PostgreSQL documentation.

  4. Add the heartbeat table to the data source in the data source creation wizard or in the data source properties.
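
For step 3, assuming your publication is named upsolver as in the earlier examples, the heartbeat table can be added to it with:

ALTER PUBLICATION upsolver ADD TABLE <schema_name>.<table_name>;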

To learn more about PostgreSQL connections, see Connect to your PostgreSQL database.

Create a CDC job that reads from your database

You can create a job to ingest your data from PostgreSQL into a staging table in the data lake.

Ingest to the data lake

After completing the prerequisites, you can create your staging tables. The example below creates a table without defining columns or data types, as these will be inferred automatically by Upsolver, though you can define columns if required:

CREATE TABLE default_glue_catalog.upsolver_samples.orders_raw_data()
    PARTITIONED BY $event_date;

Create a staging table to store the data ingested from PostgreSQL.

Upsolver recommends partitioning by the system column $event_date or another date column within the data in order to optimize your query performance.
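
For example, if your records contain a date column you would rather partition by, a variant that declares just that column explicitly might look like this (the table and column names here are hypothetical):

CREATE TABLE default_glue_catalog.upsolver_samples.orders_by_order_date (
    order_date date
)
PARTITIONED BY order_date;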

Next, you can create an ingestion job as follows:

CREATE JOB load_orders_raw_data_from_postgres
      PUBLICATION_NAME = 'sample'
   AS COPY FROM POSTGRES upsolver_postgres_samples
   INTO default_glue_catalog.upsolver_samples.orders_raw_data;

Create an ingestion job to copy the data from PostgreSQL into a staging table in the data lake.

Note that multiple ingestion jobs can write to the same table, resulting in a final table that contains a UNION ALL of all data copied into that table. This means that any duplicate rows written to the table are not removed, and the column list may expand if new columns are detected.

This may not be your intended behavior, so ensure you are writing to the correct table before running your job.

The example above only uses a small subset of all job options available when reading from PostgreSQL. Depending on your use case, you may want to configure a different set of options. For the full list of job options with syntax and detailed descriptions, see the databases page for CDC options.

After your data has been ingested into your staging table, you are ready to move on to the next step of building your data pipeline: transforming your data and writing it to your intended target locations.

Job options

Transformations can be applied to your ingestion job to correct issues, exclude columns, or mask data before it lands in the target. Furthermore, you can use expectations to define data quality rules on your data stream and take appropriate action.
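
As a sketch of what this can look like (the EXCLUDE_COLUMNS and COLUMN_TRANSFORMATIONS options and the expectation syntax are covered on the Ingestion jobs page; the column names below are hypothetical), a job that masks an email column, drops an address column, and warns on NULL order IDs might be written as:

CREATE JOB load_orders_masked_from_postgres
      PUBLICATION_NAME = 'sample'
      EXCLUDE_COLUMNS = ('customer.address')
      COLUMN_TRANSFORMATIONS = (customer_email = MD5(customer_email))
   AS COPY FROM POSTGRES upsolver_postgres_samples
   INTO default_glue_catalog.upsolver_samples.orders_raw_data
   WITH EXPECTATION exp_orderid_not_null EXPECT orderid IS NOT NULL ON VIOLATION WARN;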

For more information, see the Ingestion jobs page, which describes the available job options and includes examples.

Alter a job that reads from PostgreSQL

Some job options are considered mutable, enabling you to run a SQL command to alter an existing ingestion job rather than create a new job.

For example, take the job we created earlier:

CREATE JOB load_orders_raw_data_from_postgres
      PUBLICATION_NAME = 'sample'
   AS COPY FROM POSTGRES upsolver_postgres_samples
   INTO default_glue_catalog.upsolver_samples.orders_raw_data;

If you want to keep the job as is, but only change the cluster that is running the job, execute the following command:

ALTER JOB load_orders_raw_data_from_postgres 
    SET COMPUTE_CLUSTER = my_new_cluster;

Note that some options, such as COMPRESSION, cannot be altered once the job has been created.

To check which job options are mutable, see the PostgreSQL job options reference.

Drop a job that reads from PostgreSQL

If you no longer need a job, you can easily drop it using the following SQL command:

DROP JOB load_orders_raw_data_from_postgres;
