Managing data quality - ingesting data with expectations

This guide shows you how to create expectations in your jobs, and how to use the system tables in Upsolver to monitor the quality of your ingested data.

Expectations enable you to define data quality conditions that prevent bad data from polluting your downstream data warehouse or lake. Data is evaluated against these conditions during ingestion, and you can choose either to drop a row that does not meet an expectation or to load the row into the target and raise a warning. You can use expectations to check for null values, ensure that data falls within a range, or confirm that values match a defined value or regular expression.

A major advantage of expectations is the ability to automatically perform an action on data during the ingestion process, saving time in fixing incorrect or low-quality data after it has landed in the target system.

An expectation includes one or more predicate conditions that return a Boolean value. You can include any conditions supported by the WHERE clause, but not aggregations.
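
For example, the general shape of an expectation attached to an ingestion job looks like the following sketch. The connection, table, and column names here are placeholders; the full working syntax appears in the steps below.

-- Sketch of the expectation clause (placeholder names)
CREATE SYNC JOB my_ingestion_job
AS COPY FROM S3 my_connection LOCATION = 's3://my-bucket/my-prefix/'
   INTO my_catalog.my_schema.my_table
   WITH EXPECTATION exp_value_present
        EXPECT my_column IS NOT NULL   -- any predicate allowed in a WHERE clause, excluding aggregations
        ON VIOLATION DROP;             -- use WARN to keep the row and raise a warning instead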

This guide shows you how to create, alter, and drop expectations, and how to use the system tables to monitor rows violating the expectation rules. You will start by creating a connection to your Amazon S3 bucket and loading data into a staging table using a job that includes expectations. After your job is running, you will create another job to collect metrics from the system tables so that you can report on them. Let's get started.

Step 1

Connect to your Amazon S3 bucket

To transfer your data, you must create a SQLake connection. This connection lets you configure the AWS IAM credentials that SQLake needs to access the data.

When you deploy SQLake, the cluster already has an IAM role attached, which may or may not have permission to access the S3 path where your data resides. If it lacks those permissions, you can assign a scope-down policy to the S3 connection. Doing so is recommended to prevent failures, misconfigurations, and breaches.

Here's the code:

-- Create a connection to Amazon S3
CREATE S3 CONNECTION my_s3_connection
      AWS_ROLE = 'arn:aws:iam::001234567890:role/upsolver-role-<id>'
      READ_ONLY = TRUE
      --PATH_DISPLAY_FILTERS = ('s3://quickstart-vpclogs-test', 's3://upsolver-tutorials-vpclogs')
      COMMENT = 'my s3 connection';

You can copy the AWS_ROLE value from the role's summary page in the AWS IAM console.

Step 2

Create a staging table

Next, create a staging table in your data lake. SQLake automatically converts your files into the Apache Parquet format and creates a table with the appropriate schema in the AWS Glue Data Catalog.

Here’s the code to create the table:

-- Create a staging table in the data lake
CREATE TABLE default_glue_catalog.upsolver_samples.orders_raw_data()
PARTITIONED BY $event_date;

Step 3

Create a job to load the data

Now that you have your staging table, you can create a job to load data into this table, and include expectations in the job to check the quality of data as it is ingested.

Here's the code to create the job, including two expectations:

-- Create a job with expectations
CREATE SYNC JOB load_orders_with_expectations
   CONTENT_TYPE = JSON
AS COPY FROM S3 my_s3_connection LOCATION = 's3://upsolver-samples/orders/'
   INTO default_glue_catalog.upsolver_samples.orders_raw_data
   WITH EXPECTATION exp_orderid_not_null EXPECT orderid IS NOT NULL AND orderid <> '' ON VIOLATION DROP
   WITH EXPECTATION exp_nettotal_not_zero EXPECT nettotal > 0 ON VIOLATION WARN; 

Let's understand what this job does:

  1. Automatically infers the schema and populates the column names and types in the Glue Data Catalog.

  2. Converts the source S3 files to Apache Parquet, which stores the data in a columnar format optimized for faster reads. You can learn more about the properties you can pass to the CREATE JOB command in the reference documentation.

  3. Creates the exp_orderid_not_null expectation to check each row has an orderid value. The ON VIOLATION DROP setting specifies that a row is dropped from the ingestion stream if the condition is violated because the value is either NULL or an empty string.

  4. Creates the exp_nettotal_not_zero expectation to check that the nettotal value is greater than zero, and raises a warning when this condition returns false. Because the defined action is WARN rather than DROP, the row is still loaded into the target.

Note that if a row violates both of the above conditions, the system tables reflect that two expectations were triggered, but only one row was dropped.
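
While the job is running, you can check how often each expectation has been triggered by querying the system.monitoring.expectations table directly. Here's a minimal example, filtered to the job created above:

-- Check trigger counts for the expectations on this job
SELECT  expectation_name, action, triggered_today, triggered_total
FROM    system.monitoring.expectations
WHERE   job_name = 'load_orders_with_expectations';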

Step 4

Add an expectation to the job

Your job is now running and loading data into the staging table. However, you realize that you need to add another check to the job: all orders should have a value of USA for the country, but sometimes orders arrive from outside this region. You want to keep this data in the staging table, but you need to know when an order is not from the USA.

Here's the code to add an expectation to your existing job:

-- Alter a job and add a new expectation
ALTER JOB load_orders_with_expectations
ADD EXPECTATION exp_country 
    EXPECT customer.address.country = 'USA' ON VIOLATION WARN;

Let's understand what this code does:

  1. Adds a new expectation named exp_country to the job you created earlier. The expectation checks that the customer.address.country value equals USA. The WARN action allows the row to be loaded into the staging table when the condition is violated; however, the counts in the system tables are incremented for each row that triggers the alert.

  2. The job now has three expectations checking the data on ingestion.

Step 5

Drop an expectation from the job

Suppose an expectation becomes redundant and you want to remove it from the job. You can use the ALTER JOB command to drop the expectation.

Here's the code to drop the expectation:

-- Drop an expectation from a job
ALTER JOB load_orders_with_expectations 
DROP EXPECTATION exp_nettotal_not_zero;

Not only does this code drop the expectation from the job, but it also removes the records from the system tables. In Steps 6-8, you will create a job to record these metrics so they are retained even when an expectation is dropped.

Step 6

Create a reporting table to collect metrics

You can monitor the quality of ingested data by automatically collecting the count of rows that violated the conditions you created in your expectations and storing the metrics in a data lake table.

First, let's create a reporting table to record the metrics generated by your expectations. This table will be used for all jobs and expectations that exist now and in the future.

Here's the code to create the reporting table:

-- Create a table to store the reporting data
CREATE TABLE default_glue_catalog.upsolver_samples.expectations_report (
      job_id STRING,
      job_name STRING,
      expectation_name STRING,
      action STRING,
      triggered_today BIGINT, 
      triggered_total BIGINT,
      partition_date DATE
    )
    PARTITIONED BY partition_date;

Step 7

Create a job to collect metrics

Now that you have created your reporting table, you want to create a job to gather the metrics from the system.monitoring.expectations table and load the data into the reporting table.

Here's the code to create the job:

-- Create a job to collect the metrics over time
CREATE JOB populate_expectations_report
        COMMENT = 'Collect stats for expectations in all jobs'
        RUN_INTERVAL = 1 HOUR
AS INSERT INTO default_glue_catalog.upsolver_samples.expectations_report
        MAP_COLUMNS_BY_NAME
        SELECT  job_id,
                job_name,
                expectation_name,
                action,
                triggered_today, 
                triggered_total,
                run_end_time() AS partition_date
        FROM system.monitoring.expectations; 

Let's take a look at what this code does:

  1. Creates a job that runs hourly. The RUN_INTERVAL option can be adjusted to collect the data at a frequency that meets your reporting requirements.

  2. Inserts all job expectations and their trigger counts into the reporting table.

  3. A WHERE clause has not been included, so the expectations for all jobs are loaded into the report table. Depending on your requirements, you can add a WHERE clause to filter by job and/or expectation, as shown in the sketch following this list.
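
For instance, the variation below collects metrics only for the ingestion job and expectation created earlier; adjust the names in the predicate to match your own jobs.

-- Same SELECT as in the job above, restricted to one job and one expectation
SELECT  job_id, job_name, expectation_name, action,
        triggered_today, triggered_total,
        run_end_time() AS partition_date
FROM    system.monitoring.expectations
WHERE   job_name = 'load_orders_with_expectations'
  AND   expectation_name = 'exp_orderid_not_null';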

Step 8

Query the report table

The next step is to query the reporting table to view the quality of the data you are ingesting. Let's look at the data for the last week.

Here's the code to return the results:

-- Return the reporting data for the last week
SELECT  job_name, expectation_name, action, 
        triggered_today, 
        triggered_total,
        "$commit_time"
FROM    default_glue_catalog.upsolver_samples.expectations_report
WHERE   "$commit_time" >= DATE_ADD('day', -7, "$commit_time") 
ORDER   BY job_name, expectation_name, "$commit_time";

Let's understand what this code does:

  1. Returns the job and expectation name, and the action that should happen when the condition is violated.

  2. Returns the number of times the condition was violated today (UTC time) since midnight.

  3. Returns the number of times the condition was violated since the job started.

  4. Returns the time the counts were recorded.

  5. Filters the results to the last seven days, though you can easily change this depending on your reporting needs.

By querying the system tables and creating your own reporting tables and jobs, you can customize the observability level and frequency, and extract the results for use in your own dashboards or analytical tools.
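
For example, a simple roll-up over the report table can feed a dashboard showing how violations accumulate over time. The sketch below uses the report table created in Step 6; because triggered_total is cumulative, taking the daily maximum gives the running total at the end of each day.

-- Running violation totals per day, job, and expectation
SELECT   partition_date,
         job_name,
         expectation_name,
         MAX(triggered_total) AS violations_to_date
FROM     default_glue_catalog.upsolver_samples.expectations_report
GROUP BY partition_date, job_name, expectation_name
ORDER BY partition_date, job_name, expectation_name;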

Conclusion

In this guide, you learned how to add expectations to a new job and also to an existing job. By adding expectations, you can observe the quality of ingested data, and decide if rows should reach the target system.

Expectations give you control over what reaches your downstream targets, helping to ensure they are not polluted by missing or invalid data. In addition, you saw how expectations can easily be dropped without impacting the execution of the job.

Furthermore, you were introduced to the system tables designed specifically to monitor your expectations, and you saw how you can create a job to collect metrics over time.

Try it yourself

To manage data quality:

  1. Create an expectation on a new job, or add a new expectation to an existing job.

  2. Monitor your expectations using the system tables.

  3. Create a job to collect metrics and build custom reports.

For more information and syntax examples, see the section on creating Expectations in your ingestion jobs.
