Jobs

This article describes the available jobs in Upsolver, explains the difference between synchronized and non-synchronized jobs, and illustrates how events are processed.

In SQLake, there are four types of jobs: ingestion, replication, transformation, and materialized views. Each of these can be created as either a synchronized or a non-synchronized job.

  1. Ingestion jobs use the COPY FROM command to copy data from a supported source and insert the raw data into a staging table in your Amazon S3-based data lake, or directly into supported targets. These jobs automatically detect and infer the schema and, when writing to data lake tables, update the AWS Glue Data Catalog with column names, types, and partition information. Furthermore, you can apply transformations if your data requires alteration prior to loading into your target.

  2. Replication jobs are a type of ingestion job that enables you to replicate your change data capture (CDC) tables to multiple targets using a single entity. Multiple replication groups share the source data, yet are configured to operate independently.

  3. Transformation jobs use INSERT and MERGE statements to copy data into tables in your data lake, data warehouse, and other targets. You can transform, model, join, and aggregate data before inserting the results into the target system. Using a primary key, transformation jobs can insert, update, and delete rows automatically.

  4. Materialized views are a special type of job that represents a continuously updating table. A materialized view can only be joined with another table, either streaming or static.

Jobs enable you to ingest and transform your data. They can read from external sources and write the data into a table, or read from a table and write to another table or an external target.
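
For illustration, the following is a minimal sketch of an ingestion job that copies raw JSON objects from Amazon S3 into a staging table in the data lake. The connection name, bucket location, catalog, database, and table names are hypothetical placeholders, and options such as CONTENT_TYPE and LOCATION may differ depending on your connection type and SQLake version.

    -- Staging table in the data lake; columns and types are inferred at ingestion time
    CREATE TABLE default_glue_catalog.demo_db.orders_raw_data()
        PARTITIONED BY $event_date;

    -- Ingestion job: copy raw events from S3 into the staging table
    -- "my_s3_connection" and the LOCATION value are placeholders
    CREATE JOB load_orders_raw
        CONTENT_TYPE = JSON
    AS COPY FROM S3 my_s3_connection
        LOCATION = 's3://my-sample-bucket/orders/'
    INTO default_glue_catalog.demo_db.orders_raw_data;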


Understanding non-synchronized jobs

With non-synchronized jobs, you can create pipelines where jobs are loosely dependent on each other. This means that each job in the pipeline will continue to run regardless of whether the job before or after it has encountered errors or has been delayed.

Non-sync jobs are recommended when you need to prepare and process data that will be written to a target system, such as Snowflake or Amazon Redshift, where your business logic transformations are performed. For example, you can create a pipeline to ingest raw CSV or JSON objects, auto-detect the schema, correct missing values, and write the results to a data lake or data warehouse. In this example the transformations are stateless: the accuracy of transformations at each stage of the pipeline is not affected by the previous job's output rate or by whether that job is falling behind.

Using non-sync jobs is simple.

  • Non-sync ingestion and transformation jobs are created using the CREATE JOB statement

  • Non-sync materialized views are created using the CREATE MATERIALIZED VIEW statement

  • Non-sync transformation jobs MUST include a WHERE clause using time_filter(), as shown in the sketch after this list
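
As a rough sketch (assuming the staging table from the earlier example), a non-sync transformation job could look like the following. The table and column names are placeholders, and job options such as START_FROM and RUN_INTERVAL are illustrative rather than required.

    -- Non-sync transformation job: cleans and reshapes staged rows
    -- and inserts them into a target data lake table
    CREATE JOB transform_orders
        START_FROM = BEGINNING
        RUN_INTERVAL = 1 MINUTE
    AS INSERT INTO default_glue_catalog.demo_db.orders_transformed
        MAP_COLUMNS_BY_NAME
        SELECT orderid AS order_id,              -- rename/flatten fields as needed
               customer.email AS customer_email,
               nettotal AS net_total
        FROM default_glue_catalog.demo_db.orders_raw_data
        WHERE time_filter();                     -- required time filter for transformation jobs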

The following diagram illustrates how non-sync jobs read and process events over time:

In the diagram above, events are read from the source by the ingestion job, batched into 1-minute intervals, and written to storage. The transformation job then picks up the events, processes them, and writes them to the output (data lake, data warehouse, etc.). In this scenario, several events come in at minute 1; they are batched and written to Amazon S3.

A task is executed at minute 2 by the transformation job to process those events and write them to the data lake. At minute 5 in the diagram, we see a spike in the number of incoming events, which take longer to read and write to Amazon S3.

However, the transform job is unaware of this delay and continues to execute every minute. Two minutes later, the ingestion job finishes writing the data, which becomes available for the transformation job to process.

As explained earlier, if the processing is stateless, this is not an issue: the transformation job can pick up data whenever it's available. However, if the processing is stateful, performing an aggregation or a join, you run the risk of operating against incomplete data. This is where sync jobs are useful.

Understanding synchronized jobs

With synchronized jobs, you can create pipelines where jobs are strongly dependent on each other. This means that each job in the pipeline is synchronized with the previous job on a common $event_time timestamp column. This ensures that jobs don’t go out of sync, and if a job in the pipeline falls behind, the other jobs further in the pipeline will pause to allow it to catch up.

The following diagram illustrates how sync jobs read and process events over time:

Unlike non-sync jobs, sync jobs keep the reading and processing of events in lockstep. After the ingestion job finishes writing the current batch of events to storage, the transformation job picks up those events and processes them. Using the $event_time timestamp, the transformation job knows which events it should be processing and does not skip ahead.

Looking at minute 5 again, the ingest job takes longer than usual to write to storage. Because both jobs are synchronized, the transform job waits until it sees events with the $event_time of 00:05:00 before it continues to process.

Let's take an example to better illustrate this behavior. Assume you are ingesting two Kafka topics: user visits to your signup page, and actual signups to your product. Events between the two topics may arrive at different rates, but usually within a few seconds of each other. To analyze the data, you create two ingestion jobs that read from the topics and stage them in your data lake as two separate tables.

Next, you create a materialized view that counts how many times a particular user visits the signup page. Then you create a transformation job that joins the "clicks" table with the "views per user" table, maybe aggregates a few values, and writes the results as a new table to the data lake. The following diagram illustrates this scenario:

With all three jobs and the materialized view defined as synchronous, data will be ingested, joined, and aggregated in lockstep. If the ingestion job reading the signup "click" event stream encounters errors due to an application issue upstream, but page visit events continue to flow in, SQLake will automatically pause the materialized view and transformation job until both source tables have data for the time range being processed. After they are in sync, the transformation job resumes and the rows are processed.
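
A minimal sketch of this scenario is shown below, using the SYNC syntax described in the next section. The table, view, and column names (page_visits, signups, user_id, and so on) are hypothetical, and the join and aggregation logic is only indicative.

    -- Materialized view: continuously counts signup-page visits per user
    CREATE SYNC MATERIALIZED VIEW default_glue_catalog.demo_db.visits_per_user AS
        SELECT user_id,
               COUNT(*) AS visit_count
        FROM default_glue_catalog.demo_db.page_visits
        GROUP BY user_id;

    -- Transformation job: joins signup "click" events with the visit counts
    -- and writes the enriched rows to a new data lake table
    CREATE SYNC JOB enrich_signups
        RUN_INTERVAL = 1 MINUTE
    AS INSERT INTO default_glue_catalog.demo_db.signups_enriched
        MAP_COLUMNS_BY_NAME
        SELECT s.user_id,
               s.signup_time,
               v.visit_count
        FROM default_glue_catalog.demo_db.signups AS s
        LEFT JOIN default_glue_catalog.demo_db.visits_per_user AS v
            ON s.user_id = v.user_id
        WHERE time_filter();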

Sync jobs are recommended for the majority of use cases and data pipelines, in particular when you expect stateful processing to take place at some point in the pipeline. You use sync jobs when you join and aggregate data from two or more sources. Sync jobs are also recommended if you expect other pipelines to depend on the output of these jobs and therefore would want them to be synchronized.

You know you need sync jobs when your transformations include JOINs and GROUP BYs, i.e. when you are joining and aggregating the data.

Using sync jobs is simple. The syntax is similar to non-sync jobs with two noticeable differences:

  • Sync ingestion jobs, transformation jobs, and materialized views MUST include the SYNC keyword in their CREATE SYNC JOB and CREATE SYNC MATERIALIZED VIEW statements

  • Sync transformation jobs MUST include a WHERE clause using time_filter(), as shown in the sketch after this list
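
For example, the non-sync transformation job sketched earlier becomes a sync job by adding the SYNC keyword; the rest of the statement, including the time_filter() clause, is unchanged (names remain placeholders):

    CREATE SYNC JOB transform_orders_sync
        START_FROM = BEGINNING
        RUN_INTERVAL = 1 MINUTE
    AS INSERT INTO default_glue_catalog.demo_db.orders_transformed
        MAP_COLUMNS_BY_NAME
        SELECT orderid AS order_id,
               customer.email AS customer_email,
               nettotal AS net_total
        FROM default_glue_catalog.demo_db.orders_raw_data
        WHERE time_filter();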

Understanding SQLake timeline properties

SQLake processes events on a timeline. The timestamps on this timeline are derived differently depending on whether your job is synchronized or not. These timestamps are automatically appended to your data as system columns denoted with a dollar sign ($). SQLake maintains additional system columns to help you understand your data better, such as $file_name and $file_size, which tell you which file a particular row is stored in and how large that file is.

There are three types of timestamps:

  • $source_time
    Non-sync job: A monotonically increasing time representing when the data was generated.
    Sync job: A monotonically increasing time representing when the data was generated.

  • $commit_time
    Non-sync job: The time the data was committed to the table by SQLake.
    Sync job: The time the data was committed to the table by SQLake.

  • $event_time
    Non-sync job: For transformation jobs and materialized views, $event_time is set to the value of $commit_time.
    Sync job: For ingestion jobs, $event_time is set to $source_time. For transformation jobs and materialized views, $event_time is set to the $event_time column value of the source table the job reads from.
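
Assuming system columns can be selected like regular columns, a quick inspection query against a staging table might look like the following sketch (the table name is a placeholder):

    SELECT $event_time,      -- timeline timestamp used for synchronization
           $commit_time,     -- when SQLake committed the row to the table
           $source_time,     -- when the source generated the row
           $file_name,       -- file the row is stored in
           $file_size        -- size of that file
    FROM default_glue_catalog.demo_db.orders_raw_data
    LIMIT 10;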

Combining sync and non-sync jobs

It is highly recommended to create pipelines that comprise either all sync or all non-sync jobs. However, there are use cases where mixing job types is beneficial.

For example, suppose a non-sync job “B” is added downstream of a sync job “A”. Because job “B” is non-sync, it sets the $event_time of its output to its $commit_time. This means that the timestamp available for synchronizing against job “B”'s output is set to a future time ($commit_time is when the event was written by SQLake) compared to the time range being processed ($event_time is when the event was generated by the source). When a sync job “C” is created to read the output generated by job “B”, it will not wait for data to synchronize and will process every new event. If job “B” falls behind, job “C” will continue to process and not wait.

It is important to remember this behavior because there may be downstream jobs that depend on synchronized behavior, for example when joining and aggregating events, which could break if a non-sync job is accidentally inserted into a pipeline of sync jobs.
