8000 Systematic Configuration in 'Create External Table' and 'Copy To' Options by metesynnada · Pull Request #9382 · apache/datafusion · GitHub

Systematic Configuration in 'Create External Table' and 'Copy To' Options #9382


Merged
merged 35 commits into from
Mar 12, 2024
Changes from 1 commit
35 commits
0f176a1
Initial but not completely work like proto
metesynnada Feb 15, 2024
10000fb
Delete docs.yaml
metesynnada Feb 22, 2024
6c76423
Merge pull request #5 from synnada-ai/ci-action-fixing
mustafasrepo Feb 22, 2024
599516b
Merge branch 'apache:main' into main
mustafasrepo Feb 23, 2024
b20b65c
Merge branch 'apache:main' into main
mustafasrepo Feb 26, 2024
cac943d
Before proto is handled
metesynnada Feb 26, 2024
3fba4a0
Merge remote-tracking branch 'upstream/main' into configuration
metesynnada Feb 26, 2024
c17d655
Update listing_table_factory.rs
metesynnada Feb 26, 2024
13e7e83
Before proto 2
metesynnada Feb 26, 2024
25d5685
Minor adjustments
metesynnada Feb 26, 2024
14ed29f
Merge branch 'main' into configuration
metesynnada Feb 26, 2024
daca94e
Update headers
mustafasrepo Feb 27, 2024
0e1300d
Update copy.slt
metesynnada Feb 27, 2024
c2da778
Merge branch 'configuration' of https://github.com/synnada-ai/datafus…
mustafasrepo Feb 27, 2024
ea22682
Add new test,
mustafasrepo Feb 27, 2024
fb86d94
Passes SLT tests
metesynnada Feb 27, 2024
b89b138
Update csv.rs
metesynnada Feb 28, 2024
33eca8d
Before trying char
metesynnada Feb 28, 2024
3caf33e
Fix u8 handling
metesynnada Feb 28, 2024
d16bb65
Merge remote-tracking branch 'upstream/main' into configuration
metesynnada Feb 29, 2024
4f1acf1
Update according to review
metesynnada Feb 29, 2024
036f005
Passing tests
metesynnada Mar 5, 2024
ba938a9
passing tests with proto
metesynnada Mar 5, 2024
cff6bc9
Cargo fix
metesynnada Mar 5, 2024
2fd3c4e
Testing and clippy refactors
metesynnada Mar 6, 2024
f141345
Merge remote-tracking branch 'upstream/main' into configuration
metesynnada Mar 6, 2024
55a223c
After merge corrections
metesynnada Mar 6, 2024
7dfa7ee
Merge remote-tracking branch 'upstream/main' into configuration
metesynnada Mar 6, 2024
4f9acdf
Parquet feature fix
metesynnada Mar 6, 2024
b0bb337
On datafusion-cli register COPY statements
metesynnada Mar 7, 2024
00525a8
Correcting a test
metesynnada Mar 7, 2024
f2f1d96
Merge remote-tracking branch 'upstream/main' into configuration
metesynnada Mar 7, 2024
129e682
Review
ozankabak Mar 11, 2024
9f06c6f
Review visited
metesynnada Mar 12, 2024
e27dfe8
Merge remote-tracking branch 'upstream/main' into configuration
metesynnada Mar 12, 2024
Before proto 2
metesynnada committed Feb 26, 2024
commit 13e7e833ed8dd1108bc0a8f5a40c115ad92c510c
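The diff below continues the migration away from the string-tuple `StatementOptions` plumbing toward typed config structs such as `CsvOptions`, `JsonOptions`, and `TableParquetOptions`. The following is a rough before/after sketch of that shift, using hypothetical stand-in types rather than the actual DataFusion API:

```rust
// Hypothetical stand-in; not the actual DataFusion type.
#[derive(Debug, PartialEq)]
struct JsonWriterOptions {
    compression: String,
}

// Old style: scan an untyped list of (key, value) pairs, erroring on
// anything unrecognized -- roughly the shape of the removed TryFrom impls.
fn from_statement_options(options: &[(String, String)]) -> Result<JsonWriterOptions, String> {
    let mut compression = "UNCOMPRESSED".to_string();
    for (k, v) in options {
        match k.to_lowercase().as_str() {
            "compression" => compression = v.trim_matches('\'').to_string(),
            other => return Err(format!("unsupported option {other}")),
        }
    }
    Ok(JsonWriterOptions { compression })
}

// New style: options are first parsed into a typed config struct, and the
// writer options are derived from that struct alone.
#[derive(Default)]
struct JsonOptions {
    compression: String,
}

fn from_config(cfg: &JsonOptions) -> JsonWriterOptions {
    JsonWriterOptions {
        compression: cfg.compression.clone(),
    }
}

fn main() {
    let stmt = vec![("compression".to_string(), "'gzip'".to_string())];
    let old = from_statement_options(&stmt).unwrap();
    let cfg = JsonOptions { compression: "gzip".to_string() };
    let new = from_config(&cfg);
    // Both paths produce the same writer options; the typed path simply
    // centralizes validation in the config layer.
    assert_eq!(old, new);
}
```

The typed route lets one validation path serve `CREATE EXTERNAL TABLE`, `COPY TO`, and session defaults instead of each format re-parsing raw strings.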
2 changes: 1 addition & 1 deletion datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
@@ -307,7 +307,7 @@ mod tests {
use super::*;
use datafusion::common::plan_err;
use datafusion_common::{
file_options::StatementOptions, FileType, FileTypeWriterOptions,
FileType, FileTypeWriterOptions,
};

async fn create_external_table_test(location: &str, sql: &str) -> Result<()> {
1 change: 0 additions & 1 deletion datafusion/common/src/config.rs
@@ -1246,7 +1246,6 @@ impl ConfigField for TableParquetOptions {
}
}


macro_rules! config_namespace_with_hashmap {
(
$(#[doc = $struct_d:tt])*
7 changes: 0 additions & 7 deletions datafusion/common/src/file_options/arrow_writer.rs
@@ -17,13 +17,6 @@

//! Options related to how Arrow files should be written

use crate::{
config::ConfigOptions,
error::{DataFusionError, Result},
};

use super::StatementOptions;

#[derive(Clone, Debug)]
pub struct ArrowWriterOptions {}

7 changes: 0 additions & 7 deletions datafusion/common/src/file_options/avro_writer.rs
@@ -17,12 +17,5 @@

//! Options related to how avro files should be written

use crate::{
config::ConfigOptions,
error::{DataFusionError, Result},
};

use super::StatementOptions;

#[derive(Clone, Debug)]
pub struct AvroWriterOptions {}
7 changes: 1 addition & 6 deletions datafusion/common/src/file_options/csv_writer.rs
@@ -17,19 +17,14 @@

//! Options related to how csv files should be written

use std::str::FromStr;

use arrow::csv::WriterBuilder;

use crate::config::{CsvOptions, TableOptions};
use crate::config::CsvOptions;
use crate::{
config::ConfigOptions,
error::{DataFusionError, Result},
parsers::CompressionTypeVariant,
};

use super::StatementOptions;

/// Options for writing CSV files
#[derive(Clone, Debug)]
pub struct CsvWriterOptions {
22 changes: 0 additions & 22 deletions datafusion/common/src/file_options/json_writer.rs
@@ -21,13 +21,10 @@ use std::str::FromStr;

use crate::config::JsonOptions;
use crate::{
config::ConfigOptions,
error::{DataFusionError, Result},
parsers::CompressionTypeVariant,
};

use super::StatementOptions;

/// Options for writing JSON files
#[derive(Clone, Debug)]
pub struct JsonWriterOptions {
@@ -40,25 +37,6 @@ impl JsonWriterOptions {
}
}

impl TryFrom<(&ConfigOptions, &StatementOptions)> for JsonWriterOptions {
type Error = DataFusionError;

fn try_from(value: (&ConfigOptions, &StatementOptions)) -> Result<Self> {
let _configs = value.0;
let statement_options = value.1;
let mut compression = CompressionTypeVariant::UNCOMPRESSED;
for (option, value) in &statement_options.options {
match option.to_lowercase().as_str(){
"compression" => {
compression = CompressionTypeVariant::from_str(value.replace('\'', "").as_str())?;
},
_ => return Err(DataFusionError::Configuration(format!("Found unsupported option {option} with value {value} for JSON format!")))
}
}
Ok(JsonWriterOptions { compression })
}
}

impl TryFrom<&JsonOptions> for JsonWriterOptions {
type Error = DataFusionError;

143 changes: 3 additions & 140 deletions datafusion/common/src/file_options/mod.rs
@@ -26,146 +26,9 @@ pub mod json_writer;
pub mod parquet_writer;
pub(crate) mod parse_utils;

use std::{
collections::HashMap,
fmt::{self, Display},
path::Path,
str::FromStr,
};

use crate::{
config::ConfigOptions, file_options::parse_utils::parse_boolean_string,
DataFusionError, FileType, Result,
};
use std::str::FromStr;

#[cfg(feature = "parquet")]
use self::parquet_writer::ParquetWriterOptions;

use self::{
arrow_writer::ArrowWriterOptions, avro_writer::AvroWriterOptions,
csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions,
};

/// Represents a single arbitrary setting in a
/// [StatementOptions] where OptionTuple.0 determines
/// the specific setting to be modified and OptionTuple.1
/// determines the value which should be applied
pub type OptionTuple = (String, String);

/// Represents arbitrary tuples of options passed as String
/// tuples from SQL statements. As in the following statement:
/// COPY ... TO ... (setting1 value1, setting2 value2, ...)
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct StatementOptions {
options: Vec<OptionTuple>,
}

/// Useful for conversion from external tables which use Hashmap<String, String>
impl From<&HashMap<String, String>> for StatementOptions {
fn from(value: &HashMap<String, String>) -> Self {
Self {
options: value
.iter()
.map(|(k, v)| (k.to_owned(), v.to_owned()))
.collect::<Vec<OptionTuple>>(),
}
}
}

impl StatementOptions {
pub fn new(options: Vec<OptionTuple>) -> Self {
Self { options }
}

pub fn into_inner(self) -> Vec<OptionTuple> {
self.options
}

/// Scans for option and if it exists removes it and attempts to parse as a boolean
/// Returns none if it does not exist.
pub fn take_bool_option(&mut self, find: &str) -> Result<Option<bool>> {
let maybe_option = self.scan_and_remove_option(find);
maybe_option
.map(|(_, v)| parse_boolean_string(find, v))
.transpose()
}

/// Scans for option and if it exists removes it and returns it
/// Returns none if it does not exist
pub fn take_str_option(&mut self, find: &str) -> Option<String> {
let maybe_option = self.scan_and_remove_option(find);
maybe_option.map(|(_, v)| v)
}

/// Finds partition_by option if exists and parses into a `Vec<String>`.
/// If option doesn't exist, returns empty `vec![]`.
/// E.g. (partition_by 'colA, colB, colC') -> `vec!['colA','colB','colC']`
pub fn take_partition_by(&mut self) -> Vec<String> {
let partition_by = self.take_str_option("partition_by");
match partition_by {
Some(part_cols) => {
let dequoted = part_cols
.chars()
.enumerate()
.filter(|(idx, c)| {
!((*idx == 0 || *idx == part_cols.len() - 1)
&& (*c == '\'' || *c == '"'))
})
.map(|(_idx, c)| c)
.collect::<String>();
dequoted
.split(',')
.map(|s| s.trim().replace("''", "'"))
.collect::<Vec<_>>()
}
None => vec![],
}
}

/// Infers the file_type given a target and arbitrary options.
/// If the options contain an explicit "format" option, that will be used.
/// Otherwise, attempt to infer file_type from the extension of target.
/// Finally, return an error if unable to determine the file_type
/// If found, format is removed from the options list.
pub fn try_infer_file_type(&mut self, target: &str) -> Result<FileType> {
let explicit_format = self.scan_and_remove_option("format");
let format = match explicit_format {
Some(s) => FileType::from_str(s.1.as_str()),
None => {
// try to infer file format from file extension
let extension: &str = &Path::new(target)
.extension()
.ok_or(DataFusionError::Configuration(
"Format not explicitly set and unable to get file extension!"
.to_string(),
))?
.to_str()
.ok_or(DataFusionError::Configuration(
"Format not explicitly set and failed to parse file extension!"
.to_string(),
))?
.to_lowercase();

FileType::from_str(extension)
}
}?;

Ok(format)
}

/// Finds an option in StatementOptions if exists, removes and returns it
/// along with the vec of remaining options.
fn scan_and_remove_option(&mut self, find: &str) -> Option<OptionTuple> {
let idx = self
.options
.iter()
.position(|(k, _)| k.to_lowercase() == find.to_lowercase());
match idx {
Some(i) => Some(self.options.swap_remove(i)),
None => None,
}
}
}
use crate::FileType;

#[cfg(test)]
#[cfg(feature = "parquet")]
@@ -187,7 +50,7 @@ mod tests {

use crate::Result;

use super::{parquet_writer::ParquetWriterOptions, StatementOptions};
use super::parquet_writer::ParquetWriterOptions;

#[test]
fn test_writeroptions_parquet_from_statement_options() -> Result<()> {
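The removed `try_infer_file_type` helper resolved the output format from an explicit "format" option, falling back to the target path's extension. A self-contained sketch of that fallback logic, with a simplified stand-in for DataFusion's `FileType`:

```rust
use std::path::Path;

// Simplified stand-in for datafusion_common::FileType.
#[derive(Debug, PartialEq)]
enum FileType {
    Csv,
    Json,
    Parquet,
}

// Mirrors the removed `try_infer_file_type`: an explicit "format" option
// wins; otherwise fall back to the target path's extension.
fn infer_file_type(explicit_format: Option<&str>, target: &str) -> Result<FileType, String> {
    let name = match explicit_format {
        Some(f) => f.to_lowercase(),
        None => Path::new(target)
            .extension()
            .and_then(|e| e.to_str())
            .ok_or_else(|| {
                "Format not explicitly set and unable to get file extension!".to_string()
            })?
            .to_lowercase(),
    };
    match name.as_str() {
        "csv" => Ok(FileType::Csv),
        "json" => Ok(FileType::Json),
        "parquet" => Ok(FileType::Parquet),
        other => Err(format!("Unknown file type: {other}")),
    }
}

fn main() {
    assert_eq!(infer_file_type(None, "out/data.parquet"), Ok(FileType::Parquet));
    assert_eq!(infer_file_type(Some("CSV"), "data.bin"), Ok(FileType::Csv));
    assert!(infer_file_type(None, "no_extension").is_err());
}
```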
108 changes: 1 addition & 107 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -21,9 +21,7 @@ use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};

use crate::{config::ConfigOptions, DataFusionError, Result};

use super::StatementOptions;

use crate::config::{ParquetOptions, TableParquetOptions};
use crate::config::TableParquetOptions;
use parquet::{
basic::{BrotliLevel, GzipLevel, ZstdLevel},
file::properties::{EnabledStatistics, WriterVersion},
@@ -111,110 +109,6 @@ pub fn default_builder(options: &ConfigOptions) -> Result<WriterPropertiesBuilde
Ok(builder)
}

impl TryFrom<(&ConfigOptions, &StatementOptions)> for ParquetWriterOptions {
type Error = DataFusionError;

fn try_from(
configs_and_statement_options: (&ConfigOptions, &StatementOptions),
) -> Result<Self> {
let configs = configs_and_statement_options.0;
let statement_options = configs_and_statement_options.1;
let mut builder = default_builder(configs)?;
for (option, value) in &statement_options.options {
let (option, col_path) = split_option_and_column_path(option);
builder = match option.to_lowercase().as_str(){
"max_row_group_size" => builder
.set_max_row_group_size(value.parse()
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as u64 as required for {option}!")))?),
"data_pagesize_limit" => builder
.set_data_page_size_limit(value.parse()
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?),
"write_batch_size" => builder
.set_write_batch_size(value.parse()
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?),
"writer_version" => builder
.set_writer_version(parse_version_string(value)?),
"dictionary_page_size_limit" => builder
.set_dictionary_page_size_limit(value.parse()
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?),
"created_by" => builder
.set_created_by(value.to_owned()),
"column_index_truncate_length" => builder
.set_column_index_truncate_length(Some(value.parse()
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?)),
"data_page_row_count_limit" => builder
.set_data_page_row_count_limit(value.parse()
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?),
"bloom_filter_enabled" => {
let parsed_value = value.parse()
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?;
match col_path{
Some(path) => builder.set_column_bloom_filter_enabled(path, parsed_value),
None => builder.set_bloom_filter_enabled(parsed_value)
}
},
"encoding" => {
let parsed_encoding = parse_encoding_string(value)?;
match col_path{
Some(path) => builder.set_column_encoding(path, parsed_encoding),
None => builder.set_encoding(parsed_encoding)
}
},
"dictionary_enabled" => {
let parsed_value = value.parse()
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?;
match col_path{
Some(path) => builder.set_column_dictionary_enabled(path, parsed_value),
None => builder.set_dictionary_enabled(parsed_value)
}
},
"compression" => {
let parsed_compression = parse_compression_string(value)?;
match col_path{
Some(path) => builder.set_column_compression(path, parsed_compression),
None => builder.set_compression(parsed_compression)
}
},
"statistics_enabled" => {
let parsed_value = parse_statistics_string(value)?;
match col_path{
Some(path) => builder.set_column_statistics_enabled(path, parsed_value),
None => builder.set_statistics_enabled(parsed_value)
}
},
"max_statistics_size" => {
let parsed_value = value.parse()
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?;
match col_path{
Some(path) => builder.set_column_max_statistics_size(path, parsed_value),
None => builder.set_max_statistics_size(parsed_value)
}
},
"bloom_filter_fpp" => {
let parsed_value = value.parse()
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as f64 as required for {option}!")))?;
match col_path{
Some(path) => builder.set_column_bloom_filter_fpp(path, parsed_value),
None => builder.set_bloom_filter_fpp(parsed_value)
}
},
"bloom_filter_ndv" => {
let parsed_value = value.parse()
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as u64 as required for {option}!")))?;
match col_path{
Some(path) => builder.set_column_bloom_filter_ndv(path, parsed_value),
None => builder.set_bloom_filter_ndv(parsed_value)
}
},
_ => return Err(DataFusionError::Configuration(format!("Found unsupported option {option} with value {value} for Parquet format!")))
}
}
Ok(ParquetWriterOptions {
writer_options: builder.build(),
})
}
}

impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
type Error = DataFusionError;

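The removed Parquet `TryFrom` impl accepted column-scoped keys via `split_option_and_column_path`, routing them to the per-column `set_column_*` builder setters while plain keys used the global setters. A minimal sketch of that split, assuming a `::` separator (inferred from the helper's name and call sites, not confirmed by this diff):

```rust
// Hypothetical reconstruction: a key such as "compression::col1" targets a
// single column, while a bare "compression" applies to the whole file.
fn split_option_and_column_path(key: &str) -> (String, Option<String>) {
    match key.split_once("::") {
        Some((option, col)) => (option.to_string(), Some(col.to_string())),
        None => (key.to_string(), None),
    }
}

fn main() {
    assert_eq!(
        split_option_and_column_path("compression::col1"),
        ("compression".to_string(), Some("col1".to_string()))
    );
    assert_eq!(
        split_option_and_column_path("max_row_group_size"),
        ("max_row_group_size".to_string(), None)
    );
}
```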
2 changes: 1 addition & 1 deletion datafusion/common/src/parsers.rs
@@ -17,7 +17,7 @@

//! Interval parsing logic
use sqlparser::parser::ParserError;
use std::fmt::{Display, Formatter};
use std::fmt::Display;

use std::result;
use std::str::FromStr;