Real-time Data Ingestion — Apache Kafka to Amazon Athena

This guide explores a common use case: optimizing and transforming event data from Apache Kafka to Amazon Athena.

Apache Kafka is an open-source distributed event streaming platform set apart by its ability to handle a high volume of data from various systems and applications. Popular data warehouses such as Snowflake and Amazon Redshift take data from Kafka and enable you to run analytics and derive insight. However, these warehouses are designed to support batch processing. They also struggle with data consistency, particularly when there’s a constant flow of evolving data. This can increase both the cost of preparing data and the time it takes to design, implement, and maintain your data architecture.

This guide uses example data that contains nested fields. It shows how to move data from the raw zone of your data lake to the staging zone, and then to the refined zone, where query engines such as Amazon Athena can analyze it.

The raw zone is used to store all data. As the name implies, the data in question is uncatalogued, uncleaned, unverified, and unoptimized for analysis. At this point, the data is only being collected and stored.

The staging zone is where Upsolver automatically catalogs data for easy discovery and converts it to an optimized file format. This format improves compression, which reduces storage costs, and speeds up data access, which improves read performance. The data has still not been transformed, but you can define data pipelines that read and transform the data from this zone for storage in the refined zone.

Finally, Upsolver automatically optimizes data in the refined zone. First, it converts the files into Apache Parquet and partitions the data by date. It then handles schema evolution over time and continuously compacts small files. The end result is a fully optimized data lake that is ready to query in a matter of minutes instead of days.

The following steps explain how to create a data pipeline that reads from a Kafka raw zone, stages the data in the staging zone, and then performs basic transformations before writing the results to the refined zone, from where you can query them using Amazon Athena.

Before you begin

Upsolver runs on AWS Virtual Private Cloud (VPC). Learn how to Deploy Upsolver on AWS and Start Your Free Trial.

When you first deploy Upsolver in the customer VPC, you create an Identity and Access Management (IAM) role that gives you access to any AWS resources you might require to build data pipelines. See AWS Role Permissions for more information.

In this scenario, your cluster has the permissions to read and write to the AWS Glue Data Catalog and to S3. However, if your Kafka cluster is in a VPC different from the one created for Upsolver and isn’t accessible via a public IP, you must create a VPC peering connection.

The procedure for optimizing and transforming event data from Apache Kafka to Amazon Athena consists of 5 steps:

  1. Connecting to Apache Kafka in the raw zone

  2. Staging the data in the staging zone

  3. Ingesting data from Kafka

  4. Creating an output table in Amazon Athena

  5. Preparing your data for the refined zone and reading your data from Amazon Athena

Step 1

Connecting to Apache Kafka in the raw zone

Create a Kafka connection to consume data from your cluster. This connection defines the broker hosts and authentication properties that Upsolver needs to read data from your topics. To do this, all you need to know is familiar SQL syntax.

Create a Kafka connection:

CREATE KAFKA CONNECTION my_kafka_connection
   HOSTS = ('pkc-2396y.us-east-1.aws.confluent.cloud:9092')
   CONSUMER_PROPERTIES = '
      bootstrap.servers=pkc-2396y.us-east-1.aws.confluent.cloud:9092
      security.protocol=SASL_SSL
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="XXXXXXXX" password="-----------";
      ssl.endpoint.identification.algorithm=https
      sasl.mechanism=PLAIN';
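
If your Kafka cluster is reachable from Upsolver without SASL authentication, for example over a VPC peering connection, the connection can be reduced to just the broker list. This is a minimal sketch; the connection name and broker addresses below are hypothetical placeholders:

CREATE KAFKA CONNECTION my_internal_kafka_connection
   -- hypothetical brokers reachable from the Upsolver VPC
   HOSTS = ('kafka-broker-1.internal:9092', 'kafka-broker-2.internal:9092');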

Step 2

Staging the data in the staging zone

You can use Upsolver to stage your data. To achieve this, create a table with the appropriate schema in the AWS Glue Data Catalog; the job you create in the next step reads from Kafka and loads data into it.

When you build your data lake, you should maintain schemas, partitions, and other relevant metadata as new data arrives and existing data changes. It is ideal to maintain this metadata in a central catalog that data practitioners and query engines can access to find and query the data. The AWS Glue Data Catalog fulfils this purpose: it is a fully managed metadata store integrated with a wide range of tools and services, including Upsolver.

Here’s the code to create a staging table to store your raw data:

CREATE TABLE default_glue_catalog.default.kafka_staging_table;

Step 3

Ingest data from Kafka

Create a job to extract the data from Kafka and load it into the staging table. There are several types of jobs you can create, such as copying the data from the source to your destination. To ingest data into the staging zone, use the COPY FROM command.

The following SQL statement creates a job that reads data from the Kafka topic into a staging location and catalogs the metadata in the AWS Glue Data Catalog, using the database and table names you designate in the INTO clause.

Create an ingestion job:

CREATE JOB <KAFKA_STAGING_JOB>
    START_FROM = NOW
    CONTENT_TYPE = AUTO
AS COPY FROM KAFKA <KAFKA_CONNECTION_NAME> 
    TOPIC = '<KAFKA_TOPIC_NAME>' 
INTO default_glue_catalog.<DB_NAME>.<STAGING_TABLE_NAME>;
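
For example, using the connection and staging table created in the previous steps, a filled-in version of this template could look like the following; the job name and topic name are hypothetical placeholders:

CREATE JOB load_orders_from_kafka
    START_FROM = NOW
    CONTENT_TYPE = AUTO
AS COPY FROM KAFKA my_kafka_connection
    TOPIC = 'orders'  -- hypothetical topic name
INTO default_glue_catalog.default.kafka_staging_table;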

Step 4

Create an output table in Amazon Athena

With a typical installation, Upsolver automatically creates a connection to the AWS Glue Data Catalog, which displays in your navigation tree. Use this catalog to create the output table that will hold your transformed data.

Create an output table in AWS Glue Data Catalog:

CREATE TABLE default_glue_catalog.upsolver_samples.orders_transformed_data
  (partition_date date)
PARTITION BY partition_date;

You don’t need to define all of your output columns, because Upsolver automatically adds any missing columns. If you want strict control over which columns are added, define them in the CREATE TABLE statement and set the ADD_MISSING_COLUMNS job property to FALSE when you create the job.
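
For example, a stricter version of the table above might define the columns explicitly. This is a sketch only: the column types are assumptions based on the sample data used later in this guide, so adjust them to match your own data:

CREATE TABLE default_glue_catalog.upsolver_samples.orders_transformed_data (
  order_id string,
  customer_id string,
  total double,
  partition_date date
)
PARTITION BY partition_date;

-- In the INSERT job created in Step 5, set ADD_MISSING_COLUMNS = FALSE
-- so that only the columns defined above are written.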

Step 5

Prepare your data for the refined zone

You’re ready to start transforming your data. To do this, you can choose from three types of jobs:

  1. INSERT

  2. MERGE

  3. DELETE

To take full advantage of these jobs, we recommend that you explicitly create your output table, which enables you to define partitions, primary keys, and more.

After you define the output table, create an INSERT job to read data from the staging table and write the results into the target table inside your refined zone.

Create an INSERT job to read from staging and write to the refined zone:

CREATE JOB transform_orders_and_insert_into_athena_kafka
    START_FROM = NOW
    ADD_MISSING_COLUMNS = TRUE
AS INSERT INTO default_glue_catalog.upsolver_samples.orders_transformed_data 
  MAP_COLUMNS_BY_NAME
    SELECT 
       -- rename columns
       orderid AS order_id, 
       -- hash or mask columns using built-in functions
       MD5(buyeremail) AS customer_id, 
       nettotal AS total, 
       -- populate the partition column with the processing time of the event,
       -- which is automatically cast to DATE type
       $commit_time AS partition_date 
    FROM default_glue_catalog.default.kafka_staging_table 
    WHERE eventtype = 'ORDER' 
    AND $commit_time BETWEEN execution_start_time() AND execution_end_time();
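
The INSERT job above appends new rows to the target table. If you need to upsert or delete records instead, use a MERGE or DELETE job. The following is a rough sketch of a MERGE job that keeps one row per order, merging on a hypothetical order_id key; the exact syntax and options are described in the Upserting Data guide:

CREATE JOB upsert_orders_into_athena_kafka
    START_FROM = NOW
    ADD_MISSING_COLUMNS = TRUE
AS MERGE INTO default_glue_catalog.upsolver_samples.orders_transformed_data AS target
USING (
    SELECT orderid AS order_id,
           MD5(buyeremail) AS customer_id,
           nettotal AS total,
           $commit_time AS partition_date
    FROM default_glue_catalog.default.kafka_staging_table
    WHERE eventtype = 'ORDER'
    AND $commit_time BETWEEN execution_start_time() AND execution_end_time()
) AS source
ON (target.order_id = source.order_id)
WHEN MATCHED THEN REPLACE
WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;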

Read your data from Amazon Athena

To view your pipeline results, query your table using SELECT.

SELECT * 
FROM default_glue_catalog.<DB_NAME>.<TRANSFORMED_TABLE_NAME> 
LIMIT 10;
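
For instance, using the table created in this guide, the query looks like this; you can run it from Upsolver or directly in Amazon Athena:

SELECT order_id, customer_id, total, partition_date
FROM default_glue_catalog.upsolver_samples.orders_transformed_data
LIMIT 10;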

Conclusion

This how-to guide explained how to take raw data from Kafka, stage the data, and then transform it to meet your business needs. The approach was modeled after a common data lake design pattern, in which data transitions through raw, staging, and refined zones. Using only familiar SQL syntax, you can use SQLake to create data pipelines and organize your data to more easily perform analytics and ML.

And as your business needs evolve, so can your data. In the future, you can create additional jobs that use the same staging table as their source, while your pipelines keep your data fresh indefinitely.

Try it yourself

To launch the template:

  1. Launch Upsolver by navigating to https://db.upsolver.com.

  2. Sign up by entering your email address and setting a password.

  3. After you click through the onboarding panel, a panel with a list of templates is displayed.

  4. Select the Kafka to Athena template.
