Upserting data in SQLake — Amazon S3 to Amazon Athena

This how-to guide shows how you can use the MERGE command to upsert data.

Upserting combines updating and inserting. When you create a job to upsert data, the job performs one of two actions for each incoming row:

  1. the job inserts a new row into your target table; or

  2. the job updates an existing row with the new data

For example, if the target table already contains a row whose primary key matches an incoming row, that row is updated in place; otherwise the row is inserted as new.

SQLake templates primarily use the INSERT command. Another way to upsert data is the MERGE command, which merges data from one data stream with another reference data stream.

Before you begin

Create a connection to your data source in S3

To read your data from S3, you must first create an S3 connection. This connection provides SQLake with the AWS IAM credentials it needs to access the data.

The connection uses an AWS IAM role ARN and an external ID to connect to your specific data lake.

Here’s the code:

CREATE S3 CONNECTION my_sample_data
       AWS_ROLE = 'arn:aws:iam::111111111111:role/<upsolver-role>'
       EXTERNAL_ID = '12345678'
       READ_ONLY = TRUE;

When you deploy SQLake, the cluster will already have an IAM role attached that may or may not have permission to access the S3 path where your data resides. If it lacks those permissions, you can assign a scope-down policy to the S3 connection. Upsolver recommends this to prevent failures, misconfigurations, and breaches.

Create a staging table to ingest your data

You must create a job that reads from the source S3 bucket and writes to a staging table.

SQLake automatically converts your files into Apache Parquet format and creates a table with the appropriate schema in the AWS Glue Data Catalog.

Here’s the code to create the staging table:

CREATE TABLE default_glue_catalog.upsolver_samples.orders_raw_data;

Here's the code to create a job to load data from your data source into your staging table:

CREATE JOB load_orders_raw_data_from_s3
    START_FROM = NOW
    CONTENT_TYPE = JSON
    AS COPY FROM S3 upsolver_s3_samples 
        BUCKET = 'upsolver-samples' 
        PREFIX = 'orders/' 
    INTO default_glue_catalog.upsolver_samples.orders_raw_data;
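
Once the job is running, you can confirm that data has landed in the staging table with a quick sanity check (this mirrors the queries used later in this guide; the sample data may take a minute or two to appear):

SELECT * FROM default_glue_catalog.upsolver_samples.orders_raw_data LIMIT 10;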

After your data is prepared for transformation, you can take one of two paths to refine it:

  • INSERT

  • MERGE

Use INSERT to transform your data

Using INSERT to transform your data involves three steps:

  1. Create a table

  2. Transform and INSERT data into the newly-created table

  3. Query the data

Step 1 - Create a table to hold your data

To upsert your data, your target table must have at least one primary key. Use this key to match rows between your target and source tables.

If your target table is an Upsolver-managed table, you can define a primary key when you create it.

Here's the code:

CREATE TABLE default_glue_catalog.upsolver_samples.orders_upsert_with_insert(
    customer_email string)
    PRIMARY KEY customer_email;

Step 2 - Apply transformations and insert data into your table

You can now apply transformations to your data and write the results into your target table.

To do this, create an INSERT job. In this scenario, your target table is default_glue_catalog.upsolver_samples.orders_upsert_with_insert, and the job aggregates the data by customer and then filters it to include only repeat customers.

Here's the code:

CREATE JOB insert_orders_upsert
    START_FROM = BEGINNING
    ADD_MISSING_COLUMNS = TRUE
    RUN_INTERVAL = 1 MINUTE
    AS INSERT INTO default_glue_catalog.upsolver_samples.orders_upsert_with_insert MAP_COLUMNS_BY_NAME
    /*
       Use the SELECT statement below to choose your columns and perform the desired transformations.

       In this example, we aggregate the sample orders data by customer and filter it to only include repeat purchasers.
    */
    SELECT customer.email AS customer_email, 
       COUNT(DISTINCT orderid) AS number_of_orders,
       SUM(nettotal) AS total_sales,
       MIN(orderdate) AS first_purchase,
       MAX(orderdate) AS last_purchase
    FROM default_glue_catalog.upsolver_samples.orders_raw_data
    WHERE time_filter()
    GROUP BY 1
    HAVING COUNT(DISTINCT orderid::string) > 1;

Let's explain each new column:

  • number_of_orders: This column counts the distinct orderid values for each customer.

  • total_sales: This column sums the nettotal values from the original data.

  • first_purchase: This column takes the earliest orderdate value for each customer.

  • last_purchase: This column takes the latest orderdate value for each customer.

By grouping the rows by the first selected column (customer_email) and then specifying that the distinct order count must be greater than 1, you filter out all non-repeat customers.

Step 3 - Query the data

Here's the code:

SELECT * FROM default_glue_catalog.upsolver_samples.orders_upsert_with_insert LIMIT 10;

Use MERGE to transform your data

Merging data provides flexibility with regard to when and how rows are updated, inserted, or deleted.

Using MERGE to transform your data involves three steps:

  1. Create a table

  2. Apply transformations and MERGE data into the newly-created table

  3. Query the data

Step 1 - Create a table to hold your data

To use your data, you must create a table to hold it.

To upsert your data, your target table must have at least one primary key that is used to match rows between your target and source tables. If your target table is an Upsolver-managed table, you can define a primary key when you create it.

Here's the code:

CREATE TABLE default_glue_catalog.upsolver_samples.orders_upsert_with_merge(
    customer_email string)
    PRIMARY KEY customer_email;

Step 2 - Apply transformations and MERGE data into your table

You can now apply transformations to your data and write the results into your target table by creating a MERGE job.

In this scenario, your target table is default_glue_catalog.upsolver_samples.orders_upsert_with_merge, and the job aggregates the data by customer and then filters it so that the table includes only repeat customers.

Here's the code:

CREATE JOB merge_orders_upsert
    START_FROM = BEGINNING
    ADD_MISSING_COLUMNS = TRUE
    RUN_INTERVAL = 1 MINUTE
    AS MERGE INTO default_glue_catalog.upsolver_samples.orders_upsert_with_merge AS target
    /*
       Use the SELECT statement below to choose your columns and perform the desired transformations.

       In this example, we aggregate the sample orders data by customer and filter it to only include repeat purchasers.
    */
    USING (SELECT customer.email AS customer_email,
              COUNT(DISTINCT orderid) AS number_of_orders,
              SUM(nettotal) AS total_sales,
              MIN(orderdate) AS first_purchase,
              MAX(orderdate) AS last_purchase
           FROM default_glue_catalog.upsolver_samples.orders_raw_data
           WHERE time_filter()
           GROUP BY 1
           HAVING COUNT(DISTINCT orderid::string) > 1) source
    ON (target.customer_email = source.customer_email)
    WHEN MATCHED THEN REPLACE -- Update if primary keys match
    WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME; -- Insert if primary key is unique (new record)

The new columns and the repeat-customer filter are the same aggregations described in the INSERT section above.

The condition in the ON clause tells SQLake which columns to match between the target and source tables. If you omit the ON clause, SQLake performs a natural join on all of the primary key and partition columns. Otherwise, the condition must include a column match for each primary key of the target table.
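
As a minimal sketch of the natural-join behavior (the job name is hypothetical, and the selected columns are trimmed for brevity; everything else reuses the tables above), the job could omit the ON clause because customer_email is the target table's only primary key:

CREATE JOB merge_orders_upsert_natural_join
    START_FROM = BEGINNING
    RUN_INTERVAL = 1 MINUTE
    AS MERGE INTO default_glue_catalog.upsolver_samples.orders_upsert_with_merge AS target
    USING (SELECT customer.email AS customer_email,
              COUNT(DISTINCT orderid) AS number_of_orders
           FROM default_glue_catalog.upsolver_samples.orders_raw_data
           WHERE time_filter()
           GROUP BY 1) source
    -- No ON clause: SQLake matches target and source on the primary key
    -- and partition columns (here, customer_email).
    WHEN MATCHED THEN REPLACE
    WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;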

Step 3 - Query the data

The final step is to query your data.

Here's the code:

SELECT * FROM default_glue_catalog.upsolver_samples.orders_upsert_with_merge LIMIT 10;
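
As noted earlier, merging also gives you control over deletes. Here's a hedged sketch (the job name and the delete condition are hypothetical, and it assumes SQLake's MERGE supports conditional WHEN MATCHED ... THEN DELETE branches) that removes customers from the target when their net sales drop to zero or below:

CREATE JOB merge_orders_with_delete
    START_FROM = BEGINNING
    RUN_INTERVAL = 1 MINUTE
    AS MERGE INTO default_glue_catalog.upsolver_samples.orders_upsert_with_merge AS target
    USING (SELECT customer.email AS customer_email,
              COUNT(DISTINCT orderid) AS number_of_orders,
              SUM(nettotal) AS total_sales
           FROM default_glue_catalog.upsolver_samples.orders_raw_data
           WHERE time_filter()
           GROUP BY 1) source
    ON (target.customer_email = source.customer_email)
    WHEN MATCHED AND source.total_sales <= 0 THEN DELETE -- drop fully-refunded customers
    WHEN MATCHED THEN REPLACE
    WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;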

Conclusion

This guide explained how to take raw data from S3 and refine it using both INSERT and MERGE jobs.

Using only familiar SQL syntax, you can use SQLake to create data pipelines and organize your data to more easily perform analytics and ML.

As your business needs evolve, so can your data. In the future, you can create additional jobs that use the same staging table as their source, while your pipelines keep your data fresh indefinitely.

Try it yourself

To launch the template:

  1. Launch SQLake by navigating to https://db.upsolver.com

  2. Sign up by entering your email address and setting a password.

  3. After you click through the onboarding panel, a list of templates displays.

  4. Click the Upserting rows (sample data) template.
