Ingest Your MongoDB CDC Data to Snowflake

Learn how to ingest your MongoDB CDC data to Snowflake


In this guide, you will learn how to ingest data from your MongoDB database into a staging table in the data lake. Then you will learn how to monitor the snapshotting process before creating a transformation job to load the data into Snowflake.

This example uses a Snowflake database as the target; however, the process for writing to other destinations is almost identical.

Please see the jobs reference for the relevant syntax and options for your chosen target.

Prerequisites

Before you ingest data into Upsolver, you must enable change data capture on your MongoDB database. If you are using a managed MongoDB service such as Atlas, CDC is most likely already enabled; if not, please refer to the guide to Deploy a Replica Set for more information.

The steps for ingesting your CDC data are as follows:

  1. Connect to MongoDB

  2. Create a staging table to store the CDC data

  3. Create an ingestion job

  4. View the job status to check the snapshotting process

  5. View the CDC data in the staging table

  6. Connect to Snowflake

  7. Create a transformation job

Step 1

Connect to MongoDB

First, we will create a connection to the MongoDB database from which you want to ingest your CDC data. You will need the connection string to your database, along with the username and password. Ensure your user has the permissions required to read from the change data capture collections.

Here's the code:

CREATE MONGODB CONNECTION my_mongodb_connection
  CONNECTION_STRING = 'mongodb+srv://upsolver.example.mongodb.net/salesdb'
  USER_NAME = '<user_name>'
  PASSWORD = '<password>'
  COMMENT = 'Connection to MongoDB sales database';

Step 2

Create a staging table to store the CDC data

Now that you have connected to your source database, the next step is to create a table in the data lake to stage the CDC data.

Here's the code to create the staging table:

CREATE TABLE default_glue_catalog.upsolver_samples.sales_raw_data()
    PARTITIONED BY $event_date;

Let's understand what this code does.

First, a table named sales_raw_data is created in the upsolver_samples database. The empty parentheses, with no defined columns, instruct Upsolver to infer the columns and types during data ingestion. This is helpful if you are unsure of the data in the source and want Upsolver to manage type changes and schema updates. In this example, we will ingest data from multiple collections into one staging table and don't need to worry about schemas, as Upsolver handles this for us.

Upsolver recommends partitioning by the system column $event_date or another date column to optimize your query performance. The $event_date column is added by default as a system column, along with $event_time, which will be used later when you create your transformation job. You can view all the system columns that Upsolver adds to the tables in your default glue catalog by expanding the table name in the Entities tree in Upsolver and then expanding SYSTEM COLUMNS.
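For example, once the ingestion job in Step 3 is running, you can inspect these system columns by selecting them directly from the staging table. This is a minimal sketch that uses only the system columns named above:

SELECT $event_date,  -- partition column added by Upsolver
       $event_time   -- ingestion timestamp used later by the transformation job
FROM default_glue_catalog.upsolver_samples.sales_raw_data
LIMIT 10;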


Step 3

Create an ingestion job

After defining the staging table, the next step is to create an ingestion job to copy in the CDC data.

Here's the code to create the ingestion job:

CREATE SYNC JOB load_raw_data_from_mongodb
    COMMENT = 'Ingest sales data from MongoDB'
AS COPY FROM MONGODB my_mongodb_connection
    COLLECTION_INCLUDE_LIST = ('sales.orders', 'sales.products', 'sales.customers')
INTO default_glue_catalog.upsolver_samples.sales_raw_data; 

Let's take a look at what this code does.

A job named load_raw_data_from_mongodb is created with an optional comment. Comments are visible to other users in your organization, so it is helpful to use them to describe the purpose of your job.

Ingestion jobs use the COPY FROM command to copy source data to the target, in this case, the sales_raw_data table in the AWS Glue Data Catalog, using the my_mongodb_connection connection.

In our example, the COLLECTION_INCLUDE_LIST source option instructs the job to ingest from the orders, products, and customers collections, and ignore everything else.

Step 4

View the job status

When you create your CDC job, Upsolver takes a snapshot of each of the included collections prior to streaming. Optionally, you can use the SKIP_SNAPSHOTS job option to skip this step, but usually you will want to ingest all historical data.
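If you only need changes from now on and want to skip the historical snapshot, the job would look something like the sketch below. Note that SKIP_SNAPSHOTS = TRUE and its placement among the job options are assumptions based on the option named above, so confirm the exact syntax in the jobs reference:

CREATE SYNC JOB load_changes_only_from_mongodb
    COMMENT = 'Ingest new CDC events only, skipping the initial snapshot'
    SKIP_SNAPSHOTS = TRUE -- assumed option syntax; see the jobs reference
AS COPY FROM MONGODB my_mongodb_connection
    COLLECTION_INCLUDE_LIST = ('sales.orders', 'sales.products', 'sales.customers')
INTO default_glue_catalog.upsolver_samples.sales_raw_data;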

You can check the status of the snapshotting process by clicking on Jobs from the main menu on the left-hand side of the Upsolver UI. Then, click on the job you created, e.g. load_raw_data_from_mongodb, and the job page displays each collection and status, e.g. Pending, Snapshotting, or Streaming.

After the snapshot process has been completed and all collections are streaming, you can continue to use the job page to monitor and troubleshoot your job.


Step 5

View the CDC data in the staging table

During the snapshotting process, Upsolver reads the column names and types from the CDC collections in the source database and creates corresponding columns in the staging table. Appended to your CDC columns are system information columns, including the source database and collection names. Additional columns include the timestamp of when the change was committed on the source database and an $is_delete column.

Prior to creating a transformation job to load data into the target, it is good practice to check the data in the staging table. Depending on the volume of data you are ingesting, it can take some time for the data to appear.

Here's the code:

SELECT * 
FROM default_glue_catalog.upsolver_samples.sales_raw_data
LIMIT 50; -- return 50 rows to view a small sample of the ingested data 

Confirm your data is as expected, before moving on to the next steps of creating a transformation job to load the data into your target.
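If you want to exclude delete events while validating the staged data, you can filter on the $is_delete system column mentioned above. This sketch assumes $is_delete is a Boolean flag:

SELECT *
FROM default_glue_catalog.upsolver_samples.sales_raw_data
WHERE $is_delete = FALSE -- keep only inserts and updates; assumes a Boolean flag
LIMIT 50;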

Step 6

Connect to Snowflake

Next, we want to create a connection to the target database, in this case, Snowflake. Create a persistent connection that is shared with other users in your organization as follows.

Here's the code:

CREATE SNOWFLAKE CONNECTION my_snowflake_connection
    CONNECTION_STRING = 'jdbc:snowflake://snowflakedemo.us-east-1.snowflakecomputing.com?db=DEMO_DB&warehouse=DEMO_WH&role=ADMIN'
    USER_NAME = '<user_name>'
    PASSWORD = '<password>';

Step 7

Create a transformation job

Now that you have a connection to Snowflake, you can load your data using a transformation job. If you haven't already done so, create a target table in Snowflake.

Here's the code to create a table in Snowflake:

CREATE OR REPLACE TABLE DEMO_DB.SALES.WEB_ORDERS(
	ORDER_ID VARCHAR(16777216),
	CUSTOMER_ID VARCHAR(16777216),
	CUSTOMER_NAME VARCHAR(16777216),
	NET_TOTAL FLOAT,
	ORDER_DATE DATE NOT NULL
);

You can adapt this script to create additional tables for products and customers, or extend the columns in the table definition to load all data and transform it in Snowflake.
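For example, a products table might look like the following sketch; the column names and types here are hypothetical and should be adjusted to match your source collection:

CREATE OR REPLACE TABLE DEMO_DB.SALES.PRODUCTS(
	PRODUCT_ID VARCHAR(16777216),
	PRODUCT_NAME VARCHAR(16777216),
	UNIT_PRICE FLOAT,
	CREATED_DATE DATE
);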

Next, create a transformation job to replicate your CDC data to the target table.

Here's the code:

CREATE SYNC JOB insert_web_orders_into_snowflake
    START_FROM = BEGINNING
    RUN_INTERVAL = 5 MINUTE
AS INSERT INTO SNOWFLAKE my_snowflake_connection.SALES.WEB_ORDERS 
  MAP_COLUMNS_BY_NAME
    SELECT 
        custid AS CUSTOMER_ID,
        orderid AS ORDER_ID,  
        first_name || ' ' || last_name AS CUSTOMER_NAME,
        nettotal AS NET_TOTAL,
        order_date AS ORDER_DATE           
    FROM default_glue_catalog.upsolver_samples.sales_raw_data 
    WHERE $event_time BETWEEN run_start_time() AND run_end_time();

Let's understand what this job does.

This code creates a job named insert_web_orders_into_snowflake and includes a couple of job options: START_FROM instructs the job to replicate all historical data by specifying the BEGINNING parameter, while RUN_INTERVAL tells Upsolver to execute this job every five minutes.

The job inserts the data into the WEB_ORDERS table in the SALES schema in Snowflake. We don't need to specify the database (DEMO_DB) here because this is included in the connection string.

The MAP_COLUMNS_BY_NAME option maps each column in the SELECT statement to the column with the same name in the target table. This means the job does not map columns by position: if you compare the order of the columns in the script that creates the table with the order of the columns in the SELECT statement of the job, you'll notice that CUSTOMER_ID and ORDER_ID are in different positions.

The SELECT statement specifies which columns will be loaded into the target, and the alias names enable column mapping by name. A string function has been used to concatenate the customer's first and last names into the CUSTOMER_NAME column.

In the WHERE clause, all rows with an $event_time between the start and end of the job's run interval are included in the load. The $event_time system column is populated with a timestamp when the data lands in the staging table.
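As a variation, if you only want to process data that arrives after the job is created, you can change the START_FROM value. The sketch below assumes START_FROM = NOW is a supported option value, so confirm it against the transformation jobs reference:

CREATE SYNC JOB insert_new_web_orders_into_snowflake
    START_FROM = NOW -- process only data ingested after the job is created (assumed option value)
    RUN_INTERVAL = 5 MINUTE
AS INSERT INTO SNOWFLAKE my_snowflake_connection.SALES.WEB_ORDERS
  MAP_COLUMNS_BY_NAME
    SELECT
        custid AS CUSTOMER_ID,
        orderid AS ORDER_ID,
        first_name || ' ' || last_name AS CUSTOMER_NAME,
        nettotal AS NET_TOTAL,
        order_date AS ORDER_DATE
    FROM default_glue_catalog.upsolver_samples.sales_raw_data
    WHERE $event_time BETWEEN run_start_time() AND run_end_time();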


Conclusion

In this guide you learned how to connect to your MongoDB database and ingest your change data capture collection into a staging table in the data lake. Then, you checked the status of your collections during the snapshotting process and saw how to view the ingested data in the staging table. Furthermore, you discovered how to write a transformation job to copy the data from your staging table to your target table in Snowflake.

Try it yourself

To ingest your CDC data from MongoDB:

  1. Create a connection to your CDC-enabled MongoDB database

  2. Create a staging table to store your data prior to loading it to the target

  3. Ingest your change data capture data to the data lake

  4. View the job status to check the snapshotting process

  5. View the CDC data in the staging table

  6. Connect to your Snowflake destination

  7. Write a transformation job to replicate the data to your target


Learn More

To learn more about table options and data retention, please see CREATE TABLE.

Discover how to monitor the performance of your jobs and troubleshoot problems using the Job Status reference.

Upsolver includes a wide range of functions and operators that you can use to create advanced use cases with your jobs and customize your data to suit your requirements.

Please see the SQL command reference for MongoDB for the full list of connection options and examples.
