diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index 6c78a07f3b..4653f0ab6a 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -14,29 +14,21 @@ from __future__ import annotations from dataclasses import dataclass -import functools -import math -import textwrap +import io import typing -from typing import Collection, Iterable, Literal, Optional, Sequence, Tuple +from typing import Iterable, Literal, Optional, Sequence, Tuple from google.cloud import bigquery import ibis -import ibis.expr.datatypes as ibis_dtypes import ibis.expr.types as ibis_types import pandas -import bigframes.constants as constants +import bigframes.core.compile as compiled import bigframes.core.guid -from bigframes.core.ordering import ( - encode_order_string, - ExpressionOrdering, - IntegerEncoding, - OrderingColumnReference, - reencode_order_string, - StringEncoding, -) -import bigframes.core.utils as utils +import bigframes.core.nodes as nodes +from bigframes.core.ordering import OrderingColumnReference +import bigframes.core.ordering as orderings +from bigframes.core.window_spec import WindowSpec import bigframes.dtypes import bigframes.operations as ops import bigframes.operations.aggregations as agg_ops @@ -49,470 +41,190 @@ @dataclass(frozen=True) -class WindowSpec: +class ArrayValue: """ - Specifies a window over which aggregate and analytic function may be applied. - grouping_keys: set of column ids to group on - preceding: Number of preceding rows in the window - following: Number of preceding rows in the window - ordering: List of columns ids and ordering direction to override base ordering + ArrayValue is an immutable type representing a 2D array with per-column types. """ - grouping_keys: typing.Sequence[str] = tuple() - ordering: typing.Sequence[OrderingColumnReference] = tuple() - preceding: typing.Optional[int] = None - following: typing.Optional[int] = None - min_periods: int = 0 - - -# TODO(swast): We might want to move this to it's own sub-module. -class ArrayValue: - """Immutable BigQuery DataFrames expression tree. - - Note: Usage of this class is considered to be private and subject to change - at any time. + node: nodes.BigFrameNode - This class is a wrapper around Ibis expressions. Its purpose is to defer - Ibis projection operations to keep generated SQL small and correct when - mixing and matching columns from different versions of a DataFrame. - - Args: - session: - A BigQuery DataFrames session to allow more flexibility in running - queries. - table: An Ibis table expression. - columns: Ibis value expressions that can be projected as columns. - hidden_ordering_columns: Ibis value expressions to store ordering. - ordering: An ordering property of the data frame. - predicates: A list of filters on the data frame. 
- """ - - def __init__( - self, + @classmethod + def from_ibis( + cls, session: Session, table: ibis_types.Table, columns: Sequence[ibis_types.Value], - hidden_ordering_columns: Optional[Sequence[ibis_types.Value]] = None, - ordering: ExpressionOrdering = ExpressionOrdering(), - predicates: Optional[Collection[ibis_types.BooleanValue]] = None, + hidden_ordering_columns: Sequence[ibis_types.Value], + ordering: orderings.ExpressionOrdering, ): - self._session = session - self._table = table - self._predicates = tuple(predicates) if predicates is not None else () - # TODO: Validate ordering - if not ordering.total_ordering_columns: - raise ValueError("Must have total ordering defined by one or more columns") - self._ordering = ordering - # Allow creating a DataFrame directly from an Ibis table expression. - # TODO(swast): Validate that each column references the same table (or - # no table for literal values). - self._columns = tuple(columns) - - # Meta columns store ordering, or other data that doesn't correspond to dataframe columns - self._hidden_ordering_columns = ( - tuple(hidden_ordering_columns) - if hidden_ordering_columns is not None - else () - ) - - # To allow for more efficient lookup by column name, create a - # dictionary mapping names to column values. - self._column_names = {column.get_name(): column for column in self._columns} - self._hidden_ordering_column_names = { - column.get_name(): column for column in self._hidden_ordering_columns - } - ### Validation - value_col_ids = self._column_names.keys() - hidden_col_ids = self._hidden_ordering_column_names.keys() - - all_columns = value_col_ids | hidden_col_ids - ordering_valid = all( - col.column_id in all_columns for col in ordering.all_ordering_columns + node = nodes.ReadGbqNode( + table=table, + table_session=session, + columns=tuple(columns), + hidden_ordering_columns=tuple(hidden_ordering_columns), + ordering=ordering, ) - if value_col_ids & hidden_col_ids: - raise ValueError( - f"Keys in both hidden and exposed list: {value_col_ids & hidden_col_ids}" - ) - if not ordering_valid: - raise ValueError(f"Illegal ordering keys: {ordering.all_ordering_columns}") + return cls(node) @classmethod - def mem_expr_from_pandas( - cls, - pd_df: pandas.DataFrame, - session: Optional[Session], - ) -> ArrayValue: - """ - Builds an in-memory only (SQL only) expr from a pandas dataframe. + def from_pandas(cls, pd_df: pandas.DataFrame): + iobytes = io.BytesIO() + # Discard row labels and use simple string ids for columns + column_ids = tuple(str(label) for label in pd_df.columns) + pd_df.reset_index(drop=True).set_axis(column_ids, axis=1).to_feather(iobytes) + node = nodes.ReadLocalNode(iobytes.getvalue(), column_ids=column_ids) + return cls(node) - Caution: If session is None, only a subset of expr functionality will - be available (null Session is usually not supported). - """ - # We can't include any hidden columns in the ArrayValue constructor, so - # grab the column names before we add the hidden ordering column. - column_names = [str(column) for column in pd_df.columns] - # Make sure column names are all strings. 
- pd_df = pd_df.set_axis(column_names, axis="columns") - pd_df = pd_df.assign(**{ORDER_ID_COLUMN: range(len(pd_df))}) - - # ibis memtable cannot handle NA, must convert to None - pd_df = pd_df.astype("object") # type: ignore - pd_df = pd_df.where(pandas.notnull(pd_df), None) + @property + def column_ids(self) -> typing.Sequence[str]: + return self.compile().column_ids - # NULL type isn't valid in BigQuery, so retry with an explicit schema in these cases. - keys_memtable = ibis.memtable(pd_df) - schema = keys_memtable.schema() - new_schema = [] - for column_index, column in enumerate(schema): - if column == ORDER_ID_COLUMN: - new_type: ibis_dtypes.DataType = ibis_dtypes.int64 - else: - column_type = schema[column] - # The autodetected type might not be one we can support, such - # as NULL type for empty rows, so convert to a type we do - # support. - new_type = bigframes.dtypes.bigframes_dtype_to_ibis_dtype( - bigframes.dtypes.ibis_dtype_to_bigframes_dtype(column_type) - ) - # TODO(swast): Ibis memtable doesn't use backticks in struct - # field names, so spaces and other characters aren't allowed in - # the memtable context. Blocked by - # https://github.com/ibis-project/ibis/issues/7187 - column = f"col_{column_index}" - new_schema.append((column, new_type)) + @property + def session(self) -> Session: + required_session = self.node.session + from bigframes import get_global_session - # must set non-null column labels. these are not the user-facing labels - pd_df = pd_df.set_axis( - [column for column, _ in new_schema], - axis="columns", - ) - keys_memtable = ibis.memtable(pd_df, schema=ibis.schema(new_schema)) + return self.node.session[0] if required_session else get_global_session() - return cls( - session, # type: ignore # Session cannot normally be none, see "caution" above - keys_memtable, - columns=[ - keys_memtable[f"col_{column_index}"].name(column) - for column_index, column in enumerate(column_names) - ], - ordering=ExpressionOrdering( - ordering_value_columns=[OrderingColumnReference(ORDER_ID_COLUMN)], - total_ordering_columns=frozenset([ORDER_ID_COLUMN]), - ), - hidden_ordering_columns=(keys_memtable[ORDER_ID_COLUMN],), - ) - - @property - def columns(self) -> typing.Tuple[ibis_types.Value, ...]: - return self._columns + def get_column_type(self, key: str) -> bigframes.dtypes.Dtype: + return self.compile().get_column_type(key) - @property - def column_ids(self) -> typing.Sequence[str]: - return tuple(self._column_names.keys()) + def compile(self) -> compiled.CompiledArrayValue: + return compiled.compile_node(self.node) - @property - def _hidden_column_ids(self) -> typing.Sequence[str]: - return tuple(self._hidden_ordering_column_names.keys()) + def shape(self) -> typing.Tuple[int, int]: + """Returns dimensions as (length, width) tuple.""" + width = len(self.compile().columns) + count_expr = self.compile()._to_ibis_expr("unordered").count() - @property - def _reduced_predicate(self) -> typing.Optional[ibis_types.BooleanValue]: - """Returns the frame's predicates as an equivalent boolean value, useful where a single predicate value is preferred.""" - return ( - _reduce_predicate_list(self._predicates).name(PREDICATE_COLUMN) - if self._predicates - else None + # Support in-memory engines for hermetic unit tests. 
+ if not self.node.session: + try: + length = ibis.pandas.connect({}).execute(count_expr) + return (length, width) + except Exception: + # Not all cases can be handled by pandas engine + pass + + sql = self.session.ibis_client.compile(count_expr) + row_iterator, _ = self.session._start_query( + sql=sql, + max_results=1, ) + length = next(row_iterator)[0] + return (length, width) - @property - def _ibis_order(self) -> Sequence[ibis_types.Value]: - """Returns a sequence of ibis values which can be directly used to order a table expression. Has direction modifiers applied.""" - return _convert_ordering_to_table_values( - {**self._column_names, **self._hidden_ordering_column_names}, - self._ordering.all_ordering_columns, + def to_sql( + self, + offset_column: typing.Optional[str] = None, + col_id_overrides: typing.Mapping[str, str] = {}, + sorted: bool = False, + ) -> str: + return self.compile().to_sql( + offset_column=offset_column, + col_id_overrides=col_id_overrides, + sorted=sorted, ) - def builder(self) -> ArrayValueBuilder: - """Creates a mutable builder for expressions.""" - # Since ArrayValue is intended to be immutable (immutability offers - # potential opportunities for caching, though we might need to introduce - # more node types for that to be useful), we create a builder class. - return ArrayValueBuilder( - self._session, - self._table, - columns=self._columns, - hidden_ordering_columns=self._hidden_ordering_columns, - ordering=self._ordering, - predicates=self._predicates, + def start_query( + self, + job_config: Optional[bigquery.job.QueryJobConfig] = None, + max_results: Optional[int] = None, + *, + sorted: bool = True, + ) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]: + """Execute a query and return metadata about the results.""" + # TODO(swast): Cache the job ID so we can look it up again if they ask + # for the results? We'd need a way to invalidate the cache if DataFrame + # becomes mutable, though. Or move this method to the immutable + # expression class. + # TODO(swast): We might want to move this method to Session and/or + # provide our own minimal metadata class. Tight coupling to the + # BigQuery client library isn't ideal, especially if we want to support + # a LocalSession for unit testing. + # TODO(swast): Add a timeout here? If the query is taking a long time, + # maybe we just print the job metadata that we have so far? 
+ sql = self.to_sql(sorted=sorted) # type:ignore + return self.session._start_query( + sql=sql, + job_config=job_config, + max_results=max_results, ) - def drop_columns(self, columns: Iterable[str]) -> ArrayValue: - # Must generate offsets if we are dropping a column that ordering depends on - expr = self - for ordering_column in set(columns).intersection( - [col.column_id for col in self._ordering.ordering_value_columns] - ): - expr = self._hide_column(ordering_column) - - expr_builder = expr.builder() - remain_cols = [ - column for column in expr.columns if column.get_name() not in columns - ] - expr_builder.columns = remain_cols - return expr_builder.build() - - def get_column_type(self, key: str) -> bigframes.dtypes.Dtype: - ibis_type = typing.cast( - bigframes.dtypes.IbisDtype, self._get_any_column(key).type() + def cached(self, cluster_cols: typing.Sequence[str]) -> ArrayValue: + """Write the ArrayValue to a session table and create a new block object that references it.""" + compiled = self.compile() + ibis_expr = compiled._to_ibis_expr("unordered", expose_hidden_cols=True) + destination = self.session._ibis_to_session_table( + ibis_expr, cluster_cols=cluster_cols, api_name="cache" ) - return typing.cast( - bigframes.dtypes.Dtype, - bigframes.dtypes.ibis_dtype_to_bigframes_dtype(ibis_type), + table_expression = self.session.ibis_client.table( + f"{destination.project}.{destination.dataset_id}.{destination.table_id}" + ) + new_columns = [table_expression[column] for column in compiled.column_ids] + new_hidden_columns = [ + table_expression[column] + for column in compiled._hidden_ordering_column_names + ] + return ArrayValue.from_ibis( + self.session, + table_expression, + columns=new_columns, + hidden_ordering_columns=new_hidden_columns, + ordering=compiled._ordering, ) - def _get_ibis_column(self, key: str) -> ibis_types.Value: - """Gets the Ibis expression for a given column.""" - if key not in self.column_ids: - raise ValueError( - "Column name {} not in set of values: {}".format(key, self.column_ids) - ) - return typing.cast(ibis_types.Value, self._column_names[key]) - - def _get_any_column(self, key: str) -> ibis_types.Value: - """Gets the Ibis expression for a given column. 
Will also get hidden columns.""" - all_columns = {**self._column_names, **self._hidden_ordering_column_names} - if key not in all_columns.keys(): - raise ValueError( - "Column name {} not in set of values: {}".format( - key, all_columns.keys() - ) - ) - return typing.cast(ibis_types.Value, all_columns[key]) + # Operations - def _get_hidden_ordering_column(self, key: str) -> ibis_types.Column: - """Gets the Ibis expression for a given hidden column.""" - if key not in self._hidden_ordering_column_names.keys(): - raise ValueError( - "Column name {} not in set of values: {}".format( - key, self._hidden_ordering_column_names.keys() - ) - ) - return typing.cast(ibis_types.Column, self._hidden_ordering_column_names[key]) + def drop_columns(self, columns: Iterable[str]) -> ArrayValue: + return ArrayValue( + nodes.DropColumnsNode(child=self.node, columns=tuple(columns)) + ) def filter(self, predicate_id: str, keep_null: bool = False) -> ArrayValue: """Filter the table on a given expression, the predicate must be a boolean series aligned with the table expression.""" - condition = typing.cast( - ibis_types.BooleanValue, self._get_ibis_column(predicate_id) - ) - if keep_null: - condition = typing.cast( - ibis_types.BooleanValue, - condition.fillna( - typing.cast(ibis_types.BooleanScalar, ibis_types.literal(True)) - ), + return ArrayValue( + nodes.FilterNode( + child=self.node, predicate_id=predicate_id, keep_null=keep_null ) - return self._filter(condition) - - def _filter(self, predicate_value: ibis_types.BooleanValue) -> ArrayValue: - """Filter the table on a given expression, the predicate must be a boolean series aligned with the table expression.""" - expr = self.builder() - expr.ordering = expr.ordering.with_non_sequential() - expr.predicates = [*self._predicates, predicate_value] - return expr.build() + ) def order_by( self, by: Sequence[OrderingColumnReference], stable: bool = False ) -> ArrayValue: - expr_builder = self.builder() - expr_builder.ordering = self._ordering.with_ordering_columns(by, stable=stable) - return expr_builder.build() - - def reversed(self) -> ArrayValue: - expr_builder = self.builder() - expr_builder.ordering = self._ordering.with_reverse() - return expr_builder.build() - - def _uniform_sampling(self, fraction: float) -> ArrayValue: - """Sampling the table on given fraction. - - .. warning:: - The row numbers of result is non-deterministic, avoid to use. - """ - table = self._to_ibis_expr( - "unordered", expose_hidden_cols=True, fraction=fraction - ) - columns = [table[column_name] for column_name in self._column_names] - hidden_ordering_columns = [ - table[column_name] for column_name in self._hidden_ordering_column_names - ] return ArrayValue( - self._session, - table, - columns=columns, - hidden_ordering_columns=hidden_ordering_columns, - ordering=self._ordering, + nodes.OrderByNode(child=self.node, by=tuple(by), stable=stable) ) - @property - def _offsets(self) -> ibis_types.IntegerColumn: - if not self._ordering.is_sequential: - raise ValueError( - "Expression does not have offsets. Generate them first using project_offsets." - ) - if not self._ordering.total_order_col: - raise ValueError( - "Ordering is invalid. Marked as sequential but no total order columns." - ) - column = self._get_any_column(self._ordering.total_order_col.column_id) - return typing.cast(ibis_types.IntegerColumn, column) - - def _project_offsets(self) -> ArrayValue: - """Create a new expression that contains offsets. Should only be executed when offsets are needed for an operations. 
Has no effect on expression semantics.""" - if self._ordering.is_sequential: - return self - # TODO(tbergeron): Enforce total ordering - table = self._to_ibis_expr( - ordering_mode="offset_col", order_col_name=ORDER_ID_COLUMN - ) - columns = [table[column_name] for column_name in self._column_names] - ordering = ExpressionOrdering( - ordering_value_columns=[OrderingColumnReference(ORDER_ID_COLUMN)], - total_ordering_columns=frozenset([ORDER_ID_COLUMN]), - integer_encoding=IntegerEncoding(True, is_sequential=True), - ) - return ArrayValue( - self._session, - table, - columns=columns, - hidden_ordering_columns=[table[ORDER_ID_COLUMN]], - ordering=ordering, - ) - - def _hide_column(self, column_id) -> ArrayValue: - """Pushes columns to hidden columns list. Used to hide ordering columns that have been dropped or destructively mutated.""" - expr_builder = self.builder() - # Need to rename column as caller might be creating a new row with the same name but different values. - # Can avoid this if don't allow callers to determine ids and instead generate unique ones in this class. - new_name = bigframes.core.guid.generate_guid(prefix="bigframes_hidden_") - expr_builder.hidden_ordering_columns = [ - *self._hidden_ordering_columns, - self._get_ibis_column(column_id).name(new_name), - ] - expr_builder.ordering = self._ordering.with_column_remap({column_id: new_name}) - return expr_builder.build() + def reversed(self) -> ArrayValue: + return ArrayValue(nodes.ReversedNode(child=self.node)) def promote_offsets(self, col_id: str) -> ArrayValue: """ Convenience function to promote copy of column offsets to a value column. Can be used to reset index. """ - # Special case: offsets already exist - ordering = self._ordering - - if (not ordering.is_sequential) or (not ordering.total_order_col): - return self._project_offsets().promote_offsets(col_id) - expr_builder = self.builder() - expr_builder.columns = [ - self._get_any_column(ordering.total_order_col.column_id).name(col_id), - *self.columns, - ] - return expr_builder.build() + return ArrayValue(nodes.PromoteOffsetsNode(child=self.node, col_id=col_id)) def select_columns(self, column_ids: typing.Sequence[str]) -> ArrayValue: - return self._projection( - [self._get_ibis_column(col_id) for col_id in column_ids] + return ArrayValue( + nodes.SelectNode(child=self.node, column_ids=tuple(column_ids)) ) - def _projection(self, columns: Iterable[ibis_types.Value]) -> ArrayValue: - """Creates a new expression based on this expression with new columns.""" - # TODO(swast): We might want to do validation here that columns derive - # from the same table expression instead of (in addition to?) at - # construction time. - - expr = self - for ordering_column in set(self.column_ids).intersection( - [col_ref.column_id for col_ref in self._ordering.ordering_value_columns] - ): - # Need to hide ordering columns that are being dropped. Alternatively, could project offsets - expr = expr._hide_column(ordering_column) - builder = expr.builder() - builder.columns = list(columns) - new_expr = builder.build() - return new_expr - - def shape(self) -> typing.Tuple[int, int]: - """Returns dimensions as (length, width) tuple.""" - width = len(self.columns) - count_expr = self._to_ibis_expr("unordered").count() - sql = self._session.ibis_client.compile(count_expr) - - # Support in-memory engines for hermetic unit tests. 
- if not isinstance(sql, str): - length = self._session.ibis_client.execute(count_expr) - else: - row_iterator, _ = self._session._start_query( - sql=sql, - max_results=1, - ) - length = next(row_iterator)[0] - return (length, width) - def concat(self, other: typing.Sequence[ArrayValue]) -> ArrayValue: """Append together multiple ArrayValue objects.""" - if len(other) == 0: - return self - tables = [] - prefix_base = 10 - prefix_size = math.ceil(math.log(len(other) + 1, prefix_base)) - # Must normalize all ids to the same encoding size - max_encoding_size = max( - self._ordering.string_encoding.length, - *[expression._ordering.string_encoding.length for expression in other], - ) - for i, expr in enumerate([self, *other]): - ordering_prefix = str(i).zfill(prefix_size) - table = expr._to_ibis_expr( - ordering_mode="string_encoded", order_col_name=ORDER_ID_COLUMN - ) - # Rename the value columns based on horizontal offset before applying union. - table = table.select( - [ - table[col].name(f"column_{i}") - if col != ORDER_ID_COLUMN - else ( - ordering_prefix - + reencode_order_string( - table[ORDER_ID_COLUMN], max_encoding_size - ) - ).name(ORDER_ID_COLUMN) - for i, col in enumerate(table.columns) - ] - ) - tables.append(table) - combined_table = ibis.union(*tables) - ordering = ExpressionOrdering( - ordering_value_columns=[OrderingColumnReference(ORDER_ID_COLUMN)], - total_ordering_columns=frozenset([ORDER_ID_COLUMN]), - string_encoding=StringEncoding(True, prefix_size + max_encoding_size), - ) return ArrayValue( - self._session, - combined_table, - columns=[ - combined_table[col] - for col in combined_table.columns - if col != ORDER_ID_COLUMN - ], - hidden_ordering_columns=[combined_table[ORDER_ID_COLUMN]], - ordering=ordering, + nodes.ConcatNode(children=tuple([self.node, *[val.node for val in other]])) ) def project_unary_op( self, column_name: str, op: ops.UnaryOp, output_name=None ) -> ArrayValue: """Creates a new expression based on this expression with unary operation applied to one column.""" - value = op._as_ibis(self._get_ibis_column(column_name)).name( - output_name or column_name + return ArrayValue( + nodes.ProjectUnaryOpNode( + child=self.node, input_id=column_name, op=op, output_id=output_name + ) ) - return self._set_or_replace_by_id(output_name or column_name, value) def project_binary_op( self, @@ -522,11 +234,15 @@ def project_binary_op( output_column_id: str, ) -> ArrayValue: """Creates a new expression based on this expression with binary operation applied to two columns.""" - value = op( - self._get_ibis_column(left_column_id), - self._get_ibis_column(right_column_id), - ).name(output_column_id) - return self._set_or_replace_by_id(output_column_id, value) + return ArrayValue( + nodes.ProjectBinaryOpNode( + child=self.node, + left_input_id=left_column_id, + right_input_id=right_column_id, + op=op, + output_id=output_column_id, + ) + ) def project_ternary_op( self, @@ -537,12 +253,16 @@ def project_ternary_op( output_column_id: str, ) -> ArrayValue: """Creates a new expression based on this expression with ternary operation applied to three columns.""" - value = op( - self._get_ibis_column(col_id_1), - self._get_ibis_column(col_id_2), - self._get_ibis_column(col_id_3), - ).name(output_column_id) - return self._set_or_replace_by_id(output_column_id, value) + return ArrayValue( + nodes.ProjectTernaryOpNode( + child=self.node, + input_id1=col_id_1, + input_id2=col_id_2, + input_id3=col_id_3, + op=op, + output_id=output_column_id, + ) + ) def aggregate( self, @@ -557,46 
+277,14 @@ def aggregate( by_column_id: column id of the aggregation key, this is preserved through the transform dropna: whether null keys should be dropped """ - table = self._to_ibis_expr("unordered") - stats = { - col_out: agg_op._as_ibis(table[col_in]) - for col_in, agg_op, col_out in aggregations - } - if by_column_ids: - result = table.group_by(by_column_ids).aggregate(**stats) - # Must have deterministic ordering, so order by the unique "by" column - ordering = ExpressionOrdering( - [ - OrderingColumnReference(column_id=column_id) - for column_id in by_column_ids - ], - total_ordering_columns=frozenset(by_column_ids), - ) - columns = tuple(result[key] for key in result.columns) - expr = ArrayValue(self._session, result, columns=columns, ordering=ordering) - if dropna: - for column_id in by_column_ids: - expr = expr._filter( - ops.notnull_op._as_ibis(expr._get_ibis_column(column_id)) - ) - # Can maybe remove this as Ordering id is redundant as by_column is unique after aggregation - return expr._project_offsets() - else: - aggregates = {**stats, ORDER_ID_COLUMN: ibis_types.literal(0)} - result = table.aggregate(**aggregates) - # Ordering is irrelevant for single-row output, but set ordering id regardless as other ops(join etc.) expect it. - ordering = ExpressionOrdering( - ordering_value_columns=[OrderingColumnReference(ORDER_ID_COLUMN)], - total_ordering_columns=frozenset([ORDER_ID_COLUMN]), - integer_encoding=IntegerEncoding(is_encoded=True, is_sequential=True), - ) - return ArrayValue( - self._session, - result, - columns=[result[col_id] for col_id in [*stats.keys()]], - hidden_ordering_columns=[result[ORDER_ID_COLUMN]], - ordering=ordering, + return ArrayValue( + nodes.AggregateNode( + child=self.node, + aggregations=tuple(aggregations), + by_column_ids=tuple(by_column_ids), + dropna=dropna, ) + ) def corr_aggregate( self, corr_aggregations: typing.Sequence[typing.Tuple[str, str, str]] @@ -607,25 +295,8 @@ def corr_aggregate( Arguments: corr_aggregations: left_column_id, right_column_id, output_column_id tuples """ - table = self._to_ibis_expr("unordered") - stats = { - col_out: table[col_left].corr(table[col_right], how="pop") - for col_left, col_right, col_out in corr_aggregations - } - aggregates = {**stats, ORDER_ID_COLUMN: ibis_types.literal(0)} - result = table.aggregate(**aggregates) - # Ordering is irrelevant for single-row output, but set ordering id regardless as other ops(join etc.) expect it. 
- ordering = ExpressionOrdering( - ordering_value_columns=[OrderingColumnReference(ORDER_ID_COLUMN)], - total_ordering_columns=frozenset([ORDER_ID_COLUMN]), - integer_encoding=IntegerEncoding(is_encoded=True, is_sequential=True), - ) return ArrayValue( - self._session, - result, - columns=[result[col_id] for col_id in [*stats.keys()]], - hidden_ordering_columns=[result[ORDER_ID_COLUMN]], - ordering=ordering, + nodes.CorrNode(child=self.node, corr_aggregations=tuple(corr_aggregations)) ) def project_window_op( @@ -647,231 +318,17 @@ def project_window_op( never_skip_nulls: will disable null skipping for operators that would otherwise do so skip_reproject_unsafe: skips the reprojection step, can be used when performing many non-dependent window operations, user responsible for not nesting window expressions, or using outputs as join, filter or aggregation keys before a reprojection """ - column = typing.cast(ibis_types.Column, self._get_ibis_column(column_name)) - window = self._ibis_window_from_spec(window_spec, allow_ties=op.handles_ties) - - window_op = op._as_ibis(column, window) - - clauses = [] - if op.skips_nulls and not never_skip_nulls: - clauses.append((column.isnull(), ibis.NA)) - if window_spec.min_periods: - if op.skips_nulls: - # Most operations do not count NULL values towards min_periods - observation_count = agg_ops.count_op._as_ibis(column, window) - else: - # Operations like count treat even NULLs as valid observations for the sake of min_periods - # notnull is just used to convert null values to non-null (FALSE) values to be counted - denulled_value = typing.cast(ibis_types.BooleanColumn, column.notnull()) - observation_count = agg_ops.count_op._as_ibis(denulled_value, window) - clauses.append( - ( - observation_count < ibis_types.literal(window_spec.min_periods), - ibis.NA, - ) - ) - if clauses: - case_statement = ibis.case() - for clause in clauses: - case_statement = case_statement.when(clause[0], clause[1]) - case_statement = case_statement.else_(window_op).end() - window_op = case_statement - - result = self._set_or_replace_by_id(output_name or column_name, window_op) - # TODO(tbergeron): Automatically track analytic expression usage and defer reprojection until required for valid query generation. - return result._reproject_to_table() if not skip_reproject_unsafe else result - - def to_sql( - self, - offset_column: typing.Optional[str] = None, - col_id_overrides: typing.Mapping[str, str] = {}, - sorted: bool = False, - ) -> str: - offsets_id = offset_column or ORDER_ID_COLUMN - - sql = self._session.ibis_client.compile( - self._to_ibis_expr( - ordering_mode="offset_col" - if (offset_column or sorted) - else "unordered", - order_col_name=offsets_id, - col_id_overrides=col_id_overrides, - ) - ) - if sorted: - sql = textwrap.dedent( - f""" - SELECT * EXCEPT (`{offsets_id}`) - FROM ({sql}) - ORDER BY `{offsets_id}` - """ - ) - return typing.cast(str, sql) - - def _to_ibis_expr( - self, - ordering_mode: Literal["string_encoded", "offset_col", "unordered"], - order_col_name: Optional[str] = ORDER_ID_COLUMN, - expose_hidden_cols: bool = False, - fraction: Optional[float] = None, - col_id_overrides: typing.Mapping[str, str] = {}, - ): - """ - Creates an Ibis table expression representing the DataFrame. - - ArrayValue objects are sorted, so the following options are available - to reflect this in the ibis expression. - - * "offset_col": Zero-based offsets are generated as a column, this will - not sort the rows however. 
- * "string_encoded": An ordered string column is provided in output table. - * "unordered": No ordering information will be provided in output. Only - value columns are projected. - - For offset or ordered column, order_col_name can be used to assign the - output label for the ordering column. If none is specified, the default - column name will be 'bigframes_ordering_id' - - Args: - ordering_mode: - How to construct the Ibis expression from the ArrayValue. See - above for details. - order_col_name: - If the ordering mode outputs a single ordering or offsets - column, use this as the column name. - expose_hidden_cols: - If True, include the hidden ordering columns in the results. - Only compatible with `order_by` and `unordered` - ``ordering_mode``. - col_id_overrides: - overrides the column ids for the result - Returns: - An ibis expression representing the data help by the ArrayValue object. - """ - assert ordering_mode in ( - "string_encoded", - "offset_col", - "unordered", - ) - if expose_hidden_cols and ordering_mode in ("ordered_col", "offset_col"): - raise ValueError( - f"Cannot expose hidden ordering columns with ordering_mode {ordering_mode}" + return ArrayValue( + nodes.WindowOpNode( + child=self.node, + column_name=column_name, + op=op, + window_spec=window_spec, + output_name=output_name, + never_skip_nulls=never_skip_nulls, + skip_reproject_unsafe=skip_reproject_unsafe, ) - - columns = list(self._columns) - columns_to_drop: list[ - str - ] = [] # Ordering/Filtering columns that will be dropped at end - - if self._reduced_predicate is not None: - columns.append(self._reduced_predicate) - # Usually drop predicate as it is will be all TRUE after filtering - if not expose_hidden_cols: - columns_to_drop.append(self._reduced_predicate.get_name()) - - order_columns = self._create_order_columns( - ordering_mode, order_col_name, expose_hidden_cols ) - columns.extend(order_columns) - - # Special case for empty tables, since we can't create an empty - # projection. - if not columns: - return ibis.memtable([]) - - # Make sure all dtypes are the "canonical" ones for BigFrames. This is - # important for operations like UNION where the schema must match. 
- table = self._table.select( - bigframes.dtypes.ibis_value_to_canonical_type(column) for column in columns - ) - base_table = table - if self._reduced_predicate is not None: - table = table.filter(base_table[PREDICATE_COLUMN]) - table = table.drop(*columns_to_drop) - if col_id_overrides: - table = table.relabel(col_id_overrides) - if fraction is not None: - table = table.filter(ibis.random() < ibis.literal(fraction)) - return table - - def _create_order_columns( - self, - ordering_mode: str, - order_col_name: Optional[str], - expose_hidden_cols: bool, - ) -> typing.Sequence[ibis_types.Value]: - # Generate offsets if current ordering id semantics are not sufficiently strict - if ordering_mode == "offset_col": - return (self._create_offset_column().name(order_col_name),) - elif ordering_mode == "string_encoded": - return (self._create_string_ordering_column().name(order_col_name),) - elif expose_hidden_cols: - return self._hidden_ordering_columns - return () - - def _create_offset_column(self) -> ibis_types.IntegerColumn: - if self._ordering.total_order_col and self._ordering.is_sequential: - offsets = self._get_any_column(self._ordering.total_order_col.column_id) - return typing.cast(ibis_types.IntegerColumn, offsets) - else: - window = ibis.window(order_by=self._ibis_order) - if self._predicates: - window = window.group_by(self._reduced_predicate) - offsets = ibis.row_number().over(window) - return typing.cast(ibis_types.IntegerColumn, offsets) - - def _create_string_ordering_column(self) -> ibis_types.StringColumn: - if self._ordering.total_order_col and self._ordering.is_string_encoded: - string_order_ids = self._get_any_column( - self._ordering.total_order_col.column_id - ) - return typing.cast(ibis_types.StringColumn, string_order_ids) - if ( - self._ordering.total_order_col - and self._ordering.integer_encoding.is_encoded - ): - # Special case: non-negative integer ordering id can be converted directly to string without regenerating row numbers - int_values = self._get_any_column(self._ordering.total_order_col.column_id) - return encode_order_string( - typing.cast(ibis_types.IntegerColumn, int_values), - ) - else: - # Have to build string from scratch - window = ibis.window(order_by=self._ibis_order) - if self._predicates: - window = window.group_by(self._reduced_predicate) - row_nums = typing.cast( - ibis_types.IntegerColumn, ibis.row_number().over(window) - ) - return encode_order_string(row_nums) - - def start_query( - self, - job_config: Optional[bigquery.job.QueryJobConfig] = None, - max_results: Optional[int] = None, - *, - sorted: bool = True, - ) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]: - """Execute a query and return metadata about the results.""" - # TODO(swast): Cache the job ID so we can look it up again if they ask - # for the results? We'd need a way to invalidate the cache if DataFrame - # becomes mutable, though. Or move this method to the immutable - # expression class. - # TODO(swast): We might want to move this method to Session and/or - # provide our own minimal metadata class. Tight coupling to the - # BigQuery client library isn't ideal, especially if we want to support - # a LocalSession for unit testing. - # TODO(swast): Add a timeout here? If the query is taking a long time, - # maybe we just print the job metadata that we have so far? 
- sql = self.to_sql(sorted=True) # type:ignore - return self._session._start_query( - sql=sql, - job_config=job_config, - max_results=max_results, - ) - - def _get_table_size(self, destination_table): - return self._session._get_table_size(destination_table) def _reproject_to_table(self) -> ArrayValue: """ @@ -881,74 +338,25 @@ def _reproject_to_table(self) -> ArrayValue: some operations such as window operations that cannot be used recursively in projections. """ - table = self._to_ibis_expr( - "unordered", - expose_hidden_cols=True, - ) - columns = [table[column_name] for column_name in self._column_names] - ordering_col_ids = [ - ref.column_id for ref in self._ordering.all_ordering_columns - ] - hidden_ordering_columns = [ - table[column_name] - for column_name in self._hidden_ordering_column_names - if column_name in ordering_col_ids - ] return ArrayValue( - self._session, - table, - columns=columns, - hidden_ordering_columns=hidden_ordering_columns, - ordering=self._ordering, - ) - - def _ibis_window_from_spec(self, window_spec: WindowSpec, allow_ties: bool = False): - group_by: typing.List[ibis_types.Value] = ( - [ - typing.cast( - ibis_types.Column, _as_identity(self._get_ibis_column(column)) - ) - for column in window_spec.grouping_keys - ] - if window_spec.grouping_keys - else [] - ) - if self._reduced_predicate is not None: - group_by.append(self._reduced_predicate) - if window_spec.ordering: - order_by = _convert_ordering_to_table_values( - {**self._column_names, **self._hidden_ordering_column_names}, - window_spec.ordering, + nodes.ReprojectOpNode( + child=self.node, ) - if not allow_ties: - # Most operator need an unambiguous ordering, so the table's total ordering is appended - order_by = tuple([*order_by, *self._ibis_order]) - elif (window_spec.following is not None) or (window_spec.preceding is not None): - # If window spec has following or preceding bounds, we need to apply an unambiguous ordering. - order_by = tuple(self._ibis_order) - else: - # Unbound grouping window. Suitable for aggregations but not for analytic function application. - order_by = None - return ibis.window( - preceding=window_spec.preceding, - following=window_spec.following, - order_by=order_by, - group_by=group_by, ) def unpivot( self, row_labels: typing.Sequence[typing.Hashable], unpivot_columns: typing.Sequence[ - typing.Tuple[str, typing.Sequence[typing.Optional[str]]] + typing.Tuple[str, typing.Tuple[typing.Optional[str], ...]] ], *, passthrough_columns: typing.Sequence[str] = (), index_col_ids: typing.Sequence[str] = ["index"], dtype: typing.Union[ - bigframes.dtypes.Dtype, typing.Sequence[bigframes.dtypes.Dtype] + bigframes.dtypes.Dtype, typing.Tuple[bigframes.dtypes.Dtype, ...] ] = pandas.Float64Dtype(), - how="left", + how: typing.Literal["left", "right"] = "left", ) -> ArrayValue: """ Unpivot ArrayValue columns. 
@@ -963,133 +371,23 @@ def unpivot( Returns: ArrayValue: The unpivoted ArrayValue """ - if how not in ("left", "right"): - raise ValueError("'how' must be 'left' or 'right'") - table = self._to_ibis_expr("unordered", expose_hidden_cols=True) - row_n = len(row_labels) - hidden_col_ids = self._hidden_ordering_column_names.keys() - if not all( - len(source_columns) == row_n for _, source_columns in unpivot_columns - ): - raise ValueError("Columns and row labels must all be same length.") - - unpivot_offset_id = bigframes.core.guid.generate_guid("unpivot_offsets_") - unpivot_table = table.cross_join( - ibis.memtable({unpivot_offset_id: range(row_n)}) - ) - # Use ibis memtable to infer type of rowlabels (if possible) - # TODO: Allow caller to specify dtype - if isinstance(row_labels[0], tuple): - labels_table = ibis.memtable(row_labels) - labels_ibis_types = [ - labels_table[col].type() for col in labels_table.columns - ] - else: - labels_ibis_types = [ibis.memtable({"col": row_labels})["col"].type()] - labels_dtypes = [ - bigframes.dtypes.ibis_dtype_to_bigframes_dtype(ibis_type) - for ibis_type in labels_ibis_types - ] - - label_columns = [] - for label_part, (col_id, label_dtype) in enumerate( - zip(index_col_ids, labels_dtypes) - ): - # interpret as tuples even if it wasn't originally so can apply same logic for multi-column labels - labels_as_tuples = [ - label if isinstance(label, tuple) else (label,) for label in row_labels - ] - cases = [ - ( - i, - bigframes.dtypes.literal_to_ibis_scalar( - label_tuple[label_part], # type:ignore - force_dtype=label_dtype, # type:ignore - ), - ) - for i, label_tuple in enumerate(labels_as_tuples) - ] - labels_value = ( - typing.cast(ibis_types.IntegerColumn, unpivot_table[unpivot_offset_id]) - .cases(cases, default=None) # type:ignore - .name(col_id) - ) - label_columns.append(labels_value) - - unpivot_values = [] - for j in range(len(unpivot_columns)): - col_dtype = dtype[j] if utils.is_list_like(dtype) else dtype - result_col, source_cols = unpivot_columns[j] - null_value = bigframes.dtypes.literal_to_ibis_scalar( - None, force_dtype=col_dtype - ) - ibis_values = [ - ops.AsTypeOp(col_dtype)._as_ibis(unpivot_table[col]) - if col is not None - else null_value - for col in source_cols - ] - cases = [(i, ibis_values[i]) for i in range(len(ibis_values))] - unpivot_value = typing.cast( - ibis_types.IntegerColumn, unpivot_table[unpivot_offset_id] - ).cases( - cases, default=null_value # type:ignore - ) - unpivot_values.append(unpivot_value.name(result_col)) - - unpivot_table = unpivot_table.select( - passthrough_columns, - *label_columns, - *unpivot_values, - *hidden_col_ids, - unpivot_offset_id, - ) - - # Extend the original ordering using unpivot_offset_id - old_ordering = self._ordering - if how == "left": - new_ordering = ExpressionOrdering( - ordering_value_columns=[ - *old_ordering.ordering_value_columns, - OrderingColumnReference(unpivot_offset_id), - ], - total_ordering_columns=frozenset( - [*old_ordering.total_ordering_columns, unpivot_offset_id] - ), - ) - else: # how=="right" - new_ordering = ExpressionOrdering( - ordering_value_columns=[ - OrderingColumnReference(unpivot_offset_id), - *old_ordering.ordering_value_columns, - ], - total_ordering_columns=frozenset( - [*old_ordering.total_ordering_columns, unpivot_offset_id] - ), - ) - value_columns = [ - unpivot_table[value_col_id] for value_col_id, _ in unpivot_columns - ] - passthrough_values = [unpivot_table[col] for col in passthrough_columns] - hidden_ordering_columns = [ - 
unpivot_table[unpivot_offset_id], - *[unpivot_table[hidden_col] for hidden_col in hidden_col_ids], - ] return ArrayValue( - session=self._session, - table=unpivot_table, - columns=[ - *[unpivot_table[col_id] for col_id in index_col_ids], - *value_columns, - *passthrough_values, - ], - hidden_ordering_columns=hidden_ordering_columns, - ordering=new_ordering, + nodes.UnpivotNode( + child=self.node, + row_labels=tuple(row_labels), + unpivot_columns=tuple(unpivot_columns), + passthrough_columns=tuple(passthrough_columns), + index_col_ids=tuple(index_col_ids), + dtype=dtype, + how=how, + ) ) def assign(self, source_id: str, destination_id: str) -> ArrayValue: - return self._set_or_replace_by_id( - destination_id, self._get_ibis_column(source_id) + return ArrayValue( + nodes.AssignNode( + child=self.node, source_id=source_id, destination_id=destination_id + ) ) def assign_constant( @@ -1098,128 +396,41 @@ def assign_constant( value: typing.Any, dtype: typing.Optional[bigframes.dtypes.Dtype], ) -> ArrayValue: - # TODO(b/281587571): Solve scalar constant aggregation problem w/Ibis. - ibis_value = bigframes.dtypes.literal_to_ibis_scalar(value, dtype) - if ibis_value is None: - raise NotImplementedError( - f"Type not supported as scalar value {type(value)}. {constants.FEEDBACK_LINK}" - ) - expr = self._set_or_replace_by_id(destination_id, ibis_value) - return expr._reproject_to_table() - - def _set_or_replace_by_id(self, id: str, new_value: ibis_types.Value) -> ArrayValue: - """Safely assign by id while maintaining ordering integrity.""" - # TODO: Split into explicit set and replace methods - ordering_col_ids = [ - col_ref.column_id for col_ref in self._ordering.ordering_value_columns - ] - if id in ordering_col_ids: - return self._hide_column(id)._set_or_replace_by_id(id, new_value) - - builder = self.builder() - if id in self.column_ids: - builder.columns = [ - val if (col_id != id) else new_value.name(id) - for col_id, val in zip(self.column_ids, self._columns) - ] - else: - builder.columns = [*self.columns, new_value.name(id)] - return builder.build() - - def cached(self, cluster_cols: typing.Sequence[str]) -> ArrayValue: - """Write the ArrayValue to a session table and create a new block object that references it.""" - ibis_expr = self._to_ibis_expr("unordered", expose_hidden_cols=True) - destination = self._session._ibis_to_session_table( - ibis_expr, cluster_cols=cluster_cols, api_name="cache" - ) - table_expression = self._session.ibis_client.table( - f"{destination.project}.{destination.dataset_id}.{destination.table_id}" - ) - new_columns = [table_expression[column] for column in self.column_ids] - new_hidden_columns = [ - table_expression[column] for column in self._hidden_ordering_column_names - ] return ArrayValue( - self._session, - table_expression, - columns=new_columns, - hidden_ordering_columns=new_hidden_columns, - ordering=self._ordering, + nodes.AssignConstantNode( + child=self.node, destination_id=destination_id, value=value, dtype=dtype + ) ) - -class ArrayValueBuilder: - """Mutable expression class. - Use ArrayValue.builder() to create from a ArrayValue object. 
- """ - - def __init__( + def join( self, - session: Session, - table: ibis_types.Table, - ordering: ExpressionOrdering, - columns: Collection[ibis_types.Value] = (), - hidden_ordering_columns: Collection[ibis_types.Value] = (), - predicates: Optional[Collection[ibis_types.BooleanValue]] = None, + self_column_ids: typing.Sequence[str], + other: ArrayValue, + other_column_ids: typing.Sequence[str], + *, + how: Literal[ + "inner", + "left", + "outer", + "right", + ], + allow_row_identity_join: bool = True, ): - self.session = session - self.table = table - self.columns = list(columns) - self.hidden_ordering_columns = list(hidden_ordering_columns) - self.ordering = ordering - self.predicates = list(predicates) if predicates is not None else None - - def build(self) -> ArrayValue: return ArrayValue( - session=self.session, - table=self.table, - columns=self.columns, - hidden_ordering_columns=self.hidden_ordering_columns, - ordering=self.ordering, - predicates=self.predicates, - ) - - -def _reduce_predicate_list( - predicate_list: typing.Collection[ibis_types.BooleanValue], -) -> ibis_types.BooleanValue: - """Converts a list of predicates BooleanValues into a single BooleanValue.""" - if len(predicate_list) == 0: - raise ValueError("Cannot reduce empty list of predicates") - if len(predicate_list) == 1: - (item,) = predicate_list - return item - return functools.reduce(lambda acc, pred: acc.__and__(pred), predicate_list) - - -def _convert_ordering_to_table_values( - value_lookup: typing.Mapping[str, ibis_types.Value], - ordering_columns: typing.Sequence[OrderingColumnReference], -) -> typing.Sequence[ibis_types.Value]: - column_refs = ordering_columns - ordering_values = [] - for ordering_col in column_refs: - column = typing.cast(ibis_types.Column, value_lookup[ordering_col.column_id]) - ordering_value = ( - ibis.asc(column) - if ordering_col.direction.is_ascending - else ibis.desc(column) + nodes.JoinNode( + left_child=self.node, + right_child=other.node, + left_column_ids=tuple(self_column_ids), + right_column_ids=tuple(other_column_ids), + how=how, + allow_row_identity_join=allow_row_identity_join, + ) ) - # Bigquery SQL considers NULLS to be "smallest" values, but we need to override in these cases. - if (not ordering_col.na_last) and (not ordering_col.direction.is_ascending): - # Force nulls to be first - is_null_val = typing.cast(ibis_types.Column, column.isnull()) - ordering_values.append(ibis.desc(is_null_val)) - elif (ordering_col.na_last) and (ordering_col.direction.is_ascending): - # Force nulls to be last - is_null_val = typing.cast(ibis_types.Column, column.isnull()) - ordering_values.append(ibis.asc(is_null_val)) - ordering_values.append(ordering_value) - return ordering_values + def _uniform_sampling(self, fraction: float) -> ArrayValue: + """Sampling the table on given fraction. -def _as_identity(value: ibis_types.Value): - # Some types need to be converted to string to enable groupby - if value.type().is_float64() or value.type().is_geospatial(): - return value.cast(ibis_dtypes.str) - return value + .. warning:: + The row numbers of result is non-deterministic, avoid to use. 
+ """ + return ArrayValue(nodes.RandomSampleNode(self.node, fraction)) diff --git a/bigframes/core/block_transforms.py b/bigframes/core/block_transforms.py index b0f05f4798..3706bf1681 100644 --- a/bigframes/core/block_transforms.py +++ b/bigframes/core/block_transforms.py @@ -21,6 +21,7 @@ import bigframes.core as core import bigframes.core.blocks as blocks import bigframes.core.ordering as ordering +import bigframes.core.window_spec as windows import bigframes.operations as ops import bigframes.operations.aggregations as agg_ops @@ -68,21 +69,21 @@ def indicate_duplicates( if keep == "first": # Count how many copies occur up to current copy of value # Discard this value if there are copies BEFORE - window_spec = core.WindowSpec( + window_spec = windows.WindowSpec( grouping_keys=tuple(columns), following=0, ) elif keep == "last": # Count how many copies occur up to current copy of values # Discard this value if there are copies AFTER - window_spec = core.WindowSpec( + window_spec = windows.WindowSpec( grouping_keys=tuple(columns), preceding=0, ) else: # keep == False # Count how many copies of the value occur in entire series. # Discard this value if there are copies ANYWHERE - window_spec = core.WindowSpec(grouping_keys=tuple(columns)) + window_spec = windows.WindowSpec(grouping_keys=tuple(columns)) block, dummy = block.create_constant(1) block, val_count_col_id = block.apply_window_op( dummy, @@ -131,7 +132,7 @@ def value_counts( ) count_id = agg_ids[0] if normalize: - unbound_window = core.WindowSpec() + unbound_window = windows.WindowSpec() block, total_count_id = block.apply_window_op( count_id, agg_ops.sum_op, unbound_window ) @@ -153,7 +154,7 @@ def value_counts( def pct_change(block: blocks.Block, periods: int = 1) -> blocks.Block: column_labels = block.column_labels - window_spec = core.WindowSpec( + window_spec = windows.WindowSpec( preceding=periods if periods > 0 else None, following=-periods if periods < 0 else None, ) @@ -195,7 +196,7 @@ def rank( ops.isnull_op, ) nullity_col_ids.append(nullity_col_id) - window = core.WindowSpec( + window = windows.WindowSpec( # BigQuery has syntax to reorder nulls with "NULLS FIRST/LAST", but that is unavailable through ibis presently, so must order on a separate nullity expression first. ordering=( ordering.OrderingColumnReference( @@ -229,7 +230,7 @@ def rank( block, result_id = block.apply_window_op( rownum_col_ids[i], agg_op, - window_spec=core.WindowSpec(grouping_keys=[columns[i]]), + window_spec=windows.WindowSpec(grouping_keys=(columns[i],)), skip_reproject_unsafe=(i < (len(columns) - 1)), ) post_agg_rownum_col_ids.append(result_id) @@ -311,7 +312,7 @@ def nsmallest( block, counter = block.apply_window_op( column_ids[0], agg_ops.rank_op, - window_spec=core.WindowSpec(ordering=order_refs), + window_spec=windows.WindowSpec(ordering=tuple(order_refs)), ) block, condition = block.apply_unary_op( counter, ops.partial_right(ops.le_op, n) @@ -343,7 +344,7 @@ def nlargest( block, counter = block.apply_window_op( column_ids[0], agg_ops.rank_op, - window_spec=core.WindowSpec(ordering=order_refs), + window_spec=windows.WindowSpec(ordering=tuple(order_refs)), ) block, condition = block.apply_unary_op( counter, ops.partial_right(ops.le_op, n) @@ -440,14 +441,14 @@ def _mean_delta_to_power( grouping_column_ids: typing.Sequence[str], ) -> typing.Tuple[blocks.Block, typing.Sequence[str]]: """Calculate (x-mean(x))^n. 
Useful for calculating moment statistics such as skew and kurtosis.""" - window = core.WindowSpec(grouping_keys=grouping_column_ids) + window = windows.WindowSpec(grouping_keys=tuple(grouping_column_ids)) block, mean_ids = block.multi_apply_window_op(column_ids, agg_ops.mean_op, window) delta_ids = [] cube_op = ops.partial_right(ops.pow_op, n_power) for val_id, mean_val_id in zip(column_ids, mean_ids): block, delta_id = block.apply_binary_op(val_id, mean_val_id, ops.sub_op) block, delta_power_id = block.apply_unary_op(delta_id, cube_op) - block = block.drop_columns(delta_id) + block = block.drop_columns([delta_id]) delta_ids.append(delta_power_id) return block, delta_ids @@ -645,7 +646,7 @@ def _idx_extrema( for idx_col in original_block.index_columns ], ] - window_spec = core.WindowSpec(ordering=order_refs) + window_spec = windows.WindowSpec(ordering=tuple(order_refs)) idx_col = original_block.index_columns[0] block, result_col = block.apply_window_op( idx_col, agg_ops.first_op, window_spec diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 9db193a04e..cc13edeaf9 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -35,7 +35,6 @@ import bigframes.core as core import bigframes.core.guid as guid import bigframes.core.indexes as indexes -import bigframes.core.joins as joins import bigframes.core.joins.name_resolution as join_names import bigframes.core.ordering as ordering import bigframes.core.utils @@ -378,7 +377,7 @@ def _to_dataframe(self, result) -> pd.DataFrame: """Convert BigQuery data to pandas DataFrame with specific dtypes.""" dtypes = dict(zip(self.index_columns, self.index_dtypes)) dtypes.update(zip(self.value_columns, self.dtypes)) - return self._expr._session._rows_to_dataframe(result, dtypes) + return self._expr.session._rows_to_dataframe(result, dtypes) def to_pandas( self, @@ -422,7 +421,7 @@ def to_pandas_batches(self): dtypes.update(zip(self.value_columns, self.dtypes)) results_iterator, _ = self._expr.start_query() for arrow_table in results_iterator.to_arrow_iterable( - bqstorage_client=self._expr._session.bqstoragereadclient + bqstorage_client=self._expr.session.bqstoragereadclient ): df = bigframes.session._io.pandas.arrow_to_pandas(arrow_table, dtypes) self._copy_index_to_pandas(df) @@ -454,7 +453,9 @@ def _compute_and_count( results_iterator, query_job = expr.start_query(max_results=max_results) - table_size = expr._get_table_size(query_job.destination) / _BYTES_TO_MEGABYTES + table_size = ( + expr.session._get_table_size(query_job.destination) / _BYTES_TO_MEGABYTES + ) fraction = ( max_download_size / table_size if (max_download_size is not None) and (table_size != 0) @@ -819,7 +820,9 @@ def aggregate_all_and_stack( axis: int | str = 0, value_col_id: str = "values", dropna: bool = True, - dtype=pd.Float64Dtype(), + dtype: typing.Union[ + bigframes.dtypes.Dtype, typing.Tuple[bigframes.dtypes.Dtype, ...] 
+ ] = pd.Float64Dtype(), ) -> Block: axis_n = utils.get_axis_number(axis) if axis_n == 0: @@ -829,7 +832,7 @@ def aggregate_all_and_stack( result_expr = self.expr.aggregate(aggregations, dropna=dropna).unpivot( row_labels=self.column_labels.to_list(), index_col_ids=["index"], - unpivot_columns=[(value_col_id, self.value_columns)], + unpivot_columns=tuple([(value_col_id, tuple(self.value_columns))]), dtype=dtype, ) return Block(result_expr, index_columns=["index"], column_labels=[None]) @@ -841,7 +844,7 @@ def aggregate_all_and_stack( stacked_expr = expr_with_offsets.unpivot( row_labels=self.column_labels.to_list(), index_col_ids=[guid.generate_guid()], - unpivot_columns=[(value_col_id, self.value_columns)], + unpivot_columns=[(value_col_id, tuple(self.value_columns))], passthrough_columns=[*self.index_columns, offset_col], dtype=dtype, ) @@ -1029,13 +1032,13 @@ def summarize( for col_id in column_ids ] columns = [ - (col_id, [f"{col_id}-{stat.name}" for stat in stats]) + (col_id, tuple(f"{col_id}-{stat.name}" for stat in stats)) for col_id in column_ids ] expr = self.expr.aggregate(aggregations).unpivot( labels, - unpivot_columns=columns, - index_col_ids=[label_col_id], + unpivot_columns=tuple(columns), + index_col_ids=tuple([label_col_id]), ) labels = self._get_labels_for_columns(column_ids) return Block(expr, column_labels=labels, index_columns=[label_col_id]) @@ -1342,7 +1345,7 @@ def stack(self, how="left", levels: int = 1): passthrough_columns=self.index_columns, unpivot_columns=unpivot_columns, index_col_ids=added_index_columns, - dtype=dtypes, + dtype=tuple(dtypes), how=how, ) new_index_level_names = self.column_labels.names[-levels:] @@ -1382,7 +1385,7 @@ def _create_stack_column( dtype = self._column_type(input_id) input_columns.append(input_id) # Input column i is the first one that - return input_columns, dtype or pd.Float64Dtype() + return tuple(input_columns), dtype or pd.Float64Dtype() def _column_type(self, col_id: str) -> bigframes.dtypes.Dtype: col_offset = self.value_columns.index(col_id) @@ -1497,8 +1500,7 @@ def merge( sort: bool, suffixes: tuple[str, str] = ("_x", "_y"), ) -> Block: - joined_expr = joins.join_by_column( - self.expr, + joined_expr = self.expr.join( left_join_ids, other.expr, right_join_ids, @@ -1708,7 +1710,7 @@ def _is_monotonic( return result -def block_from_local(data, session=None) -> Block: +def block_from_local(data) -> Block: pd_data = pd.DataFrame(data) columns = pd_data.columns @@ -1730,7 +1732,7 @@ def block_from_local(data, session=None) -> Block: ) index_ids = pd_data.columns[: len(index_labels)] - keys_expr = core.ArrayValue.mem_expr_from_pandas(pd_data, session) + keys_expr = core.ArrayValue.from_pandas(pd_data) return Block( keys_expr, column_labels=columns, diff --git a/bigframes/core/compile/__init__.py b/bigframes/core/compile/__init__.py new file mode 100644 index 0000000000..c86f4463dc --- /dev/null +++ b/bigframes/core/compile/__init__.py @@ -0,0 +1,21 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from bigframes.core.compile.compiled import CompiledArrayValue +from bigframes.core.compile.compiler import compile_node + +__all__ = [ + "compile_node", + "CompiledArrayValue", +] diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py new file mode 100644 index 0000000000..1134f1aab0 --- /dev/null +++ b/bigframes/core/compile/compiled.py @@ -0,0 +1,1121 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +import functools +import math +import textwrap +import typing +from typing import Collection, Iterable, Literal, Optional, Sequence + +import ibis +import ibis.backends.bigquery as ibis_bigquery +import ibis.expr.datatypes as ibis_dtypes +import ibis.expr.types as ibis_types +import pandas + +import bigframes.constants as constants +import bigframes.core.guid +from bigframes.core.ordering import ( + encode_order_string, + ExpressionOrdering, + IntegerEncoding, + OrderingColumnReference, + reencode_order_string, + StringEncoding, +) +import bigframes.core.utils as utils +from bigframes.core.window_spec import WindowSpec +import bigframes.dtypes +import bigframes.operations as ops +import bigframes.operations.aggregations as agg_ops + +ORDER_ID_COLUMN = "bigframes_ordering_id" +PREDICATE_COLUMN = "bigframes_predicate" + + +class CompiledArrayValue: + """Immutable BigQuery DataFrames expression tree. + + Note: Usage of this class is considered to be private and subject to change + at any time. + + This class is a wrapper around Ibis expressions. Its purpose is to defer + Ibis projection operations to keep generated SQL small and correct when + mixing and matching columns from different versions of a DataFrame. + + Args: + table: An Ibis table expression. + columns: Ibis value expressions that can be projected as columns. + hidden_ordering_columns: Ibis value expressions to store ordering. + ordering: An ordering property of the data frame. + predicates: A list of filters on the data frame. + """ + + def __init__( + self, + table: ibis_types.Table, + columns: Sequence[ibis_types.Value], + hidden_ordering_columns: Optional[Sequence[ibis_types.Value]] = None, + ordering: ExpressionOrdering = ExpressionOrdering(), + predicates: Optional[Collection[ibis_types.BooleanValue]] = None, + ): + self._table = table + self._predicates = tuple(predicates) if predicates is not None else () + # TODO: Validate ordering + if not ordering.total_ordering_columns: + raise ValueError("Must have total ordering defined by one or more columns") + self._ordering = ordering + # Allow creating a DataFrame directly from an Ibis table expression. + # TODO(swast): Validate that each column references the same table (or + # no table for literal values). 
+ self._columns = tuple(columns) + + # Meta columns store ordering, or other data that doesn't correspond to dataframe columns + self._hidden_ordering_columns = ( + tuple(hidden_ordering_columns) + if hidden_ordering_columns is not None + else () + ) + + # To allow for more efficient lookup by column name, create a + # dictionary mapping names to column values. + self._column_names = {column.get_name(): column for column in self._columns} + self._hidden_ordering_column_names = { + column.get_name(): column for column in self._hidden_ordering_columns + } + ### Validation + value_col_ids = self._column_names.keys() + hidden_col_ids = self._hidden_ordering_column_names.keys() + + all_columns = value_col_ids | hidden_col_ids + ordering_valid = all( + col.column_id in all_columns for col in ordering.all_ordering_columns + ) + if value_col_ids & hidden_col_ids: + raise ValueError( + f"Keys in both hidden and exposed list: {value_col_ids & hidden_col_ids}" + ) + if not ordering_valid: + raise ValueError(f"Illegal ordering keys: {ordering.all_ordering_columns}") + + @classmethod + def mem_expr_from_pandas( + cls, + pd_df: pandas.DataFrame, + ) -> CompiledArrayValue: + """ + Builds an in-memory only (SQL only) expr from a pandas dataframe. + """ + # We can't include any hidden columns in the ArrayValue constructor, so + # grab the column names before we add the hidden ordering column. + column_names = [str(column) for column in pd_df.columns] + # Make sure column names are all strings. + pd_df = pd_df.set_axis(column_names, axis="columns") + pd_df = pd_df.assign(**{ORDER_ID_COLUMN: range(len(pd_df))}) + + # ibis memtable cannot handle NA, must convert to None + pd_df = pd_df.astype("object") # type: ignore + pd_df = pd_df.where(pandas.notnull(pd_df), None) + + # NULL type isn't valid in BigQuery, so retry with an explicit schema in these cases. + keys_memtable = ibis.memtable(pd_df) + schema = keys_memtable.schema() + new_schema = [] + for column_index, column in enumerate(schema): + if column == ORDER_ID_COLUMN: + new_type: ibis_dtypes.DataType = ibis_dtypes.int64 + else: + column_type = schema[column] + # The autodetected type might not be one we can support, such + # as NULL type for empty rows, so convert to a type we do + # support. + new_type = bigframes.dtypes.bigframes_dtype_to_ibis_dtype( + bigframes.dtypes.ibis_dtype_to_bigframes_dtype(column_type) + ) + # TODO(swast): Ibis memtable doesn't use backticks in struct + # field names, so spaces and other characters aren't allowed in + # the memtable context. Blocked by + # https://github.com/ibis-project/ibis/issues/7187 + column = f"col_{column_index}" + new_schema.append((column, new_type)) + + # must set non-null column labels. 
these are not the user-facing labels + pd_df = pd_df.set_axis( + [column for column, _ in new_schema], + axis="columns", + ) + keys_memtable = ibis.memtable(pd_df, schema=ibis.schema(new_schema)) + + return cls( + keys_memtable, + columns=[ + keys_memtable[f"col_{column_index}"].name(column) + for column_index, column in enumerate(column_names) + ], + ordering=ExpressionOrdering( + ordering_value_columns=tuple( + [OrderingColumnReference(ORDER_ID_COLUMN)] + ), + total_ordering_columns=frozenset([ORDER_ID_COLUMN]), + ), + hidden_ordering_columns=(keys_memtable[ORDER_ID_COLUMN],), + ) + + @property + def columns(self) -> typing.Tuple[ibis_types.Value, ...]: + return self._columns + + @property + def column_ids(self) -> typing.Sequence[str]: + return tuple(self._column_names.keys()) + + @property + def _hidden_column_ids(self) -> typing.Sequence[str]: + return tuple(self._hidden_ordering_column_names.keys()) + + @property + def _reduced_predicate(self) -> typing.Optional[ibis_types.BooleanValue]: + """Returns the frame's predicates as an equivalent boolean value, useful where a single predicate value is preferred.""" + return ( + _reduce_predicate_list(self._predicates).name(PREDICATE_COLUMN) + if self._predicates + else None + ) + + @property + def _ibis_order(self) -> Sequence[ibis_types.Value]: + """Returns a sequence of ibis values which can be directly used to order a table expression. Has direction modifiers applied.""" + return _convert_ordering_to_table_values( + {**self._column_names, **self._hidden_ordering_column_names}, + self._ordering.all_ordering_columns, + ) + + def builder(self) -> ArrayValueBuilder: + """Creates a mutable builder for expressions.""" + # Since ArrayValue is intended to be immutable (immutability offers + # potential opportunities for caching, though we might need to introduce + # more node types for that to be useful), we create a builder class. + return ArrayValueBuilder( + self._table, + columns=self._columns, + hidden_ordering_columns=self._hidden_ordering_columns, + ordering=self._ordering, + predicates=self._predicates, + ) + + def drop_columns(self, columns: Iterable[str]) -> CompiledArrayValue: + # Must generate offsets if we are dropping a column that ordering depends on + expr = self + for ordering_column in set(columns).intersection( + [col.column_id for col in self._ordering.ordering_value_columns] + ): + expr = self._hide_column(ordering_column) + + expr_builder = expr.builder() + remain_cols = [ + column for column in expr.columns if column.get_name() not in columns + ] + expr_builder.columns = remain_cols + return expr_builder.build() + + def get_column_type(self, key: str) -> bigframes.dtypes.Dtype: + ibis_type = typing.cast( + bigframes.dtypes.IbisDtype, self._get_any_column(key).type() + ) + return typing.cast( + bigframes.dtypes.Dtype, + bigframes.dtypes.ibis_dtype_to_bigframes_dtype(ibis_type), + ) + + def _get_ibis_column(self, key: str) -> ibis_types.Value: + """Gets the Ibis expression for a given column.""" + if key not in self.column_ids: + raise ValueError( + "Column name {} not in set of values: {}".format(key, self.column_ids) + ) + return typing.cast(ibis_types.Value, self._column_names[key]) + + def _get_any_column(self, key: str) -> ibis_types.Value: + """Gets the Ibis expression for a given column. 
Will also get hidden columns."""
+        all_columns = {**self._column_names, **self._hidden_ordering_column_names}
+        if key not in all_columns.keys():
+            raise ValueError(
+                "Column name {} not in set of values: {}".format(
+                    key, all_columns.keys()
+                )
+            )
+        return typing.cast(ibis_types.Value, all_columns[key])
+
+    def _get_hidden_ordering_column(self, key: str) -> ibis_types.Column:
+        """Gets the Ibis expression for a given hidden column."""
+        if key not in self._hidden_ordering_column_names.keys():
+            raise ValueError(
+                "Column name {} not in set of values: {}".format(
+                    key, self._hidden_ordering_column_names.keys()
+                )
+            )
+        return typing.cast(ibis_types.Column, self._hidden_ordering_column_names[key])
+
+    def filter(self, predicate_id: str, keep_null: bool = False) -> CompiledArrayValue:
+        """Filter the table on a given expression; the predicate must be a boolean series aligned with the table expression."""
+        condition = typing.cast(
+            ibis_types.BooleanValue, self._get_ibis_column(predicate_id)
+        )
+        if keep_null:
+            condition = typing.cast(
+                ibis_types.BooleanValue,
+                condition.fillna(
+                    typing.cast(ibis_types.BooleanScalar, ibis_types.literal(True))
+                ),
+            )
+        return self._filter(condition)
+
+    def _filter(self, predicate_value: ibis_types.BooleanValue) -> CompiledArrayValue:
+        """Filter the table on a given expression; the predicate must be a boolean series aligned with the table expression."""
+        expr = self.builder()
+        expr.ordering = expr.ordering.with_non_sequential()
+        expr.predicates = [*self._predicates, predicate_value]
+        return expr.build()
+
+    def order_by(
+        self, by: Sequence[OrderingColumnReference], stable: bool = False
+    ) -> CompiledArrayValue:
+        expr_builder = self.builder()
+        expr_builder.ordering = self._ordering.with_ordering_columns(by, stable=stable)
+        return expr_builder.build()
+
+    def reversed(self) -> CompiledArrayValue:
+        expr_builder = self.builder()
+        expr_builder.ordering = self._ordering.with_reverse()
+        return expr_builder.build()
+
+    def _uniform_sampling(self, fraction: float) -> CompiledArrayValue:
+        """Sample the table at the given fraction.
+
+        .. warning::
+            The row numbers of the result are non-deterministic; avoid depending on them.
+        """
+        table = self._to_ibis_expr(
+            "unordered", expose_hidden_cols=True, fraction=fraction
+        )
+        columns = [table[column_name] for column_name in self._column_names]
+        hidden_ordering_columns = [
+            table[column_name] for column_name in self._hidden_ordering_column_names
+        ]
+        return CompiledArrayValue(
+            table,
+            columns=columns,
+            hidden_ordering_columns=hidden_ordering_columns,
+            ordering=self._ordering,
+        )
+
+    @property
+    def _offsets(self) -> ibis_types.IntegerColumn:
+        if not self._ordering.is_sequential:
+            raise ValueError(
+                "Expression does not have offsets. Generate them first using project_offsets."
+            )
+        if not self._ordering.total_order_col:
+            raise ValueError(
+                "Ordering is invalid. Marked as sequential but no total order columns."
+            )
+        column = self._get_any_column(self._ordering.total_order_col.column_id)
+        return typing.cast(ibis_types.IntegerColumn, column)
+
+    def _project_offsets(self) -> CompiledArrayValue:
+        """Create a new expression that contains offsets. Should only be executed when offsets are needed for an operation.
Has no effect on expression semantics.""" + if self._ordering.is_sequential: + return self + # TODO(tbergeron): Enforce total ordering + table = self._to_ibis_expr( + ordering_mode="offset_col", order_col_name=ORDER_ID_COLUMN + ) + columns = [table[column_name] for column_name in self._column_names] + ordering = ExpressionOrdering( + ordering_value_columns=tuple([OrderingColumnReference(ORDER_ID_COLUMN)]), + total_ordering_columns=frozenset([ORDER_ID_COLUMN]), + integer_encoding=IntegerEncoding(True, is_sequential=True), + ) + return CompiledArrayValue( + table, + columns=columns, + hidden_ordering_columns=[table[ORDER_ID_COLUMN]], + ordering=ordering, + ) + + def _hide_column(self, column_id) -> CompiledArrayValue: + """Pushes columns to hidden columns list. Used to hide ordering columns that have been dropped or destructively mutated.""" + expr_builder = self.builder() + # Need to rename column as caller might be creating a new row with the same name but different values. + # Can avoid this if don't allow callers to determine ids and instead generate unique ones in this class. + new_name = bigframes.core.guid.generate_guid(prefix="bigframes_hidden_") + expr_builder.hidden_ordering_columns = [ + *self._hidden_ordering_columns, + self._get_ibis_column(column_id).name(new_name), + ] + expr_builder.ordering = self._ordering.with_column_remap({column_id: new_name}) + return expr_builder.build() + + def promote_offsets(self, col_id: str) -> CompiledArrayValue: + """ + Convenience function to promote copy of column offsets to a value column. Can be used to reset index. + """ + # Special case: offsets already exist + ordering = self._ordering + + if (not ordering.is_sequential) or (not ordering.total_order_col): + return self._project_offsets().promote_offsets(col_id) + expr_builder = self.builder() + expr_builder.columns = [ + self._get_any_column(ordering.total_order_col.column_id).name(col_id), + *self.columns, + ] + return expr_builder.build() + + def select_columns(self, column_ids: typing.Sequence[str]) -> CompiledArrayValue: + """Creates a new expression based on this expression with new columns.""" + columns = [self._get_ibis_column(col_id) for col_id in column_ids] + expr = self + for ordering_column in set(self.column_ids).intersection( + [col_ref.column_id for col_ref in self._ordering.ordering_value_columns] + ): + # Need to hide ordering columns that are being dropped. Alternatively, could project offsets + expr = expr._hide_column(ordering_column) + builder = expr.builder() + builder.columns = list(columns) + new_expr = builder.build() + return new_expr + + def concat(self, other: typing.Sequence[CompiledArrayValue]) -> CompiledArrayValue: + """Append together multiple ArrayValue objects.""" + if len(other) == 0: + return self + tables = [] + prefix_base = 10 + prefix_size = math.ceil(math.log(len(other) + 1, prefix_base)) + # Must normalize all ids to the same encoding size + max_encoding_size = max( + self._ordering.string_encoding.length, + *[expression._ordering.string_encoding.length for expression in other], + ) + for i, expr in enumerate([self, *other]): + ordering_prefix = str(i).zfill(prefix_size) + table = expr._to_ibis_expr( + ordering_mode="string_encoded", order_col_name=ORDER_ID_COLUMN + ) + # Rename the value columns based on horizontal offset before applying union. 
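+            # The prefix is the table's position, zero-filled to a fixed width, so
+            # rows from earlier tables always sort first and every order string in
+            # the union has the same length.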
+ table = table.select( + [ + table[col].name(f"column_{i}") + if col != ORDER_ID_COLUMN + else ( + ordering_prefix + + reencode_order_string( + table[ORDER_ID_COLUMN], max_encoding_size + ) + ).name(ORDER_ID_COLUMN) + for i, col in enumerate(table.columns) + ] + ) + tables.append(table) + combined_table = ibis.union(*tables) + ordering = ExpressionOrdering( + ordering_value_columns=tuple([OrderingColumnReference(ORDER_ID_COLUMN)]), + total_ordering_columns=frozenset([ORDER_ID_COLUMN]), + string_encoding=StringEncoding(True, prefix_size + max_encoding_size), + ) + return CompiledArrayValue( + combined_table, + columns=[ + combined_table[col] + for col in combined_table.columns + if col != ORDER_ID_COLUMN + ], + hidden_ordering_columns=[combined_table[ORDER_ID_COLUMN]], + ordering=ordering, + ) + + def project_unary_op( + self, column_name: str, op: ops.UnaryOp, output_name=None + ) -> CompiledArrayValue: + """Creates a new expression based on this expression with unary operation applied to one column.""" + value = op._as_ibis(self._get_ibis_column(column_name)).name( + output_name or column_name + ) + return self._set_or_replace_by_id(output_name or column_name, value) + + def project_binary_op( + self, + left_column_id: str, + right_column_id: str, + op: ops.BinaryOp, + output_column_id: str, + ) -> CompiledArrayValue: + """Creates a new expression based on this expression with binary operation applied to two columns.""" + value = op( + self._get_ibis_column(left_column_id), + self._get_ibis_column(right_column_id), + ).name(output_column_id) + return self._set_or_replace_by_id(output_column_id, value) + + def project_ternary_op( + self, + col_id_1: str, + col_id_2: str, + col_id_3: str, + op: ops.TernaryOp, + output_column_id: str, + ) -> CompiledArrayValue: + """Creates a new expression based on this expression with ternary operation applied to three columns.""" + value = op( + self._get_ibis_column(col_id_1), + self._get_ibis_column(col_id_2), + self._get_ibis_column(col_id_3), + ).name(output_column_id) + return self._set_or_replace_by_id(output_column_id, value) + + def aggregate( + self, + aggregations: typing.Sequence[typing.Tuple[str, agg_ops.AggregateOp, str]], + by_column_ids: typing.Sequence[str] = (), + dropna: bool = True, + ) -> CompiledArrayValue: + """ + Apply aggregations to the expression. 
+        Arguments:
+            aggregations: input_column_id, operation, output_column_id tuples
+            by_column_ids: column ids of the aggregation key; these are preserved through the transform
+            dropna: whether null keys should be dropped
+        """
+        table = self._to_ibis_expr("unordered")
+        stats = {
+            col_out: agg_op._as_ibis(table[col_in])
+            for col_in, agg_op, col_out in aggregations
+        }
+        if by_column_ids:
+            result = table.group_by(by_column_ids).aggregate(**stats)
+            # Must have deterministic ordering, so order by the unique "by" column
+            ordering = ExpressionOrdering(
+                tuple(
+                    [
+                        OrderingColumnReference(column_id=column_id)
+                        for column_id in by_column_ids
+                    ]
+                ),
+                total_ordering_columns=frozenset(by_column_ids),
+            )
+            columns = tuple(result[key] for key in result.columns)
+            expr = CompiledArrayValue(result, columns=columns, ordering=ordering)
+            if dropna:
+                for column_id in by_column_ids:
+                    expr = expr._filter(
+                        ops.notnull_op._as_ibis(expr._get_ibis_column(column_id))
+                    )
+            # Can maybe remove this, as the ordering id is redundant: by_column is unique after aggregation
+            return expr._project_offsets()
+        else:
+            aggregates = {**stats, ORDER_ID_COLUMN: ibis_types.literal(0)}
+            result = table.aggregate(**aggregates)
+            # Ordering is irrelevant for single-row output, but set an ordering id regardless, as other ops (join etc.) expect it.
+            ordering = ExpressionOrdering(
+                ordering_value_columns=tuple(
+                    [OrderingColumnReference(ORDER_ID_COLUMN)]
+                ),
+                total_ordering_columns=frozenset([ORDER_ID_COLUMN]),
+                integer_encoding=IntegerEncoding(is_encoded=True, is_sequential=True),
+            )
+            return CompiledArrayValue(
+                result,
+                columns=[result[col_id] for col_id in [*stats.keys()]],
+                hidden_ordering_columns=[result[ORDER_ID_COLUMN]],
+                ordering=ordering,
+            )
+
+    def corr_aggregate(
+        self, corr_aggregations: typing.Sequence[typing.Tuple[str, str, str]]
+    ) -> CompiledArrayValue:
+        """
+        Get correlations between each left_column_id and right_column_id, stored in the respective output_column_id.
+        This uses BigQuery's CORR under the hood, and thus only Pearson's method is used.
+        Arguments:
+            corr_aggregations: left_column_id, right_column_id, output_column_id tuples
+        """
+        table = self._to_ibis_expr("unordered")
+        stats = {
+            col_out: table[col_left].corr(table[col_right], how="pop")
+            for col_left, col_right, col_out in corr_aggregations
+        }
+        aggregates = {**stats, ORDER_ID_COLUMN: ibis_types.literal(0)}
+        result = table.aggregate(**aggregates)
+        # Ordering is irrelevant for single-row output, but set an ordering id regardless, as other ops (join etc.) expect it.
+        ordering = ExpressionOrdering(
+            ordering_value_columns=tuple([OrderingColumnReference(ORDER_ID_COLUMN)]),
+            total_ordering_columns=frozenset([ORDER_ID_COLUMN]),
+            integer_encoding=IntegerEncoding(is_encoded=True, is_sequential=True),
+        )
+        return CompiledArrayValue(
+            result,
+            columns=[result[col_id] for col_id in [*stats.keys()]],
+            hidden_ordering_columns=[result[ORDER_ID_COLUMN]],
+            ordering=ordering,
+        )
+
+    def project_window_op(
+        self,
+        column_name: str,
+        op: agg_ops.WindowOp,
+        window_spec: WindowSpec,
+        output_name=None,
+        *,
+        never_skip_nulls=False,
+        skip_reproject_unsafe: bool = False,
+    ) -> CompiledArrayValue:
+        """
+        Creates a new expression based on this expression with a window operation applied to one column.
+ column_name: the id of the input column present in the expression + op: the windowable operator to apply to the input column + window_spec: a specification of the window over which to apply the operator + output_name: the id to assign to the output of the operator, by default will replace input col if distinct output id not provided + never_skip_nulls: will disable null skipping for operators that would otherwise do so + skip_reproject_unsafe: skips the reprojection step, can be used when performing many non-dependent window operations, user responsible for not nesting window expressions, or using outputs as join, filter or aggregation keys before a reprojection + """ + column = typing.cast(ibis_types.Column, self._get_ibis_column(column_name)) + window = self._ibis_window_from_spec(window_spec, allow_ties=op.handles_ties) + + window_op = op._as_ibis(column, window) + + clauses = [] + if op.skips_nulls and not never_skip_nulls: + clauses.append((column.isnull(), ibis.NA)) + if window_spec.min_periods: + if op.skips_nulls: + # Most operations do not count NULL values towards min_periods + observation_count = agg_ops.count_op._as_ibis(column, window) + else: + # Operations like count treat even NULLs as valid observations for the sake of min_periods + # notnull is just used to convert null values to non-null (FALSE) values to be counted + denulled_value = typing.cast(ibis_types.BooleanColumn, column.notnull()) + observation_count = agg_ops.count_op._as_ibis(denulled_value, window) + clauses.append( + ( + observation_count < ibis_types.literal(window_spec.min_periods), + ibis.NA, + ) + ) + if clauses: + case_statement = ibis.case() + for clause in clauses: + case_statement = case_statement.when(clause[0], clause[1]) + case_statement = case_statement.else_(window_op).end() + window_op = case_statement + + result = self._set_or_replace_by_id(output_name or column_name, window_op) + # TODO(tbergeron): Automatically track analytic expression usage and defer reprojection until required for valid query generation. + return result._reproject_to_table() if not skip_reproject_unsafe else result + + def to_sql( + self, + offset_column: typing.Optional[str] = None, + col_id_overrides: typing.Mapping[str, str] = {}, + sorted: bool = False, + ) -> str: + offsets_id = offset_column or ORDER_ID_COLUMN + + sql = ibis_bigquery.Backend().compile( + self._to_ibis_expr( + ordering_mode="offset_col" + if (offset_column or sorted) + else "unordered", + order_col_name=offsets_id, + col_id_overrides=col_id_overrides, + ) + ) + if sorted: + sql = textwrap.dedent( + f""" + SELECT * EXCEPT (`{offsets_id}`) + FROM ({sql}) + ORDER BY `{offsets_id}` + """ + ) + return typing.cast(str, sql) + + def _to_ibis_expr( + self, + ordering_mode: Literal["string_encoded", "offset_col", "unordered"], + order_col_name: Optional[str] = ORDER_ID_COLUMN, + expose_hidden_cols: bool = False, + fraction: Optional[float] = None, + col_id_overrides: typing.Mapping[str, str] = {}, + ): + """ + Creates an Ibis table expression representing the DataFrame. + + ArrayValue objects are sorted, so the following options are available + to reflect this in the ibis expression. + + * "offset_col": Zero-based offsets are generated as a column, this will + not sort the rows however. + * "string_encoded": An ordered string column is provided in output table. + * "unordered": No ordering information will be provided in output. Only + value columns are projected. 
+
+        For an offset or ordered column, order_col_name can be used to assign the
+        output label for the ordering column. If none is specified, the default
+        column name will be 'bigframes_ordering_id'.
+
+        Args:
+            ordering_mode:
+                How to construct the Ibis expression from the ArrayValue. See
+                above for details.
+            order_col_name:
+                If the ordering mode outputs a single ordering or offsets
+                column, use this as the column name.
+            expose_hidden_cols:
+                If True, include the hidden ordering columns in the results.
+                Only compatible with the `string_encoded` and `unordered`
+                ``ordering_mode``.
+            col_id_overrides:
+                Overrides the column ids for the result.
+        Returns:
+            An ibis expression representing the data held by the ArrayValue object.
+        """
+        assert ordering_mode in (
+            "string_encoded",
+            "offset_col",
+            "unordered",
+        )
+        if expose_hidden_cols and ordering_mode in ("ordered_col", "offset_col"):
+            raise ValueError(
+                f"Cannot expose hidden ordering columns with ordering_mode {ordering_mode}"
+            )
+
+        columns = list(self._columns)
+        columns_to_drop: list[
+            str
+        ] = []  # Ordering/Filtering columns that will be dropped at end
+
+        if self._reduced_predicate is not None:
+            columns.append(self._reduced_predicate)
+            # Usually drop the predicate, as it will be all TRUE after filtering
+            if not expose_hidden_cols:
+                columns_to_drop.append(self._reduced_predicate.get_name())
+
+        order_columns = self._create_order_columns(
+            ordering_mode, order_col_name, expose_hidden_cols
+        )
+        columns.extend(order_columns)
+
+        # Special case for empty tables, since we can't create an empty
+        # projection.
+        if not columns:
+            return ibis.memtable([])
+
+        # Make sure all dtypes are the "canonical" ones for BigFrames. This is
+        # important for operations like UNION where the schema must match.
+ table = self._table.select( + bigframes.dtypes.ibis_value_to_canonical_type(column) for column in columns + ) + base_table = table + if self._reduced_predicate is not None: + table = table.filter(base_table[PREDICATE_COLUMN]) + table = table.drop(*columns_to_drop) + if col_id_overrides: + table = table.relabel(col_id_overrides) + if fraction is not None: + table = table.filter(ibis.random() < ibis.literal(fraction)) + return table + + def _create_order_columns( + self, + ordering_mode: str, + order_col_name: Optional[str], + expose_hidden_cols: bool, + ) -> typing.Sequence[ibis_types.Value]: + # Generate offsets if current ordering id semantics are not sufficiently strict + if ordering_mode == "offset_col": + return (self._create_offset_column().name(order_col_name),) + elif ordering_mode == "string_encoded": + return (self._create_string_ordering_column().name(order_col_name),) + elif expose_hidden_cols: + return self._hidden_ordering_columns + return () + + def _create_offset_column(self) -> ibis_types.IntegerColumn: + if self._ordering.total_order_col and self._ordering.is_sequential: + offsets = self._get_any_column(self._ordering.total_order_col.column_id) + return typing.cast(ibis_types.IntegerColumn, offsets) + else: + window = ibis.window(order_by=self._ibis_order) + if self._predicates: + window = window.group_by(self._reduced_predicate) + offsets = ibis.row_number().over(window) + return typing.cast(ibis_types.IntegerColumn, offsets) + + def _create_string_ordering_column(self) -> ibis_types.StringColumn: + if self._ordering.total_order_col and self._ordering.is_string_encoded: + string_order_ids = self._get_any_column( + self._ordering.total_order_col.column_id + ) + return typing.cast(ibis_types.StringColumn, string_order_ids) + if ( + self._ordering.total_order_col + and self._ordering.integer_encoding.is_encoded + ): + # Special case: non-negative integer ordering id can be converted directly to string without regenerating row numbers + int_values = self._get_any_column(self._ordering.total_order_col.column_id) + return encode_order_string( + typing.cast(ibis_types.IntegerColumn, int_values), + ) + else: + # Have to build string from scratch + window = ibis.window(order_by=self._ibis_order) + if self._predicates: + window = window.group_by(self._reduced_predicate) + row_nums = typing.cast( + ibis_types.IntegerColumn, ibis.row_number().over(window) + ) + return encode_order_string(row_nums) + + def _reproject_to_table(self) -> CompiledArrayValue: + """ + Internal operators that projects the internal representation into a + new ibis table expression where each value column is a direct + reference to a column in that table expression. Needed after + some operations such as window operations that cannot be used + recursively in projections. 
+        """
+        table = self._to_ibis_expr(
+            "unordered",
+            expose_hidden_cols=True,
+        )
+        columns = [table[column_name] for column_name in self._column_names]
+        ordering_col_ids = [
+            ref.column_id for ref in self._ordering.all_ordering_columns
+        ]
+        hidden_ordering_columns = [
+            table[column_name]
+            for column_name in self._hidden_ordering_column_names
+            if column_name in ordering_col_ids
+        ]
+        return CompiledArrayValue(
+            table,
+            columns=columns,
+            hidden_ordering_columns=hidden_ordering_columns,
+            ordering=self._ordering,
+        )
+
+    def _ibis_window_from_spec(self, window_spec: WindowSpec, allow_ties: bool = False):
+        group_by: typing.List[ibis_types.Value] = (
+            [
+                typing.cast(
+                    ibis_types.Column, _as_identity(self._get_ibis_column(column))
+                )
+                for column in window_spec.grouping_keys
+            ]
+            if window_spec.grouping_keys
+            else []
+        )
+        if self._reduced_predicate is not None:
+            group_by.append(self._reduced_predicate)
+        if window_spec.ordering:
+            order_by = _convert_ordering_to_table_values(
+                {**self._column_names, **self._hidden_ordering_column_names},
+                window_spec.ordering,
+            )
+            if not allow_ties:
+                # Most operators need an unambiguous ordering, so the table's total ordering is appended
+                order_by = tuple([*order_by, *self._ibis_order])
+        elif (window_spec.following is not None) or (window_spec.preceding is not None):
+            # If window spec has following or preceding bounds, we need to apply an unambiguous ordering.
+            order_by = tuple(self._ibis_order)
+        else:
+            # Unbound grouping window. Suitable for aggregations but not for analytic function application.
+            order_by = None
+        return ibis.window(
+            preceding=window_spec.preceding,
+            following=window_spec.following,
+            order_by=order_by,
+            group_by=group_by,
+        )
+
+    def unpivot(
+        self,
+        row_labels: typing.Sequence[typing.Hashable],
+        unpivot_columns: typing.Sequence[
+            typing.Tuple[str, typing.Sequence[typing.Optional[str]]]
+        ],
+        *,
+        passthrough_columns: typing.Sequence[str] = (),
+        index_col_ids: typing.Sequence[str] = ["index"],
+        dtype: typing.Union[
+            bigframes.dtypes.Dtype, typing.Sequence[bigframes.dtypes.Dtype]
+        ] = pandas.Float64Dtype(),
+        how="left",
+    ) -> CompiledArrayValue:
+        """
+        Unpivot ArrayValue columns.
+
+        Args:
+            row_labels: Identifies the source of the row. Must be equal in length to the source column lists in the unpivot_columns argument.
+            unpivot_columns: Mapping of column id to list of input column ids. Lists of input columns may use None.
+            passthrough_columns: Columns that will not be unpivoted. Column id will be preserved.
+            index_col_ids (list of str): The column ids to be used for the row labels.
+            dtype (dtype or list of dtype): Dtype to use for the unpivot columns. If list, must be equal in number to unpivot_columns.
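+            how: 'left' keeps the original row ordering as the primary sort key; 'right' makes the unpivot offset the primary sort key.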
+ + Returns: + ArrayValue: The unpivoted ArrayValue + """ + if how not in ("left", "right"): + raise ValueError("'how' must be 'left' or 'right'") + table = self._to_ibis_expr("unordered", expose_hidden_cols=True) + row_n = len(row_labels) + hidden_col_ids = self._hidden_ordering_column_names.keys() + if not all( + len(source_columns) == row_n for _, source_columns in unpivot_columns + ): + raise ValueError("Columns and row labels must all be same length.") + + unpivot_offset_id = bigframes.core.guid.generate_guid("unpivot_offsets_") + unpivot_table = table.cross_join( + ibis.memtable({unpivot_offset_id: range(row_n)}) + ) + # Use ibis memtable to infer type of rowlabels (if possible) + # TODO: Allow caller to specify dtype + if isinstance(row_labels[0], tuple): + labels_table = ibis.memtable(row_labels) + labels_ibis_types = [ + labels_table[col].type() for col in labels_table.columns + ] + else: + labels_ibis_types = [ibis.memtable({"col": row_labels})["col"].type()] + labels_dtypes = [ + bigframes.dtypes.ibis_dtype_to_bigframes_dtype(ibis_type) + for ibis_type in labels_ibis_types + ] + + label_columns = [] + for label_part, (col_id, label_dtype) in enumerate( + zip(index_col_ids, labels_dtypes) + ): + # interpret as tuples even if it wasn't originally so can apply same logic for multi-column labels + labels_as_tuples = [ + label if isinstance(label, tuple) else (label,) for label in row_labels + ] + cases = [ + ( + i, + bigframes.dtypes.literal_to_ibis_scalar( + label_tuple[label_part], # type:ignore + force_dtype=label_dtype, # type:ignore + ), + ) + for i, label_tuple in enumerate(labels_as_tuples) + ] + labels_value = ( + typing.cast(ibis_types.IntegerColumn, unpivot_table[unpivot_offset_id]) + .cases(cases, default=None) # type:ignore + .name(col_id) + ) + label_columns.append(labels_value) + + unpivot_values = [] + for j in range(len(unpivot_columns)): + col_dtype = dtype[j] if utils.is_list_like(dtype) else dtype + result_col, source_cols = unpivot_columns[j] + null_value = bigframes.dtypes.literal_to_ibis_scalar( + None, force_dtype=col_dtype + ) + ibis_values = [ + ops.AsTypeOp(col_dtype)._as_ibis(unpivot_table[col]) + if col is not None + else null_value + for col in source_cols + ] + cases = [(i, ibis_values[i]) for i in range(len(ibis_values))] + unpivot_value = typing.cast( + ibis_types.IntegerColumn, unpivot_table[unpivot_offset_id] + ).cases( + cases, default=null_value # type:ignore + ) + unpivot_values.append(unpivot_value.name(result_col)) + + unpivot_table = unpivot_table.select( + passthrough_columns, + *label_columns, + *unpivot_values, + *hidden_col_ids, + unpivot_offset_id, + ) + + # Extend the original ordering using unpivot_offset_id + old_ordering = self._ordering + if how == "left": + new_ordering = ExpressionOrdering( + ordering_value_columns=tuple( + [ + *old_ordering.ordering_value_columns, + OrderingColumnReference(unpivot_offset_id), + ] + ), + total_ordering_columns=frozenset( + [*old_ordering.total_ordering_columns, unpivot_offset_id] + ), + ) + else: # how=="right" + new_ordering = ExpressionOrdering( + ordering_value_columns=tuple( + [ + OrderingColumnReference(unpivot_offset_id), + *old_ordering.ordering_value_columns, + ] + ), + total_ordering_columns=frozenset( + [*old_ordering.total_ordering_columns, unpivot_offset_id] + ), + ) + value_columns = [ + unpivot_table[value_col_id] for value_col_id, _ in unpivot_columns + ] + passthrough_values = [unpivot_table[col] for col in passthrough_columns] + hidden_ordering_columns = [ + 
unpivot_table[unpivot_offset_id],
+            *[unpivot_table[hidden_col] for hidden_col in hidden_col_ids],
+        ]
+        return CompiledArrayValue(
+            table=unpivot_table,
+            columns=[
+                *[unpivot_table[col_id] for col_id in index_col_ids],
+                *value_columns,
+                *passthrough_values,
+            ],
+            hidden_ordering_columns=hidden_ordering_columns,
+            ordering=new_ordering,
+        )
+
+    def assign(self, source_id: str, destination_id: str) -> CompiledArrayValue:
+        return self._set_or_replace_by_id(
+            destination_id, self._get_ibis_column(source_id)
+        )
+
+    def assign_constant(
+        self,
+        destination_id: str,
+        value: typing.Any,
+        dtype: typing.Optional[bigframes.dtypes.Dtype],
+    ) -> CompiledArrayValue:
+        # TODO(b/281587571): Solve scalar constant aggregation problem w/Ibis.
+        ibis_value = bigframes.dtypes.literal_to_ibis_scalar(value, dtype)
+        if ibis_value is None:
+            raise NotImplementedError(
+                f"Type not supported as scalar value {type(value)}. {constants.FEEDBACK_LINK}"
+            )
+        expr = self._set_or_replace_by_id(destination_id, ibis_value)
+        return expr._reproject_to_table()
+
+    def _set_or_replace_by_id(
+        self, id: str, new_value: ibis_types.Value
+    ) -> CompiledArrayValue:
+        """Safely assign by id while maintaining ordering integrity."""
+        # TODO: Split into explicit set and replace methods
+        ordering_col_ids = [
+            col_ref.column_id for col_ref in self._ordering.ordering_value_columns
+        ]
+        if id in ordering_col_ids:
+            return self._hide_column(id)._set_or_replace_by_id(id, new_value)
+
+        builder = self.builder()
+        if id in self.column_ids:
+            builder.columns = [
+                val if (col_id != id) else new_value.name(id)
+                for col_id, val in zip(self.column_ids, self._columns)
+            ]
+        else:
+            builder.columns = [*self.columns, new_value.name(id)]
+        return builder.build()
+
+
+class ArrayValueBuilder:
+    """Mutable expression class.
+    Use CompiledArrayValue.builder() to create from a CompiledArrayValue object.
+ """ + + def __init__( + self, + table: ibis_types.Table, + ordering: ExpressionOrdering, + columns: Collection[ibis_types.Value] = (), + hidden_ordering_columns: Collection[ibis_types.Value] = (), + predicates: Optional[Collection[ibis_types.BooleanValue]] = None, + ): + self.table = table + self.columns = list(columns) + self.hidden_ordering_columns = list(hidden_ordering_columns) + self.ordering = ordering + self.predicates = list(predicates) if predicates is not None else None + + def build(self) -> CompiledArrayValue: + return CompiledArrayValue( + table=self.table, + columns=self.columns, + hidden_ordering_columns=self.hidden_ordering_columns, + ordering=self.ordering, + predicates=self.predicates, + ) + + +def _reduce_predicate_list( + predicate_list: typing.Collection[ibis_types.BooleanValue], +) -> ibis_types.BooleanValue: + """Converts a list of predicates BooleanValues into a single BooleanValue.""" + if len(predicate_list) == 0: + raise ValueError("Cannot reduce empty list of predicates") + if len(predicate_list) == 1: + (item,) = predicate_list + return item + return functools.reduce(lambda acc, pred: acc.__and__(pred), predicate_list) + + +def _convert_ordering_to_table_values( + value_lookup: typing.Mapping[str, ibis_types.Value], + ordering_columns: typing.Sequence[OrderingColumnReference], +) -> typing.Sequence[ibis_types.Value]: + column_refs = ordering_columns + ordering_values = [] + for ordering_col in column_refs: + column = typing.cast(ibis_types.Column, value_lookup[ordering_col.column_id]) + ordering_value = ( + ibis.asc(column) + if ordering_col.direction.is_ascending + else ibis.desc(column) + ) + # Bigquery SQL considers NULLS to be "smallest" values, but we need to override in these cases. + if (not ordering_col.na_last) and (not ordering_col.direction.is_ascending): + # Force nulls to be first + is_null_val = typing.cast(ibis_types.Column, column.isnull()) + ordering_values.append(ibis.desc(is_null_val)) + elif (ordering_col.na_last) and (ordering_col.direction.is_ascending): + # Force nulls to be last + is_null_val = typing.cast(ibis_types.Column, column.isnull()) + ordering_values.append(ibis.asc(is_null_val)) + ordering_values.append(ordering_value) + return ordering_values + + +def _as_identity(value: ibis_types.Value): + # Some types need to be converted to string to enable groupby + if value.type().is_float64() or value.type().is_geospatial(): + return value.cast(ibis_dtypes.str) + return value diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py new file mode 100644 index 0000000000..195d830122 --- /dev/null +++ b/bigframes/core/compile/compiler.py @@ -0,0 +1,185 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
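+
+"""Compiles BigFrameNode trees (bigframes.core.nodes) into CompiledArrayValue
+expressions. Dispatch is by node type via functools.singledispatch; results are
+memoized with functools.cache so shared subtrees are compiled only once."""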
+from __future__ import annotations
+
+import functools
+import io
+import typing
+
+import pandas as pd
+
+import bigframes.core.compile as compiled
+import bigframes.core.compile.single_column
+import bigframes.core.nodes as nodes
+
+if typing.TYPE_CHECKING:
+    import bigframes.core
+    import bigframes.session
+
+
+@functools.cache
+def compile_node(node: nodes.BigFrameNode) -> compiled.CompiledArrayValue:
+    """Compile node into CompiledArrayValue. Caches result."""
+    return _compile_node(node)
+
+
+@functools.singledispatch
+def _compile_node(node: nodes.BigFrameNode) -> compiled.CompiledArrayValue:
+    """Defines the transformation but isn't cached; always use compile_node instead."""
+    raise ValueError(f"Can't compile unrecognized node: {node}")
+
+
+@_compile_node.register
+def compile_join(node: nodes.JoinNode):
+    compiled_left = compile_node(node.left_child)
+    compiled_right = compile_node(node.right_child)
+    return bigframes.core.compile.single_column.join_by_column(
+        compiled_left,
+        node.left_column_ids,
+        compiled_right,
+        node.right_column_ids,
+        how=node.how,
+        allow_row_identity_join=node.allow_row_identity_join,
+    )
+
+
+@_compile_node.register
+def compile_select(node: nodes.SelectNode):
+    return compile_node(node.child).select_columns(node.column_ids)
+
+
+@_compile_node.register
+def compile_drop(node: nodes.DropColumnsNode):
+    return compile_node(node.child).drop_columns(node.columns)
+
+
+@_compile_node.register
+def compile_readlocal(node: nodes.ReadLocalNode):
+    array_as_pd = pd.read_feather(io.BytesIO(node.feather_bytes))
+    return compiled.CompiledArrayValue.mem_expr_from_pandas(array_as_pd)
+
+
+@_compile_node.register
+def compile_readgbq(node: nodes.ReadGbqNode):
+    return compiled.CompiledArrayValue(
+        node.table,
+        node.columns,
+        node.hidden_ordering_columns,
+        node.ordering,
+    )
+
+
+@_compile_node.register
+def compile_promote_offsets(node: nodes.PromoteOffsetsNode):
+    return compile_node(node.child).promote_offsets(node.col_id)
+
+
+@_compile_node.register
+def compile_filter(node: nodes.FilterNode):
+    return compile_node(node.child).filter(node.predicate_id, node.keep_null)
+
+
+@_compile_node.register
+def compile_orderby(node: nodes.OrderByNode):
+    return compile_node(node.child).order_by(node.by, node.stable)
+
+
+@_compile_node.register
+def compile_reversed(node: nodes.ReversedNode):
+    return compile_node(node.child).reversed()
+
+
+@_compile_node.register
+def compile_project_unary(node: nodes.ProjectUnaryOpNode):
+    return compile_node(node.child).project_unary_op(
+        node.input_id, node.op, node.output_id
+    )
+
+
+@_compile_node.register
+def compile_project_binary(node: nodes.ProjectBinaryOpNode):
+    return compile_node(node.child).project_binary_op(
+        node.left_input_id, node.right_input_id, node.op, node.output_id
+    )
+
+
+@_compile_node.register
+def compile_project_ternary(node: nodes.ProjectTernaryOpNode):
+    return compile_node(node.child).project_ternary_op(
+        node.input_id1, node.input_id2, node.input_id3, node.op, node.output_id
+    )
+
+
+@_compile_node.register
+def compile_concat(node: nodes.ConcatNode):
+    compiled_nodes = [compile_node(node) for node in node.children]
+    return compiled_nodes[0].concat(compiled_nodes[1:])
+
+
+@_compile_node.register
+def compile_aggregate(node: nodes.AggregateNode):
+    return compile_node(node.child).aggregate(
+        node.aggregations, node.by_column_ids, node.dropna
+    )
+
+
+@_compile_node.register
+def compile_corr(node: nodes.CorrNode):
+    return compile_node(node.child).corr_aggregate(node.corr_aggregations)
+
+
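+# Each handler recurses through the cached compile_node entry point rather than
+# _compile_node directly, so a subtree shared by multiple parents (e.g. both
+# inputs of a self-join) is compiled at most once.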
+@_compile_node.register +def compile_window(node: nodes.WindowOpNode): + return compile_node(node.child).project_window_op( + node.column_name, + node.op, + node.window_spec, + node.output_name, + never_skip_nulls=node.never_skip_nulls, + skip_reproject_unsafe=node.skip_reproject_unsafe, + ) + + +@_compile_node.register +def compile_reproject(node: nodes.ReprojectOpNode): + return compile_node(node.child)._reproject_to_table() + + +@_compile_node.register +def compile_unpivot(node: nodes.UnpivotNode): + return compile_node(node.child).unpivot( + node.row_labels, + node.unpivot_columns, + passthrough_columns=node.passthrough_columns, + index_col_ids=node.index_col_ids, + dtype=node.dtype, + how=node.how, + ) + + +@_compile_node.register +def compile_assign(node: nodes.AssignNode): + return compile_node(node.child).assign(node.source_id, node.destination_id) + + +@_compile_node.register +def compile_assign_constant(node: nodes.AssignConstantNode): + return compile_node(node.child).assign_constant( + node.destination_id, node.value, node.dtype + ) + + +@_compile_node.register +def compiler_random_sample(node: nodes.RandomSampleNode): + return compile_node(node.child)._uniform_sampling(node.fraction) diff --git a/bigframes/core/joins/row_identity.py b/bigframes/core/compile/row_identity.py similarity index 94% rename from bigframes/core/joins/row_identity.py rename to bigframes/core/compile/row_identity.py index 76e456ec94..2e9bc0527c 100644 --- a/bigframes/core/joins/row_identity.py +++ b/bigframes/core/compile/row_identity.py @@ -23,15 +23,16 @@ import ibis.expr.types as ibis_types import bigframes.constants as constants -import bigframes.core as core +import bigframes.core.compile as compiled import bigframes.core.joins.name_resolution as naming +import bigframes.core.ordering as orderings SUPPORTED_ROW_IDENTITY_HOW = {"outer", "left", "inner"} def join_by_row_identity( - left: core.ArrayValue, right: core.ArrayValue, *, how: str -) -> core.ArrayValue: + left: compiled.CompiledArrayValue, right: compiled.CompiledArrayValue, *, how: str +) -> compiled.CompiledArrayValue: """Compute join when we are joining by row identity not a specific column.""" if how not in SUPPORTED_ROW_IDENTITY_HOW: raise NotImplementedError( @@ -101,8 +102,8 @@ def join_by_row_identity( ) # Assume that left ordering is sufficient since 1:1 join over same base table join_total_order_cols = left_total_order_cols - new_ordering = core.ExpressionOrdering( - ordering_columns, total_ordering_columns=join_total_order_cols + new_ordering = orderings.ExpressionOrdering( + tuple(ordering_columns), total_ordering_columns=join_total_order_cols ) hidden_ordering_columns = [ @@ -117,8 +118,7 @@ def join_by_row_identity( if key.column_id in right._hidden_ordering_column_names.keys() ] - joined_expr = core.ArrayValue( - left._session, + joined_expr = compiled.CompiledArrayValue( left._table, columns=joined_columns, hidden_ordering_columns=hidden_ordering_columns, diff --git a/bigframes/core/joins/single_column.py b/bigframes/core/compile/single_column.py similarity index 87% rename from bigframes/core/joins/single_column.py rename to bigframes/core/compile/single_column.py index 0c0e2008b5..b992aa1d1d 100644 --- a/bigframes/core/joins/single_column.py +++ b/bigframes/core/compile/single_column.py @@ -23,16 +23,16 @@ import ibis.expr.datatypes as ibis_dtypes import ibis.expr.types as ibis_types -import bigframes.core as core -import bigframes.core.joins.name_resolution as naming -import bigframes.core.joins.row_identity -import 
bigframes.core.ordering +import bigframes.core.compile as compiled +import bigframes.core.compile.row_identity +import bigframes.core.joins as joining +import bigframes.core.ordering as orderings def join_by_column( - left: core.ArrayValue, + left: compiled.CompiledArrayValue, left_column_ids: typing.Sequence[str], - right: core.ArrayValue, + right: compiled.CompiledArrayValue, right_column_ids: typing.Sequence[str], *, how: Literal[ @@ -42,7 +42,7 @@ def join_by_column( "right", ], allow_row_identity_join: bool = True, -) -> core.ArrayValue: +) -> compiled.CompiledArrayValue: """Join two expressions by column equality. Arguments: @@ -61,7 +61,7 @@ def join_by_column( """ if ( allow_row_identity_join - and how in bigframes.core.joins.row_identity.SUPPORTED_ROW_IDENTITY_HOW + and how in bigframes.core.compile.row_identity.SUPPORTED_ROW_IDENTITY_HOW and left._table.equals(right._table) # Make sure we're joining on exactly the same column(s), at least with # regards to value its possible that they both have the same names but @@ -73,15 +73,15 @@ def join_by_column( for lcol, rcol in zip(left_column_ids, right_column_ids) ) ): - return bigframes.core.joins.row_identity.join_by_row_identity( + return bigframes.core.compile.row_identity.join_by_row_identity( left, right, how=how ) else: # Value column mapping must use JOIN_NAME_REMAPPER to stay in sync with consumers of join result - l_public_mapping, r_public_mapping = naming.JOIN_NAME_REMAPPER( + l_public_mapping, r_public_mapping = joining.JOIN_NAME_REMAPPER( left.column_ids, right.column_ids ) - l_hidden_mapping, r_hidden_mapping = naming.JoinNameRemapper( + l_hidden_mapping, r_hidden_mapping = joining.JoinNameRemapper( namespace="hidden" )(left._hidden_column_ids, right._hidden_column_ids) l_mapping = {**l_public_mapping, **l_hidden_mapping} @@ -134,8 +134,7 @@ def join_by_column( for col in right._hidden_ordering_columns ], ] - return core.ArrayValue( - left._session, + return compiled.CompiledArrayValue( combined_table, columns=columns, hidden_ordering_columns=hidden_ordering_columns, @@ -151,12 +150,12 @@ def value_to_join_key(value: ibis_types.Value): def join_orderings( - left: core.ExpressionOrdering, - right: core.ExpressionOrdering, + left: orderings.ExpressionOrdering, + right: orderings.ExpressionOrdering, left_id_mapping: Mapping[str, str], right_id_mapping: Mapping[str, str], left_order_dominates: bool = True, -) -> core.ExpressionOrdering: +) -> orderings.ExpressionOrdering: left_ordering_refs = [ ref.with_name(left_id_mapping[ref.column_id]) for ref in left.all_ordering_columns @@ -176,7 +175,7 @@ def join_orderings( right_total_order_cols = frozenset( [right_id_mapping[id] for id in right.total_ordering_columns] ) - return core.ExpressionOrdering( - ordering_value_columns=joined_refs, + return orderings.ExpressionOrdering( + ordering_value_columns=tuple(joined_refs), total_ordering_columns=left_total_order_cols | right_total_order_cols, ) diff --git a/bigframes/core/groupby/__init__.py b/bigframes/core/groupby/__init__.py index db0843fcbc..2a19a83dd5 100644 --- a/bigframes/core/groupby/__init__.py +++ b/bigframes/core/groupby/__init__.py @@ -193,7 +193,7 @@ def cumprod(self, *args, **kwargs) -> df.DataFrame: def shift(self, periods=1) -> series.Series: window = core.WindowSpec( - grouping_keys=self._by_col_ids, + grouping_keys=tuple(self._by_col_ids), preceding=periods if periods > 0 else None, following=-periods if periods < 0 else None, ) @@ -201,7 +201,7 @@ def shift(self, periods=1) -> series.Series: def diff(self, 
periods=1) -> series.Series: window = core.WindowSpec( - grouping_keys=self._by_col_ids, + grouping_keys=tuple(self._by_col_ids), preceding=periods if periods > 0 else None, following=-periods if periods < 0 else None, ) @@ -210,7 +210,7 @@ def diff(self, periods=1) -> series.Series: def rolling(self, window: int, min_periods=None) -> windows.Window: # To get n size window, need current row and n-1 preceding rows. window_spec = core.WindowSpec( - grouping_keys=self._by_col_ids, + grouping_keys=tuple(self._by_col_ids), preceding=window - 1, following=0, min_periods=min_periods or window, @@ -225,7 +225,7 @@ def rolling(self, window: int, min_periods=None) -> windows.Window: def expanding(self, min_periods: int = 1) -> windows.Window: window_spec = core.WindowSpec( - grouping_keys=self._by_col_ids, + grouping_keys=tuple(self._by_col_ids), following=0, min_periods=min_periods, ) @@ -389,7 +389,7 @@ def _apply_window_op( ): """Apply window op to groupby. Defaults to grouped cumulative window.""" window_spec = window or core.WindowSpec( - grouping_keys=self._by_col_ids, following=0 + grouping_keys=tuple(self._by_col_ids), following=0 ) columns = self._aggregated_columns(numeric_only=numeric_only) block, result_ids = self._block.multi_apply_window_op( @@ -528,7 +528,7 @@ def cumcount(self, *args, **kwargs) -> series.Series: def shift(self, periods=1) -> series.Series: """Shift index by desired number of periods.""" window = core.WindowSpec( - grouping_keys=self._by_col_ids, + grouping_keys=tuple(self._by_col_ids), preceding=periods if periods > 0 else None, following=-periods if periods < 0 else None, ) @@ -536,7 +536,7 @@ def shift(self, periods=1) -> series.Series: def diff(self, periods=1) -> series.Series: window = core.WindowSpec( - grouping_keys=self._by_col_ids, + grouping_keys=tuple(self._by_col_ids), preceding=periods if periods > 0 else None, following=-periods if periods < 0 else None, ) @@ -545,7 +545,7 @@ def diff(self, periods=1) -> series.Series: def rolling(self, window: int, min_periods=None) -> windows.Window: # To get n size window, need current row and n-1 preceding rows. window_spec = core.WindowSpec( - grouping_keys=self._by_col_ids, + grouping_keys=tuple(self._by_col_ids), preceding=window - 1, following=0, min_periods=min_periods or window, @@ -564,7 +564,7 @@ def rolling(self, window: int, min_periods=None) -> windows.Window: def expanding(self, min_periods: int = 1) -> windows.Window: window_spec = core.WindowSpec( - grouping_keys=self._by_col_ids, + grouping_keys=tuple(self._by_col_ids), following=0, min_periods=min_periods, ) @@ -597,7 +597,7 @@ def _apply_window_op( ): """Apply window op to groupby. 
Defaults to grouped cumulative window.""" window_spec = window or core.WindowSpec( - grouping_keys=self._by_col_ids, following=0 + grouping_keys=tuple(self._by_col_ids), following=0 ) label = self._value_name if not discard_name else None diff --git a/bigframes/core/indexers.py b/bigframes/core/indexers.py index d18a0a38ef..f6ce084714 100644 --- a/bigframes/core/indexers.py +++ b/bigframes/core/indexers.py @@ -311,7 +311,7 @@ def _loc_getitem_series_or_dataframe( values = [entry[i] for entry in key] index_cols_dict[index_name] = values keys_df = bigframes.dataframe.DataFrame( - index_cols_dict, session=series_or_dataframe._get_block().expr._session + index_cols_dict, session=series_or_dataframe._get_block().expr.session ) keys_df = keys_df.set_index(temporary_index_names, drop=True) keys_df = keys_df.rename_axis(original_index_names) @@ -324,7 +324,7 @@ def _loc_getitem_series_or_dataframe( index_name = "unnamed_col" keys_df = bigframes.dataframe.DataFrame( {index_name: key}, - session=series_or_dataframe._get_block().expr._session, + session=series_or_dataframe._get_block().expr.session, ) keys_df = keys_df.set_index(index_name, drop=True) if index_name_is_none: @@ -343,7 +343,7 @@ def _loc_getitem_series_or_dataframe( elif pd.api.types.is_scalar(key): index_name = "unnamed_col" keys_df = bigframes.dataframe.DataFrame( - {index_name: [key]}, session=series_or_dataframe._get_block().expr._session + {index_name: [key]}, session=series_or_dataframe._get_block().expr.session ) keys_df = keys_df.set_index(index_name, drop=True) keys_df.index.name = None diff --git a/bigframes/core/indexes/index.py b/bigframes/core/indexes/index.py index b9ffdff21e..6c66c36062 100644 --- a/bigframes/core/indexes/index.py +++ b/bigframes/core/indexes/index.py @@ -26,8 +26,7 @@ import bigframes.core as core import bigframes.core.block_transforms as block_ops import bigframes.core.blocks as blocks -import bigframes.core.joins as joins -import bigframes.core.joins.name_resolution as join_names +import bigframes.core.joins as joining import bigframes.core.ordering as order import bigframes.core.utils as utils import bigframes.dtypes @@ -402,7 +401,7 @@ def to_pandas(self) -> pandas.Index: dtypes = dict(zip(index_columns, self.dtypes)) expr = self._expr.select_columns(index_columns) results, _ = expr.start_query() - df = expr._session._rows_to_dataframe(results, dtypes) + df = expr.session._rows_to_dataframe(results, dtypes) df = df.set_index(index_columns) index = df.index index.names = list(self._block._index_labels) @@ -461,11 +460,10 @@ def join_mono_indexed( ) -> Tuple[IndexValue, Tuple[Mapping[str, str], Mapping[str, str]],]: left_expr = left._block.expr right_expr = right._block.expr - get_column_left, get_column_right = join_names.JOIN_NAME_REMAPPER( + get_column_left, get_column_right = joining.JOIN_NAME_REMAPPER( left_expr.column_ids, right_expr.column_ids ) - combined_expr = joins.join_by_column( - left._block.expr, + combined_expr = left._block.expr.join( left._block.index_columns, right._block.expr, right._block.index_columns, @@ -520,12 +518,11 @@ def join_multi_indexed( left_expr = left._block.expr right_expr = right._block.expr - get_column_left, get_column_right = join_names.JOIN_NAME_REMAPPER( + get_column_left, get_column_right = joining.JOIN_NAME_REMAPPER( left_expr.column_ids, right_expr.column_ids ) - combined_expr = joins.join_by_column( - left_expr, + combined_expr = left_expr.join( left_join_ids, right_expr, right_join_ids, diff --git a/bigframes/core/joins/__init__.py 
b/bigframes/core/joins/__init__.py index 3f9447aef0..5d407ec22b 100644 --- a/bigframes/core/joins/__init__.py +++ b/bigframes/core/joins/__init__.py @@ -15,11 +15,6 @@ """Helpers to join ArrayValue objects.""" from bigframes.core.joins.merge import merge -from bigframes.core.joins.row_identity import join_by_row_identity -from bigframes.core.joins.single_column import join_by_column +from bigframes.core.joins.name_resolution import JOIN_NAME_REMAPPER, JoinNameRemapper -__all__ = ( - "join_by_row_identity", - "join_by_column", - "merge", -) +__all__ = ("merge", "JoinNameRemapper", "JOIN_NAME_REMAPPER") diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py new file mode 100644 index 0000000000..7b252b164f --- /dev/null +++ b/bigframes/core/nodes.py @@ -0,0 +1,245 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +from dataclasses import dataclass, field +import functools +import typing +from typing import Optional, Tuple + +import pandas + +import bigframes.core.guid +from bigframes.core.ordering import OrderingColumnReference +import bigframes.core.window_spec as window +import bigframes.dtypes +import bigframes.operations as ops +import bigframes.operations.aggregations as agg_ops + +if typing.TYPE_CHECKING: + import ibis.expr.types as ibis_types + + import bigframes.core.ordering as orderings + import bigframes.session + + +@dataclass(frozen=True) +class BigFrameNode: + """ + Immutable node for representing a 2D typed array as a tree of operators. + + All subclasses must be hashable so as to be usable as a caching key. + """ + + @property + def deterministic(self) -> bool: + """Whether this node evaluates deterministically.""" + return True + + @property + def child_nodes(self) -> typing.Sequence[BigFrameNode]: + """Direct children of this node.""" + return tuple([]) + + @functools.cached_property + def session(self): + sessions = [] + for child in self.child_nodes: + if child.session is not None: + sessions.append(child.session) + unique_sessions = len(set(sessions)) + if unique_sessions > 1: + raise ValueError("Cannot combine sources from multiple sessions.") + elif unique_sessions == 1: + return sessions[0] + return None + + +@dataclass(frozen=True) +class UnaryNode(BigFrameNode): + child: BigFrameNode + + @property + def child_nodes(self) -> typing.Sequence[BigFrameNode]: + return (self.child,) + + +@dataclass(frozen=True) +class JoinNode(BigFrameNode): + left_child: BigFrameNode + right_child: BigFrameNode + left_column_ids: typing.Tuple[str, ...] + right_column_ids: typing.Tuple[str, ...] + how: typing.Literal[ + "inner", + "left", + "outer", + "right", + ] + allow_row_identity_join: bool = True + + @property + def child_nodes(self) -> typing.Sequence[BigFrameNode]: + return (self.left_child, self.right_child) + + +@dataclass(frozen=True) +class ConcatNode(BigFrameNode): + children: Tuple[BigFrameNode, ...] 
+ + @property + def child_nodes(self) -> typing.Sequence[BigFrameNode]: + return self.children + + +# Input nodes +@dataclass(frozen=True) +class ReadLocalNode(BigFrameNode): + feather_bytes: bytes + column_ids: typing.Tuple[str, ...] + + +# TODO: Refactor to take raw gbq object reference +@dataclass(frozen=True) +class ReadGbqNode(BigFrameNode): + table: ibis_types.Table = field() + table_session: bigframes.session.Session = field() + columns: Tuple[ibis_types.Value, ...] = field() + hidden_ordering_columns: Tuple[ibis_types.Value, ...] = field() + ordering: orderings.ExpressionOrdering = field() + + @property + def session(self): + return self.table_session + + +# Unary nodes +@dataclass(frozen=True) +class DropColumnsNode(UnaryNode): + columns: Tuple[str, ...] + + +@dataclass(frozen=True) +class PromoteOffsetsNode(UnaryNode): + col_id: str + + +@dataclass(frozen=True) +class FilterNode(UnaryNode): + predicate_id: str + keep_null: bool = False + + +@dataclass(frozen=True) +class OrderByNode(UnaryNode): + by: Tuple[OrderingColumnReference, ...] + stable: bool = False + + +@dataclass(frozen=True) +class ReversedNode(UnaryNode): + pass + + +@dataclass(frozen=True) +class SelectNode(UnaryNode): + column_ids: typing.Tuple[str, ...] + + +@dataclass(frozen=True) +class ProjectUnaryOpNode(UnaryNode): + input_id: str + op: ops.UnaryOp + output_id: Optional[str] = None + + +@dataclass(frozen=True) +class ProjectBinaryOpNode(UnaryNode): + left_input_id: str + right_input_id: str + op: ops.BinaryOp + output_id: str + + +@dataclass(frozen=True) +class ProjectTernaryOpNode(UnaryNode): + input_id1: str + input_id2: str + input_id3: str + op: ops.TernaryOp + output_id: str + + +@dataclass(frozen=True) +class AggregateNode(UnaryNode): + aggregations: typing.Tuple[typing.Tuple[str, agg_ops.AggregateOp, str], ...] + by_column_ids: typing.Tuple[str, ...] = tuple([]) + dropna: bool = True + + +# TODO: Unify into aggregate +@dataclass(frozen=True) +class CorrNode(UnaryNode): + corr_aggregations: typing.Tuple[typing.Tuple[str, str, str], ...] + + +@dataclass(frozen=True) +class WindowOpNode(UnaryNode): + column_name: str + op: agg_ops.WindowOp + window_spec: window.WindowSpec + output_name: typing.Optional[str] = None + never_skip_nulls: bool = False + skip_reproject_unsafe: bool = False + + +@dataclass(frozen=True) +class ReprojectOpNode(UnaryNode): + pass + + +@dataclass(frozen=True) +class UnpivotNode(UnaryNode): + row_labels: typing.Tuple[typing.Hashable, ...] + unpivot_columns: typing.Tuple[ + typing.Tuple[str, typing.Tuple[typing.Optional[str], ...]], ... + ] + passthrough_columns: typing.Tuple[str, ...] = () + index_col_ids: typing.Tuple[str, ...] = ("index",) + dtype: typing.Union[ + bigframes.dtypes.Dtype, typing.Tuple[bigframes.dtypes.Dtype, ...] 
+ ] = (pandas.Float64Dtype(),) + how: typing.Literal["left", "right"] = "left" + + +@dataclass(frozen=True) +class AssignNode(UnaryNode): + source_id: str + destination_id: str + + +@dataclass(frozen=True) +class AssignConstantNode(UnaryNode): + destination_id: str + value: typing.Hashable + dtype: typing.Optional[bigframes.dtypes.Dtype] + + +@dataclass(frozen=True) +class RandomSampleNode(UnaryNode): + fraction: float + + @property + def deterministic(self) -> bool: + return False diff --git a/bigframes/core/ordering.py b/bigframes/core/ordering.py index d5f07ecf91..2cecd2fe7b 100644 --- a/bigframes/core/ordering.py +++ b/bigframes/core/ordering.py @@ -86,7 +86,7 @@ class IntegerEncoding: class ExpressionOrdering: """Immutable object that holds information about the ordering of rows in an ArrayValue object.""" - ordering_value_columns: Sequence[OrderingColumnReference] = () + ordering_value_columns: typing.Tuple[OrderingColumnReference, ...] = () integer_encoding: IntegerEncoding = IntegerEncoding(False) string_encoding: StringEncoding = StringEncoding(False) # A table has a total ordering defined by the identities of a set of 1 or more columns. @@ -170,7 +170,7 @@ def with_column_remap(self, mapping: typing.Mapping[str, str]): mapping.get(col_id, col_id) for col_id in self.total_ordering_columns ) return ExpressionOrdering( - new_value_columns, + tuple(new_value_columns), integer_encoding=self.integer_encoding, string_encoding=self.string_encoding, total_ordering_columns=new_total_order, diff --git a/bigframes/core/window_spec.py b/bigframes/core/window_spec.py new file mode 100644 index 0000000000..3458bfb1b8 --- /dev/null +++ b/bigframes/core/window_spec.py @@ -0,0 +1,35 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass +import typing + +import bigframes.core.ordering as orderings + + +@dataclass(frozen=True) +class WindowSpec: + """ + Specifies a window over which aggregate and analytic functions may be applied. + grouping_keys: set of column ids to group on + preceding: Number of preceding rows in the window + following: Number of following rows in the window + ordering: List of column ids and ordering direction to override base ordering + """ + + grouping_keys: typing.Tuple[str, ...] = tuple() + ordering: typing.Tuple[orderings.OrderingColumnReference, ...] 
= tuple() + preceding: typing.Optional[int] = None + following: typing.Optional[int] = None + min_periods: int = 0 diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 3fd8319876..9d22c02d87 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -170,9 +170,7 @@ def __init__( if isinstance(dt, pandas.ArrowDtype) ) ): - self._block = blocks.block_from_local( - pd_dataframe, session or bigframes.pandas.get_global_session() - ) + self._block = blocks.block_from_local(pd_dataframe) elif session: self._block = session.read_pandas(pd_dataframe)._get_block() else: @@ -299,7 +297,7 @@ def values(self) -> numpy.ndarray: @property def _session(self) -> bigframes.Session: - return self._get_block().expr._session + return self._get_block().expr.session def __len__(self): rows, _ = self.shape @@ -1107,7 +1105,7 @@ def _assign_single_item( ) local_df = bigframes.dataframe.DataFrame( - {k: v}, session=self._get_block().expr._session + {k: v}, session=self._get_block().expr.session ) # local_df is likely (but not guaranteed) to be cached locally # since the original list came from memory and so is probably < MAX_INLINE_DF_SIZE @@ -2203,7 +2201,7 @@ def to_csv( field_delimiter=sep, header=header, ) - _, query_job = self._block.expr._session._start_query(export_data_statement) + _, query_job = self._block.expr.session._start_query(export_data_statement) self._set_internal_query_job(query_job) def to_json( @@ -2245,7 +2243,7 @@ def to_json( format="JSON", export_options={}, ) - _, query_job = self._block.expr._session._start_query(export_data_statement) + _, query_job = self._block.expr.session._start_query(export_data_statement) self._set_internal_query_job(query_job) def to_gbq( @@ -2274,7 +2272,7 @@ def to_gbq( write_disposition=dispositions[if_exists], destination=bigquery.table.TableReference.from_string( destination_table, - default_project=self._block.expr._session.bqclient.project, + default_project=self._block.expr.session.bqclient.project, ), ) @@ -2321,7 +2319,7 @@ def to_parquet( format="PARQUET", export_options=export_options, ) - _, query_job = self._block.expr._session._start_query(export_data_statement) + _, query_job = self._block.expr.session._start_query(export_data_statement) self._set_internal_query_job(query_job) def to_dict( @@ -2464,7 +2462,7 @@ def _run_io_query( """Executes a query job representing this dataframe and returns the destination table.""" expr = self._block.expr - session = expr._session + session = expr.session sql = self._create_io_query(index=index, ordering_id=ordering_id) _, query_job = session._start_query( sql=sql, job_config=job_config # type: ignore diff --git a/bigframes/ml/metrics.py b/bigframes/ml/metrics.py index 3bcb621f74..5731b946ca 100644 --- a/bigframes/ml/metrics.py +++ b/bigframes/ml/metrics.py @@ -96,7 +96,7 @@ def roc_curve( y_true_series, y_score_series = utils.convert_to_series(y_true, y_score) - session = y_true_series._block.expr._session + session = y_true_series._block.expr.session # We operate on rows, so remove the index if there is one # TODO(bmil): check that the indexes are equivalent before removing diff --git a/bigframes/operations/base.py b/bigframes/operations/base.py index b9abb2cc03..d33befe4da 100644 --- a/bigframes/operations/base.py +++ b/bigframes/operations/base.py @@ -94,9 +94,7 @@ def __init__( if isinstance(dt, pd.ArrowDtype) ) ): - self._block = blocks.block_from_local( - pd_dataframe, session or bigframes.pandas.get_global_session() - ) + self._block = blocks.block_from_local(pd_dataframe) elif 
session: self._block = session.read_pandas(pd_dataframe)._get_block() else: diff --git a/bigframes/series.py b/bigframes/series.py index c191452783..37d00d16f3 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -29,7 +29,6 @@ import bigframes.constants as constants import bigframes.core -from bigframes.core import WindowSpec import bigframes.core.block_transforms as block_ops import bigframes.core.blocks as blocks import bigframes.core.groupby as groupby @@ -43,6 +42,7 @@ import bigframes.core.scalar as scalars import bigframes.core.utils as utils import bigframes.core.window +import bigframes.core.window_spec import bigframes.dataframe import bigframes.dtypes import bigframes.formatting_helpers as formatter @@ -367,43 +367,43 @@ def between(self, left, right, inclusive="both"): def cumsum(self) -> Series: return self._apply_window_op( - agg_ops.sum_op, bigframes.core.WindowSpec(following=0) + agg_ops.sum_op, bigframes.core.window_spec.WindowSpec(following=0) ) def ffill(self, *, limit: typing.Optional[int] = None) -> Series: - window = bigframes.core.WindowSpec(preceding=limit, following=0) + window = bigframes.core.window_spec.WindowSpec(preceding=limit, following=0) return self._apply_window_op(agg_ops.LastNonNullOp(), window) pad = ffill def bfill(self, *, limit: typing.Optional[int] = None) -> Series: - window = bigframes.core.WindowSpec(preceding=0, following=limit) + window = bigframes.core.window_spec.WindowSpec(preceding=0, following=limit) return self._apply_window_op(agg_ops.FirstNonNullOp(), window) def cummax(self) -> Series: return self._apply_window_op( - agg_ops.max_op, bigframes.core.WindowSpec(following=0) + agg_ops.max_op, bigframes.core.window_spec.WindowSpec(following=0) ) def cummin(self) -> Series: return self._apply_window_op( - agg_ops.min_op, bigframes.core.WindowSpec(following=0) + agg_ops.min_op, bigframes.core.window_spec.WindowSpec(following=0) ) def cumprod(self) -> Series: return self._apply_window_op( - agg_ops.product_op, bigframes.core.WindowSpec(following=0) + agg_ops.product_op, bigframes.core.window_spec.WindowSpec(following=0) ) def shift(self, periods: int = 1) -> Series: - window = bigframes.core.WindowSpec( + window = bigframes.core.window_spec.WindowSpec( preceding=periods if periods > 0 else None, following=-periods if periods < 0 else None, ) return self._apply_window_op(agg_ops.ShiftOp(periods), window) def diff(self, periods: int = 1) -> Series: - window = bigframes.core.WindowSpec( + window = bigframes.core.window_spec.WindowSpec( preceding=periods if periods > 0 else None, following=-periods if periods < 0 else None, ) @@ -805,7 +805,7 @@ def mode(self) -> Series: block, max_value_count_col_id = block.apply_window_op( value_count_col_id, agg_ops.max_op, - window_spec=WindowSpec(), + window_spec=bigframes.core.window_spec.WindowSpec(), ) block, is_mode_col_id = block.apply_binary_op( value_count_col_id, @@ -1009,9 +1009,7 @@ def _apply_aggregation(self, op: agg_ops.AggregateOp) -> Any: return self._block.get_stat(self._value_column, op) def _apply_window_op( - self, - op: agg_ops.WindowOp, - window_spec: bigframes.core.WindowSpec, + self, op: agg_ops.WindowOp, window_spec: bigframes.core.window_spec.WindowSpec ): block = self._block block, result_id = block.apply_window_op( @@ -1070,7 +1068,7 @@ def sort_index(self, *, axis=0, ascending=True, na_position="last") -> Series: def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window: # To get n size window, need current row and n-1 preceding rows. 
- window_spec = WindowSpec( + window_spec = bigframes.core.window_spec.WindowSpec( preceding=window - 1, following=0, min_periods=min_periods or window ) return bigframes.core.window.Window( @@ -1078,7 +1076,9 @@ def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window ) def expanding(self, min_periods: int = 1) -> bigframes.core.window.Window: - window_spec = WindowSpec(following=0, min_periods=min_periods) + window_spec = bigframes.core.window_spec.WindowSpec( + following=0, min_periods=min_periods + ) return bigframes.core.window.Window( self._block, window_spec, self._block.value_columns, is_series=True ) @@ -1251,7 +1251,7 @@ def reindex(self, index=None, *, validate: typing.Optional[bool] = None): "Cannot reindex with index with different nlevels" ) new_indexer = bigframes.dataframe.DataFrame( - index=index, session=self._get_block().expr._session + index=index, session=self._get_block().expr.session )[[]] # multiindex join is sensitive to index names, so we will set all of these result = new_indexer.rename_axis(range(new_indexer.index.nlevels)).join( @@ -1415,7 +1415,7 @@ def map( elif isinstance(arg, Mapping): map_df = bigframes.dataframe.DataFrame( {"keys": list(arg.keys()), self.name: list(arg.values())}, - session=self._get_block().expr._session, + session=self._get_block().expr.session, ) map_df = map_df.set_index("keys") elif callable(arg): diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index af1f70d54d..473de62f53 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -68,6 +68,7 @@ import bigframes.core.blocks as blocks import bigframes.core.guid as guid from bigframes.core.ordering import IntegerEncoding, OrderingColumnReference +import bigframes.core.ordering as orderings import bigframes.core.utils as utils import bigframes.dataframe as dataframe import bigframes.formatting_helpers as formatting_helpers @@ -206,6 +207,10 @@ def _session_dataset_id(self): def _project(self): return self.bqclient.project + def __hash__(self): + # Stable hash needed for use in the expression tree + return hash(self._session_id) + def _create_and_bind_bq_session(self): """Create a BQ session and bind the session id with clients to capture BQ activities: go/bigframes-transient-data""" @@ -592,11 +597,13 @@ def _read_gbq_table( # primary key(s) are set on a table. The query engine assumes such # columns are unique, even if not enforced. 
is_total_ordering = True - ordering = core.ExpressionOrdering( - ordering_value_columns=[ - core.OrderingColumnReference(column_id) - for column_id in total_ordering_cols - ], + ordering = orderings.ExpressionOrdering( + ordering_value_columns=tuple( + [ + core.OrderingColumnReference(column_id) + for column_id in total_ordering_cols + ] + ), total_ordering_columns=frozenset(total_ordering_cols), ) @@ -634,10 +641,13 @@ def _read_gbq_table( distinct_count = row["distinct_count"] is_total_ordering = total_count == distinct_count - ordering = core.ExpressionOrdering( - ordering_value_columns=[ - core.OrderingColumnReference(column_id) for column_id in index_cols - ], + ordering = orderings.ExpressionOrdering( + ordering_value_columns=tuple( + [ + core.OrderingColumnReference(column_id) + for column_id in index_cols + ] + ), total_ordering_columns=frozenset(index_cols), ) @@ -713,7 +723,7 @@ def _read_gbq_with_ordering( index_cols: Iterable[str] = (), index_labels: Iterable[Optional[str]] = (), hidden_cols: Iterable[str] = (), - ordering: core.ExpressionOrdering, + ordering: orderings.ExpressionOrdering, is_total_ordering: bool = False, api_name: str, ) -> dataframe.DataFrame: @@ -826,7 +836,7 @@ def _read_ibis( index_labels: Iterable[blocks.Label], column_keys: Iterable[str], column_labels: Iterable[blocks.Label], - ordering: core.ExpressionOrdering, + ordering: orderings.ExpressionOrdering, ) -> dataframe.DataFrame: """Turns a table expression (plus index column) into a DataFrame.""" @@ -843,7 +853,7 @@ def _read_ibis( hidden_ordering_columns.append(table_expression[ref.column_id]) block = blocks.Block( - core.ArrayValue( + core.ArrayValue.from_ibis( self, table_expression, columns, hidden_ordering_columns, ordering ), index_columns=[index_col.get_name() for index_col in index_cols], @@ -959,8 +969,8 @@ def _read_pandas( ) self._start_generic_job(load_job) - ordering = core.ExpressionOrdering( - ordering_value_columns=[OrderingColumnReference(ordering_col)], + ordering = orderings.ExpressionOrdering( + ordering_value_columns=tuple([OrderingColumnReference(ordering_col)]), total_ordering_columns=frozenset([ordering_col]), integer_encoding=IntegerEncoding(True, is_sequential=True), ) @@ -1303,7 +1313,7 @@ def _create_sequential_ordering( table: ibis_types.Table, index_cols: Iterable[str] = (), api_name: str = "", - ) -> Tuple[ibis_types.Table, core.ExpressionOrdering]: + ) -> Tuple[ibis_types.Table, orderings.ExpressionOrdering]: # Since this might also be used as the index, don't use the default # "ordering ID" name. 
default_ordering_name = guid.generate_guid("bigframes_ordering_") @@ -1320,8 +1330,8 @@ def _create_sequential_ordering( f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}" ) ordering_reference = core.OrderingColumnReference(default_ordering_name) - ordering = core.ExpressionOrdering( - ordering_value_columns=[ordering_reference], + ordering = orderings.ExpressionOrdering( + ordering_value_columns=tuple([ordering_reference]), total_ordering_columns=frozenset([default_ordering_name]), integer_encoding=IntegerEncoding(is_encoded=True, is_sequential=True), ) diff --git a/tests/system/small/test_progress_bar.py b/tests/system/small/test_progress_bar.py index f7fc4eaa8f..084b723fba 100644 --- a/tests/system/small/test_progress_bar.py +++ b/tests/system/small/test_progress_bar.py @@ -98,7 +98,7 @@ def assert_loading_msg_exist(capystOut: str, pattern=job_load_message_regex): def test_query_job_repr_html(penguins_df_default_index: bf.dataframe.DataFrame): bf.options.display.progress_bar = "terminal" - penguins_df_default_index._block._expr._session.bqclient.default_query_job_config.use_query_cache = ( + penguins_df_default_index._block._expr.session.bqclient.default_query_job_config.use_query_cache = ( False ) penguins_df_default_index.to_pandas() @@ -117,7 +117,7 @@ def test_query_job_repr_html(penguins_df_default_index: bf.dataframe.DataFrame): def test_query_job_repr(penguins_df_default_index: bf.dataframe.DataFrame): - penguins_df_default_index._block._expr._session.bqclient.default_query_job_config.use_query_cache = ( + penguins_df_default_index._block._expr.session.bqclient.default_query_job_config.use_query_cache = ( False ) penguins_df_default_index.to_pandas() diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index c9510290b6..05d8b84185 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -2858,7 +2858,7 @@ def test_map_series_input(scalars_dfs): pd_map_series = scalars_pandas_df.string_col.iloc[0 : len(new_index)] pd_map_series.index = new_index bf_map_series = series.Series( - pd_map_series, session=scalars_df._get_block().expr._session + pd_map_series, session=scalars_df._get_block().expr.session ) pd_result = scalars_pandas_df.int64_too.map(pd_map_series) @@ -2877,7 +2877,7 @@ def test_map_series_input_duplicates_error(scalars_dfs): pd_map_series = scalars_pandas_df.string_col.iloc[0 : len(new_index)] pd_map_series.index = new_index bf_map_series = series.Series( - pd_map_series, session=scalars_df._get_block().expr._session + pd_map_series, session=scalars_df._get_block().expr.session ) with pytest.raises(pd.errors.InvalidIndexError): diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index 127a88a760..bf72e444eb 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -318,7 +318,6 @@ def test_read_pandas(session, scalars_dfs): _, scalars_pandas_df = scalars_dfs df = session.read_pandas(scalars_pandas_df) - assert df._block._expr._ordering is not None result = df.to_pandas() expected = scalars_pandas_df @@ -350,9 +349,8 @@ def test_read_pandas_rowid_exists_adds_suffix(session, scalars_pandas_df_default pandas_df = scalars_pandas_df_default_index.copy() pandas_df["rowid"] = np.arange(pandas_df.shape[0]) - df = session.read_pandas(pandas_df) - total_order_col = df._block._expr._ordering.total_order_col - assert total_order_col and total_order_col.column_id == "rowid_2" + df_roundtrip = session.read_pandas(pandas_df).to_pandas() + 
pd.testing.assert_frame_equal(df_roundtrip, pandas_df, check_dtype=False) def test_read_pandas_tokyo( @@ -385,7 +383,6 @@ def test_read_csv_gcs_default_engine(session, scalars_dfs, gcs_folder): # Convert default pandas dtypes to match BigQuery DataFrames dtypes. dtype=dtype, ) - assert df._block._expr._ordering is not None # TODO(chelsealin): If we serialize the index, can more easily compare values. pd.testing.assert_index_equal(df.columns, scalars_df.columns) @@ -441,7 +438,6 @@ def test_read_csv_local_default_engine(session, scalars_dfs, sep): # Convert default pandas dtypes to match BigQuery DataFrames dtypes. dtype=dtype, ) - assert df._block._expr._ordering is not None # TODO(chelsealin): If we serialize the index, can more easily compare values. pd.testing.assert_index_equal(df.columns, scalars_df.columns) @@ -976,7 +972,6 @@ def test_read_json_gcs_default_engine(session, scalars_dfs, gcs_folder): orient="records", ) - assert df._block._expr._ordering is not None pd.testing.assert_index_equal(df.columns, scalars_df.columns) # The auto detects of BigQuery load job have restrictions to detect the bytes, diff --git a/tests/unit/core/test_blocks.py b/tests/unit/core/test_blocks.py index a7e9b5a84b..86715d090c 100644 --- a/tests/unit/core/test_blocks.py +++ b/tests/unit/core/test_blocks.py @@ -18,8 +18,6 @@ import bigframes.core.blocks as blocks -from .. import resources - @pytest.mark.parametrize( ("data",), @@ -76,9 +74,8 @@ ) def test_block_from_local(data): expected = pandas.DataFrame(data) - session = resources.create_pandas_session({}) - block = blocks.block_from_local(data, session=session) + block = blocks.block_from_local(data) pandas.testing.assert_index_equal(block.column_labels, expected.columns) assert tuple(block.index_labels) == tuple(expected.index.names) diff --git a/tests/unit/resources.py b/tests/unit/resources.py index 0a68600a35..f660d774f0 100644 --- a/tests/unit/resources.py +++ b/tests/unit/resources.py @@ -22,6 +22,7 @@ import bigframes import bigframes.core as core +import bigframes.core.ordering import bigframes.session.clients """Utilities for creating test resources.""" @@ -61,14 +62,20 @@ def create_pandas_session(tables: Dict[str, pandas.DataFrame]) -> bigframes.Sess def create_arrayvalue( df: pandas.DataFrame, total_ordering_columns: List[str] -) -> bigframes.core.ArrayValue: +) -> core.ArrayValue: session = create_pandas_session({"test_table": df}) ibis_table = session.ibis_client.table("test_table") columns = tuple(ibis_table[key] for key in ibis_table.columns) - ordering = core.ExpressionOrdering( - [core.OrderingColumnReference(column) for column in total_ordering_columns], + ordering = bigframes.core.ordering.ExpressionOrdering( + tuple( + [core.OrderingColumnReference(column) for column in total_ordering_columns] + ), total_ordering_columns=frozenset(total_ordering_columns), ) - return core.ArrayValue( - session=session, table=ibis_table, columns=columns, ordering=ordering + return core.ArrayValue.from_ibis( + session=session, + table=ibis_table, + columns=columns, + hidden_ordering_columns=(), + ordering=ordering, ) diff --git a/tests/unit/test_core.py b/tests/unit/test_core.py index 69b9e79807..d9672b2635 100644 --- a/tests/unit/test_core.py +++ b/tests/unit/test_core.py @@ -16,6 +16,7 @@ import pandas import bigframes.core as core +import bigframes.core.ordering import bigframes.operations as ops import bigframes.operations.aggregations as agg_ops @@ -37,15 +38,19 @@ def test_arrayvalue_constructor_from_ibis_table_adds_all_columns(): ) ibis_table 
= session.ibis_client.table("test_table") columns = (ibis_table["col1"], ibis_table["col2"], ibis_table["col3"]) - ordering = core.ExpressionOrdering( - [core.OrderingColumnReference("col1")], + ordering = bigframes.core.ordering.ExpressionOrdering( + tuple([core.OrderingColumnReference("col1")]), total_ordering_columns=frozenset(["col1"]), ) - actual = core.ArrayValue( - session=session, table=ibis_table, columns=columns, ordering=ordering + actual = core.ArrayValue.from_ibis( + session=session, + table=ibis_table, + columns=columns, + ordering=ordering, + hidden_ordering_columns=(), ) - assert actual._table is ibis_table - assert len(actual.columns) == 3 + assert actual.compile()._table is ibis_table + assert len(actual.column_ids) == 3 def test_arrayvalue_with_get_column_type(): @@ -78,7 +83,7 @@ def test_arrayvalue_with_get_column(): ), total_ordering_columns=["col1"], ) - col1 = value._get_ibis_column("col1") + col1 = value.compile()._get_ibis_column("col1") assert isinstance(col1, ibis_types.Value) assert col1.get_name() == "col1" assert col1.type().is_int64() @@ -95,7 +100,7 @@ def test_arrayvalues_to_ibis_expr_with_get_column(): ), total_ordering_columns=["col1"], ) - expr = value._get_ibis_column("col1") + expr = value.compile()._get_ibis_column("col1") assert expr.get_name() == "col1" assert expr.type().is_int64() @@ -112,7 +117,7 @@ def test_arrayvalues_to_ibis_expr_with_concat(): total_ordering_columns=["col1"], ) expr = value.concat([value]) - actual = expr._to_ibis_expr("unordered") + actual = expr.compile()._to_ibis_expr("unordered") assert len(actual.columns) == 3 # TODO(ashleyxu, b/299631930): test out the union expression assert actual.columns[0] == "column_0" @@ -131,8 +136,8 @@ def test_arrayvalues_to_ibis_expr_with_project_unary_op(): ), total_ordering_columns=["col1"], ) - expr = value.project_unary_op("col1", ops.AsTypeOp("string")) - assert value.columns[0].type().is_int64() + expr = value.project_unary_op("col1", ops.AsTypeOp("string")).compile() + assert value.compile().columns[0].type().is_int64() assert expr.columns[0].type().is_string() @@ -147,7 +152,7 @@ def test_arrayvalues_to_ibis_expr_with_project_binary_op(): ), total_ordering_columns=["col1"], ) - expr = value.project_binary_op("col2", "col3", ops.add_op, "col4") + expr = value.project_binary_op("col2", "col3", ops.add_op, "col4").compile() assert expr.columns[3].type().is_float64() actual = expr._to_ibis_expr("unordered") assert len(expr.columns) == 4 @@ -166,7 +171,9 @@ def test_arrayvalues_to_ibis_expr_with_project_ternary_op(): ), total_ordering_columns=["col1"], ) - expr = value.project_ternary_op("col2", "col3", "col4", ops.where_op, "col5") + expr = value.project_ternary_op( + "col2", "col3", "col4", ops.where_op, "col5" + ).compile() assert expr.columns[4].type().is_float64() actual = expr._to_ibis_expr("unordered") assert len(expr.columns) == 5 @@ -188,7 +195,7 @@ def test_arrayvalue_to_ibis_expr_with_aggregate(): aggregations=(("col1", agg_ops.sum_op, "col4"),), by_column_ids=["col1"], dropna=False, - ) + ).compile() actual = expr._to_ibis_expr("unordered") assert len(expr.columns) == 2 assert actual.columns[0] == "col1" @@ -207,7 +214,7 @@ def test_arrayvalue_to_ibis_expr_with_corr_aggregate(): ), total_ordering_columns=["col1"], ) - expr = value.corr_aggregate(corr_aggregations=[("col1", "col3", "col4")]) + expr = value.corr_aggregate(corr_aggregations=[("col1", "col3", "col4")]).compile() actual = expr._to_ibis_expr("unordered") assert len(expr.columns) == 1 assert actual.columns[0] == "col4"
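A note on the pervasive list-to-tuple conversions in this patch (grouping_keys=tuple(self._by_col_ids), ordering_value_columns: typing.Tuple[...], and so on): BigFrameNode, WindowSpec, and ExpressionOrdering are frozen dataclasses, and a frozen dataclass's generated __hash__ hashes every field, so any field holding a list makes the whole instance unusable as a caching key. A minimal self-contained sketch with illustrative stand-in classes (not the bigframes types):

    from dataclasses import dataclass, field
    import typing

    @dataclass(frozen=True)
    class ListSpec:
        # A list-typed field makes the generated __hash__ fail at call time.
        grouping_keys: typing.List[str] = field(default_factory=list)

    @dataclass(frozen=True)
    class TupleSpec:
        # Tuples are immutable and hashable, so instances work as cache keys.
        grouping_keys: typing.Tuple[str, ...] = tuple()

    hash(TupleSpec(grouping_keys=("a", "b")))  # OK
    try:
        hash(ListSpec(grouping_keys=["a", "b"]))
    except TypeError as exc:
        print(exc)  # unhashable type: 'list'

This is why every call site that previously passed self._by_col_ids (a list) now wraps it in tuple() before constructing a WindowSpec.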
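BigFrameNode.session (added in bigframes/core/nodes.py above) resolves the owning session by walking child nodes and rejecting trees that mix sources from different sessions; Session.__hash__ exists so a session can live inside these hashable nodes. A runnable sketch of that walk, with a plain string standing in for bigframes.Session (cached_property works on a frozen dataclass because it writes to the instance __dict__ directly, bypassing the frozen __setattr__):

    from __future__ import annotations
    from dataclasses import dataclass
    import functools
    import typing

    @dataclass(frozen=True)
    class Node:
        children: typing.Tuple["Node", ...] = ()
        own_session: typing.Optional[str] = None  # stand-in for a Session

        @functools.cached_property
        def session(self) -> typing.Optional[str]:
            if self.own_session is not None:
                return self.own_session  # leaf nodes carry the session directly
            sessions = [c.session for c in self.children if c.session is not None]
            if len(set(sessions)) > 1:
                raise ValueError("Cannot combine sources from multiple sessions.")
            return sessions[0] if sessions else None

    tree = Node(children=(Node(own_session="s1"), Node(own_session="s1")))
    assert tree.session == "s1"
    # Node(children=(Node(own_session="s1"), Node(own_session="s2"))).session
    # would raise ValueError.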
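ReadLocalNode stores a local frame as feather_bytes plus string column_ids, both hashable, so purely in-memory data fits into the same immutable node tree. A sketch of the round-trip this enables (requires pyarrow; LocalNode is a stand-in for nodes.ReadLocalNode, and the DataFrame is made up):

    import io
    from dataclasses import dataclass
    import typing

    import pandas as pd

    @dataclass(frozen=True)
    class LocalNode:  # stand-in for nodes.ReadLocalNode
        feather_bytes: bytes
        column_ids: typing.Tuple[str, ...]

    df = pd.DataFrame({"a": [1, 2], "b": [3.0, 4.0]})
    buf = io.BytesIO()
    df.to_feather(buf)  # feather needs string columns and a default index
    node = LocalNode(feather_bytes=buf.getvalue(), column_ids=tuple(df.columns))

    hash(node)  # bytes and tuples are hashable, so the node can key a cache
    restored = pd.read_feather(io.BytesIO(node.feather_bytes))
    pd.testing.assert_frame_equal(restored, df)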
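The groupby and series call sites above all encode pandas window semantics into WindowSpec bounds: an n-row rolling window is the current row plus n - 1 preceding rows, an expanding window leaves preceding unbounded, and shift/diff bound only one side. Collected in one place, assuming the patched bigframes is importable (the "species" grouping key is a made-up example column):

    from bigframes.core.window_spec import WindowSpec

    window = 3  # rolling(3): current row plus 2 preceding, full window required
    rolling_spec = WindowSpec(preceding=window - 1, following=0, min_periods=window)

    expanding_spec = WindowSpec(following=0, min_periods=1)  # unbounded preceding

    periods = 2  # shift(2)/diff(2), mirroring the call sites above
    shift_spec = WindowSpec(
        preceding=periods if periods > 0 else None,
        following=-periods if periods < 0 else None,
    )

    # Grouped cumulative window, as in the groupby _apply_window_op default:
    grouped_spec = WindowSpec(grouping_keys=("species",), following=0)

Because WindowSpec is frozen and tuple-valued throughout, each of these specs is hashable and can participate in node-tree caching alongside the WindowOpNode that consumes it.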