PostgreSQL
This page describes the job options for ingesting data from PostgreSQL.
CREATE JOB <job_name>
    [{ job_options }]
AS COPY FROM POSTGRES <connection_identifier>
    [{ source_options }]
INTO <table_identifier>
[ WITH EXPECTATION <exp_name> EXPECT <sql_predicate> ON VIOLATION { DROP | WARN } ];

Job options:
[ COLUMN_TRANSFORMATIONS = (<column> = <expression>, ...) ]
[ COMMENT = '<comment>' ]
[ COMPUTE_CLUSTER = <cluster_identifier> ]
[ END_AT = { NOW | <timestamp> } ]
[ EXCLUDE_COLUMNS = ( <col>, ...) ]
[ HEARTBEAT_TABLE = '<heartbeat_name>' ]
[ PARSE_JSON_COLUMNS = { TRUE | FALSE } ]
[ PUBLICATION_NAME = '<publication_name>' ]
[ SKIP_SNAPSHOTS = { TRUE | FALSE } ]
[ SNAPSHOT_PARALLELISM = <integer> ]
PostgreSQL job options: HEARTBEAT_TABLE, PARSE_JSON_COLUMNS, PUBLICATION_NAME, SKIP_SNAPSHOTS, SNAPSHOT_PARALLELISM

General job options: COLUMN_TRANSFORMATIONS, COMMENT, COMPUTE_CLUSTER, END_AT, EXCLUDE_COLUMNS

The PostgreSQL-specific job options are described below.
HEARTBEAT_TABLE

Type: string

(Optional) If not set, no heartbeat table is used. Using a heartbeat table is recommended: it prevents the replication slot from growing indefinitely when no CDC events are captured for the subscribed tables.
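As a minimal sketch, the following job sets a heartbeat table. The connection name pg_connection, the heartbeat table name upsolver_heartbeat, and the target table are hypothetical names used only for illustration:

CREATE JOB load_with_heartbeat
    HEARTBEAT_TABLE = 'upsolver_heartbeat'  -- hypothetical heartbeat table name
AS COPY FROM POSTGRES pg_connection
INTO default_glue_catalog.my_schema.target_table;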
PARSE_JSON_COLUMNS

Type: Boolean

Default: false

(Optional) If enabled, Upsolver parses JSON columns into a struct matching the JSON value.
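For example, a job that parses JSON columns into structs might look like this sketch (the connection and table names are hypothetical):

CREATE JOB load_with_parsed_json
    PARSE_JSON_COLUMNS = TRUE  -- JSON columns are ingested as structs matching the JSON value
AS COPY FROM POSTGRES pg_connection
INTO default_glue_catalog.my_schema.target_table;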
PUBLICATION_NAME

Type: text

Adds a new publication to the current database. The publication name must be distinct from the name of any existing publication in the current database. DDL statements are filtered. See the example at the end of this page.
SKIP_SNAPSHOTS

Type: Boolean

Default: false

(Optional) When enabled, the initial snapshot-taking process is skipped for this specific job and only change events captured after the job starts are ingested. The remainder of the job's behavior is unchanged.
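A sketch of a job that skips the initial snapshot and streams only new change events (the connection and table names are hypothetical):

CREATE JOB load_changes_only
    SKIP_SNAPSHOTS = TRUE  -- no initial snapshot; ingest change events only
AS COPY FROM POSTGRES pg_connection
INTO default_glue_catalog.my_schema.target_table;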
SNAPSHOT_PARALLELISM

Type: integer

Default: 1

(Optional) Configures how many snapshots are performed concurrently. The more snapshots run in parallel, the sooner all tables are streaming; however, each additional concurrent snapshot increases the load on the source database.
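For instance, to snapshot up to four tables at a time, a job might be declared as in this sketch (names are hypothetical; choose a value your source database can sustain):

CREATE JOB load_with_parallel_snapshots
    SNAPSHOT_PARALLELISM = 4  -- snapshot up to 4 tables concurrently
AS COPY FROM POSTGRES pg_connection
INTO default_glue_catalog.my_schema.target_table;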
PostgreSQL source options:

[ TABLE_INCLUDE_LIST = ('regexFilter1', 'regexFilter2') ]
[ COLUMN_EXCLUDE_LIST = ('regexFilter1', 'regexFilter2') ]
TABLE_INCLUDE_LIST

Type: text

Default: ''

(Optional) Comma-separated list of regular expressions that match fully-qualified identifiers of the tables whose changes you want to capture. Tables not included in this list are not loaded; if the list is left empty, all tables are loaded. This maps to the Debezium table.include.list property.

By default, the connector captures changes in every non-system table in all databases. To match the name of a table, SQLake applies the regular expression you specify as an anchored regular expression: the expression is matched against the entire name string of the table and does not match substrings that might be present in a table name.

Each RegEx pattern matches against the full string databaseName.tableName, for example:

| RegEx Pattern | Results |
|---|---|
| db_name.* | Selects all tables from the db_name database. |
| db_name.users, db_name.items | Selects the users and items tables from the db_name database. |
| db1.items_.* | Selects all tables from db1 whose names start with items_. |
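As an illustration, the following sketch captures only the tables in a hypothetical prod database whose names start with orders_ (the connection and target names are also hypothetical):

CREATE JOB load_selected_tables
AS COPY FROM POSTGRES pg_connection
    TABLE_INCLUDE_LIST = ('prod.orders_.*')  -- anchored regex matched against databaseName.tableName
INTO default_glue_catalog.my_schema.target_table;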
COLUMN_EXCLUDE_LIST

Type: array[string]

Default: ''

(Optional) Comma-separated list of regular expressions that match the fully-qualified names of columns to exclude from change event record values. This maps to the Debezium column.exclude.list property.

By default, the connector matches all columns of the tables listed in TABLE_INCLUDE_LIST. To match the name of a column, SQLake applies the regular expression you specify as an anchored regular expression: the expression is matched against the entire name string of the column and does not match substrings that might be present in a column name.

Each RegEx pattern matches against the full string databaseName.tableName.columnName, for example:

| RegEx Pattern | Results |
|---|---|
| db.users.address_.* | Selects all columns starting with address_ from the users table in the db database. |
| db.*.(.*_pii) | Selects all columns ending in _pii across all tables in the db database. |
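Combining both source options, here is a sketch that captures a hypothetical prod.users table while excluding any column ending in _pii (all names are illustrative assumptions):

CREATE JOB load_without_pii
AS COPY FROM POSTGRES pg_connection
    TABLE_INCLUDE_LIST = ('prod.users')
    COLUMN_EXCLUDE_LIST = ('prod.users.(.*_pii)')  -- anchored regex matched against databaseName.tableName.columnName
INTO default_glue_catalog.my_schema.target_table;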
The following example creates a job to ingest data from PostgreSQL into a table in the data lake. The PUBLICATION_NAME option specifies that a new publication named sample is added:

CREATE JOB load_orders_raw_data_from_postgres
    PUBLICATION_NAME = 'sample'
AS COPY FROM POSTGRES upsolver_postgres_samples
INTO default_glue_catalog.upsolver_samples.orders_raw_data;