Insert data into your target table
This quickstart guide describes how to insert data from a staging table to a target table in Upsolver.
To start transforming your data, choose from two types of transformation jobs: INSERT or MERGE. To take full advantage of these jobs, we recommend that you explicitly create your target table, which enables you to define partitions, primary keys, and more. The following example creates a target table in the data lake:
-- Syntax
CREATE TABLE default_glue_catalog.<DB_NAME>.<TRANSFORMED_TABLE_NAME> (
partition_date date)
PARTITIONED BY partition_date;
-- Example
CREATE TABLE default_glue_catalog.upsolver_samples.orders_transformed_data(
partition_date date)
PARTITIONED BY partition_date;
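If you plan to update or delete rows later with a MERGE job, the target table also needs a primary key. Here is a minimal sketch, assuming a hypothetical orders_upsert_data table keyed on order_id; confirm the exact PRIMARY KEY clause name and ordering against the CREATE TABLE reference:
CREATE TABLE default_glue_catalog.upsolver_samples.orders_upsert_data (
    order_id string,
    partition_date date)
PARTITIONED BY partition_date
PRIMARY KEY order_id;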
You don’t need to define all of your output columns, as Upsolver automatically adds any missing columns. If you want to control which columns are added, define them in the CREATE TABLE statement and then set the ADD_MISSING_COLUMNS job option to FALSE.
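For example, here is a hedged sketch of a fixed-schema setup: the table declares its columns up front, and the job sets ADD_MISSING_COLUMNS to FALSE so that new source columns are not added automatically. The orders_fixed_schema table name and the column types are illustrative, not part of the sample dataset:
CREATE TABLE default_glue_catalog.upsolver_samples.orders_fixed_schema (
    order_id string,
    total double,
    partition_date date)
PARTITIONED BY partition_date;

CREATE JOB insert_orders_fixed_schema
    START_FROM = BEGINNING
    ADD_MISSING_COLUMNS = FALSE -- do not add new columns to the target table automatically
    RUN_INTERVAL = 1 MINUTE
AS INSERT INTO default_glue_catalog.upsolver_samples.orders_fixed_schema MAP_COLUMNS_BY_NAME
SELECT
    orderid AS order_id,
    nettotal AS total,
    $commit_time AS partition_date
FROM default_glue_catalog.upsolver_samples.orders_raw_data
WHERE time_filter();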
Now that you have created the target table, let's create a transformation job to read data from the staging table, transform it, and write the results into the target table inside your refined zone.
Note: You must have data in your staging table to run this job.
Create a transformation job to read from staging and write to the target:
-- Syntax
CREATE JOB <TRANSFORM_JOB_NAME>
START_FROM = BEGINNING
ADD_MISSING_COLUMNS = TRUE
RUN_INTERVAL = 1 MINUTE
AS INSERT INTO default_glue_catalog.<DB_NAME>.<TRANSFORMED_TABLE_NAME> MAP_COLUMNS_BY_NAME
SELECT
<source_column_name> as <target_column_name>,
MD5(<source_column_name>) as <target_column_name>,
<new_variable> as <target_column_name>,
<source_column_name> as <target_column_name>,
$commit_time AS partition_date
FROM default_glue_catalog.<DB_NAME>.<STAGING_TABLE_NAME>
WHERE time_filter()
AND <column_name> = '<filter_by_value>';
-- Example
CREATE JOB transform_orders_and_insert_into_athena
START_FROM = BEGINNING
ADD_MISSING_COLUMNS = true
RUN_INTERVAL = 1 MINUTE
AS INSERT INTO default_glue_catalog.upsolver_samples.orders_transformed_data MAP_COLUMNS_BY_NAME
-- Use the SELECT statement to choose columns from the source and implement your business logic transformations.
SELECT
orderid AS order_id, -- rename columns
MD5(buyeremail) AS customer_id, -- hash or mask columns using built-in functions
nettotal AS total,
$commit_time AS partition_date -- populate the partition column with the processing time of the event, automatically cast to DATE
FROM default_glue_catalog.upsolver_samples.orders_raw_data
WHERE eventtype = 'ORDER'
AND time_filter();
Let's walk through this job:
CREATE JOB transform_orders_and_insert_into_athena
START_FROM = BEGINNING
ADD_MISSING_COLUMNS = true
AS INSERT INTO default_glue_catalog.upsolver_samples.orders_transformed_data MAP_COLUMNS_BY_NAME
First, we create a job named transform_orders_and_insert_into_athena. The START_FROM option instructs Upsolver to include all data going back in time, and ADD_MISSING_COLUMNS ensures that columns added in the future are added to the target table. In this example, RUN_INTERVAL specifies that the job executes every minute.
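As a point of comparison, here is a hedged sketch of the same job configured to process only newly arriving data and to run less often; the option values are illustrative, so confirm the accepted values for START_FROM and RUN_INTERVAL in the job options reference:
CREATE JOB transform_new_orders_only
    START_FROM = NOW -- process only data ingested after the job is created
    ADD_MISSING_COLUMNS = true
    RUN_INTERVAL = 5 MINUTES
AS INSERT INTO default_glue_catalog.upsolver_samples.orders_transformed_data MAP_COLUMNS_BY_NAME
SELECT
    orderid AS order_id,
    MD5(buyeremail) AS customer_id,
    nettotal AS total,
    $commit_time AS partition_date
FROM default_glue_catalog.upsolver_samples.orders_raw_data
WHERE eventtype = 'ORDER'
AND time_filter();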
Then, in the AS INSERT INTO command, you set the insertion table to the target table you created in the previous step. The MAP_COLUMNS_BY_NAME clause tells Upsolver to match the source and target columns based on name.
In the SELECT statement, you can perform transformations on your data:
SELECT
orderid AS order_id, -- rename columns
MD5(buyeremail) AS customer_id, -- hash or mask columns using built-in functions
nettotal AS total,
$commit_time AS partition_date -- populate the partition column with the processing time of the event, automatically cast to DATE
Examples of the transformations you can apply to your data include renaming columns, such as changing orderid to order_id, or populating a new column with data from a different one. In the above example, buyeremail is masked using the MD5 function, and a new column, customer_id, is created.
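You can also populate a new column with an expression computed from existing ones. A small sketch of that pattern (the estimated_tax column and the 0.1 factor are made up for illustration and are not part of the sample data):
SELECT
    orderid AS order_id,
    nettotal AS total,
    nettotal * 0.1 AS estimated_tax, -- derive a new column from an existing one
    $commit_time AS partition_date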
Lastly, you specify the table from which you are copying your data, usually your staging table. The WHERE clause uses time_filter() to restrict each execution to the time range currently being processed, and keeps only rows where eventtype is 'ORDER':
FROM default_glue_catalog.upsolver_samples.orders_raw_data
WHERE eventtype = 'ORDER'
AND time_filter();
Finally, to view your pipeline results, use a SELECT statement to query your table. Here's the code to view your pipeline data:
SELECT *
FROM default_glue_catalog.upsolver_samples.orders_transformed_data
LIMIT 100;
Including the LIMIT clause instructs the Upsolver query engine to limit the result set to 100 rows.
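To sanity-check the output further, you can aggregate over the partition column. A minimal sketch using standard SQL aggregation, assuming the job has already completed at least one execution; since the table is registered in the Glue catalog, you can also run the same query from Athena:
SELECT partition_date,
       COUNT(*) AS order_count
FROM default_glue_catalog.upsolver_samples.orders_transformed_data
GROUP BY partition_date
LIMIT 100;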