SQLake pipeline basics

Read this guide to understand how pipelines work in Upsolver and how you can quickly and easily ingest your data.

Real-time ingestion and analytics in the data lake

Most organizations manage data that is continuously updated in real time, such as clickstream events collected from websites to understand user interaction and improve personalization. This is called streaming data.

But companies also process and analyze data in large batches, such as when enriching user data with third-party information. This is batch data.

Both batch and streaming are integral to a company's data architecture. In this section, we illustrate how to implement both streaming and batch data analytics in the SQLake data lake.

Below is a simple diagram that shows a high-level architecture of a data pipeline you can use to implement data analytics:

How does Upsolver merge streaming and batch processing?

Upsolver SQLake enables you to ingest both streaming and batch data with just one tool, using only familiar SQL syntax. Let's zoom in to understand how Upsolver manages data.

SQLake enables you to define connectors, which hold the authentication parameters that allow Upsolver to connect to your systems and read and write data. As data flows in from sources (such as events in a Kafka topic or files on an object store), SQLake stores it as an append-only log that is persisted to an object store. It then represents the data as tables whose schemas can either be auto-detected or manually configured. From these tables, you can select data to transform, join, or aggregate, building the datasets your business needs. You can store the results in a data lake, data warehouse, or any other data target, where you can analyze the data and act on it.
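
For example, because table schemas can be auto-detected or declared up front, a staging table is often created with no columns at all and populated as data arrives. The statements below are a minimal sketch, assuming SQLake's CREATE TABLE supports both an empty column list (for auto-detection) and explicit column definitions; the table names are hypothetical.

-- Create a staging table with no declared columns;
-- SQLake detects the schema from the incoming data (hypothetical table name)
CREATE TABLE catalog.prod_db.events_staging();

-- Alternatively, declare the columns you expect up front
CREATE TABLE catalog.prod_db.events_curated (
	event_id string,
	event_time timestamp,
	company_name string
);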

Now, let's look at the core components of SQLake:

  • Connectors

  • Ingestion jobs

  • Transformation jobs

Connectors

SQLake connectors are containers that hold connectivity information. If you create multiple data pipelines that read and write to Amazon S3, you only need to create one Amazon S3 connector with the appropriate permissions and reuse it in your pipelines. This simplifies connection management.

For example:

CREATE S3 CONNECTION raw_s3_zone
	AWS_ROLE = 'arn:aws:iam::111111111111:role/raw_zone_role';
-- Here, you create an S3 connection named "raw_s3_zone".
-- Create a connection to the data target on Snowflake data warehouse
CREATE SNOWFLAKE CONNECTION prod_snowflake_connection
	CONNECTION_STRING = 'jdbc:snowflake://ACCOUNT_WITH_REGION.snowflakecomputing.com?db=DB_NAME&warehouse=WAREHOUSE_NAME&role=ROLE_NAME'
	USER_NAME = 'username'
	PASSWORD = 'password'
	MAX_CONCURRENT_CONNECTIONS = 10;
-- Here, you create a Snowflake connection named "prod_snowflake_connection",
-- specifying a username, a password, and a maximum number of
-- concurrent connections.
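
Once defined, a connection can be reused by any number of jobs, which is what keeps connection management simple. As a sketch, using the ingestion job syntax covered in the next section, two jobs with hypothetical bucket prefixes and table names can both read through the same raw_s3_zone connection:

-- Both jobs authenticate through the single raw_s3_zone connection
CREATE JOB ingest_web_events
	CREATE_TABLE_IF_MISSING = TRUE
	STARTING_FROM = NOW
AS COPY FROM S3 'raw_s3_zone'
	BUCKET = 'company-data-lake'
	PREFIX = 'raw/web_events'
INTO catalog.prod_db.web_events_staging;

CREATE JOB ingest_mobile_events
	CREATE_TABLE_IF_MISSING = TRUE
	STARTING_FROM = NOW
AS COPY FROM S3 'raw_s3_zone'
	BUCKET = 'company-data-lake'
	PREFIX = 'raw/mobile_events'
INTO catalog.prod_db.mobile_events_staging;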

Ingestion jobs

SQLake ingestion jobs copy your source data into an append-only staging table that is persisted in an object store and fully managed by Upsolver. The staging table represents the data in its original raw form, so you can build multiple pipelines from it, each producing its own output. By storing the staging tables in an object store such as Amazon S3, you make the raw data available to anyone who needs it, for example to experiment with or to train ML models.

Here's an example:

-- Create a job that ingests and stages the source data in the data lake
CREATE JOB ingest_raw_data
	COMPUTE_CLUSTER = 'ProductionCluster'
	CREATE_TABLE_IF_MISSING = TRUE
	STARTING_FROM = NOW
AS COPY FROM S3 'raw_s3_zone'
	BUCKET = 'company-data-lake'
	PREFIX = 'raw/sales_orders'
INTO catalog.prod_db.sales_orders_staging;
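
After the job starts running, you can query the staging table to confirm that data is arriving and to explore the auto-detected schema. This assumes your SQLake environment allows ad hoc SELECT queries against data lake tables:

-- Preview a handful of raw rows from the staging table
SELECT *
FROM catalog.prod_db.sales_orders_staging
LIMIT 10;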

Transformation jobs

SQLake transformation jobs read from staging tables and transform the data according to your business logic. You reference staging tables in your pipeline transformations just as you would reference normal tables in a database. You can transform data by adding custom columns, joining, aggregating, and operating on fields with user-defined Python functions. Upsolver writes the results to the target of your choice, typically a data lake or data warehouse, and inserts, updates, or deletes records in that target based on the nature of the data.

For example:

-- Create a job to transform the staged data and load it into Snowflake
CREATE JOB transform_and_load
	COMPUTE_CLUSTER = 'ProductionCluster'
	EXECUTION_INTERVAL = 1 MINUTE
	STARTING_FROM = NOW
AS INSERT INTO SNOWFLAKE 'prod_snowflake_connection'
	TABLE_PATH = 'PROD'.'SALE_ORDERS_REPORT'
	SELECT
		SUM(TO_NUMBER(nettotal)) AS sum_total_orders,
		AVG(TO_NUMBER(nettotal)) AS avg_total_orders,
		company_name
	FROM catalog.prod_db.sales_orders_staging
	WHERE ($time BETWEEN execution_start_time() AND execution_end_time())
	GROUP BY company_name;
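
The same pattern applies when the target is a table in the data lake rather than a data warehouse. The sketch below assumes the target table catalog.prod_db.sales_orders_summary has already been created in the data lake; the job name is hypothetical:

-- Create a job that writes the aggregated results to a data lake table
CREATE JOB transform_and_load_to_lake
	COMPUTE_CLUSTER = 'ProductionCluster'
	EXECUTION_INTERVAL = 1 MINUTE
	STARTING_FROM = NOW
AS INSERT INTO catalog.prod_db.sales_orders_summary
	SELECT
		SUM(TO_NUMBER(nettotal)) AS sum_total_orders,
		AVG(TO_NUMBER(nettotal)) AS avg_total_orders,
		company_name
	FROM catalog.prod_db.sales_orders_staging
	WHERE ($time BETWEEN execution_start_time() AND execution_end_time())
	GROUP BY company_name;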

Benefits of SQLake pipelines

1. Always on

SQLake pipelines are always on. One of the main benefits of a streaming-first design is that pipelines do not need external scheduling or orchestration. This reduces the complexity of deploying and maintaining pipelines. Instead, Upsolver infers the necessary transformations and task progression from the SQL you write. There are no directed acyclic graphs (DAGs) to create and maintain, and you don't need a third-party orchestration tool such as Dagster, Astronomer, or Apache Airflow.

2. Observability and data quality

Delivering reliable, fresh, and consistent datasets depends on understanding your source and output data: its structure, schema, data types, value distribution, and whether key fields contain null values. SQLake job monitoring provides graphs and metrics that indicate status at a glance. SQLake also exposes system tables that record the tasks executed at the different stages of the pipeline, such as:

  • Reading from a source system

  • Writing to the staging table

  • Transforming the data

  • Maintaining the data in the data lake

You can analyze the system tables to identify slowness, issues, and opportunities for optimization in your pipelines. See the Job status guide for more information.
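
For example, you might check the recent task executions of a single job. The system table name and columns below are assumptions for illustration only; the Job status guide lists the system tables actually available in your environment:

-- Inspect recent task executions for one job
-- (system.monitoring.tasks and job_name are hypothetical names)
SELECT *
FROM system.monitoring.tasks
WHERE job_name = 'transform_and_load'
LIMIT 100;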
