Ingestion

An ingestion job uses the COPY FROM command to copy data from a given source into a table created in a metastore connection, or directly into a supported target. When writing into a data lake, this table serves as your staging table and can be used with transformation jobs to write to various target locations.

Note that when writing to a staging table using an ingestion job, the table should not have any primary keys and, if partitioned, should only be partitioned by time.

Additionally, multiple ingestion jobs can write to the same staging table, which results in a UNION ALL of the data copied into the table. This means that any duplicate records that arrive are not removed from the table.

The COPY FROM command does not support the SELECT statement; however, there are multiple job options you can include to manage and transform your data before it lands in the target. Furthermore, you can use expectations to manage rows that fail to meet data quality requirements as they are ingested.


Syntax

CREATE [SYNC] JOB <job_name>
    [{ job_options }]
AS COPY FROM { 
               CONFLUENT 
               | KAFKA 
               | KINESIS 
               | MSSQL
               | MONGODB
               | MYSQL
               | POSTGRES 
               | S3
}
   <connection_identifier>
   [{ source_options }]
 INTO { <table_identifier> | SNOWFLAKE <connection_name>.<schema_name>.<table_name> }
[ WITH EXPECTATION <exp_name> EXPECT <sql_predicate> ON VIOLATION { DROP | WARN } ];
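
For instance, here is a minimal sketch of a job that copies raw JSON events from an Amazon S3 source into a data lake staging table (the connection, bucket location, and table names are illustrative):

CREATE SYNC JOB stage_raw_events
   CONTENT_TYPE = JSON
AS COPY FROM S3 upsolver_s3_samples
   LOCATION = 's3://upsolver-samples/orders/'
INTO default_glue_catalog.upsolver_samples.orders_raw_data;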

Job identifier

This is the name of your job, which should match the following format:

identifier = "([^"]|"")*"|[A-Za-z_][A-Za-z0-9_]*;
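
For example, an unquoted name such as my_ingestion_job must start with a letter or underscore and contain only letters, digits, and underscores, while a double-quoted name such as "Orders - Raw Ingestion" can include spaces and other characters. Both names here are illustrative.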

Job options

The following job options can be applied to all sources. See the links below for source-specific job options.

[ COLUMN_TRANSFORMATIONS = (<column> = <expression>, ...) ]
[ COMMENT = '<comment>' ]
[ COMPUTE_CLUSTER = <cluster_identifier> ]             
[ END_AT = { NOW | <timestamp> } ]  
[ EXCLUDE_COLUMNS = ( <col>, ...) ]

COLUMN_TRANSFORMATIONS

Values: ( <column> = <expression>, ...)

(Optional) If transformations must be applied prior to data landing in your target, you can use this option to perform data transformations during ingestion. When ingesting into the data lake, it is recommended that you apply only essential transformations, such as protecting PII, and use a transformation job for other modifications; keeping the data in its raw state makes it easier to make amendments or corrections later. Therefore, as a general rule, you should only transform data that must be modified before it reaches the target.

However, transformations provide the flexibility to shape your data before it lands in the target. You can use all the functions and operators supported by Upsolver to create calculated fields within your ingestion job. New columns can be added to your target, and existing column data can be transformed. You can perform actions such as converting data types, formatting string values, and concatenating columns to create a new column.

If you need to mask sensitive or personally identifiable information (PII) prior to loading into your staging tables or when performing direct ingestion into your target destination, you can use hashing functions to prevent data from being exposed downstream. Combining hash functions with the EXCLUDE_COLUMNS option enables you to control your data protection.

See the Functions and Operators references for a full list of options that you can apply.
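
As a brief sketch of this option (the connection, location, and column names are hypothetical), the following job hashes the customer email during ingestion and excludes the original column, so only the masked value lands in the staging table. A fuller version of this pattern, writing directly to Snowflake, appears in the Examples section below.

CREATE SYNC JOB mask_email_on_ingest
   CONTENT_TYPE = JSON
   COLUMN_TRANSFORMATIONS = (hashed_email = MD5(customer.email))
   EXCLUDE_COLUMNS = ('customer.email')
AS COPY FROM S3 upsolver_s3_samples
   LOCATION = 's3://upsolver-samples/orders/'
INTO default_glue_catalog.upsolver_samples.orders_staging;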

COMMENT — editable

Type: text

(Optional) A description or comment regarding this job.

COMPUTE_CLUSTER — editable

Type: identifier

Default: The sole cluster in your environment

(Optional) The compute cluster to run this job.

This option can be omitted only when there is a single cluster in your environment.

If you have more than one compute cluster, you must use this option to specify which cluster runs the job.
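
For example, to pin a job to a specific cluster (the cluster name is illustrative):

COMPUTE_CLUSTER = "Default Compute (Free)"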

END_AT — editable

Values: { NOW | <timestamp> }

Default: Never

(Optional) Configures the time at which to stop ingesting data. Files that arrive after the specified time are ignored.

Timestamps should be based on UTC and in the following format: TIMESTAMP 'YYYY-MM-DD HH:MM:SS'.
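
For example, to stop ingesting data at the end of June 2024 UTC (the timestamp is illustrative):

END_AT = TIMESTAMP '2024-06-30 23:59:59'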

EXCLUDE_COLUMNS

Values: ( '<column>', ...)

(Optional) The EXCLUDE_COLUMNS option tells Upsolver to ignore data in the columns specified in this list, and those columns are not created on the target. To exclude columns, provide a single column name (as a string literal), a list of column names, or a glob pattern.

Use EXCLUDE_COLUMNS when you simply don't need certain columns, want to save storage space, or want to maintain a clean data structure; the specified columns are ignored during ingestion. This option gives you control over the width of the target table by enabling you to manage how many columns are created. If your target system limits the number of columns it supports, continuously adding columns can cause issues.

Furthermore, columns containing sensitive information can be excluded, ensuring private data is not copied downstream to a staging table in your data lake, or directly into your target.

Example:

CREATE JOB my_job
    START_FROM = NOW
    CONTENT_TYPE = JSON
    COLUMN_TRANSFORMATIONS = (
      fixed_column="fixed-column",
      fixed_column2="fixed-column2"
    )
    EXCLUDE_COLUMNS = (
      'fixed-column',
      'fixed-column2'
    )
AS COPY FROM KAFKA my_kafka TOPIC = 'test'
INTO my_catalog.my_schema.my_table;

Source-specific job options

Visit the documentation page for each source for source-specific job options: Amazon Kinesis, Amazon S3, Apache Kafka, Confluent, Microsoft SQL Server, MongoDB, MySQL, and PostgreSQL.

Target identifier

Table

The table identifier must be provided in the following format:

INTO <catalog_name>.<schema_name>.<table_name>
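
For example, a job writing to a staging table in a data lake catalog might use the following (the catalog, schema, and table names are illustrative):

INTO default_glue_catalog.upsolver_samples.orders_raw_data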

Direct ingestion to target

If you are creating a job to ingest your source data directly into a target system, for example to Snowflake, use the following format:

INTO SNOWFLAKE <connection_name>.<schema_name>.<table_name>

Please note that direct ingestion to Snowflake is not supported for CDC sources (Microsoft SQL Server, MongoDB, MySQL, and PostgreSQL).

Please see Replication jobs for an alternative option.


Examples

Exclude and transform columns

In this example, data is ingested directly from Kafka to Snowflake, using a transformation to mask PII data before loading it into the target table in Snowflake.

The customer email is copied into a new column named hashed_email, and the values are masked using the MD5() hash function. The original column, customer.email, is excluded.

CREATE SYNC JOB ingest_and_transform
    COMMENT = 'Load raw orders from Kafka and transform and exclude data'
    CONTENT_TYPE = JSON
    COMPUTE_CLUSTER = "Default Compute (Free)"
    // Exclude the original column
    EXCLUDE_COLUMNS = ('customer.email')
    // Mask customer email into new column 
    COLUMN_TRANSFORMATIONS = (hashed_email = MD5(customer.email))        
AS COPY FROM KAFKA upsolver_kafka_samples 
    TOPIC = 'orders' 
INTO SNOWFLAKE my_snowflake_connection.demo.orders_transformed;

Manage data quality with expectations

This example shows you how to use expectations to manage the quality of the data that is ingested by your job. The expectation exp_orderid_not_null checks that the orderid has a value: if this expectation returns FALSE, the row is dropped.

Similarly, the expectation exp_add1_not_null checks that the first line of the customer address is neither null nor an empty string. If the condition returns FALSE, the row is loaded into the target, but a warning is triggered. The exp_taxrate expectation checks that the value in this column is 0.12; otherwise, the row is loaded into the target and a warning is triggered.

If a row is ingested that violates both a warning and a drop expectation, the row is dropped; however, the violation is recorded in the system tables for each expectation.

CREATE SYNC JOB load_orders_raw_data_from_s3_expectations
   CONTENT_TYPE = JSON
AS COPY FROM S3 upsolver_s3_samples 
   LOCATION = 's3://upsolver-samples/orders/' 
INTO default_glue_catalog.upsolver_samples.orders_raw_data
   WITH EXPECTATION exp_orderid_not_null 
     EXPECT orderid IS NOT NULL 
     ON VIOLATION DROP
   WITH EXPECTATION exp_add1_not_null 
     EXPECT customer.address.address1 IS NOT NULL AND customer.address.address1 <> '' 
     ON VIOLATION WARN
   WITH EXPECTATION exp_taxrate 
     EXPECT taxrate = 0.12 
     ON VIOLATION WARN;
