ClickHouse

Job options

[ COMMENT = '<comment>' ]
[ COMPUTE_CLUSTER = <cluster_identifier> ]
[ START_FROM = { NOW | BEGINNING | timestamp } ]
[ END_AT = { NOW | timestamp } ]
[ RUN_INTERVAL = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]
[ RUN_PARALLELISM = <integer> ]
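As a rough illustration of how these options combine, the sketch below defines a bounded backfill job. The job name, comment text, and selected columns are hypothetical, and the connection and table names are placeholders in the style of the examples further down this page; adjust them to your environment.

```sql
-- Hypothetical backfill job: reads from the start of the source and stops at the time of creation.
CREATE SYNC JOB backfill_orders_to_clickhouse
  COMMENT = 'One-off backfill of historical orders'
  START_FROM = BEGINNING
  END_AT = NOW
  RUN_INTERVAL = 5 MINUTES
  RUN_PARALLELISM = 4
AS INSERT INTO CLICKHOUSE clickhouse_conn.db_name.target_table
  MAP_COLUMNS_BY_NAME
    SELECT
      order_id,
      net_total
    FROM default_glue_catalog.schema_name.clickhouse_staging
    WHERE TIME_FILTER();
```

All options are optional, so a job that omits them falls back to the defaults for each setting.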

Prerequisites

  • Ensure your ClickHouse connection is set up and accessible.

  • Define your target ClickHouse table in advance; schema evolution is not currently supported. Mark fields that might not always have values as Nullable. Learn more about nullable fields in ClickHouse.

  • Upsolver is currently tested with ClickHouse Server version 24.1.
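As a minimal sketch of these checks, the ClickHouse statements below confirm the server version and add a column by hand, since the job does not evolve the schema for you. The table default.orders is the one used in the example on this page, and the couponCode column is hypothetical.

```sql
-- Check which server version you are running (this page was tested against 24.1).
SELECT version();

-- Schema evolution is not supported, so new columns must be added manually.
-- couponCode is a hypothetical field; Nullable allows rows that have no value for it.
ALTER TABLE default.orders
    ADD COLUMN IF NOT EXISTS `couponCode` Nullable(String);
```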

Creating a Target Table in ClickHouse

Before you run your job, create the target table in ClickHouse.

Example:

CREATE TABLE default.orders
(
    `netTotal` Float64,
    `orderDate` DateTime64(3),
    `orderId` String,
    `orderType` Nullable(String),
    `taxRate` Nullable(Float64),
    `customer` Tuple(email String, firstName String, lastName String, address Tuple(address1 String, address2 Nullable(String), city String, postCode String, state String)),
    `data` Tuple(items Array(Tuple(category String, itemId Int64, name String, quantity Int64, unitPrice Float64))),
    `shippingInfo` Tuple(method Nullable(String), recipient Tuple(firstName Nullable(String), lastName Nullable(String)))
)
ENGINE = MergeTree
ORDER BY orderId
SETTINGS non_replicated_deduplication_window = 300;

Supported ClickHouse Data Types

Upsolver supports writing to columns of the following ClickHouse types:

  • Basic Types:

    • Integers (Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64)

    • Floating-Point Numbers (Float32, Float64)

    • Decimal Numbers (Decimal32, Decimal64, Decimal128)

    • Date and Time (Date, DateTime, DateTime64)

    • Strings (String, FixedString)

    • Bool

    • UUID

    • Enums (Enum8, Enum16)

    • LowCardinality

  • Complex Types:

    • Arrays

    • Tuples

    • Maps
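To make the list concrete, here is a sketch of a ClickHouse table that uses several of the supported types. The table and column names are hypothetical, and the precision, enum values, and window size are arbitrary choices for illustration.

```sql
CREATE TABLE default.type_examples
(
    `id` UUID,
    `quantity` UInt32,
    `price` Decimal64(2),               -- two digits after the decimal point
    `createdAt` DateTime64(3),          -- millisecond precision
    `countryCode` FixedString(2),
    `isPaid` Bool,
    `status` Enum8('pending' = 1, 'shipped' = 2, 'delivered' = 3),
    `channel` LowCardinality(String),   -- suits columns with few distinct values
    `tags` Array(String),
    `attributes` Map(String, String)
)
ENGINE = MergeTree
ORDER BY id
SETTINGS non_replicated_deduplication_window = 300;
```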

Parallelism Support

To prevent data duplication during retries, we use a deduplication token that relies on the table's deduplication window setting. This setting must be defined on the target table and specifies the number of most recently inserted blocks for which ClickHouse stores hash sums. A retried insert is checked against these hash sums and skipped if it is a duplicate. Because only the most recent blocks are remembered, the window size limits how many insert operations can safely run concurrently.

The larger the deduplication window, the more concurrent inserts can be supported without risking duplicates.

Required Setting:

  • Non-Replicated Tables: non_replicated_deduplication_window

  • Replicated Tables: replicated_deduplication_window
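The statements below sketch how the setting can be applied and how token-based deduplication behaves in ClickHouse. They assume the deduplication token corresponds to ClickHouse's insert_deduplication_token setting, which this page does not state explicitly. The dedup_demo table, the token value, and the window size of 300 are illustrative only.

```sql
-- Set the window on an existing non-replicated table.
ALTER TABLE default.orders
    MODIFY SETTING non_replicated_deduplication_window = 300;

-- For a replicated table, the equivalent statement would be:
-- ALTER TABLE default.orders MODIFY SETTING replicated_deduplication_window = 300;

-- Hypothetical demo table for observing deduplication on retry.
CREATE TABLE default.dedup_demo
(
    `id` UInt64,
    `value` String
)
ENGINE = MergeTree
ORDER BY id
SETTINGS non_replicated_deduplication_window = 300;

-- First attempt, then a simulated retry carrying the same token.
INSERT INTO default.dedup_demo SETTINGS insert_deduplication_token = 'batch-0001' VALUES (1, 'a');
INSERT INTO default.dedup_demo SETTINGS insert_deduplication_token = 'batch-0001' VALUES (1, 'a');

-- If the retry was recognized as a duplicate, only one row is present.
SELECT count() FROM default.dedup_demo;
```

To confirm the setting on your own table, SHOW CREATE TABLE lists it in the SETTINGS clause.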

Upsert Support

Selecting an engine such as ReplacingMergeTree, CollapsingMergeTree, or VersionedCollapsingMergeTree enables ClickHouse to perform upserts, where duplicates are merged based on the table's sorting key. While this ensures data integrity by preventing duplicate entries, it may limit parallelism as data must be inserted in sequence to maintain order.
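As a sketch of an upsert-capable target, the table below uses ReplacingMergeTree with orderDate as the version column, so the row with the latest orderDate is kept for each orderId. The table name and the reduced column set are hypothetical.

```sql
CREATE TABLE default.orders_latest
(
    `orderId` String,
    `orderDate` DateTime64(3),
    `netTotal` Float64
)
ENGINE = ReplacingMergeTree(orderDate)   -- orderDate acts as the row version
ORDER BY orderId                         -- rows sharing an orderId are merged
SETTINGS non_replicated_deduplication_window = 300;

-- ClickHouse merges rows in the background, so older versions may still be visible.
-- FINAL collapses them at query time, at some cost in query speed.
SELECT orderId, orderDate, netTotal
FROM default.orders_latest FINAL;
```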

Examples

Ingestion job

CREATE SYNC JOB copy_s3_to_clickhouse
    CONTENT_TYPE = PARQUET
    DATE_PATTERN = 'yyyy/MM/dd/HH/'
    START_FROM = BEGINNING
AS COPY FROM s3_connection 
    LOCATION = 's3://bucket_name/data/'
INTO clickhouse_connection.db_name.target_table;

Transformation job

CREATE SYNC JOB insert_into_clickhouse
  START_FROM = BEGINNING
  RUN_INTERVAL = 1 MINUTE
  RUN_PARALLELISM = 20
AS INSERT INTO CLICKHOUSE clickhouse_conn.db_name.target_table 
  MAP_COLUMNS_BY_NAME
    SELECT 
      orders,
      MD5(customer_email) AS customer_id,
      ARRAY_SUM(data.items[].quantity * data.items[].price) AS net_total
    FROM default_glue_catalog.schema_name.clickhouse_staging
    WHERE TIME_FILTER();
