Transforming data with SQL

This article goes over the various ways to transform your data with SQL in Upsolver.

Transform with SQL enables the following stream transformations:

  • Filter your raw data streams, such as user activity, IoT and sensor data, application activity data, and online advertising statistics. These sources often contain more data than your analysis requires; Upsolver allows you to filter them down to only the data you need.

  • Combine data from multiple sources to gather deeper insights and create aggregated views. Upsolver allows you to do this with a SQL statement, much like a relational database.

  • Improve the quality of your data and ensure it follows standard conventions (e.g. converting Unix epoch time to a standard mm/dd/yyyy date). Upsolver provides all the standard SQL built-in functions, including special enrichment functions (e.g. IP2GEO, user agent parser), and you can also add your own User Defined Functions (UDFs) in Python. Built-in functions and UDFs can be applied to either flat or hierarchical data; see Query hierarchical data.
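As a rough sketch of how these pieces combine in a single transformation (the data source my_stream and its fields are illustrative assumptions, as is the exact IP2GEO signature; UNIX_EPOCH_TO_DATE is shown in the examples later in this article):

SELECT user_id,
       UNIX_EPOCH_TO_DATE(epoch) AS event_date, -- built-in conversion function
       IP2GEO(client_ip) AS geo                 -- enrichment function; exact signature may differ
FROM my_stream
WHERE heart_rate > 80                           -- filter to only the required events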

Stream transformations are configured when creating an Upsolver output.

Filter a stream using the WHERE clause

Transform with SQL can filter a stream using the WHERE clause just like in ANSI SQL.

For the following examples, assume that:

1. Three events stream into the data source heartbeat over time:

{ "user_id": 1, "device_id": 1234, "epoch" : 1520672112456, "heart_rate" : 81}
{ "user_id": 2, "device_id": 5567, "epoch" : 1520672112456, "heart_rate" : 79}
{ "user_id": 1, "device_id": 1234, "epoch" : 1520672113456, "heart_rate" : 102}

| user_id | device_id | epoch | heart_rate |
| --- | --- | --- | --- |
| 1 | 1234 | 1520672112456 | 81 |
| 2 | 5567 | 1520672112456 | 79 |
| 1 | 1234 | 1520672113456 | 102 |

2. Three events stream into the data source location over time:

{"user_id": 1, "epoch" : 1573034761, "latitude" : 28.545926, "longitude" : 31.577451}
{"user_id": 2, "epoch" : 1573034761, "latitude" : 44.032321, "longitude" : 1.356295}
{"user_id": 1, "epoch" : 1573035761, "latitude" : 28.545926, "longitude" : 31.577451}

| user_id | epoch | latitude | longitude |
| --- | --- | --- | --- |
| 1 | 1573034761 | 28.545926 | 31.577451 |
| 2 | 1573034761 | 44.032321 | 1.356295 |
| 1 | 1573035761 | 28.545926 | 31.577451 |

Example 1:

The following query:

SELECT * 
FROM heartbeat
WHERE heart_rate > 80

Results in the following output:

| user_id | device_id | epoch | heart_rate |
| --- | --- | --- | --- |
| 1 | 1234 | 1520672112456 | 81 |
| 1 | 1234 | 1520672113456 | 102 |

We have filtered the data source heartbeat to keep only the events where heart_rate is greater than 80.

Example 2:

The following query:

SELECT * 
FROM heartbeat
WHERE heart_rate > 79 AND heart_rate < 102 

Results in the following output:

| user_id | device_id | epoch | heart_rate |
| --- | --- | --- | --- |
| 1 | 1234 | 1520672112456 | 81 |

We have filtered the data source heartbeat to keep only the events where heart_rate is greater than 79 and less than 102.

Example 3:

The following query:

SELECT * 
FROM heartbeat
WHERE heart_rate > 79 OR user_id = 2 

Results in the following output:

| user_id | device_id | epoch | heart_rate |
| --- | --- | --- | --- |
| 1 | 1234 | 1520672112456 | 81 |
| 2 | 5567 | 1520672112456 | 79 |
| 1 | 1234 | 1520672113456 | 102 |

We have filtered the data source heartbeat to keep only the events where heart_rate is greater than 79 or user_id is equal to 2.

Join data from several data sources

You can use Transform with SQL to combine data from your streaming data source with data arriving in other streams, historical aggregations, or reference data files by using the JOIN syntax.

The result of the Transform with SQL join is a new table that is populated with the column values that you specify in the SELECT statement.

With Transform with SQL, there is no need to write code in a programming language such as Java or Python to join data from various data sources; all you need is the Transform with SQL JOIN.

JOIN clause:

The Transform with SQL JOIN clause uses the following syntax:

[ { INNER | LEFT [ OUTER ] } ] JOIN [ LATEST ]
    { table_name | ( query_expr ) } [ [ AS ] alias ] [ WAIT integer { MINUTE[S] | HOUR[S] | DAY[S] } [ ALIGNED ] ]
    { ON bool_expression | USING ( field_name [, ...] ) }
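For instance, reading the grammar above, a left join against a lookup table with a five-minute aligned wait could be written as follows (the names my_stream, user_stats, and total_orders are illustrative assumptions, not objects defined elsewhere in this article):

SELECT user_id, s.total_orders
FROM my_stream
LEFT OUTER JOIN LATEST user_stats AS s WAIT 5 MINUTES ALIGNED
    ON s.user_id = user_id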

For the following example, assume that we have the following events structure in a data source clicks over time:

{"user_id" : 1, "campaign_id" : "camp1"}
{"user_id" : 2, "campaign_id" : "camp2"}

| user_id | campaign_id |
| --- | --- |
| 1 | "camp1" |
| 2 | "camp2" |

Also assume that we have the following events in a data source orders over time:

{"user_id" : 1, "credit_card_type" : "VISA", "order_price" : 150 }
{"user_id" : 1, "credit_card_type" : "VISA", "order_price" : 50}
{"user_id" : 2, "credit_card_type" : "AMEX", "order_price" : 20}

| user_id | credit_card_type | order_price |
| --- | --- | --- |
| 1 | "VISA" | 150 |
| 1 | "VISA" | 50 |
| 2 | "AMEX" | 20 |

If we define a table as:

SELECT user_id, campaign_id, user_purchases.user_orders AS user_orders
FROM clicks
JOIN (SELECT COUNT_DISTINCT(*) AS user_orders,
             user_id
      FROM orders
      GROUP BY user_id) user_purchases
  ON user_purchases.user_id = user_id

We will have the following table as output:

| user_id | campaign_id | user_orders |
| --- | --- | --- |
| 1 | "camp1" | 2 |
| 2 | "camp2" | 1 |

Join requirements:

The join table must be either a lookup table or reference data.

The ON statement must be in one of the following forms:

  • lookup_key_1 = expression AND lookup_key_2 = expression ...

  • lookup_key_1 IN (expression1, ...) AND lookup_key_2 IN (expression1, ...) ...

Note: Each key in the lookup table must be mapped to one or more expressions using either = or IN within the ON statement.
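To make the two forms concrete, here is a sketch against a hypothetical lookup table campaign_lookup whose keys are campaign_id and region (all names here are illustrative):

-- Form 1: every lookup key is mapped to an expression with =
JOIN campaign_lookup
    ON campaign_lookup.campaign_id = campaign_id
   AND campaign_lookup.region = region

-- Form 2: a lookup key is mapped with IN to several candidate expressions
JOIN campaign_lookup
    ON campaign_lookup.campaign_id IN (campaign_id, fallback_campaign_id)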

Synchronize streams:

Since Upsolver is a streaming system, all joins are applied in stream time. We will illustrate how to use the following: WAIT, ALIGNED, and LATEST.

To demonstrate the concepts, assume we have the following data sources:

The main data source:

| id | time |
| --- | --- |
| 1 | 14/10/2019 00:00 |
| 1 | 14/10/2019 00:01 |
| 1 | 14/10/2019 00:02 |
| 1 | 14/10/2019 00:04 |
| 1 | 14/10/2019 05:00 |

The lookup data source:

| id | time | value |
| --- | --- | --- |
| 1 | 14/10/2019 00:00 | 1 |
| 1 | 14/10/2019 00:01 | 2 |
| 1 | 14/10/2019 00:02 | 3 |
| 1 | 14/10/2019 05:00 | 4 |

For the above data sources:

  • id is an integer

  • time is a timestamp in DD/MM/YYYY HH:MM format that represents the event's creation time

WAIT interval

In order to synchronize between streams, Transform with SQL supports the WAIT integer { MINUTE[S] | HOUR[S] | DAY[S] } [ ALIGNED ] syntax.

If WAIT X MINUTES (where X is an integer) is specified, Upsolver ensures that the state of the joined lookup table is ready X minutes ahead of the data being processed. This delays the output by X minutes.

We will demonstrate this using the following SQL query:

SELECT id,
       value,
       time
FROM "main data source"
LEFT JOIN LATEST
    (
      SELECT id, LAST(value) AS value
      FROM "lookup data source"
      GROUP BY id
    ) l WAIT 1 MINUTE
    ON l.id = id

The result of this query is as follows:

| id | time | value |
| --- | --- | --- |
| 1 | 14/10/2019 00:00 | 1 |
| 1 | 14/10/2019 00:01 | 2 |
| 1 | 14/10/2019 00:02 | 3 |
| 1 | 14/10/2019 00:04 | 3 |
| 1 | 14/10/2019 05:00 | 4 |

Note that while performing the JOIN, we wait 1 minute for each event in the main data source before creating the output.

Looking at the first row in the table above: for time 14/10/2019 00:00 we take into consideration only the first event in the lookup data source, which has the same value in its time field; that event arrived within the 1 minute wait.

This is because for each event in the main data source, we look at the matching events in the lookup data source whose time field is less than the event's time plus 1 minute (due to the WAIT 1 MINUTE statement). For 14/10/2019 00:00 this yields 00:01 as the time comparator, and the only event that satisfies it is the first event in the lookup data source, whose time field is 00:00.

Without using WAIT 1 MINUTE, the result would be:

| id | time | value |
| --- | --- | --- |
| 1 | 14/10/2019 00:00 | null |
| 1 | 14/10/2019 00:01 | 1 |
| 1 | 14/10/2019 00:02 | 2 |
| 1 | 14/10/2019 00:04 | 3 |
| 1 | 14/10/2019 05:00 | 3 |

Looking again at the first row: for time 14/10/2019 00:00, there is no lookup event with time prior to 14/10/2019 00:00 (excluding that timestamp itself).

Since we did not use the WAIT statement, for the timestamp 14/10/2019 00:00 in the main data source we look for matching events in the lookup data source with time less than 14/10/2019 00:00. Since no such event exists, the value is null.

ALIGNED

If the keyword ALIGNED is used, calculating the query's result waits for the next aligned window. For example, with WAIT 5 MINUTES ALIGNED, data arriving after 08:35 and before 08:40 waits until 08:40.

The alignment is done using Unix epoch time, so WAIT 1 DAY will wait until 00:00 UTC of the following day.

We will demonstrate this using the following Transform with SQL query:

SELECT id,
       value,
       time
FROM "main data source"
LEFT JOIN
    (
      SELECT id, LAST(value) AS value
      FROM "lookup data source"
      GROUP BY id
    ) l WAIT 5 HOURS ALIGNED
    ON l.id = id

The result of this query is the following:

| id | time | value |
| --- | --- | --- |
| 1 | 14/10/2019 00:00 | 4 |
| 1 | 14/10/2019 00:01 | 4 |
| 1 | 14/10/2019 00:02 | 4 |
| 1 | 14/10/2019 00:04 | 4 |
| 1 | 14/10/2019 05:00 | 4 |

Note that while performing the join, we wait 5 hours, aligned, before creating the output. This means that for every event in our main data source, we wait for the event at 14/10/2019 05:00 before creating the output, which is why the value column has the value 4 for all of the events.

LATEST

When running a query over historical data, Upsolver maintains the time relation between streams the same way it would when processing up-to-date data.

The LATEST keyword is intended to handle situations where initial data is dumped into a lookup table after the source stream has started running. It forces the query to use, for all historical data, the state of the joined lookup table as it exists when the query is run. Data that arrives after the query is run is not affected by LATEST.

We will demonstrate this using the following Transform with SQL query:

SELECT id,
       value,
       time
FROM "main data source"
LEFT JOIN LATEST
    (
      SELECT id, LAST(value) AS value
      FROM "lookup data source"
      GROUP BY id
    ) l WAIT 1 MINUTE
    ON l.id = id

The result of this query depends on the time you run it relative to the events' times.

If the query's run time is earlier than the events' creation times (e.g. you run the query on 13/10/2019 09:00), LATEST has no effect yet and the result matches the WAIT 1 MINUTE result shown above:

| id | time | value |
| --- | --- | --- |
| 1 | 14/10/2019 00:00 | 1 |
| 1 | 14/10/2019 00:01 | 2 |
| 1 | 14/10/2019 00:02 | 3 |
| 1 | 14/10/2019 00:04 | 3 |
| 1 | 14/10/2019 05:00 | 4 |

If the query's run time is later than the events' creation times, we relate to the query's run time instead of the event creation time field, so every historical event is joined against the latest state of the lookup table. For example, if you run the query on 15/10/2019 09:00, the result is:

| id | time | value |
| --- | --- | --- |
| 1 | 14/10/2019 00:00 | 4 |
| 1 | 14/10/2019 00:01 | 4 |
| 1 | 14/10/2019 00:02 | 4 |
| 1 | 14/10/2019 00:04 | 4 |
| 1 | 14/10/2019 05:00 | 4 |

Perform calculations and conversions

Transform with SQL enables you to improve the quality of your data and ensure it follows standard conventions (e.g. converting Unix epoch time to a standard mm/dd/yyyy date).

For the following examples, we will assume that:

1. Three events stream into the data source heartbeat over time:

{ "user_id": 1, "device_id": 1234, "epoch" : 1520672112456, "heart_rate" : 81}
{ "user_id": 2, "device_id": 5567, "epoch" : 1520672112456, "heart_rate" : 79}
{ "user_id": 1, "device_id": 1234, "epoch" : 1520672113456, "heart_rate" : 102}

| user_id | device_id | epoch | heart_rate |
| --- | --- | --- | --- |
| 1 | 1234 | 1520672112456 | 81 |
| 2 | 5567 | 1520672112456 | 79 |
| 1 | 1234 | 1520672113456 | 102 |

2. Three events stream into the data source location over time:

{"user_id": 1, "epoch" : 1573034761, "latitude" : 28.545926, "longitude" : 31.577451}
{"user_id": 2, "epoch" : 1573034761, "latitude" : 44.032321, "longitude" : 1.356295}
{"user_id": 1, "epoch" : 1573035761, "latitude" : 28.545926, "longitude" : 31.577451}

| user_id | epoch | latitude | longitude |
| --- | --- | --- | --- |
| 1 | 1573034761 | 28.545926 | 31.577451 |
| 2 | 1573034761 | 44.032321 | 1.356295 |
| 1 | 1573035761 | 28.545926 | 31.577451 |

We will use the built-in function UNIX_EPOCH_TO_DATE, which converts a Unix epoch timestamp to a human-readable date.

The following query creates an output based on a stream transformation using UNIX_EPOCH_TO_DATE in the SELECT clause:

SELECT user_id, device_id, UNIX_EPOCH_TO_DATE(epoch) AS date
FROM heartbeat

The result of the query will be the following output:

| user_id | device_id | date |
| --- | --- | --- |
| 1 | 1234 | "2018-03-10T08:55:12.456Z" |
| 2 | 5567 | "2018-03-10T08:55:12.456Z" |
| 1 | 1234 | "2018-03-10T08:55:13.456Z" |

The following query creates an output based on a stream transformation using UNIX_EPOCH_TO_DATE in two steps:

1. Calculate the transformation:

SET human_readable_date = UNIX_EPOCH_TO_DATE(epoch);

2. Use the calculated transformation in the query:

SELECT user_id, device_id, human_readable_date
FROM heartbeat