Transforming data with SQL
This article goes over the various ways to transform your data with SQL in Upsolver.
Transform with SQL enables the following stream transformations:
Filter your raw data streams, such as user activity, IoT and sensor data, application activity data, and online advertising statistics.
These data sources often contain more data than necessary for your analysis; Upsolver allows you to filter them down to only the required data.
Stream transformations are configured when creating an Upsolver output.
Filter a stream using the WHERE clause
Transform with SQL can filter a stream using the WHERE clause, just like in ANSI SQL.
For the following examples, assume that:
1. Three events stream into the data source heartbeat over time:

| user_id | device_id | epoch | heart_rate |
| --- | --- | --- | --- |
| 1 | 1234 | 1520672112456 | 81 |
| 2 | 5567 | 1520672112456 | 79 |
| 1 | 1234 | 1520672113456 | 102 |
2. Three events stream into the data source location over time:

| user_id | epoch | latitude | longitude |
| --- | --- | --- | --- |
| 1 | 1573034761 | 28.545926 | 31.577451 |
| 2 | 1573034761 | 44.032321 | 1.356295 |
| 1 | 1573035761 | 28.545926 | 31.577451 |
Example 1:
The following query:
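The query itself was not preserved in this extract; a sketch consistent with the filter described below, assuming a standard Upsolver SELECT over the heartbeat data source:

```sql
-- Keep only events whose heart_rate exceeds 80
SELECT user_id,
       device_id,
       epoch,
       heart_rate
FROM heartbeat
WHERE heart_rate > 80
```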
Results in the following output:
| user_id | device_id | epoch | heart_rate |
| --- | --- | --- | --- |
| 1 | 1234 | 1520672112456 | 81 |
| 1 | 1234 | 1520672113456 | 102 |
We have filtered the data source heartbeat to get only the events where heart_rate is greater than 80.
Example 2:
The following query:
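The query is missing from this extract; a sketch matching the output below (note the strict inequalities: the output excludes the boundary values 79 and 102; syntax assumed):

```sql
-- Keep only events with heart_rate strictly between 79 and 102
SELECT user_id,
       device_id,
       epoch,
       heart_rate
FROM heartbeat
WHERE heart_rate > 79
  AND heart_rate < 102
```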
Results in the following output:
| user_id | device_id | epoch | heart_rate |
| --- | --- | --- | --- |
| 1 | 1234 | 1520672112456 | 81 |
We have filtered the data source heartbeat to get only the events where heart_rate is greater than 79 and less than 102 (both bounds excluded).
Example 3:
The following query:
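Again the query is not preserved; a sketch matching the filter described below (syntax assumed):

```sql
-- Keep events where heart_rate exceeds 79 OR user_id equals 2
SELECT user_id,
       device_id,
       epoch,
       heart_rate
FROM heartbeat
WHERE heart_rate > 79
   OR user_id = 2
```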
Results in the following output:
| user_id | device_id | epoch | heart_rate |
| --- | --- | --- | --- |
| 1 | 1234 | 1520672112456 | 81 |
| 2 | 5567 | 1520672112456 | 79 |
| 1 | 1234 | 1520672113456 | 102 |
We have filtered the data source heartbeat to get only the events where heart_rate is greater than 79 or user_id is equal to 2.
Join data from several data sources
You can use Transform with SQL to combine data from your streaming data source with data arriving in other streams, historical aggregations, or reference data files by using the JOIN syntax.
The result of a Transform with SQL join is a new table populated with the column values that you specify in the SELECT statement.
With Transform with SQL, there is no need to write code in a programming language such as Java or Python to join data from various data sources; all you need is the Transform with SQL JOIN clause.
JOIN clause
The Transform with SQL JOIN clause uses the following syntax:
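The grammar itself is missing from this extract; a sketch of its general shape, assembled from the join requirements and the WAIT syntax described later in this article (bracketed parts optional; the exact grammar is an assumption, so consult the Upsolver reference):

```sql
SELECT <columns>
FROM <main data source>
JOIN <lookup table or reference data> [AS <alias>]
  ON <main field> = <lookup key>
[WAIT <integer> { MINUTE[S] | HOUR[S] | DAY[S] } [ALIGNED]]
```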
For the following example, assume that we have the following events in a data source clicks over time:

| user_id | campaign_id |
| --- | --- |
| 1 | "camp1" |
| 2 | "camp2" |
Also assume that we have the following events in a data source orders over time:

| user_id | credit_card_type | order_price |
| --- | --- | --- |
| 1 | "VISA" | 150 |
| 1 | "VISA" | 50 |
| 2 | "AMEX" | 20 |
If we define a table as:
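The table definition was not preserved in this extract; a sketch of the intent, assuming a lookup table named user_order_counts that aggregates COUNT(*) per user_id over orders (the lookup-table name and exact Upsolver syntax are assumptions):

```sql
-- Join each click with the per-user order count from the lookup table
SELECT c.user_id,
       c.campaign_id,
       o.order_count AS user_orders
FROM clicks AS c
JOIN user_order_counts AS o
  ON o.user_id = c.user_id
```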
We will have the following table as output:
| user_id | campaign_id | user_orders |
| --- | --- | --- |
| 1 | "camp1" | 2 |
| 2 | "camp2" | 1 |
Join requirements:
The join table must be either a lookup table or reference data.
Synchronize streams:
Since Upsolver is a streaming system, all joins are applied in stream time. We will illustrate how to use the WAIT interval, the ALIGNED keyword, and the LATEST keyword.
To demonstrate the concepts, assume we have the following data sources:
| id | time |
| --- | --- |
| 1 | 14/10/2019 00:00 |
| 1 | 14/10/2019 00:01 |
| 1 | 14/10/2019 00:02 |
| 1 | 14/10/2019 00:04 |
| 1 | 14/10/2019 05:00 |
For the above data sources:
- id is an integer
- time is a timestamp in DD/MM/YYYY HH:MM format and represents the event's creation time
WAIT interval
In order to synchronize between streams, Transform with SQL supports the WAIT integer { MINUTE[S] | HOUR[S] | DAY[S] } [ ALIGNED ] syntax.
If WAIT X MINUTES (where X is an integer) is specified, Upsolver ensures that the state of the joined lookup table is ready X minutes ahead of the data being processed. This causes the output to be delayed by X minutes.
We will demonstrate this using the following SQL query:
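The query is missing from this extract; a sketch of a join that waits 1 minute for the lookup side (the names main_data_source and lookup_data_source and the exact syntax are assumptions):

```sql
-- Join each main event with lookup events, giving the lookup
-- state 1 minute to catch up before emitting output
SELECT m.id,
       m.time,
       l.time AS lookup_time
FROM main_data_source AS m
JOIN lookup_data_source AS l
  ON m.id = l.id
WAIT 1 MINUTE
```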
The result of this query is as follows:
Note that while performing the JOIN, we wait 1 minute for each event in main data source before creating the output.
If we look at the first row in the above table, we can see that for time 14/10/2019 00:00 we take into consideration only the first event in lookup data source, which has the same value in its time field; that event arrived with a 1 minute delay.
This is the behavior because, for each event in main data source, we look at the matching events in lookup data source whose time field values are less than 14/10/2019 00:00 plus 1 minute (due to the WAIT 1 MINUTE statement), which results in 00:01 as the time comparator. The only event that complies with this comparator is the first event in lookup data source, which has 00:00 in its time field.
Without using WAIT 1 MINUTE, the result would be:
If we look again at the first row in the above table, we can see that for time 14/10/2019 00:00, there is no event with time prior to 14/10/2019 00:00 (excluding this timestamp itself).
As such, since we did not use the WAIT statement, for the timestamp 14/10/2019 00:00 in main data source we look for matching events in lookup data source with time less than 14/10/2019 00:00. Since no such event exists, the value is null.
ALIGNED
If the keyword ALIGNED is used, calculating the query's result will wait for the next aligned window. For example, data arriving after 08:35 and before 08:40 will wait until 08:40.
The alignment is done using Unix epoch time, so WAIT 1 DAY will wait until 00:00 UTC of the following day.
We will demonstrate this using the following Transform with SQL query:
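The query is missing here; a sketch of a join aligned to 5-hour windows (the data source names, the exact syntax, and the assumption that the value column comes from the lookup side are all hypothetical):

```sql
-- Wait for the next 5-hour aligned window (00:00, 05:00, 10:00, ...)
-- before emitting output for the joined events
SELECT m.id,
       m.time,
       l.value
FROM main_data_source AS m
JOIN lookup_data_source AS l
  ON m.id = l.id
WAIT 5 HOURS ALIGNED
```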
The result of this query is the following:
Note that while performing the join, we wait 5 hours, aligned, before creating the output. This means that for every event in our main data source, we wait for the event at 14/10/2019 05:00 to create the output. That is why the value column in the output has the value 4 for all of the events.
LATEST
When running a query over historical data, Upsolver maintains the time relation between streams the same way that it would when processing data that is up to date.
The LATEST keyword is intended to handle situations where initial data is dumped into a lookup table after the source stream has started running. It forces the query to use, for all historical data, the state of the joined lookup table that exists when the query is run. Data that arrives after the query is run is not affected by LATEST.
We will demonstrate this using the following Transform with SQL query:
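The query was not preserved in this extract; a sketch (the placement of LATEST, the data source names, and the value column are assumptions; consult the Upsolver reference for the exact syntax):

```sql
-- For historical data, use the lookup table's state as of query run time
SELECT m.id,
       m.time,
       l.value
FROM main_data_source AS m
JOIN lookup_data_source AS l
  ON m.id = l.id
LATEST
```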
The result of this query depends on when you run it relative to the event's time.
If the query run time is earlier than the event's creation time field (e.g. if you run the query on 13/10/2019 09:00), the result is:
If the query run time is later than the event's creation time, we relate to the query's run time instead of the event's creation time field. For example, if you run the query on 15/10/2019 09:00, the result is:
Perform calculations and conversions
Transform with SQL enables you to improve the quality of your data and ensure it follows standard conventions (e.g. convert a time in epoch format to a standard mm/dd/yyyy date).
Upsolver provides the standard SQL functions as built-in functions, including special enrichment functions (e.g. IP2GEO, user agent parser), and you can also add your own user-defined functions (UDFs) in Python.
Built-in functions and UDFs can be applied on either flat or hierarchical data. See: Query hierarchical data
For the following examples, we will assume that:
1. Three events stream into the data source heartbeat over time:

| user_id | device_id | epoch | heart_rate |
| --- | --- | --- | --- |
| 1 | 1234 | 1520672112456 | 81 |
| 2 | 5567 | 1520672112456 | 79 |
| 1 | 1234 | 1520672113456 | 102 |
2. Three events stream into the data source location over time:

| user_id | epoch | latitude | longitude |
| --- | --- | --- | --- |
| 1 | 1573034761 | 28.545926 | 31.577451 |
| 2 | 1573034761 | 44.032321 | 1.356295 |
| 1 | 1573035761 | 28.545926 | 31.577451 |
We will use the built-in function UNIX_EPOCH_TO_DATE, which converts a Unix epoch timestamp to a date.
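For example (behavior inferred from the output table below, which shows ISO-8601 date strings):

```sql
-- 1520672112456 (milliseconds since the Unix epoch) becomes
-- "2018-03-10T08:55:12.456Z"
UNIX_EPOCH_TO_DATE(epoch)
```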
The following query creates an output based on a stream transformation using UNIX_EPOCH_TO_DATE in the SELECT clause:
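The query itself is missing from this extract; a sketch consistent with the output below (the alias name date is an assumption):

```sql
SELECT user_id,
       device_id,
       UNIX_EPOCH_TO_DATE(epoch) AS date
FROM heartbeat
```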
The result of the query will be the following output:
| user_id | device_id | date |
| --- | --- | --- |
| 1 | 1234 | "2018-03-10T08:55:12.456Z" |
| 2 | 5567 | "2018-03-10T08:55:12.456Z" |
| 1 | 1234 | "2018-03-10T08:55:13.456Z" |
The following query creates an output based on a stream transformation using UNIX_EPOCH_TO_DATE in two steps:
1. Calculate the transformation.
2. Use the calculated transformation in the query.
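The two step blocks are missing from this extract; a sketch of the two-step form, assuming Upsolver's SET syntax for calculated fields (an assumption; verify against the Upsolver reference):

```sql
-- Step 1: calculate the transformation as a field named date
SET date = UNIX_EPOCH_TO_DATE(epoch);
-- Step 2: use the calculated field in the query
SELECT user_id,
       device_id,
       date
FROM heartbeat
```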