diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 93cc5e4210..323ef8f07a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -42,3 +42,8 @@ repos: additional_dependencies: [types-requests, types-tabulate, types-PyYAML, pandas-stubs<=2.2.3.241126] exclude: "^third_party" args: ["--check-untyped-defs", "--explicit-package-bases", "--ignore-missing-imports"] +- repo: https://github.com/biomejs/pre-commit + rev: v2.0.2 + hooks: + - id: biome-check + files: '\.js$' diff --git a/CHANGELOG.md b/CHANGELOG.md index 8bf0d2a4d2..2e419c61e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,28 @@ [1]: https://pypi.org/project/bigframes/#history +## [2.11.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v2.10.0...v2.11.0) (2025-07-15) + + +### Features + +* Add `__contains__` to Index, Series, DataFrame ([#1899](https://github.com/googleapis/python-bigquery-dataframes/issues/1899)) ([07222bf](https://github.com/googleapis/python-bigquery-dataframes/commit/07222bfe2f6ae60859d33eb366598d7dee5c0572)) +* Add `thresh` param for Dataframe.dropna ([#1885](https://github.com/googleapis/python-bigquery-dataframes/issues/1885)) ([1395a50](https://github.com/googleapis/python-bigquery-dataframes/commit/1395a502ffa0faf4b7462045dcb0657485c7ce26)) +* Add concat pushdown for hybrid engine ([#1891](https://github.com/googleapis/python-bigquery-dataframes/issues/1891)) ([813624d](https://github.com/googleapis/python-bigquery-dataframes/commit/813624dddfd4f2396c8b1c9768c0c831bb0681ac)) +* Add pagination buttons (prev/next) to anywidget mode for DataFrames ([#1841](https://github.com/googleapis/python-bigquery-dataframes/issues/1841)) ([8eca767](https://github.com/googleapis/python-bigquery-dataframes/commit/8eca767425c7910c8f907747a8a8b335df0caa1a)) +* Add total_rows property to pandas batches iterator ([#1888](https://github.com/googleapis/python-bigquery-dataframes/issues/1888)) ([e3f5e65](https://github.com/googleapis/python-bigquery-dataframes/commit/e3f5e6539d220f8da57f08f67863ade29df4ad16)) +* Hybrid engine local join support ([#1900](https://github.com/googleapis/python-bigquery-dataframes/issues/1900)) ([1aa7950](https://github.com/googleapis/python-bigquery-dataframes/commit/1aa7950334bdc826a9a0a1894dad67ca6f755425)) +* Support `date` data type for to_datetime() ([#1902](https://github.com/googleapis/python-bigquery-dataframes/issues/1902)) ([24050cb](https://github.com/googleapis/python-bigquery-dataframes/commit/24050cb00247f68eb4ece827fd31ee1dd8b25380)) +* Support bpd.Series(json_data, dtype="json") ([#1882](https://github.com/googleapis/python-bigquery-dataframes/issues/1882)) ([05cb7d0](https://github.com/googleapis/python-bigquery-dataframes/commit/05cb7d0bc3599054acf8ecb8b15eb2045b9bf463)) + + +### Bug Fixes + +* Bpd.merge on common columns ([#1905](https://github.com/googleapis/python-bigquery-dataframes/issues/1905)) ([a1fa112](https://github.com/googleapis/python-bigquery-dataframes/commit/a1fa11291305a1da0d6a4121436c09ed04b224b5)) +* DataFrame string addition respects order ([#1894](https://github.com/googleapis/python-bigquery-dataframes/issues/1894)) ([52c8233](https://github.com/googleapis/python-bigquery-dataframes/commit/52c82337bcc9f2b6cfc1c6ac14deb83b693d114d)) +* Show slot_millis_sum warning only when `allow_large_results=False` ([#1892](https://github.com/googleapis/python-bigquery-dataframes/issues/1892)) ([25efabc](https://github.com/googleapis/python-bigquery-dataframes/commit/25efabc4897e0692725618ce43134127a7f2c2ee)) +* Used 
query row count metadata instead of table metadata ([#1893](https://github.com/googleapis/python-bigquery-dataframes/issues/1893)) ([e1ebc53](https://github.com/googleapis/python-bigquery-dataframes/commit/e1ebc5369a416280cec0ab1513e763b7a2fe3c20)) + ## [2.10.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v2.9.0...v2.10.0) (2025-07-08) diff --git a/MANIFEST.in b/MANIFEST.in index 16a933a629..e0deb6deb2 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -17,7 +17,7 @@ # Generated by synthtool. DO NOT EDIT! include README.rst LICENSE recursive-include third_party/bigframes_vendored * -recursive-include bigframes *.json *.proto py.typed +recursive-include bigframes *.json *.proto *.js py.typed recursive-include tests * global-exclude *.py[co] global-exclude __pycache__ diff --git a/bigframes/core/block_transforms.py b/bigframes/core/block_transforms.py index 09ef17dff5..cb7c1923cf 100644 --- a/bigframes/core/block_transforms.py +++ b/bigframes/core/block_transforms.py @@ -522,7 +522,8 @@ def rank( def dropna( block: blocks.Block, column_ids: typing.Sequence[str], - how: typing.Literal["all", "any"] = "any", + how: str = "any", + thresh: typing.Optional[int] = None, subset: Optional[typing.Sequence[str]] = None, ): """ @@ -531,17 +532,38 @@ def dropna( if subset is None: subset = column_ids + # Predicates to check for non-null values in the subset of columns predicates = [ ops.notnull_op.as_expr(column_id) for column_id in column_ids if column_id in subset ] + if len(predicates) == 0: return block - if how == "any": - predicate = functools.reduce(ops.and_op.as_expr, predicates) - else: # "all" - predicate = functools.reduce(ops.or_op.as_expr, predicates) + + if thresh is not None: + # Handle single predicate case + if len(predicates) == 1: + count_expr = ops.AsTypeOp(pd.Int64Dtype()).as_expr(predicates[0]) + else: + # Sum the boolean expressions to count non-null values + count_expr = functools.reduce( + lambda a, b: ops.add_op.as_expr( + ops.AsTypeOp(pd.Int64Dtype()).as_expr(a), + ops.AsTypeOp(pd.Int64Dtype()).as_expr(b), + ), + predicates, + ) + # Filter rows where count >= thresh + predicate = ops.ge_op.as_expr(count_expr, ex.const(thresh)) + else: + # Only handle 'how' parameter when thresh is not specified + if how == "any": + predicate = functools.reduce(ops.and_op.as_expr, predicates) + else: # "all" + predicate = functools.reduce(ops.or_op.as_expr, predicates) + return block.filter(predicate) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index dbbf9ee864..c8632ebc8c 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -29,7 +29,17 @@ import random import textwrap import typing -from typing import Iterable, List, Literal, Mapping, Optional, Sequence, Tuple, Union +from typing import ( + Iterable, + Iterator, + List, + Literal, + Mapping, + Optional, + Sequence, + Tuple, + Union, +) import warnings import bigframes_vendored.constants as constants @@ -87,14 +97,22 @@ LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]] -class BlockHolder(typing.Protocol): +@dataclasses.dataclass +class PandasBatches(Iterator[pd.DataFrame]): """Interface for mutable objects with state represented by a block value object.""" - def _set_block(self, block: Block): - """Set the underlying block value of the object""" + def __init__( + self, pandas_batches: Iterator[pd.DataFrame], total_rows: Optional[int] = 0 + ): + self._dataframes: Iterator[pd.DataFrame] = pandas_batches + self._total_rows: Optional[int] = total_rows + + @property + def 
total_rows(self) -> Optional[int]: + return self._total_rows - def _get_block(self) -> Block: - """Get the underlying block value of the object""" + def __next__(self) -> pd.DataFrame: + return next(self._dataframes) @dataclasses.dataclass() @@ -599,8 +617,7 @@ def try_peek( self.expr, n, use_explicit_destination=allow_large_results ) df = result.to_pandas() - self._copy_index_to_pandas(df) - return df + return self._copy_index_to_pandas(df) else: return None @@ -609,8 +626,7 @@ def to_pandas_batches( page_size: Optional[int] = None, max_results: Optional[int] = None, allow_large_results: Optional[bool] = None, - squeeze: Optional[bool] = False, - ): + ) -> Iterator[pd.DataFrame]: """Download results one message at a time. page_size and max_results determine the size and number of batches, @@ -621,43 +637,43 @@ def to_pandas_batches( use_explicit_destination=allow_large_results, ) - total_batches = 0 - for df in execute_result.to_pandas_batches( - page_size=page_size, max_results=max_results - ): - total_batches += 1 - self._copy_index_to_pandas(df) - if squeeze: - yield df.squeeze(axis=1) - else: - yield df - # To reduce the number of edge cases to consider when working with the # results of this, always return at least one DataFrame. See: # b/428918844. - if total_batches == 0: - df = pd.DataFrame( - { - col: pd.Series([], dtype=self.expr.get_column_type(col)) - for col in itertools.chain(self.value_columns, self.index_columns) - } - ) - self._copy_index_to_pandas(df) - yield df + empty_val = pd.DataFrame( + { + col: pd.Series([], dtype=self.expr.get_column_type(col)) + for col in itertools.chain(self.value_columns, self.index_columns) + } + ) + dfs = map( + lambda a: a[0], + itertools.zip_longest( + execute_result.to_pandas_batches(page_size, max_results), + [0], + fillvalue=empty_val, + ), + ) + dfs = iter(map(self._copy_index_to_pandas, dfs)) - def _copy_index_to_pandas(self, df: pd.DataFrame): - """Set the index on pandas DataFrame to match this block. + total_rows = execute_result.total_rows + if (total_rows is not None) and (max_results is not None): + total_rows = min(total_rows, max_results) - Warning: This method modifies ``df`` inplace. - """ + return PandasBatches(dfs, total_rows) + + def _copy_index_to_pandas(self, df: pd.DataFrame) -> pd.DataFrame: + """Set the index on pandas DataFrame to match this block.""" # Note: If BigQuery DataFrame has null index, a default one will be created for the local materialization. + new_df = df.copy() if len(self.index_columns) > 0: - df.set_index(list(self.index_columns), inplace=True) + new_df.set_index(list(self.index_columns), inplace=True) # Pandas names is annotated as list[str] rather than the more # general Sequence[Label] that BigQuery DataFrames has. 
# See: https://github.com/pandas-dev/pandas-stubs/issues/804 - df.index.names = self.index.names # type: ignore - df.columns = self.column_labels + new_df.index.names = self.index.names # type: ignore + new_df.columns = self.column_labels + return new_df def _materialize_local( self, materialize_options: MaterializationOptions = MaterializationOptions() @@ -724,9 +740,7 @@ def _materialize_local( ) else: df = execute_result.to_pandas() - self._copy_index_to_pandas(df) - - return df, execute_result.query_job + return self._copy_index_to_pandas(df), execute_result.query_job def _downsample( self, total_rows: int, sampling_method: str, fraction: float, random_state @@ -1591,8 +1605,7 @@ def retrieve_repr_request_results( row_count = self.session._executor.execute(self.expr.row_count()).to_py_scalar() head_df = head_result.to_pandas() - self._copy_index_to_pandas(head_df) - return head_df, row_count, head_result.query_job + return self._copy_index_to_pandas(head_df), row_count, head_result.query_job def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]: expr, result_id = self._expr.promote_offsets() diff --git a/bigframes/core/compile/polars/compiler.py b/bigframes/core/compile/polars/compiler.py index 40037735d4..c31c122078 100644 --- a/bigframes/core/compile/polars/compiler.py +++ b/bigframes/core/compile/polars/compiler.py @@ -487,8 +487,14 @@ def compile_offsets(self, node: nodes.PromoteOffsetsNode): def compile_join(self, node: nodes.JoinNode): left = self.compile_node(node.left_child) right = self.compile_node(node.right_child) - left_on = [l_name.id.sql for l_name, _ in node.conditions] - right_on = [r_name.id.sql for _, r_name in node.conditions] + + left_on = [] + right_on = [] + for left_ex, right_ex in node.conditions: + left_ex, right_ex = lowering._coerce_comparables(left_ex, right_ex) + left_on.append(self.expr_compiler.compile_expression(left_ex)) + right_on.append(self.expr_compiler.compile_expression(right_ex)) + if node.type == "right": return self._ordered_join( right, left, "left", right_on, left_on, node.joins_nulls @@ -502,8 +508,8 @@ def _ordered_join( left_frame: pl.LazyFrame, right_frame: pl.LazyFrame, how: Literal["inner", "outer", "left", "cross"], - left_on: Sequence[str], - right_on: Sequence[str], + left_on: Sequence[pl.Expr], + right_on: Sequence[pl.Expr], join_nulls: bool, ): if how == "right": @@ -547,6 +553,11 @@ def compile_concat(self, node: nodes.ConcatNode): child_frames = [ frame.rename( {col: id.sql for col, id in zip(frame.columns, node.output_ids)} + ).cast( + { + field.id.sql: _bigframes_dtype_to_polars_dtype(field.dtype) + for field in node.fields + } ) for frame in child_frames ] diff --git a/bigframes/core/compile/sqlglot/aggregate_compiler.py b/bigframes/core/compile/sqlglot/aggregate_compiler.py new file mode 100644 index 0000000000..888b3756b5 --- /dev/null +++ b/bigframes/core/compile/sqlglot/aggregate_compiler.py @@ -0,0 +1,112 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
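+"""Compilers for BigFrames aggregation expressions targeting SQLGlot.
+
+Handlers are registered per aggregation op with ``functools.singledispatch``;
+so far only ``SumOp`` is registered, and unrecognized ops raise ``ValueError``.
+"""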
+from __future__ import annotations + +import functools +import typing + +import sqlglot.expressions as sge + +from bigframes.core import expression, window_spec +from bigframes.core.compile.sqlglot.expressions import typed_expr +import bigframes.core.compile.sqlglot.scalar_compiler as scalar_compiler +import bigframes.core.compile.sqlglot.sqlglot_ir as ir +import bigframes.operations as ops + + +def compile_aggregate( + aggregate: expression.Aggregation, + order_by: tuple[sge.Expression, ...], +) -> sge.Expression: + """Compiles BigFrames aggregation expression into SQLGlot expression.""" + if isinstance(aggregate, expression.NullaryAggregation): + return compile_nullary_agg(aggregate.op) + if isinstance(aggregate, expression.UnaryAggregation): + column = typed_expr.TypedExpr( + scalar_compiler.compile_scalar_expression(aggregate.arg), + aggregate.arg.output_type, + ) + if not aggregate.op.order_independent: + return compile_ordered_unary_agg(aggregate.op, column, order_by=order_by) + else: + return compile_unary_agg(aggregate.op, column) + elif isinstance(aggregate, expression.BinaryAggregation): + left = typed_expr.TypedExpr( + scalar_compiler.compile_scalar_expression(aggregate.left), + aggregate.left.output_type, + ) + right = typed_expr.TypedExpr( + scalar_compiler.compile_scalar_expression(aggregate.right), + aggregate.right.output_type, + ) + return compile_binary_agg(aggregate.op, left, right) + else: + raise ValueError(f"Unexpected aggregation: {aggregate}") + + +@functools.singledispatch +def compile_nullary_agg( + op: ops.aggregations.WindowOp, + window: typing.Optional[window_spec.WindowSpec] = None, +) -> sge.Expression: + raise ValueError(f"Can't compile unrecognized operation: {op}") + + +@functools.singledispatch +def compile_binary_agg( + op: ops.aggregations.WindowOp, + left: typed_expr.TypedExpr, + right: typed_expr.TypedExpr, + window: typing.Optional[window_spec.WindowSpec] = None, +) -> sge.Expression: + raise ValueError(f"Can't compile unrecognized operation: {op}") + + +@functools.singledispatch +def compile_unary_agg( + op: ops.aggregations.WindowOp, + column: typed_expr.TypedExpr, + window: typing.Optional[window_spec.WindowSpec] = None, +) -> sge.Expression: + raise ValueError(f"Can't compile unrecognized operation: {op}") + + +@functools.singledispatch +def compile_ordered_unary_agg( + op: ops.aggregations.WindowOp, + column: typed_expr.TypedExpr, + window: typing.Optional[window_spec.WindowSpec] = None, + order_by: typing.Sequence[sge.Expression] = [], +) -> sge.Expression: + raise ValueError(f"Can't compile unrecognized operation: {op}") + + +@compile_unary_agg.register +def _( + op: ops.aggregations.SumOp, + column: typed_expr.TypedExpr, + window: typing.Optional[window_spec.WindowSpec] = None, +) -> sge.Expression: + # Will be null if all inputs are null. Pandas defaults to zero sum though. 
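+    # Wrap the SQL SUM in IFNULL(..., 0) so an all-null input yields 0, matching the pandas default.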
+ expr = _apply_window_if_present(sge.func("SUM", column.expr), window) + return sge.func("IFNULL", expr, ir._literal(0, column.dtype)) + + +def _apply_window_if_present( + value: sge.Expression, + window: typing.Optional[window_spec.WindowSpec] = None, +) -> sge.Expression: + if window is not None: + raise NotImplementedError("Can't apply window to the expression.") + return value diff --git a/bigframes/core/compile/sqlglot/compiler.py b/bigframes/core/compile/sqlglot/compiler.py index 93f072973c..1c5aaf50a8 100644 --- a/bigframes/core/compile/sqlglot/compiler.py +++ b/bigframes/core/compile/sqlglot/compiler.py @@ -22,6 +22,7 @@ from bigframes.core import expression, guid, identifiers, nodes, pyarrow_utils, rewrite from bigframes.core.compile import configs +import bigframes.core.compile.sqlglot.aggregate_compiler as aggregate_compiler from bigframes.core.compile.sqlglot.expressions import typed_expr import bigframes.core.compile.sqlglot.scalar_compiler as scalar_compiler import bigframes.core.compile.sqlglot.sqlglot_ir as ir @@ -217,7 +218,7 @@ def compile_filter( self, node: nodes.FilterNode, child: ir.SQLGlotIR ) -> ir.SQLGlotIR: condition = scalar_compiler.compile_scalar_expression(node.predicate) - return child.filter(condition) + return child.filter(tuple([condition])) @_compile_node.register def compile_join( @@ -267,6 +268,37 @@ def compile_random_sample( ) -> ir.SQLGlotIR: return child.sample(node.fraction) + @_compile_node.register + def compile_aggregate( + self, node: nodes.AggregateNode, child: ir.SQLGlotIR + ) -> ir.SQLGlotIR: + ordering_cols = tuple( + sge.Ordered( + this=scalar_compiler.compile_scalar_expression( + ordering.scalar_expression + ), + desc=ordering.direction.is_ascending is False, + nulls_first=ordering.na_last is False, + ) + for ordering in node.order_by + ) + aggregations: tuple[tuple[str, sge.Expression], ...] = tuple( + (id.sql, aggregate_compiler.compile_aggregate(agg, order_by=ordering_cols)) + for agg, id in node.aggregations + ) + by_cols: tuple[sge.Expression, ...] = tuple( + scalar_compiler.compile_scalar_expression(by_col) + for by_col in node.by_column_ids + ) + + dropna_cols = [] + if node.dropna: + for key, by_col in zip(node.by_column_ids, by_cols): + if node.child.field_by_id[key.id].nullable: + dropna_cols.append(by_col) + + return child.aggregate(aggregations, by_cols, tuple(dropna_cols)) + def _replace_unsupported_ops(node: nodes.BigFrameNode): node = nodes.bottom_up(node, rewrite.rewrite_slice) diff --git a/bigframes/core/compile/sqlglot/sqlglot_ir.py b/bigframes/core/compile/sqlglot/sqlglot_ir.py index c0bed4090c..b194fe9e5d 100644 --- a/bigframes/core/compile/sqlglot/sqlglot_ir.py +++ b/bigframes/core/compile/sqlglot/sqlglot_ir.py @@ -15,6 +15,7 @@ from __future__ import annotations import dataclasses +import functools import typing from google.cloud import bigquery @@ -25,11 +26,9 @@ import sqlglot.expressions as sge from bigframes import dtypes -from bigframes.core import guid, utils +from bigframes.core import guid, local_data, schema, utils from bigframes.core.compile.sqlglot.expressions import typed_expr import bigframes.core.compile.sqlglot.sqlglot_types as sgt -import bigframes.core.local_data as local_data -import bigframes.core.schema as bf_schema # shapely.wkt.dumps was moved to shapely.io.to_wkt in 2.0. 
try: @@ -68,7 +67,7 @@ def sql(self) -> str: def from_pyarrow( cls, pa_table: pa.Table, - schema: bf_schema.ArraySchema, + schema: schema.ArraySchema, uid_gen: guid.SequentialUIDGenerator, ) -> SQLGlotIR: """Builds SQLGlot expression from a pyarrow table. @@ -280,9 +279,13 @@ def limit( def filter( self, - condition: sge.Expression, + conditions: tuple[sge.Expression, ...], ) -> SQLGlotIR: """Filters the query by adding a WHERE clause.""" + condition = _and(conditions) + if condition is None: + return SQLGlotIR(expr=self.expr.copy(), uid_gen=self.uid_gen) + new_expr = _select_to_cte( self.expr, sge.to_identifier( @@ -316,10 +319,11 @@ def join( right_ctes = right_select.args.pop("with", []) merged_ctes = [*left_ctes, *right_ctes] - join_conditions = [ - _join_condition(left, right, joins_nulls) for left, right in conditions - ] - join_on = sge.And(expressions=join_conditions) if join_conditions else None + join_on = _and( + tuple( + _join_condition(left, right, joins_nulls) for left, right in conditions + ) + ) join_type_str = join_type if join_type != "outer" else "full outer" new_expr = ( @@ -364,6 +368,47 @@ def sample(self, fraction: float) -> SQLGlotIR: ).where(condition, append=False) return SQLGlotIR(expr=new_expr, uid_gen=self.uid_gen) + def aggregate( + self, + aggregations: tuple[tuple[str, sge.Expression], ...], + by_cols: tuple[sge.Expression, ...], + dropna_cols: tuple[sge.Expression, ...], + ) -> SQLGlotIR: + """Applies the aggregation expressions. + + Args: + aggregations: output_column_id, aggregation_expr tuples + by_cols: column expressions for aggregation + dropna_cols: columns whether null keys should be dropped + """ + aggregations_expr = [ + sge.Alias( + this=expr, + alias=sge.to_identifier(id, quoted=self.quoted), + ) + for id, expr in aggregations + ] + + new_expr = _select_to_cte( + self.expr, + sge.to_identifier( + next(self.uid_gen.get_uid_stream("bfcte_")), quoted=self.quoted + ), + ) + new_expr = new_expr.group_by(*by_cols).select( + *[*by_cols, *aggregations_expr], append=False + ) + + condition = _and( + tuple( + sg.not_(sge.Is(this=drop_col, expression=sge.Null())) + for drop_col in dropna_cols + ) + ) + if condition is not None: + new_expr = new_expr.where(condition, append=False) + return SQLGlotIR(expr=new_expr, uid_gen=self.uid_gen) + def insert( self, destination: bigquery.TableReference, @@ -552,6 +597,16 @@ def _table(table: bigquery.TableReference) -> sge.Table: ) +def _and(conditions: tuple[sge.Expression, ...]) -> typing.Optional[sge.Expression]: + """Chains multiple expressions together using a logical AND.""" + if not conditions: + return None + + return functools.reduce( + lambda left, right: sge.And(this=left, expression=right), conditions + ) + + def _join_condition( left: typed_expr.TypedExpr, right: typed_expr.TypedExpr, diff --git a/bigframes/core/indexes/base.py b/bigframes/core/indexes/base.py index f653b8700b..b442f87aec 100644 --- a/bigframes/core/indexes/base.py +++ b/bigframes/core/indexes/base.py @@ -16,8 +16,9 @@ from __future__ import annotations +import functools import typing -from typing import Hashable, Literal, Optional, overload, Sequence, Union +from typing import cast, Hashable, Literal, Optional, overload, Sequence, Union import bigframes_vendored.constants as constants import bigframes_vendored.pandas.core.indexes.base as vendored_pandas_index @@ -86,6 +87,8 @@ def __new__( pd_df = pandas.DataFrame(index=data) block = df.DataFrame(pd_df, session=session)._block else: + if isinstance(dtype, str) and dtype.lower() == 
"json": + dtype = bigframes.dtypes.JSON_DTYPE pd_index = pandas.Index(data=data, dtype=dtype, name=name) pd_df = pandas.DataFrame(index=pd_index) block = df.DataFrame(pd_df, session=session)._block @@ -527,6 +530,29 @@ def isin(self, values) -> Index: ) ).fillna(value=False) + def __contains__(self, key) -> bool: + hash(key) # to throw for unhashable values + if self.nlevels == 0: + return False + + if (not isinstance(key, tuple)) or (self.nlevels == 1): + key = (key,) + + match_exprs = [] + for key_part, index_col, dtype in zip( + key, self._block.index_columns, self._block.index.dtypes + ): + key_type = bigframes.dtypes.is_compatible(key_part, dtype) + if key_type is None: + return False + key_expr = ex.const(key_part, key_type) + match_expr = ops.eq_null_match_op.as_expr(ex.deref(index_col), key_expr) + match_exprs.append(match_expr) + + match_expr_final = functools.reduce(ops.and_op.as_expr, match_exprs) + block, match_col = self._block.project_expr(match_expr_final) + return cast(bool, block.get_stat(match_col, agg_ops.AnyOp())) + def _apply_unary_expr( self, op: ex.Expression, diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index 205621fee2..cf6e8a7e5c 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -161,7 +161,7 @@ def is_noop(self) -> bool: return ( ((not self.start) or (self.start == 0)) and (self.step == 1) - and ((self.stop is None) or (self.stop == self.row_count)) + and ((self.stop is None) or (self.stop == self.child.row_count)) ) @property @@ -424,7 +424,7 @@ def remap_refs( @dataclasses.dataclass(frozen=True, eq=False) class ConcatNode(BigFrameNode): - # TODO: Explcitly map column ids from each child + # TODO: Explcitly map column ids from each child? children: Tuple[BigFrameNode, ...] output_ids: Tuple[identifiers.ColumnId, ...] diff --git a/bigframes/core/rewrite/schema_binding.py b/bigframes/core/rewrite/schema_binding.py index af0593211c..40a00ff8f6 100644 --- a/bigframes/core/rewrite/schema_binding.py +++ b/bigframes/core/rewrite/schema_binding.py @@ -13,6 +13,7 @@ # limitations under the License. import dataclasses +import typing from bigframes.core import bigframe_node from bigframes.core import expression as ex @@ -65,4 +66,49 @@ def bind_schema_to_node( conditions=conditions, ) + if isinstance(node, nodes.AggregateNode): + aggregations = [] + for aggregation, id in node.aggregations: + if isinstance(aggregation, ex.UnaryAggregation): + replaced = typing.cast( + ex.Aggregation, + dataclasses.replace( + aggregation, + arg=typing.cast( + ex.RefOrConstant, + ex.bind_schema_fields( + aggregation.arg, node.child.field_by_id + ), + ), + ), + ) + aggregations.append((replaced, id)) + elif isinstance(aggregation, ex.BinaryAggregation): + replaced = typing.cast( + ex.Aggregation, + dataclasses.replace( + aggregation, + left=typing.cast( + ex.RefOrConstant, + ex.bind_schema_fields( + aggregation.left, node.child.field_by_id + ), + ), + right=typing.cast( + ex.RefOrConstant, + ex.bind_schema_fields( + aggregation.right, node.child.field_by_id + ), + ), + ), + ) + aggregations.append((replaced, id)) + else: + aggregations.append((aggregation, id)) + + return dataclasses.replace( + node, + aggregations=tuple(aggregations), + ) + return node diff --git a/bigframes/core/tools/datetimes.py b/bigframes/core/tools/datetimes.py index 26afdc7910..7edf2fa2e4 100644 --- a/bigframes/core/tools/datetimes.py +++ b/bigframes/core/tools/datetimes.py @@ -13,7 +13,7 @@ # limitations under the License. 
from collections.abc import Mapping -from datetime import datetime +from datetime import date, datetime from typing import Optional, Union import bigframes_vendored.constants as constants @@ -28,7 +28,7 @@ def to_datetime( arg: Union[ - Union[int, float, str, datetime], + Union[int, float, str, datetime, date], vendored_pandas_datetimes.local_iterables, bigframes.series.Series, bigframes.dataframe.DataFrame, @@ -38,7 +38,7 @@ def to_datetime( format: Optional[str] = None, unit: Optional[str] = None, ) -> Union[pd.Timestamp, datetime, bigframes.series.Series]: - if isinstance(arg, (int, float, str, datetime)): + if isinstance(arg, (int, float, str, datetime, date)): return pd.to_datetime( arg, utc=utc, @@ -62,7 +62,11 @@ def to_datetime( f"Unit parameter is not supported for non-numerical input types. {constants.FEEDBACK_LINK}" ) - if arg.dtype in (bigframes.dtypes.TIMESTAMP_DTYPE, bigframes.dtypes.DATETIME_DTYPE): + if arg.dtype in ( + bigframes.dtypes.TIMESTAMP_DTYPE, + bigframes.dtypes.DATETIME_DTYPE, + bigframes.dtypes.DATE_DTYPE, + ): to_type = ( bigframes.dtypes.TIMESTAMP_DTYPE if utc else bigframes.dtypes.DATETIME_DTYPE ) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 1884f0beff..7de4bdbc91 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -196,6 +196,9 @@ def __init__( block = block.multi_apply_unary_op(ops.AsTypeOp(to_type=bf_dtype)) else: + if isinstance(dtype, str) and dtype.lower() == "json": + dtype = bigframes.dtypes.JSON_DTYPE + import bigframes.pandas pd_dataframe = pandas.DataFrame( @@ -371,6 +374,9 @@ def __len__(self): def __iter__(self): return iter(self.columns) + def __contains__(self, key) -> bool: + return key in self.columns + def astype( self, dtype: Union[ @@ -776,22 +782,7 @@ def _repr_html_(self) -> str: if opts.repr_mode == "deferred": return formatter.repr_query_job(self._compute_dry_run()) - if opts.repr_mode == "anywidget": - import anywidget # type: ignore - - # create an iterator for the data batches - batches = self.to_pandas_batches() - - # get the first page result - try: - first_page = next(iter(batches)) - except StopIteration: - first_page = pandas.DataFrame(columns=self.columns) - - # Instantiate and return the widget. The widget's frontend will - # handle the display of the table and pagination - return anywidget.AnyWidget(dataframe=first_page) - + # Process blob columns first, regardless of display mode self._cached() df = self.copy() if bigframes.options.display.blob_display: @@ -803,7 +794,31 @@ def _repr_html_(self) -> str: for col in blob_cols: # TODO(garrettwu): Not necessary to get access urls for all the rows. Update when having a to get URLs from local data. df[col] = df[col].blob._get_runtime(mode="R", with_metadata=True) + else: + blob_cols = [] + + if opts.repr_mode == "anywidget": + try: + from IPython.display import display as ipython_display + + from bigframes import display + + # Always create a new widget instance for each display call + # This ensures that each cell gets its own widget and prevents + # unintended sharing between cells + widget = display.TableWidget(df.copy()) + ipython_display(widget) + return "" # Return empty string since we used display() + + except (AttributeError, ValueError, ImportError): + # Fallback if anywidget is not available + warnings.warn( + "Anywidget mode is not available. Please `pip install anywidget traitlets` or `pip install 'bigframes[anywidget]'` to use interactive tables. Falling back to deferred mode." 
+ ) + return formatter.repr_query_job(self._compute_dry_run()) + + # Continue with regular HTML rendering for non-anywidget modes # TODO(swast): pass max_columns and get the true column count back. Maybe # get 1 more column than we have requested so that pandas can add the # ... for us? @@ -812,7 +827,6 @@ def _repr_html_(self) -> str: ) self._set_internal_query_job(query_job) - column_count = len(pandas_df.columns) with display_options.pandas_repr(opts): @@ -1046,14 +1060,17 @@ def radd( ) -> DataFrame: # TODO(swast): Support fill_value parameter. # TODO(swast): Support level parameter with MultiIndex. - return self.add(other, axis=axis) + return self._apply_binop(other, ops.add_op, axis=axis, reverse=True) def __add__(self, other) -> DataFrame: return self.add(other) __add__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__add__) - __radd__ = __add__ + def __radd__(self, other) -> DataFrame: + return self.radd(other) + + __radd__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__radd__) def sub( self, @@ -2802,6 +2819,7 @@ def dropna( *, axis: int | str = 0, how: str = "any", + thresh: typing.Optional[int] = None, subset: typing.Union[None, blocks.Label, Sequence[blocks.Label]] = None, inplace: bool = False, ignore_index=False, @@ -2810,8 +2828,18 @@ def dropna( raise NotImplementedError( f"'inplace'=True not supported. {constants.FEEDBACK_LINK}" ) - if how not in ("any", "all"): - raise ValueError("'how' must be one of 'any', 'all'") + + # Check if both thresh and how are explicitly provided + if thresh is not None: + # cannot specify both thresh and how parameters + if how != "any": + raise TypeError( + "You cannot set both the how and thresh arguments at the same time." + ) + else: + # Only validate 'how' when thresh is not provided + if how not in ("any", "all"): + raise ValueError("'how' must be one of 'any', 'all'") axis_n = utils.get_axis_number(axis) @@ -2833,21 +2861,38 @@ def dropna( for id_ in self._block.label_to_col_id[label] ] - result = block_ops.dropna(self._block, self._block.value_columns, how=how, subset=subset_ids) # type: ignore + result = block_ops.dropna( + self._block, + self._block.value_columns, + how=how, + thresh=thresh, + subset=subset_ids, + ) # type: ignore if ignore_index: result = result.reset_index() return DataFrame(result) else: - isnull_block = self._block.multi_apply_unary_op(ops.isnull_op) - if how == "any": - null_locations = DataFrame(isnull_block).any().to_pandas() - else: # 'all' - null_locations = DataFrame(isnull_block).all().to_pandas() - keep_columns = [ - col - for col, to_drop in zip(self._block.value_columns, null_locations) - if not to_drop - ] + if thresh is not None: + # Keep columns with at least 'thresh' non-null values + notnull_block = self._block.multi_apply_unary_op(ops.notnull_op) + notnull_counts = DataFrame(notnull_block).sum().to_pandas() + + keep_columns = [ + col + for col, count in zip(self._block.value_columns, notnull_counts) + if count >= thresh + ] + else: + isnull_block = self._block.multi_apply_unary_op(ops.isnull_op) + if how == "any": + null_locations = DataFrame(isnull_block).any().to_pandas() + else: # 'all' + null_locations = DataFrame(isnull_block).all().to_pandas() + keep_columns = [ + col + for col, to_drop in zip(self._block.value_columns, null_locations) + if not to_drop + ] return DataFrame(self._block.select_columns(keep_columns)) def any( @@ -3389,15 +3434,9 @@ def merge( ) return DataFrame(result_block) - if on is None: - if left_on is None or right_on is None: - raise 
ValueError("Must specify `on` or `left_on` + `right_on`.") - else: - if left_on is not None or right_on is not None: - raise ValueError( - "Can not pass both `on` and `left_on` + `right_on` params." - ) - left_on, right_on = on, on + left_on, right_on = self._validate_left_right_on( + right, on, left_on=left_on, right_on=right_on + ) if utils.is_list_like(left_on): left_on = list(left_on) # type: ignore @@ -3434,6 +3473,41 @@ def merge( ) return DataFrame(block) + def _validate_left_right_on( + self, + right: DataFrame, + on: Union[blocks.Label, Sequence[blocks.Label], None] = None, + *, + left_on: Union[blocks.Label, Sequence[blocks.Label], None] = None, + right_on: Union[blocks.Label, Sequence[blocks.Label], None] = None, + ): + if on is not None: + if left_on is not None or right_on is not None: + raise ValueError( + "Can not pass both `on` and `left_on` + `right_on` params." + ) + return on, on + + if left_on is not None and right_on is not None: + return left_on, right_on + + left_cols = self.columns + right_cols = right.columns + common_cols = left_cols.intersection(right_cols) + if len(common_cols) == 0: + raise ValueError( + "No common columns to perform merge on." + f"Merge options: left_on={left_on}, " + f"right_on={right_on}, " + ) + if ( + not left_cols.join(common_cols, how="inner").is_unique + or not right_cols.join(common_cols, how="inner").is_unique + ): + raise ValueError(f"Data columns not unique: {repr(common_cols)}") + + return common_cols, common_cols + def join( self, other: Union[DataFrame, bigframes.series.Series], diff --git a/bigframes/display/__init__.py b/bigframes/display/__init__.py new file mode 100644 index 0000000000..48e52bc766 --- /dev/null +++ b/bigframes/display/__init__.py @@ -0,0 +1,24 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +try: + import anywidget # noqa + + from bigframes.display.anywidget import TableWidget + + __all__ = ["TableWidget"] +except Exception: + pass diff --git a/bigframes/display/anywidget.py b/bigframes/display/anywidget.py new file mode 100644 index 0000000000..04d82c97fe --- /dev/null +++ b/bigframes/display/anywidget.py @@ -0,0 +1,179 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import annotations + +from importlib import resources +import functools +import math +from typing import Any, Dict, Iterator, List, Optional, Type +import uuid + +import pandas as pd + +import bigframes + +# anywidget and traitlets are optional dependencies. We don't want the import of this +# module to fail if they aren't installed, though. Instead, we try to limit the surface that +# these packages could affect. This makes unit testing easier and ensures we don't +# accidentally make these required packages. +try: + import anywidget + import traitlets + + ANYWIDGET_INSTALLED = True +except Exception: + ANYWIDGET_INSTALLED = False + +WIDGET_BASE: Type[Any] +if ANYWIDGET_INSTALLED: + WIDGET_BASE = anywidget.AnyWidget +else: + WIDGET_BASE = object + + +class TableWidget(WIDGET_BASE): + """ + An interactive, paginated table widget for BigFrames DataFrames. + """ + + def __init__(self, dataframe: bigframes.dataframe.DataFrame): + """Initialize the TableWidget. + + Args: + dataframe: The Bigframes Dataframe to display in the widget. + """ + if not ANYWIDGET_INSTALLED: + raise ImportError( + "Please `pip install anywidget traitlets` or `pip install 'bigframes[anywidget]'` to use TableWidget." + ) + + super().__init__() + self._dataframe = dataframe + + # respect display options + self.page_size = bigframes.options.display.max_rows + + # Initialize data fetching attributes. + self._batches = dataframe.to_pandas_batches(page_size=self.page_size) + + # Use list of DataFrames to avoid memory copies from concatenation + self._cached_batches: List[pd.DataFrame] = [] + + # Unique identifier for HTML table element + self._table_id = str(uuid.uuid4()) + self._all_data_loaded = False + # Renamed from _batch_iterator to _batch_iter to avoid naming conflict + self._batch_iter: Optional[Iterator[pd.DataFrame]] = None + + # len(dataframe) is expensive, since it will trigger a + # SELECT COUNT(*) query. It is a must have however. + # TODO(b/428238610): Start iterating over the result of `to_pandas_batches()` + # before we get here so that the count might already be cached. + self.row_count = len(dataframe) + + # get the initial page + self._set_table_html() + + @functools.cached_property + def _esm(self): + """Load JavaScript code from external file.""" + return resources.read_text(bigframes.display, "table_widget.js") + + page = traitlets.Int(0).tag(sync=True) + page_size = traitlets.Int(25).tag(sync=True) + row_count = traitlets.Int(0).tag(sync=True) + table_html = traitlets.Unicode().tag(sync=True) + + @traitlets.validate("page") + def _validate_page(self, proposal: Dict[str, Any]): + """Validate and clamp the page number to a valid range. + + Args: + proposal: A dictionary from the traitlets library containing the + proposed change. The new value is in proposal["value"]. + """ + + value = proposal["value"] + if self.row_count == 0 or self.page_size == 0: + return 0 + + # Calculate the zero-indexed maximum page number. + max_page = max(0, math.ceil(self.row_count / self.page_size) - 1) + + # Clamp the proposed value to the valid range [0, max_page]. + return max(0, min(value, max_page)) + + def _get_next_batch(self) -> bool: + """ + Gets the next batch of data from the generator and appends to cache. + + Return: + True if a batch was successfully loaded, False otherwise. 
+ """ + if self._all_data_loaded: + return False + + try: + iterator = self._batch_iterator + batch = next(iterator) + self._cached_batches.append(batch) + return True + except StopIteration: + self._all_data_loaded = True + return False + + @property + def _batch_iterator(self) -> Iterator[pd.DataFrame]: + """Lazily initializes and returns the batch iterator.""" + if self._batch_iter is None: + self._batch_iter = iter(self._batches) + return self._batch_iter + + @property + def _cached_data(self) -> pd.DataFrame: + """Combine all cached batches into a single DataFrame.""" + if not self._cached_batches: + return pd.DataFrame(columns=self._dataframe.columns) + return pd.concat(self._cached_batches, ignore_index=True) + + def _set_table_html(self): + """Sets the current html data based on the current page and page size.""" + start = self.page * self.page_size + end = start + self.page_size + + # fetch more data if the requested page is outside our cache + cached_data = self._cached_data + while len(cached_data) < end and not self._all_data_loaded: + if self._get_next_batch(): + cached_data = self._cached_data + else: + break + + # Get the data for the current page + page_data = cached_data.iloc[start:end] + + # Generate HTML table + self.table_html = page_data.to_html( + index=False, + max_rows=None, + table_id=f"table-{self._table_id}", + classes="table table-striped table-hover", + escape=False, + ) + + @traitlets.observe("page") + def _page_changed(self, change): + """Handler for when the page number is changed from the frontend.""" + self._set_table_html() diff --git a/bigframes/display/table_widget.js b/bigframes/display/table_widget.js new file mode 100644 index 0000000000..71484af4d5 --- /dev/null +++ b/bigframes/display/table_widget.js @@ -0,0 +1,95 @@ +/** + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +const ModelProperty = { + TABLE_HTML: "table_html", + ROW_COUNT: "row_count", + PAGE_SIZE: "page_size", + PAGE: "page", +}; + +const Event = { + CHANGE_TABLE_HTML: `change:${ModelProperty.TABLE_HTML}`, + CLICK: "click", +}; + +/** + * Renders a paginated table and its controls into a given element. + * @param {{ + * model: !Backbone.Model, + * el: !HTMLElement + * }} options + */ +function render({ model, el }) { + const container = document.createElement("div"); + container.innerHTML = model.get(ModelProperty.TABLE_HTML); + + const buttonContainer = document.createElement("div"); + const prevPage = document.createElement("button"); + const label = document.createElement("span"); + const nextPage = document.createElement("button"); + + prevPage.type = "button"; + nextPage.type = "button"; + prevPage.textContent = "Prev"; + nextPage.textContent = "Next"; + + /** Updates the button states and page label based on the model. 
*/ + function updateButtonStates() { + const totalPages = Math.ceil( + model.get(ModelProperty.ROW_COUNT) / model.get(ModelProperty.PAGE_SIZE), + ); + const currentPage = model.get(ModelProperty.PAGE); + + label.textContent = `Page ${currentPage + 1} of ${totalPages}`; + prevPage.disabled = currentPage === 0; + nextPage.disabled = currentPage >= totalPages - 1; + } + + /** + * Updates the page in the model. + * @param {number} direction -1 for previous, 1 for next. + */ + function handlePageChange(direction) { + const currentPage = model.get(ModelProperty.PAGE); + const newPage = Math.max(0, currentPage + direction); + if (newPage !== currentPage) { + model.set(ModelProperty.PAGE, newPage); + model.save_changes(); + } + } + + prevPage.addEventListener(Event.CLICK, () => handlePageChange(-1)); + nextPage.addEventListener(Event.CLICK, () => handlePageChange(1)); + + model.on(Event.CHANGE_TABLE_HTML, () => { + // Note: Using innerHTML can be a security risk if the content is + // user-generated. Ensure 'table_html' is properly sanitized. + container.innerHTML = model.get(ModelProperty.TABLE_HTML); + updateButtonStates(); + }); + + // Initial setup + updateButtonStates(); + + buttonContainer.appendChild(prevPage); + buttonContainer.appendChild(label); + buttonContainer.appendChild(nextPage); + el.appendChild(container); + el.appendChild(buttonContainer); +} + +export default { render }; diff --git a/bigframes/operations/base.py b/bigframes/operations/base.py index c316d28321..f2bbcb3320 100644 --- a/bigframes/operations/base.py +++ b/bigframes/operations/base.py @@ -121,6 +121,8 @@ def __init__( bf_dtype = bigframes.dtypes.bigframes_type(dtype) block = block.multi_apply_unary_op(ops.AsTypeOp(to_type=bf_dtype)) else: + if isinstance(dtype, str) and dtype.lower() == "json": + dtype = bigframes.dtypes.JSON_DTYPE pd_series = pd.Series( data=data, index=index, # type:ignore diff --git a/bigframes/operations/datetime_ops.py b/bigframes/operations/datetime_ops.py index 7c760b689b..6f44952488 100644 --- a/bigframes/operations/datetime_ops.py +++ b/bigframes/operations/datetime_ops.py @@ -50,6 +50,7 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT dtypes.FLOAT_DTYPE, dtypes.INT_DTYPE, dtypes.STRING_DTYPE, + dtypes.DATE_DTYPE, ): raise TypeError("expected string or numeric input") return pd.ArrowDtype(pa.timestamp("us", tz=None)) @@ -67,6 +68,7 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT dtypes.FLOAT_DTYPE, dtypes.INT_DTYPE, dtypes.STRING_DTYPE, + dtypes.DATE_DTYPE, ): raise TypeError("expected string or numeric input") return pd.ArrowDtype(pa.timestamp("us", tz="UTC")) diff --git a/bigframes/operations/plotting.py b/bigframes/operations/plotting.py index e9a86be6c9..a741ed5dd9 100644 --- a/bigframes/operations/plotting.py +++ b/bigframes/operations/plotting.py @@ -17,9 +17,11 @@ import bigframes_vendored.constants as constants import bigframes_vendored.pandas.plotting._core as vendordt +from bigframes.core import log_adapter import bigframes.operations._matplotlib as bfplt +@log_adapter.class_logger class PlotAccessor(vendordt.PlotAccessor): __doc__ = vendordt.PlotAccessor.__doc__ diff --git a/bigframes/series.py b/bigframes/series.py index ebc2913f78..3a1af0bb1d 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -257,6 +257,9 @@ def __iter__(self) -> typing.Iterator: map(lambda x: x.squeeze(axis=1), self._block.to_pandas_batches()) ) + def __contains__(self, key) -> bool: + return key in self.index + def copy(self) 
-> Series: return Series(self._block) @@ -648,13 +651,12 @@ def to_pandas_batches( form the original Series. Results stream from bigquery, see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.table.RowIterator#google_cloud_bigquery_table_RowIterator_to_arrow_iterable """ - df = self._block.to_pandas_batches( + batches = self._block.to_pandas_batches( page_size=page_size, max_results=max_results, allow_large_results=allow_large_results, - squeeze=True, ) - return df + return map(lambda df: cast(pandas.Series, df.squeeze(1)), batches) def _compute_dry_run(self) -> bigquery.QueryJob: _, query_job = self._block._compute_dry_run((self._value_column,)) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 9d113743cf..2c9dea2d19 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -339,13 +339,15 @@ def bytes_processed_sum(self): @property def slot_millis_sum(self): """The sum of all slot time used by bigquery jobs in this session.""" - msg = bfe.format_message( - "Queries executed with `allow_large_results=False` within the session will not " - "have their slot milliseconds counted in this sum. If you need precise slot " - "milliseconds information, query the `INFORMATION_SCHEMA` tables " - "to get relevant metrics.", - ) - warnings.warn(msg, UserWarning) + if not bigframes.options._allow_large_results: + msg = bfe.format_message( + "Queries executed with `allow_large_results=False` within the session will not " + "have their slot milliseconds counted in this sum. If you need precise slot " + "milliseconds information, query the `INFORMATION_SCHEMA` tables " + "to get relevant metrics.", + ) + warnings.warn(msg, UserWarning) + return self._metrics.slot_millis @property diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index 6750652bc2..a970e75a0f 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -100,6 +100,7 @@ def cache_results_table( original_root: nodes.BigFrameNode, table: bigquery.Table, ordering: order.RowOrdering, + num_rows: Optional[int] = None, ): # Assumption: GBQ cached table uses field name as bq column name scan_list = nodes.ScanList( @@ -112,7 +113,7 @@ def cache_results_table( source=nodes.BigqueryDataSource( nodes.GbqTable.from_table(table), ordering=ordering, - n_rows=table.num_rows, + n_rows=num_rows, ), scan_list=scan_list, table_session=original_root.session, @@ -468,14 +469,16 @@ def _cache_with_cluster_cols( plan, sort_rows=False, materialize_all_order_keys=True ) ) - tmp_table_ref = self._sql_as_cached_temp_table( + tmp_table_ref, num_rows = self._sql_as_cached_temp_table( compiled.sql, compiled.sql_schema, cluster_cols=bq_io.select_cluster_cols(compiled.sql_schema, cluster_cols), ) tmp_table = self.bqclient.get_table(tmp_table_ref) assert compiled.row_order is not None - self.cache.cache_results_table(array_value.node, tmp_table, compiled.row_order) + self.cache.cache_results_table( + array_value.node, tmp_table, compiled.row_order, num_rows=num_rows + ) def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue): """Executes the query and uses the resulting table to rewrite future executions.""" @@ -487,14 +490,16 @@ def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue): sort_rows=False, ) ) - tmp_table_ref = self._sql_as_cached_temp_table( + tmp_table_ref, num_rows = self._sql_as_cached_temp_table( compiled.sql, compiled.sql_schema, 
cluster_cols=[offset_column], ) tmp_table = self.bqclient.get_table(tmp_table_ref) assert compiled.row_order is not None - self.cache.cache_results_table(array_value.node, tmp_table, compiled.row_order) + self.cache.cache_results_table( + array_value.node, tmp_table, compiled.row_order, num_rows=num_rows + ) def _cache_with_session_awareness( self, @@ -552,7 +557,7 @@ def _sql_as_cached_temp_table( sql: str, schema: Sequence[bigquery.SchemaField], cluster_cols: Sequence[str], - ) -> bigquery.TableReference: + ) -> tuple[bigquery.TableReference, Optional[int]]: assert len(cluster_cols) <= _MAX_CLUSTER_COLUMNS temp_table = self.storage_manager.create_temp_table(schema, cluster_cols) @@ -567,8 +572,8 @@ def _sql_as_cached_temp_table( job_config=job_config, ) assert query_job is not None - query_job.result() - return query_job.destination + iter = query_job.result() + return query_job.destination, iter.total_rows def _validate_result_schema( self, diff --git a/bigframes/session/direct_gbq_execution.py b/bigframes/session/direct_gbq_execution.py index 1d46192ac3..ff91747a62 100644 --- a/bigframes/session/direct_gbq_execution.py +++ b/bigframes/session/direct_gbq_execution.py @@ -13,13 +13,14 @@ # limitations under the License. from __future__ import annotations -from typing import Optional, Tuple +from typing import Literal, Optional, Tuple from google.cloud import bigquery import google.cloud.bigquery.job as bq_job import google.cloud.bigquery.table as bq_table from bigframes.core import compile, nodes +from bigframes.core.compile import sqlglot from bigframes.session import executor, semi_executor import bigframes.session._io.bigquery as bq_io @@ -29,8 +30,15 @@ # or record metrics. Also avoids caching, and most pre-compile rewrites, to better serve as a # reference for validating more complex executors. class DirectGbqExecutor(semi_executor.SemiExecutor): - def __init__(self, bqclient: bigquery.Client): + def __init__( + self, bqclient: bigquery.Client, compiler: Literal["ibis", "sqlglot"] = "ibis" + ): self.bqclient = bqclient + self._compile_fn = ( + compile.compile_sql + if compiler == "ibis" + else sqlglot.SQLGlotCompiler()._compile_sql + ) def execute( self, @@ -42,9 +50,10 @@ def execute( # TODO(swast): plumb through the api_name of the user-facing api that # caused this query. - compiled = compile.compile_sql( + compiled = self._compile_fn( compile.CompileRequest(plan, sort_rows=ordered, peek_count=peek) ) + iterator, query_job = self._run_execute_query( sql=compiled.sql, ) diff --git a/bigframes/session/polars_executor.py b/bigframes/session/polars_executor.py index 28ab421905..3c23e4c200 100644 --- a/bigframes/session/polars_executor.py +++ b/bigframes/session/polars_executor.py @@ -36,6 +36,8 @@ nodes.SliceNode, nodes.AggregateNode, nodes.FilterNode, + nodes.ConcatNode, + nodes.JoinNode, ) _COMPATIBLE_SCALAR_OPS = ( diff --git a/bigframes/version.py b/bigframes/version.py index 4d26fb9b8c..9e7a386601 100644 --- a/bigframes/version.py +++ b/bigframes/version.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-__version__ = "2.10.0" +__version__ = "2.11.0" # {x-release-please-start-date} -__release_date__ = "2025-07-08" +__release_date__ = "2025-07-15" # {x-release-please-end} diff --git a/notebooks/dataframes/anywidget_mode.ipynb b/notebooks/dataframes/anywidget_mode.ipynb index c54f52da59..072e5c6504 100644 --- a/notebooks/dataframes/anywidget_mode.ipynb +++ b/notebooks/dataframes/anywidget_mode.ipynb @@ -63,7 +63,7 @@ "id": "0a354c69", "metadata": {}, "source": [ - "Display the dataframe in anywidget mode" + "Load Sample Data" ] }, { @@ -75,7 +75,7 @@ { "data": { "text/html": [ - "Query job 91997f19-1768-4360-afa7-4a431b3e2d22 is DONE. 0 Bytes processed. Open Job" + "Query job 0b22b0f5-b952-4546-a969-41a89e343e9b is DONE. 0 Bytes processed. Open Job" ], "text/plain": [ "" @@ -123,6 +123,193 @@ "test_series = df[\"year\"]\n", "print(test_series)" ] + }, + { + "cell_type": "markdown", + "id": "7bcf1bb7", + "metadata": {}, + "source": [ + "Display with Pagination" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "ce250157", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "Query job 8e57da45-b6a7-44fb-8c4f-4b87058d94cb is DONE. 171.4 MB processed. Open Job" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "4d00aaf284984cbc97483c651b9c5110", + "version_major": 2, + "version_minor": 1 + }, + "text/plain": [ + "TableWidget(row_count=5552452, table_html='=0.9.18", + "traitlets>=5.0.0", ], } extras["all"] = list(sorted(frozenset(itertools.chain.from_iterable(extras.values())))) diff --git a/testing/constraints-3.9.txt b/testing/constraints-3.9.txt index dff245d176..155d4388a4 100644 --- a/testing/constraints-3.9.txt +++ b/testing/constraints-3.9.txt @@ -33,3 +33,6 @@ pytz==2022.7 toolz==0.11 typing-extensions==4.5.0 rich==12.4.4 +# For anywidget mode +anywidget>=0.9.18 +traitlets==5.0.0 diff --git a/tests/benchmark/read_gbq_colab/filter_output.py b/tests/benchmark/read_gbq_colab/filter_output.py index 5e872bb727..0db7ac5fd6 100644 --- a/tests/benchmark/read_gbq_colab/filter_output.py +++ b/tests/benchmark/read_gbq_colab/filter_output.py @@ -14,7 +14,6 @@ import pathlib import benchmark.utils as utils -import pytest import bigframes.session @@ -40,11 +39,8 @@ def filter_output( # It's possible we don't have any pages at all, since we filtered out all # matching rows. 
- if rows == 0: - with pytest.raises(StopIteration): - next(iter(df_filtered.to_pandas_batches(page_size=PAGE_SIZE))) - else: - next(iter(df_filtered.to_pandas_batches(page_size=PAGE_SIZE))) + first_page = next(iter(df_filtered.to_pandas_batches(page_size=PAGE_SIZE))) + assert len(first_page.index) <= rows if __name__ == "__main__": diff --git a/tests/system/small/engines/conftest.py b/tests/system/small/engines/conftest.py index 249bd59260..4f0f875b34 100644 --- a/tests/system/small/engines/conftest.py +++ b/tests/system/small/engines/conftest.py @@ -44,7 +44,7 @@ def fake_session() -> Generator[bigframes.Session, None, None]: yield session -@pytest.fixture(scope="session", params=["pyarrow", "polars", "bq"]) +@pytest.fixture(scope="session", params=["pyarrow", "polars", "bq", "bq-sqlglot"]) def engine(request, bigquery_client: bigquery.Client) -> semi_executor.SemiExecutor: if request.param == "pyarrow": return local_scan_executor.LocalScanExecutor() @@ -52,6 +52,10 @@ def engine(request, bigquery_client: bigquery.Client) -> semi_executor.SemiExecu return polars_executor.PolarsExecutor() if request.param == "bq": return direct_gbq_execution.DirectGbqExecutor(bigquery_client) + if request.param == "bq-sqlglot": + return direct_gbq_execution.DirectGbqExecutor( + bigquery_client, compiler="sqlglot" + ) raise ValueError(f"Unrecognized param: {request.param}") diff --git a/tests/system/small/engines/test_concat.py b/tests/system/small/engines/test_concat.py new file mode 100644 index 0000000000..e10570fab2 --- /dev/null +++ b/tests/system/small/engines/test_concat.py @@ -0,0 +1,51 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from bigframes.core import array_value, ordering +from bigframes.session import polars_executor +from bigframes.testing.engine_utils import assert_equivalence_execution + +pytest.importorskip("polars") + +# Polars used as reference as its fast and local. Generally though, prefer gbq engine where they disagree. 
+REFERENCE_ENGINE = polars_executor.PolarsExecutor()
+
+
+@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True)
+def test_engines_concat_self(
+    scalars_array_value: array_value.ArrayValue,
+    engine,
+):
+    result = scalars_array_value.concat([scalars_array_value, scalars_array_value])
+
+    assert_equivalence_execution(result.node, REFERENCE_ENGINE, engine)
+
+
+@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True)
+def test_engines_concat_filtered_sorted(
+    scalars_array_value: array_value.ArrayValue,
+    engine,
+):
+    input_1 = scalars_array_value.select_columns(["float64_col", "int64_col"]).order_by(
+        [ordering.ascending_over("int64_col")]
+    )
+    input_2 = scalars_array_value.filter_by_id("bool_col").select_columns(
+        ["float64_col", "int64_too"]
+    )
+
+    result = input_1.concat([input_2, input_1, input_2])
+
+    assert_equivalence_execution(result.node, REFERENCE_ENGINE, engine)
diff --git a/tests/system/small/engines/test_join.py b/tests/system/small/engines/test_join.py
new file mode 100644
index 0000000000..402a41134b
--- /dev/null
+++ b/tests/system/small/engines/test_join.py
@@ -0,0 +1,90 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Literal
+
+import pytest
+
+from bigframes import operations as ops
+from bigframes.core import array_value, expression, ordering
+from bigframes.session import polars_executor
+from bigframes.testing.engine_utils import assert_equivalence_execution
+
+pytest.importorskip("polars")
+
+# Polars used as reference as it's fast and local. Generally though, prefer gbq engine where they disagree.
+REFERENCE_ENGINE = polars_executor.PolarsExecutor() + + +@pytest.mark.parametrize("engine", ["polars", "bq", "bq-sqlglot"], indirect=True) +@pytest.mark.parametrize("join_type", ["left", "inner", "right", "outer"]) +def test_engines_join_on_key( + scalars_array_value: array_value.ArrayValue, + engine, + join_type: Literal["inner", "outer", "left", "right"], +): + result, _ = scalars_array_value.relational_join( + scalars_array_value, conditions=(("int64_col", "int64_col"),), type=join_type + ) + + assert_equivalence_execution(result.node, REFERENCE_ENGINE, engine) + + +@pytest.mark.parametrize("engine", ["polars", "bq", "bq-sqlglot"], indirect=True) +@pytest.mark.parametrize("join_type", ["left", "inner", "right", "outer"]) +def test_engines_join_on_coerced_key( + scalars_array_value: array_value.ArrayValue, + engine, + join_type: Literal["inner", "outer", "left", "right"], +): + result, _ = scalars_array_value.relational_join( + scalars_array_value, conditions=(("int64_col", "float64_col"),), type=join_type + ) + + assert_equivalence_execution(result.node, REFERENCE_ENGINE, engine) + + +@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True) +@pytest.mark.parametrize("join_type", ["left", "inner", "right", "outer"]) +def test_engines_join_multi_key( + scalars_array_value: array_value.ArrayValue, + engine, + join_type: Literal["inner", "outer", "left", "right"], +): + l_input = scalars_array_value.order_by([ordering.ascending_over("float64_col")]) + l_input, l_join_cols = scalars_array_value.compute_values( + [ + ops.mod_op.as_expr("int64_col", expression.const(2)), + ops.invert_op.as_expr("bool_col"), + ] + ) + r_input, r_join_cols = scalars_array_value.compute_values( + [ops.mod_op.as_expr("int64_col", expression.const(3)), expression.const(True)] + ) + + conditions = tuple((l_col, r_col) for l_col, r_col in zip(l_join_cols, r_join_cols)) + + result, _ = l_input.relational_join(r_input, conditions=conditions, type=join_type) + + assert_equivalence_execution(result.node, REFERENCE_ENGINE, engine) + + +@pytest.mark.parametrize("engine", ["polars", "bq", "bq-sqlglot"], indirect=True) +def test_engines_cross_join( + scalars_array_value: array_value.ArrayValue, + engine, +): + result, _ = scalars_array_value.relational_join(scalars_array_value, type="cross") + + assert_equivalence_execution(result.node, REFERENCE_ENGINE, engine) diff --git a/tests/system/small/engines/test_read_local.py b/tests/system/small/engines/test_read_local.py index 82af7c984d..bf1a10beec 100644 --- a/tests/system/small/engines/test_read_local.py +++ b/tests/system/small/engines/test_read_local.py @@ -88,6 +88,8 @@ def test_engines_read_local_w_zero_row_source( assert_equivalence_execution(local_node, REFERENCE_ENGINE, engine) +# TODO: Fix sqlglot impl +@pytest.mark.parametrize("engine", ["polars", "bq", "pyarrow"], indirect=True) def test_engines_read_local_w_nested_source( fake_session: bigframes.Session, nested_data_source: local_data.ManagedArrowTable, diff --git a/tests/system/small/engines/test_sorting.py b/tests/system/small/engines/test_sorting.py index d1929afa44..ec1c0d95ee 100644 --- a/tests/system/small/engines/test_sorting.py +++ b/tests/system/small/engines/test_sorting.py @@ -25,7 +25,7 @@ REFERENCE_ENGINE = polars_executor.PolarsExecutor() -@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True) +@pytest.mark.parametrize("engine", ["polars", "bq", "bq-sqlglot"], indirect=True) def test_engines_reverse( scalars_array_value: array_value.ArrayValue, engine, @@ -34,7 +34,7 @@ def 
test_engines_reverse( assert_equivalence_execution(node, REFERENCE_ENGINE, engine) -@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True) +@pytest.mark.parametrize("engine", ["polars", "bq", "bq-sqlglot"], indirect=True) def test_engines_double_reverse( scalars_array_value: array_value.ArrayValue, engine, @@ -43,7 +43,7 @@ def test_engines_double_reverse( assert_equivalence_execution(node, REFERENCE_ENGINE, engine) -@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True) +@pytest.mark.parametrize("engine", ["polars", "bq", "bq-sqlglot"], indirect=True) @pytest.mark.parametrize( "sort_col", [ @@ -70,7 +70,7 @@ def test_engines_sort_over_column( assert_equivalence_execution(node, REFERENCE_ENGINE, engine) -@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True) +@pytest.mark.parametrize("engine", ["polars", "bq", "bq-sqlglot"], indirect=True) def test_engines_sort_multi_column_refs( scalars_array_value: array_value.ArrayValue, engine, diff --git a/tests/system/small/operations/test_datetimes.py b/tests/system/small/operations/test_datetimes.py index 4e2beb9c19..8ce0cb9beb 100644 --- a/tests/system/small/operations/test_datetimes.py +++ b/tests/system/small/operations/test_datetimes.py @@ -13,12 +13,15 @@ # limitations under the License. import datetime +import typing import numpy +from packaging import version from pandas import testing import pandas as pd import pytest +import bigframes.pandas as bpd import bigframes.series from bigframes.testing.utils import assert_series_equal @@ -548,3 +551,23 @@ def test_timedelta_dt_accessors_on_wrong_type_raise_exception(scalars_dfs, acces with pytest.raises(TypeError): access(bf_df["timestamp_col"]) + + +@pytest.mark.parametrize( + "col", + # TODO(b/431276706) test timestamp_col too. + ["date_col", "datetime_col"], +) +def test_to_datetime(scalars_dfs, col): + if version.Version(pd.__version__) <= version.Version("2.1.0"): + pytest.skip("timezone conversion bug") + bf_df, pd_df = scalars_dfs + + actual_result = typing.cast( + bigframes.series.Series, bpd.to_datetime(bf_df[col]) + ).to_pandas() + + expected_result = pd.Series(pd.to_datetime(pd_df[col])) + testing.assert_series_equal( + actual_result, expected_result, check_dtype=False, check_index_type=False + ) diff --git a/tests/system/small/test_anywidget.py b/tests/system/small/test_anywidget.py new file mode 100644 index 0000000000..b6dfb22934 --- /dev/null +++ b/tests/system/small/test_anywidget.py @@ -0,0 +1,343 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pandas as pd +import pytest + +import bigframes as bf + +pytest.importorskip("anywidget") + +# Test constants to avoid change detector tests +EXPECTED_ROW_COUNT = 6 +EXPECTED_PAGE_SIZE = 2 +EXPECTED_TOTAL_PAGES = 3 + + +@pytest.fixture(scope="module") +def paginated_pandas_df() -> pd.DataFrame: + """Create a minimal test DataFrame with exactly 3 pages of 2 rows each.""" + test_data = pd.DataFrame( + { + "id": [0, 1, 2, 3, 4, 5], + "page_indicator": [ + # Page 1 (rows 1-2) + "page_1_row_1", + "page_1_row_2", + # Page 2 (rows 3-4) + "page_2_row_1", + "page_2_row_2", + # Page 3 (rows 5-6) + "page_3_row_1", + "page_3_row_2", + ], + "value": [0, 1, 2, 3, 4, 5], + } + ) + return test_data + + +@pytest.fixture(scope="module") +def paginated_bf_df( + session: bf.Session, paginated_pandas_df: pd.DataFrame +) -> bf.dataframe.DataFrame: + return session.read_pandas(paginated_pandas_df) + + +@pytest.fixture +def table_widget(paginated_bf_df: bf.dataframe.DataFrame): + """ + Helper fixture to create a TableWidget instance with a fixed page size. + This reduces duplication across tests that use the same widget configuration. + """ + from bigframes import display + + with bf.option_context("display.repr_mode", "anywidget", "display.max_rows", 2): + # Delay context manager cleanup of `max_rows` until after tests finish. + yield display.TableWidget(paginated_bf_df) + + +@pytest.fixture(scope="module") +def small_pandas_df() -> pd.DataFrame: + """Create a DataFrame smaller than the page size for edge case testing.""" + return pd.DataFrame( + { + "id": [0, 1], + "page_indicator": ["small_row_1", "small_row_2"], + "value": [0, 1], + } + ) + + +@pytest.fixture(scope="module") +def small_bf_df( + session: bf.Session, small_pandas_df: pd.DataFrame +) -> bf.dataframe.DataFrame: + return session.read_pandas(small_pandas_df) + + +@pytest.fixture +def small_widget(small_bf_df): + """Helper fixture for tests using a DataFrame smaller than the page size.""" + from bigframes import display + + with bf.option_context("display.repr_mode", "anywidget", "display.max_rows", 5): + yield display.TableWidget(small_bf_df) + + +@pytest.fixture(scope="module") +def empty_pandas_df() -> pd.DataFrame: + """Create an empty DataFrame for edge case testing.""" + return pd.DataFrame(columns=["id", "page_indicator", "value"]) + + +@pytest.fixture(scope="module") +def empty_bf_df( + session: bf.Session, empty_pandas_df: pd.DataFrame +) -> bf.dataframe.DataFrame: + return session.read_pandas(empty_pandas_df) + + +def _assert_html_matches_pandas_slice( + table_html: str, + expected_pd_slice: pd.DataFrame, + full_pd_df: pd.DataFrame, +): + """ + Assertion helper to verify that the rendered HTML contains exactly the + rows from the expected pandas DataFrame slice and no others. This is + inspired by the pattern of comparing BigFrames output to pandas output. + """ + # Check that the unique indicator from each expected row is present. + for _, row in expected_pd_slice.iterrows(): + assert row["page_indicator"] in table_html + + # Create a DataFrame of all rows that should NOT be present. + unexpected_pd_df = full_pd_df.drop(expected_pd_slice.index) + + # Check that no unique indicators from unexpected rows are present. 
+ for _, row in unexpected_pd_df.iterrows(): + assert row["page_indicator"] not in table_html + + +def test_widget_initialization_should_calculate_total_row_count( + paginated_bf_df: bf.dataframe.DataFrame, +): + """A TableWidget should correctly calculate the total row count on creation.""" + from bigframes import display + + with bf.option_context("display.repr_mode", "anywidget", "display.max_rows", 2): + widget = display.TableWidget(paginated_bf_df) + + assert widget.row_count == EXPECTED_ROW_COUNT + + +def test_widget_initialization_should_set_default_pagination( + table_widget, +): + """A TableWidget should initialize with page 0 and the correct page size.""" + # The `table_widget` fixture already creates the widget. + # Assert its state. + assert table_widget.page == 0 + assert table_widget.page_size == EXPECTED_PAGE_SIZE + + +def test_widget_display_should_show_first_page_on_load( + table_widget, paginated_pandas_df: pd.DataFrame +): + """ + Given a widget, when it is first loaded, then it should display + the first page of data. + """ + expected_slice = paginated_pandas_df.iloc[0:2] + + html = table_widget.table_html + + _assert_html_matches_pandas_slice(html, expected_slice, paginated_pandas_df) + + +def test_widget_navigation_should_display_second_page( + table_widget, paginated_pandas_df: pd.DataFrame +): + """ + Given a widget, when the page is set to 1, then it should display + the second page of data. + """ + expected_slice = paginated_pandas_df.iloc[2:4] + + table_widget.page = 1 + html = table_widget.table_html + + assert table_widget.page == 1 + _assert_html_matches_pandas_slice(html, expected_slice, paginated_pandas_df) + + +def test_widget_navigation_should_display_last_page( + table_widget, paginated_pandas_df: pd.DataFrame +): + """ + Given a widget, when the page is set to the last page (2), + then it should display the final page of data. + """ + expected_slice = paginated_pandas_df.iloc[4:6] + + table_widget.page = 2 + html = table_widget.table_html + + assert table_widget.page == 2 + _assert_html_matches_pandas_slice(html, expected_slice, paginated_pandas_df) + + +def test_widget_navigation_should_clamp_to_zero_for_negative_input( + table_widget, paginated_pandas_df: pd.DataFrame +): + """ + Given a widget, when a negative page number is set, + then the page number should be clamped to 0 and display the first page. + """ + expected_slice = paginated_pandas_df.iloc[0:2] + + table_widget.page = -1 + html = table_widget.table_html + + assert table_widget.page == 0 + _assert_html_matches_pandas_slice(html, expected_slice, paginated_pandas_df) + + +def test_widget_navigation_should_clamp_to_last_page_for_out_of_bounds_input( + table_widget, paginated_pandas_df: pd.DataFrame +): + """ + Given a widget, when a page number greater than the max is set, + then the page number should be clamped to the last valid page. 
+ """ + expected_slice = paginated_pandas_df.iloc[4:6] + + table_widget.page = 100 + html = table_widget.table_html + + assert table_widget.page == 2 + _assert_html_matches_pandas_slice(html, expected_slice, paginated_pandas_df) + + +@pytest.mark.parametrize( + "page, start_row, end_row", + [ + (0, 0, 3), # Page 0: rows 0-2 + (1, 3, 6), # Page 1: rows 3-5 + ], + ids=[ + "Page 0 (Rows 0-2)", + "Page 1 (Rows 3-5)", + ], +) +def test_widget_pagination_should_work_with_custom_page_size( + paginated_bf_df: bf.dataframe.DataFrame, + paginated_pandas_df: pd.DataFrame, + page: int, + start_row: int, + end_row: int, +): + """ + A widget should paginate correctly with a custom page size of 3. + """ + with bf.option_context("display.repr_mode", "anywidget", "display.max_rows", 3): + from bigframes.display import TableWidget + + widget = TableWidget(paginated_bf_df) + assert widget.page_size == 3 + + expected_slice = paginated_pandas_df.iloc[start_row:end_row] + + widget.page = page + html = widget.table_html + + assert widget.page == page + _assert_html_matches_pandas_slice(html, expected_slice, paginated_pandas_df) + + +def test_widget_with_few_rows_should_display_all_rows(small_widget, small_pandas_df): + """ + Given a DataFrame smaller than the page size, the widget should + display all rows on the first page. + """ + html = small_widget.table_html + + _assert_html_matches_pandas_slice(html, small_pandas_df, small_pandas_df) + + +def test_widget_with_few_rows_should_have_only_one_page(small_widget): + """ + Given a DataFrame smaller than the page size, the widget should + clamp page navigation, effectively having only one page. + """ + assert small_widget.page == 0 + + # Attempt to navigate past the end + small_widget.page = 1 + + # Should be clamped back to the only valid page + assert small_widget.page == 0 + + +def test_widget_page_size_should_be_immutable_after_creation( + paginated_bf_df: bf.dataframe.DataFrame, +): + """ + A widget's page size should be fixed on creation and not be affected + by subsequent changes to global options. + """ + with bf.option_context("display.repr_mode", "anywidget", "display.max_rows", 2): + from bigframes.display import TableWidget + + widget = TableWidget(paginated_bf_df) + assert widget.page_size == 2 + + # Navigate to second page to ensure widget is in a non-default state + widget.page = 1 + assert widget.page == 1 + + # Change global max_rows - widget should not be affected + bf.options.display.max_rows = 10 + + assert widget.page_size == 2 # Should remain unchanged + assert widget.page == 1 # Should remain on same page + + +def test_empty_widget_should_have_zero_row_count(empty_bf_df: bf.dataframe.DataFrame): + """Given an empty DataFrame, the widget's row count should be 0.""" + with bf.option_context("display.repr_mode", "anywidget"): + from bigframes.display import TableWidget + + widget = TableWidget(empty_bf_df) + + assert widget.row_count == 0 + + +def test_empty_widget_should_render_table_headers(empty_bf_df: bf.dataframe.DataFrame): + """Given an empty DataFrame, the widget should still render table headers.""" + with bf.option_context("display.repr_mode", "anywidget"): + from bigframes.display import TableWidget + + widget = TableWidget(empty_bf_df) + + html = widget.table_html + + assert " [3 rows x 3 columns] + Keep rows with at least 2 non-null values. 
+
+        >>> df.dropna(thresh=2)
+             name        toy       born
+        1    Batman  Batmobile 1940-04-25
+        2  Catwoman   Bullwhip
+
+        [2 rows x 3 columns]
+
+        Keep columns with at least 2 non-null values:
+
+        >>> df.dropna(axis='columns', thresh=2)
+             name        toy
+        0    Alfred
+        1    Batman  Batmobile
+        2  Catwoman   Bullwhip
+
+        [3 rows x 2 columns]
+
         Define in which columns to look for missing values.

         >>> df.dropna(subset=['name', 'toy'])
@@ -1822,7 +1842,7 @@ def dropna(
         [2 rows x 3 columns]

         Args:
-            axis ({0 or 'index', 1 or 'columns'}, default 'columns'):
+            axis ({0 or 'index', 1 or 'columns'}, default 0):
                 Determine if rows or columns which contain missing values are
                 removed.

@@ -1834,6 +1854,8 @@ def dropna(
                 * 'any' : If any NA values are present, drop that row or column.
                 * 'all' : If all values are NA, drop that row or column.

+            thresh (int, optional):
+                Require that many non-NA values. Cannot be combined with ``how``.
             subset (column label or sequence of labels, optional):
                 Labels along other axis to consider, e.g. if you are dropping rows
                 these would be a list of columns to include.
@@ -1851,6 +1873,8 @@ def dropna(

         Raises:
             ValueError:
                 If ``how`` is not one of ``any`` or ``all``.
+            TypeError:
+                If both ``how`` and ``thresh`` are specified.
         """
         raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

@@ -3043,6 +3067,20 @@ def radd(self, other, axis: str | int = "columns") -> DataFrame:
         """
         raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

+    def __radd__(self, other) -> DataFrame:
+        """Get addition of other and DataFrame, element-wise (binary operator `+`).
+
+        Equivalent to ``DataFrame.radd(other)``.
+
+        Args:
+            other (float, int, or Series):
+                Any single or multiple element data structure, or list-like object.
+
+        Returns:
+            bigframes.pandas.DataFrame: DataFrame result of the arithmetic operation.
+        """
+        raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
+
     def sub(self, other, axis: str | int = "columns") -> DataFrame:
         """Get subtraction of DataFrame and other, element-wise (binary operator `-`).

diff --git a/third_party/bigframes_vendored/pandas/core/tools/datetimes.py b/third_party/bigframes_vendored/pandas/core/tools/datetimes.py
index d6048d1208..9c17b9632e 100644
--- a/third_party/bigframes_vendored/pandas/core/tools/datetimes.py
+++ b/third_party/bigframes_vendored/pandas/core/tools/datetimes.py
@@ -1,17 +1,22 @@
 # Contains code from https://github.com/pandas-dev/pandas/blob/main/pandas/core/tools/datetimes.py

-from datetime import datetime
+from datetime import date, datetime
 from typing import List, Mapping, Tuple, Union

 import pandas as pd

-from bigframes import constants, series
+from bigframes import constants, dataframe, series

 local_iterables = Union[List, Tuple, pd.Series, pd.DataFrame, Mapping]


 def to_datetime(
-    arg,
+    arg: Union[
+        Union[int, float, str, datetime, date],
+        local_iterables,
+        series.Series,
+        dataframe.DataFrame,
+    ],
     *,
     utc=False,
     format=None,
@@ -58,7 +63,7 @@ def to_datetime(
         dtype: timestamp[us, tz=UTC][pyarrow]

     Args:
-        arg (int, float, str, datetime, list, tuple, 1-d array, Series):
+        arg (int, float, str, datetime, date, list, tuple, 1-d array, Series):
            The object to convert to a datetime.
        utc (bool, default False):
            Control timezone-related parsing, localization and conversion. If True, the
diff --git a/third_party/bigframes_vendored/version.py b/third_party/bigframes_vendored/version.py
index 4d26fb9b8c..9e7a386601 100644
--- a/third_party/bigframes_vendored/version.py
+++ b/third_party/bigframes_vendored/version.py
@@ -12,8 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-__version__ = "2.10.0"
+__version__ = "2.11.0"
 # {x-release-please-start-date}
-__release_date__ = "2025-07-08"
+__release_date__ = "2025-07-15"
 # {x-release-please-end}
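
The `thresh` behaviour documented above can be exercised end to end. Below is a minimal sketch reusing the docstring's example frame; the TypeError check assumes both `how` and `thresh` are passed explicitly, as described in the Raises section:

```python
import numpy as np
import pandas as pd

import bigframes.pandas as bpd

# Example frame from the dropna docstring: each row has a different
# number of non-null values.
df = bpd.DataFrame(
    {
        "name": ["Alfred", "Batman", "Catwoman"],
        "toy": [np.nan, "Batmobile", "Bullwhip"],
        "born": [pd.NaT, pd.Timestamp("1940-04-25"), pd.NaT],
    }
)

# Keep rows with at least 2 non-null values.
print(df.dropna(thresh=2))

# Passing both `how` and `thresh` is rejected (TypeError per the docstring).
try:
    df.dropna(how="any", thresh=2)
except TypeError as exc:
    print(exc)
```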
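
The new `DataFrame.__radd__` mirrors `radd`, so a left-hand scalar or Series dispatches to it. A small sketch with made-up data:

```python
import bigframes.pandas as bpd

df = bpd.DataFrame({"a": [1, 2], "b": [3, 4]})  # illustrative values

# A left-hand scalar dispatches to DataFrame.__radd__, i.e. df.radd(10).
print((10 + df).to_pandas())
print(df.radd(10).to_pandas())  # equivalent spelling
```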
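
A usage sketch for the `date` support added to `to_datetime()`, following the pattern of the new test in `tests/system/small/operations/test_datetimes.py`; the values here are illustrative:

```python
import datetime

import bigframes.pandas as bpd

# Illustrative DATE values; to_datetime() now accepts `date` inputs as well.
dates = bpd.Series([datetime.date(2025, 7, 8), datetime.date(2025, 7, 15)])

timestamps = bpd.to_datetime(dates)
print(timestamps.to_pandas())
```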
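
The benchmark change in `tests/benchmark/read_gbq_colab/filter_output.py` relies on `to_pandas_batches()` always yielding a first page, even when a filter matches nothing. A sketch of that pattern, with an illustrative public table and filter:

```python
import bigframes.pandas as bpd

# Illustrative table and filter; the filter may match zero rows.
df = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")
filtered = df[df["year"] < 1900]

# Iteration yields a first (possibly empty) page rather than raising
# StopIteration, which is what the updated benchmark asserts against.
first_page = next(iter(filtered.to_pandas_batches(page_size=100)))
print(len(first_page.index))
```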
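
Finally, a sketch of the pagination flow the new anywidget tests cover, using the same option names and `TableWidget` attributes they exercise; the sample frame is made up:

```python
import pandas as pd

import bigframes as bf
import bigframes.pandas as bpd
from bigframes.display import TableWidget

# Made-up sample data: six rows so that max_rows=2 gives three pages.
pdf = pd.DataFrame({"id": range(6), "value": list("abcdef")})

with bf.option_context("display.repr_mode", "anywidget", "display.max_rows", 2):
    df = bpd.read_pandas(pdf)

    widget = TableWidget(df)
    print(widget.row_count, widget.page_size)  # 6 2

    widget.page = 1    # second page of two rows
    widget.page = 100  # out of range; clamped back to the last valid page
    html = widget.table_html  # HTML for the currently selected page
```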