An INSERT job defines a query that pulls in a set of data based on the given SELECT statement and inserts it into the designated target. This query is then run periodically based on the RUN_INTERVAL defined within the job.
By default, the INSERT job simply appends the new data to the target table. For data lake tables (those created using CREATE TABLE in Upsolver and managed by the AWS Glue Data Catalog), when a primary key is defined the job performs an upsert: it updates the row if the primary key already exists and inserts a new row if it does not.
When writing to non-data lake targets, such as Snowflake and Redshift, the INSERT job does not recognize a primary key defined on the target table and only performs an INSERT operation. If you want to upsert, refer to the MERGE command.
Additionally, if two jobs writing to the same table write the same primary key, it is nondeterministic which job's data ends up in the table, so this setup is not recommended.
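The upsert behavior described above for data lake tables can be sketched as follows. This is a minimal illustration rather than a reference implementation: the table orders_by_id, the job name, and the source table and its columns are hypothetical placeholders, and the example assumes the target is declared with a PRIMARY KEY clause in CREATE TABLE.

```sql
-- Hypothetical data lake table keyed on order_id (names are placeholders).
CREATE TABLE default_glue_catalog.your_database.orders_by_id (order_id string)
    PRIMARY KEY order_id;

-- Because the target defines a primary key, each run is expected to update
-- rows whose order_id already exists and insert rows whose order_id is new.
CREATE SYNC JOB upsert_orders_by_id
    START_FROM = BEGINNING
    ADD_MISSING_COLUMNS = true
    RUN_INTERVAL = 1 MINUTE
AS INSERT INTO default_glue_catalog.your_database.orders_by_id MAP_COLUMNS_BY_NAME
    SELECT
        orderid AS order_id,  -- the primary key column must be mapped
        nettotal AS total
    FROM default_glue_catalog.your_database.orders
    WHERE time_filter();
```

Without the PRIMARY KEY clause on the table, the same job would simply append rows on every run.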
The MAP_COLUMNS_BY_NAME keyword maps columns from the SELECT statement to the table by the names of the returned columns in the query. Columns listed after EXCEPT are excluded from the final table.
When MAP_COLUMNS_BY_NAME is set, each field in the SELECT statement must either have an alias or be a simple column reference or a star expression. The columns are then mapped to the fields of the same name in the table.
Otherwise, columns are mapped by ordinal position, following the provided column list or, if no list is provided, the order of the columns in the table.
When using a star query in the SELECT statement, MAP_COLUMNS_BY_NAME is required. Additionally, all primary key and partition columns must be mapped.
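To contrast the two mapping modes, here is a hedged sketch of the same transformation written both ways. The job names, the target table orders_summary, and the source columns are hypothetical, and the parenthesized column list after the target name is an assumption about how the "provided column list" is written.

```sql
-- Mapping by name: every selected field has an alias (or is a plain column
-- reference), and each is written to the target column with the same name.
CREATE SYNC JOB insert_orders_by_name
    START_FROM = BEGINNING
    RUN_INTERVAL = 1 MINUTE
AS INSERT INTO default_glue_catalog.your_database.orders_summary MAP_COLUMNS_BY_NAME
    SELECT
        orderid AS order_id,
        nettotal AS total
    FROM default_glue_catalog.your_database.orders
    WHERE time_filter();

-- Mapping by ordinal position: the first selected field goes to order_id and
-- the second to total, regardless of the names returned by the query.
CREATE SYNC JOB insert_orders_by_position
    START_FROM = BEGINNING
    RUN_INTERVAL = 1 MINUTE
AS INSERT INTO default_glue_catalog.your_database.orders_summary (order_id, total)
    SELECT
        orderid,
        nettotal
    FROM default_glue_catalog.your_database.orders
    WHERE time_filter();
```

Mapping by name is less fragile when the SELECT list is reordered, which is one reason to prefer it for wide tables. The two jobs are shown side by side only for comparison; running both against the same table would write each row twice.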
EXCEPT
Columns listed after the EXCEPT keyword are not written to your target table.
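As an illustration, the sketch below copies every source column except one sensitive field. The job, table, and column names are hypothetical, and placing EXCEPT directly after MAP_COLUMNS_BY_NAME is assumed from the description above.

```sql
-- Write all columns returned by the star query to the target,
-- except customer_email, which is left out of the target table.
CREATE SYNC JOB copy_orders_without_email
    START_FROM = BEGINNING
    ADD_MISSING_COLUMNS = true
    RUN_INTERVAL = 1 MINUTE
AS INSERT INTO default_glue_catalog.your_database.orders_clean
    MAP_COLUMNS_BY_NAME EXCEPT customer_email
    SELECT *
    FROM default_glue_catalog.your_database.orders
    WHERE time_filter();
```

Primary key and partition columns of the target must still be mapped, so they should not appear in the EXCEPT list.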
Examples
Amazon S3
CREATE SYNC JOB insert_into_s3
    START_FROM = BEGINNING
    FILE_FORMAT = (type = CSV)
    COMPRESSION = GZIP
    DATE_PATTERN = 'yyyy-MM-dd-HH-mm'
    RUN_INTERVAL = 1 MINUTE
    COMPUTE_CLUSTER = "Default Compute"
AS INSERT INTO S3 s3_output_connection LOCATION = 's3://your-bucket-name/path/to/folder/'
    -- Use the SELECT statement to choose columns from the source and
    -- implement your business logic transformations.
    SELECT
        column1 AS s3_column1,
        MD5(column2) AS s3_column2 -- hash or mask columns using built-in functions
    FROM default_glue_catalog.your_schema.your_raw_data_table
    WHERE time_filter();
Data lake table
CREATE SYNC JOB transform_orders_and_insert_into_athena
    START_FROM = BEGINNING
    ADD_MISSING_COLUMNS = true
    RUN_INTERVAL = 1 MINUTE
AS INSERT INTO default_glue_catalog.upsolver_samples.orders_transformed_data MAP_COLUMNS_BY_NAME
    -- Use the SELECT statement to choose columns from the source and
    -- implement your business logic transformations.
    SELECT
        -- rename columns
        orderid AS order_id,
        -- hash or mask columns using built-in functions
        MD5(customer_email) AS customer_id,
        -- computed field defined later in the query
        customer_name,
        nettotal AS total,
        -- populate the partition column with the processing time of the event,
        -- and automatically cast to DATE type
        $commit_time AS partition_date
    FROM default_glue_catalog.upsolver_samples.orders_raw_data
    -- create a computed column
    LET customer_name = customer_firstname || ' ' || customer_lastname
    WHERE ordertype = 'SHIPPING'
    AND time_filter();
Elasticsearch
CREATE SYNC JOB insert_into_elasticsearch_index
    RUN_INTERVAL = 1 MINUTE
    START_FROM = BEGINNING
    COMMENT = 'insert data into Elasticsearch index'
AS INSERT INTO ELASTICSEARCH your_elasticsearch_connection PREFIX = 'orders'
    SELECT
        customer_id,
        COUNT(DISTINCT order_id) AS num_orders,
        SUM(net_total) AS total_spent,
        MIN(order_date) AS first_purchase,
        MAX(order_date) AS last_purchase
    FROM default_glue_catalog.your_database.staging_table
    WHERE time_filter()
    GROUP BY customer_id;
PostgreSQL
CREATE JOB load_data_to_postgres
    START_FROM = BEGINNING
AS INSERT INTO POSTGRES your_postgres_connection.your_schema.target_table MAP_COLUMNS_BY_NAME
    SELECT orderid AS app_name
    FROM default_glue_catalog.your_database.orders
    WHERE time_filter();
Snowflake
CREATE SYNC JOB load_data_to_snowflake
    RUN_INTERVAL = 1 MINUTE
    START_FROM = BEGINNING
    COMMENT = 'insert data into snowflake target table'
AS INSERT INTO SNOWFLAKE your_snowflake_connection.your_schema.target_table MAP_COLUMNS_BY_NAME
    SELECT
        customer_id,
        COUNT(DISTINCT order_id) AS num_orders,
        SUM(net_total) AS total_spent,
        MIN(order_date) AS first_purchase,
        MAX(order_date) AS last_purchase
    FROM default_glue_catalog.your_database.staging_table
    WHERE time_filter()
    GROUP BY customer_id;