CDC data sources (Debezium)

This article provides an introduction to how Upsolver works with CDC (Change Data Capture) data sources.

What is Debezium?

Debezium is an open source distributed platform for change data capture. To use it, start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases. Upsolver currently uses Debezium v1.4.

CDC Data Sources

Upsolver supports ingesting CDC data from relational databases such as MySQL, MariaDB, and PostgreSQL. Upsolver provides CDC capabilities by running a Debezium Engine under the hood that connects to the database journals. The connectors automatically detect and ingest every change.

Event Format

Upsolver reads Debezium change events with the following fields:

  • before - The state of the row before the change applied in the current event. This is null if the row is new.

  • after - The state of the row after the change applied in the current event.

  • source - Information about the origin of the change event, such as the binlog file it came from and its sequence number or position within that file. The source table and database are also recorded here.

  • op - The change type. The options are:

    • r - Read events (when loading the initial data)

    • c - Create

    • u - Update

    • d - Delete
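The fields above can be read with a few lines of code. The following is a minimal sketch, not Upsolver's implementation: the helper names `current_row` and `describe` are hypothetical, and it assumes that delete events carry the removed row in before while all other events carry the resulting row in after.

```python
from typing import Any, Dict, Optional

# Op codes as listed above; the descriptions are paraphrased.
OP_NAMES = {
    "r": "read (initial snapshot)",
    "c": "create",
    "u": "update",
    "d": "delete",
}


def current_row(event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return the row image that describes the event (hypothetical helper).

    Assumption: delete events carry the removed row in "before";
    every other op carries the resulting row in "after".
    """
    op = event["op"]
    if op not in OP_NAMES:
        raise ValueError(f"unknown op code: {op!r}")
    return event.get("before") if op == "d" else event.get("after")


def describe(event: Dict[str, Any]) -> str:
    """Summarize an event as '<operation> on <db>.<table>'."""
    op = event["op"]
    if op not in OP_NAMES:
        raise ValueError(f"unknown op code: {op!r}")
    source = event.get("source", {})
    return f"{OP_NAMES[op]} on {source.get('db')}.{source.get('table')}"


# A trimmed-down create event, keeping only the fields used here.
event = {
    "op": "c",
    "after": {"id": "188283-21202", "cost": 4.2, "item_id": 10},
    "source": {"db": "prod", "table": "sales"},
}
print(describe(event))  # create on prod.sales
```

Rejecting unknown op codes explicitly keeps a consumer from silently mishandling an event type it was not written for.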

Example 1:

{
    "ts_ms": 1617525879250,
    "op": "c",
    "after": {
        "id": "188283-21202",
        "cost": 4.2,
        "item_id": 10
    },
    "source": {
        "name": "debezium",
        "db": "prod",
        "row": 0,
        "server_id": 0,
        "snapshot": "true",
        "table": "sales",
        "version": "1.4.2.Final",
        "ts_ms": 0,
        "file": "mysql-bin-changelog.008019",
        "pos": 156,
        "connector": "mysql"
    }
}

This example event represents a new row being added to the table, as indicated by the op value c. In this case a new sale was added to the sales table in the prod database, which you can see in the source information.

Example 2:

{
    "ts_ms": 1617525879252,
    "op": "u",
    "before": {
        "id": "188283-21202",
        "cost": 4.2,
        "item_id": 10
    },
    "after": {
        "id": "188283-21202",
        "cost": 5,
        "item_id": 10
    },
    "source": {
        "name": "debezium",
        "db": "prod",
        "row": 0,
        "server_id": 0,
        "snapshot": "true",
        "table": "sales",
        "version": "1.4.2.Final",
        "ts_ms": 0,
        "file": "mysql-bin-changelog.008019",
        "pos": 157,
        "connector": "mysql"
    }
}

In this case we received an update event for the row created in Example 1. The old values appear in before and the new values in after.
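To see how a consumer might use such events, here is a minimal sketch that replays them into an in-memory copy of the table. It is illustrative only and does not describe how Upsolver materializes data: the function name `apply_event` is hypothetical, the `id` column is assumed to be the primary key, and the events are trimmed to the fields the sketch needs.

```python
from typing import Any, Dict

Row = Dict[str, Any]


def apply_event(table: Dict[Any, Row], event: Dict[str, Any], key: str = "id") -> None:
    """Apply one change event to `table`, a dict of rows keyed by primary key.

    Assumption: `key` names the primary-key column ("id" in the examples).
    """
    op = event["op"]
    if op in ("r", "c", "u"):
        # Snapshot reads, creates, and updates all upsert the "after" image.
        row = event["after"]
        table[row[key]] = dict(row)
    elif op == "d":
        # Deletes identify the row through the "before" image.
        table.pop(event["before"][key], None)
    else:
        raise ValueError(f"unknown op code: {op!r}")


# Trimmed versions of Example 1 and Example 2.
create_event = {
    "op": "c",
    "after": {"id": "188283-21202", "cost": 4.2, "item_id": 10},
}
update_event = {
    "op": "u",
    "before": {"id": "188283-21202", "cost": 4.2, "item_id": 10},
    "after": {"id": "188283-21202", "cost": 5, "item_id": 10},
}

sales: Dict[Any, Row] = {}
for ev in (create_event, update_event):
    apply_event(sales, ev)

print(sales["188283-21202"]["cost"])  # 5
```

Treating r, c, and u as the same upsert makes the replay idempotent, so processing an event twice leaves the table unchanged.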

Supported Databases

Currently the following databases and versions are supported:

Database      Version           AWS RDS Supported?
MySQL         5.6+              Yes
PostgreSQL    10, 11, and 12    Yes

Some databases require specific journal configurations before they can be used. For details, see the documentation page on creating a CDC data source for your database.

MySQL insert example

{
  "time": "2021-11-18 10:33:45",
  "data": {
    "operation": "insert",
    "database_name": "prod",
    "table_name": "sales",
    "full_table_name": "prod.sales",
    "primary_key": "188283-21202",
    "row": {
      "id": "188283-21202",
      "cost": 5,
      "item_id": 10
    },
    "metadata": {
      "binlog_file_name": "mysql-bin-changelog.030565",
      "binlog_file_position": 749,
      "binlog_row": 0,
      "from_snapshot": false,
      "binlog_timestamp": 1637224425000,
      "is_delete": false
    }
  }
}

MySQL update example

{
  "time": "2021-11-18 10:33:45",
  "data": {
    "operation": "update",
    "database_name": "prod",
    "table_name": "sales",
    "full_table_name": "prod.sales",
    "primary_key": "188283-21202",
    "row": {
      "id": "188283-21202",
      "cost": 5,
      "item_id": 10
    },
    "old_row": {
      "id": "188283-21202",
      "cost": 3,
      "item_id": 10
    },
    "metadata": {
      "binlog_file_name": "mysql-bin-changelog.030565",
      "binlog_file_position": 749,
      "binlog_row": 0,
      "from_snapshot": false,
      "binlog_timestamp": 1637224425000,
      "is_delete": false
    }
  }
}

Postgres insert example

{
  "time": "2021-11-18 09:38:00",
  "data": {
    "operation": "insert",
    "database_name": "postgres",
    "schema_name": "prod",
    "table_name": "sales",
    "full_table_name": "postgres.prod.sales",
    "primary_key": "188283-21202",
    "row": {
      "id": "188283-21202",
      "cost": 5,
      "item_id": 10
    },
    "metadata": {
      "lsn": 2032660385960,
      "from_snapshot": false,
      "binlog_timestamp": 1637221080616,
      "is_delete": false,
      "is_heartbeat": false
    }
  }
}

Postgres update example

{
  "time": "2021-11-18 09:38:00",
  "data": {
    "operation": "update",
    "database_name": "postgres",
    "schema_name": "prod",
    "table_name": "sales",
    "full_table_name": "postgres.prod.sales",
    "primary_key": "188283-21202",
    "row": {
      "id": "188283-21202",
      "cost": 5,
      "item_id": 10
    },
    "old_row": {
      "id": "188283-21202",
      "cost": 3,
      "item_id": 10
    },
    "metadata": {
      "lsn": 2032660385960,
      "from_snapshot": false,
      "binlog_timestamp": 1637221080616,
      "is_delete": false,
      "is_heartbeat": false
    }
  }
}
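Because the update examples above carry both row and old_row, the changed columns can be found by comparing the two. The following is a minimal sketch under that assumption; the function name `changed_columns` is hypothetical and not part of any Upsolver API. For events without old_row, such as inserts, every column is reported with an old value of None.

```python
from typing import Any, Dict, Tuple


def changed_columns(event: Dict[str, Any]) -> Dict[str, Tuple[Any, Any]]:
    """Map each changed column to an (old_value, new_value) pair."""
    data = event["data"]
    new = data.get("row") or {}
    old = data.get("old_row") or {}
    return {
        col: (old.get(col), new.get(col))
        for col in sorted(set(old) | set(new))
        if old.get(col) != new.get(col)
    }


# The update example above, trimmed to the fields used here.
update_event = {
    "time": "2021-11-18 09:38:00",
    "data": {
        "operation": "update",
        "full_table_name": "postgres.prod.sales",
        "primary_key": "188283-21202",
        "row": {"id": "188283-21202", "cost": 5, "item_id": 10},
        "old_row": {"id": "188283-21202", "cost": 3, "item_id": 10},
    },
}

print(changed_columns(update_event))  # {'cost': (3, 5)}
```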

Supported Data Outputs

Upsolver currently supports the following data outputs for database replication:

  • Ahana

  • Amazon Athena

  • Dremio

  • Hive Metastore

  • PrestoDB

  • Qubole

  • Redshift Spectrum

  • Starburst

  • Upsolver Query
