Amazon S3
This page describes the job options for ingesting data from Amazon S3.
CREATE [SYNC] JOB <job_name>
[{ job_options }]
AS COPY FROM S3
<connection_identifier>
[{ source_options }]
INTO <table_identifier>;
The following properties can be configured when ingesting data from Amazon S3.
To quickly and efficiently find data in Amazon S3, SQLake needs to list the files in the specified bucket and prefixes. SQLake utilizes the folder structure and naming, i.e. partitions, to optimize reading data before it is processed.
When reading events, SQLake organizes them on a timeline that it uses to synchronize between jobs to ensure data is joined and aggregated correctly. Therefore, storing raw data using date-formatted partitions, such as /bucket/data/2022/11/01/, enables SQLake to optimize and speed up ingestion. To better understand how SQLake reads and processes data using date partitions, jump to working with date patterns.
[ COLUMN_TRANSFORMATIONS = (<column> = <expression>, ...) ]
[ COMMENT = '<comment>' ]
[ COMPUTE_CLUSTER = <cluster_identifier> ]
[ CONTENT_TYPE = { AUTO
| CSV
| JSON
| PARQUET
| TSV
| AVRO
| AVRO_SCHEMA_REGISTRY
| FIXED_WIDTH
| REGEX
| SPLIT_LINES
| ORC
| XML } ]
[ COMPRESSION = { AUTO
| GZIP
| SNAPPY
| LZO
| NONE
| SNAPPY_UNFRAMED
| KCL } ]
[ DEDUPLICATE_WITH = ( {COLUMNS = (<col>, ...) | COLUMN = <col>}, WINDOW = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ) ]
[ DELETE_FILES_AFTER_LOAD = { TRUE | FALSE } ]
[ END_AT = { NOW | <timestamp> } ]
[ EXCLUDE_COLUMNS = ( <col>, ...) ]
[ FILE_PATTERN = '<pattern>' ]
[ RUN_PARALLELISM = <integer> ]
Source-specific job options:
More information and syntax examples for job options applicable to all sources can be found on the Ingestion jobs page.
COMPRESSION
Values: { AUTO | GZIP | SNAPPY | LZO | NONE | SNAPPY_UNFRAMED | KCL }
Default: AUTO
(Optional) The compression of the source.
CONTENT_TYPE
Values: { AUTO | CSV | JSON | PARQUET | TSV | AVRO | AVRO_SCHEMA_REGISTRY | FIXED_WIDTH | REGEX | SPLIT_LINES | ORC | XML }
Default: AUTO
(Optional) The file format of the content being read.
Note that AUTO only works when reading Avro, JSON, or Parquet.
DELETE_FILES_AFTER_LOAD
Type: boolean
Default: false
(Optional) When true, files are deleted from the storage source once they have been ingested into the target location within your metastore. This allows Upsolver to discover new files immediately, regardless of how many files are in the source, or what file names and patterns are used.
END_AT
Values: { NOW | <timestamp> }
Default: Never
(Optional) Configures the time to stop ingesting data. Files after the specified time are ignored. Timestamps should be based on the UTC time format.
FILE_PATTERN
Type: text
Default: ''
(Optional) Only files that match the provided regex pattern are loaded.
Use this option to filter out irrelevant data. For example, you could filter by a suffix to only keep .parquet files in a folder that may have some additional files that should not be ingested.
RUN_PARALLELISM
Type: integer
Default: 1
(Optional) The number of parser jobs to run in parallel per minute.
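As an illustration of how these options fit into the job syntax above, the following is a minimal sketch rather than a definitive job. The job, connection, and table names and the bucket path are hypothetical. The LOCATION source option name is an assumption, based on the source option described on this page as a full S3 URI. The pattern uses [.] instead of a backslash so that no string escaping is needed.

```sql
-- Hypothetical names throughout: load_parquet_events, my_s3_connection,
-- s3://my-bucket/events/, and the target table are placeholders.
CREATE SYNC JOB load_parquet_events
    COMMENT = 'Load only Parquet files from the events folder'
    CONTENT_TYPE = PARQUET
    -- Regex: keep only files whose names end in .parquet
    FILE_PATTERN = '.*[.]parquet$'
    RUN_PARALLELISM = 1
AS COPY FROM S3 my_s3_connection
    -- Assumption: the source location is given with LOCATION as a full S3 URI
    LOCATION = 's3://my-bucket/events/'
INTO my_catalog.my_schema.events_raw;
```

Any option left out of the job, such as COMPRESSION or DELETE_FILES_AFTER_LOAD, falls back to the default listed for it above.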
Before you create your landing zone where raw data will be staged for SQLake to ingest, you need to be aware of a couple of things:
1. Date values should be lexicographically ordered and include a leading zero when applicable. For example, use /2022/02/04 for February 4th 2022, rather than /22/2/4.
2. Avoid including a dynamically changing prefix, like application ID or job cluster ID, as the first part of your prefix. If you need to include it, add it at the end. For example, use /2022/02/04/X34TFA2 instead of /X34TFA2/2022/02/04.
To ingest date-partitioned Amazon S3 data, you can configure the DATE_PATTERN property of your ingestion job. For Apache Hive partitions, the value is prefixed with a keyword representing its meaning, for example /year=2022/month=02/day=04/. To support this format, include the prefixes in the DATE_PATTERN property, each surrounded by two single quotes (not a double quote). Using the above example, the DATE_PATTERN will be '''year=''yyyy''/month=''MM''/day=''dd'. As you can see, the string literals in the pattern, like ''year='' and ''month='', are wrapped in two single quote characters, not double quotes. If there is a single quote in your folder path that needs to be represented in the path literal, you need to surround it with two single quotes. For example, year('0')=2000 would be represented as '''year(''''0'''')=''yyyy/', where each quote from the path appears as four single quote characters. This can be confusing, so ensure you're quoting the string literal parts of the pattern and not the date value patterns.
Note that when there is no date pattern defined, you cannot define a time to start ingestion; all available data is ingested by default.
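The quoting rule for Hive-style partitions can be sketched in a full job as follows. This is an illustrative example under stated assumptions, not a definitive job: the job, connection, and table names and the bucket path are hypothetical, the LOCATION option name is assumed from the full S3 URI source option described on this page, and placing DATE_PATTERN alongside the other job options is also an assumption.

```sql
-- Hypothetical names: load_hive_partitioned_events, my_s3_connection,
-- s3://my-bucket/events/, and the target table are placeholders.
-- Intended for folders such as /year=2022/month=02/day=04/.
CREATE SYNC JOB load_hive_partitioned_events
    CONTENT_TYPE = JSON
    -- Literal parts (year=, /month=, /day=) are each wrapped in two single quotes;
    -- the date tokens yyyy, MM, and dd stay unquoted.
    DATE_PATTERN = '''year=''yyyy''/month=''MM''/day=''dd'
AS COPY FROM S3 my_s3_connection
    -- Assumption: the source location is given with LOCATION as a full S3 URI
    LOCATION = 's3://my-bucket/events/'
INTO my_catalog.my_schema.events_raw;
```

Reading the pattern left to right: the first quote opens the SQL string, the next two quotes open the literal year=, and the final single quote closes the SQL string after dd.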
[ DATE_PATTERN = '<date_pattern>' ]
[ START_FROM = { NOW | BEGINNING | <timestamp> } ]
[ INITIAL_LOAD_PREFIX = '<prefix>' ]
[ INITIAL_LOAD_PATTERN = '<pattern>' ]
[ MAX_DELAY = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]
DATE_PATTERN
Type: text
(Optional) The date pattern of the partitions on the Amazon S3 bucket to read from. SQLake supports reading from buckets partitioned up to the minute.
Example: 'yyyy/MM/dd/HH/mm'
When you set a DATE_PATTERN, SQLake uses the date in the folder path to understand when new files are added. The date in the path is used to process data in order of arrival, as well as to set the $source_time and $event_time system columns used to keep jobs synchronized. If files are added to a folder named with a future date, these files will not be ingested until that date becomes the present.
If you don’t set a DATE_PATTERN, SQLake will list and ingest files in the ingest job’s BUCKET and PREFIX location as soon as they are discovered. Historical data will also be processed as soon as it is added and discovered by SQLake. To discover new files when a DATE_PATTERN is not set, SQLake lists the top-level prefix and performs a diff to detect newly added files. Subsequently, it lists the paths adjacent to these new files, with the assumption that if a file was added here, others will be as well. This process is performed at regular intervals to ensure files are not missed. For buckets with few files and predictable changes, this works well. However, for buckets with many changes across millions of files and hundreds of prefixes, the scanning and diffing process may result in ingestion and processing delays. To optimize this process, consider setting the job’s DELETE_FILES_AFTER_LOAD property to TRUE. This moves ingested files to another staging location, leaving the source folder empty, and making it easier and faster for SQLake to discover new files. Be aware that configuring SQLake to move ingested files could impact other systems if they depend on the same raw files.
To troubleshoot jobs that ingest data, you can query the task execution system table and inspect whether 0 bytes of data have been read in the “ingest data” stage, or whether SQLake is throwing parse errors in the “parse data” stage. If 0 bytes have been read, your job is configured correctly, but there is no new data. If you see parse errors, you can narrow the cause down to either a misconfiguration of the job or bad data.
START_FROM
Values: { NOW | BEGINNING | <timestamp> }
Default: BEGINNING
(Optional) Configures the time to start ingesting data. Files before the specified time are ignored. Timestamps should be in the UTC time format.
When a DATE_PATTERN is not specified, configuring this option is not allowed. By default, all data available is ingested.
INITIAL_LOAD_PREFIX
Type: text
(Optional) Any file matching this prefix is immediately loaded when the job is run.
INITIAL_LOAD_PATTERN
Type: text
(Optional) Any file matching this regex pattern is immediately loaded when the job is run.
This loads data separately from the date pattern and is primarily used in CDC use cases, where you load some initial files named LOAD00001, LOAD00002, etc. After that, all the data has a date pattern in the file name.
MAX_DELAY
Value: <integer> { MINUTE[S] | HOUR[S] | DAY[S] }
Default: 1 DAY
(Optional) Configures how far back to check for new files. The larger this is, the more list operations are used.
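The date-related options above might be combined as in the following sketch. It is illustrative only: the job, connection, and table names and the bucket path are hypothetical, the LOCATION option name is assumed from the full S3 URI source option described on this page, and the placement of these options before AS COPY is an assumption.

```sql
-- Hypothetical names: load_minute_partitioned_events, my_s3_connection,
-- s3://my-bucket/events/, and the target table are placeholders.
-- Intended for folders such as /2022/11/01/13/45/.
CREATE SYNC JOB load_minute_partitioned_events
    CONTENT_TYPE = JSON
    DATE_PATTERN = 'yyyy/MM/dd/HH/mm'
    -- Skip historical folders and ingest only files arriving from now on
    START_FROM = NOW
    -- Look back two hours for late-arriving files; larger values mean more list operations
    MAX_DELAY = 2 HOURS
AS COPY FROM S3 my_s3_connection
    -- Assumption: the source location is given with LOCATION as a full S3 URI
    LOCATION = 's3://my-bucket/events/'
INTO my_catalog.my_schema.events_raw;
```

START_FROM is only valid here because a DATE_PATTERN is set; without one, all available data is ingested.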
Type: text
The location to read files from, as a full S3 URI.