MERGE

A MERGE job defines a query that pulls in a set of data based on the given SELECT statement and inserts into, replaces, or deletes the data from the designated target based on the job definition. This query is then run periodically based on the RUN_INTERVAL defined within the job.

Using the condition provided in the ON clause, the query can insert records that do not meet the condition and replace those that do. Additionally, records that match the ON condition and a delete condition can be deleted.

Additionally, if two jobs writing to the same table rewrite the same record, it is nondeterministic as to which job's data ends up in the table.

Note that MERGE statements are only supported for target tables with primary key constraints.

Use the MERGE command if you need to:

  • Delete records

  • Update based on a unique constraint

  • Update data in a non-data lake table, like Snowflake or Elasticsearch

Syntax

CREATE [SYNC] JOB <job_identifier>
    [ COMMENT = '<comment>' ]
    { job_options }
AS MERGE INTO <target_definition> [ [ AS ] <alias> ]
    USING (<select_statement>) [ [ AS ] <alias> ]
    [ ON <column_condition> [ AND <column_condition> ... ] ]
    [ WHEN MATCHED AND <delete_condition> THEN DELETE ]
    WHEN MATCHED THEN REPLACE
    WHEN NOT MATCHED THEN INSERT 
        [ { MAP_COLUMNS_BY_NAME [ EXCEPT <column_name> [, ...] ]
         | (<column_name> [, ...]) values (<expression> [, ...])] }];

Jump to

Job identifier

Valid identifiers match the following format:

identifier = "([^"]|"")*"|[A-Za-z_][A-Za-z0-9_]*;

Job options

Target location

Target definition

{ <table_identifier>
| | { S3 | REDSHIFT | SNOWFLAKE }  
    <catalog_name>.<schema_name>.<table_name>
} 

ON clause

If the ON statement is omitted, there must be a natural join for all of the primary key and partition columns. Otherwise, there should be a column_condition for each primary key of the target table.

For tables without globally unique keys, there must also be a column_condition for each partition column.

{ <column_name> = <expression>
| <column_name> IN (<expression> [, ...])
| <expression> = <column_name> };                                         

column_name must be a primary key of the target table.

expression must only reference columns in the SELECT statement.

MAP_COLUMNS_BY_NAME

The MAP_COLUMNS_BY_NAME keyword maps columns from the SELECT statement to the table by the names of the returned columns in the query. Columns listed after EXCEPT are excluded from the final table.

When using MAP_COLUMNS_BY_NAME, the columns matched from the target and source tables in the ON condition (if not omitted) must share the same name.

If a column list is provided instead of MAP_COLUMNS_BY_NAME, it should contain all primary keys and partition columns, and their mapping should be identical to the mapping in the ON clause.

If nothing is specified, fields are mapped by ordinal position in the query, and fields mapped to each special column must match exactly the ON clause.

More information on MAP_COLUMNS_BY_NAME.

EXCEPT

Columns listed after the EXCEPT keyword are not written to your target table.

For example, you may have a column to_delete; in order to use this column as your delete_condition, you need to include it within the SELECT statement. However, there is likely no meaning in having this column itself within your final target table, so using EXCEPT to_delete allows you to have it excluded from the final output.

Example

Write into a data lake table

CREATE SYNC JOB merge_orders_upsert
    START_FROM = BEGINNING
    ADD_MISSING_COLUMNS = TRUE
    RUN_INTERVAL = 1 MINUTE
AS MERGE INTO 
	default_glue_catalog.upsolver_samples.orders_upsert_with_merge AS target
    /*
    	Use the SELECT statement below to choose your columns and 
        performed the desired transformations.	
	
	In this example, we aggregate the sample orders data by customer and 
	filter it to only include repeat purchasers.
    */            
    USING (SELECT customer_email, 
	          COUNT(DISTINCT orderid) AS number_of_orders,
	          SUM(nettotal) AS total_sales,
   	          MIN(orderdate) AS first_purchase,
	          MAX(orderdate) AS last_purchase
	       FROM default_glue_catalog.upsolver_samples.orders_raw_data
	       WHERE time_filter()
	       GROUP BY 1
	       HAVING COUNT(DISTINCT orderid::string) > 1) source
     ON (target.customer_email = source.customer_email)
     WHEN MATCHED THEN REPLACE -- Update if primary keys match
     WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;

Note that since there is a selected column matching the name of the primary key column in the table we are merging into, the ON clause here is optional.

Last updated