From 5449696cbb4e31ab58cea1c2d48d2c971aa42e37 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 16 May 2025 23:05:18 +0000 Subject: [PATCH 1/4] feat: Add experimental polars execution --- bigframes/_config/bigquery_options.py | 16 +++++ bigframes/core/compile/polars/compiler.py | 8 ++- bigframes/session/__init__.py | 1 + bigframes/session/bq_caching_executor.py | 23 +++++-- bigframes/session/polars_executor.py | 66 ++++++++++++++++++ tests/system/small/test_polars_execution.py | 74 +++++++++++++++++++++ tests/unit/polars_session.py | 2 +- 7 files changed, 182 insertions(+), 8 deletions(-) create mode 100644 bigframes/session/polars_executor.py create mode 100644 tests/system/small/test_polars_execution.py diff --git a/bigframes/_config/bigquery_options.py b/bigframes/_config/bigquery_options.py index 3a6008eaa8..10adc81dde 100644 --- a/bigframes/_config/bigquery_options.py +++ b/bigframes/_config/bigquery_options.py @@ -90,6 +90,7 @@ def __init__( allow_large_results: bool = False, ordering_mode: Literal["strict", "partial"] = "strict", client_endpoints_override: Optional[dict] = None, + enable_polars_execution: bool = False, ): self._credentials = credentials self._project = project @@ -108,6 +109,7 @@ def __init__( client_endpoints_override = {} self._client_endpoints_override = client_endpoints_override + self._enable_polars_execution = enable_polars_execution @property def application_name(self) -> Optional[str]: @@ -379,3 +381,17 @@ def client_endpoints_override(self, value: dict): ) self._client_endpoints_override = value + + @property + def enable_polars_execution(self) -> bool: + """If True, will use polars to execute some simple query plans locally.""" + return self._enable_polars_execution + + @enable_polars_execution.setter + def enable_polars_execution(self, value: bool): + if value is True: + msg = bfe.format_message( + "Polars execution is an experimental feature, and may not be stable. Must have polars installed." + ) + warnings.warn(msg, category=bfe.PreviewWarning) + self._enable_polars_execution = value diff --git a/bigframes/core/compile/polars/compiler.py b/bigframes/core/compile/polars/compiler.py index b2f018e80a..36a2972c1a 100644 --- a/bigframes/core/compile/polars/compiler.py +++ b/bigframes/core/compile/polars/compiler.py @@ -96,6 +96,10 @@ def _( return args[0] % args[1] if isinstance(op, ops.coalesce_op.__class__): return pl.coalesce(*args) + if isinstance(op, ops.isnull_op.__class__): + return args[0].is_null() + if isinstance(op, ops.notnull_op.__class__): + return args[0].is_not_null() if isinstance(op, ops.CaseWhenOp): expr = pl.when(args[0]).then(args[1]) for pred, result in zip(args[2::2], args[3::2]): @@ -184,7 +188,7 @@ class PolarsCompiler: expr_compiler = PolarsExpressionCompiler() agg_compiler = PolarsAggregateCompiler() - def compile(self, array_value: bigframes.core.ArrayValue) -> pl.LazyFrame: + def compile(self, plan: nodes.BigFrameNode) -> pl.LazyFrame: if not polars_installed: raise ValueError( "Polars is not installed, cannot compile to polars engine." @@ -192,7 +196,7 @@ def compile(self, array_value: bigframes.core.ArrayValue) -> pl.LazyFrame: # TODO: Create standard way to configure BFET -> BFET rewrites # Polars has incomplete slice support in lazy mode - node = nodes.bottom_up(array_value.node, bigframes.core.rewrite.rewrite_slice) + node = nodes.bottom_up(plan, bigframes.core.rewrite.rewrite_slice) return self.compile_node(node) @functools.singledispatchmethod diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 7630e71eaa..df1389d85a 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -254,6 +254,7 @@ def __init__( storage_manager=self._temp_storage_manager, strictly_ordered=self._strictly_ordered, metrics=self._metrics, + enable_polars_execution=context.enable_polars_execution, ) self._loader = bigframes.session.loader.GbqDataLoader( session=self, diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index 118838c059..0bca205381 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -38,7 +38,12 @@ import bigframes.dtypes import bigframes.exceptions as bfe import bigframes.features -from bigframes.session import executor, local_scan_executor, read_api_execution +from bigframes.session import ( + executor, + local_scan_executor, + read_api_execution, + semi_executor, +) import bigframes.session._io.bigquery as bq_io import bigframes.session.metrics import bigframes.session.planner @@ -123,6 +128,7 @@ def __init__( *, strictly_ordered: bool = True, metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None, + enable_polars_execution: bool = False, ): self.bqclient = bqclient self.storage_manager = storage_manager @@ -130,14 +136,21 @@ def __init__( self.cache: ExecutionCache = ExecutionCache() self.metrics = metrics self.bqstoragereadclient = bqstoragereadclient - # Simple left-to-right precedence for now - self._semi_executors = ( + self._enable_polars_execution = enable_polars_execution + self._semi_executors: Sequence[semi_executor.SemiExecutor] = ( read_api_execution.ReadApiSemiExecutor( bqstoragereadclient=bqstoragereadclient, project=self.bqclient.project, ), local_scan_executor.LocalScanExecutor(), ) + if enable_polars_execution: + from bigframes.session import polars_executor + + self._semi_executors = ( + *self._semi_executors, + polars_executor.PolarsExecutor(), + ) def to_sql( self, @@ -542,8 +555,8 @@ def _execute_plan( """Just execute whatever plan as is, without further caching or decomposition.""" # First try to execute fast-paths if not output_spec.require_bq_table: - for semi_executor in self._semi_executors: - maybe_result = semi_executor.execute(plan, ordered=ordered, peek=peek) + for exec in self._semi_executors: + maybe_result = exec.execute(plan, ordered=ordered, peek=peek) if maybe_result: return maybe_result diff --git a/bigframes/session/polars_executor.py b/bigframes/session/polars_executor.py new file mode 100644 index 0000000000..80d59f5285 --- /dev/null +++ b/bigframes/session/polars_executor.py @@ -0,0 +1,66 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +from typing import Optional, TYPE_CHECKING + +from bigframes.core import bigframe_node, nodes +from bigframes.session import executor, semi_executor + +if TYPE_CHECKING: + import polars as pl + + +_COMPATIBLE_NODES = ( + nodes.ReadLocalNode, + nodes.OrderByNode, + nodes.ReversedNode, + nodes.SelectionNode, + nodes.FilterNode, # partial support + nodes.ProjectionNode, # partial support +) + + +class PolarsExecutor(semi_executor.SemiExecutor): + def __init__(self): + # This will error out if polars is not installed + from bigframes.core.compile.polars import PolarsCompiler + + self._compiler = PolarsCompiler() + + def execute( + self, + plan: bigframe_node.BigFrameNode, + ordered: bool, + peek: Optional[int] = None, + ) -> Optional[executor.ExecuteResult]: + if not self._can_execute(plan): + return None + # Note: Ignoring ordered flag, as just executing totally ordered is fine. + try: + lazy_frame: pl.LazyFrame = self._compiler.compile(plan) + except Exception: + return None + if peek is not None: + lazy_frame = lazy_frame.limit(peek) + pa_table = lazy_frame.collect().to_arrow() + return executor.ExecuteResult( + arrow_batches=iter(pa_table.to_batches()), + schema=plan.schema, + total_bytes=pa_table.nbytes, + total_rows=pa_table.num_rows, + ) + + def _can_execute(self, plan: bigframe_node.BigFrameNode): + return all(isinstance(node, _COMPATIBLE_NODES) for node in plan.unique_nodes()) diff --git a/tests/system/small/test_polars_execution.py b/tests/system/small/test_polars_execution.py new file mode 100644 index 0000000000..e0e9a751c1 --- /dev/null +++ b/tests/system/small/test_polars_execution.py @@ -0,0 +1,74 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import pytest + +import bigframes +from tests.system.utils import assert_pandas_df_equal + +polars = pytest.importorskip("polars", reason="polars is required for this test") + + +@pytest.fixture(scope="module") +def session_w_polars(): + context = bigframes.BigQueryOptions(location="US", enable_polars_execution=True) + session = bigframes.Session(context=context) + yield session + session.close() # close generated session at cleanup time + + +def test_polar_execution_sorted(session_w_polars, scalars_pandas_df_index): + execution_count_before = session_w_polars._metrics.execution_count + bf_df = session_w_polars.read_pandas(scalars_pandas_df_index) + + pd_result = scalars_pandas_df_index.sort_index(ascending=False)[ + ["int64_too", "bool_col"] + ] + bf_result = bf_df.sort_index(ascending=False)[["int64_too", "bool_col"]].to_pandas() + + assert session_w_polars._metrics.execution_count == execution_count_before + assert_pandas_df_equal(bf_result, pd_result) + + +def test_polar_execution_sorted_filtered(session_w_polars, scalars_pandas_df_index): + execution_count_before = session_w_polars._metrics.execution_count + bf_df = session_w_polars.read_pandas(scalars_pandas_df_index) + + pd_result = scalars_pandas_df_index.sort_index(ascending=False).dropna( + subset=["int64_col", "string_col"] + ) + bf_result = ( + bf_df.sort_index(ascending=False) + .dropna(subset=["int64_col", "string_col"]) + .to_pandas() + ) + + assert session_w_polars._metrics.execution_count == execution_count_before + assert_pandas_df_equal(bf_result, pd_result) + + +def test_polar_execution_unsupported_sql_fallback( + session_w_polars, scalars_pandas_df_index +): + execution_count_before = session_w_polars._metrics.execution_count + bf_df = session_w_polars.read_pandas(scalars_pandas_df_index) + + pd_df = scalars_pandas_df_index.copy() + pd_df["str_len_col"] = pd_df.string_col.str.len() + pd_result = pd_df + + bf_df["str_len_col"] = bf_df.string_col.str.len() + bf_result = bf_df.to_pandas() + + assert session_w_polars._metrics.execution_count == (execution_count_before + 1) + assert_pandas_df_equal(bf_result, pd_result) diff --git a/tests/unit/polars_session.py b/tests/unit/polars_session.py index d592b49038..61daab9a8b 100644 --- a/tests/unit/polars_session.py +++ b/tests/unit/polars_session.py @@ -46,7 +46,7 @@ def execute( """ Execute the ArrayValue, storing the result to a temporary session-owned table. """ - lazy_frame: polars.LazyFrame = self.compiler.compile(array_value) + lazy_frame: polars.LazyFrame = self.compiler.compile(array_value.node) pa_table = lazy_frame.collect().to_arrow() # Currently, pyarrow types might not quite be exactly the ones in the bigframes schema. # Nullability may be different, and might use large versions of list, string datatypes. From 6a2ae4c56a67c6963dacad23d2583a9879a0f3d9 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 17 Jun 2025 19:47:25 +0000 Subject: [PATCH 2/4] fix test and import --- tests/system/small/test_polars_execution.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/system/small/test_polars_execution.py b/tests/system/small/test_polars_execution.py index e0e9a751c1..0aed693b80 100644 --- a/tests/system/small/test_polars_execution.py +++ b/tests/system/small/test_polars_execution.py @@ -14,7 +14,7 @@ import pytest import bigframes -from tests.system.utils import assert_pandas_df_equal +from bigframes.testing.utils import assert_pandas_df_equal polars = pytest.importorskip("polars", reason="polars is required for this test") @@ -53,7 +53,8 @@ def test_polar_execution_sorted_filtered(session_w_polars, scalars_pandas_df_ind .to_pandas() ) - assert session_w_polars._metrics.execution_count == execution_count_before + # Filter and isnull not supported by polar engine yet, so falls back to bq execution + assert session_w_polars._metrics.execution_count == (execution_count_before + 1) assert_pandas_df_equal(bf_result, pd_result) @@ -70,5 +71,6 @@ def test_polar_execution_unsupported_sql_fallback( bf_df["str_len_col"] = bf_df.string_col.str.len() bf_result = bf_df.to_pandas() + # str len not supported by polar engine yet, so falls back to bq execution assert session_w_polars._metrics.execution_count == (execution_count_before + 1) assert_pandas_df_equal(bf_result, pd_result) From 404f4473f599797c93918ed7fd5dc0b868dc952d Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 18 Jun 2025 17:42:53 +0000 Subject: [PATCH 3/4] error for old polars version --- bigframes/_config/bigquery_options.py | 4 ++++ bigframes/_importing.py | 30 +++++++++++++++++++++++++++ noxfile.py | 2 +- testing/constraints-3.10.txt | 1 + 4 files changed, 36 insertions(+), 1 deletion(-) create mode 100644 bigframes/_importing.py diff --git a/bigframes/_config/bigquery_options.py b/bigframes/_config/bigquery_options.py index 4a5076addc..3ab1559351 100644 --- a/bigframes/_config/bigquery_options.py +++ b/bigframes/_config/bigquery_options.py @@ -22,6 +22,7 @@ import google.auth.credentials import requests.adapters +import bigframes._importing import bigframes.enums import bigframes.exceptions as bfe @@ -114,6 +115,8 @@ def __init__( client_endpoints_override = {} self._client_endpoints_override = client_endpoints_override + if enable_polars_execution: + bigframes._importing.import_polars() self._enable_polars_execution = enable_polars_execution @property @@ -439,4 +442,5 @@ def enable_polars_execution(self, value: bool): "Polars execution is an experimental feature, and may not be stable. Must have polars installed." ) warnings.warn(msg, category=bfe.PreviewWarning) + bigframes._importing.import_polars() self._enable_polars_execution = value diff --git a/bigframes/_importing.py b/bigframes/_importing.py new file mode 100644 index 0000000000..095a1d9c51 --- /dev/null +++ b/bigframes/_importing.py @@ -0,0 +1,30 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import importlib +from types import ModuleType + +from packaging import version + +# Keep this in sync with setup.py +POLARS_MIN_VERSION = version.Version("1.7.0") + + +def import_polars() -> ModuleType: + polars_module = importlib.import_module("polars") + imported_version = version.Version(polars_module.build_info()["version"]) + if imported_version < POLARS_MIN_VERSION: + raise ImportError( + f"Imported polars version: {imported_version} is below the minimum version: {POLARS_MIN_VERSION}" + ) + return polars_module diff --git a/noxfile.py b/noxfile.py index dee5f929b7..7ba2f8738e 100644 --- a/noxfile.py +++ b/noxfile.py @@ -107,7 +107,7 @@ SYSTEM_TEST_EXTRAS: List[str] = [] SYSTEM_TEST_EXTRAS_BY_PYTHON: Dict[str, List[str]] = { "3.9": ["tests"], - "3.10": ["tests"], + "3.10": ["tests", "polars"], "3.12": ["tests", "scikit-learn", "polars"], "3.13": ["tests", "polars"], } diff --git a/testing/constraints-3.10.txt b/testing/constraints-3.10.txt index b11ab5a88d..12ad443aab 100644 --- a/testing/constraints-3.10.txt +++ b/testing/constraints-3.10.txt @@ -15,3 +15,4 @@ matplotlib==3.7.1 psutil==5.9.5 seaborn==0.13.1 traitlets==5.7.1 +polars==1.7.0 From fa02dffc56e1ac71765dcd92592dedc3bcf67284 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 25 Jun 2025 21:43:55 +0000 Subject: [PATCH 4/4] add session started error --- bigframes/_config/bigquery_options.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/bigframes/_config/bigquery_options.py b/bigframes/_config/bigquery_options.py index 3ab1559351..09ffee95d4 100644 --- a/bigframes/_config/bigquery_options.py +++ b/bigframes/_config/bigquery_options.py @@ -437,6 +437,10 @@ def enable_polars_execution(self) -> bool: @enable_polars_execution.setter def enable_polars_execution(self, value: bool): + if self._session_started and self._enable_polars_execution != value: + raise ValueError( + SESSION_STARTED_MESSAGE.format(attribute="enable_polars_execution") + ) if value is True: msg = bfe.format_message( "Polars execution is an experimental feature, and may not be stable. Must have polars installed."