What is Upsolver?

Upsolver enables you to build reliable, maintainable, and testable processing pipelines on batch and streaming data. Define your processing pipelines using SQL in three simple steps:

  1. Create connections to data sources and targets.

  2. Ingest source data into a staging location in your data lake where you can inspect events, validate quality, and ensure data freshness.

  3. Transform and refine the data using the full power of SQL, then insert, update, or delete records in your target system.

Upsolver automatically manages the orchestration of tasks, scales compute resources up and down, and optimizes the output data, so you can deliver high-quality, fresh, and reliable data.

With Upsolver you can:

  • Lower the barrier to entry by developing pipelines and transformations using familiar SQL.

  • Improve reusability and reliability by managing pipelines as code.

  • Integrate pipeline development, testing, and deployment with existing CI/CD tools using a CLI.

  • Eliminate complex scheduling and orchestration with always-on, automated data pipelines.

  • Improve query performance with automated data lake management and optimization.

Here's an example of how to create a data pipeline in five simple steps:

/*
    1. Create an Amazon S3 connection to manage IAM credentials for jobs that 
        need to access data in Amazon S3
*/
CREATE S3 CONNECTION upsolver_s3_samples
    AWS_ROLE = 'arn:aws:iam::949275490180:role/upsolver_samples_role'
    EXTERNAL_ID = 'SAMPLES'
    READ_ONLY = TRUE;

/*
    2. Create an empty table to use as staging for the raw data.
*/
CREATE TABLE default_glue_catalog.upsolver_samples.orders_raw_data;

/*
    3. Ingest raw data from your bucket into the staging table
*/
CREATE JOB load_orders_raw_data_from_s3
    CONTENT_TYPE = JSON
AS COPY FROM S3 upsolver_s3_samples BUCKET = 'upsolver-samples' PREFIX = 'orders/' 
    INTO default_glue_catalog.upsolver_samples.orders_raw_data; 
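
/*
    Optional: once the ingestion job is running, you can inspect the staged
    events to validate quality and freshness before transforming them. This is
    a minimal sketch, assuming the staging table has already been populated;
    run it from Athena or any engine that reads the AWS Glue Data Catalog
    (the catalog prefix may differ depending on where you run the query).
*/
SELECT *
FROM default_glue_catalog.upsolver_samples.orders_raw_data
LIMIT 10;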

/*
    4. Create an output table in the AWS Glue Data Catalog. The table will be accessible 
        in Athena once data has been added.
        Note: We've defined partition_date as a partition column of type date. 
        This column doesn't exist in the source table and will be created in the 
        next step.
*/
CREATE TABLE default_glue_catalog.upsolver_samples.orders_transformed_data (
    partition_date date
)
PARTITION BY partition_date;

/*
    5. Create a job to read from the staging table, apply business logic 
        transformations and insert the results into the output table.
        Note: It may take 3-4 minutes for the data to appear in your output table.
*/
CREATE JOB transform_orders_and_insert_into_athena
    START_FROM = BEGINNING
    ADD_MISSING_COLUMNS = true	
    RUN_INTERVAL = 1 MINUTE
AS INSERT INTO default_glue_catalog.upsolver_samples.orders_transformed_data MAP_COLUMNS_BY_NAME
    -- Use the SELECT statement to choose columns from the source and implement your business logic transformations.
    SELECT 
      orderid AS order_id, -- rename columns
      MD5(customer_email) AS customer_id, -- hash or mask columns using built-in functions
      customer_name,  -- computed field defined later in the query
      nettotal AS total, 
      $commit_time AS partition_date -- populate the partition column with the processing time of the event, automatically cast to DATE type
    FROM default_glue_catalog.upsolver_samples.orders_raw_data
    LET customer_name = customer_firstname || ' ' || customer_lastname -- create a computed column
    WHERE ordertype = 'SHIPPING' 
    AND $commit_time BETWEEN run_start_time() AND run_end_time();

It's that simple. Once created, the jobs run continuously until you stop them; there is no need to schedule or orchestrate them, and the compute cluster scales up and down automatically. This greatly simplifies deploying and managing your pipelines.
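
To verify the pipeline end to end, you can query the output table once the transformation job has populated it. The following is a minimal sketch that uses only the table and column names defined in the example above; run it from Athena or any engine that reads the AWS Glue Data Catalog (the catalog prefix may differ depending on where you run the query):

SELECT partition_date, COUNT(*) AS order_count
FROM default_glue_catalog.upsolver_samples.orders_transformed_data
GROUP BY partition_date
ORDER BY partition_date;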
