Comment on page
Join two data streams
This quickstart explains how to join multiple data streams into one table.
Ensure you have created a connection to each data source. Then create a staging table for each source, and a job for each table to ingest the data into the respective table. Having completed these steps, you can then join the two data streams using a materialized view.
To join two tables in Upsolver, you must convert one of them into a materialized view that maps a set of key columns to aggregated values. When you later join your main table with the materialized view, Upsolver matches the records based on the lookup table's key column values. You can then add the values from the aggregated columns to the matching record in the main table.
In this example, convert the sales table, named sales_info_raw_data, into a materialized view to later join with the orders data in your main table.
Here's the code to create a materialized view:
CREATE MATERIALIZED VIEW default_glue_catalog.upsolver_samples.physical_store_orders_materialized_view AS
LAST(saleinfo_source) AS source,
LAST(saleinfo_store_location_country) AS country,
LAST(saleinfo_store_location_name) AS name,
LAST(saleinfo_store_servicedby_employeeid) AS employeeid,
LAST(saleinfo_store_servicedby_firstname) AS firstname,
LAST(saleinfo_store_servicedby_lastname) AS lastname
GROUP BY orderid;
Let's understand what this code does.
Note the lines of code beginning with
LAST. With these lines, you are selecting the last piece of data in each of the categories in parentheses and renaming them. For example, you select the last entry in the saleinfo_source column and rename it to country. The query groups the aggregations by the orderid column.
Before you join your data, you must create a table to write the data into. This table is created in the AWS Glue Data Catalog and is available to query within Amazon Athena.
Here's the code to create the table:
CREATE TABLE default_glue_catalog.upsolver_samples.orders_transformed_data;
Note: You must have data in your staging table before running this job.
The last step is to join your data. Here you use a transformation job that uses an
INSERTstatement to join the orders staging table and the sales lookup table. The join matches the key column in the lookup table to the corresponding column in the orders staging table. The job then inserts the data into the target Athena table you previously created.
Here’s the code:
CREATE SYNC JOB join_two_tables_orders_with_last_employee
START_FROM = BEGINNING
ADD_MISSING_COLUMNS = true
AS INSERT INTO default_glue_catalog.upsolver_samples.orders_transformed_data MAP_COLUMNS_BY_NAME
l.employeeid as employeeid,
l.firstname as firstname,
l.lastname as lastname
LEFT JOIN physical_store_orders_lookup l
ON l.orderid = orderid
AS OF $event_time + INTERVAL '3' MINUTES
WHERE l.source = 'Store'
AND $commit_time between execution_start_time() AND execution_end_time();
You have successfully created a table joining physical store orders with the employee who completed the order.