8000 Use OnceLock to store TokioRuntime (#895) · kylebarron/datafusion-python@ec8246d · GitHub
[go: up one dir, main page]

Skip to content 8000

Commit ec8246d

Browse files
Use OnceLock to store TokioRuntime (apache#895)
* bump rust-version to match upstream datafusion * use std::sync::OnceLock to store tokio runtime instead of round-tripping to python * stop exporting TokioRuntime to python * remove unused argument from get_tokio_runtime * remove superflous Arc from get_tokio_runtime * add #[inline] annotation to get_tokio_runtime I also included a reference comment in case future users experience problems with using datafusion-python behind a forking app server l ike `gunicorn`. * fix clippy lint * cargo fmt
1 parent 1082214 commit ec8246d

File tree

6 files changed

+16
-24
lines changed

6 files changed

+16
-24
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ description = "Apache DataFusion DataFrame and SQL Query Engine"
2525
readme = "README.md"
2626
license = "Apache-2.0"
2727
edition = "2021"
28-
rust-version = "1.64"
28+
rust-version = "1.78"
2929
include = ["/src", "/datafusion", "/LICENSE.txt", "pyproject.toml", "Cargo.toml", "Cargo.lock"]
3030

3131
[features]

python/datafusion/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
from .catalog import Catalog, Database, Table
3737

3838
# The following imports are okay to remain as opaque to the user.
39-
from ._internal import Config, runtime
39+
from ._internal import Config
4040

4141
from .record_batch import RecordBatchStream, RecordBatch
4242

@@ -75,7 +75,6 @@
7575
"literal",
7676
"lit",
7777
"DFSchema",
78-
"runtime",
7978
"Catalog",
8079
"Database",
8180
"Table",

src/context.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -982,7 +982,7 @@ impl PySessionContext {
982982
) -> PyResult<PyRecordBatchStream> {
983983
let ctx: TaskContext = TaskContext::from(&self.ctx.state());
984984
// create a Tokio runtime to run the async code
985-
let rt = &get_tokio_runtime(py).0;
985+
let rt = &get_tokio_runtime().0;
986986
let plan = plan.plan.clone();
987987
let fut: JoinHandle<datafusion::common::Result<SendableRecordBatchStream>> =
988988
rt.spawn(async move { plan.execute(part, Arc::new(ctx)) });

src/dataframe.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -543,7 +543,7 @@ impl PyDataFrame {
543543

544544
fn execute_stream(&self, py: Python) -> PyResult<PyRecordBatchStream> {
545545
// create a Tokio runtime to run the async code
546-
let rt = &get_tokio_runtime(py).0;
546+
let rt = &get_tokio_runtime().0;
547547
let df = self.df.as_ref().clone();
548548
let fut: JoinHandle<datafusion::common::Result<SendableRecordBatchStream>> =
549549
rt.spawn(async move { df.execute_stream().await });
@@ -553,7 +553,7 @@ impl PyDataFrame {
553553

554554
fn execute_stream_partitioned(&self, py: Python) -> PyResult<Vec<PyRecordBatchStream>> {
555555
// create a Tokio runtime to run the async code
556-
let rt = &get_tokio_runtime(py).0;
556+
let rt = &get_tokio_runtime().0;
557557
let df = self.df.as_ref().clone();
558558
let fut: JoinHandle<datafusion::common::Result<Vec<SendableRecordBatchStream>>> =
559559
rt.spawn(async move { df.execute_stream_partitioned().await });

src/lib.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ pub mod utils;
6666
static GLOBAL: MiMalloc = MiMalloc;
6767

6868
// Used to define Tokio Runtime as a Python module attribute
69-
#[pyclass]
7069
pub(crate) struct TokioRuntime(tokio::runtime::Runtime);
7170

7271
/// Low-level DataFusion internal package.
@@ -75,11 +74,6 @@ pub(crate) struct TokioRuntime(tokio::runtime::Runtime);
7574
/// datafusion directory.
7675
#[pymodule]
7776
fn _internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> {
78-
// Register the Tokio Runtime as a module attribute so we can reuse it
79-
m.add(
80-
"runtime",
81-
TokioRuntime(tokio::runtime::Runtime::new().unwrap()),
82-
)?;
8377
// Register the python classes
8478
m.add_class::<catalog::PyCatalog>()?;
8579
m.add_class::<catalog::PyDatabase>()?;

src/utils.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,19 @@ use crate::TokioRuntime;
2020
use datafusion::logical_expr::Volatility;
2121
use pyo3::prelude::*;
2222
use std::future::Future;
23+
use std::sync::OnceLock;
2324
use tokio::runtime::Runtime;
2425

2526
/// Utility to get the Tokio Runtime from Python
26-
pub(crate) fn get_tokio_runtime(py: Python) -> PyRef<TokioRuntime> {
27-
let datafusion = py.import_bound("datafusion._internal").unwrap();
28-
let tmp = datafusion.getattr("runtime").unwrap();
29-
match tmp.extract::<PyRef<TokioRuntime>>() {
30-
Ok(runtime) => runtime,
31-
Err(_e) => {
32-
let rt = TokioRuntime(tokio::runtime::Runtime::new().unwrap());
33-
let obj: Bound<'_, TokioRuntime> = Py::new(py, rt).unwrap().into_bound(py);
34-
obj.extract().unwrap()
35-
}
36-
}
27+
#[inline]
28+
pub(crate) fn get_tokio_runtime() -> &'static TokioRuntime {
29+
// NOTE: Other pyo3 python libraries have had issues with using tokio
30+
// behind a forking app-server like `gunicorn`
31+
// If we run into that problem, in the future we can look to `delta-rs`
32+
// which adds a check in that disallows calls from a forked process
33+
// https://github.com/delta-io/delta-rs/blob/87010461cfe01563d91a4b9cd6fa468e2ad5f283/python/src/utils.rs#L10-L31
34+
static RUNTIME: OnceLock<TokioRuntime> = OnceLock::new();
35+
RUNTIME.get_or_init(|| TokioRuntime(tokio::runtime::Runtime::new().unwrap()))
3736
}
3837

3938
/// Utility to collect rust futures with GIL released
@@ -42,7 +41,7 @@ where
4241
F: Future + Send,
4342
F::Output: Send,
4443
{
45-
let runtime: &Runtime = &get_tokio_runtime(py).0;
44+
let runtime: &Runtime = &get_tokio_runtime().0;
4645
py.allow_threads(|| runtime.block_on(f))
4746
}
4847

0 commit comments

Comments
 (0)
0