INSERT
An INSERT job defines a query that pulls in a set of data based on the given SELECT statement and inserts it into the designated target. The query then runs periodically based on the RUN_INTERVAL defined within the job.

By default, the INSERT job simply appends the new data to the target table. For data lake tables (those created using CREATE TABLE in SQLake and managed by the AWS Glue Data Catalog), when a primary key is defined the job performs an upsert operation: it updates rows whose primary key already exists and inserts rows whose primary key does not.

When writing to non-data-lake targets such as Snowflake and Redshift, the INSERT job does not recognize a primary key defined on the target table and only performs an INSERT operation. If you want to upsert into these targets, refer to the MERGE command.

Additionally, if two jobs writing to the same table rewrite the same primary key, it is nondeterministic which job's data ends up in the table, so this pattern is not recommended.
See also: MERGE

Syntax
CREATE [SYNC] JOB <job_identifier>
[ COMMENT = '<comment>' ]
{ job_options }
AS INSERT INTO { target_definition }
[ { (<column_name> [ , ... ] )
| MAP_COLUMNS_BY_NAME [ EXCEPT <column_name> [, ...] ] } ]
{ <select_statement>
| UNNEST(<select_statement>) };
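As a minimal sketch that instantiates this grammar (the schema and table names are hypothetical), a job using the optional COMMENT clause and an explicit column list might look like this:

CREATE SYNC JOB minimal_insert_job
    COMMENT = 'append new events to the target table'
    START_FROM = BEGINNING
    RUN_INTERVAL = 1 MINUTE
AS INSERT INTO default_glue_catalog.your_schema.target_table (event_id, event_time)
SELECT event_id,
       event_time
FROM default_glue_catalog.your_schema.source_table
WHERE time_filter();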
Valid identifiers match the following format:
identifier = "([^"]|"")*"|[A-Za-z_][A-Za-z0-9_]*;
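For example, the following names are checked against this format (all are illustrative):

orders_2024         -- valid: unquoted, starts with a letter
_staging_table      -- valid: unquoted identifiers may start with an underscore
"daily orders"      -- valid: quoting allows spaces and other special characters
"a ""quoted"" name" -- valid: a double quote inside a quoted identifier is doubled
2024_orders         -- invalid unquoted: must not start with a digit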
The target_definition is one of the following:
{ <table_identifier>
| SNOWFLAKE <catalog_name>.<schema_name>.<table_name>
| REDSHIFT <catalog_name>.<schema_name>.<table_name>
| POSTGRES <catalog_name>.<schema_name>.<table_name>
| S3 BUCKET = <bucket_name>
| ELASTICSEARCH
}
The MAP_COLUMNS_BY_NAME keyword maps columns from the SELECT statement to the table by the names of the returned columns in the query. Columns listed after EXCEPT are excluded from the final table.

When MAP_COLUMNS_BY_NAME is set, each field in the SELECT statement must either have an alias, be a simple column reference, or be a star expression. The columns are then mapped to the fields of the same name in the table. Otherwise, columns are mapped by ordinal position, with reference to the provided column list or to the order of the columns in the table.
When using a star query in the SELECT statement, MAP_COLUMNS_BY_NAME is required. Additionally, all primary key and partition columns must be mapped. Columns listed after the EXCEPT keyword are not written to your target table.
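As a sketch of the two mapping modes (the table and column names are hypothetical), the first job below relies on ordinal mapping through an explicit column list, while the second maps columns by name and uses EXCEPT to keep a query column out of the target:

-- Ordinal mapping: the first expression is written to col_a, the second to col_b
CREATE SYNC JOB insert_by_position
    START_FROM = BEGINNING
AS INSERT INTO default_glue_catalog.your_schema.positional_target (col_a, col_b)
SELECT source_col_1,
       source_col_2
FROM default_glue_catalog.your_schema.source_table
WHERE time_filter();

-- Name mapping: each aliased field is written to the target column of the same name;
-- debug_info is returned by the query but never written to the target
CREATE SYNC JOB insert_by_name
    START_FROM = BEGINNING
AS INSERT INTO default_glue_catalog.your_schema.named_target MAP_COLUMNS_BY_NAME EXCEPT debug_info
SELECT source_col_1 AS col_a,
       source_col_2 AS col_b,
       raw_payload AS debug_info
FROM default_glue_catalog.your_schema.source_table
WHERE time_filter();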
Examples

The following example creates a job that writes GZIP-compressed CSV files from a data lake source table to a folder in Amazon S3:
CREATE SYNC JOB insert_into_s3
START_FROM = BEGINNING
FILE_FORMAT = (type = CSV)
COMPRESSION = GZIP
DATE_PATTERN = 'yyyy-MM-dd-HH-mm'
RUN_INTERVAL = 1 MINUTE
COMPUTE_CLUSTER = "Default Compute"
AS INSERT INTO S3 s3_output_connection LOCATION = 's3://your-bucket-name/path/to/folder/'
-- Use the SELECT statement to choose columns from the source and implement your business logic transformations.
SELECT
column1 AS s3_column1,
MD5(column2) AS s3_column2 -- hash or mask columns using built-in functions
FROM default_glue_catalog.your_schema.your_raw_data_table
WHERE time_filter();
The following example transforms raw orders data and inserts the results into a data lake table in the AWS Glue Data Catalog, where they can be queried with engines such as Athena:
CREATE SYNC JOB transform_orders_and_insert_into_athena
START_FROM = BEGINNING
ADD_MISSING_COLUMNS = true
RUN_INTERVAL = 1 MINUTE
AS INSERT INTO default_glue_catalog.upsolver_samples.orders_transformed_data MAP_COLUMNS_BY_NAME
-- Use the SELECT statement to choose columns from the source and implement your business logic transformations.
SELECT
orderid AS order_id, -- rename columns
MD5(customer_email) AS customer_id, -- hash or mask columns using built-in functions
customer_name, -- computed field defined later in the query
nettotal AS total,
$commit_time AS partition_date -- populate the partition column with the processing time of the event, automatically cast to DATE type
FROM default_glue_catalog.upsolver_samples.orders_raw_data
LET customer_name = customer_firstname || ' ' || customer_lastname -- create a computed column
WHERE ordertype = 'SHIPPING'
AND time_filter();
The following example aggregates orders per customer and writes the results to an Elasticsearch index:
CREATE SYNC JOB insert_into_elasticsearch_index
RUN_INTERVAL = 1 MINUTE
START_FROM = BEGINNING
COMMENT = 'insert data into Elasticsearch index'
AS INSERT INTO ELASTICSEARCH your_elasticsearch_connection PREFIX = 'orders'
SELECT customer_id,
COUNT(DISTINCT order_id) AS num_orders,
SUM(net_total) AS total_spent,
MIN(order_date) AS first_purchase,
MAX(order_date) AS last_purchase
FROM default_glue_catalog.your_database.staging_table
WHERE time_filter()
GROUP BY customer_id;
The following example loads data into a PostgreSQL table:
CREATE JOB load_data_to_postgres
START_FROM = BEGINNING
AS INSERT INTO POSTGRES your_postgres_connection.your_schema.target_table MAP_COLUMNS_BY_NAME
SELECT orderid AS order_id -- rename the column to match the target table
FROM default_glue_catalog.your_database.orders
WHERE time_filter();
The following example aggregates orders per customer and loads the results into a Snowflake table:
CREATE SYNC JOB load_data_to_snowflake
RUN_INTERVAL = 1 MINUTE
START_FROM = BEGINNING
COMMENT = 'insert data into snowflake target table'
AS INSERT INTO SNOWFLAKE your_snowflake_connection.your_schema.target_table MAP_COLUMNS_BY_NAME
SELECT customer_id,
COUNT(DISTINCT order_id) AS num_orders,
SUM(net_total) AS total_spent,
MIN(order_date) AS first_purchase,
MAX(order_date) AS last_purchase
FROM default_glue_catalog.your_database.staging_table
WHERE time_filter()
GROUP BY customer_id;