
MySQL

Last updated 11 months ago

Syntax

CREATE JOB <job_name>
    [{ job_options }]
AS COPY FROM MYSQL <connection_identifier>
    [{ source_options }]
INTO <table_identifier>
[ WITH EXPECTATION <exp_name> EXPECT <sql_predicate> ON VIOLATION { DROP | WARN } ];


Job options

The following job properties configure the behavior of the ingestion job.

[ COLUMN_TRANSFORMATIONS = (<column> = <expression>, ...) ]
[ COMMENT = '<comment>' ]
[ COMPUTE_CLUSTER = <cluster_identifier> ]
[ DDL_FILTERS = ('<ddl_expression>', ...) ]
[ END_AT = { NOW | <timestamp> } ]
[ EXCLUDE_COLUMNS = ( <col>, ...) ]
[ PARSE_JSON_COLUMNS = { TRUE | FALSE } ]
[ SKIP_SNAPSHOTS = { TRUE | FALSE } ]
[ SNAPSHOT_PARALLELISM = <integer> ]


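MySQL job options: DDL_FILTERS, PARSE_JSON_COLUMNS, SKIP_SNAPSHOTS, SNAPSHOT_PARALLELISM

General job options: COLUMN_TRANSFORMATIONS, COMMENT, COMPUTE_CLUSTER, END_AT, EXCLUDE_COLUMNS

See also: WITH EXPECTATION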

DDL_FILTERS

Type: array[string]

Default: ''

(Optional) Comma-separated list of DDL expressions that the job will ignore when there are errors from the Debezium engine.

PARSE_JSON_COLUMNS

Type: Boolean

Default: false

(Optional) When set to TRUE, Upsolver parses JSON columns into a struct that matches the structure of the JSON value.
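To picture what PARSE_JSON_COLUMNS changes, the Python sketch below parses a JSON string column into a nested structure. It assumes, for illustration only, that a JSON column otherwise arrives as a plain string. The row, the column name attributes, and the helper parse_json_columns are hypothetical and do not reflect Upsolver's internal code.

```python
import json

# Hypothetical row as it might arrive from MySQL, with the JSON column
# "attributes" delivered as a plain string (assumption for illustration).
raw_row = {"id": 7, "attributes": '{"color": "red", "size": {"w": 10, "h": 20}}'}


def parse_json_columns(row, json_columns):
    """Return a copy of row with the named JSON string columns parsed.

    Nested JSON objects become nested dicts, mirroring the "struct that
    matches the JSON value" idea; other columns are left untouched.
    """
    parsed = dict(row)
    for col in json_columns:
        if isinstance(parsed.get(col), str):
            parsed[col] = json.loads(parsed[col])
    return parsed


row = parse_json_columns(raw_row, ["attributes"])
print(row["attributes"]["size"]["w"])
```

Once parsed, nested fields such as size.w can be addressed directly instead of being extracted from a string.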

SKIP_SNAPSHOTS

Type: Boolean

Default: false

(Optional) By default, snapshots are enabled for new tables: Upsolver takes a full snapshot of the table(s) and ingests it into the staging table before it starts listening for change events. When set to TRUE, Upsolver does not take an initial snapshot and processes only the change events that occur after the ingestion job is created.

In most cases, when you connect to your source tables, you want to take a full snapshot and ingest it as the baseline of your table. This creates a full copy of the source table in your data lake before you begin streaming the most recent change events. If you skip the snapshot, the target table will not contain historical data, only the rows added or changed after the job starts.

Skipping a snapshot is useful when your primary database instance crashes or becomes unreachable and fails over to the secondary. You then need to re-establish the CDC connection, but you would not want to take a full snapshot because your table already holds all of the history. Instead, you want processing to resume from the point where the connection to the primary database went down.
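The following Python sketch is a simplified model of which rows reach the staging table with and without SKIP_SNAPSHOTS. It assumes that the snapshot reflects the table as of the job's creation time and that each change event carries a commit timestamp. ChangeEvent and rows_ingested are hypothetical names, not part of Upsolver.

```python
from dataclasses import dataclass


@dataclass
class ChangeEvent:
    ts: int   # commit time of the change, in arbitrary units
    row: str  # the row affected by the change


def rows_ingested(snapshot_rows, events, job_created_at, skip_snapshots=False):
    """Simplified model of what lands in the staging table.

    The snapshot already reflects every change committed before the job was
    created, so only later events are streamed. With skip_snapshots=True the
    snapshot rows are never copied, so historical rows are missing.
    """
    ingested = [] if skip_snapshots else list(snapshot_rows)
    ingested += [e.row for e in events if e.ts >= job_created_at]
    return ingested


# Table state at creation time (t=10); order-2 was inserted at t=5, so it is
# already part of the snapshot. order-3 is inserted after the job starts.
snapshot = ["order-1", "order-2"]
events = [ChangeEvent(5, "order-2"), ChangeEvent(12, "order-3")]

print(rows_ingested(snapshot, events, job_created_at=10))
print(rows_ingested(snapshot, events, job_created_at=10, skip_snapshots=True))
```

With the default, order-1 and order-2 come from the snapshot and order-3 from the change stream. With SKIP_SNAPSHOTS = TRUE, only order-3 arrives, which is the missing-history trade-off described above.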

SNAPSHOT_PARALLELISM

Type: int

Default: 1

(Optional) Configures how many snapshots run concurrently. The more snapshots run concurrently, the sooner the tables begin streaming; however, running more snapshots in parallel increases the load on the source database.
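The effect of SNAPSHOT_PARALLELISM resembles a bounded worker pool. The Python sketch below caps concurrent snapshot tasks with ThreadPoolExecutor(max_workers=...) and records the peak number running at once. It is an illustration that assumes each table snapshot is an independent task; run_snapshots is a hypothetical helper, and time.sleep stands in for reading a table from MySQL.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_snapshots(tables, parallelism):
    """Snapshot each table, with at most `parallelism` running at once.

    Returns the snapshotted tables (in input order) and the peak number of
    snapshots that were observed running concurrently.
    """
    lock = threading.Lock()
    running = 0
    peak = 0

    def snapshot(table):
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.05)  # stand-in for reading the whole table from MySQL
        with lock:
            running -= 1
        return table

    # max_workers bounds concurrency, like SNAPSHOT_PARALLELISM in this model.
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        done = list(pool.map(snapshot, tables))
    return done, peak


done, peak = run_snapshots(["db.orders", "db.customers", "db.items"], parallelism=2)
print(done, peak)
```

In this toy model, raising parallelism shortens the total run time, but every extra worker is another concurrent reader against the source database.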

Source options

The following data source properties configure how to replicate data from MySQL.

[ TABLE_INCLUDE_LIST = ('regexFilter1', 'regexFilter2') ]
[ COLUMN_EXCLUDE_LIST = ('regexFilter1', 'regexFilter2') ]


TABLE_INCLUDE_LIST

Type: text

Default: ''

By default, the connector captures changes in every non-system table in all databases. To match a table name, Upsolver applies the regular expression that you specify as an anchored regular expression: the expression must match the entire name string of the table, and it does not match substrings that might be present in a table name.

Each RegEx pattern matches against the full string databaseName.tableName, for example:

RegEx pattern                    Result
db_name.*                        Selects all tables in the db_name database
db_name.users, db_name.items     Selects the users and items tables in the db_name database
db1.items_.*                     Selects all tables in db1 whose names start with items_
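Anchored matching behaves like Python's re.fullmatch, which succeeds only when the pattern matches the whole string. The sketch below applies the patterns from the table to a hypothetical list of databaseName.tableName strings. It assumes the patterns are evaluated as ordinary regular expressions, as the Debezium property they map to does; Python's re module stands in for the Java regex engine Debezium runs on, and the two agree on these simple patterns. included_tables is an illustrative helper, not an Upsolver API.

```python
import re

# Hypothetical source tables, written as databaseName.tableName.
TABLES = [
    "db_name.users",
    "db_name.items",
    "db_name.orders",
    "db_name2.orders",
    "db1.items_2023",
    "db1.items",
]


def included_tables(patterns, tables):
    """Return the tables whose full name matches at least one pattern.

    re.fullmatch anchors the pattern at both ends, so a pattern must match
    the entire databaseName.tableName string, not just a substring.
    """
    compiled = [re.compile(p) for p in patterns]
    return [t for t in tables if any(c.fullmatch(t) for c in compiled)]


print(included_tables(["db_name.*"], TABLES))
print(included_tables(["db_name.users", "db_name.items"], TABLES))
print(included_tables(["db1.items_.*"], TABLES))
# Escaping the dot restricts the match to the db_name database itself.
print(included_tables([r"db_name\..*"], TABLES))
```

Because an unescaped . matches any character, db_name.* also selects db_name2.orders in this sketch. Escaping the dot, as in db_name\..*, limits the match to tables in the db_name database.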

COLUMN_EXCLUDE_LIST

Type: text

Default: ''

By default, the connector matches all columns of the tables listed in TABLE_INCLUDE_LIST. To match the name of a column, Upsolver applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the column; it does not match substrings that might be present in a column name.

Each RegEx pattern matches against the full string databaseName.tableName.columnName, for example:

RegEx pattern            Result
db.users.address_.*      Selects all columns whose names start with address_ in the users table of the db database
db.*.(.*_pii)            Selects all columns whose names end with _pii across all tables in the db database
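The same anchored semantics apply to column patterns. This Python sketch uses hypothetical column names and drops every databaseName.tableName.columnName string that fully matches one of the exclude patterns from the table above. As before, Python's re.fullmatch stands in for Debezium's Java regex matching, and kept_columns is an illustrative helper, not an Upsolver API.

```python
import re

# Hypothetical source columns, written as databaseName.tableName.columnName.
COLUMNS = [
    "db.users.id",
    "db.users.address_line1",
    "db.users.address_city",
    "db.users.email_pii",
    "db.orders.card_pii",
    "db.orders.total",
]


def kept_columns(exclude_patterns, columns):
    """Return the columns that do NOT fully match any exclude pattern."""
    compiled = [re.compile(p) for p in exclude_patterns]
    return [c for c in columns if not any(p.fullmatch(c) for p in compiled)]


# Drops the two address_ columns of db.users.
print(kept_columns(["db.users.address_.*"], COLUMNS))
# Drops every column ending in _pii, in any table of db.
print(kept_columns(["db.*.(.*_pii)"], COLUMNS))
```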

Examples

Ingest multiple tables

The following example creates a synchronized job that replicates data from the samples.orders and samples.customers tables into the target table in the data lake. The TABLE_INCLUDE_LIST source option specifies which tables to ingest.

CREATE SYNC JOB replicate_mysql_tables
AS COPY FROM MYSQL upsolver_mysql_samples 
      TABLE_INCLUDE_LIST = ('samples.orders', 'samples.customers')
INTO default_glue_catalog.upsolver_samples.orders_raw_data;

Disable the initial snapshot

This example uses the SKIP_SNAPSHOTS option to instruct the job not to take a snapshot. Setting this option to TRUE ensures that only events arriving after the job starts are ingested; all historical data is ignored.

CREATE SYNC JOB copy_all_table_no_history_job
   SKIP_SNAPSHOTS = TRUE
AS COPY FROM MYSQL upsolver_mysql_samples
INTO default_glue_catalog.upsolver_samples.raw_cdc_tables;

Exclude columns from ingestion

Use TABLE_INCLUDE_LIST to specify the tables you want to ingest. In the example below, the job copies data from the customers and orders tables. However, these tables store personally identifiable information (PII) that should not be held in the staging or target tables, because doing so could breach privacy regulations. The COLUMN_EXCLUDE_LIST option excludes the columns containing sensitive information from ingestion; in this case, the credit_card columns and the customer address columns are dropped. You can use the same option to remove extraneous data.

CREATE SYNC JOB copy_table_exclude_cols
AS COPY FROM MYSQL upsolver_mysql_samples
     TABLE_INCLUDE_LIST = ('db.customers', 'db.orders')
     COLUMN_EXCLUDE_LIST = ('db.*.credit_card', 'db.customers.address_.*')
INTO default_glue_catalog.upsolver_samples.raw_cdc_tables;
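To check which columns the job above would keep, the following Python sketch applies the example's two lists in order: a column is ingested only if its table matches TABLE_INCLUDE_LIST and the column itself does not match COLUMN_EXCLUDE_LIST. The source column names are hypothetical, and the sketch models only the anchored regex filtering (via re.fullmatch), not the actual replication.

```python
import re

# Patterns copied from the CREATE SYNC JOB example above.
TABLE_INCLUDE_LIST = ["db.customers", "db.orders"]
COLUMN_EXCLUDE_LIST = ["db.*.credit_card", "db.customers.address_.*"]

# Hypothetical source columns, written as databaseName.tableName.columnName.
SOURCE_COLUMNS = [
    "db.customers.id",
    "db.customers.name",
    "db.customers.credit_card",
    "db.customers.address_street",
    "db.customers.address_zip",
    "db.orders.id",
    "db.orders.credit_card",
    "db.orders.amount",
    "db.products.id",
]


def matches_any(patterns, name):
    """True if any pattern matches the entire name (anchored match)."""
    return any(re.fullmatch(p, name) for p in patterns)


def ingested_columns(columns):
    result = []
    for col in columns:
        table = col.rsplit(".", 1)[0]  # databaseName.tableName
        if not matches_any(TABLE_INCLUDE_LIST, table):
            continue  # table is not captured at all
        if matches_any(COLUMN_EXCLUDE_LIST, col):
            continue  # column is dropped from change event values
        result.append(col)
    return result


print(ingested_columns(SOURCE_COLUMNS))
```

Both credit_card columns and the two address_ columns are dropped, and db.products is never read because it is not in the include list.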

TABLE_INCLUDE_LIST: (Optional) Comma-separated list of regular expressions that match the fully-qualified identifiers of tables whose changes you want to capture. This maps to the Debezium table.include.list property.

COLUMN_EXCLUDE_LIST: (Optional) Comma-separated list of regular expressions that match the fully-qualified names of columns to exclude from change event record values. This maps to the Debezium column.exclude.list property.