ClickHouse
Job options
[ COMMENT = '<comment>' ]
[ COMPUTE_CLUSTER = <cluster_identifier> ]
[ START_FROM = { NOW | BEGINNING | timestamp } ]
[ END_AT = { NOW | timestamp } ]
[ RUN_INTERVAL = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]
[ RUN_PARALLELISM = <integer> ]
These are general job options; see the general job options reference for a detailed description of each.
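As an illustration, the options above might be combined as follows. The job, cluster, connection, and table names are placeholders, and the query body mirrors the transformation example later on this page:
CREATE SYNC JOB load_orders_to_clickhouse
    COMMENT = 'Load staged orders into ClickHouse every hour'
    COMPUTE_CLUSTER = "my_compute_cluster"
    START_FROM = BEGINNING
    RUN_INTERVAL = 1 HOUR
    RUN_PARALLELISM = 10
AS INSERT INTO CLICKHOUSE clickhouse_connection.db_name.target_table
MAP_COLUMNS_BY_NAME
    SELECT *
    FROM default_glue_catalog.schema_name.clickhouse_staging
    WHERE TIME_FILTER();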
Prerequisites
Ensure your ClickHouse connection is set up and accessible (a minimal connection sketch follows this list).
Pre-define your target ClickHouse table, keeping in mind that schema evolution is not currently supported. Fields that might not always have values should be marked as nullable. Learn more about nullable fields in ClickHouse.
Currently tested with ClickHouse Server version 24.1.
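For reference, a connection is typically created once and reused across jobs. The sketch below assumes Upsolver's JDBC-style connection options; the connection name, connection string, user, and password are placeholders, and the exact option names may differ, so check the ClickHouse connection reference for your version:
-- Hypothetical connection definition; option names assume Upsolver's JDBC-style connections
CREATE CLICKHOUSE CONNECTION my_clickhouse_connection
    CONNECTION_STRING = 'jdbc:clickhouse://<hostname>:<port>/<database>'
    USER_NAME = '<username>'
    PASSWORD = '<password>';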
Creating a target table in ClickHouse
Before executing your job, the target table in ClickHouse needs to be predefined.
Example:
CREATE TABLE default.orders
(
`netTotal` Float64,
`orderDate` DateTime64(3),
`orderId` String,
`orderType` Nullable(String),
`taxRate` Nullable(Float64),
`customer` Tuple(email String, firstName String, lastName String, address Tuple(address1 String, address2 Nullable(String), city String, postCode String, state String)),
`data` Tuple(items Array(Tuple(category String, itemId Int64, name String, quantity Int64, unitPrice Float64))),
`shippingInfo` Tuple(method Nullable(String), recipient Tuple(firstName Nullable(String), lastName Nullable(String)))
)
ENGINE = MergeTree
ORDER BY orderId
SETTINGS non_replicated_deduplication_window = 300;
Supported ClickHouse data types
Upsolver supports writing to the following ClickHouse data types (an illustrative table definition using several of them follows the lists):
Basic Types:
Integers (Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64)
Floating-Point Numbers (Float32, Float64)
Decimal Numbers (Decimal32, Decimal64, Decimal128)
Date and Time (Date, DateTime, DateTime64)
Strings (String, FixedString)
Bool
UUID
Enums (Enum8, Enum16)
LowCardinality
Complex Types:
Arrays
Tuples
Maps
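For illustration, a hypothetical table exercising several of these types (UUID, Enum8, LowCardinality, Decimal, Bool, Map, and Array) could be defined like this; the table and column names are placeholders:
CREATE TABLE default.events
(
    `eventId` UUID,
    `eventType` Enum8('click' = 1, 'view' = 2),
    `source` LowCardinality(String),
    `amount` Decimal64(2),
    `isTest` Bool,
    `labels` Map(String, String),
    `scores` Array(Float32)
)
ENGINE = MergeTree
ORDER BY eventId
SETTINGS non_replicated_deduplication_window = 300;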
Parallelism support
To prevent data duplication during retries, Upsolver uses a deduplication token that relies on ClickHouse's deduplication window setting. This setting must be defined on the target table and specifies how many of the most recently inserted blocks have their hash sums retained; ClickHouse checks new inserts against these hash sums to detect and discard duplicates, which in turn bounds how many insert operations can safely run concurrently.
The larger the deduplication window, the greater the concurrency that can be safely supported, ensuring both data integrity and operational efficiency.
Required setting:
Non-replicated tables: non_replicated_deduplication_window
Replicated tables: replicated_deduplication_window
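The orders table example above already declares non_replicated_deduplication_window at creation time. For an existing table the setting can be applied with ALTER TABLE, and a replicated table uses the replicated variant instead. The table names and window size below are illustrative, and the ReplicatedMergeTree path assumes the standard {shard}/{replica} macros:
ALTER TABLE default.orders MODIFY SETTING non_replicated_deduplication_window = 300;

CREATE TABLE default.orders_replicated
(
    `orderId` String,
    `netTotal` Float64
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/orders_replicated', '{replica}')
ORDER BY orderId
SETTINGS replicated_deduplication_window = 300;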
Upsert support
Selecting an engine such as ReplacingMergeTree, CollapsingMergeTree, or VersionedCollapsingMergeTree enables ClickHouse to perform upserts, where duplicates are merged based on the table's sorting key. While this ensures data integrity by preventing duplicate entries, it may limit parallelism, as data must be inserted in sequence to maintain order.
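For illustration, a hypothetical ReplacingMergeTree variant of the orders table might look like the following. Rows sharing the same orderId are collapsed to the row with the latest orderDate during background merges, so queries that need fully deduplicated results typically add the FINAL modifier:
CREATE TABLE default.orders_upsert
(
    `orderId` String,
    `orderDate` DateTime64(3),
    `netTotal` Float64
)
ENGINE = ReplacingMergeTree(orderDate)
ORDER BY orderId
SETTINGS non_replicated_deduplication_window = 300;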
Examples
Ingestion job
CREATE SYNC JOB copy_s3_to_clickhouse
CONTENT_TYPE = PARQUET
DATE_PATTERN = 'yyyy/MM/dd/HH/'
START_FROM = BEGINNING
AS COPY FROM s3_connection
LOCATION = 's3://bucket_name/data/'
INTO clickhouse_connection.db_name.target_table;
Transformation job
CREATE SYNC JOB insert_into_clickhouse
START_FROM = BEGINNING
RUN_INTERVAL = 1 MINUTE
RUN_PARALLELISM = 20
AS INSERT INTO CLICKHOUSE clickhouse_conn.db_name.target_table
MAP_COLUMNS_BY_NAME
SELECT
orders,
MD5(customer_email) AS customer_id,
ARRAY_SUM(data.items[].quantity * data.items[].price) AS net_total
FROM default_glue_catalog.schema_name.clickhouse_staging
WHERE TIME_FILTER();