@@ -15,21 +15,27 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::physical_plan::PyExecutionPlan;
-use crate::sql::logical::PyLogicalPlan;
-use crate::utils::wait_for_future;
-use crate::{errors::DataFusionError, expr::PyExpr};
+use std::sync::Arc;
+
 use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow};
 use datafusion::arrow::util::pretty;
 use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
+use datafusion::execution::SendableRecordBatchStream;
 use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
 use datafusion::parquet::file::properties::WriterProperties;
 use datafusion::prelude::*;
 use pyo3::exceptions::{PyTypeError, PyValueError};
 use pyo3::prelude::*;
 use pyo3::types::PyTuple;
-use std::sync::Arc;
+use tokio::task::JoinHandle;
+
+use crate::errors::py_datafusion_err;
+use crate::physical_plan::PyExecutionPlan;
+use crate::record_batch::PyRecordBatchStream;
+use crate::sql::logical::PyLogicalPlan;
+use crate::utils::{get_tokio_runtime, wait_for_future};
+use crate::{errors::DataFusionError, expr::PyExpr};
 
 /// A PyDataFrame is a representation of a logical plan and an API to compose statements.
 /// Use it to build a plan and `.collect()` to execute the plan and collect the result.
@@ -399,6 +405,35 @@ impl PyDataFrame {
         })
     }
 
+    fn execute_stream(&self, py: Python) -> PyResult<PyRecordBatchStream> {
+        // get a handle to the shared Tokio runtime to run the async code
+        let rt = &get_tokio_runtime(py).0;
+        let df = self.df.as_ref().clone();
+        let fut: JoinHandle<datafusion_common::Result<SendableRecordBatchStream>> =
+            rt.spawn(async move { df.execute_stream().await });
+        let stream = wait_for_future(py, fut).map_err(py_datafusion_err)?;
+        Ok(PyRecordBatchStream::new(stream?))
+    }
+
+    fn execute_stream_partitioned(&self, py: Python) -> PyResult<Vec<PyRecordBatchStream>> {
+        // get a handle to the shared Tokio runtime to run the async code
+        let rt = &get_tokio_runtime(py).0;
+        let df = self.df.as_ref().clone();
+        let fut: JoinHandle<datafusion_common::Result<Vec<SendableRecordBatchStream>>> =
+            rt.spawn(async move { df.execute_stream_partitioned().await });
+        let streams = wait_for_future(py, fut).map_err(py_datafusion_err)?;
+
+        match streams {
+            Ok(streams) => Ok(streams
+                .into_iter()
+                .map(PyRecordBatchStream::new)
+                .collect()),
+            _ => Err(PyValueError::new_err(
+                "Unable to execute stream partitioned",
+            )),
+        }
+    }
+
     /// Convert to pandas dataframe with pyarrow
     /// Collect the batches, pass to Arrow Table & then convert to Pandas DataFrame
     fn to_pandas(&self, py: Python) -> PyResult<PyObject> {
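
Reviewer note: a minimal sketch of how these bindings might be driven from Python, illustrative only. It assumes the wrapper exposes the methods as `df.execute_stream()` / `df.execute_stream_partitioned()`, and that `PyRecordBatchStream` (defined in `record_batch.rs`, outside this diff) offers a blocking `next()` that yields `pyarrow.RecordBatch` values and raises `StopIteration` when the stream is exhausted.

```python
# A sketch under the assumptions stated above; PyRecordBatchStream's
# Python-facing API is not part of this diff.
from datafusion import SessionContext

ctx = SessionContext()
df = ctx.sql("SELECT 1 AS a")

# Pull record batches incrementally from a single merged stream.
stream = df.execute_stream()
while True:
    try:
        batch = stream.next()  # assumed blocking next(); raises StopIteration at end
    except StopIteration:
        break
    print(batch.num_rows)

# Or consume each output partition as its own stream, which lets
# downstream work proceed per partition.
for partition in df.execute_stream_partitioned():
    while True:
        try:
            batch = partition.next()
        except StopIteration:
            break
        print(batch.num_rows)
```

Unlike `collect()`, which buffers the full result in memory before returning, these methods hand batches back as execution produces them, so results larger than available memory can be processed incrementally.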