Aggregate and Output Data

This quickstart article shows you how to aggregate data when transforming it in the staging zone.

Using familiar SQL syntax in your transformation job, you can aggregate the data in your staging table and load it into your target table.

Here's the code to create a job that loads aggregated data into your target table:

CREATE JOB group_testing
    START_FROM = BEGINNING
    RUN_INTERVAL = 1 MINUTE
    ADD_MISSING_COLUMNS = TRUE
AS INSERT INTO default_glue_catalog.upsolver_samples.orders_aggregated_data 
    MAP_COLUMNS_BY_NAME
       SELECT customer.address.city AS customer_address_city, 
          SUM(nettotal) AS total_revenue,
          COUNT(DISTINCT orderid) AS num_orders
       FROM default_glue_catalog.upsolver_samples.orders_raw_data
       WHERE $commit_time BETWEEN run_start_time() AND run_end_time()
       GROUP BY customer.address.city;

Let's understand what this code does.

In this example, you create a job named group_testing and set orders_aggregated_data as the target table for your data. The job option START_FROM instructs Upsolver to include all historical data, and the RUN_INTERVAL option specifies that the job should execute every minute. The ADD_MISSING_COLUMNS option ensures that any columns added to the source table in the future are also added to the target table.
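
These option values aren't the only choices. Below is a minimal sketch of the same job configured to process only newly arriving data and to run less frequently; the alternative values shown (NOW, 5 MINUTES) are illustrative, so confirm them against the job options reference for your environment.

CREATE JOB group_testing_recent
    START_FROM = NOW              -- assumed alternative: process only data arriving after job creation
    RUN_INTERVAL = 5 MINUTES      -- run every 5 minutes instead of every minute
    ADD_MISSING_COLUMNS = TRUE
AS INSERT INTO default_glue_catalog.upsolver_samples.orders_aggregated_data
    MAP_COLUMNS_BY_NAME
       SELECT customer.address.city AS customer_address_city,
          SUM(nettotal) AS total_revenue,
          COUNT(DISTINCT orderid) AS num_orders
       FROM default_glue_catalog.upsolver_samples.orders_raw_data
       WHERE $commit_time BETWEEN run_start_time() AND run_end_time()
       GROUP BY customer.address.city;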

In the SELECT statement, we aggregate our sample orders by customer.address.city, calculating the total revenue and number of orders per city. Let's break it down:

  • SUM(nettotal) AS total_revenue: in this line, we sum the nettotal column and alias it as total_revenue. This gives us the total revenue per city.

  • COUNT(DISTINCT orderid) AS num_orders: we then count the number of distinct orders per city into a new column named num_orders.

  • WHERE $commit_time BETWEEN run_start_time() AND run_end_time(): this filter restricts each job execution to the data committed between that run's start and end times.

  • GROUP BY customer.address.city: this line groups the aggregations by customer.address.city, enabling us to view the total revenue and number of orders for each city. You can extend this pattern with additional aggregate functions, as sketched below.
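
If you need additional metrics per city, you can add further aggregate functions to the same SELECT. The sketch below is illustrative only; it reuses the columns above and assumes that standard aggregates such as AVG and MAX are available in your transformation jobs.

SELECT customer.address.city AS customer_address_city,
   SUM(nettotal) AS total_revenue,
   COUNT(DISTINCT orderid) AS num_orders,
   AVG(nettotal) AS avg_order_value,  -- average order value per city (illustrative)
   MAX(nettotal) AS largest_order     -- largest single order per city (illustrative)
FROM default_glue_catalog.upsolver_samples.orders_raw_data
WHERE $commit_time BETWEEN run_start_time() AND run_end_time()
GROUP BY customer.address.city;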

Once the job is running and data has arrived in the target table, the final step is to query it.

Here's the code for querying your data:

SELECT * 
FROM default_glue_catalog.upsolver_samples.orders_aggregated_data 
LIMIT 10;
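
If you want to see the highest-revenue cities first, you can sort the results before limiting them. This is a minimal sketch assuming the target table supports standard ORDER BY:

SELECT customer_address_city,
       total_revenue,
       num_orders
FROM default_glue_catalog.upsolver_samples.orders_aggregated_data
ORDER BY total_revenue DESC
LIMIT 10;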
