Upsolver SQLake

SQL jobs

In SQLake, there are three kinds of jobs: ingestion, transformation, and materialized views. Each kind can be created as either a synchronized or a non-synchronized job.
Ingestion jobs COPY FROM a supported source and insert the raw data into a staging table in your Amazon S3-based data lake. These jobs automatically detect and infer the schema and update the AWS Glue Data Catalog with column names, types and partition information.
Transformation jobs INSERT and MERGE data into tables in your data lake, data warehouse and other targets. Users can transform, model, join and aggregate data before inserting the results into their target system. Using a primary key, transformation jobs can insert, update and delete rows automatically.
Materialized views (MVs) are a special type of job, one that represents a continuously updating table. Today, an MV can only be joined with another table, streaming or static.
Jobs allow you to move and transform your data. They can read from external sources and write the data into a table, or read from a table and write to another table or an external target.

Understanding non-synchronized jobs

With non-synchronized jobs you can create pipelines where jobs are loosely dependent on each other. This means that each job in the pipeline continues to run regardless of whether the job before or after it has encountered errors or been delayed.
Non-sync jobs are recommended when you need to prepare and process data that will be written to a target system where your business logic transformations are performed, like Snowflake or Redshift. For example, you might create a pipeline that ingests raw CSV or JSON objects, auto-detects the schema, corrects missing values and writes the results to a data lake or data warehouse. In this example the transformations are stateless. The accuracy of the transformations at each stage of the pipeline is not affected by the previous job’s output rate or by whether it is falling behind.
Using non-sync jobs is simple.
  • Non-sync ingestion and transformation jobs are created using the CREATE JOB statement
  • Non-sync materialized views are created using the CREATE MATERIALIZED VIEW statement
  • Non-sync transformation jobs MUST include a WHERE clause of $commit_time BETWEEN run_start_time() AND run_end_time()
The following diagram illustrates how non-sync jobs read and process events over time.
In the diagram above, events are read from the source by the ingest job, batched into 1-minute intervals and written to storage. The transform job then picks up the events, processes them and writes them to the output (data lake, data warehouse, etc.). In this scenario, several events come in at minute 1 and are batched and written to S3. At minute 2, the transformation job executes a task to process those events and write them to the data lake.
At minute 5 in the diagram there is a spike in the number of incoming events, which take longer to read and write to S3. However, the transform job is unaware of this delay and continues to execute every minute. Two minutes later, the ingest job finishes writing the data, which then becomes available for the transform job to process.
As explained earlier, if the processing is stateless this is not an issue: the transform job can pick up data whenever it becomes available. However, if the processing is stateful, such as an aggregation or a join, you run the risk of operating against incomplete data. This is why we need sync jobs.
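The non-sync timeline described above can be imitated with a small Python toy model. This is only an illustrative sketch, not SQLake code: the batch data, the function name `non_sync_runs` and the one-run-per-minute simplification are all assumptions made for the example.

```python
# Hypothetical toy data: (event_minute, commit_minute) for each batch the ingest job writes.
# The minute-5 batch is committed two minutes late, mirroring the spike described above.
BATCHES = [(1, 1), (2, 2), (3, 3), (4, 4), (5, 7)]


def non_sync_runs(batches, last_minute):
    """Run the transform once per minute; each run sees only batches committed in its window."""
    runs = {}
    for run_minute in range(1, last_minute + 1):
        # A non-sync job selects by commit time, so it never waits for late data.
        runs[run_minute] = [event for event, commit in batches if commit == run_minute]
    return runs


runs = non_sync_runs(BATCHES, last_minute=7)
for minute, events in runs.items():
    print(f"run at minute {minute}: processed event minutes {events}")
```

In this model the runs at minutes 5 and 6 find nothing to process, and the minute-5 events only show up in the run at minute 7. A stateless transform is unaffected, but an aggregate computed for minute 5 at the time of the minute-5 run would have been built from incomplete data.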

Understanding synchronized jobs

With synchronized jobs you are able to create pipelines where jobs are strongly dependent on each other. This means that each job in the pipeline is synchronized with the previous job on a common $event_time timestamp column. This ensures that jobs don’t go out of sync, and if a job in the pipeline falls behind, the other jobs further in the pipeline will pause to allow it to catch up.
The following diagram illustrates how sync jobs read and process events over time.
Unlike non-sync jobs, sync jobs keep the reading and processing of events in lock step. Once the ingestion job finishes writing the current batch of events to storage, the transformation job picks up those events and processes them. Using the $event_time timestamp, the transformation job knows which events it should be processing and does not skip ahead. Looking at minute 5 again, the ingest job takes longer than usual to write to storage. Because both jobs are synchronized, the transform job waits until it sees events with the $event_time of 00:05:00 before it continues to process.
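The lock-step behavior can be sketched with the same kind of Python toy model. Again, this is an assumption-laden illustration rather than SQLake's implementation: `INGEST_COMMITS`, `sync_transform` and the catch-up loop are hypothetical names and simplifications.

```python
# Hypothetical toy data: event minute -> wall-clock minute at which the ingest job
# finished writing that batch. The minute-5 batch is slow and holds up minutes 6 and 7.
INGEST_COMMITS = {1: 1, 2: 2, 3: 3, 4: 4, 5: 7, 6: 7, 7: 7}


def sync_transform(ingest_commits, last_minute):
    """Process event-time windows strictly in order, waiting until each one is committed."""
    log = []  # entries are (wall_clock_minute, event_minute_processed)
    next_event = 1
    for clock in range(1, last_minute + 1):
        # Process every window that is complete by now, but never skip ahead.
        while next_event in ingest_commits and ingest_commits[next_event] <= clock:
            log.append((clock, next_event))
            next_event += 1
    return log


log = sync_transform(INGEST_COMMITS, last_minute=7)
for clock, event_minute in log:
    print(f"wall clock {clock}: processed events with event minute {event_minute}")
```

Here the transform does nothing at wall-clock minutes 5 and 6, then processes event minutes 5, 6 and 7 in order once the ingest job has caught up.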
Let's take an example to better illustrate this behavior. Assume you are ingesting two Kafka topics: user visits to your signup page and actual signups to your product. Events in the two topics may arrive at different rates, but usually within a few seconds of each other. To analyze the data, you create two ingestion jobs that read from the topics and stage them in your data lake as two separate tables. Next, you create a materialized view that counts how many times a particular user visits the signup page. Then you create a transformation job that joins the signup "clicks" table with the "views per user" materialized view, maybe aggregates a few values, and writes the results as a new table to the data lake. The following diagram illustrates this scenario:
With all three jobs and the materialized view defined as synchronized, data will be ingested, joined and aggregated in lock step. If the ingestion job reading the signup "click" event stream encounters errors due to an application issue upstream, but page visit events continue to flow in, SQLake will automatically pause the materialized view and transformation job until both source tables have data for the time range being processed. Once they are in sync, the transformation job resumes and the rows are processed.
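One way to picture the pause is that a synchronized join can only advance as far as its slowest source. The Python sketch below models that idea. The table names, the minute-based progress counters and the `ready_windows` helper are hypothetical and do not describe how SQLake tracks progress internally.

```python
def ready_windows(last_processed, committed_through):
    """Return the event-time minutes that every upstream table has fully committed."""
    safe_until = min(committed_through.values())  # the slowest source sets the pace
    return list(range(last_processed + 1, safe_until + 1))


# Hypothetical progress, in minutes of event time, of the two staged tables.
progress = {"page_visits": 12, "signups": 9}

# The signup stream is stuck at minute 9, so only minute 9 can be joined.
pending = ready_windows(last_processed=8, committed_through=progress)

# Once the signup ingestion job recovers, the remaining windows are released.
progress["signups"] = 12
caught_up = ready_windows(last_processed=9, committed_through=progress)

print(pending, caught_up)
```

Although page visits are available through minute 12, nothing past minute 9 is joined until the signup table catches up.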
Sync jobs are recommended for the majority of use cases and data pipelines, in particular when you expect stateful processing to take place at some point in the pipeline. You know you need sync jobs when your transformations include JOINs and GROUP BYs, that is, when you join and aggregate data from two or more sources. Sync jobs are also recommended if you expect other pipelines to depend on the output of these jobs and therefore want them to be synchronized.
Using sync jobs is simple. The syntax is similar to that of non-sync jobs, with two notable differences.
  • Sync jobs and materialized views MUST include the SYNC keyword: CREATE SYNC JOB for ingestion and transformation jobs, and CREATE SYNC MATERIALIZED VIEW for materialized views
  • Sync transformation jobs MUST include a WHERE clause such as $event_time BETWEEN run_start_time() AND run_end_time()

Understanding SQLake timeline properties

SQLake processes events on a timeline. The timestamps on this timeline are derived differently depending on whether your job is synchronized or not. These timestamps are automatically appended to your data as system columns, denoted with a dollar sign ($). SQLake maintains additional system columns to help you understand your data better, such as $file_name and $file_size, which tell you which file a particular row is stored in and how large that file is.
There are three types of timestamps:
Column name | Non-sync job | Sync job
$source_time | A monotonically increasing time representing when the data was generated. | A monotonically increasing time representing when the data was generated.
$commit_time | Time the data was committed to the table by SQLake. | Time the data was committed to the table by SQLake.
$event_time | For transformation jobs and materialized views, $event_time is set to the value of $commit_time. | For ingestion jobs, $event_time is set to $source_time. For transformation jobs and materialized views, $event_time is set to the $event_time column value of the source table it is reading from.
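The $event_time rules stated above can be summarized as a small Python function. This is a sketch for illustration only: `derive_event_time` and its parameter names are hypothetical, and because the rules above do not say how a non-sync ingestion job sets $event_time, the sketch raises an error for that case instead of guessing.

```python
from datetime import datetime

JOB_KINDS = ("ingestion", "transformation", "materialized_view")


def derive_event_time(job_kind, is_sync, source_time=None, commit_time=None,
                      upstream_event_time=None):
    """Return the $event_time a job would assign, following the rules stated above."""
    if job_kind not in JOB_KINDS:
        raise ValueError(f"unknown job kind: {job_kind}")
    if is_sync:
        if job_kind == "ingestion":
            return source_time          # sync ingestion: $event_time = $source_time
        return upstream_event_time      # sync transform / MV: inherit the source table's $event_time
    if job_kind in ("transformation", "materialized_view"):
        return commit_time              # non-sync transform / MV: $event_time = $commit_time
    raise ValueError("no rule is stated for non-sync ingestion jobs")


generated = datetime(2023, 1, 1, 0, 5, 0)
committed = datetime(2023, 1, 1, 0, 7, 0)

staged = derive_event_time("ingestion", True, source_time=generated, commit_time=committed)
sync_out = derive_event_time("transformation", True, commit_time=committed,
                             upstream_event_time=staged)
non_sync_out = derive_event_time("transformation", False, commit_time=committed,
                                 upstream_event_time=staged)
print(staged, sync_out, non_sync_out)
```

The sync transformation keeps the original generation time, while the non-sync transformation replaces it with its own commit time.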

Combining sync and non-sync jobs

It is highly recommended to create pipelines that consist entirely of either sync or non-sync jobs. However, there are use cases where mixing job types is beneficial.
For example, when a non-sync job “B” is added downstream of a sync job “A”, job “B” sets the $event_time of its output to its own $commit_time. This means that the timestamp available for synchronization after job “B” is a later time ($commit_time is when the event was written by SQLake) than the time range originally being processed ($event_time is when the event was generated by the source). When sync job “C” is created to read the output generated by job “B”, it will not wait for data to synchronize and will process every new event as it arrives. If job “B” falls behind, job “C” will continue to process and not wait.
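The effect of a non-sync job in the middle of a pipeline can be traced with a short Python sketch that applies the timestamp rules from the timeline properties section. It is one reading of those rules, not SQLake's implementation: `propagate`, the stage tuples and the commit delays are hypothetical.

```python
from datetime import datetime, timedelta


def propagate(stages, generated_at):
    """Follow $event_time through a chain of (name, is_sync, commit_delay) stages."""
    event_time = generated_at  # as staged by a sync ingestion job ($event_time = $source_time)
    result = {}
    for name, is_sync, commit_delay in stages:
        commit_time = generated_at + commit_delay  # hypothetical time this stage commits its output
        # Sync stages inherit the upstream $event_time; non-sync stages overwrite it
        # with their own $commit_time.
        event_time = event_time if is_sync else commit_time
        result[name] = event_time
    return result


generated_at = datetime(2023, 1, 1, 0, 5)
pipeline = [
    ("A", True, timedelta(minutes=1)),   # sync
    ("B", False, timedelta(minutes=3)),  # non-sync
    ("C", True, timedelta(minutes=4)),   # sync
]
times = propagate(pipeline, generated_at)
for name, event_time in times.items():
    print(name, event_time.time())
```

In this model job A still carries the original 00:05 timestamp, but from job B onward the $event_time is 00:08, so job C no longer has the original time range to synchronize on.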
It’s important to remember this behavior because there may be downstream jobs that depend on synchronized behavior, for example when joining and aggregating events, and these could break if a non-sync job is accidentally inserted into a pipeline of sync jobs.