Managing Data Quality - Ingesting Data with Expectations
This guide shows you how to create expectations in your jobs, and how to use the system tables in Upsolver to monitor the quality of your ingested data.
Expectations enable you to define data quality conditions to prevent bad data from polluting your downstream data warehouse or lake. Using expectations, data is evaluated during the ingestion process, and you can choose to either drop a row containing data that does not meet expectations, or load the row into the target and trigger a warning. Expectations enable you to check for null values, ensure data falls within a range, or match a defined value or regular expression.
A major advantage of expectations is the ability to automatically perform an action on data during the ingestion process, saving time in fixing incorrect or low-quality data after it has landed in the target system.
An expectation includes one or more predicate conditions that return a Boolean value. You can include any conditions supported by the WHERE clause, but not aggregations.
This guide shows you how to create, alter, and drop expectations, and how to use the system tables to monitor rows violating the expectation rules. You will start by creating a connection to your Amazon S3 bucket and loading data into a staging table using a job that includes expectations. After your job is running, you will create another job to collect metrics from the system tables so that you can report on them. Let's get started.
Step 1
Connect to your Amazon S3 bucket
To transfer your data, you must create an Upsolver connection. This connection gives you the ability to configure the AWS IAM credentials that Upsolver needs to access the data.
When you deploy Upsolver, the cluster will already have an IAM role attached that may or may not have permission to access the S3 path where your data resides. In the event that it lacks those permissions, you can assign a scope-down policy to the S3 connection. This is recommended to prevent failures, misconfigurations, and breaches.
Here's the code:
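The snippet below is a minimal sketch assuming SQLake-style connection syntax; the connection name, role ARN, and external ID are placeholder values to replace with your own.

```sql
-- Create an S3 connection using an IAM role that is allowed to read the source bucket.
-- The connection name, role ARN, and external ID below are placeholders.
CREATE S3 CONNECTION my_s3_connection
    AWS_ROLE = 'arn:aws:iam::111111111111:role/my_sample_role'
    EXTERNAL_ID = 'SAMPLE_EXTERNAL_ID'
    READ_ONLY = TRUE;
```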
For future reference, you can copy your AWS_ROLE from your AWS IAM user page.
Step 2
Create a staging table
Next, create a staging table in your data lake. Upsolver automatically converts your files into the Apache Parquet format and creates a table with the appropriate schema in the AWS Glue Data Catalog.
Here’s the code to create the table:
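The statement below is a representative sketch; the catalog, database, and table names are placeholders, and it assumes the schema can be inferred from the ingested data and that the system $event_date column is available for partitioning.

```sql
-- Create an empty staging table in the AWS Glue Data Catalog.
-- The column list is left empty so the schema is inferred at ingestion time.
CREATE TABLE default_glue_catalog.upsolver_samples.orders_raw_data()
    PARTITIONED BY $event_date;
```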
Step 3
Create a job to load the data
Now that you have your staging table, you can create a job to load data into this table, and include expectations in the job to check the quality of data as it is ingested.
Here's the code to create the job, including two expectations:
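The job below is a sketch assuming SQLake-style COPY FROM syntax; the job name, connection, bucket location, and table names are placeholders, and the orderid and nettotal predicates are written to match the checks described in the list that follows.

```sql
-- Copy raw JSON orders from S3 into the staging table,
-- evaluating two expectations on every row as it is ingested.
CREATE JOB load_orders_raw_data_from_s3
    CONTENT_TYPE = JSON
AS COPY FROM S3 my_s3_connection
    LOCATION = 's3://upsolver-samples/orders/'
INTO default_glue_catalog.upsolver_samples.orders_raw_data
    -- Drop rows where orderid is missing (NULL or an empty string)
    WITH EXPECTATION exp_orderid_not_null
        EXPECT orderid IS NOT NULL AND orderid <> '' ON VIOLATION DROP
    -- Warn, but still load the row, when nettotal is not greater than zero
    WITH EXPECTATION exp_nettotal_not_zero
        EXPECT nettotal > 0 ON VIOLATION WARN;
```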
Let's understand what this job does:
Automatically infers the schema and populates the column names and types in the Glue Data Catalog.
Converts the source S3 files to Apache Parquet, which stores the data in a columnar format optimized for faster reads. You can learn more about the properties you can pass to the CREATE JOB operation here.
Creates the exp_orderid_not_null expectation to check each row has an orderid value. The ON VIOLATION DROP setting specifies that a row is dropped from the ingestion stream if the condition is violated because the value is either NULL or an empty string.
Creates the exp_nettotal_not_zero expectation to check that the nettotal value is greater than zero, and raises a warning when this condition returns false. However, because the defined action is WARN, the row is loaded into the target.
Note that if a row violates both of the above conditions, the system tables reflect that two expectations were triggered, but only one row was dropped.
Step 4
Add an expectation to the job
Your job is now running and loading data into the staging table. However, you realize that you need to add another check to the job: all orders should have a value of USA for the country, but sometimes orders arrive that are outside of this region. You want to keep the data in the staging table, but you need to know when an order is not in the USA.
Here's the code to add an expectation to your existing job:
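The statement below is a sketch using the ALTER JOB command; the job name and the nested customer.address.country field are placeholders matching the earlier example.

```sql
-- Add a third expectation to the running job: warn when an order is not from the USA.
ALTER JOB load_orders_raw_data_from_s3
    ADD EXPECTATION exp_country
    EXPECT customer.address.country = 'USA' ON VIOLATION WARN;
```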
Let's understand what this code does:
Adds a new expectation named exp_country to the job you already created. The expectation checks that the customer.address.country value equals USA. The WARN action allows the row to be loaded into the staging table when the condition is violated; however, the system tables are incremented for each row that triggers the alert.
The job now has three expectations checking the data on ingestion.
Step 5
Drop an expectation from the job
Suppose an expectation becomes redundant and you want to remove it from the job. You can use the ALTER JOB command to drop the expectation.
Here's the code to drop the expectation:
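The statement below is a sketch; it uses the exp_country expectation from the previous step for illustration, so substitute the name of whichever expectation you want to remove.

```sql
-- Remove the expectation from the job; the job itself continues to run unchanged.
ALTER JOB load_orders_raw_data_from_s3
    DROP EXPECTATION exp_country;
```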
Not only does this code drop the expectation from the job, but it also removes the records from the system tables. In Steps 6-8, you will create a job to record these metrics so they are retained even when an expectation is dropped.
Step 6
Create a reporting table to collect metrics
You can monitor the quality of ingested data by automatically collecting the count of rows that violated the conditions you created in your expectations and storing the metrics in a data lake table.
First, let's create a reporting table to record the metrics generated by your expectations. This table will be used for all jobs and expectations that exist now and in the future.
Here's the code to create the reporting table:
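The table below is a sketch of one possible design; the column names mirror the metrics described in Step 8 and are assumptions, as are the catalog, database, and table names.

```sql
-- Data lake table that retains expectation metrics over time.
-- Column names are illustrative and should match whatever the collection job selects.
CREATE TABLE default_glue_catalog.upsolver_samples.expectations_report(
    job_name          STRING,
    expectation_name  STRING,
    action            STRING,
    triggers_today    BIGINT,
    triggers_total    BIGINT,
    report_time       TIMESTAMP
);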
Step 7
Create a job to collect metrics
Now that you have created your reporting table, you want to create a job to gather the metrics from the system.monitoring.expectations table and load the data into your reporting table.
Here's the code to create the job:
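The job below is a sketch; the column names selected from system.monitoring.expectations are assumptions based on the metrics described in this guide, and the RUN_END_TIME() call used to stamp each collection run is also an assumption.

```sql
-- Collect the current expectation counters every hour and append them to the report table.
CREATE JOB collect_expectation_metrics
    RUN_INTERVAL = 1 HOUR
AS INSERT INTO default_glue_catalog.upsolver_samples.expectations_report
    MAP_COLUMNS_BY_NAME
    SELECT job_name,
           expectation_name,
           action,
           triggers_today,
           triggers_total,
           RUN_END_TIME() AS report_time  -- timestamp of the collection run (assumed function)
    FROM system.monitoring.expectations;
```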
Let's take a look at what this code does:
Creates a job that runs hourly. The RUN_INTERVAL option can be adjusted to collect the data at a frequency that meets your reporting requirements.
Inserts all job expectations and their trigger counts into the reporting table.
A WHERE clause has not been included, so all job expectations are loaded into the report table. Depending on your requirements, you can easily add a WHERE clause to filter by job(s) and/or expectation(s):
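For example, assuming job_name and expectation_name columns in the system table, a filter appended to the SELECT above might look like this (the job and expectation names are placeholders):

```sql
-- Restrict the collected metrics to a single job and specific expectations.
WHERE job_name = 'load_orders_raw_data_from_s3'
  AND expectation_name IN ('exp_orderid_not_null', 'exp_nettotal_not_zero')
```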
Step 8
Query the report table
The next step is to query the reporting table to view the quality of the data you are ingesting. Let's look at the data for the last week.
Here's the code to return the results:
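The query below is a sketch; the column and table names match the placeholder reporting table created above, and the DATE_ADD filter assumes a Presto-style date function is available in your query engine.

```sql
-- Show how often each expectation was violated over the last seven days.
SELECT job_name,
       expectation_name,
       action,
       triggers_today,
       triggers_total,
       report_time
FROM default_glue_catalog.upsolver_samples.expectations_report
WHERE report_time >= DATE_ADD('day', -7, CURRENT_TIMESTAMP)
ORDER BY report_time DESC;
```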
Let's understand what this code does:
Returns the job and expectation name, and the action that should happen when the condition is violated.
Returns the number of times the condition was violated today (since midnight UTC).
Returns the number of times the condition was violated since the job started.
Returns the time the counts were recorded.
Filters the results to the last seven days, though you can easily change this depending on your reporting needs.
By querying the system tables and creating your own reporting tables and jobs, you can customize the observability level and frequency, and extract the results for use in your own dashboards or analytical tools.
Conclusion
In this guide, you learned how to add expectations to a new job and also to an existing job. By adding expectations, you can observe the quality of ingested data, and decide if rows should reach the target system.
Expectations provide you with the control to ensure that downstream targets are unpolluted by missing or invalid data. In addition, you saw how expectations can easily be dropped without impacting the execution of the job.
Furthermore, you were introduced to the system tables designed specifically to monitor your expectations, and you saw how you can create a job to collect metrics over time.
Try it yourself
To manage data quality:
Create an expectation on a new job, or add a new expectation to an existing job.
Monitor your expectations using the system tables.
Create a job to collect metrics and build custom reports.
Learn More