Output to a Target Table

This quickstart guide describes how to insert data from a staging table to a target table in Upsolver.

Creating an output table for refined data in Amazon Athena

To start transforming your data, choose from two types of transformation jobs: INSERT or MERGE. To take full advantage of these jobs, we recommend that you explicitly create your target table, which enables you to define partitions, primary keys, and more. The following example creates a target table in the data lake.

Here's the code to create a target table in the data lake:

// Syntax
CREATE TABLE default_glue_catalog.<DB_NAME>.<TRANSFORMED_TABLE_NAME> (
     partition_date date)
    PARTITIONED BY partition_date;
    
// Example
CREATE TABLE default_glue_catalog.upsolver_samples.orders_transformed_data(
        partition_date date)
        PARTITIONED BY partition_date;

You don’t need to define all of your output columns, as Upsolver automatically adds any missing columns. If you do want to control which columns are added, you can define them in the CREATE TABLE statement and then set the ADD_MISSING_COLUMNS job property to FALSE in the job options.

Prepare your data for the refined zone

Now that you have created the target table, let's create a transformation job to read data from the staging table, transform it, and write the results into the target table inside your refined zone.

You must have data in your staging table to run this job.

Create a transformation job to read from staging and write to the target:

// Syntax
CREATE JOB <TRANSFORM_JOB_NAME>
    START_FROM = BEGINNING
    ADD_MISSING_COLUMNS = TRUE	
    RUN_INTERVAL = 1 MINUTE
AS INSERT INTO default_glue_catalog.<DB_NAME>.<TRANSFORMED_TABLE_NAME> MAP_COLUMNS_BY_NAME
    SELECT 
      <source_column_name> as <target_column_name>,
      MD5(<source_column_name>) as <target_column_name>,
      <new_variable> as <target_column_name>,
      <source_column_name> as <target_column_name>,
      $commit_time AS partition_date
    FROM default_glue_catalog.<DB_NAME>.<STAGING_TABLE_NAME>
    WHERE $commmit_time BETWEEN run_start_time() AND run_end_time()
    AND <column_name> = '<filter_by_value>';

// Example
CREATE JOB transform_orders_and_insert_into_athena
    START_FROM = BEGINNING
    ADD_MISSING_COLUMNS = true	
    RUN_INTERVAL = 1 MINUTE
AS INSERT INTO default_glue_catalog.upsolver_samples.orders_transformed_data MAP_COLUMNS_BY_NAME
    -- Use the SELECT statement to choose columns from the source and implement your business logic transformations.
    SELECT 
        orderid AS order_id, -- rename columns
        MD5(buyeremail) AS customer_id, -- hash or mask columns using built-in functions
        nettotal AS total, 
        $commit_time AS partition_date -- populate the partition column with the processing time of the event, automatically cast to DATE
    FROM default_glue_catalog.upsolver_samples.orders_raw_data
    WHERE eventtype = 'ORDER' 
    AND $commit_time BETWEEN run_start_time() AND run_end_time();

Let's walk through this job:

CREATE JOB transform_orders_and_insert_into_athena
        START_FROM = BEGINNING
        ADD_MISSING_COLUMNS = true	
AS INSERT INTO default_glue_catalog.upsolver_samples.orders_transformed_data MAP_COLUMNS_BY_NAME

First, we create a job named transform_orders_and_insert_into_athena. The START_FROM option instructs Upsolver to include all data going back in time, and ADD_MISSING_COLUMNS ensures columns added in the future are added to the target table. In this example, the RUN_INTERVAL specifies that the job executes every 1 minute.

Then, in the AS INSERT INTO command, you set the insertion table as the target table you created in the previous step. The MAP_COLUMNS_BY_NAME command tells Upsolver to match the source and target columns based on name.

In the SELECT statement, you can perform transformations on your data:

SELECT 
      orderid AS order_id, -- rename columns
      MD5(buyeremail) AS customer_id, -- hash or mask columns using built-in functions
      nettotal AS total, 
      $commit_time AS partition_date -- populate the partition column with the processing time of the event, automatically cast to DATE

Examples of the transformations you can apply to your data include renaming columns, such as changing orderid to order_id, or populating a new column with data from a different one. In the above example, buyeremail is masked using the MD5 function, and a new column, customer_id, is created.

Lastly, you specify the table from which you are copying your data, usually your staging table:

 FROM default_glue_catalog.upsolver_samples.orders_raw_data
     WHERE eventtype = 'ORDER' 
     AND $commit_time BETWEEN run_start_time() AND run_end_time();

Query your data

Finally, to view your pipeline results, use a SELECT statement to query your table.

Here's the code to view your pipeline data:

SELECT * 
FROM default_glue_catalog.upsolver_samples.orders_transformed_data 
LIMIT 100;

Including the LIMIT option instructs the Upsolver query engine to limit the result set to 100 rows.

Last updated