Pipeline Basics

Read this guide to understand how pipelines work in Upsolver.

Real-time ingestion and analytics in the data lake

Most organizations manage data that is continuously updated in real time, such as clickstream events collected from websites to understand user interaction and improve personalization. This is called streaming data.

But companies also process and analyze data in large batches -- for example, the process of enriching user data with third-party information. This is batch data.

Both batch and streaming are integral to a company's data architecture. In this section, we illustrate how to implement both streaming and batch data analytics in the Upsolver data lake.

Below is a simple diagram that shows a high-level architecture of a data pipeline you can use to implement data analytics:

How does Upsolver merge streaming and batch processing?

Upsolver enables you to ingest both streaming and batch data with just one tool, using only familiar SQL syntax. Let's zoom in to understand how Upsolver manages data.

Upsolver lets you define connectors. A connector holds the authentication parameters that Upsolver uses to connect to your systems and to read and write data. As data flows in from sources (such as events in a Kafka topic or files in an object store), Upsolver stores it as an append-only log that is persisted to an object store. It then represents the data in tables whose schemas can be either auto-detected or manually configured. From these tables, you can select data to transform, join, or aggregate to build datasets for your business to use. You can store the results in a data lake, data warehouse, or any other data target, where you can analyze the data and take action.

Now, let's look at the core components of Upsolver:

Connectors

Upsolver connectors are containers that hold connectivity information. If you create multiple data pipelines that read from and write to Amazon S3, you only need to create one Amazon S3 connector with the appropriate permissions and reuse it across your pipelines. This simplifies connection management.

For example:

-- Create an S3 connection named "raw_s3_zone"
CREATE S3 CONNECTION raw_s3_zone
    AWS_ROLE = 'arn:aws:iam::111111111111:role/raw_zone_role';

-- Create a connection to the data target on the Snowflake data warehouse,
-- named "prod_snowflake_connection", with a username, a password, and a
-- maximum number of concurrent connections
CREATE SNOWFLAKE CONNECTION prod_snowflake_connection
    CONNECTION_STRING = 'jdbc:snowflake://ACCOUNT_WITH_REGION.snowflakecomputing.com?db=DB_NAME&warehouse=WAREHOUSE_NAME&role=ROLE_NAME'
    USER_NAME = 'username'
    PASSWORD = 'password'
    MAX_CONCURRENT_CONNECTIONS = 10;

Ingestion jobs

Upsolver ingestion jobs copy your source data into an append-only staging table that is persisted in an object store and fully managed by Upsolver. The staging table represents the data in its original raw form. Because the raw data is preserved, you can build several pipelines on the same staging table, each producing a different output. By storing the staging tables in an object store such as Amazon S3, you also make the raw data available to anyone who needs it, for example to experiment with and train ML models.

Here's an example:

-- Create a job that ingests and stages the source data in the data lake
CREATE JOB ingest_raw_data
    COMPUTE_CLUSTER = 'ProductionCluster'
    CREATE_TABLE_IF_MISSING = TRUE
    STARTING_FROM = NOW
AS COPY FROM S3 'raw_s3_zone'
    BUCKET = 'company_data_lake'
    PREFIX = 'raw/sales_orders'
INTO catalog.prod_db.sales_orders_staging;
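
Because a connection only holds connectivity information, a second ingestion job can point at the same connection. The following is a minimal sketch that reuses the syntax of the job above. The job name, the prefix, and the staging table name are hypothetical, and it assumes the role behind raw_s3_zone is allowed to read that prefix.

```sql
-- Hypothetical second ingestion job that reuses the existing S3 connection.
-- The job name, prefix, and target table are illustrative assumptions.
CREATE JOB ingest_raw_customers
    COMPUTE_CLUSTER = 'ProductionCluster'
    CREATE_TABLE_IF_MISSING = TRUE
    STARTING_FROM = NOW
AS COPY FROM S3 'raw_s3_zone'
    BUCKET = 'company_data_lake'
    PREFIX = 'raw/customers'
INTO catalog.prod_db.customers_staging;
```

Both jobs share one set of credentials, so a permission change on the role only has to be made in one place.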

Transformation jobs

Upsolver transformation jobs read from staging tables and transform the data according to your business logic. You can reference staging tables in your transformations just as you would reference normal tables in a database. You can transform data by adding custom columns, joining, aggregating, and operating on fields with user-defined Python functions. Upsolver writes the results to the target of your choice, typically a data lake or data warehouse, and inserts, updates, and deletes records there based on the nature of the data.

For example:

-- Create a job to transform the staged data and load it into Snowflake
CREATE JOB transform_and_load
    COMPUTE_CLUSTER = 'ProductionCluster'
    EXECUTION_INTERVAL = 1 MINUTE
    STARTING_FROM = NOW
AS INSERT INTO SNOWFLAKE 'prod_snowflake_connection'
    TABLE_PATH = 'PROD'.'SALE_ORDERS_REPORT'
    SELECT
        SUM(to_number(nettotal)) AS sum_total_orders,
        AVG(to_number(nettotal)) AS avg_total_orders,
        company_name
    FROM catalog.prod_db.sales_orders_staging
    WHERE ($time BETWEEN execution_start_time() AND execution_end_time())
    GROUP BY company_name;
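
Since the staging table keeps the raw data, more than one transformation job can read from it. As a sketch under that assumption, the job below mirrors the one above but writes a per-company order count to a different Snowflake table. The job name, the ORDER_COUNTS_REPORT target table, and the use of COUNT(*) are illustrative and do not come from the original example.

```sql
-- Hypothetical second transformation job on the same staging table.
-- The job name and target table are illustrative assumptions.
CREATE JOB count_orders_and_load
    COMPUTE_CLUSTER = 'ProductionCluster'
    EXECUTION_INTERVAL = 1 MINUTE
    STARTING_FROM = NOW
AS INSERT INTO SNOWFLAKE 'prod_snowflake_connection'
    TABLE_PATH = 'PROD'.'ORDER_COUNTS_REPORT'
    SELECT
        COUNT(*) AS order_count,
        company_name
    FROM catalog.prod_db.sales_orders_staging
    WHERE ($time BETWEEN execution_start_time() AND execution_end_time())
    GROUP BY company_name;
```

Each job filters on its own execution window, so the two outputs are produced independently from the same raw data.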

Benefits of Upsolver Pipelines

1. Always on

Upsolver pipelines are always on. One of the main benefits of a streaming-first design is that pipelines do not need external scheduling or orchestration. This reduces the complexity of deploying and maintaining pipelines. Instead, Upsolver infers the necessary transformations and task progression from the SQL you write. There are no directed acyclic graphs (DAGs) to create and maintain, and you don't need a third-party orchestration tool such as Dagster, Astronomer, or Apache Airflow.

2. Observability and data quality

If you can understand the source and output data -- its structure, schema, data types, value distribution, and whether key fields contain null values -- then you can deliver reliable, fresh, and consistent datasets. Upsolver job monitoring provides graphs and metrics that indicate status at a glance. Upsolver also exposes system tables that contain all the tasks executed at different stages of the pipeline, such as:

  • Reading from a source system
  • Writing to the staging table
  • Transforming the data
  • Maintaining it in the data lake

Learn More

You can analyze the system tables to identify slowness, issues, and areas for optimization in your pipelines. See the Job Status section to learn how to use the built-in metrics to monitor and troubleshoot your jobs.
