
Should I build pipelines from scratch?

When you log in, Upsolver displays pipeline templates to help you get started with popular use cases; you can choose among them based on your use case.
If you prefer to build your own pipelines, you can start from a blank worksheet and use SQL snippets instead.
Example snippets are available in the SQL Snippets menu of the SQLake UI.
The following example walks through creating a pipeline that reads data from Amazon S3:
1. Create a connection:
CREATE S3 CONNECTION <S3_CONNECTION_NAME>
    AWS_ROLE = <AWS ROLE NAME>
    EXTERNAL_ID = <EXTERNAL ID FROM TRUST POLICY>;

-- example
CREATE S3 CONNECTION my_s3_connection
    AWS_ROLE = 'arn:aws:iam::111111111111:role/<upsolver-role-*>'
    EXTERNAL_ID = '12345678'
    READ_ONLY = TRUE;
2. Create a connection to the AWS Glue Data Catalog:
CREATE GLUE_CATALOG CONNECTION production_catalog
    AWS_ROLE = 'arn:aws:iam::111111111111:role/<upsolver-role-*>';
3. Create a staging table:
CREATE TABLE default_glue_catalog.<DB_NAME>.<STAGING_TABLE_NAME>()
    PARTITIONED BY $event_date;
-- optional table property: TABLE_DATA_RETENTION = 30 days

-- example
CREATE TABLE default_glue_catalog.upsolver_samples.orders_raw_data;
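Optionally, you can limit how long raw data is kept in the staging table using the TABLE_DATA_RETENTION property shown as a comment in the template above. The snippet below is a minimal sketch that assumes the property is written together with the partitioning clause; check the CREATE TABLE documentation for the exact syntax:
-- illustrative only: stage the order events but retain them for 30 days
CREATE TABLE default_glue_catalog.upsolver_samples.orders_raw_data()
    PARTITIONED BY $event_date
    TABLE_DATA_RETENTION = 30 days;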
4. Create a job to load your data:
CREATE JOB "<STAGING_JOB_NAME>"
CONTENT_TYPE = JSON
AS COPY FROM S3 <S3 CONNECTION FROM STEP 1>
/* example: BUCKET = 'upsolver-samples' */
BUCKET = '<BUCKET NAME>'
/* example: PREFIX = 'orders/' Note: the trailing slash is required */
PREFIX = '<PREFIX NAME>'
/* this is the table you created in step 1 */
INTO default_glue_catalog.<DB_NAME>.<STAGING_TABLE_NAME>;
//example
CREATE JOB load_orders_raw_data_from_s3
CONTENT_TYPE = JSON
AS COPY FROM S3 upsolver_s3_samples BUCKET = 'upsolver-samples' PREFIX = 'orders/'
INTO default_glue_catalog.upsolver_samples.orders_raw_data;
5. Create an output table:
CREATE TABLE default_glue_catalog.<DB_NAME>.<TRANSFORMED_TABLE_NAME> (
    partition_date date)
    PARTITIONED BY partition_date;

-- example
CREATE TABLE default_glue_catalog.upsolver_samples.orders_transformed_data (
    partition_date date)
    PARTITIONED BY partition_date;
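If you prefer to declare the full output schema up front instead of relying on ADD_MISSING_COLUMNS in the next step, you can list the target columns explicitly. This is only an illustrative sketch; the column names and types mirror the transformation example in step 6 and are assumptions, not requirements:
-- illustrative alternative: declare the output columns explicitly
CREATE TABLE default_glue_catalog.upsolver_samples.orders_transformed_data (
    order_id       string,
    customer_id    string,
    total          double,
    partition_date date)
    PARTITIONED BY partition_date;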
6. Create a job that reads from the staging table and writes to the refined zone:
CREATE JOB <TRANSFORM_JOB_NAME>
    START_FROM = BEGINNING
    ADD_MISSING_COLUMNS = TRUE
    RUN_INTERVAL = 1 MINUTE
AS INSERT INTO default_glue_catalog.<DB_NAME>.<TRANSFORMED_TABLE_NAME> MAP_COLUMNS_BY_NAME
    SELECT
        <source_column_name> AS <target_column_name>,
        MD5(<source_column_name>) AS <target_column_name>,
        <new_variable> AS <target_column_name>,
        <source_column_name> AS <target_column_name>,
        $commit_time AS partition_date
    FROM default_glue_catalog.<DB_NAME>.<STAGING_TABLE_NAME>
    WHERE time_filter()
      AND <column_name> = '<filter_by_value>';
-- example
CREATE JOB transform_orders_and_insert_into_athena
    START_FROM = BEGINNING
    ADD_MISSING_COLUMNS = TRUE
AS INSERT INTO default_glue_catalog.upsolver_samples.orders_transformed_data MAP_COLUMNS_BY_NAME
    -- Use the SELECT statement to choose columns from the source and implement your business logic transformations.
    SELECT
        orderid AS order_id,            -- rename columns
        MD5(buyeremail) AS customer_id, -- hash or mask columns using built-in functions
        nettotal AS total,
        $commit_time AS partition_date  -- populate the partition column with the processing time of the event, automatically cast to DATE
    FROM default_glue_catalog.upsolver_samples.orders_raw_data
    WHERE eventtype = 'ORDER'
      AND time_filter();
7. Query your data:
SELECT * FROM default_glue_catalog.upsolver_samples.orders_transformed_data LIMIT 100;
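You can also run richer queries than a plain SELECT *. The aggregation below is illustrative only and assumes the columns produced by the example job in step 6 (customer_id, total):
-- illustrative: total order value and order count per customer
SELECT customer_id,
       SUM(total) AS total_spend,
       COUNT(*)   AS order_count
FROM default_glue_catalog.upsolver_samples.orders_transformed_data
GROUP BY customer_id
ORDER BY total_spend DESC
LIMIT 10;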