Real-time Data Ingestion — Amazon Kinesis to ClickHouse
This guide takes you through the steps for ingesting your streaming data from Amazon Kinesis into your ClickHouse database.
Upsolver's integration with ClickHouse enables customers to reliably ingest large volumes of data in near real-time. With support for the main industry data platforms, Upsolver can ingest from databases, streaming, and file sources into ClickHouse Cloud 50-100X faster than other solutions.
Purpose-built for big data, Upsolver easily moves high-volume, streaming data from your operational systems into your analytics target. Whether your application source is generating events at a rate of thousands per second, or your gaming or ad-tech company creates millions of events per second, Upsolver can ingest your stream into your data lakehouse and target analytics systems with ease.
Furthermore, Upsolver automates schema evolution, maintains performance, and ensures strongly ordered, exactly-once delivery of your data.
Upsolver automatically detects changes in the source and streams only changed rows to ClickHouse. Orchestration and task scheduling are handled internally, eliminating the need for external tools like Airflow or AWS Lambda.
What you will learn
Upsolver enables you to deliver high-volume data to ClickHouse in a few simple steps. In this guide, you will learn how to connect to your source and target data platforms, create a simple but powerful job to ingest data from Amazon Kinesis to ClickHouse, and then observe your data to ensure it is delivered as expected.
Prerequisites
Before you begin, please read the guide on How To Configure Access to Amazon Kinesis to enable permissions for Upsolver to read the data from your Amazon Kinesis source.
The steps for ingesting your data from Amazon Kinesis to ClickHouse are as follows:
Connect to Amazon Kinesis
Connect to ClickHouse
Create an ingestion job
Observe your data
Step 1
Connect to Amazon Kinesis
The first step is to connect to your Amazon Kinesis source. A connection you create in Upsolver is available to other users in your organization and is always on, so you don't need to reconnect when you create another job.
Here's the code:
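A minimal sketch of the connection statement, assuming role-based authentication. The role ARN, region, and comment text are placeholder values that do not come from this guide, so substitute your own:

```sql
-- Placeholder values: replace the role ARN and region with your own.
CREATE KINESIS CONNECTION my_kinesis_connection
    AWS_ROLE = 'arn:aws:iam::123456789012:role/upsolver-role'
    REGION = 'us-east-1'
    COMMENT = 'Connection to all Kinesis streams';
```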
Here, a connection named my_kinesis_connection is created with access to all streams. The optional comment tells other users what this connection does, and it is visible in the connections system table.
Step 2
Create a connection to ClickHouse
After you have successfully connected to your Amazon Kinesis stream, the next step is to create a connection to your target ClickHouse database. Again, this connection is visible and available to all users within your organization, and always on.
Here's the code:
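A minimal sketch of the connection statement, assuming username and password authentication. The host, port, and credentials are placeholders, and the exact connection string format should be checked against your ClickHouse deployment:

```sql
-- Placeholder values: replace the host, port, and credentials with your own.
CREATE CLICKHOUSE CONNECTION my_clickhouse_connection
    CONNECTION_STRING = 'http://<your-host>:8123/sales_db'
    USER_NAME = 'my_username'
    PASSWORD = 'my_password'
    COMMENT = 'Connection to the sales_db database';
```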
In the above example, a connection named my_clickhouse_connection is created to access the sales_db database. Again, the optional comment tells other users what this connection does, and it is visible in the connections system table.
Step 3
Create an ingestion job
Now that you have a connection to your source and target, you can create a job to ingest the data. As with the connections you created, the job will be visible to other users in your organization.
Here's the code to create the ingestion job:
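The following sketch is assembled from the options described in this step. The comment text is illustrative, and the exact option syntax is an assumption that should be checked against the Upsolver SQL reference before use:

```sql
CREATE SYNC JOB load_kinesis_to_clickhouse
    COMMENT = 'Ingest orders from Kinesis to ClickHouse'
    START_FROM = BEGINNING                 -- ingest all events found in the stream
    CONTENT_TYPE = AUTO                    -- let Upsolver detect the content type
    EXCLUDE_COLUMNS = ('customer.password')
    COLUMN_TRANSFORMATIONS = (hashed_email = MD5(customer.email))
    DEDUPLICATE_WITH = (COLUMNS = (orderid), WINDOW = 4 HOURS)
    COMMIT_INTERVAL = 5 MINUTES            -- write to the target every 5 minutes
AS COPY FROM KINESIS my_kinesis_connection
    STREAM = 'orders'
INTO CLICKHOUSE my_clickhouse_connection.sales_db.orders_tbl
    -- Drop rows that have no customer ID.
    WITH EXPECTATION exp_custid_not_null
        EXPECT customer.customer_id IS NOT NULL ON VIOLATION DROP
    -- Keep rows with an unexpected tax rate, but raise a warning.
    WITH EXPECTATION exp_taxrate
        EXPECT taxrate = 0.12 ON VIOLATION WARN;
```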
Let's take a look at what this code does. A job named load_kinesis_to_clickhouse is created with an optional COMMENT to describe its purpose. This is useful for future reference if you or other users in your organization query the jobs system table to look up metadata for your jobs.
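To look up that metadata, a query along these lines could be used. The table path system.information_schema.jobs is an assumption based on Upsolver's system catalog naming and is not stated in this guide:

```sql
-- Assumed system table path; confirm the name in your environment.
SELECT *
FROM system.information_schema.jobs
LIMIT 10;
```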
The job includes options that we can customize for our use case. Using START_FROM, we instruct Upsolver to stream events from a point in time. We could specify a date here, but instead are using BEGINNING to ingest all events discovered in the source.
The CONTENT_TYPE option enables us to specify which file types should be ingested, though we have specified AUTO so that Upsolver automatically detects the type for us.
In this example, we want to perform some data protection operations to ensure PII data is not visible in the target. Using the EXCLUDE_COLUMNS option, we can specify a list of columns that we don't want Upsolver to write to the target. In this case, the customer.password column is excluded from being written to our ClickHouse target.
Furthermore, we want to ingest the customer.email column but mask the contents. The COLUMN_TRANSFORMATIONS option enables us to create a new column named hashed_email in the target, mask the customer.email value using the MD5() function, and write the hashed value into our new column.
We can also prevent duplicates from reaching our ClickHouse target using DEDUPLICATE_WITH. This option accepts one or more columns and a time window over which any duplicate rows will be identified and removed. In our job, we instruct Upsolver to ignore any duplicate orderid values within a time window of 4 HOURS, so they don't get written to our target database.
The last job option specified is COMMIT_INTERVAL. This tells Upsolver how frequently the data should be written to the target. Depending on the target system, a longer interval between writes can be more cost efficient. In our case, writing the data every 5 MINUTES is often enough to fulfill downstream business objectives, but you can adjust this to balance how fresh the data should be against vendor costs for writing data.
Using the COPY FROM command, the job copies the data from the source, in this case the orders STREAM in the Kinesis source specified in the my_kinesis_connection connection we created above. The data is loaded into the orders_tbl table in the sales_db database in our my_clickhouse_connection connection.
A really useful way to prevent bad data from reaching our downstream targets is to include data quality expectations in our job, to ensure the data reaching our target is of the quality that end-users anticipate. Using an EXPECTATION, we can check each row based on a predicate, and either create a warning for the row, or delete it. You can use any of the supported functions and operators in Upsolver's extensive library to create expectations.
In this example, the first expectation, exp_custid_not_null, checks that the customer.customer_id column is not NULL. If the value is NULL, the row is dropped and won't be loaded into the target. The second expectation, exp_taxrate, examines the value of the taxrate column. If the value is not equal to 0.12, a warning is raised, but the row is not dropped. In the next step, you will learn how to view the results of rows that don't match expectations.
Step 4
Observe your data
After the job has been executed and is loading data into ClickHouse, you can observe the stream using Datasets, which provide a wealth of information on your pipelines and data health. From the main menu, click Datasets. In the Connections tree, expand the nodes to display the ClickHouse connection.
In the Schema tab, notice the Overview and Written Rows Over Time cards that give instant insight into your pipeline. Scroll through the list of columns and rows in the Written Data Statistics table, which displays the ingested columns, the Type and Density, along with other useful metadata. Click on a column name to view the individual column statistics. Notice the Values by Frequency card, which displays a list of unique values within the column, and the percentage showing how often this value is seen across the dataset.
Click on the Data Violations tab. Here you will find the expectations created in Step 3. In the Expectations table, notice the two expectations for the load_kinesis_to_clickhouse job. If you have existing expectations, you can use the Search expectations box to filter on the job name to make them visible.
The Action column displays warn or drop, to indicate how Upsolver handles rows that do not meet the expectation. The Violations Today column displays a count of the number of times the expectation was violated since midnight (UTC time) today, alongside Total Violations, which is a count of all violations since the expectation was created.
Learn More
Learn about Expectations and then follow the how-to guide on Managing Data Quality - Ingesting Data with Expectations to discover how to create a report to monitor your expectations.
Conclusion
In this guide, you learned how to create connections to your source and target data platforms. Then you saw how easy it is to use straightforward SQL code to create an ingestion job to copy your streaming data into a ClickHouse database. You learned how to include job options to customize the job, protect PII, and ensure sensitive data was either excluded or masked before it landed in the target.
You then explored how expectations can perform in-flight data quality checks, either by dropping disqualifying rows or by logging a warning in the system table. Finally, you learned how to use Datasets to observe your data, uncover spikes or drops in volume, and check for rows not meeting your data expectation requirements.
Try it yourself
To ingest your streaming data from Amazon Kinesis to ClickHouse:
Create a connection to your Amazon Kinesis source
Create a connection to your ClickHouse database
Write a job to ingest data from Kinesis to ClickHouse
Create expectations to maintain data quality standards
Observe your data with Datasets