Microsoft SQL Server

Syntax

CREATE [SYNC] JOB <job_name>
    [{ job_options }]
AS COPY FROM MSSQL <connection_identifier>
    [{ source_options }]
INTO <table_identifier>
[ WITH EXPECTATION <exp_name> EXPECT <sql_predicate> ON VIOLATION { DROP | WARN } ];

Job options

The following job properties configure the behavior of the ingestion job.

[ COLUMN_TRANSFORMATIONS = (<column> = <expression>, ...) ]
[ COMMENT = '<comment>' ]
[ COMPUTE_CLUSTER = <cluster_identifier> ]  
[ END_AT = { NOW | <timestamp> } ]
[ EXCLUDE_COLUMNS = ( <col>, ...) ]   
[ PARSE_JSON_COLUMNS = { TRUE | FALSE } ]  
[ SKIP_SNAPSHOTS = { TRUE | FALSE } ]
[ SNAPSHOT_PARALLELISM = <integer> ]

PARSE_JSON_COLUMNS

Type: Boolean

Default: false

If enabled, Upsolver will parse JSON columns into a struct matching the JSON value.

SKIP_SNAPSHOTS— editable

Type: Boolean

Default: false

(Optional) By default, snapshots are enabled for new tables: Upsolver takes a full snapshot of the table(s) and ingests it into the staging table before it starts listening for change events. When set to TRUE, Upsolver does not take an initial snapshot and processes only change events, starting from the time the ingestion job is created.

In the majority of cases, when you connect to your source tables, you want to take a full snapshot and ingest it as the baseline of your table. This creates a full copy of the source table in your data lake before you begin to stream the most recent change events. If you skip taking a snapshot, you will not have the historical data in the target table, only the newly added or changed rows.

Skipping a snapshot is useful when your primary database instance has crashed or become unreachable and has failed over to the secondary. You then need to re-establish the CDC connection, but you do not want to take a full snapshot because the table already holds all of the history. Instead, you want to resume processing from the point where the connection to the primary database went down.
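
As a minimal sketch of the failover scenario above, the job below is re-created with the initial snapshot disabled. The job name resume_cdc_after_failover is hypothetical; the connection and target table reuse the names from the Examples section, on the assumption that the target table already holds the history.

```sql
-- Hypothetical job name; connection and target table are taken from the Examples section.
CREATE SYNC JOB resume_cdc_after_failover
  COMMENT = 'Re-created after failover; history is already in the target table'
  SKIP_SNAPSHOTS = TRUE  -- no initial snapshot, process change events only
AS COPY FROM MSSQL my_mssql_connection
INTO default_glue_catalog.upsolver_samples.orders_raw_data;
```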

SNAPSHOT_PARALLELISM

Type: int

Default: 1

(Optional) Configures how many snapshots are performed concurrently. The more snapshots that run concurrently, the sooner the tables begin streaming. However, running more snapshots in parallel increases the load on the source database.
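
The following sketch shows where the option sits in a job definition. The job name and the value 4 are illustrative assumptions rather than recommendations; choose a value that the source database can sustain.

```sql
-- Hypothetical job name; 4 is an arbitrary illustrative value, not a recommended setting.
CREATE SYNC JOB load_mssql_parallel_snapshots
  SNAPSHOT_PARALLELISM = 4  -- snapshot up to four tables at the same time
AS COPY FROM MSSQL my_mssql_connection
INTO default_glue_catalog.upsolver_samples.orders_raw_data;
```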

Source options

[ COLUMN_EXCLUDE_LIST = ('regexFilter1', 'regexFilter2') ]   
[ TABLE_INCLUDE_LIST = ('regexFilter1', 'regexFilter2') ]

TABLE_INCLUDE_LIST — editable

Type: text

Default: ''

(Optional) Comma-separated list of regular expressions that match the fully-qualified identifiers of the tables whose changes you want to capture. Tables not included in this list will not be loaded. If the list is left empty, all tables will be loaded. This maps to the Debezium table.include.list property.

By default, the connector captures changes in every non-system table in all databases. To match the name of a table, Upsolver applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the table. It does not match substrings that might be present in a table name.

Each RegEx pattern matches against the full string databaseName.tableName, for example:

RegEx Pattern                   Results
db_name.*                       Select all tables from the db_name database.
db_name.users, db_name.items    Select the users and items tables from the db_name database.
db1.items_.*                    Select all tables from db1 that start with items_.
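
To show how such patterns might appear in a job, here is a minimal sketch that combines two of the patterns above. The job name is hypothetical, and db_name and db1 are the placeholder database names from the table, not real databases.

```sql
-- Hypothetical job name; db_name and db1 are placeholder database names.
CREATE SYNC JOB load_selected_mssql_tables
AS COPY FROM MSSQL my_mssql_connection
  -- Each pattern is anchored, so it must match the whole databaseName.tableName string.
  TABLE_INCLUDE_LIST = ('db_name.users', 'db1.items_.*')
INTO default_glue_catalog.upsolver_samples.orders_raw_data;
```

Because matching is anchored, the pattern 'db_name.users' would not pick up a table such as db_name.users_archive; a trailing wildcard like 'db_name.users.*' would be needed for that.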

COLUMN_EXCLUDE_LIST — editable

Type: array[string]

Default: ''

(Optional) Comma-separated list of regular expressions that match the fully-qualified names of columns to exclude from change event record values. This maps to the Debezium column.exclude.list property.

By default, the connector matches all columns of the tables listed in TABLE_INCLUDE_LIST. To match the name of a column, Upsolver applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the column; it does not match substrings that might be present in a column name.

Each RegEx pattern matches against the full string databaseName.tableName.columnName, for example:

RegEx Pattern          Results
db.users.address_.*    Select all columns starting with address_ from the users table in the db database.
db.*.(.*_pii)          Select all columns ending in _pii across all tables in the db database.
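
As a sketch only, the two patterns above could be used together as follows. The job name is hypothetical and db is the placeholder database name from the table.

```sql
-- Hypothetical job name; db is a placeholder database name.
CREATE SYNC JOB load_mssql_without_pii
AS COPY FROM MSSQL my_mssql_connection
  -- Patterns are matched against the whole databaseName.tableName.columnName string.
  COLUMN_EXCLUDE_LIST = ('db.users.address_.*', 'db.*.(.*_pii)')
INTO default_glue_catalog.upsolver_samples.orders_raw_data;
```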

Examples

Ingest data into the data lake

The following example creates a job to ingest data from SQL Server into a table in the data lake.

CREATE SYNC JOB load_raw_data_from_mssql
AS COPY FROM MSSQL my_mssql_connection
INTO default_glue_catalog.upsolver_samples.orders_raw_data;

Ingest data with additional options

The example above creates a job with minimal code; you can use job and source options to enhance it. The following example includes more options:

CREATE SYNC JOB load_raw_data_from_mssql
  COLUMN_TRANSFORMATIONS = (email = MD5(email))
  COMMENT = 'Ingest CDC data from SQL Server'
AS COPY FROM MSSQL my_mssql_connection
  TABLE_INCLUDE_LIST = ('dbo.customers', 'dbo.orders')
  COLUMN_EXCLUDE_LIST = ('dbo.*.credit_card', 'dbo.customers.address_.*')
INTO default_glue_catalog.upsolver_samples.orders_raw_data
  WITH EXPECTATION exp_orderid_not_null
    EXPECT orderid IS NOT NULL
    ON VIOLATION DROP;

The COLUMN_TRANSFORMATIONS job option masks the value in the email column by replacing it with its MD5 hash to protect PII. The TABLE_INCLUDE_LIST source option limits the ingested data to the customers and orders tables, and COLUMN_EXCLUDE_LIST instructs the job to ignore columns named credit_card in any table, along with columns prefixed with address_ in the customers table.

The expectation exp_orderid_not_null has been added to the job to check that the orderid column is not NULL. Any ingested rows without an orderid value will be dropped and won't be loaded into the target table.
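
The syntax also allows ON VIOLATION WARN. The sketch below is a hypothetical variant of the same job that uses WARN instead of DROP; this page does not describe the exact behavior of WARN, so the comment reflects an assumption that violating rows are still loaded and the violation is flagged.

```sql
-- Hypothetical variant of the job above; the job name is illustrative.
CREATE SYNC JOB load_raw_data_from_mssql_warn
AS COPY FROM MSSQL my_mssql_connection
INTO default_glue_catalog.upsolver_samples.orders_raw_data
  WITH EXPECTATION exp_orderid_not_null
    EXPECT orderid IS NOT NULL
    ON VIOLATION WARN;  -- assumption: rows are kept and the violation is reported
```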
