Column Transformations

Learn how column transformations can be applied to ingestion jobs.

If transformations must be applied before data lands in your target, you can use this option to perform them during ingestion. When ingesting into the data lake, however, it is recommended that you apply only essential transformations, such as protecting PII: keeping the data in its raw state makes it easier to make amendments or corrections later, and other modifications can instead be applied with a transformation job. As a general rule, transform only data that must be modified before it reaches the target.

Flexible data shaping

That said, transformations give you the flexibility to shape your data before it lands in the target. You can use any of the functions and operators supported by Upsolver to create calculated fields within your ingestion job. New columns can be added to your target, and existing column data can be transformed. For example, you can convert data types, format string values, or concatenate columns to create a new column.

For example, in the following job, the ordertype column has a transformation applied to convert all values to upper case, maintaining consistency in the target table in Snowflake. In addition, the review column has a right trim function applied to remove the trailing spaces that commonly appear in this column:

CREATE SYNC JOB ingest_s3_to_snowflake
   COMMENT = 'Load orders into Snowflake'
   CONTENT_TYPE = JSON
   COLUMN_TRANSFORMATIONS = (ordertype = UPPER(ordertype), review = RTRIM(review)) 
   COMMIT_INTERVAL = 10 MINUTES
AS COPY FROM S3 upsolver_s3_samples 
   LOCATION = 's3://upsolver-samples/orders/' 
INTO SNOWFLAKE my_snowflake_connection.demo.orders_transformed;
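
A calculated field can also combine several source columns into one, as mentioned above. The sketch below is illustrative only: the firstname and lastname columns, the source location, and the target table are hypothetical, and it assumes the standard || string concatenation operator can be used within COLUMN_TRANSFORMATIONS; consult Upsolver's function and operator reference for the expression that best fits your data.

-- Hypothetical sketch: derive a full_name column during ingestion
CREATE SYNC JOB ingest_s3_with_calculated_field
   COMMENT = 'Load customers into Snowflake with a derived full_name column'
   CONTENT_TYPE = JSON
   COLUMN_TRANSFORMATIONS = (full_name = firstname || ' ' || lastname)
   COMMIT_INTERVAL = 10 MINUTES
AS COPY FROM S3 upsolver_s3_samples
   LOCATION = 's3://upsolver-samples/customers/'
INTO SNOWFLAKE my_snowflake_connection.demo.customers_transformed;

As with the previous example, only the columns named in COLUMN_TRANSFORMATIONS are added or modified; all other source columns are ingested unchanged.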

Protect sensitive data

If you need to mask sensitive or personally identifiable information (PII) prior to loading into your staging tables or when performing direct ingestion into your target destination, you can use hashing functions to prevent data from being exposed downstream.

In the following example, the MD5() masking function is applied to the customer.email column to hide the raw data, and the result is written to a new column named hashed_email in the target.

CREATE SYNC JOB ingest_kinesis_to_snowflake
    COMMENT = 'Ingest orders to Snowflake'
    START_FROM = BEGINNING
    CONTENT_TYPE = JSON
    EXCLUDE_COLUMNS = ('customer.email') 
    COLUMN_TRANSFORMATIONS = (hashed_email = MD5(customer.email))       
    COMMIT_INTERVAL = 5 MINUTES
AS COPY FROM KINESIS my_kinesis_connection 
    STREAM = 'orders'  
INTO SNOWFLAKE my_snowflake_connection.demo.orders_transformed;

In the example above, the customer's email address is hashed into the new hashed_email column. However, without further action, the original customer.email column would still be included in the ingestion and loaded into Snowflake in its raw format.

To prevent this, the EXCLUDE_COLUMNS option is included so that the customer.email column is never ingested into the target, keeping this information protected.
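
The same pattern extends to multiple PII columns. The following sketch is hypothetical: it assumes the source contains a customer.phone column and that EXCLUDE_COLUMNS accepts a list of several columns; both raw values are hashed into new columns while the originals are withheld from the target.

-- Hypothetical sketch: hash both PII fields and exclude the raw columns
CREATE SYNC JOB ingest_kinesis_masked_pii
    COMMENT = 'Ingest orders to Snowflake with hashed PII'
    START_FROM = BEGINNING
    CONTENT_TYPE = JSON
    EXCLUDE_COLUMNS = ('customer.email', 'customer.phone')
    COLUMN_TRANSFORMATIONS = (hashed_email = MD5(customer.email), hashed_phone = MD5(customer.phone))
    COMMIT_INTERVAL = 5 MINUTES
AS COPY FROM KINESIS my_kinesis_connection
    STREAM = 'orders'
INTO SNOWFLAKE my_snowflake_connection.demo.orders_masked;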

