Upsolver
Transforming data with SQL
This article goes over the various ways to transform your data with SQL in Upsolver.
Transform with SQL enables the following stream transformations:
Filter a stream using the WHERE clause: Filter your raw data structures, such as user activity, IoT and sensor data, application activity data, and online advertising statistics. These data sources often contain more data than your analysis needs; Upsolver lets you filter them so that only the required data remains.
Join data from several data sources: Combine data from multiple sources to gather deeper insights and create aggregated views. Upsolver lets you do this with a SQL statement, much as you would in a relational database.
Perform calculations and conversions: Improve the quality of your data and make sure it follows standard conventions (for example, converting Unix epoch time to the standard mm/dd/yyyy format). Upsolver provides all the functions that exist in SQL, including special enrichment functions (e.g. IP2GEO, user agent parser), and you can add your own UDFs in Python.
Stream transformations are configured when creating an Upsolver output.

Filter a stream using the WHERE clause

Transform with SQL can filter a stream using the WHERE clause, just as in ANSI SQL.

For the following examples, assume that:

1. Three events stream into the data source heartbeat over time:
```json
{ "user_id": 1, "device_id": 1234, "epoch" : 1520672112456, "heart_rate" : 81}
{ "user_id": 2, "device_id": 5567, "epoch" : 1520672112456, "heart_rate" : 79}
{ "user_id": 1, "device_id": 1234, "epoch" : 1520672113456, "heart_rate" : 102}
```

| user_id | device_id | epoch | heart_rate |
|---|---|---|---|
| 1 | 1234 | 1520672112456 | 81 |
| 2 | 5567 | 1520672112456 | 79 |
| 1 | 1234 | 1520672113456 | 102 |
2. Three events stream into the data source location over time:
```json
{"user_id": 1, "epoch" : 1573034761, "latitude" : 28.545926, "longitude" : 31.577451}
{"user_id": 2, "epoch" : 1573034761, "latitude" : 44.032321, "longitude" : 1.356295}
{"user_id": 1, "epoch" : 1573035761, "latitude" : 28.545926, "longitude" : 31.577451}
```

| user_id | epoch | latitude | longitude |
|---|---|---|---|
| 1 | 1573034761 | 28.545926 | 31.577451 |
| 2 | 1573034761 | 44.032321 | 1.356295 |
| 1 | 1573035761 | 28.545926 | 31.577451 |

Example 1:

The following query:
```sql
SELECT *
FROM heartbeat
WHERE heart_rate > 80
```
Results in the following output:

| user_id | device_id | epoch | heart_rate |
|---|---|---|---|
| 1 | 1234 | 1520672112456 | 81 |
| 1 | 1234 | 1520672113456 | 102 |
We have filtered the data source heartbeat to get only the events where heart_rate is greater than 80.

Example 2:

The following query:
```sql
SELECT *
FROM heartbeat
WHERE heart_rate > 79 AND heart_rate < 102
```
Results in the following output:

| user_id | device_id | epoch | heart_rate |
|---|---|---|---|
| 1 | 1234 | 1520672112456 | 81 |
We have filtered the data source heartbeat to get only the events where heart_rate is strictly between 79 and 102 (both bounds are excluded).

Example 3:

The following query:
```sql
SELECT *
FROM heartbeat
WHERE heart_rate > 79 OR user_id = 2
```
Results in the following output:

| user_id | device_id | epoch | heart_rate |
|---|---|---|---|
| 1 | 1234 | 1520672112456 | 81 |
| 2 | 5567 | 1520672112456 | 79 |
| 1 | 1234 | 1520672113456 | 102 |
We have filtered the data source heartbeat to get only the events where heart_rate is greater than 79 or user_id equals 2.
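To make the semantics of the three filters concrete, here is a minimal plain-Python model that applies the same predicates to the three heartbeat events. It only illustrates the WHERE logic, assuming each event is a dictionary; the `where` helper is hypothetical and is not part of Upsolver.

```python
# Plain-Python model of the three WHERE examples (illustration only, not Upsolver code).
heartbeat = [
    {"user_id": 1, "device_id": 1234, "epoch": 1520672112456, "heart_rate": 81},
    {"user_id": 2, "device_id": 5567, "epoch": 1520672112456, "heart_rate": 79},
    {"user_id": 1, "device_id": 1234, "epoch": 1520672113456, "heart_rate": 102},
]

def where(events, predicate):
    """Hypothetical helper: keep the events for which predicate(event) is true, in order."""
    return [event for event in events if predicate(event)]

# Example 1: WHERE heart_rate > 80
example_1 = where(heartbeat, lambda e: e["heart_rate"] > 80)
# Example 2: WHERE heart_rate > 79 AND heart_rate < 102 (both bounds exclusive)
example_2 = where(heartbeat, lambda e: 79 < e["heart_rate"] < 102)
# Example 3: WHERE heart_rate > 79 OR user_id = 2
example_3 = where(heartbeat, lambda e: e["heart_rate"] > 79 or e["user_id"] == 2)

print([e["heart_rate"] for e in example_1])  # [81, 102]
print([e["heart_rate"] for e in example_2])  # [81]
print([e["heart_rate"] for e in example_3])  # [81, 79, 102]
```

Note how the event with heart_rate 79 is dropped by the first two filters but kept by the third, because the OR branch user_id = 2 matches it.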

Join data from several data sources

You can use Transform with SQL to combine data from your streaming data source with data arriving in other streams, historical aggregations, or reference data files by using the JOIN syntax.
The result of the Transform with SQL join is a new table that is populated with the column values that you specify in the SELECT statement.
With Transform with SQL, there is no need to write code in a programming language such as Java or Python to join data from various data sources; the Transform with SQL JOIN is all you need.

JOIN clause:

Transform with SQL JOIN clause uses the following syntax:
```
[ { INNER | LEFT [ OUTER ] } ] JOIN [ LATEST ]
{ table_name | ( query_expr ) } [ [ AS ] alias ] [ WAIT integer { MINUTE[S] | HOUR[S] | DAY[S] } [ ALIGNED ] ]
{ ON bool_expression | USING ( field_name [, ...] ) }
```
For the following example, assume that the following events arrive in the data source clicks over time:
```json
{"user_id" : 1, "campaign_id" : "camp1"}
{"user_id" : 2, "campaign_id" : "camp2"}
```

| user_id | campaign_id |
|---|---|
| 1 | "camp1" |
| 2 | "camp2" |
Also assume that we have the following events in a data source orders over time:
```json
{"user_id" : 1, "credit_card_type" : "VISA", "order_price" : 150 }
{"user_id" : 1, "credit_card_type" : "VISA", "order_price" : 50}
{"user_id" : 2, "credit_card_type" : "AMEX", "order_price" : 20}
```

| user_id | credit_card_type | order_price |
|---|---|---|
| 1 | "VISA" | 150 |
| 1 | "VISA" | 50 |
| 2 | "AMEX" | 20 |
If we define a table as:
```sql
SELECT user_id, campaign_id, user_purchases.user_orders AS user_orders
FROM clicks
JOIN (SELECT COUNT_DISTINCT(*) AS user_orders,
             user_id AS user_id
      FROM orders
      GROUP BY user_id) user_purchases
  ON user_purchases.user_id = user_id
```
We will have the following table as output:
| user_id | campaign_id | user_orders |
|---|---|---|
| 1 | "camp1" | 2 |
| 2 | "camp2" | 1 |
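The join above can be modeled in plain Python as a per-user count followed by an inner join on user_id. This is a sketch for illustration only: the variable names are hypothetical, and counting rows per user stands in for COUNT_DISTINCT(*), which gives the same counts for this data because every order row is distinct.

```python
from collections import Counter

clicks = [
    {"user_id": 1, "campaign_id": "camp1"},
    {"user_id": 2, "campaign_id": "camp2"},
]
orders = [
    {"user_id": 1, "credit_card_type": "VISA", "order_price": 150},
    {"user_id": 1, "credit_card_type": "VISA", "order_price": 50},
    {"user_id": 2, "credit_card_type": "AMEX", "order_price": 20},
]

# Subquery (lookup table): number of order rows per user_id, i.e. GROUP BY user_id.
user_purchases = Counter(order["user_id"] for order in orders)

# Inner join: emit a row only when the click's user_id exists in the lookup table.
joined = [
    {
        "user_id": click["user_id"],
        "campaign_id": click["campaign_id"],
        "user_orders": user_purchases[click["user_id"]],
    }
    for click in clicks
    if click["user_id"] in user_purchases
]

for row in joined:
    print(row)
```

The `in user_purchases` check is what makes this an inner join; dropping it and defaulting missing users to None would model a LEFT JOIN instead.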

Join requirements:

Join table: The join table must be either a lookup table or reference data.
ON statement: The ON statement must take one of the following forms:
    lookup_key_1 = expression AND lookup_key_2 = expression ...
    lookup_key_1 IN (expression1, ...) AND lookup_key_2 IN (expression1, ...) ...
Note: Each key in the lookup table must be mapped to one or more expressions using either = or IN within the ON statement.

Synchronize streams:

Since Upsolver is a streaming system, all joins are applied in stream time. The WAIT, ALIGNED, and LATEST options described below control how streams are synchronized.
To demonstrate the concepts, assume we have the following data sources:
main data source:

| id | time |
|---|---|
| 1 | 14/10/2019 00:00 |
| 1 | 14/10/2019 00:01 |
| 1 | 14/10/2019 00:02 |
| 1 | 14/10/2019 00:04 |
| 1 | 14/10/2019 05:00 |

lookup data source:

| id | time | value |
|---|---|---|
| 1 | 14/10/2019 00:00 | 1 |
| 1 | 14/10/2019 00:01 | 2 |
| 1 | 14/10/2019 00:02 | 3 |
| 1 | 14/10/2019 05:00 | 4 |
For the above data sources:
    id is an integer.
    time is a timestamp in DD/MM/YYYY HH:MM format that represents the event's creation time.

WAIT interval

To synchronize streams, Transform with SQL supports the WAIT integer { MINUTE[S] | HOUR[S] | DAY[S] } [ ALIGNED ] syntax.
If WAIT X MINUTES (where X is an integer) is specified, Upsolver ensures that the state of the joined lookup table is ready X minutes ahead of the data being processed. As a result, the output is delayed by X minutes.
We will demonstrate this using the following SQL query:
```sql
SELECT id,
       value,
       time
FROM "main data source"
LEFT JOIN LATEST
(
    SELECT id, LAST(value) value
    FROM "lookup data source"
    GROUP BY id
) l WAIT 1 MINUTE
ON l.id = id
```
The result of this query is as follows:
Note that while performing the JOIN, we wait 1 minute for each event in main data source before creating the output.
If we look at the first row of the result, we can see that for the main event at 14/10/2019 00:00, only the first event in lookup data source (whose time is also 14/10/2019 00:00) is taken into account; the 1-minute wait is what allows that event to be included.
This is because, for each event in main data source, the join considers only matching events in lookup data source whose time is earlier than the main event's time plus 1 minute (due to the WAIT 1 MINUTE clause). For the first main event, that cutoff is 14/10/2019 00:01, and the only lookup event before the cutoff is the first one, with time 00:00.
Without using WAIT 1 MINUTE, the result would be:
Looking again at the first row of the result: for the main event at 14/10/2019 00:00, lookup data source has no event with a time strictly earlier than 14/10/2019 00:00.
Since the query does not use WAIT, the join looks for matching events in lookup data source with a time earlier than 14/10/2019 00:00. No such event exists, so value is null.
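The matching rule described above can be written as a small Python model: for each main event, take the last lookup value whose time is strictly earlier than the event's time plus the WAIT interval. This is a sketch derived from the explanation in this section, not Upsolver's implementation. The helper names are hypothetical, every row has id 1 so the join key is omitted, and the strict "earlier than" comparison is taken from the text above.

```python
from datetime import datetime, timedelta

def ts(text: str) -> datetime:
    """Parse the DD/MM/YYYY HH:MM timestamps used in the tables above."""
    return datetime.strptime(text, "%d/%m/%Y %H:%M")

# main data source and lookup data source (all rows have id = 1).
main_events = [ts(t) for t in ("14/10/2019 00:00", "14/10/2019 00:01",
                               "14/10/2019 00:02", "14/10/2019 00:04",
                               "14/10/2019 05:00")]
lookup_events = [(ts("14/10/2019 00:00"), 1), (ts("14/10/2019 00:01"), 2),
                 (ts("14/10/2019 00:02"), 3), (ts("14/10/2019 05:00"), 4)]

def last_value_before(lookup, cutoff):
    """LAST(value) over lookup events strictly earlier than cutoff; None if there are none."""
    value = None
    for event_time, event_value in sorted(lookup):
        if event_time < cutoff:
            value = event_value
    return value

def left_join_with_wait(main, lookup, wait=timedelta(0)):
    """For each main event, read the lookup state as of (event time + wait)."""
    return [last_value_before(lookup, t + wait) for t in main]

print(left_join_with_wait(main_events, lookup_events, timedelta(minutes=1)))
print(left_join_with_wait(main_events, lookup_events))
```

Under these assumptions, WAIT 1 MINUTE yields 1, 2, 3, 3, 4 for the five main events, while omitting WAIT yields None (null), 1, 2, 3, 3, which matches the null value described for the first row.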

ALIGNED

If the keyword ALIGNED is used, the query's result is calculated only at the next aligned window boundary. For example, with WAIT 5 MINUTES ALIGNED, data arriving after 08:35 and before 08:40 waits until 08:40.
The alignment is based on Unix epoch time, so WAIT 1 DAY ALIGNED waits until 00:00 UTC of the following day.
We will demonstrate this using the following Transform with SQL query:
```sql
SELECT id,
       value,
       time
FROM "main data source"
LEFT JOIN
(
    SELECT id, LAST(value) value
    FROM "lookup data source"
    GROUP BY id
) l WAIT 5 HOURS ALIGNED
ON l.id = id
```
The result of this query is the following:
Note that while performing the join, each event waits for the next 5-hour aligned boundary before the output is created. This means that for every event in main data source, the output is created only after the lookup event at 14/10/2019 05:00 is available, which is why the value column in the output has the value 4 for all of the events.
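The alignment rule (round up to the next multiple of the window, counted from the Unix epoch in UTC) can be sketched in Python as follows. The function name is hypothetical, and treating data that arrives exactly on a boundary as released at that same boundary is an assumption the text does not settle.

```python
from datetime import datetime, timedelta, timezone

# Unix epoch in UTC; aligned windows are counted from this instant.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def aligned_release_time(arrival: datetime, window: timedelta) -> datetime:
    """Return the first window boundary at or after `arrival` (hypothetical helper).

    Assumption: data arriving exactly on a boundary is released at that boundary.
    """
    elapsed = arrival - EPOCH
    # timedelta // timedelta is an int floor division; negating twice gives the ceiling.
    windows = -(-elapsed // window)
    return EPOCH + windows * window

# WAIT 5 MINUTES ALIGNED: data arriving at 08:37 waits until 08:40.
print(aligned_release_time(datetime(2019, 10, 14, 8, 37, tzinfo=timezone.utc),
                           timedelta(minutes=5)))
# WAIT 1 DAY ALIGNED: data waits until 00:00 UTC of the following day.
print(aligned_release_time(datetime(2019, 10, 14, 13, 0, tzinfo=timezone.utc),
                           timedelta(days=1)))
```

Counting from the epoch rather than from the first event is what makes every query with the same window share the same boundaries.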

LATEST

When running a query over historical data, Upsolver maintains the time relation between streams the same way that it would when processing data that is up to date.
The LATEST keyword is intended for situations where initial data is loaded into a lookup table after the source stream has started running. For all historical data, it forces the query to use the state of the joined lookup table as it exists when the query is run. Data that arrives after the query is run is not affected by LATEST.
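A rough Python model of LATEST: for each main event, the join reads the lookup state at the later of the event's creation time and the query's run time, plus the WAIT interval. This follows the run-time rule spelled out in the example that follows and is not Upsolver's implementation. The helper names are hypothetical, every row has id 1 so the join key is omitted, and applying WAIT on top of the run time is an assumption.

```python
from datetime import datetime, timedelta

def ts(text: str) -> datetime:
    """Parse the DD/MM/YYYY HH:MM timestamps used in this section."""
    return datetime.strptime(text, "%d/%m/%Y %H:%M")

# main data source and lookup data source (all rows have id = 1).
main_events = [ts(t) for t in ("14/10/2019 00:00", "14/10/2019 00:01",
                               "14/10/2019 00:02", "14/10/2019 00:04",
                               "14/10/2019 05:00")]
lookup_events = [(ts("14/10/2019 00:00"), 1), (ts("14/10/2019 00:01"), 2),
                 (ts("14/10/2019 00:02"), 3), (ts("14/10/2019 05:00"), 4)]

def last_value_before(lookup, cutoff):
    """LAST(value) over lookup events strictly earlier than cutoff; None if there are none."""
    value = None
    for event_time, event_value in sorted(lookup):
        if event_time < cutoff:
            value = event_value
    return value

def left_join_latest(main, lookup, run_time, wait=timedelta(0)):
    """Events created before run_time see the lookup state as of run_time (plus wait)."""
    return [last_value_before(lookup, max(t, run_time) + wait) for t in main]

one_minute = timedelta(minutes=1)
print(left_join_latest(main_events, lookup_events, ts("13/10/2019 09:00"), one_minute))
print(left_join_latest(main_events, lookup_events, ts("15/10/2019 09:00"), one_minute))
```

Run at 13/10/2019 09:00, every event is newer than the run time, so the model behaves like a plain WAIT 1 MINUTE join (1, 2, 3, 3, 4); run at 15/10/2019 09:00, every event is older, so each one sees the full lookup state and gets 4.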
We will demonstrate this using the following Transform with SQL query:
```sql
SELECT id,
       value,
       time
FROM "main data source"
LEFT JOIN LATEST
(
    SELECT id, LAST(value) value
    FROM "lookup data source"
    GROUP BY id
) l WAIT 1 MINUTE
ON l.id = id
```
The result of this query depends on the time you run it relative to the event's time.
If the query is run before the events' creation time (for example, on 13/10/2019 09:00), the result is:
If the query is run after the events' creation time, the join uses the lookup state at the query's run time instead of the event's creation time. For example, if you run the query on 15/10/2019 09:00, the result is:

Perform calculations and conversions

Transform with SQL enables you to improve the quality of your data and ensure it follows standard conventions (for example, converting Unix epoch time to the standard mm/dd/yyyy format).
Upsolver contains all the functions that exist in SQL as built-in functions, including special enrichment functions (e.g. IP2GEO, user agent parser), and you can also add your own User Defined Functions (UDFs) in Python.
Built-in functions and UDFs can be applied on either flat or hierarchical data. See: Query hierarchical data

For the following examples, we will use the heartbeat data source defined in the filtering section above.
We will use the built-in function UNIX_EPOCH_TO_DATE, which converts a Unix epoch timestamp to a date.
UNIX_EPOCH_TO_DATE examples
The following query creates an output based on a stream transformation that uses UNIX_EPOCH_TO_DATE in the SELECT clause:
```sql
SELECT user_id, device_id, UNIX_EPOCH_TO_DATE(epoch) AS date
FROM heartbeat
```
The result of the query will be the following output:
| user_id | device_id | date |
|---|---|---|
| 1 | 1234 | "2018-03-10T08:55:12.456Z" |
| 2 | 5567 | "2018-03-10T08:55:12.456Z" |
| 1 | 1234 | "2018-03-10T08:55:13.456Z" |
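For reference, here is the same conversion in plain Python, applied to the heartbeat epochs (which are in milliseconds). A function like this could serve as the logic of a Python UDF, but the Upsolver UDF registration interface is not shown or assumed here; `unix_epoch_ms_to_iso` is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def unix_epoch_ms_to_iso(epoch_ms: int) -> str:
    """Convert a Unix epoch in milliseconds to an ISO 8601 UTC string with millisecond precision."""
    # Integer timedelta arithmetic avoids float rounding on large epoch values.
    dt = EPOCH + timedelta(milliseconds=epoch_ms)
    return dt.strftime("%Y-%m-%dT%H:%M:%S") + f".{dt.microsecond // 1000:03d}Z"

for epoch in (1520672112456, 1520672112456, 1520672113456):
    print(unix_epoch_ms_to_iso(epoch))
```

The printed strings match the date column in the table above.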
The following query creates an output based on a stream transformation that uses UNIX_EPOCH_TO_DATE in two steps:
1. Calculate the transformation:
```sql
SET human_readable_date = UNIX_EPOCH_TO_DATE(epoch);
```
2. Use the calculated transformation in the query:
```sql
SELECT user_id, device_id, human_readable_date
FROM heartbeat
```
Last modified 1yr ago