MongoDB

Syntax

CREATE JOB <job_name>
    [{ job_options }]
AS COPY FROM MONGODB <connection_identifier>
    [{ source_options }]
INTO <table_identifier>
    [ WITH EXPECTATION <exp_name> EXPECT <sql_predicate> ON VIOLATION { DROP | WARN } ];

Job options

The following job properties configure the behavior of the ingestion job.

[ COLUMN_TRANSFORMATIONS = (<column> = <expression>, ...) ]
[ COMMENT = '<comment>' ]
[ COMPUTE_CLUSTER = <cluster_identifier> ]  
[ END_AT = { NOW | <timestamp> } ]
[ EXCLUDE_COLUMNS = ( <col>, ...) ] 
[ SKIP_SNAPSHOTS = { TRUE | FALSE } ]
[ SNAPSHOT_PARALLELISM = <integer> ]

MongoDB job options:

SKIP_SNAPSHOTS — editable

Type: Boolean

Default: false

(Optional) By default, snapshots are enabled for new collections, meaning that Upsolver takes a full snapshot of each collection and ingests it into the staging table before it continues to listen for change events. When set to TRUE, Upsolver does not take an initial snapshot and only processes change events from the time the ingestion job is created.

In the majority of cases, when you connect to your source collections, you want to take a full snapshot and ingest a baseline of your data. This creates a full copy of the source data in your data lake before you begin to stream the most recent change events. If you skip taking a snapshot, you will not have the historical data in the target table, only the newly added or changed rows.

Skipping a snapshot is useful when your primary database instance has crashed or become unreachable and failed over to the secondary. You then need to re-establish the CDC connection, but you would not want to take a full snapshot because your table already contains all of the history; instead, you want to resume processing from the point at which the connection to the primary database went down.
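
For illustration, a minimal sketch of a job that skips the initial snapshot is shown below. It reuses the connection and table names from the examples later on this page; the job name itself is illustrative:

-- Illustrative job name; connection and table names are taken from the examples below
CREATE SYNC JOB resume_cdc_from_mongodb
  COMMENT = 'Resume CDC without re-ingesting historical data'
  SKIP_SNAPSHOTS = TRUE
AS COPY FROM MONGODB my_mongodb_connection
INTO default_glue_catalog.upsolver_samples.orders_raw_data;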

SNAPSHOT_PARALLELISM

Type: integer

Default: 1

(Optional) Configures how many snapshots are performed concurrently. The more snapshots run in parallel, the sooner all collections are streaming; however, this also increases the load on the source database.
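
As a sketch, the job below raises the number of concurrent snapshots so that multiple collections reach the streaming phase sooner. The value 4 is arbitrary, and the job name is illustrative; the connection and table names are the ones used in the examples below:

-- Arbitrary parallelism value; tune it against the load your source database can tolerate
CREATE SYNC JOB load_raw_data_from_mongodb_parallel
  SNAPSHOT_PARALLELISM = 4
AS COPY FROM MONGODB my_mongodb_connection
INTO default_glue_catalog.upsolver_samples.orders_raw_data;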

Source options

[ COLLECTION_INCLUDE_LIST = ( '..' [ , '...' ] ) ]

COLLECTION_INCLUDE_LIST — editable

Type: text

Default: ''

(Optional) A comma-separated list of regular expressions that match the fully-qualified namespaces of the collections whose changes you want to capture. Collections not included in this list are not loaded. If the list is left empty, all collections are loaded. This maps to the Debezium collection.include.list property.

By default, the connector captures changes in every non-system collection in all databases. To match the name of a collection, Upsolver applies the regular expression that you specify as an anchored regular expression, so that the specified expression is matched against the name string of the namespace; it does not match substrings in the name.

Each regex pattern is matched against the full namespace string databaseName.collectionName, not against the collection name alone.
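
As an illustration, assuming a sales database like the one in the examples below, a single anchored pattern can select every collection in that database:

-- Illustrative pattern: matches sales.customers, sales.orders, and any other collection in the sales database
COLLECTION_INCLUDE_LIST = ('sales\..*')

A pattern such as 'orders' would match nothing, because it covers only part of the namespace string.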

Examples

Ingest data into the data lake

The following example creates a job to ingest data from MongoDB into a table in the data lake.

CREATE SYNC JOB load_raw_data_from_mongodb
AS COPY FROM MONGODB my_mongodb_connection
INTO default_glue_catalog.upsolver_samples.orders_raw_data;

Ingest data with additional options

The above example shows how to create a job using minimal code; however, you can use job and source options to enhance your job. The following example includes additional options:

CREATE SYNC JOB load_raw_data_from_mongodb
  COLUMN_TRANSFORMATIONS = (email = MD5(email))
  COMMENT = 'Ingest CDC data from MongoDB'
AS COPY FROM MONGODB my_mongodb_connection
  COLLECTION_INCLUDE_LIST = ('sales.customers', 'sales.orders')
INTO default_glue_catalog.upsolver_samples.orders_raw_data;

The COLUMN_TRANSFORMATIONS job option is included to mask the value in the email column to protect PII. Furthermore, the COLLECTION_INCLUDE_LIST source option limits the ingested data to the customers and orders collections in the sales database.
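
The syntax above also supports an optional WITH EXPECTATION clause for enforcing data quality during ingestion. The following is a minimal sketch: the expectation name and the order_id column are illustrative and not part of the sample data used above:

-- exp_order_id_present and order_id are illustrative names, not part of the sample dataset
CREATE SYNC JOB load_raw_data_from_mongodb_with_expectation
  COMMENT = 'Ingest CDC data from MongoDB and drop rows missing an order id'
AS COPY FROM MONGODB my_mongodb_connection
  COLLECTION_INCLUDE_LIST = ('sales.orders')
INTO default_glue_catalog.upsolver_samples.orders_raw_data
  WITH EXPECTATION exp_order_id_present EXPECT order_id IS NOT NULL ON VIOLATION DROP;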
