Upsolver Timeline
This article describes how Upsolver handles events in relation to time.
To ensure you achieve your intended results when working with your data in Upsolver, it is important to understand how that data is handled, particularly in relation to time. This article covers two areas:
Data ingestion: How Upsolver parses data over time and how the event time is derived from your data
Data transformation: How Upsolver processes your data transformations over time and how to ensure you get the intended results when aggregating your data
After you create an ingestion job that writes to your staging table in Upsolver, for every minute between the times specified by START_FROM and END_AT, that job first runs an ingestion task to either discover the data (for cloud storage sources) or read the raw bytes (for streaming system sources). It then executes a task to parse the data that arrived within that minute.
Note that the parsing task can be parallelized, while the ingestion task cannot.
Each block within the diagram above represents data from a single minute. For example, if START_FROM is set to timestamp '2022-08-04 11:00:00.000', then block 1 would contain data from 2022-08-04 11:00 to 2022-08-04 11:01.
If you check the tasks related to your job within the task_executions table, you can also see which minute of data a specific task is processing based on the task_start_time and task_end_time. Following our earlier example, a task that worked on block 1 would have a task_start_time of 2022-08-04 11:00:00.000 and a task_end_time of 2022-08-04 11:01:00.000.
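For example, a query along the following lines lists the time window each recent task processed. This is only a sketch: the full catalog path of task_executions and the job_name filter are assumptions that may differ in your environment.

```sql
-- A minimal sketch: list the time window each recent task of a job processed.
-- The catalog path and the job_name filter are placeholders; adjust them to
-- match your environment.
SELECT task_start_time,
       task_end_time,
       stage_name
FROM system.monitoring.task_executions
WHERE job_name = 'my_ingestion_job'
ORDER BY task_start_time DESC
LIMIT 10;
```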
The COPY FROM job continues to ingest the data from each block until it reaches the END_AT time. If there is no END_AT time configured for the job, then data ingestion continues indefinitely as long as new events are arriving.
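For reference, an ingestion job of this shape might look like the sketch below. The connection, bucket path, and table names are placeholders, and the exact job options depend on your source.

```sql
-- A sketch of a COPY FROM ingestion job with an explicit time window.
-- Connection, location, and table names are placeholders.
CREATE JOB load_orders_raw
    START_FROM = timestamp '2022-08-04 11:00:00.000'
    -- END_AT = timestamp '2022-08-11 00:00:00.000'  -- omit to keep ingesting indefinitely
    CONTENT_TYPE = JSON
AS COPY FROM S3 my_s3_connection
    LOCATION = 's3://my-bucket/orders/'
INTO default_glue_catalog.staging.orders_raw;
```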
As your data is being written to the staging table, the time at which the data becomes visible within the table is stored as the $commit_time. There are also two system columns, $event_date and $event_timestamp, that are added to the table based on the original event time derived from the source.
When copying from a storage-type source (e.g. Amazon S3), if the DATE_PATTERN job option is configured, then the date derived from the file according to that pattern is used. Otherwise, the event time is determined from the file's last modified time. Note that the events read by any given task are those whose event time falls within that task's particular task_start_time and task_end_time window.
In the case of a streaming-type source (e.g. Kafka or Kinesis), the processing timestamp from the source is used, as long as it is after the previous timestamps in the stream and before the current time. If the event's timestamp is after the current time, the event is not ingested until that time arrives, at which point it is processed by a task whose task_start_time and task_end_time window contains that event's time. If the timestamp happens to be before a previous timestamp in the stream, then either that previous timestamp or the task_start_time is used, whichever is later.
Transformation jobs operate similarly to data ingestion jobs, with some added flexibility in the job's run time window.
By default, the job option RUN_INTERVAL for transformation jobs is set to 1 minute, meaning that, just like ingestion jobs, tasks run every minute to process and write the data from that minute.
However, if your use case does not require data to be written to the target location so frequently, you can set RUN_INTERVAL to a longer time period (e.g. RUN_INTERVAL could be 5 MINUTES, 1 HOUR, 7 DAYS, etc.).
Note that any RUN_INTERVAL that is less than a day should evenly divide 1 day (24 hours). For example, you can set RUN_INTERVAL to 2 hours (the job runs 12 times per day), but trying to set RUN_INTERVAL to 5 hours would fail since 24 hours is not evenly divisible by 5.
Going back to this diagram, if you set RUN_INTERVAL = 5 MINUTES, then each block here now represents data from a 5-minute period. Say you created the job with START_FROM = NOW and the current time is 2022-08-05 11:03; in order to align with the RUN_INTERVAL of 5 minutes, the task for block 1 processes data from 2022-08-05 11:03 up until 2022-08-05 11:05.
Similarly, if you set END_AT = NOW and the current time is 2022-08-05 11:03, then the task for block n processes data from 2022-08-05 10:55 up until 2022-08-05 11:00.
If there is no END_AT time configured for the job, however, then your data continues to be processed indefinitely as long as there are new events being written to the staging table.
As you can see, the RUN_INTERVAL is essentially what determines the run_start_time() and run_end_time() seen within the required filter WHERE $commit_time BETWEEN run_start_time() AND run_end_time().
Then, based on the WHERE filter, each task only processes the data that was committed to the source table within the given time range. Understanding how the RUN_INTERVAL and this WHERE filter interact is key to ensuring that you get the expected results when creating aggregation jobs.
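Putting these pieces together, a basic transformation job might look like the sketch below; the source and target tables and the selected columns are placeholders.

```sql
-- A sketch of a transformation job using the default 1-minute RUN_INTERVAL
-- and the required $commit_time filter. Table and column names are placeholders.
CREATE JOB transform_orders
    RUN_INTERVAL = 1 MINUTE
AS INSERT INTO default_glue_catalog.analytics.orders_transformed
    MAP_COLUMNS_BY_NAME
    SELECT order_id,
           country,
           order_value
    FROM default_glue_catalog.staging.orders_raw
    WHERE $commit_time BETWEEN run_start_time() AND run_end_time();
```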
When creating an aggregation, there are two options to define the time interval over which the data is aggregated:
1. Adjust the WHERE filter to the desired time range by subtracting the proper time interval (e.g. WHERE $commit_time BETWEEN run_start_time() - PARSE_DURATION('1d') AND run_end_time())
2. Adjust the RUN_INTERVAL to the appropriate time interval (e.g. RUN_INTERVAL = 1 DAY)
To better understand the difference between these two methods, let's walk through an example using both.
In this scenario, let's say that we would like to calculate the average order value (AOV) per country for the past 30 days.
With the first option, we adjust the period that is aggregated over by subtracting a time interval in the WHERE filter:
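A sketch of such a job is shown below; the orders staging table and the order_value and country columns are illustrative names rather than part of Upsolver itself.

```sql
-- Option 1 sketch: keep RUN_INTERVAL at 1 minute and widen the window by
-- subtracting 30 days from run_start_time(). Table and column names are placeholders.
CREATE JOB aggregate_aov_per_country
    RUN_INTERVAL = 1 MINUTE
AS INSERT INTO default_glue_catalog.analytics.aov_per_country
    MAP_COLUMNS_BY_NAME
    SELECT country,
           AVG(order_value) AS avg_order_value
    FROM default_glue_catalog.staging.orders
    WHERE $commit_time BETWEEN run_start_time() - PARSE_DURATION('30d') AND run_end_time()
    GROUP BY country;
```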
Keeping RUN_INTERVAL set to 1 minute means that this job executes a new task every minute, and the subtracted PARSE_DURATION('30d') means that the last 30 days of data are not filtered out from each task.
If we look at what this means in a diagram, each numbered block above represents the RUN_INTERVAL, or the time range of data that would typically be processed by a given task if we had not subtracted a time interval. However, with PARSE_DURATION('30d') included, each task also processes data from the time range of one of the colored blocks. For example, subtracting 30 days means that the task for block 1 would be working with data from the minute represented by that block as well as data from block a.
Notice that block a itself represents 30 days and ends before block 1 starts; this is because we are subtracting the time interval from run_start_time() instead of run_end_time(). In practice, this means the job executes a new task every minute, each aggregating the data from the last 30 days and 1 minute.
Note that this doesn't mean that Upsolver rescans the entire window of data each time; the aggregation for each new block is simply rolled up incrementally with the previously aggregated values from the time window.
Let's say that we don't need the aggregated results to be written so frequently to our target location—just once every 30 days is enough. In that case, we can do this instead:
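A sketch of this variant, using the same placeholder table and column names as above:

```sql
-- Option 2 sketch: let RUN_INTERVAL define the 30-day window and keep the
-- default filter bounds. Table and column names are placeholders.
CREATE JOB aggregate_aov_per_country_30d
    RUN_INTERVAL = 30 DAYS
AS INSERT INTO default_glue_catalog.analytics.aov_per_country
    MAP_COLUMNS_BY_NAME
    SELECT country,
           AVG(order_value) AS avg_order_value
    FROM default_glue_catalog.staging.orders
    WHERE $commit_time BETWEEN run_start_time() AND run_end_time()
    GROUP BY country;
```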
By setting RUN_INTERVAL = 30 DAYS, the run_start_time() and run_end_time() for each task are automatically adjusted to be 30 days apart. Changing the RUN_INTERVAL also means that a new task is only executed by this job every 30 days.
Within our diagram, this means that each block now represents 30 days of data. For example, if we set START_FROM = timestamp '2022-01-01 00:00:00', then the task processing block 1 would start from 2022-01-01 and end at 2022-01-30. This also means that the next task, for block 2, wouldn't start until 2022-01-31.
Thus, the INSERT job above would write the aggregated average order value for the last 30 days to the target table once every 30 days.
What if we wanted new results every 30 days but we also wanted to see the aggregated values for the last 90 days?
To do this, we can combine elements of both methods shown above, adjusting the RUN_INTERVAL and also subtracting a time interval within the WHERE filter:
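A sketch combining both, again with placeholder table and column names:

```sql
-- Combined sketch: new results every 30 days, each covering the last 90 days
-- (the 30-day RUN_INTERVAL plus an extra 60 days subtracted in the filter).
CREATE JOB aggregate_aov_per_country_90d
    RUN_INTERVAL = 30 DAYS
AS INSERT INTO default_glue_catalog.analytics.aov_per_country_90d
    MAP_COLUMNS_BY_NAME
    SELECT country,
           AVG(order_value) AS avg_order_value
    FROM default_glue_catalog.staging.orders
    WHERE $commit_time BETWEEN run_start_time() - PARSE_DURATION('60d') AND run_end_time()
    GROUP BY country;
```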
By definition, a RUN_INTERVAL of 30 days means that the job writes new results for this aggregation every 30 days. Since the RUN_INTERVAL also determines the time difference between run_start_time() and run_end_time(), in order to get the desired data for the last 90 days, we subtract an additional 60 days from run_start_time().
We can see this within the diagram, where each of the numbered blocks represents a 30-day time period corresponding to the RUN_INTERVAL, and each of the colored blocks represents a 60-day time period corresponding to the subtracted time interval.
When the task for block 1 executes, it aggregates all the data from block a in addition to the data from block 1, for a total of a 90-day period. Similarly, the task for block 2 aggregates the data from block b and block 2.
The data from each block is handled by multiple tasks that depend on each other before being written to your staging table. These tasks are the various stages denoted in the stage_name column of the task_executions table. Note that run_start_time() and run_end_time() are equivalent to the task_start_time and task_end_time, respectively, that you see within that table.