Performing Snapshots

This article describes the process for performing a snapshot of your CDC database.

Initial snapshots in Upsolver

By default, when you create a job to ingest data from a CDC-enabled database, Upsolver performs an initial snapshot. There are two ways this initial snapshot can be performed:

  1. The default is for Upsolver to perform an incremental snapshot of your database. This mechanism is a phased approach that runs in parallel to the streaming data capture, meaning you don’t need to wait for the full snapshot to complete before streaming can begin.

  2. Alternatively, you can take a full snapshot of the tables or collections in the database and ingest them into the staging table before the job continues to listen for change events. One option for speeding up this process is to perform a parallel snapshot.

Upsolver embeds the Debezium engine, an open-source framework for handling change data capture. The schema is stored on Amazon S3, and the data is stored in a table, enabling it to have a retention policy limiting how long the data is stored.
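The retention policy is defined on the staging table itself. As a minimal sketch (the table name is illustrative, and the PARTITIONED BY and TABLE_DATA_RETENTION options are assumed here; consult the CREATE TABLE reference for the exact options available to you), a staging table that keeps CDC data for 30 days could be created as follows:

-- Staging table for CDC data, retained for 30 days (illustrative)
CREATE TABLE default_glue_catalog.upsolver_samples.sales_data()
    PARTITIONED BY $event_date
    TABLE_DATA_RETENTION = 30 DAYS;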

Incremental snapshot

An incremental snapshot is a phased approach that enables events to be streamed in parallel with the snapshot, rather than waiting for the full snapshot to finish.

When you run an incremental snapshot, the table is split into chunks. The chunk size, set by Upsolver, defines the number of rows that the snapshot collects during a fetch operation on the database. Working chunk by chunk, the snapshot captures each row in the chunk and emits a READ event representing the value of the row when the snapshot for that chunk began. When a chunk has been snapshotted, events for the rows within that chunk can be streamed and captured while the next chunk is snapshotted, and so on.
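Conceptually, each chunk fetch resembles a key-ordered, bounded query such as the following sketch (illustrative only; Upsolver generates the actual query internally, and the table name, key column, and chunk size shown here are assumptions):

-- Illustrative chunk fetch: read the next chunk of rows ordered by primary key,
-- starting after the last key captured in the previous chunk.
SELECT *
FROM Orders
WHERE order_id > :last_key_of_previous_chunk
ORDER BY order_id
LIMIT 1024;  -- chunk size is set by Upsolver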

Limitations

Upsolver does not support primary keys that are not primitives, e.g. anything that Debezium interprets as a record, array, map, or byte array.

Full snapshot

The full snapshot process captures the structure and data of the tables in the source database in a single process, prior to listening for change events. A full copy of your source tables is created in your data lake, before you begin to stream the most recent change events.

Schema capture

After connecting to your database, Debezium determines the tables to be captured. By default, all non-system table schemas are captured, including tables that were not configured for capture. Snapshotting the entire database schema means the event data in tables not currently enabled for CDC can be quickly and easily added to the process at a later date. To reduce the initial snapshot time and limit the tables included in this process, you can use either the TABLE_INCLUDE_LIST or COLLECTION_INCLUDE_LIST source option in your job, depending on your database platform.

PostgreSQL tables that aren't included in the publication will not be part of the snapshot, so you don't need to add the TABLE_INCLUDE_LIST option to your job.
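For MongoDB sources, the equivalent option is COLLECTION_INCLUDE_LIST. A minimal sketch, assuming a MongoDB connection named my_mongodb_connection and a sales database (the names and the database.collection format are illustrative; check the job options for your source):

CREATE SYNC JOB load_sales_data_from_mongodb
AS COPY FROM my_mongodb_connection
INTO default_glue_catalog.upsolver_samples.sales_data
COLLECTION_INCLUDE_LIST = ('sales.products', 'sales.orders');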

Let’s look at a simple example using Microsoft SQL Server.

Imagine you have a Sales database, with three tables, Customers, Products, and Orders, with CDC enabled on Products and Orders. By default, during the initial snapshot, the table schemas for all three tables will be captured and stored in the Debezium internal database. So if you created the following job, it would capture the schema for the three tables:

CREATE SYNC JOB load_sales_data_from_mssql
AS COPY FROM my_mssql_connection
INTO default_glue_catalog.upsolver_samples.sales_data;

To prevent a full schema capture and limit this to the Products and Orders tables, you can add the TABLE_INCLUDE_LIST source option when you create your job:

CREATE SYNC JOB load_sales_data_from_mssql
AS COPY FROM my_mssql_connection
INTO default_glue_catalog.upsolver_samples.sales_data
TABLE_INCLUDE_LIST = ('dbo.Products', 'dbo.Orders');

If you later want to add the Customers table to your job, you would need to run an ALTER JOB statement:

ALTER JOB load_sales_data_from_mssql
SET TABLE_INCLUDE_LIST = ('dbo.Products', 'dbo.Orders', 'dbo.Customers');

Having identified the tables, the Debezium engine places a lock on each source table to ensure no schema changes are applied during the initial snapshotting process. It then records the maximum log sequence number (LSN) in the database transaction log to create a marker point for capturing the data when the schema capture process completes.

While the locks are in place, the connector gathers the structure of the tables and records this in its internal history database. When this step is completed, the locks are released, and the tables can now be written to by other database clients.
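For context, in SQL Server this marker corresponds to the database's current maximum LSN, which you can inspect yourself on the source database (this query is purely illustrative and is not part of the Upsolver job):

-- Return the current maximum LSN known to SQL Server CDC;
-- this is the kind of marker recorded before data capture begins.
SELECT sys.fn_cdc_get_max_lsn() AS max_lsn;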

Data capture

The next step is to capture the data. The connector scans the data at the LSN position previously taken when the table locks were established and performs the following tasks:

  • The connector checks each table to confirm it was created before the snapshot began: tables created after the snapshot are skipped.

  • When the snapshot process is complete, the connector transitions to streaming and emits change events for tables created after the snapshot process began.

  • The connector generates a read event for each row that is captured from a table. All read events contain the same LSN, which is the position acquired at the previous step when the table locks were taken.

  • It then sends each read event to the Kafka topic for the table. Any logs appearing after this maximum LSN are captured by the ongoing streaming process.

Parallel snapshots

When you create your ingestion job, you can use the SNAPSHOT_PARALLELISM option to configure how many snapshots are performed concurrently. The more snapshots that run in parallel, the sooner all tables are streaming; however, it also increases the load on the source database.

Let's look at a simple example using MySQL as the source database.

Returning to our Sales database example, imagine that, rather than the three tables we had previously, the database now contains 35 tables. The SNAPSHOT_PARALLELISM option is included and configured to perform 5 snapshots in parallel (rather than the default of 1):

CREATE SYNC JOB load_sales_data_from_mysql
    SNAPSHOT_PARALLELISM = 5
AS COPY FROM my_mysql_connection
INTO default_glue_catalog.upsolver_samples.sales_raw_data;

Table re-snapshot

The syntax for re-snapshotting a table is as follows:

ALTER JOB <cdc_job_name> 
RESNAPSHOT TABLE <full_table_name>
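For example, to re-snapshot the Orders table in the SQL Server job created earlier (an illustrative sketch; the exact form of the table name, including any quoting, may depend on your source):

ALTER JOB load_sales_data_from_mssql
RESNAPSHOT TABLE dbo.Orders;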

Collection re-snapshot

For jobs ingesting from MongoDB, use the following syntax to re-snapshot the collection:

ALTER JOB <cdc_job_name> 
RESNAPSHOT COLLECTION <collection_name>
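For example, assuming a hypothetical MongoDB ingestion job named load_sales_data_from_mongodb and a sales.orders collection:

ALTER JOB load_sales_data_from_mongodb
RESNAPSHOT COLLECTION sales.orders;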

Job options

Please see the specific job options and further examples for your source database.
