Handle PostgreSQL TOAST Values

This page describes how to handle PostgreSQL TOAST values with Upsolver.

Overview

PostgreSQL employs TOAST (The Oversized-Attribute Storage Technique) to efficiently store large values, such as long strings or byte arrays, that exceed a certain size. While TOAST is usually transparent to the user, it becomes important when implementing change data capture (CDC) and logical replication, particularly when using CDC engines like Debezium in conjunction with Upsolver.

Key Behavior: TOAST and Debezium

When Debezium captures a changed row from a PostgreSQL database, PostgreSQL omits unchanged TOASTed values from the logical replication message unless they are explicitly part of the table's replica identity. Because the Debezium PostgreSQL connector has no way to recover the original value in that case, it substitutes a placeholder: __debezium_unavailable_value.
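
Whether unchanged TOASTed values are included therefore depends on the table's replica identity. As an illustration (a minimal sketch using the standard PostgreSQL catalog, and assuming the large_data table created in the walkthrough below), you can inspect the current setting like this:

-- Replica identity of the table:
-- 'd' = default (primary key only), 'n' = nothing, 'f' = full, 'i' = index
SELECT relreplident
FROM pg_class
WHERE oid = 'large_data'::regclass;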

Example Scenario: Replicating PostgreSQL to Snowflake

When replicating data from PostgreSQL to Snowflake, you may find that the __debezium_unavailable_value placeholder replaces the original TOAST value in the Snowflake target table.
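
As a quick way to spot affected rows on the Snowflake side, the sketch below assumes the SCHEMA_NAME.TOAST_EXAMPLE1 target table used in the merge examples later on this page:

-- Rows whose TOAST column arrived as the Debezium placeholder
SELECT ID, OTHER_COLUMN
FROM SCHEMA_NAME.TOAST_EXAMPLE1
WHERE BIG_TEXT = '__debezium_unavailable_value';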

Step-by-Step Guide

1. PostgreSQL Table Setup

Create a table in PostgreSQL to store large data:

CREATE TABLE large_data (
    id serial PRIMARY KEY,
    big_text text,
    other_column text
);

2. Insert Row with TOASTed Value

Insert a row into the table with a large text value that will be TOASTed:

INSERT INTO large_data (big_text, other_column) 
VALUES (repeat('A very long string. ', 10000), '2');
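
To confirm that the value is large enough to be handled by TOAST, a minimal check using standard PostgreSQL functions is to compare the logical length of big_text with its stored size:

-- A stored size far smaller than the logical length indicates the value
-- is compressed and/or stored out of line by TOAST.
SELECT octet_length(big_text) AS logical_bytes,
       pg_column_size(big_text) AS stored_bytes
FROM large_data
WHERE id = 1;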

3. Configure an Ingestion Job

Configure an ingestion job in Upsolver to replicate the data.

CREATE TABLE default_glue_catalog.schema_name.postgres_raw_data();

CREATE JOB load_raw_data_from_postgres
    PUBLICATION_NAME = 'upsolver_publication'
AS COPY FROM POSTGRES upsolver_postgres_samples
    INTO default_glue_catalog.schema_name.postgres_raw_data;
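
The job above assumes that a PostgreSQL connection named upsolver_postgres_samples and a publication named upsolver_publication already exist. If they do not, they can be created along the following lines (a sketch only: adjust the connection string and credentials, and note that the source database must have logical replication enabled):

-- In PostgreSQL: create a publication covering the table(s) to replicate
CREATE PUBLICATION upsolver_publication FOR TABLE large_data;

-- In Upsolver: create a connection to the source database
CREATE POSTGRES CONNECTION upsolver_postgres_samples
   CONNECTION_STRING = 'jdbc:postgresql://<host>:5432/<database>'
   USER_NAME = '<username>'
   PASSWORD = '<password>';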

4. Modify Row Without Changing TOASTed Column

UPDATE large_data SET other_column = '3' WHERE id = 1;

5. Query Upsolver SQLake Table

SELECT * FROM default_glue_catalog.schema_name.postgres_raw_data 
WHERE "$full_table_name" = 'postgres.public.large_data';

You will see that the big_text column for the updated row displays __debezium_unavailable_value, even though the update did not touch that column.

Potential Workarounds

Option 1: Set Replica Identity to FULL

To include unchanged TOASTed columns in the logical replication messages, set the replica identity of the PostgreSQL table to FULL.

ALTER TABLE large_data REPLICA IDENTITY FULL;
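
Note that FULL causes PostgreSQL to log the entire old row for every UPDATE and DELETE, which can increase WAL volume. You can verify the change with the same catalog query shown earlier; after the ALTER it should return 'f':

SELECT relreplident       -- expected: 'f'
FROM pg_class
WHERE oid = 'large_data'::regclass;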

Option 2: Leverage CUSTOM_UPDATE_EXPRESSIONS

In a Snowflake merge job, use the CUSTOM_UPDATE_EXPRESSIONS parameter to replace the __debezium_unavailable_value placeholder with the value already stored in the target table.

CREATE SYNC JOB transform_to_snowflake_using_custom_update_expression
   START_FROM = BEGINNING
   CREATE_TABLE_IF_MISSING = true
   CUSTOM_UPDATE_EXPRESSIONS = (BIG_TEXT = 'CASE {} 
      WHEN ''__debezium_unavailable_value'' 
      THEN SCHEMA_NAME.TOAST_EXAMPLE1.BIG_TEXT ELSE {} END')
AS MERGE INTO SNOWFLAKE snowflake_connection.SCHEMA_NAME.TOAST_EXAMPLE1 t 
USING (
         SELECT *, $operation = 'delete' as is_delete
         FROM default_glue_catalog.schema_name.postgres_raw_data 
         WHERE "$full_table_name" = 'postgres.public.large_data'
         AND TIME_FILTER()
      ) s
       ON (s.id = t.ID)
       WHEN MATCHED AND is_delete THEN DELETE
       WHEN MATCHED THEN REPLACE
       WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;

Caveat

If both an insert and an update operation for the same row are ingested into Snowflake within the same COMMIT_INTERVAL, the __debezium_unavailable_value placeholder will persist, as the original value has not yet been established in the table.

Option 3: Use a Materialized View for Latest TOAST Value

Create a materialized view in Upsolver that stores the latest non-placeholder TOAST value for each key, then join it with the staging data in the transformation job that merges into the target table.

// Materialized view that stores the last TOAST value
CREATE MATERIALIZED VIEW default_glue_catalog.schema_name.last_toast_value AS
   SELECT id as ID,
       LAST(NULL_IF(BIG_TEXT = '__debezium_unavailable_value', big_text)) AS BIG_TEXT
   FROM default_glue_catalog.schema_name.postgres_raw_data
   WHERE "$full_table_name" = 'postgres.public.large_data'
   GROUP BY 1;

// Transformation job to merge the data to Snowflake
CREATE SYNC JOB transform_to_snowflake_using_mv
   START_FROM = BEGINNING
   CREATE_TABLE_IF_MISSING = true
AS MERGE INTO SNOWFLAKE snowflake_connection.SCHEMA_NAME.TOAST_EXAMPLE_WITH_JOIN2 t 
USING (
         SELECT *,   
                CASE 
                   WHEN t.big_text = '__debezium_unavailable_value' THEN l.big_text 
                   ELSE t.big_text 
                END AS BIG_TEXT,
                $operation = 'delete' AS is_delete
         FROM default_glue_catalog.schema_name.postgres_raw_data t
         LEFT JOIN default_glue_catalog.schema_name.last_toast_value l
         AS OF $event_time
         ON l.id = t.id 
         WHERE "$full_table_name" = 'postgres.public.large_data'
         AND TIME_FILTER()
      ) s
       ON (s.id = t.ID)
       WHEN MATCHED AND is_delete THEN DELETE
       WHEN MATCHED THEN REPLACE
       WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;

By implementing these strategies, you can ensure that TOASTed values are properly handled and replicated in your Upsolver jobs.
