Ingestion jobs
This section shows you how to use SQL syntax to create and manage ingestion jobs.
An ingestion job uses the COPY FROM command to copy data from a given source into a table created in a metastore connection, or directly into a supported target. If writing into a data lake, this serves as your staging table and can be used with transformation jobs to write to various target locations.
Note that when writing to a staging table using an ingestion job, the table should not have any primary keys and, if partitioned, should only be partitioned by time.
Additionally, multiple ingestion jobs can write to the same staging table, which results in a UNION ALL of the data copied into the table. This means that any duplicate records that arrive are not removed from the table.
The COPY FROM command does not support the SELECT statement; however, there are multiple job options you can include to manage and transform your data before it lands in the target. Furthermore, you can use expectations to manage rows that fail to meet data quality requirements as they are ingested.
Syntax
The SQL syntax for creating an ingestion job.
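The exact options vary by source, but a minimal job follows the shape sketched below. This is an illustrative sketch based on a typical S3 ingestion; the connection name, bucket location, and catalog path are placeholders you would replace with your own:

```sql
CREATE JOB load_orders_raw
    CONTENT_TYPE = JSON  -- format of the source files (assumed here)
AS COPY FROM S3 my_s3_connection
    LOCATION = 's3://my-bucket/orders/'
INTO default_glue_catalog.my_schema.orders_raw_data;
```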
Job identifier
This is the name of your job, which should match the following format:
Job options
The following job options can be applied to all sources. See the links below for source-specific job options.
COLUMN_TRANSFORMATIONS
Values: ( <column> = <expression>, ...)
(Optional) Use this option to perform data transformations during ingestion, when changes must be applied before the data lands in your target. When ingesting into the data lake, it is recommended that you apply only essential transformations, such as protecting PII, and use a transformation job for other modifications: keeping the data in its raw state makes it easier to make amendments or corrections at a later date. As a general rule, you should only transform data that must be modified before it reaches the target.
However, transformations provide the flexibility to shape your data before it lands in the target. You can use all the functions and operators supported by Upsolver to create calculated fields within your ingestion job. New columns can be added to your target, and existing column data can be transformed. You can perform actions such as converting data types, formatting string values, and concatenating columns to create a new column.
If you need to mask sensitive or personally identifiable information (PII) prior to loading into your staging tables or when performing direct ingestion into your target destination, you can use hashing functions to prevent data from being exposed downstream. Combining hash functions with the EXCLUDE_COLUMNS option enables you to control your data protection.
See the Functions and operators reference for a full list of options that you can apply.
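As a hedged illustration, the sketch below shows how COLUMN_TRANSFORMATIONS might be used to add calculated columns during ingestion; the column names, the concatenation expression, and the connection and table identifiers are all hypothetical placeholders:

```sql
CREATE JOB ingest_customers
    COLUMN_TRANSFORMATIONS = (
        full_name    = first_name || ' ' || last_name,  -- concatenate into a new column
        hashed_email = MD5(email)                       -- mask PII with a hash function
    )
AS COPY FROM S3 my_s3_connection
    LOCATION = 's3://my-bucket/customers/'
INTO default_glue_catalog.my_schema.customers_raw;
```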
COMMENT — editable
Type: text
(Optional) A description or comment regarding this job.
COMPUTE_CLUSTER — editable
Type: identifier
Default: The sole cluster in your environment
(Optional) The compute cluster to run this job.
This option can only be omitted when there is a single cluster in your environment. If you have more than one compute cluster, use this option to specify which one the job should run on.
END_AT — editable
Values: { NOW | <timestamp> }
Default: Never
(Optional) Configures the time to stop ingesting data. Files after the specified time are ignored.
Timestamps should be based on UTC and in the following format: TIMESTAMP 'YYYY-MM-DD HH:MM:SS'.
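For example, a job that should stop ingesting at the end of 2024 might be sketched as follows; the connection and table names are placeholders:

```sql
CREATE JOB bounded_ingest
    END_AT = TIMESTAMP '2024-12-31 23:59:59'  -- UTC; files after this time are ignored
AS COPY FROM S3 my_s3_connection
    LOCATION = 's3://my-bucket/events/'
INTO default_glue_catalog.my_schema.events_raw;
```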
EXCLUDE_COLUMNS
Values: ( <column>, ...)
(Optional) The EXCLUDE_COLUMNS option tells Upsolver to ignore data in the columns specified in this list, and the excluded columns are not created on the target. To exclude columns, provide a single column name, a list of column names, or a glob pattern.
Use EXCLUDE_COLUMNS when you simply don't need certain columns, want to save storage space, or want to maintain a clean data structure; the specified columns will be ignored.
This option gives you control over the width of the target table by enabling you to manage how many columns are created. If your target system has a limit on the number of columns it supports, continuously adding columns can cause issues. Furthermore, columns containing sensitive information can be excluded, ensuring private data is not copied downstream to a staging table in your data lake, or directly into your target.
Source-specific job options
Visit the pages below for source-specific job options:
Target identifier
Table
The table identifier must be provided in the following format:
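A sketch of the assumed shape, using a three-part identifier of catalog, schema, and table:

```sql
INTO <catalog_name>.<schema_name>.<table_name>
```

For example, INTO default_glue_catalog.my_schema.orders_raw_data.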
Direct ingestion to target
If you are creating a job to ingest your source data directly into a target system, for example to Snowflake, use the following format:
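A hedged sketch of direct ingestion from Kafka into Snowflake; the connection names, topic, and target schema are placeholders:

```sql
CREATE JOB ingest_orders_to_snowflake
AS COPY FROM KAFKA my_kafka_connection
    TOPIC = 'orders'
INTO SNOWFLAKE my_snowflake_connection.demo_schema.orders;
```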
Please note that direct ingestion is not supported by MySQL or PostgreSQL.
Expectations
Data quality conditions can be added to your job to drop a row or trigger a warning when a column violates a predefined condition.
Each expectation name should be unique to the job, and the predicate that determines whether the condition is violated must return a Boolean value. The SQL predicate can be any syntax supported in a WHERE clause; however, aggregates are not supported.
Each expectation has an action of either DROP or WARN. If you define the action as DROP, any row for which the predicate returns FALSE is dropped from the ingestion stream and is not loaded into the target. Rows violating the condition defined in an expectation with a WARN action are loaded into the target.
You can add an expectation when you create a new job, or use the ALTER JOB command to add a new expectation to an existing job. However, you must drop and recreate an expectation to alter an existing rule. A job can have an unlimited number of expectations.
To add an expectation when you create a new job, use the following syntax:
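The sketch below shows the assumed shape; the job name, source details, and predicate are placeholders:

```sql
CREATE JOB ingest_orders
AS COPY FROM S3 my_s3_connection
    LOCATION = 's3://my-bucket/orders/'
INTO default_glue_catalog.my_schema.orders_raw
    WITH EXPECTATION exp_orderid_not_null
        EXPECT orderid IS NOT NULL
        ON VIOLATION DROP;
```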
You can easily add an expectation to an existing job using the ALTER JOB command. The syntax below shows how to alter a job you have already created:
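A sketch of the assumed shape; the job name, expectation name, and predicate are placeholders:

```sql
ALTER JOB ingest_orders
    ADD EXPECTATION exp_taxrate
        EXPECT taxrate = 0.12
        ON VIOLATION WARN;
```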
To drop an expectation, use the ALTER JOB command with the following syntax:
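A sketch of the assumed shape, with a placeholder job and expectation name:

```sql
ALTER JOB ingest_orders
    DROP EXPECTATION exp_taxrate;
```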
When you drop an expectation, the entry is dropped from the system.monitoring.expectations system table, so if you want to retain a count of warnings or dropped rows, ensure you take a copy of these values prior to dropping the expectation. Furthermore, the values are removed from the system.monitoring.jobs table.
Dropping a job will also drop the associated expectations, so you will no longer see the count of warnings or dropped rows in the system tables. You can take a backup of this data if you want to retain it for reporting purposes. See the how-to guide Managing data quality - ingesting data with expectations for instructions on creating a reporting table to record violated conditions.
Monitoring data quality with expectations
For both DROP and WARN actions, counters are incremented in the system.monitoring.expectations and system.monitoring.jobs tables to monitor the number of rows violating the condition.
system.monitoring.expectations
When a row fails to meet an expectation, the system.monitoring.expectations table is updated, and you can use the values in this table to report on the quality of your ingested data:
- The triggered_today value is incremented each time an ingested row violates the condition; the count shows all rows that violated the expectation since midnight (UTC time).
- The total_triggered value is incremented each time an ingested row violates the condition; the count shows all rows that violated the expectation since the job started.
- The action column indicates the trigger type, either drop or warn, defined on the expectation.
Run the following query against the expectations system table to return the values for an expectation:
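A sketch of such a query; the monitored columns are those described above, while the filter column names (job_name, expectation_name) and their values are assumptions:

```sql
SELECT triggered_today,
       total_triggered,
       action
FROM system.monitoring.expectations
WHERE job_name = 'ingest_orders'
  AND expectation_name = 'exp_orderid_not_null';
```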
You can also view the counts for all expectations defined in a job, simply by changing the WHERE clause:
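For example, filtering on the job alone (the filter column name and value are assumptions):

```sql
SELECT expectation_name,
       action,
       triggered_today,
       total_triggered
FROM system.monitoring.expectations
WHERE job_name = 'ingest_orders';
```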
system.monitoring.jobs
Furthermore, you can view the sum of all expectations triggered since midnight (UTC time) in the system.monitoring.jobs table, which counts all triggered expectations defined in a job:
- drop_expectations_triggered_today shows the number of DROP expectation violations today.
- warn_expectations_triggered_today shows the number of WARN expectation violations today.
- rows_filtered_by_expectations_today counts the rows that were dropped from the ingestion stream today and not loaded into the target because they violated a DROP condition.
A row can violate multiple DROP and WARN conditions. For example, if a row triggers two warn expectations and two drop expectations, the drop_expectations_triggered_today column will be incremented by two, and the warn_expectations_triggered_today column will be incremented by two, resulting in four violations. However, the rows_filtered_by_expectations_today column will only be incremented by one.
Run the following statement to return a count of warnings and dropped rows for a job:
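A sketch of such a statement, with the job name as a placeholder:

```sql
SELECT drop_expectations_triggered_today,
       warn_expectations_triggered_today,
       rows_filtered_by_expectations_today
FROM system.monitoring.jobs
WHERE job_name = 'ingest_orders';
```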
For more information on using expectations and monitoring rows that violate conditions, read the how-to guide, Managing data quality - ingesting data with expectations.
Examples
Transform and exclude columns
In this example, data is ingested directly from Kafka to Snowflake, using a transformation to mask PII data before loading it into the target table in Snowflake.
The customer email is copied into a new column named hashed_email, and the values are masked using the MD5() hash function. The original column, customer.email, is excluded.
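The example described above might look like the following sketch; the connection names, Kafka topic, and Snowflake schema are placeholders, while hashed_email, MD5(), and customer.email come from the description:

```sql
CREATE JOB ingest_kafka_to_snowflake
    COLUMN_TRANSFORMATIONS = (hashed_email = MD5(customer.email)),
    EXCLUDE_COLUMNS = ('customer.email')
AS COPY FROM KAFKA my_kafka_connection
    TOPIC = 'customers'
INTO SNOWFLAKE my_snowflake_connection.demo_schema.customers;
```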
Transform data using COLUMN_TRANSFORMATIONS and EXCLUDE_COLUMNS in an ingestion job.
Manage data quality with expectations
This example shows you how to use expectations to manage the quality of the data that is ingested by your job. The expectation exp_orderid_not_null checks that the orderid has a value: if this expectation returns FALSE, the row is dropped.
Similarly, the expectation exp_add1_not_null checks that the first line of the customer address is not null or an empty string. If the condition returns FALSE, the row is loaded into the target, but a warning is triggered. The taxrate expectation checks that the value in this column is 0.12; otherwise, the row is loaded into the target and a warning is triggered.
If a row is ingested that violates both a warning and a drop expectation, the row is dropped, however, the trigger is incremented in the system tables for each expectation.
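The job described above might be sketched as follows; the expectation names, predicates, and taxrate value come from the description, while the source details, target identifier, and address column name are placeholders:

```sql
CREATE JOB ingest_orders_with_expectations
AS COPY FROM S3 my_s3_connection
    LOCATION = 's3://my-bucket/orders/'
INTO default_glue_catalog.my_schema.orders_raw
    WITH EXPECTATION exp_orderid_not_null
        EXPECT orderid IS NOT NULL
        ON VIOLATION DROP
    WITH EXPECTATION exp_add1_not_null
        EXPECT address_line1 IS NOT NULL AND address_line1 <> ''
        ON VIOLATION WARN
    WITH EXPECTATION exp_taxrate
        EXPECT taxrate = 0.12
        ON VIOLATION WARN;
```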
Add expectations to a job to manage the quality of data as it is ingested.