Upsolver Timeline

This article describes how Upsolver handles events in relation to time.

To get the results you intend when working with data in Upsolver, it is important to understand how that data is handled, particularly in relation to time. There are two areas to understand:

Data ingestion: How Upsolver parses data over time and how the event time is derived from your data

Data transformation: How Upsolver processes your data transformations over time and how to ensure you get the intended results when aggregating your data

Data ingestion

After you create an ingestion job that writes to your staging table in Upsolver, the job runs two tasks for every minute between the times specified by START_FROM and END_AT. It first runs an ingestion task to either discover the data (for cloud storage sources) or read the raw bytes (for streaming system sources). It then runs a task to parse the data that arrived within that minute.

Note that the parsing task can be parallelized while the ingestion task cannot.

Each block within the diagram above represents data from a single minute. For example, if START_FROM is set to timestamp '2022-08-04 11:00:00.000', then block 1 would be data from 2022-08-04 11:00 to 2022-08-04 11:01.

If you check the tasks related to your job within the task_executions table, you can also see which minute of data a specific task is processing based on the task_start_time and task_end_time. Following our earlier example, a task that worked on block 1 would have a task_start_time of 2022-08-04 11:00:00.000 and a task_end_time of 2022-08-04 11:01:00.000.
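The mapping from block number to task window described above can be sketched in a few lines of Python. This is only an illustration of the arithmetic: the helper name block_window is hypothetical, and treating each window as starting at START_FROM plus a whole number of minutes is an assumption drawn from the example in this section.

```python
from datetime import datetime, timedelta


def block_window(start_from: datetime, block_number: int,
                 interval: timedelta = timedelta(minutes=1)):
    """Return (task_start_time, task_end_time) for a 1-based block number.

    Hypothetical helper: assumes block 1 begins exactly at START_FROM and
    every block spans one interval.
    """
    if block_number < 1:
        raise ValueError("block_number is 1-based")
    task_start_time = start_from + (block_number - 1) * interval
    return task_start_time, task_start_time + interval


start_from = datetime(2022, 8, 4, 11, 0)
for n in (1, 2, 3):
    start, end = block_window(start_from, n)
    print(n, start.isoformat(), end.isoformat())
```

For block 1 this yields the same 11:00 to 11:01 window as the task_start_time and task_end_time in the example.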

The COPY FROM job continues to ingest the data from each block until it reaches the END_AT time. If there is no END_AT time configured for the job, then data ingestion continues indefinitely as long as new events are arriving.

Commit time and event time

As your data is being written to the staging table, the time at which the data becomes visible within the table is stored as the $commit_time.

There are also two system columns $event_date and $event_timestamp that are added to the table based on the original event time derived from the source.

When copying from a storage-type source (e.g. Amazon S3), if the DATE_PATTERN job option is configured, then the event time is derived from the date pattern in the file path. Otherwise, the event time is determined based on the file's modified time.

Note that the events being read by any given task are based on the event time falling within that task's particular task_start_time and task_end_time window.

In the case of a streaming-type source (e.g. Kafka or Kinesis), the processing timestamp from the source is used as long as it is after the previous timestamps in the stream and before the current time.

If the event's timestamp is after the current time, it is not ingested until that time arrives, at which point it is processed by a task whose task_start_time and task_end_time window contains the event's time. If the timestamp is before a previous timestamp in the stream, then either that previous timestamp or the task_start_time is used, whichever is later.
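The rules for streaming sources can be read as a small decision function. The sketch below is an interpretation of the prose above, not Upsolver's implementation; the function name resolve_event_time and its parameters are hypothetical.

```python
from datetime import datetime
from typing import Optional


def resolve_event_time(source_ts: datetime,
                       previous_ts: Optional[datetime],
                       task_start_time: datetime,
                       now: datetime) -> Optional[datetime]:
    """Return the event time to assign, or None if the event must wait.

    Hypothetical helper that mirrors the three cases described in the text.
    """
    if source_ts > now:
        # Future timestamp: not ingested until a later task window contains it.
        return None
    if previous_ts is not None and source_ts < previous_ts:
        # Out-of-order timestamp: use the later of the previous timestamp
        # and the start of the current task window.
        return max(previous_ts, task_start_time)
    # In-order timestamp that is not in the future: use it as-is.
    return source_ts


now = datetime(2022, 8, 4, 11, 5)
task_start = datetime(2022, 8, 4, 11, 3)
previous = datetime(2022, 8, 4, 11, 2)
print(resolve_event_time(datetime(2022, 8, 4, 11, 1), previous, task_start, now))
```

In the usage line, the event's timestamp (11:01) is earlier than the previous one (11:02), so the later of 11:02 and the task_start_time of 11:03 is chosen.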

Data transformation

Transformation jobs operate similarly to data ingestion jobs, with some added flexibility to the job's run time window.

By default, the job option RUN_INTERVAL for transformation jobs is set to 1 minute, meaning that, just like ingestion jobs, tasks run every minute to process and write the data from within that minute.

However, if your use case does not require data to be written to the target location so frequently, you can set RUN_INTERVAL to a longer time period (e.g. RUN_INTERVAL could be 5 MINUTES, 1 HOUR, 7 DAYS, etc.).

Note that for any RUN_INTERVAL that's less than a day, the given value should evenly divide 1 day (24 hours). For example, you can set RUN_INTERVAL to 2 hours (the job runs 12 times per day), but trying to set RUN_INTERVAL to 5 hours would fail since 24 hours is not evenly divisible by 5.
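The divisibility rule can be checked with plain duration arithmetic. The following is a minimal sketch; the function name passes_divisibility_rule is hypothetical, and intervals of one day or longer are simply passed through because the rule above is stated only for intervals shorter than a day.

```python
from datetime import timedelta

DAY = timedelta(days=1)


def passes_divisibility_rule(interval: timedelta) -> bool:
    """Check whether a sub-day interval evenly divides 24 hours.

    Hypothetical helper: intervals of a day or more are outside the rule
    described in the text, so they are not rejected here.
    """
    if interval <= timedelta(0):
        return False
    if interval >= DAY:
        return True
    return DAY % interval == timedelta(0)


print(passes_divisibility_rule(timedelta(hours=2)))  # 24 / 2 = 12 runs per day
print(passes_divisibility_rule(timedelta(hours=5)))  # 24 is not a multiple of 5
```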

Going back to this diagram, if you set RUN_INTERVAL = 5 MINUTES, then each block here now represents data from a 5-minute period.

Say you created the job with START_FROM = NOW and the current time is 2022-08-05 11:03; in order to align with the RUN_INTERVAL of 5 minutes, the task for block 1 processes data from 2022-08-05 11:03 up until 2022-08-05 11:05.

Similarly, if you set END_AT = NOW and the current time is 2022-08-05 11:03, then the task for block n processes data from 2022-08-05 10:55 up until 2022-08-05 11:00.
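The alignment in the two examples above can be reproduced with a floor operation on the timestamp. This sketch assumes interval boundaries are counted from a fixed origin (here the Unix epoch, which coincides with midnight boundaries for intervals that evenly divide a day); the helper names are hypothetical.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # assumed origin for interval boundaries


def floor_to_interval(ts: datetime, interval: timedelta) -> datetime:
    """Round ts down to the nearest interval boundary."""
    return ts - (ts - EPOCH) % interval


def first_window(start_from: datetime, interval: timedelta):
    """First task window: begins at START_FROM, ends at the next boundary."""
    return start_from, floor_to_interval(start_from, interval) + interval


def last_window(end_at: datetime, interval: timedelta):
    """Last task window: the final full interval ending at or before END_AT."""
    end = floor_to_interval(end_at, interval)
    return end - interval, end


now = datetime(2022, 8, 5, 11, 3)
five_minutes = timedelta(minutes=5)
print(first_window(now, five_minutes))  # 11:03 up to 11:05
print(last_window(now, five_minutes))   # 10:55 up to 11:00
```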

If there is no END_AT time configured for the job, however, then your data continues to be processed indefinitely as long as there are new events being written to the staging table.

As you can see, the RUN_INTERVAL is essentially what determines the run_start_time() and run_end_time() seen within the required filter WHERE $commit_time BETWEEN run_start_time() AND run_end_time().

Then, based on the WHERE filter, each task only processes the data that was committed to the source table within the given time range. Understanding how the RUN_INTERVAL and this WHERE filter interact is key to ensuring that you get the expected results when creating aggregation jobs.

Aggregating data

When creating an aggregation, there are two options to define the time interval over which the data is aggregated:

  1. Adjust the WHERE filter to the desired time range by subtracting the proper time interval (e.g. WHERE $commit_time BETWEEN run_start_time() - PARSE_DURATION('1d') AND run_end_time())

  2. Adjust the RUN_INTERVAL to the appropriate time interval (e.g. RUN_INTERVAL = 1 DAY)

To better understand the difference between these two methods, let's walk through an example using both.

In this scenario, let's say that we would like to calculate the average order value (AOV) per country for the past 30 days.

Case 1 — Sliding window

With the first option, we adjust the period that is aggregated over by subtracting a time interval in the WHERE filter:

CREATE JOB agg_orders_sliding_30d -- job name is a placeholder; the original snippet omits this line
    RUN_INTERVAL = 1 MINUTE -- this is also the default
AS INSERT INTO default_glue_catalog.orders.monthly_aov
    SELECT country,
           avg(nettotal) AS avg_order_value
    FROM default_glue_catalog.upsolver_samples.orders_raw_data
    -- extend the lower bound of the window by 30 days
    WHERE $commit_time BETWEEN run_start_time() - PARSE_DURATION('30d')
           AND run_end_time()
    GROUP BY country;

Keeping RUN_INTERVAL set to 1 minute means that this job executes a new task every minute, and subtracting PARSE_DURATION('30d') from run_start_time() extends the filter so that each task also includes the preceding 30 days of data.

If we look at what this means in a diagram, each numbered block above represents the RUN_INTERVAL, or the time range of data that would be typically processed by a given task if we had not subtracted a time interval.

However, with the PARSE_DURATION('30d') included, that means each task also processes data from the time range of one of the colored blocks. For example, subtracting 30 days means that the task for block 1 would be working with data from the minute represented by that block as well as data from block a.

Notice that block a itself represents 30 days and ends before block 1 starts; this is because we are subtracting the time interval from run_start_time() instead of run_end_time(). This also means that the job is effectively executing a new task every minute that aggregates the data from the last 30 days and 1 minute.

Note that this doesn't mean that Upsolver rescans the entire window of data each time; the aggregation for every new block is simply rolled up incrementally with the previously aggregated values from the time window.
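To make the idea of an incremental rollup concrete, here is a generic sketch of a sliding average that keeps one partial aggregate (sum and count) per block and only touches the newest and oldest blocks on each step. It illustrates the general technique and is not a description of Upsolver's internals; the class SlidingAverage and its methods are hypothetical, and grouping by country is left out for brevity.

```python
from collections import deque


class SlidingAverage:
    """Average over the most recent max_blocks blocks without rescanning."""

    def __init__(self, max_blocks: int):
        self.max_blocks = max_blocks
        self.blocks = deque()  # one (sum, count) pair per block
        self.total = 0.0
        self.count = 0

    def add_block(self, values):
        """Fold in a new block and evict the block that left the window."""
        block = (sum(values), len(values))
        self.blocks.append(block)
        self.total += block[0]
        self.count += block[1]
        if len(self.blocks) > self.max_blocks:
            old_sum, old_count = self.blocks.popleft()
            self.total -= old_sum
            self.count -= old_count

    def average(self):
        """Return the current windowed average, or None if there is no data."""
        return self.total / self.count if self.count else None


window = SlidingAverage(max_blocks=2)
for block_values in ([10, 20], [30], [50, 70]):
    window.add_block(block_values)
    print(window.average())
```

Each call to add_block does work proportional to the size of the new block only, regardless of how much data the full window covers.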

Case 2 — Tumbling window

Let's say that we don't need the aggregated results to be written so frequently to our target location; once every 30 days is enough. In that case, we can do this instead:

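CREATE JOB agg_orders_tumbling_30d -- job name is a placeholder; the original snippet omits this line
    RUN_INTERVAL = 30 DAYS
AS INSERT INTO default_glue_catalog.orders.monthly_aov
    SELECT country,
           avg(nettotal) AS avg_order_value
    FROM default_glue_catalog.upsolver_samples.orders_raw_data
    WHERE $commit_time BETWEEN run_start_time() AND run_end_time()
    GROUP BY country;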

By setting RUN_INTERVAL = 30 DAYS, the run_start_time() and run_end_time() for each task are automatically adjusted to be 30 days apart. Changing the RUN_INTERVAL also means that a new task is only executed by this job every 30 days.

Within our diagram, this means that each block now represents 30 days of data. For example, if we set START_FROM = timestamp '2022-01-01 00:00:00', then the task processing block 1 would cover 2022-01-01 through the end of 2022-01-30, giving it a run_end_time() of 2022-01-31 00:00:00. This also means that the next task for block 2 wouldn't start until 2022-01-31.

Thus, the INSERT job above would write the aggregated average order value for the last 30 days to the target table once every 30 days.
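The effect of a tumbling window on the results can be sketched by assigning every row to exactly one window and averaging per window. This is only an illustration: the function tumbling_averages and the sample rows are hypothetical, windows are assumed to be anchored at START_FROM as in the example above, and a row on a boundary is assumed to belong to the later window.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def tumbling_averages(rows, start_from: datetime, run_interval: timedelta):
    """Average nettotal per (window index, country).

    rows is an iterable of (commit_time, country, nettotal) tuples.
    Window 0 is the first RUN_INTERVAL after start_from.
    """
    sums = defaultdict(lambda: [0.0, 0])  # key -> [running sum, row count]
    for commit_time, country, nettotal in rows:
        if commit_time < start_from:
            continue  # committed before the job's START_FROM
        index = (commit_time - start_from) // run_interval
        bucket = sums[(index, country)]
        bucket[0] += nettotal
        bucket[1] += 1
    return {key: total / n for key, (total, n) in sums.items()}


rows = [
    (datetime(2022, 1, 15), "US", 100.0),
    (datetime(2022, 1, 30), "US", 200.0),
    (datetime(2022, 1, 31), "US", 50.0),  # first day of the second window
]
print(tumbling_averages(rows, datetime(2022, 1, 1), timedelta(days=30)))
```

Unlike the sliding window in Case 1, no row contributes to more than one result here.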

Case 3

What if we wanted new results every 30 days but we also wanted to see the aggregated values for the last 90 days?

To do this, we can combine elements of both methods shown above by both adjusting the RUN_INTERVAL and subtracting a time interval within the WHERE filter.

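CREATE JOB agg_orders_90d_every_30d -- job name is a placeholder; the original snippet omits this line
    RUN_INTERVAL = 30 DAYS
AS INSERT INTO default_glue_catalog.orders.monthly_aov
    SELECT country,
           avg(nettotal) AS avg_order_value
    FROM default_glue_catalog.upsolver_samples.orders_raw_data
    -- 30 days from RUN_INTERVAL plus 60 days subtracted here gives 90 days
    WHERE $commit_time BETWEEN run_start_time() - PARSE_DURATION('60d')
           AND run_end_time()
    GROUP BY country;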

By definition, RUN_INTERVAL of 30 days means that the job writes new results for this aggregation every 30 days. Since the RUN_INTERVAL also determines the time difference between run_start_time() and run_end_time(), in order to get the desired data for the last 90 days, we subtract an additional 60 days from run_start_time().

We can see this within the diagram where each of the numbered blocks represents a 30-day time period corresponding to the RUN_INTERVAL and each of the colored blocks represents a 60-day time period corresponding to the subtracted time interval.

When the task for block 1 executes, it aggregates all the data from block a in addition to the data from block 1, covering a total of 90 days. Similarly, the task for block 2 aggregates the data from block b and block 2.
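The way RUN_INTERVAL and the subtracted interval combine into the total aggregation span can be verified with simple arithmetic. In this sketch the function filter_window and the sample start date are hypothetical; it only assumes, as stated above, that run_end_time() is one RUN_INTERVAL after run_start_time().

```python
from datetime import datetime, timedelta


def filter_window(run_start_time: datetime, run_interval: timedelta,
                  lookback: timedelta = timedelta(0)):
    """Return (lower bound, upper bound, total span) of the WHERE filter."""
    run_end_time = run_start_time + run_interval
    lower = run_start_time - lookback
    return lower, run_end_time, run_end_time - lower


start = datetime(2022, 4, 1)  # arbitrary example run_start_time

# Case 1: 1-minute RUN_INTERVAL with 30 days subtracted.
print(filter_window(start, timedelta(minutes=1), timedelta(days=30))[2])

# Case 2: 30-day RUN_INTERVAL with nothing subtracted.
print(filter_window(start, timedelta(days=30))[2])

# Case 3: 30-day RUN_INTERVAL with 60 days subtracted.
print(filter_window(start, timedelta(days=30), timedelta(days=60))[2])
```

The three spans come out as 30 days and 1 minute, 30 days, and 90 days, matching the three cases.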
