An INSERT job defines a query that pulls in a set of data based on the given SELECT statement and inserts it into the designated target. This query is then run periodically based on the RUN_INTERVAL defined within the job.
By default, an INSERT job simply appends the new data to the target table. For data lake tables (those created using CREATE TABLE in Upsolver and managed by AWS Glue Data Catalog), the job performs an upsert when a primary key is defined: it updates the row if the primary key exists and inserts a new row if it does not.
When writing to non-data lake targets, such as Snowflake and Redshift, the INSERT job does not recognize a primary key, even if one is defined on the target table, and only performs an INSERT operation. If you want to upsert, refer to the MERGE command.
Additionally, if two jobs that write to the same table write the same primary key, it is nondeterministic which job's data ends up in the table. This setup is therefore not recommended.
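The difference between appending and upserting, and the reason two jobs writing the same primary key give unpredictable results, can be sketched with a small Python model. This is only an illustration of the behavior described above, not Upsolver code: the function names `append_rows` and `upsert_rows` and the sample rows are hypothetical, and the model assumes an upsert replaces the whole existing row.

```python
# Illustrative model only; these helpers are hypothetical and not part of Upsolver.

def append_rows(table, rows):
    """Default INSERT behavior: every incoming row is added to the table."""
    return table + list(rows)

def upsert_rows(table, rows, primary_key):
    """Table with a primary key: update on a key match, insert otherwise.

    Assumption: the incoming row replaces the existing row as a whole.
    """
    by_key = {row[primary_key]: row for row in table}
    for row in rows:
        by_key[row[primary_key]] = row  # replaces an existing row or adds a new one
    return list(by_key.values())

existing = [{"order_id": 1, "total": 10}, {"order_id": 2, "total": 20}]
incoming = [{"order_id": 2, "total": 25}, {"order_id": 3, "total": 30}]

appended = append_rows(existing, incoming)             # order_id 2 now appears twice
upserted = upsert_rows(existing, incoming, "order_id")  # order_id 2 is updated in place

# Two jobs writing the same key: the result depends on which write lands last.
job_a = [{"order_id": 1, "total": 111}]
job_b = [{"order_id": 1, "total": 222}]
a_then_b = upsert_rows(upsert_rows(existing, job_a, "order_id"), job_b, "order_id")
b_then_a = upsert_rows(upsert_rows(existing, job_b, "order_id"), job_a, "order_id")
```

In this model `a_then_b` and `b_then_a` hold different totals for `order_id` 1, which is why the outcome is unpredictable when the order of the two writes is not controlled.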
The MAP_COLUMNS_BY_NAME keyword maps columns from the SELECT statement to the table by the names of the returned columns in the query. Columns listed after EXCEPT are excluded from the final table.
When MAP_COLUMNS_BY_NAME is set, each field in the SELECT statement must either have an alias or be a simple column reference or a star expression. The columns are then mapped to the fields of the same name in the table.
Otherwise, columns are mapped by ordinal position, following either the provided column list or the order of the columns in the table.
When using a star query in the SELECT statement, MAP_COLUMNS_BY_NAME is required. Additionally, all primary key and partition columns must be mapped.
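To make the two mapping modes concrete, here is a minimal Python sketch of mapping by name versus mapping by ordinal position. It is a simplified model of the rules above, not Upsolver's implementation: the helpers `map_by_name`, `map_by_position`, and `missing_required`, the column names, and the sample values are all hypothetical.

```python
# Illustrative model only; these helpers are hypothetical and not part of Upsolver.

def map_by_name(select_output, table_columns):
    """Mapping by name: each SELECT output goes to the table column with the same name."""
    values = dict(select_output)
    return {col: values[col] for col in table_columns if col in values}

def map_by_position(select_output, table_columns):
    """Mapping by ordinal position: the i-th SELECT output goes to the i-th column."""
    return {col: value for col, (_, value) in zip(table_columns, select_output)}

def missing_required(mapped, required_columns):
    """List primary key or partition columns that did not receive a value."""
    return [col for col in required_columns if col not in mapped]

table_columns = ["order_id", "customer_id", "total"]

# SELECT output as (name, value) pairs, in a different order than the table columns.
select_output = [("total", 99.5), ("order_id", 7), ("customer_id", "abc")]

by_name = map_by_name(select_output, table_columns)
by_position = map_by_position(select_output, table_columns)
unmapped_keys = missing_required(by_name, ["order_id"])
```

In this sketch `by_name` places each value in the column of the same name, while `by_position` writes the `total` value into `order_id` because it ignores names. When mapping by position, the SELECT order therefore has to match the column list or the table's column order.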
EXCEPT
Columns listed after the EXCEPT keyword are not written to your target table.
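The effect of EXCEPT can be pictured as a filter applied to each row before it is written. The Python sketch below models only that behavior and is not Upsolver code: the helper `drop_excepted` and the sample row are hypothetical.

```python
# Illustrative model only; this helper is hypothetical and not part of Upsolver.

def drop_excepted(select_row, except_columns):
    """Remove the columns named after EXCEPT before the row is written."""
    excluded = set(except_columns)
    return {name: value for name, value in select_row.items() if name not in excluded}

# A row as a star query might return it, including a column that should not be written.
row = {"order_id": 7, "customer_email": "a@example.com", "total": 99.5}

written = drop_excepted(row, ["customer_email"])
```

Here `written` keeps `order_id` and `total` and omits `customer_email`, which is useful when a star query returns a column you do not want in the target.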
Examples
Amazon S3
CREATE SYNC JOB insert_into_s3
    START_FROM = BEGINNING
    FILE_FORMAT = (type = CSV)
    COMPRESSION = GZIP
    DATE_PATTERN = 'yyyy-MM-dd-HH-mm'
    RUN_INTERVAL = 1 MINUTE
    COMPUTE_CLUSTER = "Default Compute"
AS INSERT INTO S3 s3_output_connection
    LOCATION = 's3://your-bucket-name/path/to/folder/'
    -- Use the SELECT statement to choose columns from the source and
    -- implement your business logic transformations.
    SELECT
        column1 AS s3_column1,
        MD5(column2) AS s3_column2 -- hash or mask columns using built-in functions
    FROM default_glue_catalog.your_schema.your_raw_data_table
    WHERE time_filter();
Data lake table
CREATE SYNC JOB transform_orders_and_insert_into_athena
    START_FROM = BEGINNING
    ADD_MISSING_COLUMNS = true
    RUN_INTERVAL = 1 MINUTE
AS INSERT INTO default_glue_catalog.upsolver_samples.orders_transformed_data
    MAP_COLUMNS_BY_NAME
    -- Use the SELECT statement to choose columns from the source and
    -- implement your business logic transformations.
    SELECT
        -- rename columns
        orderid AS order_id,
        -- hash or mask columns using built-in functions
        MD5(customer_email) AS customer_id,
        -- computed field defined later in the query
        customer_name,
        nettotal AS total,
        -- populate the partition column with the processing time of the event,
        -- and automatically cast to DATE type
        $commit_time AS partition_date
    FROM default_glue_catalog.upsolver_samples.orders_raw_data
    -- create a computed column
    LET customer_name = customer_firstname || ' ' || customer_lastname
    WHERE ordertype = 'SHIPPING'
    AND time_filter();
Elasticsearch
CREATE SYNC JOB insert_into_elasticsearch_index
    RUN_INTERVAL = 1 MINUTE
    START_FROM = BEGINNING
    COMMENT = 'insert data into Elasticsearch index'
AS INSERT INTO ELASTICSEARCH your_elasticsearch_connection
    PREFIX = 'orders'
    SELECT customer_id,
        COUNT(DISTINCT order_id) AS num_orders,
        SUM(net_total) AS total_spent,
        MIN(order_date) AS first_purchase,
        MAX(order_date) AS last_purchase
    FROM default_glue_catalog.your_database.staging_table
    WHERE time_filter()
    GROUP BY customer_id;
PostgreSQL
CREATE JOB load_data_to_postgres
    START_FROM = BEGINNING
AS INSERT INTO POSTGRES your_postgres_connection.your_schema.target_table
    MAP_COLUMNS_BY_NAME
    SELECT orderid AS app_name
    FROM default_glue_catalog.your_database.orders
    WHERE time_filter();
Snowflake
CREATE SYNC JOB load_data_to_snowflake
    RUN_INTERVAL = 1 MINUTE
    START_FROM = BEGINNING
    COMMENT = 'insert data into snowflake target table'
AS INSERT INTO SNOWFLAKE your_snowflake_connection.your_schema.target_table
    MAP_COLUMNS_BY_NAME
    SELECT customer_id,
        COUNT(DISTINCT order_id) AS num_orders,
        SUM(net_total) AS total_spent,
        MIN(order_date) AS first_purchase,
        MAX(order_date) AS last_purchase
    FROM default_glue_catalog.your_database.staging_table
    WHERE time_filter()
    GROUP BY customer_id;