Real-time data ingestion in Snowflake — Apache Kafka to Snowflake

This guide explores a common use case: optimizing and transforming event data in JSON format.

Apache Kafka is an open-source distributed event streaming platform, set apart by its ability to handle high volumes of data from many systems and applications. Popular data warehouses such as Snowflake and Amazon Redshift can take data from Kafka and enable you to run analytics and derive insights. But these warehouses are designed primarily for batch processing, and they struggle to maintain data consistency when there's a constant flow of evolving data. This can increase both the cost of preparing data and the time it takes to design, implement, and maintain your data architecture.

You can structure data lakes in many different ways. Most commonly, though, you structure your data in three logical zones: the Raw Zone, the Staging Zone, and the Refined Zone. This is Upsolver's recommended approach. It's simple to get started, scales with your data, and easily extends to support new use cases.

The raw zone stores all data. However, as the name implies, the data in question is uncleaned, unverified, and unoptimized for analysis. At this point, the data is only being collected, stored, and cataloged.

In the staging zone, you expose data for easy discovery and convert it to an optimized file format such as Apache Parquet. The data is also compressed, which reduces the cost of storage, and automatically compacted, a process that merges small files into larger, more efficient ones. These optimizations improve the hygiene of your data and the performance of query engines, yielding reliable and fresh query results. While you can query the data at this point, it hasn't yet been transformed to match exact business needs. You define data pipelines that read and transform the data in the staging zone and store the results in the refined zone.

SQLake automatically optimizes data in the refined zone. Here's how:

  1. It converts the files into Apache Parquet

  2. It partitions the data by date

  3. It continuously compacts small files, while allowing the table's schema to evolve over time.

The end result is a prepared and fully-optimized data lake that you can query and get results from in a matter of minutes instead of days.

In this scenario, we explore a common use case: optimizing and transforming event data in JSON format. The data, which contains nested fields, moves from the raw zone of your data lake to the staging zone, and then to the refined zone for further analysis by engines such as Snowflake.

The following steps illustrate how to create a data pipeline that reads raw data from Kafka, stages it in the staging zone, and then performs basic transformations before writing the results to a refined zone such as Snowflake, where you can query them.

  1. Connect to Kafka in the raw zone

  2. Create a staging table

  3. Copy from Kafka

  4. Connect to your Snowflake EDW

  5. Create an output table for refined data

  6. Prepare your data for the refined zone

  7. Read your data from Snowflake

Before you begin

SQLake runs in an AWS Virtual Private Cloud (VPC). An Upsolver Customer Service representative can help you set up your user account and deploy SQLake on AWS.

When you first deploy SQLake in the customer VPC, you create an AWS Identity and Access Management (IAM) role that gives you access to the AWS resources you might need to build data pipelines. You can find more about these permissions in the Upsolver documentation. In this scenario, our cluster has permissions to read and write to the AWS Glue Data Catalog and to S3.

If your Kafka cluster is in a VPC different from the one created for Upsolver and isn’t accessible via a public IP, you must create a VPC peering connection. Here’s a guide explaining how to do so.

Step 1

Connecting to Kafka in the raw zone

Create a Kafka connection to consume data from your cluster. This connection lets you provide the broker hosts and consumer properties, including authentication settings, that SQLake needs to read from your topics. All you need to know to do this is familiar SQL syntax.

Create a Kafka connection:

CREATE KAFKA CONNECTION my_kafka_connection
   HOSTS = ('pkc-2396y.us-east-1.aws.confluent.cloud:9092')
   CONSUMER_PROPERTIES = '
     bootstrap.servers=pkc-2396y.us-east-1.aws.confluent.cloud:9092
     security.protocol=SASL_SSL
     sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username="XXXXXXXX"   password="-----------";
     ssl.endpoint.identification.algorithm=https
     sasl.mechanism=PLAIN';

Step 2

Creating a staging table

You can use SQLake to stage your data. To do this, first create a staging table in the AWS Glue Data Catalog; you don't need to declare its columns, because SQLake detects the schema from the incoming data. In the next step, you create a job that reads from Kafka and loads this table.

Here’s the code to create a staging table to store your raw data:

CREATE TABLE default_glue_catalog.default.kafka_staging_table;
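The staging table above is created without explicit columns; SQLake manages the schema for you. If you also want to partition the staging data by date, you could declare a partition column at creation time. This is a minimal sketch, assuming your SQLake release supports a PARTITIONED BY clause with the $event_date system column (both are assumptions to verify for your environment):

CREATE TABLE default_glue_catalog.default.kafka_staging_table
    PARTITIONED BY $event_date;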

Step 3

Copying from Kafka

Create a job to extract the raw data from Kafka and load it into the staging table.

There are several types of jobs you can create for this. To copy the data from the source into the staging zone, use the COPY FROM job type.

The SQL statement below creates a job that reads data from the Kafka topic, writes it to the staging location, and catalogs the metadata in the Glue Data Catalog. Note that the INTO clause references the database and table names of the staging table you created in step 2; SQLake writes the copied data to that table's underlying storage and keeps its metadata cataloged.

Create a COPY FROM job:

CREATE JOB <KAFKA_STAGING_JOB>
    START_FROM = NOW
    CONTENT_TYPE = AUTO
    /* You're copying from a Kafka topic and streaming the data to the staging table created in step #2 */
    AS COPY FROM KAFKA <KAFKA_CONNECTION_NAME> TOPIC = '<KAFKA_TOPIC_NAME>' 
    INTO default_glue_catalog.<DB_NAME>.<STAGING_TABLE_NAME>; 
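Once the job is running, you can confirm that events are landing in the staging zone by querying the staging table with a quick SELECT; the placeholder names match those used above:

SELECT *
FROM default_glue_catalog.<DB_NAME>.<STAGING_TABLE_NAME>
LIMIT 10;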

Step 4

Connecting to your Snowflake EDW

Upsolver uses a JDBC connection to write data to Snowflake. The connection stages continuously streamed data in temporary tables in Snowflake and then merges the deltas into your target Snowflake tables. Upsolver manages this merge process automatically, which means you no longer need to maintain delta files or tables yourself. Ultimately, this eliminates the need for the large-scale, full table scans on Snowflake that can be slow and expensive.

Here’s the code for creating that connection:

CREATE SNOWFLAKE CONNECTION SFCommerce
       CONNECTION_STRING = 'jdbc:snowflake://baa12345.us-east-1.snowflakecomputing.com/?db=DEMO_DB'
       USER_NAME = 'demouser'
       PASSWORD = 'demopass';

Step 5

Creating an output table for refined data

To start transforming your data, choose from 3 types of transformation jobs:

  1. INSERT

  2. MERGE

  3. DELETE

To take full advantage of these jobs, we recommend that you explicitly create your output table, which enables you to define partitions, primary keys, and more.

You don’t need to define all of your output columns, because SQLake automatically adds any missing columns. If you want to strictly control which columns are added, define them in the CREATE TABLE statement and set the ADD_MISSING_COLUMNS job property to FALSE when you create the job (see the variant sketched at the end of step 6).

Here's example code for creating an output table in Snowflake:

CREATE or REPLACE TABLE <snowflake schema>.<snowflake table> (
    <column_name> <data_type>, 
    <column_name> <data_type>, 
    <column_name> <data_type> 
);
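For the aggregation job shown in step 6, a filled-in version of this template might look like the sketch below, run against the DEMO_DB database referenced in the connection string above. The column names come from step 6; the data types are assumptions to adjust for your data:

CREATE or REPLACE TABLE DEMO.ORDERS_AGGREGATED (
    BUYER_EMAIL VARCHAR,
    AVG_ORDER_TOTAL DOUBLE,
    SUM_ORDER_TOTAL DOUBLE
);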

Step 6

Preparing your data for the refined zone

Now that you have defined the output table, create an INSERT job to read data from the staging table and write the results into the target table inside your refined zone.

Here's the code for creating an INSERT job to read from staging and write to the refined zone:

CREATE JOB "export aggregated orders to Snowflake 3"
       RUN_INTERVAL = 1 MINUTE
       START_FROM = NOW
    AS INSERT INTO SNOWFLAKE "SFCommerce"."DEMO"."ORDERS_AGGREGATED" MAP_COLUMNS_BY_NAME
    SELECT buyeremail as BUYER_EMAIL,
       AVG(nettotal) as AVG_ORDER_TOTAL, 
       SUM(nettotal) as SUM_ORDER_TOTAL
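    /* The FROM clause below reads from the staging table populated in step 3; the catalog, database, and table names shown here are illustrative */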
    FROM Athena.commerce.orders_staging
    WHERE ($commit_time between execution_start_time() AND execution_end_time())
    AND eventtype = 'ORDER'
    GROUP BY buyeremail;
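If you created the output table with an explicit column list and want SQLake to write only those columns, you can disable automatic column addition on the job, as mentioned in step 5. This is a minimal sketch reusing the job above; it assumes the ADD_MISSING_COLUMNS property is set alongside the other job properties:

CREATE JOB "export aggregated orders to Snowflake (explicit columns)"
       RUN_INTERVAL = 1 MINUTE
       START_FROM = NOW
       /* Write only the columns defined on the target table in step 5 */
       ADD_MISSING_COLUMNS = FALSE
    AS INSERT INTO SNOWFLAKE "SFCommerce"."DEMO"."ORDERS_AGGREGATED" MAP_COLUMNS_BY_NAME
    SELECT buyeremail as BUYER_EMAIL,
       AVG(nettotal) as AVG_ORDER_TOTAL,
       SUM(nettotal) as SUM_ORDER_TOTAL
    FROM Athena.commerce.orders_staging
    WHERE ($commit_time between execution_start_time() AND execution_end_time())
    AND eventtype = 'ORDER'
    GROUP BY buyeremail;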

Step 7

Reading your data in Snowflake

To view your pipeline results, query your table in Snowflake with a SELECT statement:

SELECT * FROM <SNOWFLAKE_SCHEMA>.<SNOWFLAKE_TABLE_NAME> LIMIT 10;

Conclusion

This guide showed how to take raw data from Kafka, stage it, and then transform it to meet your specific business needs. We modeled this approach after a common data lake design pattern, in which data transitions through the raw, staging, and refined zones. Using familiar SQL syntax, you can create data pipelines with SQLake and organize your data to more easily perform analytics and ML. And as your business needs evolve, so can your data: in the future, you can create additional jobs that use the staging table as their source, while your existing pipelines keep your data fresh indefinitely.

Try it yourself

To launch the template:

  1. Launch SQLake by navigating to https://db.upsolver.com

  2. Sign up by entering your email address and setting a password.

  3. After you click through the onboarding panel, a panel with a list of templates displays.

  4. Select the Kafka to Snowflake template.
