Allow for multiple input files per table instead of a single file (#519) · llama90/arrow-datafusion-python@501acff · GitHub
[go: up one dir, main page]

Skip to content

Commit 501acff

Browse files
authored
Allow for multiple input files per table instead of a single file (apache#519)
1 parent 59140f2 commit 501acff

File tree

3 files changed

+14
-14
lines changed

3 files changed

+14
-14
lines changed

datafusion/input/location.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
# under the License.
1717

1818
import os
19+
import glob
1920
from typing import Any
2021

2122
from datafusion.common import DataTypeMap, SqlTable
@@ -41,14 +42,12 @@ def build_table(
4142
format = extension.lstrip(".").lower()
4243
num_rows = 0 # Total number of rows in the file. Used for statistics
4344
columns = []
44-
4545
if format == "parquet":
4646
import pyarrow.parquet as pq
4747

4848
# Read the Parquet metadata
4949
metadata = pq.read_metadata(input_file)
5050
num_rows = metadata.num_rows
51-
5251
# Iterate through the schema and build the SqlTable
5352
for col in metadata.schema:
5453
columns.append(
@@ -57,7 +56,6 @@ def build_table(
5756
DataTypeMap.from_parquet_type_str(col.physical_type),
5857
)
5958
)
60-
6159
elif format == "csv":
6260
import csv
6361

@@ -73,7 +71,6 @@ def build_table(
7371
print(header_row)
7472
for _ in reader:
7573
num_rows += 1
76-
7774
# TODO: Need to actually consume this row into resonable columns
7875
raise RuntimeError(
7976
"TODO: Currently unable to support CSV input files."
@@ -84,4 +81,7 @@ def build_table(
8481
Only Parquet and CSV."
8582
)
8683

87-
return SqlTable(table_name, columns, num_rows, input_file)
84+
# Input could possibly be multiple files. Create a list if so
85+
input_files = glob.glob(input_file)
86+
87+
return SqlTable(table_name, columns, num_rows, input_files)

datafusion/tests/test_input.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,4 @@ def test_location_input():
3030
tbl = location_input.build_table(input_file, table_name)
3131
assert "blog" == tbl.name
3232
assert 3 == len(tbl.columns)
33-
assert "blogs.parquet" in tbl.filepath
33+
assert "blogs.parquet" in tbl.filepaths[0]

src/common/schema.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ pub struct SqlTable {
5656
#[pyo3(get, set)]
5757
pub statistics: SqlStatistics,
5858
#[pyo3(get, set)]
59-
pub filepath: Option<String>,
59+
pub filepaths: Option<Vec<String>>,
6060
}
6161

6262
#[pymethods]
@@ -66,7 +66,7 @@ impl SqlTable {
6666
table_name: String,
6767
columns: Vec<(String, DataTypeMap)>,
6868
row_count: f64,
69-
filepath: Option<String>,
69+
filepaths: Option<Vec<String>>,
7070
) -> Self {
7171
Self {
7272
name: table_name,
@@ -76,7 +76,7 @@ impl SqlTable {
7676
indexes: Vec::new(),
7777
constraints: Vec::new(),
7878
statistics: SqlStatistics::new(row_count),
79-
filepath,
79+
filepaths,
8080
}
8181
}
8282
}
@@ -124,20 +124,20 @@ impl SqlSchema {
124124
pub struct SqlTableSource {
125125
schema: SchemaRef,
126126
statistics: Option<SqlStatistics>,
127-
filepath: Option<String>,
127+
filepaths: Option<Vec<String>>,
128128
}
129129

130130
impl SqlTableSource {
131131
/// Initialize a new `EmptyTable` from a schema
132132
pub fn new(
133133
schema: SchemaRef,
134134
statistics: Option<SqlStatistics>,
135-
filepath: Option<String>,
135+
filepaths: Option<Vec<String>>,
136136
) -> Self {
137137
Self {
138138
schema,
139139
statistics,
140-
filepath,
140+
filepaths,
141141
}
142142
}
143143

@@ -148,8 +148,8 @@ impl SqlTableSource {
148148

149149
/// Access optional filepath associated with this table source
150150
#[allow(dead_code)]
151-
pub fn filepath(&self) -> Option<&String> {
152-
self.filepath.as_ref()
151+
pub fn filepaths(&self) -> Option<&Vec<String>> {
152+
self.filepaths.as_ref()
153153
}
154154
}
155155

0 commit comments

Comments (0)