CDC data sources (Debezium)

This article provides an introduction to how Upsolver works with CDC (Change Data Capture) data sources.

Last updated 3 years ago

What is Debezium?

Debezium is an open-source distributed platform for change data capture (CDC). Once it is started and pointed at your databases, your applications can respond to every insert, update, and delete that other applications commit to those databases. Upsolver currently uses Debezium v1.4.

CDC Data Sources

Upsolver supports ingesting CDC data from relational databases such as MySQL, MariaDB, and PostgreSQL. Upsolver provides CDC capabilities by running a Debezium Engine under the hood to connect to database journals. The connectors automatically detect and ingest any change.

Event Format

Upsolver reads Debezium change events with the following fields:

  • before - The state of the row before the change applied in the current event. This can be null if the row is new.

  • after - The state of the row after the change applied in the current event.

  • source - Information about the change event, such as the binlog file it came from and its sequence number or position within that file. The source table and database also appear here.

  • op - The change type. The options are:

    • r - Read events (when loading the initial data)

    • c - Create

    • u - Update

    • d - Delete

Example 1:

{
    "ts_ms": 1617525879250,
    "op": "c",
    "after": {
        "id": "188283-21202",
        "cost": 4.2,
        "item_id": 10
    },
    "source": {
        "name": "debezium",
        "db": "prod",
        "row": 0,
        "server_id": 0,
        "snapshot": "true",
        "table": "sales",
        "version": "1.4.2.Final",
        "ts_ms": 0,
        "file": "mysql-bin-changelog.008019",
        "pos": 156,
        "connector": "mysql"
    }
}

This example event represents a new row being added to the table, as indicated by the op value c. Here, a new sale was added to the sales table in the prod database, as shown in the source information.
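As a rough illustration of reading these fields, the Python sketch below parses an event shaped like Example 1 and summarizes it. The names `describe_event` and `OP_NAMES` are hypothetical helpers introduced for this sketch; they are not part of Upsolver or Debezium, and the sample event is abbreviated to the fields the sketch uses.

```python
import json

# Hypothetical mapping from the "op" codes listed above to readable names.
OP_NAMES = {"r": "read", "c": "create", "u": "update", "d": "delete"}


def describe_event(raw: str) -> str:
    """Summarize a Debezium-style change event given as a JSON string."""
    event = json.loads(raw)
    op = OP_NAMES.get(event["op"], "unknown")
    source = event.get("source", {})
    table = f'{source.get("db")}.{source.get("table")}'
    # Use "after" when present; fall back to "before" (e.g. for deletes).
    row = event.get("after") or event.get("before") or {}
    return f"{op} on {table}: {row}"


# Abbreviated copy of Example 1 (assumption: only these fields are needed).
example_1 = """
{
    "ts_ms": 1617525879250,
    "op": "c",
    "after": {"id": "188283-21202", "cost": 4.2, "item_id": 10},
    "source": {"db": "prod", "table": "sales", "connector": "mysql"}
}
"""

print(describe_event(example_1))
```

The fallback from after to before is a convenience so that the same helper can summarize delete events, where only the prior row state is available.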

Example 2:

{
    "ts_ms": 1617525879252,
    "op": "u",
    "before": {
        "id": "188283-21202",
        "cost": 4.2,
        "item_id": 10
    },
    "after": {
        "id": "188283-21202",
        "cost": 5,
        "item_id": 10
    },
    "source": {
        "name": "debezium",
        "db": "prod",
        "row": 0,
        "server_id": 0,
        "snapshot": "true",
        "table": "sales",
        "version": "1.4.2.Final",
        "ts_ms": 0,
        "file": "mysql-bin-changelog.008019",
        "pos": 157,
        "connector": "mysql"
    }
}

This is an update event for the row created in Example 1. The old values appear in before and the updated values in after.
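To make the relationship between the two examples concrete, here is a minimal Python sketch that replays change events against an in-memory table. It is an illustration only, not a description of how Upsolver applies changes internally. The function `apply_event` is hypothetical, and the sketch assumes the primary key is the `id` column and that updates never change that key.

```python
from typing import Any, Dict

Row = Dict[str, Any]


def apply_event(table: Dict[str, Row], event: Dict[str, Any], key: str = "id") -> None:
    """Apply one change event to an in-memory table keyed by `key`."""
    op = event["op"]
    if op in ("r", "c", "u"):
        # Read, create, and update all leave the row in its "after" state.
        row = event["after"]
        table[row[key]] = row
    elif op == "d":
        # A delete identifies the removed row through its "before" state.
        row = event["before"]
        table.pop(row[key], None)
    else:
        raise ValueError(f"unexpected op: {op!r}")


# Abbreviated versions of Example 1 and Example 2 (source metadata omitted).
sales: Dict[str, Row] = {}
create = {"op": "c", "after": {"id": "188283-21202", "cost": 4.2, "item_id": 10}}
update = {
    "op": "u",
    "before": {"id": "188283-21202", "cost": 4.2, "item_id": 10},
    "after": {"id": "188283-21202", "cost": 5, "item_id": 10},
}

apply_event(sales, create)
apply_event(sales, update)
print(sales["188283-21202"]["cost"])  # prints 5
```

Treating r, c, and u identically as upserts keeps the replay idempotent: applying the same event twice leaves the table unchanged.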

Supported Databases

Currently the following databases and versions are supported:

| Database   | Version        | AWS RDS Supported? |
|------------|----------------|--------------------|
| MySQL      | 5.6+           | Yes                |
| PostgreSQL | 10, 11, and 12 | Yes                |

Some databases require specific journal configurations. See the documentation page on creating a CDC data source for your database for details.

MySQL insert example

{
  "time": "2021-11-18 10:33:45",
  "data": {
    "operation": "insert",
    "database_name": "prod",
    "table_name": "sales",
    "full_table_name": "prod.sales",
    "primary_key": "188283-21202",
    "row": {
      "id": "188283-21202",
      "cost": 5,
      "item_id": 10
    },
    "metadata": {
      "binlog_file_name": "mysql-bin-changelog.030565",
      "binlog_file_position": 749,
      "binlog_row": 0,
      "from_snapshot": false,
      "binlog_timestamp": 1637224425000,
      "is_delete": false
    }
  }
}

MySQL update example

{
  "time": "2021-11-18 10:33:45",
  "data": {
    "operation": "update",
    "database_name": "prod",
    "table_name": "sales",
    "full_table_name": "prod.sales",
    "primary_key": "188283-21202",
    "row": {
      "id": "188283-21202",
      "cost": 5,
      "item_id": 10
    },
    "old_row": {
      "id": "188283-21202",
      "cost": 3,
      "item_id": 10
    },
    "metadata": {
      "binlog_file_name": "mysql-bin-changelog.030565",
      "binlog_file_position": 749,
      "binlog_row": 0,
      "from_snapshot": false,
      "binlog_timestamp": 1637224425000,
      "is_delete": false
    }
  }
}

Postgres insert example

{
  "time": "2021-11-18 09:38:00",
  "data": {
    "operation": "insert",
    "database_name": "postgres",
    "schema_name": "prod",
    "table_name": "sales",
    "full_table_name": "postgres.prod.sales",
    "primary_key": "188283-21202",
    "row": {
      "id": "188283-21202",
      "cost": 5,
      "item_id": 10
    },
    "metadata": {
      "lsn": 2032660385960,
      "from_snapshot": false,
      "binlog_timestamp": 1637221080616,
      "is_delete": false,
      "is_heartbeat": false
    }
  }
}

Postgres update example

{
  "time": "2021-11-18 09:38:00",
  "data": {
    "operation": "update",
    "database_name": "postgres",
    "schema_name": "prod",
    "table_name": "sales",
    "full_table_name": "postgres.prod.sales",
    "primary_key": "188283-21202",
    "row": {
      "id": "188283-21202",
      "cost": 5,
      "item_id": 10
    },
    "old_row": {
      "id": "188283-21202",
      "cost": 3,
      "item_id": 10
    },
    "metadata": {
      "lsn": 2032660385960,
      "from_snapshot": false,
      "binlog_timestamp": 1637221080616,
      "is_delete": false,
      "is_heartbeat": false
    }
  }
}
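The update records above carry both the new values (row) and the previous values (old_row). As a small sketch of how a consumer might use that pair, the Python function below lists the columns whose values changed. The name `changed_columns` is hypothetical, and the sample record is an abbreviated copy of the Postgres update example with the metadata omitted.

```python
from typing import Any, Dict, Tuple


def changed_columns(record: Dict[str, Any]) -> Dict[str, Tuple[Any, Any]]:
    """Return {column: (old, new)} for columns that differ in an update record."""
    data = record["data"]
    new_row = data.get("row", {})
    # Insert records have no "old_row", so nothing is reported for them.
    old_row = data.get("old_row", {})
    return {
        col: (old_row[col], new_val)
        for col, new_val in new_row.items()
        if col in old_row and old_row[col] != new_val
    }


# Abbreviated copy of the Postgres update example (metadata omitted).
postgres_update = {
    "time": "2021-11-18 09:38:00",
    "data": {
        "operation": "update",
        "full_table_name": "postgres.prod.sales",
        "primary_key": "188283-21202",
        "row": {"id": "188283-21202", "cost": 5, "item_id": 10},
        "old_row": {"id": "188283-21202", "cost": 3, "item_id": 10},
    },
}

print(changed_columns(postgres_update))  # prints {'cost': (3, 5)}
```

Because the MySQL and Postgres records share the row and old_row fields, the same helper works for both; only the metadata differs between them.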

Supported Data Outputs

Upsolver currently supports the following data outputs for database replication:

  • Ahana

  • Amazon Athena

  • Dremio

  • Hive Metastore

  • PrestoDB

  • Qubole

  • Redshift Spectrum

  • Starburst

  • Upsolver Query
