PostgreSQL
This page describes the options for creating a replication job to ingest data from PostgreSQL.
CREATE REPLICATION JOB <job_identifier>
[{ job_options }]
FROM <source_connection_name>
[{ source_options }]
WITH REPLICATION GROUP <group_name>
[{ replication_group_options }]
[ WITH REPLICATION GROUP ... ];
General job options:
[ COMMENT = '<comment>' ]
[ COMPUTE_CLUSTER = <cluster_identifier> ]
[ INTERMEDIATE_STORAGE_CONNECTION = <connection_identifier> ]
[ INTERMEDIATE_STORAGE_LOCATION = '<storage_location>' ]
[ SNAPSHOT_PARALLELISM = <int> ]
PostgreSQL source options:
[ HEARTBEAT_TABLE = '<heartbeat_name>' ]
[ PUBLICATION_NAME = ('regexFilter1', 'regexFilter2') ]
HEARTBEAT_TABLE
Type: string
(Optional) If not set, no heartbeat table is used. Using a heartbeat table is recommended: it prevents the replication slot from growing indefinitely when no CDC events are captured for the subscribed tables.
PUBLICATION_NAME
Type: text
Adds a new publication to the current database. The publication name must be distinct from the name of any existing publication in the current database. DDL statements are filtered out.
General replication group options:
{
[ INCLUDED_TABLES_REGEX = ('<included_tables_regex>'[, ...])
EXCLUDED_COLUMNS_REGEX = ('<excluded_columns_regex>'[, ...]) ]
|
[ INCLUDED_SCHEMA_DEFINITION = (<included_table_pattern>)
EXCLUDED_SCHEMA_DEFINITION = (<excluded_table_pattern>) ]
}
[ COLUMN_TRANSFORMATIONS = (<column_name> = <expression> [, ...]) ]
[ COMMIT_INTERVAL = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]
[ LOGICAL_DELETE_COLUMN = <column_identifier> ]
[ REPLICATION_TARGET = <target_connection_identifier> ]
[ TARGET_SCHEMA_NAME_EXPRESSION = '<schema_name>' ]
[ TARGET_TABLE_NAME_EXPRESSION = '<table_prefix_name>' || $table_name ]
[ UPSOLVER_EVENT_TIME_COLUMN = <column_identifier> ]
[ WRITE_MODE = { MERGE | APPEND } ]
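The table and column filters above are regular expressions matched against fully qualified names (schema.table for tables, schema.table.column for columns). As a rough illustration of how such filters behave — a sketch only, assuming full-match semantics, not Upsolver's actual implementation:

```python
import re

def select_tables(tables, included_tables_regex):
    """Keep tables whose schema.table name fully matches any include pattern."""
    return [t for t in tables
            if any(re.fullmatch(p, t) for p in included_tables_regex)]

def drop_columns(columns, excluded_columns_regex):
    """Drop columns whose schema.table.column name fully matches any exclude pattern."""
    return [c for c in columns
            if not any(re.fullmatch(p, c) for p in excluded_columns_regex)]

tables = ["orders.customers", "orders.items", "sales.invoices"]
print(select_tables(tables, [r"orders\..*"]))
# → ['orders.customers', 'orders.items']

columns = ["orders.customers.name", "orders.customers.creditcard"]
print(drop_columns(columns, [r".*\.creditcard"]))
# → ['orders.customers.name']
```

Note that the pattern `orders\..*` escapes the dot between schema and table; an unescaped `.` would match any character and could select tables from other schemas.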
The following example creates a job named postgres_replication_to_snowflake that writes to two replication groups in Snowflake: replicate_to_snowflake_prod for production use and replicate_to_snowflake_dev for development:
CREATE REPLICATION JOB postgres_replication_to_snowflake
COMMENT = 'Replicate Postgres CDC data to Snowflake groups'
COMPUTE_CLUSTER = "Default Compute (Free)"
INTERMEDIATE_STORAGE_CONNECTION = s3_connection
INTERMEDIATE_STORAGE_LOCATION = 's3://upsolver-integration-tests/test/'
FROM my_postgres_connection
PUBLICATION_NAME = 'orders_publication'
HEARTBEAT_TABLE = 'orders.heartbeat'
WITH REPLICATION GROUP replicate_to_snowflake_prod
INCLUDED_TABLES_REGEX = ('orders\..*')
EXCLUDED_COLUMNS_REGEX = ('.*\.creditcard') -- exclude creditcard columns
COMMIT_INTERVAL = 5 MINUTES
LOGICAL_DELETE_COLUMN = "is_deleted"
REPLICATION_TARGET = my_snowflake_connection
TARGET_SCHEMA_NAME_EXPRESSION = 'ORDERS'
TARGET_TABLE_NAME_EXPRESSION = $table_name
WRITE_MODE = MERGE
WITH REPLICATION GROUP replicate_to_snowflake_dev
INCLUDED_SCHEMA_DEFINITION = ('orders\..*')
COMMIT_INTERVAL = 1 HOUR
REPLICATION_TARGET = my_snowflake_connection
TARGET_TABLE_NAME_EXPRESSION = 'history_' || $table_name
TARGET_SCHEMA_NAME_EXPRESSION = 'ORDERS_DEV'
WRITE_MODE = APPEND;
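The two groups' target naming expressions can be pictured as follows. In this sketch (hypothetical helper, not the engine's implementation), `$table_name` is modeled as a function of the source table name, so the production group keeps the name unchanged while the development group prepends history_:

```python
def target_name(schema_expr, table_expr, table_name):
    """Build the target's schema-qualified name.

    table_expr mimics TARGET_TABLE_NAME_EXPRESSION with $table_name
    substituted by the source table name.
    """
    return f"{schema_expr}.{table_expr(table_name)}"

# Production group: schema ORDERS, table name unchanged ($table_name)
print(target_name("ORDERS", lambda t: t, "customers"))
# → ORDERS.customers

# Development group: schema ORDERS_DEV, 'history_' || $table_name
print(target_name("ORDERS_DEV", lambda t: "history_" + t, "customers"))
# → ORDERS_DEV.history_customers
```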
Each group has its own set of options that can be configured independently while streaming from the same data source. Note that the source options set the PUBLICATION_NAME and HEARTBEAT_TABLE for the source connection, which is shared by both groups.

The production replication group has a COMMIT_INTERVAL of 5 MINUTES to keep the target frequently updated, whereas the development group has an interval of 1 HOUR to update less often. Furthermore, the production group will MERGE the data, using the is_deleted column to flag rows that are deleted in the source, while the development group will APPEND the data.

Both groups replicate all tables in the orders schema, selected with INCLUDED_TABLES_REGEX in the production group and INCLUDED_SCHEMA_DEFINITION in the development group. However, credit card columns are excluded from the production target using the EXCLUDED_COLUMNS_REGEX option to remove PII. In the development group, the TARGET_TABLE_NAME_EXPRESSION option adds the prefix history_, so Upsolver creates each target table using the name of the source table with that prefix, making it clear that the target tables in this group are used for development.
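The difference between the two write modes can be sketched in miniature. This is an illustration of the general MERGE-with-logical-delete versus APPEND pattern under simplified assumptions (a single `id` key, an `op` field per change event), not Upsolver's internal logic:

```python
def apply_append(target, events):
    """APPEND: every change event becomes a new row, preserving full history."""
    target.extend(dict(e) for e in events)

def apply_merge(target, events, key="id", logical_delete_col="is_deleted"):
    """MERGE: upsert by key; a delete flips the logical-delete flag
    instead of physically removing the row (LOGICAL_DELETE_COLUMN)."""
    by_key = {row[key]: row for row in target}
    for e in events:
        if e.get("op") == "delete":
            if e[key] in by_key:
                by_key[e[key]][logical_delete_col] = True
        else:
            row = {k: v for k, v in e.items() if k != "op"}
            row[logical_delete_col] = False
            by_key[e[key]] = row
    target[:] = list(by_key.values())

events = [
    {"op": "insert", "id": 1, "total": 10},
    {"op": "update", "id": 1, "total": 12},
    {"op": "delete", "id": 1},
]

merged = []
apply_merge(merged, events)
print(merged)
# → [{'id': 1, 'total': 12, 'is_deleted': True}]

appended = []
apply_append(appended, events)
print(len(appended))
# → 3
```

With MERGE the target holds one row per key reflecting the latest state, and a deleted source row survives only as a flagged record; with APPEND the target accumulates one row per change, which suits the development group's history tables.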