Join Two Data Streams

This quickstart explains how to join multiple data streams into one table.

Before you start

Ensure you have created a connection to each data source. Then create a staging table for each source, and a job for each table to ingest the data into the respective table. Having completed these steps, you can then join the two data streams using a materialized view.

To join two tables in Upsolver, you must convert one of them into a materialized view that maps a set of key columns to aggregated values. When you later join your main table with the materialized view, Upsolver matches records on the view's key column values. You can then add the values from the aggregated columns to the matching record in the main table.

Create a materialized view

In this example, you convert the sales table, named sales_info_raw_data, into a materialized view that you later join with the orders data in your main table.

Here's the code to create a materialized view:

CREATE MATERIALIZED VIEW 
   default_glue_catalog.upsolver_samples.physical_store_orders_materialized_view 
AS 
    SELECT orderid, 
       LAST(saleinfo_source) AS source,
       LAST(saleinfo_store_location_country) AS country,
       LAST(saleinfo_store_location_name) AS name,
       LAST(saleinfo_store_servicedby_employeeid) AS employeeid,
       LAST(saleinfo_store_servicedby_firstname) AS firstname,
       LAST(saleinfo_store_servicedby_lastname) AS lastname
    FROM default_glue_catalog.upsolver_samples.sales_info_raw_data
    GROUP BY orderid;

Let's understand what this code does.

Note the lines of code beginning with LAST. Each of these lines selects the last value of the column named in parentheses and gives it a shorter alias. For example, you select the last entry in the saleinfo_source column and rename it to source, and the last entry in the saleinfo_store_location_country column and rename it to country. The GROUP BY clause groups the aggregations by the orderid column, so the view holds one row per order.
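
To make the effect of LAST and GROUP BY concrete, here is a trimmed-down sketch of the same view that keeps only the employee ID. The view name last_employee_per_order_example and the sample rows shown in the comments are hypothetical and made up for illustration; the table and column names come from the statement above.

```sql
-- Hypothetical input rows in sales_info_raw_data, oldest first:
--   orderid | saleinfo_store_servicedby_employeeid
--   A1      | 101
--   A1      | 205
--   B7      | 318
--
-- Sketch only: the view name below is an assumption, not part of this quickstart.
CREATE MATERIALIZED VIEW
   default_glue_catalog.upsolver_samples.last_employee_per_order_example
AS
    SELECT orderid,
       -- LAST keeps the most recent value seen for each orderid
       LAST(saleinfo_store_servicedby_employeeid) AS employeeid
    FROM default_glue_catalog.upsolver_samples.sales_info_raw_data
    GROUP BY orderid;

-- With the rows above, the view would hold one row per key:
--   orderid | employeeid
--   A1      | 205
--   B7      | 318
```

Because each orderid appears only once in the view, a later join on orderid adds at most one set of employee values to each order.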

Create a target table for your joined data

Before you join your data, you must create a table to write the data into. This table is created in the AWS Glue Data Catalog and is available to query within Amazon Athena.

Here's the code to create the table:

CREATE TABLE default_glue_catalog.upsolver_samples.orders_transformed_data;

Join orders with the last store employee who dealt with the order

You must have data in your staging table before running this job.

The last step is to join your data. Here you create a transformation job that uses an INSERT statement to join the orders staging table with the sales materialized view. The join matches the key column in the materialized view, orderid, to the corresponding column in the orders staging table. The job then inserts the joined data into the target table you previously created, where it is available to query from Amazon Athena.

Here’s the code:

CREATE SYNC JOB join_two_tables_orders_with_last_employee
   START_FROM = BEGINNING
   ADD_MISSING_COLUMNS = true
AS INSERT INTO default_glue_catalog.upsolver_samples.orders_transformed_data 
 MAP_COLUMNS_BY_NAME
   SELECT orderid,
      l.employeeid as employeeid, 
      l.firstname as firstname, 
      l.lastname as lastname
      FROM default_glue_catalog.upsolver_samples.orders_raw_data
      LEFT JOIN default_glue_catalog.upsolver_samples.physical_store_orders_materialized_view l
      ON l.orderid = orderid
      AS OF $event_time + INTERVAL '3' MINUTES
      WHERE l.source = 'Store'
      AND $commit_time between execution_start_time() AND execution_end_time();

You have successfully created a table joining physical store orders with the employee who completed the order.
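
To check the result, you could run a query along these lines against the target table. This is a sketch, not part of the original quickstart: the LIMIT value is arbitrary, the column list assumes the job above has already written at least one batch of rows, and the catalog prefix may need adjusting if you run the query directly in Amazon Athena rather than in Upsolver.

```sql
-- Preview a few joined rows; the columns match the SELECT list of the job above.
SELECT orderid, employeeid, firstname, lastname
FROM default_glue_catalog.upsolver_samples.orders_transformed_data
LIMIT 10;
```

Since the job filters on source = 'Store', the rows returned should all be physical store orders with the employee fields populated.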
