Auto Loader Overview


Overview

What is Auto Loader?


Auto Loader incrementally and efficiently processes new data files as they arrive in cloud
storage without any additional setup.

About Auto Loader


Auto Loader incrementally and efficiently processes new data files as they arrive in cloud
storage. Auto Loader can load data files from AWS S3 ( s3://), Azure Data Lake Storage
Gen2 (ADLS Gen2, abfss://), Google Cloud Storage (GCS, gs://), Azure Blob Storage
(wasbs://), ADLS Gen1 (adl://), and Databricks File System (DBFS, dbfs:/). Auto Loader can
ingest JSON, CSV, PARQUET, AVRO, ORC, TEXT, and BINARYFILE file formats.

Auto Loader provides a Structured Streaming source called cloudFiles. Given an input directory path on the cloud file storage, the cloudFiles source automatically processes new files as they arrive, with the option of also processing existing files in that directory. Auto Loader has support for both Python and SQL in Delta Live Tables.

You can use Auto Loader to process billions of files to migrate or backfill a table. Auto
Loader scales to support near real-time ingestion of millions of files per hour.
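
As a minimal sketch of what the cloudFiles source looks like in Python (assuming a Databricks notebook where spark is already defined; the path, format, and schema location are placeholders):

    # A read-only sketch; the values below are placeholders.
    df = (spark.readStream
          .format("cloudFiles")                                              # Auto Loader source
          .option("cloudFiles.format", "json")                               # format of the incoming files
          .option("cloudFiles.schemaLocation", "<path-to-schema-location>")  # where inferred schemas are tracked
          .load("<path-to-input-directory>"))                                # input directory on cloud storage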

Checkpointing with Auto Loader

As files are discovered, their metadata is persisted in a scalable key-value store (RocksDB) in the checkpoint location of your Auto Loader pipeline. This key-value store ensures that data is processed exactly once.

If a failure occurs, Auto Loader can resume from where it left off using information stored in the checkpoint location, and it continues to provide exactly-once guarantees when writing data into Delta Lake. You don’t need to maintain or manage any state yourself to achieve fault tolerance or exactly-once semantics.
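
A matching sketch of the write side, assuming the df from the read sketch above; the checkpoint and target paths are placeholders, and the checkpoint location is where Auto Loader keeps its RocksDB state:

    (df.writeStream
       .format("delta")                                       # write to Delta Lake
       .option("checkpointLocation", "<path-to-checkpoint>")  # holds the RocksDB file-discovery state
       .start("<path-to-target-table>"))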

Databricks recommends Auto Loader in Delta Live Tables for incremental data ingestion.

Using Auto Loader in Delta Live Tables

You can use Auto Loader in your Delta Live Tables pipelines. Delta Live Tables extends functionality in Apache Spark Structured Streaming and allows you to write just a few lines of declarative Python or SQL to deploy a production-quality data pipeline with:

 Autoscaling compute infrastructure for cost savings
 Data quality checks with expectations
 Automatic schema evolution handling
 Monitoring via metrics in the event log

You do not need to provide a schema or checkpoint location because Delta Live Tables
automatically manages these settings for your pipelines.

Auto Loader syntax for DLT:

Delta Live Tables provides slightly modified Python syntax for Auto Loader, and adds
SQL support for Auto Loader.

The following examples use Auto Loader to create datasets from CSV and JSON files:
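
The original code blocks were not preserved in this copy; the following is a hedged sketch of the Delta Live Tables Python form, with hypothetical dataset names and placeholder paths:

    import dlt

    @dlt.table
    def customers():
        # Hypothetical CSV-backed dataset; path is a placeholder
        return (spark.readStream.format("cloudFiles")
                .option("cloudFiles.format", "csv")
                .load("<path-to-csv-files>"))

    @dlt.table
    def sales_orders_raw():
        # Hypothetical JSON-backed dataset; path is a placeholder
        return (spark.readStream.format("cloudFiles")
                .option("cloudFiles.format", "json")
                .load("<path-to-json-files>"))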

You can use supported format options with Auto Loader. Using the map() function, you
can pass any number of options to the cloud_files() method. Options are key-value pairs,
where the keys and values are strings. The following describes the syntax for working
with Auto Loader in SQL:
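
A sketch of the general shape (legacy CREATE OR REFRESH STREAMING LIVE TABLE syntax; table name, path, format, and option pairs are placeholders):

    CREATE OR REFRESH STREAMING LIVE TABLE <table-name>
    AS SELECT *
    FROM cloud_files(
      "<file-path>",
      "<file-format>",
      map("<option-key>", "<option-value>", ...)
    )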

The following example reads data from tab-delimited CSV files with a header:
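
A hedged version of that example, assuming a hypothetical table name and path; delimiter and header are standard CSV reader options:

    CREATE OR REFRESH STREAMING LIVE TABLE customers
    AS SELECT *
    FROM cloud_files(
      "<path-to-tab-delimited-csv-files>",
      "csv",
      map("delimiter", "\t", "header", "true")
    )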

You can use the schema option to specify the schema manually; you must specify
the schema for formats that do not support schema inference:
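
A hedged sketch, assuming a hypothetical table, path, and schema string passed through the map() options:

    CREATE OR REFRESH STREAMING LIVE TABLE events_raw
    AS SELECT *
    FROM cloud_files(
      "<path-to-parquet-files>",
      "parquet",
      map("schema", "id INT, event STRING, ts TIMESTAMP")
    )
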
Configure Auto Loader to ingest data to Delta Lake

Databricks recommends using Auto Loader for incremental data ingestion. Auto Loader automatically detects and processes new files as they arrive in cloud object storage.

Databricks recommends storing data with Delta Lake. Delta Lake is an open source
storage layer that provides ACID transactions and enables the data lakehouse. Delta
Lake is the default format for tables created in Databricks.

To configure Auto Loader to ingest data to a Delta Lake table, copy and paste the
following code into the empty cell in your notebook:
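
The notebook cell itself was not preserved in this copy; a hedged example of what it might contain, with placeholder paths and a placeholder table name, is:

    # Placeholder locations; substitute your own paths and target table.
    file_path = "<path-to-source-files>"
    checkpoint_path = "<path-to-checkpoint>"
    table_name = "<target-delta-table>"

    (spark.readStream
       .format("cloudFiles")
       .option("cloudFiles.format", "json")
       .option("cloudFiles.schemaLocation", checkpoint_path)   # schema tracking for inference/evolution
       .load(file_path)
       .writeStream
       .option("checkpointLocation", checkpoint_path)          # exactly-once state
       .trigger(availableNow=True)                             # process the current backlog, then stop
       .toTable(table_name))
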
Concepts:

You can tune Auto Loader based on data volume, variety, and velocity.

Configure schema inference and evolution in Auto Loader

Auto Loader can automatically detect the introduction of new columns to your data and restart the stream, so you don’t have to track and handle schema changes yourself. Auto Loader can also “rescue” data that was unexpected (for example, of differing data types) into a JSON blob column, which you can choose to access later using the semi-structured data access APIs.

The following formats are supported for schema inference and evolution: JSON, CSV, Avro, and Parquet.
Syntax for schema inference and evolution

The following example uses parquet for the cloudFiles.format. Use csv, avro, or json for other file sources. All other settings for read and write stay the same for the default behaviors for each format.
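
A sketch of that shape, with placeholder paths:

    (spark.readStream
       .format("cloudFiles")
       .option("cloudFiles.format", "parquet")                        # or csv, avro, json
       .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")   # required for inference and evolution
       .load("<path-to-source-data>")
       .writeStream
       .option("checkpointLocation", "<path-to-checkpoint>")
       .start("<path-to-target>"))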

Schema inference

To infer the schema, Auto Loader samples the first 50 GB or 1000 files that it discovers,
whichever limit is crossed first. To avoid incurring this inference cost at every stream
start up, and to be able to provide a stable schema across stream restarts, you must set
the option cloudFiles.schemaLocation. Auto Loader creates a hidden
directory _schemas at this location to track schema changes to the input data over time.
If your stream contains a single cloudFiles source to ingest data, you can provide the
checkpoint location as cloudFiles.schemaLocation. Otherwise, provide a unique directory
for this option. If your input data returns an unexpected schema for your stream, check
that your schema location is being used by only a single Auto Loader source.

By default, Auto Loader infers columns in text-based file formats like CSV and JSON
as string columns. In JSON datasets, nested columns are also inferred as string columns.
Since JSON and CSV data is self-describing and can support many data types, inferring
the data as string can help avoid schema evolution issues such as numeric type
mismatches (integers, longs, floats). If you want to retain the original Spark schema
inference behavior, set the option cloudFiles.inferColumnTypes to true.

Auto Loader also attempts to infer partition columns from the underlying directory
structure of the data if the data is laid out in Hive style partitioning. For example, a file
path such as base_path/event=click/date=2021-04-01/f0.json would result in the
inference of date and event as partition columns. The data types for these columns will
be strings unless you set cloudFiles.inferColumnTypes to true. If the underlying directory
structure contains conflicting Hive partitions or doesn’t contain Hive style partitioning,
the partition columns will be ignored. You can provide the option cloudFiles.partitionColumns as a comma-separated list of column names to always try to parse the given columns from the file path if these columns exist as key=value pairs in your directory structure.
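
A short sketch of these two options together, assuming a Hive-style layout like the example above (paths are placeholders):

    df = (spark.readStream
          .format("cloudFiles")
          .option("cloudFiles.format", "json")
          .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
          .option("cloudFiles.inferColumnTypes", "true")          # infer numeric/timestamp types instead of strings
          .option("cloudFiles.partitionColumns", "event,date")    # parse event=... and date=... from the file path
          .load("<base_path>"))
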
Each Parquet file is self-describing and associated with a typed schema. To infer the
schema of the Parquet data, Auto Loader samples a subset of Parquet files and merges
the schemas of these individual files.

If a column has different data types in two Parquet files, Auto Loader determines if one
data type can be safely upcast to the other. If upcasting is possible, Auto Loader can
merge the two schemas and choose the more encompassing data type for this column;
otherwise the inference fails. For example, a: int and a: double can be merged as a:
double; a: double and a: string can be merged as a: string; but a: int and a: struct cannot
be merged. Note that, after merging a: int and a: double as a: double, Auto Loader can
read Parquet files with column a: double as normal, but for the Parquet files with a: int,
Auto Loader needs to read a as part of the rescued data column, because the data type
is different from the inferred schema. You can still safely upcast the rescued a: int values and backfill the a: double column later.

When Auto Loader infers the schema, a rescued data column is automatically added to
your schema as _rescued_data. See the section on rescued data column and schema
evolution for details.

Change case-sensitive behavior

Unless case sensitivity is enabled, the columns abc, Abc, and ABC are considered the same column for the purposes of schema inference. The case that is chosen is arbitrary and depends on the sampled data. You can use schema hints to enforce which case should be used. Once a selection has been made and the schema is inferred, Auto Loader does not consider the casing variants that were not selected consistent with the schema.

When the rescued data column is enabled, fields named in a case other than that of the schema are loaded to the _rescued_data column. Change this behavior by setting the option readerCaseSensitive to false, in which case Auto Loader reads data in a case-insensitive way.
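
For example, a minimal sketch (paths are placeholders):

    df = (spark.readStream
          .format("cloudFiles")
          .option("cloudFiles.format", "json")
          .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
          .option("readerCaseSensitive", "false")   # match differently cased fields instead of rescuing them
          .load("<path-to-source-data>"))
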
Choosing between file notification and directory listing modes

Auto Loader supports two modes for detecting new files: directory listing and file
notification. You can switch file discovery modes across stream restarts and still obtain
exactly-once data processing guarantees.

Directory listing mode

In directory listing mode, Auto Loader identifies new files by listing the input directory. Directory listing mode allows you to quickly start Auto Loader streams without any permission configurations other than access to your data on cloud storage. In Databricks Runtime 9.1 and above, Auto Loader can automatically detect whether files are arriving with lexical ordering to your cloud storage and significantly reduce the number of API calls it needs to make to detect new files.

File notification mode

In file notification mode, Auto Loader automatically sets up a notification service and queue service that subscribe to file events from the input directory. File notification mode is more performant and scalable for large input directories or a high volume of files, but it requires additional cloud permissions to set up.
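
A hedged sketch of opting in to file notification mode (the cloud permissions setup is not shown; paths are placeholders):

    df = (spark.readStream
          .format("cloudFiles")
          .option("cloudFiles.format", "json")
          .option("cloudFiles.useNotifications", "true")   # file notification mode instead of directory listing
          .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
          .load("<path-to-source-data>"))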

Cloud storage supported by modes

The availability of these modes for each cloud storage system is listed below.


Configure Auto Loader for production workloads

Databricks recommends that you follow the streaming best practices for running Auto
Loader in production.

Databricks also recommends using Auto Loader in Delta Live Tables for incremental data ingestion; see Using Auto Loader in Delta Live Tables above for details.

Monitoring Auto Loader

Querying files discovered by Auto Loader


Auto Loader provides a SQL API for inspecting the state of a stream. Using
the cloud_files_state function, you can find metadata about files that have been
discovered by an Auto Loader stream. Simply query from cloud_files_state, providing the
checkpoint location associated with an Auto Loader stream.
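
A minimal sketch, assuming a placeholder checkpoint path:

    # Inspect the files Auto Loader has discovered for a given stream.
    spark.sql("SELECT * FROM cloud_files_state('<path-to-checkpoint>')").show()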

Listen to stream updates

To further monitor Auto Loader streams, Databricks recommends using Apache Spark’s Streaming Query Listener interface.

Auto Loader reports metrics to the Streaming Query Listener at every batch. You can view how many files exist in the backlog and how large the backlog is in the numFilesOutstanding and numBytesOutstanding metrics under the Raw Data tab in the streaming query progress dashboard.

In Databricks Runtime 10.1 and later, when using file notification mode, the metrics will
also include the approximate number of file events that are in the cloud queue
as approximateQueueSize for AWS and Azure.
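
One lightweight way to read these metrics from Python, assuming a running StreamingQuery handle named query; the exact keys come from the streaming progress JSON and may vary by runtime:

    # Look at the most recent micro-batch progress of a running Auto Loader stream.
    progress = query.lastProgress
    if progress is not None:
        for source in progress["sources"]:
            metrics = source.get("metrics", {})
            print(metrics.get("numFilesOutstanding"), metrics.get("numBytesOutstanding"))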

Cost considerations
When running Auto Loader, your main sources of cost are compute resources and file discovery.

To reduce compute costs, Databricks recommends using Databricks Jobs to schedule Auto Loader as batch jobs using Trigger.AvailableNow (in Databricks Runtime 10.1 and later) or Trigger.Once, instead of running it continuously, as long as you don’t have low-latency requirements.

File discovery costs can come in the form of LIST operations on your storage accounts in directory listing mode, and API requests made to the subscription service and queue service in file notification mode. To reduce file discovery costs, Databricks recommends:

 Providing a ProcessingTime trigger when running Auto Loader continuously in directory listing mode
 Architecting file uploads to your storage account in lexical ordering to
leverage Incremental Listing when possible
 Using Databricks Runtime 9.0 or later in directory listing mode, especially for
deeply nested directories
 Leveraging file notifications when incremental listing is not possible
 Using resource tags to tag resources created by Auto Loader to track your costs

Using Trigger.AvailableNow and rate limiting

Auto Loader can be scheduled to run in Databricks Jobs as a batch job by using Trigger.AvailableNow. The AvailableNow trigger will instruct Auto Loader to process all files that arrived before the query start time. New files that are uploaded after the stream has started will be ignored until the next trigger.

With Trigger.AvailableNow, file discovery happens asynchronously with data processing, and data can be processed across multiple micro-batches with rate limiting. By default, Auto Loader processes a maximum of 1000 files every micro-batch. You can configure cloudFiles.maxFilesPerTrigger and cloudFiles.maxBytesPerTrigger to control how many files or how many bytes should be processed in a micro-batch. The file limit is a hard limit, but the byte limit is a soft limit, meaning that more bytes can be processed than the provided maxBytesPerTrigger. When both options are provided together, Auto Loader processes as many files as are needed to hit one of the limits.
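
A hedged sketch combining the batch trigger with rate limiting; paths, limits, and the table name are placeholders:

    (spark.readStream
       .format("cloudFiles")
       .option("cloudFiles.format", "json")
       .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
       .option("cloudFiles.maxFilesPerTrigger", "500")        # hard cap on files per micro-batch
       .option("cloudFiles.maxBytesPerTrigger", "10g")        # soft cap on bytes per micro-batch
       .load("<path-to-source-data>")
       .writeStream
       .option("checkpointLocation", "<path-to-checkpoint>")
       .trigger(availableNow=True)                            # process the backlog in rate-limited batches, then stop
       .toTable("<target-table>"))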

Event retention

Auto Loader keeps track of discovered files in the checkpoint location using RocksDB to
provide exactly-once ingestion guarantees. For high volume datasets, you can use
the cloudFiles.maxFileAge option to expire events from the checkpoint location to
reduce your storage costs and Auto Loader start up time. The minimum value that you
can set for cloudFiles.maxFileAge is "14 days". Deletes in RocksDB appear as tombstone
entries, therefore you should expect the storage usage to increase temporarily as events
expire before it starts to level off.

Tuning the cloudFiles.maxFileAge option can lead to unprocessed files being ignored by Auto Loader, or to already processed files expiring and then being reprocessed, causing duplicate data. Here are some things to consider when choosing a cloudFiles.maxFileAge:

 If your stream restarts after a long time, file notification events pulled from the queue that are older than cloudFiles.maxFileAge are ignored. Similarly, if you use directory listing, files that may have appeared during the downtime and that are older than cloudFiles.maxFileAge are ignored.
 If you use directory listing mode with cloudFiles.maxFileAge set to, for example, "1 month", and you then stop your stream and restart it with cloudFiles.maxFileAge set to "2 months", all files that are older than 1 month but more recent than 2 months are reprocessed.

The best approach to tuning cloudFiles.maxFileAge is to start from a generous expiration, for example "1 year", and work downwards to something like "9 months". If you set this option the first time you start the stream, you will not ingest data older than cloudFiles.maxFileAge; therefore, if you want to ingest old data, you should not set this option when you start your stream.
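
A short sketch of setting the option; the value shown is only an illustration, and paths are placeholders:

    df = (spark.readStream
          .format("cloudFiles")
          .option("cloudFiles.format", "json")
          .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
          .option("cloudFiles.maxFileAge", "1 year")   # expire discovery events older than this from the checkpoint
          .load("<path-to-source-data>"))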

Benefits of Auto Loader over using Structured Streaming directly on files

In Apache Spark, you can read files incrementally using spark.readStream.format(fileFormat).load(directory). Auto Loader provides the following benefits over the file source:

 Scalability: Auto Loader can discover billions of files efficiently. Backfills can be
performed asynchronously to avoid wasting any compute resources.
 Performance: The cost of discovering files with Auto Loader scales with the number
of files that are being ingested instead of the number of directories that the files
may land in.
 Schema inference and evolution support: Auto Loader can detect schema drifts,
notify you when schema changes happen, and rescue data that would have been
otherwise ignored or lost.
 Cost: Auto Loader uses native cloud APIs to get lists of files that exist in storage. In
addition, Auto Loader’s file notification mode can help reduce your cloud costs
further by avoiding directory listing altogether. Auto Loader can automatically set
up file notification services on storage to make file discovery much cheaper.
