Ingestion jobs

This section shows you how to use SQL syntax to create and manage ingestion jobs.

An ingestion job uses the COPY FROM command to copy data from a given source into a table created in a metastore connection, or directly into a supported target. If writing into a data lake, this serves as your staging table and can be used with transformation jobs to write to various target locations.

Note that when writing to a staging table using an ingestion job, the table should not have any primary keys and, if partitioned, should only be partitioned by time.

Additionally, multiple ingestion jobs can write to the same staging table, which results in a UNION ALL of the data copied into the table. This means that any duplicate records that arrive are not removed from the table.
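
As a sketch of this pattern, two jobs copying from different S3 locations can append to a single staging table. The connection, bucket, schema, and table names below are hypothetical:

CREATE SYNC JOB ingest_orders_us
    CONTENT_TYPE = JSON
    AS COPY FROM S3 us_s3_connection LOCATION = 's3://us-orders-bucket/orders/'
    INTO default_glue_catalog.staging.orders_raw;

CREATE SYNC JOB ingest_orders_eu
    CONTENT_TYPE = JSON
    AS COPY FROM S3 eu_s3_connection LOCATION = 's3://eu-orders-bucket/orders/'
    INTO default_glue_catalog.staging.orders_raw;

Two ingestion jobs appending to the same staging table; the table holds a UNION ALL of both streams, with no duplicate removal.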

The COPY FROM command does not support SELECT statements; however, there are multiple job options you can include to manage and transform your data before it lands in the target. Furthermore, you can use expectations to manage rows that fail to meet data quality requirements as they are ingested.

Syntax

CREATE [SYNC] JOB <job_name>
    [{ job_options }]
    AS COPY FROM { S3 
                 | KAFKA 
                 | KINESIS }
                     <connection_identifier>
                     [{ source_options }]
    INTO { <table_identifier> | SNOWFLAKE <connection_name>.<schema_name>.<table_name> }
    [ WITH EXPECTATION <expectation_name> EXPECT <sql_predicate> ON VIOLATION { DROP | WARN } ];

The SQL syntax for creating an ingestion job.

Job identifier

This is the name of your job, which should match the following format:

identifier = "([^"]|"")*"|[A-Za-z_][A-Za-z0-9_]*;
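
For example, an unquoted name such as load_orders_raw matches the bare identifier pattern, while a quoted name can contain spaces and other characters, with embedded double quotes escaped by doubling them. A minimal sketch, reusing the sample source and target from the examples later on this page:

CREATE SYNC JOB "Daily ""orders"" load"
    CONTENT_TYPE = JSON
    AS COPY FROM S3 upsolver_s3_samples LOCATION = 's3://upsolver-samples/orders/'
    INTO default_glue_catalog.upsolver_samples.orders_raw_data;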

Job options

The following job options can be applied to all sources. See the links below for source-specific job options.

[ COLUMN_TRANSFORMATIONS = (<column> = <expression>, ...) ]
[ COMMENT = '<comment>' ]
[ COMPUTE_CLUSTER = <cluster_identifier> ]             
[ END_AT = { NOW | <timestamp> } ]  
[ EXCLUDE_COLUMNS = ( <col>, ...) ]

COLUMN_TRANSFORMATIONS

Values: ( <column> = <expression>, ...)

(Optional) Use this option to apply data transformations during ingestion, before the data lands in your target. When ingesting into the data lake, it is recommended that you apply only essential transformations, such as protecting PII: keeping the data in its raw state makes it easier to make amendments or corrections at a later date, and you can use a transformation job to apply further modifications downstream. As a general rule, transform only data that must be modified before it reaches the target.

However, transformations provide the flexibility to shape your data before it lands in the target. You can use all the functions and operators supported by Upsolver to create calculated fields within your ingestion job. New columns can be added to your target, and existing column data can be transformed. You can perform actions such as converting data types, formatting string values, and concatenating columns to create a new column.

If you need to mask sensitive or personally identifiable information (PII) prior to loading into your staging tables or when performing direct ingestion into your target destination, you can use hashing functions to prevent data from being exposed downstream. Combining hash functions with the EXCLUDE_COLUMNS option enables you to control your data protection.

See the Functions and operators reference for a full list of options that you can apply.
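
For example, the following sketch, based on the sample source and catalog used in the examples later on this page, hashes the customer email into a new column and copies the order identifier into a renamed column; the target table name is hypothetical:

CREATE SYNC JOB ingest_orders_with_transformations
    CONTENT_TYPE = JSON
    COLUMN_TRANSFORMATIONS = (
        hashed_email = MD5(customer.email), -- mask PII with a hash function before it reaches the target
        order_id     = orderid              -- copy an existing column into a new column with a different name
    )
    EXCLUDE_COLUMNS = ('customer.email')    -- drop the original, unmasked column
AS COPY FROM S3 upsolver_s3_samples LOCATION = 's3://upsolver-samples/orders/'
INTO default_glue_catalog.upsolver_samples.orders_transformed;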

COMMENT — editable

Type: text

(Optional) A description or comment regarding this job.

COMPUTE_CLUSTER — editable

Type: identifier

Default: The sole cluster in your environment

(Optional) The compute cluster to run this job.

This option can only be omitted when there is only one cluster in your environment.

If you have more than one compute cluster, use this option to specify which one the job should run on.

END_AT — editable

Values: { NOW | <timestamp> }

Default: Never

(Optional) Configures the time to stop ingesting data. Files after the specified time are ignored.

Timestamps should be based on UTC and in the following format: TIMESTAMP 'YYYY-MM-DD HH:MM:SS'.
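
For example, the following sketch, reusing the sample source and target from the examples later on this page, stops ingesting data at the end of 2024 (UTC):

CREATE SYNC JOB ingest_orders_until_end_of_2024
    CONTENT_TYPE = JSON
    END_AT = TIMESTAMP '2024-12-31 23:59:59'
AS COPY FROM S3 upsolver_s3_samples LOCATION = 's3://upsolver-samples/orders/'
INTO default_glue_catalog.upsolver_samples.orders_raw_data;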

EXCLUDE_COLUMNS

Values: ( <column>, ...)

(Optional) The EXCLUDE_COLUMNS option tells Upsolver to ignore the data in the columns specified in this list; these columns are not created on the target. To exclude columns, provide a single column name, a list of column names, or a glob pattern.

Use EXCLUDE_COLUMNS when you simply don't need certain columns, want to save storage space, or want to maintain a clean data structure; the specified columns are ignored.

This option gives you control over the width of the target table by enabling you to manage how many columns are created. If your target system has a limit on the number of columns it supports, continuously adding columns can cause issues. Furthermore, columns containing sensitive information can be excluded, ensuring private data is not copied downstream to a staging table in your data lake, or directly into your target.
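
For example, the following sketch, reusing the sample source and target from the examples later on this page, excludes the customer email along with every nested address column; the glob pattern is an assumption about your column layout:

CREATE SYNC JOB ingest_orders_without_pii
    CONTENT_TYPE = JSON
    EXCLUDE_COLUMNS = ('customer.email', 'customer.address.*') -- a single column plus a glob pattern
AS COPY FROM S3 upsolver_s3_samples LOCATION = 's3://upsolver-samples/orders/'
INTO default_glue_catalog.upsolver_samples.orders_raw_data;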

Source-specific job options

Visit the pages below for source-specific job options:

  • Amazon S3

  • Apache Kafka

  • Amazon Kinesis

Target identifier

Table

The table identifier must be provided in the following format:

INTO <catalog_name>.<schema_name>.<table_name>
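
For example, using the sample Glue catalog, schema, and table names from the examples later on this page:

INTO default_glue_catalog.upsolver_samples.orders_raw_data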

Direct ingestion to target

If you are creating a job to ingest your source data directly into a target system, for example to Snowflake, use the following format:

INTO SNOWFLAKE <connection_name>.<schema_name>.<table_name>

Please note that direct ingestion is not supported by MySQL or PostgreSQL.

Expectations

Data quality conditions can be added to your job to drop a row or trigger a warning when a column violates a predefined condition.

Each expectation name should be unique to the job, and the predicate that determines whether the condition is violated must return a Boolean value. The SQL predicate can be any syntax supported in a WHERE clause, however, aggregates are not supported.
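
For example, a predicate can combine comparisons with boolean operators, as in the sketch below, whereas an aggregate such as COUNT(*) cannot be used. Here, taxrate appears in the examples later on this page, while nettotal is a hypothetical column:

WITH EXPECTATION exp_reasonable_order
  EXPECT nettotal > 0 AND taxrate <= 1
  ON VIOLATION WARN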

Each expectation has an action of either DROP or WARN. If you define the action as DROP, any row for which the predicate returns FALSE is dropped from the ingestion stream and is not loaded into the target. Rows violating the condition defined in an expectation with a WARN action are still loaded into the target.

You can add an expectation when you create a new job, or use the ALTER JOB command to add a new expectation to an existing job. However, to change an existing rule, you must drop and recreate the expectation. A job can have an unlimited number of expectations.

To add an expectation when you create a new job, use the following syntax:

CREATE JOB <job_name> ...   
WITH EXPECTATION <expectation_name> EXPECT <sql_predicate> 
ON VIOLATION { DROP | WARN }

You can easily add an expectation to an existing job using the ALTER JOB command. The syntax below shows how to alter a job you have already created:

ALTER JOB <job_name> 
ADD EXPECTATION <expectation_name> 
  EXPECT <sql_predicate> ON VIOLATION { DROP | WARN }

To drop an expectation, use the ALTER JOB command using the following syntax:

ALTER JOB <job_name> 
DROP EXPECTATION <expectation_name>

When you drop an expectation, the entry is dropped from the system.monitoring.expectations system table, so if you want to retain a count of warnings or dropped rows, ensure you take a copy of these values prior to dropping the expectation. Furthermore, values are removed from the system.monitoring.jobs table.

Dropping a job will also drop the associated expectations, so you will no longer see the count of warnings or dropped rows in the system tables. You can take a backup of this data if you want to retain it for reporting purposes. See the how-to guide Managing data quality - ingesting data with expectations for instructions on creating a reporting table to record violated conditions.

Monitoring data quality with expectations

For both DROP and WARN actions, counters are incremented in the system.monitoring.expectations and system.monitoring.jobs tables to monitor the number of rows violating the condition.

system.monitoring.expectations

When a row fails to meet an expectation, the system.monitoring.expectations table is updated, and you can use the values in this table to report on the quality of your ingested data:

  • The triggered_today value is incremented each time an ingested row violates the condition, and the count shows all rows that violated the expectation since midnight (UTC time).

  • The triggered_total value is incremented each time an ingested row violates the condition, and the count shows all rows that violated the expectation since the job started.

  • The action column indicates the trigger type, either drop or warn, defined on the expectation.

Run the following query against the expectations system table to return the values for an expectation:

SELECT expectation_name, triggered_today, triggered_total, action 
FROM  system.monitoring.expectations 
WHERE expectation_name = '<expectation_name>';

You can also view the counts for all expectations defined in a job using the following query, simply by changing the WHERE clause:

SELECT expectation_name, triggered_today, triggered_total, action 
FROM  system.monitoring.expectations 
WHERE job_name = '<job_name>';

system.monitoring.jobs

Furthermore, you can view the sum of all expectations triggered since midnight (UTC time) in the system.monitoring.jobs table, which counts all triggered expectations defined in a job:

  • drop_expectations_triggered_today shows the number of DROP expectation violations today.

  • warn_expectations_triggered_today shows the number of WARN expectation violations today.

  • rows_filtered_by_expectations_today shows the number of rows dropped due to expectation violations today.

A row can violate multiple DROP and WARN conditions. For example, if a row triggers two warn expectations, and two drop expectations, the drop_expectations_triggered_today column will be incremented by two, and the warn_expectations_triggered_today column will be incremented by two, resulting in four violations. However, the rows_filtered_by_expectations_today column will only be incremented by one.

Run the following statement to return a count of warnings and dropped rows for a job:

SELECT drop_expectations_triggered_today, 
       warn_expectations_triggered_today, 
       rows_filtered_by_expectations_today
FROM system.monitoring.jobs
WHERE job_name = '<job_name>';

For more information on using expectations and monitoring rows that violate conditions, read the how-to guide, Managing data quality - ingesting data with expectations.

Examples

Transform and exclude columns

In this example, data is ingested directly from Kafka to Snowflake, using a transformation to mask PII data before loading it into the target table in Snowflake.

The customer email is copied into a new column named hashed_email, and the values are masked using the MD5() hash function. The original column, customer.email, is excluded.

CREATE SYNC JOB ingest_and_transform
    COMMENT = 'Load raw orders from Kafka and transform and exclude data'
    CONTENT_TYPE = JSON
    COMPUTE_CLUSTER = "Default Compute (Free)"
    EXCLUDE_COLUMNS = ('customer.email') -- exclude the original column
    COLUMN_TRANSFORMATIONS = (hashed_email = MD5(customer.email)) -- mask customer email into new column        
AS COPY FROM KAFKA upsolver_kafka_samples TOPIC = 'orders' 
INTO SNOWFLAKE my_snowflake_connection.demo.orders_transformed;

Transform data using COLUMN_TRANSFORMATIONS and EXCLUDE_COLUMNS in an ingestion job.

Manage data quality with expectations

This example shows you how to use expectations to manage the quality of the data that is ingested by your job. The expectation exp_orderid_not_null checks that the orderid has a value: if this expectation returns FALSE, the row is dropped.

Similarly, the expectation exp_add1_not_null checks that the first line of the customer address is not null and is not an empty string. If the condition returns FALSE, the row is loaded into the target, but a warning is triggered. The exp_taxrate expectation checks that the value in this column is 0.12; otherwise, the row is loaded into the target and a warning is triggered.

If a row is ingested that violates both a warning and a drop expectation, the row is dropped, however, the trigger is incremented in the system tables for each expectation.

CREATE SYNC JOB load_orders_raw_data_from_s3_expectations
   CONTENT_TYPE = JSON
   AS COPY FROM S3 upsolver_s3_samples LOCATION = 's3://upsolver-samples/orders/' 
   INTO default_glue_catalog.upsolver_samples.orders_raw_data
   WITH EXPECTATION exp_orderid_not_null EXPECT orderid IS NOT NULL ON VIOLATION DROP
   WITH EXPECTATION exp_add1_not_null EXPECT customer.address.address1 IS NOT NULL AND customer.address.address1 <> '' ON VIOLATION WARN
   WITH EXPECTATION exp_taxrate EXPECT taxrate = 0.12 ON VIOLATION WARN;

Add expectations to a job to manage the quality of data as it is ingested.
