Understanding Sync and Non-Sync Jobs

This article describes the available jobs in Upsolver, explains the difference between synchronized and non-synchronized jobs, and illustrates how events are processed.

Understanding non-synchronized jobs

With non-synchronized jobs you can create pipelines where jobs are loosely dependent on each other. This means that each job in the pipeline will continue to run regardless of whether the job before or after it has encountered errors or has been delayed.

Non-sync jobs are recommended when you need to prepare and process data that will be written to a target system where your business logic transformations are to be performed, such as Snowflake or Amazon Redshift. For example, you can create a pipeline to ingest raw CSV or JSON objects, auto-detect the schema, correct missing values, and write the results to a data lake or data warehouse. In this example the transformations are stateless. The accuracy of transformations at each stage of the pipeline is not affected by the previous job’s output rate or whether it is falling behind or not.

Using non-sync jobs is simple:

  • Non-sync transformation jobs MUST include a WHERE clause of $commit_time BETWEEN run_start_time() AND run_end_time()

The following diagram illustrates how non-sync jobs read and process events over time:

In the diagram above, events are read in from the source by the ingestion job, batched to a 1-minute interval, and written to storage. Then the transform job picks up the events, processes them, and writes them to the output (data lake, data warehouse, etc.). In this scenario, several events come in at minute 1, they are batched and written to Amazon S3.

A task is executed at minute 2 by the transformation job to process those events and write them to the data lake. At minute 5 in the diagram, we see a spike in the number of incoming events, which take longer to read and write to Amazon S3.

However, the transform job is unaware of this delay and continues to execute every minute. Two minutes later, the ingestion job finishes writing the data, which becomes available for the transformation job to process.

As explained earlier, if the processing is stateless, this is not an issue, the transformation job can pick up data whenever it's available. However, if the processing is stateful, performing aggregation or a join, you run the risk of operating against incomplete data. This is when sync jobs are useful.

Understanding synchronized jobs

With synchronized jobs, you can create pipelines where jobs are strongly dependent on each other. This means that each job in the pipeline is synchronized with the previous job on a common $event_time timestamp column. This ensures that jobs don’t go out of sync, and if a job in the pipeline falls behind, the other jobs further in the pipeline will pause to allow it to catch up.

The following diagram illustrates how a sync jobs read and process events over time.

Unlike non-sync jobs, sync jobs keep the reading and processing of events in lockstep. After the ingestion job finishes writing the current batch of events to storage, the transformation job picks those events and processes them. Using the $event_time timestamp, the transformation job knows which events it should be processing and does not skip ahead.

Looking at minute 5 again, the ingest job takes longer than usual to write to storage. Because both jobs are synchronized, the transform job waits until it sees events with the $event_time of 00:05:00 before it continues to process.

Let's take an example to better illustrate this behavior. Assume you are ingesting two Kafka topics, user visits to your signup page and actual signups to your product. Events between the two topics may arrive at different rates but usually within a few seconds of each other. To analyze the data, you create two ingestion jobs that read from the topics and stage them in your data lake as two separate tables.

Next, you create a materialized view that counts how many times a particular user visits the signup page. Then you create a transformation job that joins the clicks table with the views per user table, maybe aggregates a few values, and writes the results as a new table to the data lake. The following diagram illustrates this scenario:

With all three jobs and the materialized view defined as synchronous, data will be ingested, joined, and aggregated in lockstep. If the ingestion job reading the signup click event stream encounters errors due to an application issue upstream, but page visit events continue to flow in, Upsolver will automatically pause the materialized view and transformation job until both source tables have data for the time range being processed. After they are in sync, the transformation job resumes and the rows are processed.

Sync jobs are recommended for the majority of use cases and data pipelines, in particular when you expect stateful processing to take place at some point in the pipeline. You use sync jobs when you join and aggregate data from two or more sources. Sync jobs are also recommended if you expect other pipelines to depend on the output of these jobs and therefore would want them to be synchronized.

You know you need sync jobs when your transformations include JOINs and GROUP BYs - i.e. you are joining and aggregating the data.

Using sync jobs is simple. The syntax is similar to non-sync jobs with two noticeable differences:

  • Sync ingestion, transformation, and materialized view jobs MUST include the SYNC keyword in CREATE SYNC JOB and CREATE SYNC MATERIALIZED VIEW statements

  • Sync transformation jobs MUST include a WHERE clause such as $event_time BETWEEN run_start_time() AND run_end_time()

Understanding Upsolver timeline properties

Upsolver processes events on a timeline. The timestamps on this timeline are derived differently based on whether your job is synchronized or not. These timestamps are automatically appended to your data as a system column denoted with a dollar sign $. Upsolver maintains additional system columns to aid in understanding your data better, columns like $file_name and $file_size that tell you which file the particular row is stored and how large is that file.

There are three types of timestamps:

Column nameNon-sync jobSync job

$source_time

A monotonically increasing time representing when the data was generated.

A monotonically increasing time representing when the data was generated.

$commit_time

Time the data was committed to the table by Upsolver.

Time the data was committed to the table by Upsolver.

$event_time

For transformation jobs and materialized views, $event_time is set to the value of $commit_time.

For ingestion jobs, $event_time is set to $source_time. For transformation jobs and materialized views, $event_time is set to the $event_time column value of the source table it is reading from.

Combining sync and non-sync jobs

It is highly recommended to create pipelines that comprise either sync or non-sync jobs. However, there are use cases where mixing job types is beneficial.

For example, when a non-sync job “B” is added to a sync job “A”, the sync job “A” sets the $event_time to its $commit_time. This means that the timestamp used to synchronize between job “A” and job “B” is set to a future time ($commit_time is when the event was written by Upsolver) compared to the time range ($event_time is when the event was generated by the source) being processed. When sync job “C” is created to read the output generated by job “B”, it will not wait for data to synchronize and process every new event. If job “B” falls behind, job “C” will continue to process and not wait.

It is important to remember this behavior because there may be upstream jobs that depend on synchronous behavior, for example when joining and aggregating events, that could break if a non-sync job is accidentally inserted into a pipeline of sync jobs.

Last updated