Auto Loader Overview
You can use Auto Loader to process billions of files to migrate or backfill a table. Auto
Loader scales to support near real-time ingestion of millions of files per hour.
In case of failures, Auto Loader can resume from where it left off using information stored in the checkpoint location, and it continues to provide exactly-once guarantees when writing data into Delta Lake. You don’t need to maintain or manage any state yourself to achieve fault tolerance or exactly-once semantics.
Databricks recommends Auto Loader in Delta Live Tables for incremental data ingestion.
Delta Live Tables extends functionality in Apache Spark Structured Streaming and allows
you to write just a few lines of declarative Python or SQL to deploy a production-quality
data pipeline.
You do not need to provide a schema or checkpoint location because Delta Live Tables
automatically manages these settings for your pipelines.
Delta Live Tables provides slightly modified Python syntax for Auto Loader, and adds SQL support.
The following examples use Auto Loader to create datasets from CSV and JSON files:
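For example, the following is a minimal sketch of the Delta Live Tables Python syntax; the dataset names and the /databricks-datasets paths are illustrative:

import dlt

@dlt.table
def customers():
    # Incrementally ingest CSV files with Auto Loader (the cloudFiles source).
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .load("/databricks-datasets/retail-org/customers/")
    )

@dlt.table
def sales_orders_raw():
    # Incrementally ingest JSON files with Auto Loader.
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/databricks-datasets/retail-org/sales_orders/")
    )

The SQL equivalent uses the cloud_files() method described below.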
You can use supported format options with Auto Loader. Using the map() function, you
can pass any number of options to the cloud_files() method. Options are key-value pairs,
where the keys and values are strings. The following describes the syntax for working
with Auto Loader in SQL:
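A sketch of the general form, with placeholders in angle brackets (the exact CREATE keywords can vary with your Delta Live Tables release, for example STREAMING LIVE TABLE in earlier syntax):

CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
FROM cloud_files(
  "<file-path>",
  "<file-format>",
  map("<option-key>", "<option-value>")
)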
The following example reads data from tab-delimited CSV files with a header:
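A sketch in the same SQL form, with an illustrative path and table name:

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT *
FROM cloud_files(
  "/databricks-datasets/retail-org/customers/",
  "csv",
  map("delimiter", "\t", "header", "true")
)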
You can specify the schema manually; you must specify the schema for formats that do not support schema inference:
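A minimal Python sketch, assuming a hypothetical events dataset, path, and schema:

@dlt.table
def events_raw():
    return (
        spark.readStream.format("cloudFiles")
        # Provide the schema explicitly instead of relying on inference.
        .schema("event_id STRING, event_time TIMESTAMP, payload STRING")
        .option("cloudFiles.format", "json")
        .load("/path/to/events/")  # hypothetical path
    )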
Configure Auto Loader to ingest data to Delta Lake
Databricks recommends storing data with Delta Lake. Delta Lake is an open source
storage layer that provides ACID transactions and enables the data lakehouse. Delta
Lake is the default format for tables created in Databricks.
To configure Auto Loader to ingest data to a Delta Lake table, copy and paste the
following code into the empty cell in your notebook:
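The following is a sketch of such code; the source path, checkpoint location, and table name are illustrative and should be replaced with your own values:

from pyspark.sql.functions import col, current_timestamp

# Illustrative values; replace with your own paths and table name.
file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/auto_loader/_checkpoint/etl_quickstart"
table_name = "etl_quickstart"

# Configure Auto Loader to ingest JSON data into a Delta table.
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*",
          col("_metadata.file_path").alias("source_file"),
          current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))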
Concepts:
You can tune Auto Loader based on data volume, variety, and velocity.
Auto Loader can automatically detect the introduction of new columns to your data and
restart so you don’t have to manage the tracking and handling of schema changes
yourself. Auto Loader can also “rescue” data that was unexpected (for example, values of differing data types) in a JSON blob column that you can choose to access later using the semi-structured data access APIs.
Schema inference and evolution are supported for several file formats, including JSON, CSV, and Parquet.
Syntax for schema inference and evolution
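For example, a minimal Python sketch of the typical pattern, with placeholder paths:

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    # Persist the inferred schema so it stays stable across stream restarts.
    .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
    .load("<path-to-source-data>")
  .writeStream
    .option("checkpointLocation", "<path-to-checkpoint>")
    .start("<path-to-target>"))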
Schema inference
To infer the schema, Auto Loader samples the first 50 GB or 1000 files that it discovers,
whichever limit is crossed first. To avoid incurring this inference cost at every stream
start up, and to be able to provide a stable schema across stream restarts, you must set
the option cloudFiles.schemaLocation. Auto Loader creates a hidden
directory _schemas at this location to track schema changes to the input data over time.
If your stream contains a single cloudFiles source to ingest data, you can provide the
checkpoint location as cloudFiles.schemaLocation. Otherwise, provide a unique directory
for this option. If your input data returns an unexpected schema for your stream, check
that your schema location is being used by only a single Auto Loader source.
By default, Auto Loader infers columns in text-based file formats like CSV and JSON
as string columns. In JSON datasets, nested columns are also inferred as string columns.
Since JSON and CSV data is self-describing and can support many data types, inferring
the data as string can help avoid schema evolution issues such as numeric type
mismatches (integers, longs, floats). If you want to retain the original Spark schema
inference behavior, set the option cloudFiles.inferColumnTypes to true.
Auto Loader also attempts to infer partition columns from the underlying directory
structure of the data if the data is laid out in Hive style partitioning. For example, a file
path such as base_path/event=click/date=2021-04-01/f0.json would result in the
inference of date and event as partition columns. The data types for these columns will
be strings unless you set cloudFiles.inferColumnTypes to true. If the underlying directory
structure contains conflicting Hive partitions or doesn’t contain Hive style partitioning,
the partition columns will be ignored. You can provide the
option cloudFiles.partitionColumns as a comma-separated list of column names to
always try and parse the given columns from the file path if these columns exist
as key=value pairs in your directory structure.
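A sketch combining these options; the paths are placeholders and the column names follow the example above:

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
    # Infer numeric and other non-string types instead of defaulting to string.
    .option("cloudFiles.inferColumnTypes", "true")
    # Always try to parse these columns from key=value directory names.
    .option("cloudFiles.partitionColumns", "event,date")
    .load("<base-path>"))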
Each Parquet file is self-describing and associated with a typed schema. To infer the
schema of the Parquet data, Auto Loader samples a subset of Parquet files and merges
the schemas of these individual files.
If a column has different data types in two Parquet files, Auto Loader determines if one
data type can be safely upcast to the other. If upcasting is possible, Auto Loader can
merge the two schemas and choose the more encompassing data type for this column;
otherwise the inference fails. For example, a: int and a: double can be merged as a:
double; a: double and a: string can be merged as a: string; but a: int and a: struct cannot
be merged. Note that, after merging a: int and a: double as a: double, Auto Loader can
read Parquet files with column a: double as normal, but for the Parquet files with a: int,
Auto Loader needs to read a as part of the rescued data column, because the data type
is different from the inferred schema. Users still have a chance to safely upcast the
rescued a:int and backfill a:double later.
When Auto Loader infers the schema, a rescued data column is automatically added to
your schema as _rescued_data. See the section on rescued data column and schema
evolution for details.
When the rescued data column is enabled, fields named in a case other than that of the schema are loaded to the _rescued_data column. You can change this behavior by setting the option readerCaseSensitive to false, in which case Auto Loader reads data in a case-insensitive way.
Choosing between file notification and directory listing modes
Auto Loader supports two modes for detecting new files: directory listing and file
notification. You can switch file discovery modes across stream restarts and still obtain
exactly-once data processing guarantees.
In directory listing mode, Auto Loader identifies new files by listing the input directory.
Directory listing mode allows you to quickly start Auto Loader streams without any
permission configurations other than access to your data on cloud storage. In
Databricks Runtime 9.1 and above, Auto Loader can automatically detect whether files
are arriving with lexical ordering to your cloud storage and significantly reduce the
number of API calls it needs to make to detect new files.
In file notification mode, Auto Loader automatically sets up a notification service and
queue service that subscribes to file events from the input directory. File notification
mode is more performant and scalable for large input directories or a high volume of
files but requires additional cloud permissions.
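A sketch of opting in to file notification mode (the required cloud permissions setup is not shown; paths are placeholders):

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    # Switch from directory listing to file notification mode.
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
    .load("<path-to-source-data>"))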
Databricks recommends that you follow the streaming best practices for running Auto
Loader in production.
Auto Loader reports metrics to the Streaming Query Listener at every batch. You can
view how many files exist in the backlog and how large the backlog is in
the numFilesOutstanding and numBytesOutstanding metrics under the Raw Data tab in the streaming query progress dashboard.
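You can also read the same metrics programmatically from the streaming query progress. A sketch, assuming query is the StreamingQuery returned when you start the stream (the metric keys may be absent until Auto Loader has reported a batch):

# Inspect Auto Loader backlog metrics from the most recent progress report.
progress = query.lastProgress
if progress:
    for source in progress.get("sources", []):
        metrics = source.get("metrics", {})
        print(metrics.get("numFilesOutstanding"),
              metrics.get("numBytesOutstanding"))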
In Databricks Runtime 10.1 and later, when using file notification mode, the metrics will
also include the approximate number of file events that are in the cloud queue
as approximateQueueSize for AWS and Azure.
Cost considerations
When running Auto Loader, your main sources of cost are compute resources and file discovery. File discovery costs can come in the form of LIST operations on your storage accounts in directory listing mode, and API requests to the subscription service and queue service in file notification mode. To reduce file discovery costs, Databricks recommends using file notification mode where possible, because it avoids directory listing altogether.
Event retention
Auto Loader keeps track of discovered files in the checkpoint location using RocksDB to
provide exactly-once ingestion guarantees. For high volume datasets, you can use
the cloudFiles.maxFileAge option to expire events from the checkpoint location to
reduce your storage costs and Auto Loader start up time. The minimum value that you
can set for cloudFiles.maxFileAge is "14 days". Deletes in RocksDB appear as tombstone
entries, therefore you should expect the storage usage to increase temporarily as events
expire before it starts to level off.
If your stream restarts after a long time, file notification events pulled from the queue that are older than cloudFiles.maxFileAge are ignored. Similarly, if you use directory listing, files that may have appeared during the downtime but are older than cloudFiles.maxFileAge are ignored. If you use directory listing mode with cloudFiles.maxFileAge set, for example, to "1 month", and you then stop the stream and restart it with cloudFiles.maxFileAge set to "2 months", all files that are older than 1 month but more recent than 2 months are reprocessed.
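A sketch of setting the option (the value shown is illustrative and must be at least "14 days"; paths are placeholders):

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
    # Expire tracked file events older than this age from the checkpoint state.
    .option("cloudFiles.maxFileAge", "30 days")
    .load("<path-to-source-data>"))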
Scalability: Auto Loader can discover billions of files efficiently. Backfills can be
performed asynchronously to avoid wasting any compute resources.
Performance: The cost of discovering files with Auto Loader scales with the number
of files that are being ingested instead of the number of directories that the files
may land in.
Schema inference and evolution support: Auto Loader can detect schema drifts,
notify you when schema changes happen, and rescue data that would have been
otherwise ignored or lost.
Cost: Auto Loader uses native cloud APIs to get lists of files that exist in storage. In
addition, Auto Loader’s file notification mode can help reduce your cloud costs
further by avoiding directory listing altogether. Auto Loader can automatically set
up file notification services on storage to make file discovery much cheaper.