Real-time Data Ingestion — Amazon Kinesis to ClickHouse

This guide takes you through the steps for ingesting your streaming data from Amazon Kinesis into your ClickHouse database.

Upsolver's integration with ClickHouse enables customers to reliably ingest large volumes of data in near real-time. With support for the main industry data platforms, Upsolver can ingest from databases, streaming, and file sources into ClickHouse Cloud 50-100X faster than other solutions.

Purpose-built for big data, Upsolver easily moves high-volume, streaming data from your operational systems into your analytics target. Whether your application source is generating events at a rate of thousands per second, or your gaming or ad-tech company creates millions of events per second, Upsolver can ingest your stream into your data lakehouse and target analytics systems with ease.

Furthermore, Upsolver automates schema evolution, maintains performance, and ensures the strongly ordered, exactly-once delivery of your data.

Ingest high-scale streaming events into ClickHouse with Upsolver.

Upsolver automatically detects changes in the source and streams only changed rows to ClickHouse. Orchestration and task scheduling are handled internally, eliminating the need for external tools like Airflow or AWS Lambda.

What you will learn

Upsolver enables you to deliver high-volume data to ClickHouse in a few simple steps. In this guide, you will learn how to connect to your source and target data platforms, create a simple but powerful job to ingest data from Amazon Kinesis to ClickHouse, and then observe your data to confirm it is delivered as expected.


Prerequisites

Before you begin, please read the guide on How To Configure Access to Amazon Kinesis to enable permissions for Upsolver to read the data from your Amazon Kinesis source.

The steps for ingesting your data from Amazon Kinesis to ClickHouse are as follows:

  1. Connect to Amazon Kinesis

  2. Connect to ClickHouse

  3. Create an ingestion job

  4. Observe your data


Step 1

Connect to Amazon Kinesis

The first step is to connect to your Amazon Kinesis source. When you create a connection in Upsolver, other users in your organization will be able to use this connection, and it is always on, meaning you don't need to reconnect if you want to create another job, for example.

Here's the code:

CREATE KINESIS CONNECTION my_kinesis_connection
    AWS_ROLE = 'arn:aws:iam::123456789012:role/upsolver-sqlake-role'
    REGION = 'us-east-1'
    COMMENT = 'Connection to all Kinesis streams';

Here, a connection named my_kinesis_connection is created with access to all streams. The optional comment tells other users what the connection does, and is visible in the connections system table.

Step 2

Connect to ClickHouse

After you have successfully connected to your Amazon Kinesis stream, the next step is to create a connection to your target ClickHouse database. Again, this connection is visible and available to all users within your organization, and always on.

Here's the code:

CREATE CLICKHOUSE CONNECTION my_clickhouse_connection
    CONNECTION_STRING = 'http://x.us-east-1.aws.clickhouse.cloud:8123/sales_db'
    USER_NAME = 'my_username'
    PASSWORD = 'my_password'
    COMMENT = 'Connection to Sales database';

In the above example, a connection named my_clickhouse_connection is created to access the sales_db database. Again, the optional comment tells other users what the connection does, and is visible in the connections system table.

Step 3

Create an ingestion job

Now that you have a connection to your source and target, you can create a job to ingest the data. As with the connections you created, the job will be visible to other users in your organization.

Here's the code to create the ingestion job:

CREATE SYNC JOB load_kinesis_to_clickhouse
    COMMENT = 'Ingest sales orders from Kinesis to ClickHouse'
    START_FROM = BEGINNING
    CONTENT_TYPE = AUTO
    EXCLUDE_COLUMNS = ('customer.password') 
    COLUMN_TRANSFORMATIONS = (hashed_email = MD5(customer.email))       
    DEDUPLICATE_WITH = (COLUMNS = (orderid), WINDOW = 4 HOURS)
    COMMIT_INTERVAL = 5 MINUTES
AS COPY FROM KINESIS my_kinesis_connection
    STREAM = 'orders'  
INTO CLICKHOUSE my_clickhouse_connection.sales_db.orders_tbl
    WITH EXPECTATION exp_custid_not_null 
      EXPECT customer.customer_id IS NOT NULL ON VIOLATION DROP
    WITH EXPECTATION exp_taxrate 
      EXPECT taxrate = 0.12 ON VIOLATION WARN;

Let's take a look at what this code does. A job named load_kinesis_to_clickhouse is created with an optional COMMENT to describe its purpose. This is useful for future reference if you or other users in your organization query the jobs system table to look up metadata for your jobs.

The job includes options that we can customize for our use case. Using START_FROM, we instruct Upsolver to stream events from a point in time. We could specify a date here, but instead are using BEGINNING to ingest all events discovered in the source.

The CONTENT_TYPE option tells Upsolver which content format to expect from the source. Here we have specified AUTO so that Upsolver detects the format for us.

In this example, we want to perform some data protection operations to ensure sensitive data is not visible in the target. Using the EXCLUDE_COLUMNS option, we can specify a list of columns that we don't want Upsolver to write to the target. In this case, the customer.password column is excluded from being written to our ClickHouse target.

Furthermore, we want to ingest the customer.email column but mask the contents. The COLUMN_TRANSFORMATIONS option enables us to create a new column named hashed_email in the target, mask the customer.email value using the MD5() function, and write the hashed value into our new column.
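To make the effect of these two options concrete, here is a minimal Python sketch of what happens to a single event. It is an illustration only, not Upsolver's implementation: the function name protect_pii and the sample event are hypothetical, and it assumes the MD5 result is represented as a lowercase hex string.

```python
import copy
import hashlib


def protect_pii(event: dict) -> dict:
    """Return a copy of the event with the password removed and a hashed email added."""
    out = copy.deepcopy(event)
    customer = out.get("customer") or {}
    # Mirrors EXCLUDE_COLUMNS = ('customer.password'): remove the nested field.
    customer.pop("password", None)
    # Mirrors hashed_email = MD5(customer.email): hash the UTF-8 bytes of the email.
    # Assumption: the hash is rendered as a hex string.
    email = customer.get("email")
    out["hashed_email"] = (
        hashlib.md5(email.encode("utf-8")).hexdigest() if email is not None else None
    )
    return out


# Hypothetical sample event, shaped like the columns referenced in the job.
sample = {"orderid": 1001, "customer": {"email": "jane@example.com", "password": "s3cret"}}
masked = protect_pii(sample)
print(masked)
```

The sketch works on a copy, so the source event is left unchanged, and it adds hashed_email as a new top-level field alongside the existing columns.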

We can also prevent duplicates from reaching our ClickHouse target using DEDUPLICATE_WITH. This option accepts one or more columns and a time window over which any duplicate rows will be identified and removed. In our job, we instruct Upsolver to ignore any duplicate orderid values within a time window of 4 HOURS, so they don't get written to our target database.
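The idea of a deduplication window can be sketched in a few lines of Python. This is a simplified model under stated assumptions, not Upsolver's implementation: the class name WindowedDeduplicator is hypothetical, and the sketch assumes the window is measured from the first accepted occurrence of a key.

```python
from datetime import datetime, timedelta


class WindowedDeduplicator:
    """Skip events whose key was already accepted within the last `window`."""

    def __init__(self, window: timedelta):
        self.window = window
        self._accepted_at = {}  # key -> time the key was last accepted

    def accept(self, key, event_time: datetime) -> bool:
        previous = self._accepted_at.get(key)
        if previous is not None and event_time - previous < self.window:
            return False  # duplicate inside the window: do not write it
        self._accepted_at[key] = event_time
        return True


dedup = WindowedDeduplicator(window=timedelta(hours=4))
t0 = datetime(2024, 5, 2, 8, 0)
results = [
    dedup.accept(1001, t0),                       # first time orderid 1001 is seen
    dedup.accept(1001, t0 + timedelta(hours=1)),  # repeat within 4 hours
    dedup.accept(1002, t0 + timedelta(hours=1)),  # different orderid
    dedup.accept(1001, t0 + timedelta(hours=5)),  # repeat after the window has passed
]
print(results)
```

A longer window catches duplicates that arrive further apart, at the cost of tracking more keys.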

The last job option specified is COMMIT_INTERVAL. This tells Upsolver how frequently the data should be written to the target. It can be more cost efficient to set a longer interval, depending on the target system. In our case, writing the data every 5 MINUTES is often enough to fulfill downstream business objectives, but you can adjust this to suit requirements for how fresh the data should be, weighed against vendor costs for writing data.
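As a rough illustration of the trade-off, the Python sketch below groups events into the interval in which they would be written. It is a simplified model, not Upsolver's implementation: the function name commit_batches is hypothetical, and it assumes commits align to fixed interval boundaries measured in UTC.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone


def commit_batches(events, interval=timedelta(minutes=5)):
    """Group (timestamp, payload) pairs by the start of their commit interval."""
    step = int(interval.total_seconds())
    batches = defaultdict(list)
    for ts, payload in events:
        epoch = int(ts.timestamp())
        # Round the timestamp down to the nearest interval boundary.
        window_start = datetime.fromtimestamp(epoch - epoch % step, tz=timezone.utc)
        batches[window_start].append(payload)
    return dict(batches)


def utc(hour, minute):
    return datetime(2024, 5, 2, hour, minute, tzinfo=timezone.utc)


events = [(utc(12, 1), "a"), (utc(12, 4), "b"), (utc(12, 6), "c")]
batches = commit_batches(events)
for start, payloads in sorted(batches.items()):
    print(start.isoformat(), payloads)
```

With a 5-minute interval the three events produce two writes. Widening the interval to 10 minutes would produce one, which is the cost-versus-freshness choice described above.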

Using the COPY FROM command, the job copies the data from the source, in this case the orders STREAM in the Kinesis source specified in the my_kinesis_connection connection we created above. The data is loaded into the orders_tbl table in the sales_db database in our my_clickhouse_connection connection.

A useful way to prevent bad data from reaching our downstream targets is to include data quality expectations in our job, so that the data reaching our target is of the quality that end-users anticipate. Using an EXPECTATION, we can check each row against a predicate, and either raise a warning for the row or drop it. You can use any of the supported functions and operators in Upsolver's extensive library to create expectations.

In this example, the first expectation, exp_custid_not_null, checks that the customer.customer_id column is not NULL. If the value is NULL, the row is dropped and won't be loaded into the target. The second expectation, exp_taxrate, examines the value of the taxrate column. If the value is not equal to 0.12, a warning is raised, but the row is not dropped. In the next step, you will learn how to view the results of rows that don't match expectations.
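The DROP and WARN behaviors described above can be modeled with a short Python sketch. It is an illustration of the semantics only, not Upsolver's implementation: the Expectation type, the apply_expectations function, and the sample rows are hypothetical.

```python
from typing import Callable, NamedTuple


class Expectation(NamedTuple):
    name: str
    predicate: Callable[[dict], bool]  # returns True when the row meets the expectation
    on_violation: str                  # "DROP" or "WARN"


EXPECTATIONS = [
    Expectation(
        "exp_custid_not_null",
        lambda row: (row.get("customer") or {}).get("customer_id") is not None,
        "DROP",
    ),
    Expectation("exp_taxrate", lambda row: row.get("taxrate") == 0.12, "WARN"),
]


def apply_expectations(rows, expectations=EXPECTATIONS):
    """Return the rows that would be loaded, plus a violation count per expectation."""
    kept = []
    violations = {exp.name: 0 for exp in expectations}
    for row in rows:
        drop = False
        for exp in expectations:
            if not exp.predicate(row):
                violations[exp.name] += 1
                if exp.on_violation == "DROP":
                    drop = True  # row is excluded from the target
                # WARN only records the violation; the row is still loaded
        if not drop:
            kept.append(row)
    return kept, violations


rows = [
    {"orderid": 1, "customer": {"customer_id": 7}, "taxrate": 0.12},     # meets both
    {"orderid": 2, "customer": {"customer_id": None}, "taxrate": 0.12},  # dropped
    {"orderid": 3, "customer": {"customer_id": 8}, "taxrate": 0.2},      # warned, kept
]
kept, violations = apply_expectations(rows)
print([r["orderid"] for r in kept], violations)
```

Only the row with the NULL customer_id is removed. The row with the unexpected tax rate is still loaded, and its violation is counted so that it can be reviewed later.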

Step 4

Observe your data

After the job has been executed and is loading data into ClickHouse, you can observe the stream using Datasets, which provide a wealth of information on your pipelines and data health. From the main menu, click Datasets. In the Connections tree, expand the nodes to display the ClickHouse connection.

In the Schema tab, notice the Overview and Written Rows Over Time cards that give instant insight into your pipeline. Scroll through the list of columns and rows in the Written Data Statistics table, which displays the ingested columns, the Type and Density, along with other useful metadata. Click on a column name to view the individual column statistics. Notice the Values by Frequency card, which displays a list of unique values within the column, and the percentage showing how often this value is seen across the dataset.

Click on the Data Violations tab. Here you will find the expectations created in Step 3. In the Expectations table, notice the two expectations for the load_kinesis_to_clickhouse job. If you have existing expectations, you can use the Search expectations box to filter on the job name to make them visible.

The Action column displays warn or drop, to indicate how Upsolver handles rows that do not meet the expectation. The Violations Today column displays the number of times the expectation was violated since midnight UTC today, alongside Total Violations, which is a count of all violations since the expectation was created.
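The difference between the two counters can be shown with a small Python sketch. It is a hypothetical model of the calculation, not how Upsolver computes the values: the function name summarize_violations and the sample timestamps are assumptions made for illustration.

```python
from datetime import datetime, timezone


def summarize_violations(violation_times, now):
    """Return (violations_today, total_violations) for one expectation."""
    # "Today" starts at the most recent midnight in UTC.
    midnight_utc = now.astimezone(timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    today = sum(1 for t in violation_times if t >= midnight_utc)
    return today, len(violation_times)


now = datetime(2024, 5, 2, 10, 0, tzinfo=timezone.utc)
violation_times = [
    datetime(2024, 5, 1, 23, 59, tzinfo=timezone.utc),  # yesterday
    datetime(2024, 5, 2, 0, 0, tzinfo=timezone.utc),    # exactly midnight UTC
    datetime(2024, 5, 2, 9, 30, tzinfo=timezone.utc),   # this morning
]
summary = summarize_violations(violation_times, now)
print(summary)
```

Because the boundary is midnight UTC, a violation that happened late yesterday evening in your local time zone may still be counted under today.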

Learn More

Learn about Expectations and then follow the how-to guide on Managing Data Quality - Ingesting Data with Expectations to discover how to create a report to monitor your expectations.


Conclusion

In this guide, you learned how to create connections to your source and target data platforms. Then you saw how easy it is to use straightforward SQL code to create an ingestion job to copy your streaming data into a ClickHouse database. You learned how to include job options to customize the job, protect PII, and ensure sensitive data was either excluded or masked before it landed in the target.

You also explored how expectations can perform in-flight data quality checks, either by dropping disqualifying rows or by logging a warning in the system table. Finally, you learned how to use Datasets to observe your data, uncover spikes or drops in volume, and check for rows that do not meet your data expectations.


Try it yourself

To ingest your streaming data from Amazon Kinesis to ClickHouse:

  1. Create a connection to your Amazon Kinesis source

  2. Create a connection to your ClickHouse database

  3. Write a job to ingest data from Kinesis to ClickHouse

    1. Create expectations to maintain data quality standards

  4. Observe your data with Datasets
