Output to Amazon Athena

Prerequisites

Writing to an Amazon Athena table is equivalent to writing to an Upsolver-managed table.

This means that you can create the target table within Upsolver itself, rather than having to do it in Athena beforehand, and it also gives you the option of automatically adding columns in your job that are missing from your target table.

Furthermore, you need a storage connection that has access to the bucket you would like the job to use to store the intermediate files used while running the job as well as a storage connection that has access to the bucket to store your target table's underlying files. These do not necessarily need to be separate connections. Finally, you should also have a staging table already created that contains your ingested data.

Create a job writing to Athena

After you have fulfilled the prerequisites and created your target table, you can create an INSERT job that writes to that table as follows:

CREATE SYNC JOB transform_orders_and_insert_into_athena
    START_FROM = BEGINNING
    ADD_MISSING_COLUMNS = TRUE	
    RUN_INTERVAL = 1 MINUTE
AS INSERT INTO default_glue_catalog.upsolver_samples.orders_transformed_data 
  MAP_COLUMNS_BY_NAME
    -- Use the SELECT statement to choose columns from the source and 
    -- implement your business logic transformations.
    SELECT 
      -- rename columns
      orderid AS order_id,
      -- hash or mask columns using built-in functions
      MD5(customer_email) AS customer_id, 
      -- computed field defined later in the query
      customer_name,  
      nettotal AS total, 
      -- populate the partition column with the processing time of the event, 
      -- and automatically cast to DATE type
      $commit_time AS partition_date 
    FROM default_glue_catalog.upsolver_samples.orders_raw_data
    -- create a computed column
    LET customer_name = customer_firstname || ' ' || customer_lastname 
    WHERE ordertype = 'SHIPPING' 
    AND time_filter();

This example only uses a subset of all job options available when writing to a data lake table. Depending on your use case, you may want to configure different options. For instance, if you'd like to insert data up to a specific point in time, you should configure the END_AT option.

Alter a job writing to Athena

Some job options are considered mutable, meaning that in some cases, you can run a SQL command to alter an existing transformation job rather than having to create a new one.

For example, take the job we created as an example earlier:

CREATE SYNC JOB transform_orders_and_insert_into_athena
    START_FROM = BEGINNING
    ADD_MISSING_COLUMNS = TRUE	
    RUN_INTERVAL = 1 MINUTE
AS INSERT INTO default_glue_catalog.upsolver_samples.orders_transformed_data 
  MAP_COLUMNS_BY_NAME
    -- Use the SELECT statement to choose columns from the source and 
    -- implement your business logic transformations.
    SELECT 
      -- rename columns
      orderid AS order_id,
      -- hash or mask columns using built-in functions
      MD5(customer_email) AS customer_id, 
      -- computed field defined later in the query
      customer_name,  
      nettotal AS total, 
      -- populate the partition column with the processing time of the event, 
      -- and automatically cast to DATE type
      $commit_time AS partition_date 
    FROM default_glue_catalog.upsolver_samples.orders_raw_data
    -- create a computed column
    LET customer_name = customer_firstname || ' ' || customer_lastname 
    WHERE ordertype = 'SHIPPING' 
    AND time_filter();

If you want to keep the job as is but just change the cluster that is running the job, you can run the following command:

ALTER JOB transform_orders_and_insert_into_athena
    SET COMPUTE_CLUSTER = high_memory_cluster;

Note that some options such as RUN_INTERVAL cannot be altered once the connection has been created.

Drop a job writing to Athena

If you no longer need a certain job, you can easily drop it with the following SQL command:

DROP JOB transform_orders_and_insert_into_athena;

Learn More

Last updated