Ingestion


An ingestion job uses the COPY FROM command to copy data from a given source into a table created in a metastore connection, or directly into a supported target. If you are writing into a data lake, this table serves as your staging table and can be used with transformation jobs to write data to various target locations.

Note that when writing to a staging table using an ingestion job, the table should not have any primary keys and, if partitioned, should only be partitioned by time.

Additionally, multiple ingestion jobs can write to the same staging table, which results in a UNION ALL of the data copied into the table. This means that any duplicate records that arrive are not removed from the table.

The COPY FROM command does not support a SELECT statement; however, there are multiple job options you can include to manage and transform your data before it lands in the target. Furthermore, you can use expectations to manage rows that fail to meet data quality requirements as they are ingested.


Syntax

CREATE [SYNC] JOB <job_name>
    [{ job_options }]
AS COPY FROM { 
               CONFLUENT 
               | KAFKA 
               | KINESIS 
               | MSSQL
               | MONGODB
               | MYSQL
               | POSTGRES 
               | S3
}
   <connection_identifier>
   [{ source_options }]
 INTO { <table_identifier> | SNOWFLAKE <connection_name>.<schema_name>.<table_name> }
[ WITH EXPECTATION <exp_name> EXPECT <sql_predicate> ON VIOLATION { DROP | WARN } ];

Job identifier

This is the name of your job, which should match the following format:

identifier = "([^"]|"")*"|[A-Za-z_][A-Za-z0-9_]*;
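For illustration, here is a minimal sketch using a double-quoted job name (the connection, bucket, and table names are hypothetical):

CREATE JOB "Orders Ingestion Job"
    CONTENT_TYPE = JSON
AS COPY FROM S3 my_s3_connection
    LOCATION = 's3://my-bucket/orders/'
INTO my_catalog.my_schema.orders_raw;

An unquoted name such as orders_ingestion_job must start with a letter or underscore and contain only letters, digits, and underscores; double-quoted names may contain spaces, with embedded double quotes escaped by doubling them.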

Job options

[ COLUMN_TRANSFORMATIONS = (<column> = <expression>, ...) ]
[ COMMENT = '<comment>' ]
[ COMPUTE_CLUSTER = <cluster_identifier> ]             
[ END_AT = { NOW | <timestamp> } ]  
[ EXCLUDE_COLUMNS = ( <col>, ...) ]


COLUMN_TRANSFORMATIONS

Values: ( <column> = <expression>, ...)

(Optional) Use this option to apply transformations to your data during ingestion, before it lands in your target. When ingesting into the data lake, it is recommended that you apply only essential transformations, such as protecting PII, and use a transformation job for other modifications: keeping the data in its raw state makes it easier to make amendments or corrections later. As a general rule, transform only data that must be modified before it reaches the target.

However, transformations provide the flexibility to shape your data before it lands in the target. You can use all the functions and operators supported by Upsolver to create calculated fields within your ingestion job. New columns can be added to your target, and existing column data can be transformed. You can perform actions such as converting data types, formatting string values, and concatenating columns to create a new column.

If you need to mask sensitive or personally identifiable information (PII) prior to loading into your staging tables or when performing direct ingestion into your target destination, you can use hashing functions to prevent data from being exposed downstream. Combining hash functions with the EXCLUDE_COLUMNS option enables you to control your data protection.
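As a minimal sketch of the type-conversion and concatenation use cases mentioned above (the connection, topic, table names, and source fields are hypothetical, and standard CAST syntax is assumed):

CREATE SYNC JOB ingest_with_transformations
    CONTENT_TYPE = JSON
    // Cast a string field to a number and concatenate two name fields into a new column
    COLUMN_TRANSFORMATIONS = (
      order_total = CAST(order_total_raw AS DOUBLE),
      full_name = CONCAT(customer.firstname, ' ', customer.lastname)
    )
AS COPY FROM KAFKA my_kafka TOPIC = 'orders'
INTO my_catalog.my_schema.orders_staging;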

COMMENT — editable

Type: text

(Optional) A description or comment regarding this job.

COMPUTE_CLUSTER — editable

Type: identifier

Default: The sole cluster in your environment

(Optional) The compute cluster to run this job.

This option can only be omitted when there is a single cluster in your environment; if you have more than one compute cluster, use this option to specify which cluster runs the job.

END_AT — editable

Values: { NOW | <timestamp> }

Default: Never

(Optional) Configures the time to stop ingesting data. Files after the specified time are ignored.

Timestamps should be based on UTC and in the following format: TIMESTAMP 'YYYY-MM-DD HH:MM:SS'.
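For example, a minimal sketch of a job with a fixed cut-off time (the connection, bucket, and table names are hypothetical):

CREATE SYNC JOB ingest_orders_until_cutoff
    CONTENT_TYPE = JSON
    // Stop ingesting data after the end of 2024 (UTC)
    END_AT = TIMESTAMP '2024-12-31 23:59:59'
AS COPY FROM S3 my_s3_connection
    LOCATION = 's3://my-bucket/orders/'
INTO my_catalog.my_schema.orders_raw;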

EXCLUDE_COLUMNS

Values: ( '<column>', ...)

(Optional) The EXCLUDE_COLUMNS option tells Upsolver to ignore data in the columns specified in this list; the excluded columns are not created on the target. To exclude columns, provide a single column name (as a string literal), a list of column names, or a glob pattern.

Use EXCLUDE_COLUMNS when you simply don't need certain columns, want to save storage space, or want to maintain a clean data structure. This option gives you control over the width of the target table by letting you manage how many columns are created, which is especially useful when your target system limits the number of columns it supports and continuously adding columns could cause issues.

Furthermore, columns containing sensitive information can be excluded, ensuring private data is not copied downstream to a staging table in your data lake, or directly into your target.

Example:

CREATE JOB my_job
    START_FROM = NOW
    CONTENT_TYPE = JSON
    // Copy the hyphenated source columns into new columns with valid names
    COLUMN_TRANSFORMATIONS = (
      fixed_column="fixed-column",
      fixed_column2="fixed-column2"
    )
    // Exclude the original hyphenated columns from the target
    EXCLUDE_COLUMNS = (
      'fixed-column',
      'fixed-column2'
    )
AS COPY FROM KAFKA my_kafka TOPIC = 'test'
INTO my_catalog.my_schema.my_table;
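As noted above, a glob pattern can be used instead of listing every column. The following is a hedged sketch reusing the same source; the exact pattern syntax shown is an assumption and should be confirmed against the option's description:

CREATE JOB my_job_glob
    START_FROM = NOW
    CONTENT_TYPE = JSON
    // Exclude every source column whose name starts with the fixed- prefix
    EXCLUDE_COLUMNS = ('fixed-*')
AS COPY FROM KAFKA my_kafka TOPIC = 'test'
INTO my_catalog.my_schema.my_table;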

Source-specific job options

Visit the source-specific pages for job options that apply to each source: Amazon Kinesis, Amazon S3, Apache Kafka, Confluent Kafka, Microsoft SQL Server, MongoDB, MySQL, and PostgreSQL.

Target identifier

Table

The table identifier must be provided in the following format:

INTO <catalog_name>.<schema_name>.<table_name>

Direct ingestion to target

If you are creating a job to ingest your source data directly into a target system, for example to Snowflake, use the following format:

INTO SNOWFLAKE <connection_name>.<schema_name>.<table_name>

Please note that direct ingestion to Snowflake is not supported for CDC sources: Microsoft SQL Server, MongoDB, MySQL, or PostgreSQL.


Examples

Exclude and transform columns

In this example, data is ingested directly from Kafka to Snowflake, using a transformation to mask PII data before loading it into the target table in Snowflake.

The customer email is copied into a new column named hashed_email, and the values are masked using the MD5() hash function. The original column, customer.email, is excluded.

CREATE SYNC JOB ingest_and_transform
    COMMENT = 'Load raw orders from Kafka and transform and exclude data'
    CONTENT_TYPE = JSON
    COMPUTE_CLUSTER = "Default Compute (Free)"
    // Exclude the original column
    EXCLUDE_COLUMNS = ('customer.email')
    // Mask customer email into new column 
    COLUMN_TRANSFORMATIONS = (hashed_email = MD5(customer.email))        
AS COPY FROM KAFKA upsolver_kafka_samples 
    TOPIC = 'orders' 
INTO SNOWFLAKE my_snowflake_connection.demo.orders_transformed;

Manage data quality with expectations

This example shows you how to use expectations to manage the quality of the data that is ingested by your job. The expectation exp_orderid_not_null checks that the orderid has a value: if this expectation returns FALSE, the row is dropped.

Similarly, the expectation exp_add1_not_null checks that the first line of the customer address is neither null nor an empty string. If the condition returns FALSE, the row is still loaded into the target, but a warning is triggered. The exp_taxrate expectation checks that the value in this column is 0.12; otherwise, the row is loaded into the target and a warning is triggered.

If an ingested row violates both a warning and a drop expectation, the row is dropped; however, the violation counter for each expectation is incremented in the system tables.

CREATE SYNC JOB load_orders_raw_data_from_s3_expectations
   CONTENT_TYPE = JSON
AS COPY FROM S3 upsolver_s3_samples 
   LOCATION = 's3://upsolver-samples/orders/' 
INTO default_glue_catalog.upsolver_samples.orders_raw_data
   WITH EXPECTATION exp_orderid_not_null 
     EXPECT orderid IS NOT NULL 
     ON VIOLATION DROP
   WITH EXPECTATION exp_add1_not_null 
     EXPECT customer.address.address1 IS NOT NULL AND customer.address.address1 <> '' 
     ON VIOLATION WARN
   WITH EXPECTATION exp_taxrate 
     EXPECT taxrate = 0.12 
     ON VIOLATION WARN;
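To see how often each expectation has been violated, you can query the monitoring views in the system catalog. The sketch below uses assumed view and column names; consult Expectations under System Catalog > Monitoring in this reference for the exact schema:

// View and column names here are assumptions, not confirmed by this page
SELECT *
FROM system.monitoring.expectations
WHERE job_name = 'load_orders_raw_data_from_s3_expectations';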

The job options described above can be applied to all sources; see the source-specific pages listed earlier for additional options.

Please see Expectations for detailed information on adding data quality rules to your pipelines.

See the Functions and Operators references for a full list of options that you can apply.

Please see Replication jobs for an alternative option.
