Delays In Upsolver pipelines
Upsolver is a sequential pipeline platform; this means that data is processed minute by minute by default and each task is dependent upon the successful completion of previous tasks in the pipeline.
The general flow of a given pipeline is as follows:
Between the data source and the output, you can also have an Upsolver output, lookup table, or even both. Each of these steps (data source, Upsolver output, lookup table, output) is considered an operation.
An Upsolver output acts as both a data source and an output and can be found listed within both the Data Sources and Outputs pages.
A lookup table is a key-value store for faster lookups by key (e.g. use a lookup table to get all users who clicked on a specific ad in a specific timeframe) and is created with at least one key column and at least one aggregation.
In lookup tables, aggregation is performed in the following steps:
- 1.Aggregation load — Aggregates all data within each minute.
- 2.Aggregation — Rolls up the aggregated data from the previous step to the point where it can be loaded into memory. Data that has not yet been loaded into memory is considered delayed.
- 3.Aggregation compaction — Finishes rolling up the data to the required level of aggregation.
Aggregation within outputs is performed by executing a similar set of steps:
- 1.Aggregation load — Performs the aggregation on each minute of data. The
GROUP BYclauses are run at this stage.
- 2.Aggregation — Rolls up the data from the previous step up to the level of aggregation required.
- 3.Processing — Outputs the aggregated data to the target file. The
SELECTclauses are processed at this stage.
- 4.Writing — Writes the processed data to the target system. This step only occurs when writing to an external system that is not based in S3.
To search for any operation, you can press
command + shift + Kon Mac or
Control + shift + Kin Windows to open a general search bar, in which you can search by both the name and the ID.
The ID of any operation can be found within the URL of that operation's page.
At times you many encounter delays within your pipeline; this indicates that Upsolver is still processing data from a previous time period.
For example, if it's currently 12:00 PM and you see that your data has a delay of one hour. This means that the data from 11:00 AM is still processing.
When you come across a delay, you should first determine its origin by working backwards through your pipeline to see where the delay began.
For example, if you have a delay in the processing of your output and you see that there is also a delay in the data source, go to that data source page to check the cause there.
To easily check on an output's data source(s), click on Properties in the output page; under Data Sources within the General properties, you can click on any of the data source names to go to its corresponding page
Note that outputs with multiple data sources wait for all of them to progress, meaning that if one of the data sources is stuck or delayed, the output is also stuck or delayed.
If there is a similar delay in multiple operations running on the same cluster, the issue may also be that the cluster has insufficient resources.
After finding the source of the delay, the following tools and metrics can help you understand and troubleshoot its cause.
In each operation, there is an Errors tab under Monitoring you can check if there are errors there.
Occasional timeout errors can occur due to momentary cluster connection issues and can often be ignored. However, if the error is continuous, it should be checked even if it doesn't seem critical.
If the delay began in the data source's ingestion, the issue most likely stems from the system that the data source is reading from and we recommend that you check your connection status.
To see the connection info, go to the data sources' Properties:
You can also check on the connection by going to the connections page by clicking More > Connections.
If you see a connection with a warning triangle instead of a check mark, this means that there is an issue with the connection. You can hover over the warning sign to see the specific error message.
There is no concrete definition of what can be considered a "large amount of data" as it depends on the data itself, its transformation, and the size of your cluster.
However, if you see that the operation is still progressing, albeit rather slowly, you should consider adding shards to the operation.
For a streaming data source such as Kafka or Kinesis, the number of shards should be either equal to the number of partitions or a factor of the number of partitions. In other words, the number of partitions divided by the number of shards should equal a whole number, so each shard handles one or more whole partitions.
For outputs, it is recommended that the number of shards be either equal to or a factor of the number of shards in the related data source(s).
For example, if a data source has 8 shards, then we recommend that an output using that data source have either 8, 4, 2, or 1 shards.
If delays occur in multiple (or all) operations running on a specific cluster, then the issue most likely stems from the cluster itself.
You can check your cluster utilization under the Monitoring tab of any operation using the cluster in question.
Each annotation in the graph above represents an instance crashing, and the tasks running on that instance fail and restart with each crash. Note that the most common cause of crashing tends to be a lack of memory.
Tasks take longer to run and may fail more often when the CPU is too high (above 85%). If this is the case, you should add more instances or change the cluster type to one with more CPU units.
To do so, go the Clusters page and select the cluster in question. Under the Properties tab, go to the AWS section and click on the pencil icon on the right side of the row for Server Type; select the server type to use and click Update.
Additionally, you may want to check the scaling policy of the cluster. This policy defines when to add more instances while remaining in the range of Processing Units defined for your cluster (this range can be found under the same Properties tab under Upsolver).
The biggest variables in the scaling policy are the CPU and work backlog:
- Low cost — Looks exclusively at the work backlog, regardless of the CPU.
- Low latency — Scales up if the average CPU is over 80%, or if there is a work backlog.
- Consistent low latency — Scales up if the average CPU is over 60%, or if there is a work backlog.
- No scaling — Doesn't scale at all regardless of what the CPU or work backlog is.
If a scaling policy is updated but the cluster still doesn't scale, check the Processing Units range under the cluster properties. If the cluster has reached the range maximum, it cannot add any more instances.
To resolve this, you can click on the pencil icon to the right of the property and increase the range.
Here are some tools available to you that may be useful when troubleshooting.
Each operation's monitoring page contains a graph that shows the utilization of the cluster the operation is running on.
In the image above, the blue portion of the chart labelled Total represents the full cluster utilization. The purple Orders portion represents the approximate utilization by this specific operation (Orders).
Each operation's monitoring page also contains a graph that shows the cluster's memory consumption by instance.
In the image above, we can see each instance's memory consumption and when the instances changed. Each time the instances switch, all running tasks fail and restart on another instance; this may cause issues if the instances change too often (e.g. every few minutes).
As a short term fix, you can lock the cluster to avoid unnecessary changes. This is done within the Spot account under the cluster's group per instance. If the Spot account is managed by Upsolver, please contact support for assistance.
For a prolonged fix, you can use On Demand instances. This can be updated by going to the cluster's Properties; then click on the pencil icon next to On Demand Instances to update the property.
To avoid problems caused by high traffic or other availability zone issues, you can also go into your AWS account and add add additional availability zones. Note that once you've added these additional zones, you should send the subnet IDs to Upsolver Support so that we can make the appropriate updates.
Each operation's monitoring page also contains a graph that shows the operation's delays separated by task type.
Each operation's monitoring page also contains a graph that shows the number of events handled by the operation separated by task type.
To send metrics for external monitoring, click on More > Monitoring Reports > Add Monitor.
This is an advanced tool for users who wish to get some visibility on the specific tasks within their operations that are being delayed.
Upsolver updates the status of each operation task in a file called
s3://<Default Bucket or S3 connection>/tasks-cache/<Cluster ID>.csv.gzstored within your default S3 connection bucket.
To find your default S3 bucket, go to More > Connections; within your list of AWS S3 connections, your default connection should be highlighted in a box labelled Default Connection.
The S3 URI below the connection name is where your
tasks-cachecan be found.
This file contains all tasks for all operations running on that cluster. You can grep the tasks using the operation ID as follows:
aws s3 cp "s3://<Default Bucket or S3 connection>/tasks-cache/<Cluster ID>.csv.gz" - | gunzip | grep "<Operation ID>"
Note that the ID of any operation can be found within the URL of that operation's page.
The command returns a comma-delimited list of all the task info in the following order:
<task name> , <first missing minute>, <delay of the first missing minute>, <max delay of the task>, <start time>