feat: Add experimental polars execution by TrevorBergeron · Pull Request #1747 · googleapis/python-bigquery-dataframes · GitHub

24 changes: 24 additions & 0 deletions bigframes/_config/bigquery_options.py
@@ -22,6 +22,7 @@
import google.auth.credentials
import requests.adapters

import bigframes._importing
import bigframes.enums
import bigframes.exceptions as bfe

@@ -94,6 +95,7 @@ def __init__(
requests_transport_adapters: Sequence[
Tuple[str, requests.adapters.BaseAdapter]
] = (),
enable_polars_execution: bool = False,
Collaborator:
Should this be a compute option so it can be changed at runtime?

Contributor Author:
Kind of just want to keep it as a within-session constant for now (we'd need to commit to it if making this a GA feature, though). Turning polars execution on and off mid-session would make things like caching and multi-part execution really tricky.

):
self._credentials = credentials
self._project = project
@@ -113,6 +115,9 @@ def __init__(
client_endpoints_override = {}

self._client_endpoints_override = client_endpoints_override
if enable_polars_execution:
bigframes._importing.import_polars()
self._enable_polars_execution = enable_polars_execution

@property
def application_name(self) -> Optional[str]:
@@ -424,3 +429,22 @@ def requests_transport_adapters(
SESSION_STARTED_MESSAGE.format(attribute="requests_transport_adapters")
)
self._requests_transport_adapters = value

@property
def enable_polars_execution(self) -> bool:
"""If True, will use polars to execute some simple query plans locally."""
return self._enable_polars_execution

@enable_polars_execution.setter
def enable_polars_execution(self, value: bool):
Collaborator:
Should we add a check for whether the session has already started? If not (perhaps because we want to safely ignore this when the global session has already started), maybe add a comment explaining why.

    def enable_polars_execution(self, value: bool):
        if self._session_started and self._enable_polars_execution != value:
            raise ValueError(
                SESSION_STARTED_MESSAGE.format(attribute="enable_polars_execution")
            )

Contributor Author:
Yeah, it should probably error out, as a session cannot change once started. Error added in the new revision.

if self._session_started and self._enable_polars_execution != value:
raise ValueError(
SESSION_STARTED_MESSAGE.format(attribute="enable_polars_execution")
)
if value is True:
msg = bfe.format_message(
"Polars execution is an experimental feature, and may not be stable. Must have polars installed."
)
warnings.warn(msg, category=bfe.PreviewWarning)
bigframes._importing.import_polars()
self._enable_polars_execution = value
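Taken together, the pieces in this file fix the flag for the lifetime of a session: it has to be chosen on BigQueryOptions before the session starts, or the setter above raises. A minimal usage sketch, mirroring the system-test fixture added at the end of this PR:

    import bigframes

    # enable_polars_execution must be set before the session starts; the
    # constructor imports polars eagerly (>= 1.7.0, per bigframes/_importing.py).
    context = bigframes.BigQueryOptions(location="US", enable_polars_execution=True)
    session = bigframes.Session(context=context)
    # ... simple, fully-supported query plans may now execute locally via polars ...
    session.close()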
30 changes: 30 additions & 0 deletions bigframes/_importing.py
@@ -0,0 +1,30 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
from types import ModuleType

from packaging import version

# Keep this in sync with setup.py
POLARS_MIN_VERSION = version.Version("1.7.0")


def import_polars() -> ModuleType:
polars_module = importlib.import_module("polars")
imported_version = version.Version(polars_module.build_info()["version"])
if imported_version < POLARS_MIN_VERSION:
raise ImportError(
f"Imported polars version: {imported_version} is below the minimum version: {POLARS_MIN_VERSION}"
)
return polars_module
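For context, one way a caller could guard on this helper when polars is optional; the fallback below is illustrative rather than taken from this PR:

    import bigframes._importing

    # import_polars() raises ImportError when the installed polars is older
    # than POLARS_MIN_VERSION, and importlib.import_module raises
    # ModuleNotFoundError (a subclass of ImportError) when polars is missing
    # entirely, so one except clause covers both cases.
    try:
        pl = bigframes._importing.import_polars()
    except ImportError:
        pl = None  # skip the polars fast path and rely on BigQuery execution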
4 changes: 2 additions & 2 deletions bigframes/core/compile/polars/compiler.py
@@ -393,15 +393,15 @@ class PolarsCompiler:
expr_compiler = PolarsExpressionCompiler()
agg_compiler = PolarsAggregateCompiler()

def compile(self, array_value: bigframes.core.ArrayValue) -> pl.LazyFrame:
def compile(self, plan: nodes.BigFrameNode) -> pl.LazyFrame:
if not polars_installed:
raise ValueError(
"Polars is not installed, cannot compile to polars engine."
)

# TODO: Create standard way to configure BFET -> BFET rewrites
# Polars has incomplete slice support in lazy mode
node = array_value.node
node = plan
node = bigframes.core.rewrite.column_pruning(node)
node = nodes.bottom_up(node, bigframes.core.rewrite.rewrite_slice)
node = bigframes.core.rewrite.pull_out_window_order(node)
1 change: 1 addition & 0 deletions bigframes/session/__init__.py
@@ -255,6 +255,7 @@ def __init__(
storage_manager=self._temp_storage_manager,
strictly_ordered=self._strictly_ordered,
metrics=self._metrics,
enable_polars_execution=context.enable_polars_execution,
)

def __del__(self):
24 changes: 19 additions & 5 deletions bigframes/session/bq_caching_executor.py
@@ -41,7 +41,13 @@
import bigframes.core.tree_properties as tree_properties
import bigframes.dtypes
import bigframes.features
from bigframes.session import executor, loader, local_scan_executor, read_api_execution
from bigframes.session import (
executor,
loader,
local_scan_executor,
read_api_execution,
semi_executor,
)
import bigframes.session._io.bigquery as bq_io
import bigframes.session.metrics
import bigframes.session.planner
@@ -147,6 +153,7 @@ def __init__(
*,
strictly_ordered: bool = True,
metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
enable_polars_execution: bool = False,
):
self.bqclient = bqclient
self.storage_manager = storage_manager
@@ -155,14 +162,21 @@
self.metrics = metrics
self.loader = loader
self.bqstoragereadclient = bqstoragereadclient
# Simple left-to-right precedence for now
self._semi_executors = (
self._enable_polars_execution = enable_polars_execution
self._semi_executors: Sequence[semi_executor.SemiExecutor] = (
read_api_execution.ReadApiSemiExecutor(
bqstoragereadclient=bqstoragereadclient,
project=self.bqclient.project,
),
local_scan_executor.LocalScanExecutor(),
)
if enable_polars_execution:
from bigframes.session import polars_executor

self._semi_executors = (
*self._semi_executors,
polars_executor.PolarsExecutor(),
)
self._upload_lock = threading.Lock()

def to_sql(
@@ -637,8 +651,8 @@ def _execute_plan(
"""Just execute whatever plan as is, without further caching or decomposition."""
# First try to execute fast-paths
if not output_spec.require_bq_table:
for semi_executor in self._semi_executors:
maybe_result = semi_executor.execute(plan, ordered=ordered, peek=peek)
for exec in self._semi_executors:
maybe_result = exec.execute(plan, ordered=ordered, peek=peek)
if maybe_result:
return maybe_result

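The loop above is the whole contract a semi-executor has to meet. The SemiExecutor interface itself is not shown in this diff, so the following is only a rough sketch of the left-to-right fallback pattern, with hypothetical names rather than the real ones from bigframes.session.semi_executor:

    from typing import Any, Optional, Sequence

    class _SketchSemiExecutor:
        """Stand-in for a semi-executor: handles some plans, declines the rest."""

        def execute(self, plan: Any, ordered: bool, peek: Optional[int] = None):
            # Return a materialized result for supported plans, or None to
            # let the next executor (and ultimately BigQuery) handle it.
            return None

    def _try_fast_paths(executors: Sequence[_SketchSemiExecutor], plan: Any):
        # Simple left-to-right precedence: the first executor that returns a
        # non-None result wins; None means "fall back to BigQuery execution".
        for executor in executors:
            maybe_result = executor.execute(plan, ordered=True)
            if maybe_result is not None:
                return maybe_result
        return None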
2 changes: 1 addition & 1 deletion bigframes/session/polars_executor.py
@@ -73,7 +73,7 @@ def execute(
# Note: Ignoring ordered flag, as just executing totally ordered is fine.
try:
lazy_frame: pl.LazyFrame = self._compiler.compile(
array_value.ArrayValue(plan)
array_value.ArrayValue(plan).node
)
except Exception:
return None
4 changes: 2 additions & 2 deletions bigframes/testing/polars_session.py
@@ -41,7 +41,7 @@ def peek(
"""
A 'peek' efficiently accesses a small number of rows in the dataframe.
"""
lazy_frame: polars.LazyFrame = self.compiler.compile(array_value)
lazy_frame: polars.LazyFrame = self.compiler.compile(array_value.node)
pa_table = lazy_frame.collect().limit(n_rows).to_arrow()
# Currently, pyarrow types might not quite be exactly the ones in the bigframes schema.
# Nullability may be different, and might use large versions of list, string datatypes.
@@ -64,7 +64,7 @@ def execute(
"""
Execute the ArrayValue, storing the result to a temporary session-owned table.
"""
lazy_frame: polars.LazyFrame = self.compiler.compile(array_value)
lazy_frame: polars.LazyFrame = self.compiler.compile(array_value.node)
pa_table = lazy_frame.collect().to_arrow()
# Currently, pyarrow types might not quite be exactly the ones in the bigframes schema.
# Nullability may be different, and might use large versions of list, string datatypes.
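The repeated comments about pyarrow types refer to mismatches such as large_string vs. string columns and differing field nullability between what polars emits and what the bigframes schema declares. A small pyarrow-only illustration of that kind of coercion (sketch only; the actual reconciliation code falls outside the lines shown in this diff, and the variable names are hypothetical):

    import pyarrow as pa

    # polars' to_arrow() output commonly uses large_string / large_list
    # columns, and field nullability may differ from the bigframes schema;
    # casting to the declared schema reconciles both.
    pa_table = pa.table({"s": pa.array(["a", "b"], type=pa.large_string())})
    target_schema = pa.schema([pa.field("s", pa.string(), nullable=True)])
    pa_table = pa_table.cast(target_schema)  # large_string -> string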
2 changes: 1 addition & 1 deletion noxfile.py
@@ -108,7 +108,7 @@
SYSTEM_TEST_EXTRAS: List[str] = []
SYSTEM_TEST_EXTRAS_BY_PYTHON: Dict[str, List[str]] = {
"3.9": ["tests", "anywidget"],
"3.10": ["tests"],
"3.10": ["tests", "polars"],
"3.12": ["tests", "scikit-learn", "polars", "anywidget"],
"3.13": ["tests", "polars"],
}
1 change: 1 addition & 0 deletions testing/constraints-3.10.txt
@@ -15,3 +15,4 @@ matplotlib==3.7.1
psutil==5.9.5
seaborn==0.13.1
traitlets==5.7.1
polars==1.7.0
76 changes: 76 additions & 0 deletions tests/system/small/test_polars_execution.py
@@ -0,0 +1,76 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest

import bigframes
from bigframes.testing.utils import assert_pandas_df_equal

polars = pytest.importorskip("polars", reason="polars is required for this test")


@pytest.fixture(scope="module")
def session_w_polars():
context = bigframes.BigQueryOptions(location="US", enable_polars_execution=True)
session = bigframes.Session(context=context)
yield session
session.close() # close generated session at cleanup time


def test_polar_execution_sorted(session_w_polars, scalars_pandas_df_index):
execution_count_before = session_w_polars._metrics.execution_count
bf_df = session_w_polars.read_pandas(scalars_pandas_df_index)

pd_result = scalars_pandas_df_index.sort_index(ascending=False)[
["int64_too", "bool_col"]
]
bf_result = bf_df.sort_index(ascending=False)[["int64_too", "bool_col"]].to_pandas()

assert session_w_polars._metrics.execution_count == execution_count_before
assert_pandas_df_equal(bf_result, pd_result)


def test_polar_execution_sorted_filtered(session_w_polars, scalars_pandas_df_index):
execution_count_before = session_w_polars._metrics.execution_count
bf_df = session_w_polars.read_pandas(scalars_pandas_df_index)

pd_result = scalars_pandas_df_index.sort_index(ascending=False).dropna(
subset=["int64_col", "string_col"]
)
bf_result = (
bf_df.sort_index(ascending=False)
.dropna(subset=["int64_col", "string_col"])
.to_pandas()
)

# Filter and isnull not supported by polar engine yet, so falls back to bq execution
assert session_w_polars._metrics.execution_count == (execution_count_before + 1)
assert_pandas_df_equal(bf_result, pd_result)


def test_polar_execution_unsupported_sql_fallback(
session_w_polars, scalars_pandas_df_index
):
execution_count_before = session_w_polars._metrics.execution_count
bf_df = session_w_polars.read_pandas(scalars_pandas_df_index)

pd_df = scalars_pandas_df_index.copy()
pd_df["str_len_col"] = pd_df.string_col.str.len()
pd_result = pd_df

bf_df["str_len_col"] = bf_df.string_col.str.len()
bf_result = bf_df.to_pandas()

# str len not supported by polar engine yet, so falls back to bq execution
assert session_w_polars._metrics.execution_count == (execution_count_before + 1)
assert_pandas_df_equal(bf_result, pd_result)