INSERT

An INSERT job defines a query that pulls in a set of data based on the given SELECT statement and inserts it into the designated target. This query is then run periodically based on the RUN_INTERVAL defined within the job.

By default the INSERT job simply appends the new data to the target table. For data lake tables, those created using CREATE TABLE in Upsolver and managed by AWS Glue Data Catalog, when a primary key is defined the job will perform an upsert operation (update if PK exist, insert if it does not).

When writing to non-data lake targets, such as Snowflake and Redshift, the INSERT job will not recognize a primary key, if defined on the target table, and will only perform an INSERT operation. If you want to upsert, refer to the MERGE command.

Additionally, if two jobs writing to the same table rewrite the same primary key, it is nondeterministic as to which job's data ends up in the table and is thus not recommended.

Syntax

CREATE [SYNC] JOB <job_identifier>
    [ COMMENT = '<comment>' ]
    { job_options }
AS INSERT INTO { target_definition }
        [ { (<column_name> [ , ... ] ) 
          | MAP_COLUMNS_BY_NAME [ EXCEPT <column_name> [, ...] ] } ]
        { <select_statement> 
        | UNNEST(<select_statement>) };

Jump to

Job identifier

Valid identifiers match the following format:

identifier = "([^"]|"")*"|[A-Za-z_][A-Za-z0-9_]*;

Job options

Target location

Target definition

{ <table_identifier>
| SNOWFLAKE <catalog_name>.<schema_name>.<table_name>
| REDSHIFT <catalog_name>.<schema_name>.<table_name>
| S3 BUCKET = <bucket_name>
| ELASTICSEARCH 
} 

MAP_COLUMNS_BY_NAME

The MAP_COLUMNS_BY_NAME keyword maps columns from the SELECT statement to the table by the names of the returned columns in the query. Columns listed after EXCEPT are excluded from the final table.

When MAP_COLUMNS_BY_NAME is set, the SELECT statement must provide an alias for each field or they must be a simple column reference or a star expression. The columns are then mapped to the fields of the same name in the table.

Otherwise columns are mapped by ordinal positions with reference to the provided column list or the order of the columns in the table.

When using a star query in the SELECT statement, MAP_COLUMNS_BY_NAME is required. Additionally, all primary key and partition columns must be mapped.

EXCEPT

Columns listed after the EXCEPT keyword are not written to your target table.


Examples

Amazon S3

CREATE SYNC JOB insert_into_s3
    START_FROM = BEGINNING
    FILE_FORMAT = (type = CSV)
    COMPRESSION = GZIP
    DATE_PATTERN = 'yyyy-MM-dd-HH-mm'
    RUN_INTERVAL = 1 MINUTE
    COMPUTE_CLUSTER = "Default Compute"
AS INSERT INTO S3 s3_output_connection 
    LOCATION = 's3://your-bucket-name/path/to/folder/'
    -- Use the SELECT statement to choose columns from the source and 
    -- implement your business logic transformations.
    SELECT 
      column1 AS s3_column1, 
      MD5(column2) AS s3_column2 -- hash or mask columns using built-in functions
    FROM default_glue_catalog.your_schema.your_raw_data_table
    WHERE time_filter();

Data lake table

CREATE SYNC JOB transform_orders_and_insert_into_athena
    START_FROM = BEGINNING
    ADD_MISSING_COLUMNS = true	
    RUN_INTERVAL = 1 MINUTE
AS INSERT INTO default_glue_catalog.upsolver_samples.orders_transformed_data 
    MAP_COLUMNS_BY_NAME
    -- Use the SELECT statement to choose columns from the source and 
    -- implement your business logic transformations.
    SELECT 
      -- rename columns
      orderid AS order_id, 
      -- hash or mask columns using built-in functions
      MD5(customer_email) AS customer_id, 
      -- computed field defined later in the query
      customer_name,  
      nettotal AS total, 
      -- populate the partition column with the processing time of the event, 
      -- and automatically cast to DATE type
      $commit_time AS partition_date 
    FROM default_glue_catalog.upsolver_samples.orders_raw_data
    -- create a computed column
    LET customer_name = customer_firstname || ' ' || customer_lastname 
    WHERE ordertype = 'SHIPPING' 
    AND time_filter();

Elasticsearch

CREATE SYNC JOB insert_into_elasticsearch_index
   RUN_INTERVAL = 1 MINUTE
   START_FROM = BEGINNING
   COMMENT = 'insert data into Elasticsearch index'
AS INSERT INTO ELASTICSEARCH your_elasticsearch_connection 
   PREFIX = 'orders'
      SELECT customer_id,
             COUNT(DISTINCT order_id) AS num_orders,
             SUM(net_total) AS total_spent,
             MIN(order_date) AS first_purchase,
             MAX(order_date) AS last_purchase
      FROM default_glue_catalog.your_database.staging_table
      WHERE time_filter()
      GROUP BY customer_id;

PostgreSQL

CREATE JOB load_data_to_postgres
    START_FROM = BEGINNING
AS INSERT INTO POSTGRES your_postgres_connection.your_schema.target_table 
    MAP_COLUMNS_BY_NAME            
      SELECT orderid AS app_name
      FROM default_glue_catalog.your_database.orders
      WHERE time_filter();

Snowflake

CREATE SYNC JOB load_data_to_snowflake
   RUN_INTERVAL = 1 MINUTE
   START_FROM = BEGINNING
   COMMENT = 'insert data into snowflake target table'
AS INSERT INTO SNOWFLAKE your_snowflake_connection.your_schema.target_table 
   MAP_COLUMNS_BY_NAME
      SELECT customer_id,
             COUNT(DISTINCT order_id) AS num_orders,
             SUM(net_total) AS total_spent,
             MIN(order_date) AS first_purchase,
             MAX(order_date) AS last_purchase
      FROM default_glue_catalog.your_database.staging_table
      WHERE time_filter()
      GROUP BY customer_id;

Last updated