Ingestion
An ingestion job uses the COPY FROM command to copy data from a given source into a table created in a metastore connection, or directly into a supported target. If writing into a data lake, this serves as your staging table and can be used with transformation jobs to write to various target locations.
Note that when writing to a staging table using an ingestion job, the table should not have any primary keys and, if partitioned, should only be partitioned by time.
Additionally, multiple ingestion jobs can write to the same staging table, which results in a UNION ALL of the data copied into the table. This means that any duplicate records that arrive are not removed from the table.
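As an illustration, here is a minimal sketch of two jobs copying into the same staging table; the job names, connection names, bucket path, topic, and table name are placeholders and assume the connections and staging table already exist:

CREATE SYNC JOB load_orders_from_s3
AS COPY FROM S3 my_s3_connection
    LOCATION = 's3://my-bucket/orders/'
INTO default_glue_catalog.staging.orders_raw;

-- A second job copying into the same staging table: the result is a UNION ALL
-- of both sources, and duplicate records are not removed.
CREATE SYNC JOB load_orders_from_kafka
AS COPY FROM KAFKA my_kafka_connection
    TOPIC = 'orders'
INTO default_glue_catalog.staging.orders_raw;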
The COPY FROM command does not support the SELECT statement; however, there are multiple job options you can include to manage and transform your data before it lands in the target. Furthermore, you can use expectations to manage rows that fail to meet data quality requirements as they are ingested.
Syntax
Job identifier
This is the name of your job, which should match the following format:
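As a general sketch of where the job identifier sits in the statement (placeholders in angle brackets, optional clauses in square brackets; this is an approximation of the full grammar, not the complete syntax):

CREATE [SYNC] JOB <job_identifier>
    [<job_option> = <value>, ...]
AS COPY FROM <source_type> <connection_identifier>
    [<source_option> = <value>, ...]
INTO <target_identifier>;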
Job options
The following job options can be applied to all sources. See the links below for source-specific job options.
COLUMN_TRANSFORMATIONS
Values: ( <column> = <expression>, ...)
(Optional) Use this option to perform data transformations during ingestion when changes must be applied before the data lands in your target. When ingesting into the data lake, it is recommended that you apply only essential transformations, such as protecting PII, and use a transformation job for other modifications: keeping the data in its raw state makes it easier to make amendments or corrections at a later date. As a general rule, transform only the data that must be modified before it reaches the target.
That said, transformations give you the flexibility to shape your data before it lands in the target. You can use all the functions and operators supported by Upsolver to create calculated fields within your ingestion job. New columns can be added to your target, and existing column data can be transformed. You can perform actions such as converting data types, formatting string values, and concatenating columns to create a new column.
If you need to mask sensitive or personally identifiable information (PII) prior to loading into your staging tables, or when performing direct ingestion into your target destination, you can use hashing functions to prevent data from being exposed downstream. Combining hash functions with the EXCLUDE_COLUMNS option enables you to control your data protection.
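For instance, a minimal sketch of a job that hashes an email column during ingestion; the job, connection, topic, table, and column names are placeholders:

CREATE SYNC JOB ingest_user_events
    COLUMN_TRANSFORMATIONS = (hashed_email = MD5(email))
AS COPY FROM KAFKA my_kafka_connection
    TOPIC = 'user_events'
INTO default_glue_catalog.staging.user_events_raw;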
COMMENT — editable
Type: text
(Optional) A description or comment regarding this job.
COMPUTE_CLUSTER — editable
Type: identifier
Default: The sole cluster in your environment
(Optional) The compute cluster to run this job.
This option can only be omitted when there is only one cluster in your environment.
If you have more than one compute cluster, use this option to specify which one the job should run on.
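As a sketch, both of the options above can be set in the job header; the job, cluster, connection, path, and table names are placeholders:

CREATE SYNC JOB ingest_orders_raw
    COMMENT = 'Ingest raw orders from S3 into the staging table',
    COMPUTE_CLUSTER = my_compute_cluster
AS COPY FROM S3 my_s3_connection
    LOCATION = 's3://my-bucket/orders/'
INTO default_glue_catalog.staging.orders_raw;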
END_AT — editable
Values: { NOW | <timestamp> }
Default: Never
(Optional) Configures the time to stop ingesting data. Files after the specified time are ignored.
Timestamps should be based on UTC and in the following format: TIMESTAMP 'YYYY-MM-DD HH:MM:SS'.
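For example, a sketch of a job that stops ingesting new data at the end of January 2025; the job, connection, path, and table names are placeholders:

CREATE SYNC JOB ingest_orders_until_february
    END_AT = TIMESTAMP '2025-01-31 23:59:59'
AS COPY FROM S3 my_s3_connection
    LOCATION = 's3://my-bucket/orders/'
INTO default_glue_catalog.staging.orders_raw;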
EXCLUDE_COLUMNS
Values: ( '<column>', ...)
(Optional) The EXCLUDE_COLUMNS option tells Upsolver to ignore data in the columns specified in this list; the excluded columns are not created on the target. To exclude columns, provide a single column name (as a string literal), a list of column names, or a glob pattern.
Use EXCLUDE_COLUMNS when you simply don't need certain columns, want to save storage space, or want to maintain a clean data structure; the specified columns are ignored. This option gives you control over the width of the target table by enabling you to manage how many columns are created. If your target system has a limit on the number of columns it supports, continuously adding columns can cause issues.
Furthermore, columns containing sensitive information can be excluded, ensuring private data is not copied downstream to a staging table in your data lake, or directly into your target.
Example:
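A minimal sketch, assuming an existing S3 connection and staging table; the job, connection, path, and table names, the excluded column, and the debug_* glob pattern are placeholders:

CREATE SYNC JOB ingest_web_events
    EXCLUDE_COLUMNS = ('customer.email', 'debug_*')
AS COPY FROM S3 my_s3_connection
    LOCATION = 's3://my-bucket/events/'
INTO default_glue_catalog.staging.web_events_raw;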
Source-specific job options
Visit the pages below for source-specific job options:
Target identifier
Table
The table identifier must be provided in the following format:
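A common form, assuming a data lake table registered in a metastore (catalog) connection — names here are placeholders:

<catalog_name>.<schema_name>.<table_name>

For example: default_glue_catalog.staging.orders_raw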
Direct ingestion to target
If you are creating a job to ingest your source data directly into a target system, for example to Snowflake, use the following format:
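A sketch of the Snowflake form, assuming a Snowflake connection named my_snowflake_connection (connection, schema, and table names are placeholders; see the target's reference page for the exact syntax):

SNOWFLAKE <connection_identifier>.<schema_name>.<table_name>

For example: INTO SNOWFLAKE my_snowflake_connection.DEMO.ORDERS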
Please note that direct ingestion to Snowflake is not supported for CDC sources: Microsoft SQL Server, MongoDB, MySQL, or PostgreSQL.
Please see Replication jobs for an alternative option.
Examples
Exclude and transform columns
In this example, data is ingested directly from Kafka to Snowflake, using a transformation to mask PII data before loading it into the target table in Snowflake.
The customer email is copied into a new column named hashed_email, and the values are masked using the MD5() hash function. The original column, customer.email, is excluded.
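A minimal sketch of such a job; the job name, connection names, topic, and the Snowflake schema and table are placeholders:

CREATE SYNC JOB ingest_orders_to_snowflake
    COLUMN_TRANSFORMATIONS = (hashed_email = MD5(customer.email)),
    EXCLUDE_COLUMNS = ('customer.email')
AS COPY FROM KAFKA my_kafka_connection
    TOPIC = 'orders'
INTO SNOWFLAKE my_snowflake_connection.DEMO.ORDERS;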
Manage data quality with expectations
This example shows you how to use expectations to manage the quality of the data that is ingested by your job. The expectation exp_orderid_not_null checks that the orderid has a value: if this expectation returns FALSE, the row is dropped.
Similarly, the expectation exp_add1_not_null checks that the first line of the customer address is not null or an empty string. If the condition returns FALSE, the row is loaded into the target, but a warning is triggered. The taxrate expectation checks that the value in this column is 0.12; otherwise, the row is loaded into the target and a warning is triggered.
If a row is ingested that violates both a warning and a drop expectation, the row is dropped; however, the trigger is incremented in the system tables for each expectation.
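A minimal sketch of this job, assuming the WITH EXPECTATION ... EXPECT ... ON VIOLATION DROP | WARN clause form; the source, table, and column names (including address_line_1 for the first address line, and the exp_taxrate expectation name) are placeholders:

CREATE SYNC JOB ingest_orders_with_expectations
AS COPY FROM S3 my_s3_connection
    LOCATION = 's3://my-bucket/orders/'
INTO default_glue_catalog.staging.orders_raw
WITH EXPECTATION exp_orderid_not_null EXPECT orderid IS NOT NULL ON VIOLATION DROP
WITH EXPECTATION exp_add1_not_null EXPECT address_line_1 IS NOT NULL AND address_line_1 <> '' ON VIOLATION WARN
WITH EXPECTATION exp_taxrate EXPECT taxrate = 0.12 ON VIOLATION WARN;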