Managing data quality - ingesting data with expectations
This guide shows you how to create expectations in your jobs, and how to use the system tables in Upsolver to monitor the quality of your ingested data.
Expectations enable you to define data quality conditions to prevent bad data from polluting your downstream data warehouse or lake. Using expectations, data is evaluated during the ingestion process, and you can choose to either drop a row containing data that does not meet expectations, or load the row into the target and trigger a warning. Expectations enable you to check for null values, ensure data falls within a range, or match a defined value or regular expression.
A major advantage of expectations is the ability to automatically perform an action on data during the ingestion process, saving time in fixing incorrect or low-quality data after it has landed in the target system.
An expectation includes one or more predicate conditions that return a Boolean value. You can include any conditions supported by the WHERE clause, but not aggregations.
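For illustration, here are two hypothetical expectation clauses of the kind you can attach to an ingestion job; the column and expectation names are invented for this sketch:

```sql
-- Hypothetical expectation clauses; column names are examples only
WITH EXPECTATION exp_price_in_range EXPECT price BETWEEN 0 AND 10000 ON VIOLATION WARN
WITH EXPECTATION exp_country_code EXPECT country LIKE 'US%' ON VIOLATION DROP
-- Aggregations such as SUM(price) > 0 cannot be used in an expectation.
```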
This guide shows you how to create, alter, and drop expectations, and how to use the system tables to monitor rows violating the expectation rules. You will start by creating a connection to your Amazon S3 bucket and loading data into a staging table using a job that includes expectations. After your job is running, you will create another job to collect metrics from the system tables so that you can report on them. Let's get started.
To transfer your data, you must create a SQLake connection. This connection gives you the ability to configure the AWS IAM credentials that SQLake needs to access the data.
When you deploy SQLake, the cluster will already have an IAM role attached that may or may not have permission to access the S3 path where your data resides. In the event that it lacks those permissions, you can assign a scope-down policy to the S3 connection. This is recommended to prevent failures, misconfigurations, and breaches.
Here's the code:
-- Create a connection to Amazon S3
CREATE S3 CONNECTION my_s3_connection
AWS_ROLE = 'arn:aws:iam::001234567890:role/upsolver-role-<id>'
READ_ONLY = TRUE
--PATH_DISPLAY_FILTERS = ('s3://quickstart-vpclogs-test', 's3://upsolver-tutorials-vpclogs')
COMMENT = "my s3 connection";
Next, create a staging table in your data lake. SQLake automatically converts your files into the Apache Parquet format and creates a table with the appropriate schema in the AWS Glue Data Catalog.
Here’s the code to create the table:
-- Create a staging table in the data lake
CREATE TABLE default_glue_catalog.upsolver_samples.orders_raw_data()
PARTITIONED BY $event_date;
Now that you have your staging table, you can create a job to load data into this table, and include expectations in the job to check the quality of data as it is ingested.
Here's the code to create the job, including two expectations:
-- Create a job with expectations
CREATE SYNC JOB load_orders_with_expectations
CONTENT_TYPE = JSON
AS COPY FROM S3 my_s3_connection LOCATION = 's3://upsolver-samples/orders/'
WITH EXPECTATION exp_orderid_not_null EXPECT orderid IS NOT NULL AND orderid <> '' ON VIOLATION DROP
WITH EXPECTATION exp_nettotal_not_zero EXPECT nettotal > 0 ON VIOLATION WARN;
Let's understand what this job does:
- 1. Automatically infers the schema and populates the column names and types in the Glue Data Catalog.
- 2. Converts the source S3 files to Apache Parquet, which stores the data in a columnar format optimized for faster reads. You can learn more about the properties you can pass to the CREATE JOB operation here.
- 3. Creates the exp_orderid_not_null expectation to check that each row has an orderid value. The ON VIOLATION DROP setting specifies that a row is dropped from the ingestion stream if the condition is violated because the value is either NULL or an empty string.
- 4. Creates the exp_nettotal_not_zero expectation to check that the nettotal value is greater than zero, and raises a warning when this condition returns false. However, because the defined action is WARN, the row is still loaded into the target.
Note that if a row violates both of the above conditions, the system tables reflect that two expectations were triggered, but only one row was dropped.
Your job is now running and loading data into the staging table. However, you realize that you need to add another check to the job: all orders should have a value of USA for the country, but sometimes orders arrive that are outside of this region. You want to keep the data in the staging table, but you need to know when an order is not in the USA.
Here's the code to add an expectation to your existing job:
-- Alter a job and add a new expectation
ALTER JOB load_orders_with_expectations
ADD EXPECTATION exp_country
EXPECT customer.address.country = 'USA' ON VIOLATION WARN;
Let's understand what this code does:
- 1. Adds a new expectation named exp_country to the job you already created. The expectation checks that the customer.address.country value equals USA. The WARN action allows the row to be loaded into the staging table when the condition is violated; however, the system tables are incremented for each row that triggers the alert.
- 2. The job now has three expectations checking the data on ingestion.
Here's the code to drop the expectation:
-- Drop an expectation from a job
ALTER JOB load_orders_with_expectations
DROP EXPECTATION exp_nettotal_not_zero;
Not only does this code drop the expectation from the job, but it also removes the records from the system tables. In the following steps, you will create a job to record these metrics so they are retained even when an expectation is dropped.
You can monitor the quality of ingested data by automatically collecting the count of rows that violated the conditions you created in your expectations and storing the metrics in a data lake table.
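Before building a reporting pipeline, you can also inspect the metrics ad hoc by querying the system table directly. A minimal query might look like this (the column list is an assumption based on the metrics described later in this guide; adjust it to match the table's actual schema):

```sql
-- Ad-hoc look at expectation metrics for all jobs
SELECT job_name, expectation_name, action, triggers_today, triggers_total
FROM system.monitoring.expectations;
```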
First, let's create a reporting table to record the metrics generated by your expectations. This table will be used for all jobs and expectations that exist now and in the future.
Here's the code to create the reporting table:
-- Create a table to store the reporting data
CREATE TABLE default_glue_catalog.upsolver_samples.expectations_report (
    job_name string,
    expectation_name string,
    action string,
    triggers_today bigint,
    triggers_total bigint,
    partition_date date
)
PARTITIONED BY partition_date;
Now that you have created your reporting table, you want to create a job to gather the metrics from the system.monitoring.expectations table and load the data into the table.
Here's the code to create the job:
-- Create a job to collect the metrics over time
CREATE JOB populate_expectations_report
COMMENT = 'Collect stats for expectations in all jobs'
RUN_INTERVAL = 1 HOUR
AS INSERT INTO default_glue_catalog.upsolver_samples.expectations_report MAP_COLUMNS_BY_NAME
SELECT job_name,
       expectation_name,
       action,
       triggers_today,
       triggers_total,
       run_end_time() AS partition_date
FROM system.monitoring.expectations;
Let's take a look at what this code does:
- 1. Creates a job that runs hourly. The RUN_INTERVAL option can be adjusted to collect the data on a frequency that meets your reporting requirements.
- 2. Inserts all job expectations and their trigger counts into the reporting table. A WHERE clause has not been included, so all job expectations are loaded into the report table. Depending on your requirements, you can easily add a WHERE clause to filter by job(s) and/or expectation(s).
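For example, a hypothetical variant of the reporting job that records metrics for the load_orders_with_expectations job only might look like this (a sketch; adjust the column list to match your report table):

```sql
-- Sketch: collect metrics for a single job only
CREATE JOB populate_orders_expectations_report
    COMMENT = 'Collect stats for the orders ingestion job only'
    RUN_INTERVAL = 1 HOUR
AS INSERT INTO default_glue_catalog.upsolver_samples.expectations_report MAP_COLUMNS_BY_NAME
SELECT job_name,
       expectation_name,
       action,
       triggers_today,
       triggers_total,
       run_end_time() AS partition_date
FROM system.monitoring.expectations
WHERE job_name = 'load_orders_with_expectations';
```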
The next step is to query the reporting table to view the quality of the data you are ingesting. Let's look at the data for the last week.
Here's the code to return the results:
-- Return the reporting data for the last week
SELECT job_name, expectation_name, action,
       triggers_today, triggers_total, "$commit_time"
FROM default_glue_catalog.upsolver_samples.expectations_report
WHERE "$commit_time" >= DATE_ADD('day', -7, "$commit_time")
ORDER BY job_name, expectation_name, "$commit_time";
Let's understand what this code does:
- 1. Returns the job and expectation name, and the action that should happen when the condition is violated.
- 2. Returns the number of times the condition was violated today (since midnight, UTC).
- 3. Returns the number of times the condition was violated since the job started.
- 4. Returns the time the counts were recorded.
- 5. Filters the results to the last seven days, though you can easily change this depending on your reporting needs.
By querying the system tables and creating your own reporting tables and jobs, you can customize the observability level and frequency, and extract the results for use in your own dashboards or analytical tools.
In this guide, you learned how to add expectations to a new job and also to an existing job. By adding expectations, you can observe the quality of ingested data, and decide if rows should reach the target system.
Expectations provide you with the control to ensure that downstream targets are unpolluted by missing or invalid data. In addition, you saw how expectations can easily be dropped without impacting the execution of the job.
Furthermore, you were introduced to the system tables designed specifically to monitor your expectations, and you saw how you can create a job to collect metrics over time.
To manage data quality:
- 1. Create an expectation on a new job, or add a new expectation to an existing job.
- 2. Monitor your expectations using the system tables.
- 3. Create a job to collect metrics and build custom reports.