8000 perf: Improve isin performance by TrevorBergeron · Pull Request #1203 · googleapis/python-bigquery-dataframes · GitHub
[go: up one dir, main page]

Skip to content
12 changes: 12 additions & 0 deletions bigframes/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,18 @@ def project_window_op(
output_name,
)

def isin(
    self, other: ArrayValue, lcol: str, rcol: str
) -> typing.Tuple[ArrayValue, str]:
    """Annotate each row with whether ``lcol``'s value appears in ``other``'s ``rcol``.

    Unlike a relational join, this preserves row identity of ``self``.

    Returns:
        The annotated ArrayValue and the id of the new boolean indicator column.
    """
    indicator_id = ids.ColumnId.unique()
    in_node = nodes.InNode(
        self.node,
        other.node,
        ex.deref(lcol),
        ex.deref(rcol),
        indicator_col=indicator_id,
    )
    return ArrayValue(in_node), indicator_id.name

def relational_join(
self,
other: ArrayValue,
Expand Down
16 changes: 4 additions & 12 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2036,23 +2036,15 @@ def isin(self, other: Block):
return block

def _isin_inner(self: Block, col: str, unique_values: core.ArrayValue) -> Block:
    """Replace column ``col`` with a boolean column: membership in ``unique_values``.

    Args:
        col: Id of the value column to test.
        unique_values: Single-column ArrayValue holding the candidate values.

    Returns:
        A new Block with the same index and labels, where ``col`` now holds bools.
    """
    # The InNode-based isin preserves left-side row identity and column ids,
    # so index columns and unmatched value columns pass through unchanged
    # (no l_map/r_map remapping as with a relational join).
    expr, matches = self._expr.isin(unique_values, col, unique_values.column_ids[0])

    new_value_cols = tuple(
        val_col if val_col != col else matches for val_col in self.value_columns
    )
    expr = expr.select_columns((*self.index_columns, *new_value_cols))
    return Block(
        expr,
        index_columns=self.index_columns,
        column_labels=self.column_labels,
        index_labels=self._index_labels,
    )
Expand Down
12 changes: 12 additions & 0 deletions bigframes/core/compile/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import bigframes.core.compile.concat as concat_impl
import bigframes.core.compile.explode
import bigframes.core.compile.ibis_types
import bigframes.core.compile.isin
import bigframes.core.compile.scalar_op_compiler
import bigframes.core.compile.scalar_op_compiler as compile_scalar
import bigframes.core.compile.schema_translator
Expand Down Expand Up @@ -128,6 +129,17 @@ def compile_join(self, node: nodes.JoinNode):
conditions=condition_pairs,
)

@_compile_node.register
def compile_isin(self, node: nodes.InNode):
    """Compile an InNode: left rows plus a boolean membership indicator column."""
    left = self.compile_node(node.left_child)
    right = self.compile_node(node.right_child)
    return bigframes.core.compile.isin.isin_unordered(
        left=left,
        right=right,
        indicator_col=node.indicator_col.sql,
        conditions=(node.left_col.id.sql, node.right_col.id.sql),
    )

@_compile_node.register
def compile_fromrange(self, node: nodes.FromRangeNode):
# Both start and end are single elements and do not inherently have an order
Expand Down
71 changes: 71 additions & 0 deletions bigframes/core/compile/isin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers to join ArrayValue objects."""

from __future__ import annotations

import itertools
from typing import Tuple

import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes
import bigframes_vendored.ibis.expr.types as ibis_types

import bigframes.core.compile.compiled as compiled


def isin_unordered(
    left: compiled.UnorderedIR,
    right: compiled.UnorderedIR,
    indicator_col: str,
    conditions: Tuple[str, str],
) -> compiled.UnorderedIR:
    """Append a boolean membership-indicator column to ``left``.

    All of ``left``'s rows and columns are preserved; ``right`` only supplies
    the set of values tested against.

    Arguments:
        left: Expression whose rows are annotated.
        right: Expression providing candidate values.
        indicator_col: Name for the new boolean column.
        conditions: Pair of column ids to compare (left id, right id).
    Returns:
        ``left`` extended with one boolean column named ``indicator_col``.
    """
    left_table = left._to_ibis_expr()
    right_table = right._to_ibis_expr()
    # Keys are normalized via value_to_join_key so NULLs match each other
    # (pandas semantics), which plain SQL IN would not do.
    new_column = (
        value_to_join_key(left_table[conditions[0]])
        .isin(value_to_join_key(right_table[conditions[1]]))
        .name(indicator_col)
    )

    columns = tuple(
        itertools.chain(
            (left_table[col.get_name()] for col in left.columns), (new_column,)
        )
    )

    return compiled.UnorderedIR(
        left_table,
        columns=columns,
    )


def value_to_join_key(value: ibis_types.Value):
    """Convert nullable values into non-null string keys.

    SQL will not match null keys together, but pandas does, so everything is
    cast to string and nulls are replaced with a sentinel value.
    """
    if not value.type().is_string():
        value = value.cast(ibis_dtypes.str)
    sentinel = ibis_types.literal("$NULL_SENTINEL$")
    # Newer vendored ibis exposes fill_null; older versions only have fillna.
    if hasattr(value, "fill_null"):
        return value.fill_null(sentinel)
    return value.fillna(sentinel)
161 changes: 158 additions & 3 deletions bigframes/core/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@ def explicitly_ordered(self) -> bool:
"""
...

@functools.cached_property
def height(self) -> int:
    """Length of the longest path from this node down to a leaf (leaves are 0)."""
    children = self.child_nodes
    if not children:
        return 0
    return 1 + max(child.height for child in children)

@functools.cached_property
def total_variables(self) -> int:
return self.variables_introduced + sum(
Expand Down Expand Up @@ -284,6 +290,34 @@ def prune(self, used_cols: COLUMN_SET) -> BigFrameNode:
return self.transform_children(lambda x: x.prune(used_cols))


class AdditiveNode:
    """Definition of additive - if you drop added_fields, you end up with the descendent.

    .. code-block:: text

        AdditiveNode (fields: a, b, c; added_fields: c)
        |
        | additive_base
        V
        BigFrameNode (fields: a, b)

    """

    @property
    @abc.abstractmethod
    def added_fields(self) -> Tuple[Field, ...]:
        """Fields this node introduces on top of its additive base."""
        ...

    @property
    @abc.abstractmethod
    def additive_base(self) -> BigFrameNode:
        """The descendent node obtained by dropping ``added_fields``."""
        ...

    # Fixed: the parameter was named ``BigFrameNode`` (shadowing the class and
    # losing the annotation); concrete implementations all use ``node``.
    @abc.abstractmethod
    def replace_additive_base(self, node: BigFrameNode):
        """Return a copy of this node rebuilt over ``node`` as its base."""
        ...


@dataclasses.dataclass(frozen=True, eq=False)
class UnaryNode(BigFrameNode):
child: BigFrameNode
Expand Down Expand Up @@ -381,6 +415,106 @@ def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]):
return self


@dataclasses.dataclass(frozen=True, eq=False)
class InNode(BigFrameNode, AdditiveNode):
    """
    Special Join Type that only returns rows from the left side, as well as adding a bool column indicating whether a match exists on the right side.

    Modelled separately from join node, as this operation preserves row identity.
    """

    left_child: BigFrameNode
    right_child: BigFrameNode
    left_col: ex.DerefOp
    right_col: ex.DerefOp
    indicator_col: bfet_ids.ColumnId

    def _validate(self):
        # Column ids from both subtrees are visible to compilation, so they
        # must not collide.
        assert not (
            set(self.left_child.ids) & set(self.right_child.ids)
        ), "Join ids collide"

    @property
    def row_preserving(self) -> bool:
        return False

    @property
    def non_local(self) -> bool:
        return True

    @property
    def child_nodes(self) -> typing.Sequence[BigFrameNode]:
        return (self.left_child, self.right_child)

    @property
    def order_ambiguous(self) -> bool:
        return False

    @property
    def explicitly_ordered(self) -> bool:
        # Preserves left ordering always
        return True

    @property
    def added_fields(self) -> Tuple[Field, ...]:
        return (Field(self.indicator_col, bigframes.dtypes.BOOL_DTYPE),)

    @property
    def fields(self) -> Iterable[Field]:
        # Only left-side fields survive, plus the new boolean indicator.
        return itertools.chain(
            self.left_child.fields,
            self.added_fields,
        )

    @functools.cached_property
    def variables_introduced(self) -> int:
        """Defines the number of variables generated by the current node. Used to estimate query planning complexity."""
        return 1

    @property
    def joins(self) -> bool:
        return True

    @property
    def row_count(self) -> Optional[int]:
        # Row identity comes from the left side, so so does the row count.
        return self.left_child.row_count

    @property
    def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]:
        return (self.indicator_col,)

    @property
    def additive_base(self) -> BigFrameNode:
        return self.left_child

    def replace_additive_base(self, node: BigFrameNode):
        return dataclasses.replace(self, left_child=node)

    def transform_children(
        self, t: Callable[[BigFrameNode], BigFrameNode]
    ) -> BigFrameNode:
        transformed = dataclasses.replace(
            self, left_child=t(self.left_child), right_child=t(self.right_child)
        )
        if self == transformed:
            # reusing existing object speeds up eq, and saves a small amount of memory
            return self
        return transformed

    def prune(self, used_cols: COLUMN_SET) -> BigFrameNode:
        # NOTE(review): pruning is intentionally a no-op here — presumably the
        # comparison columns cannot be dropped without changing results; confirm.
        return self

    def remap_vars(
        self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]
    ) -> BigFrameNode:
        return dataclasses.replace(
            self, indicator_col=mappings.get(self.indicator_col, self.indicator_col)
        )

    def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]):
        return dataclasses.replace(self, left_col=self.left_col.remap_column_refs(mappings, allow_partial_bindings=True), right_col=self.right_col.remap_column_refs(mappings, allow_partial_bindings=True))  # type: ignore


@dataclasses.dataclass(frozen=True, eq=False)
class JoinNode(BigFrameNode):
left_child: BigFrameNode
Expand Down Expand Up @@ -926,7 +1060,7 @@ class CachedTableNode(ReadTableNode):

# Unary nodes
@dataclasses.dataclass(frozen=True, eq=False)
class PromoteOffsetsNode(UnaryNode):
class PromoteOffsetsNode(UnaryNode, AdditiveNode):
col_id: bigframes.core.identifiers.ColumnId

@property
Expand Down Expand Up @@ -959,6 +1093,13 @@ def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]:
def added_fields(self) -> Tuple[Field, ...]:
return (Field(self.col_id, bigframes.dtypes.INT_DTYPE),)

@property
def additive_base(self) -> BigFrameNode:
    # The node this one adds the offsets column on top of.
    return self.child

def replace_additive_base(self, node: BigFrameNode):
    # Rebuild over a new base, keeping the offsets column id.
    return dataclasses.replace(self, child=node)

def prune(self, used_cols: COLUMN_SET) -> BigFrameNode:
if self.col_id not in used_cols:
return self.child.prune(used_cols)
Expand Down Expand Up @@ -1171,7 +1312,7 @@ def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]):


@dataclasses.dataclass(frozen=True, eq=False)
class ProjectionNode(UnaryNode):
class ProjectionNode(UnaryNode, AdditiveNode):
"""Assigns new variables (without modifying existing ones)"""

assignments: typing.Tuple[
Expand Down Expand Up @@ -1212,6 +1353,13 @@ def row_count(self) -> Optional[int]:
def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]:
return tuple(id for _, id in self.assignments)

@property
def additive_base(self) -> BigFrameNode:
    # The node this one adds the projected columns on top of.
    return self.child

def replace_additive_base(self, node: BigFrameNode):
    # Rebuild over a new base, keeping the same assignments.
    return dataclasses.replace(self, child=node)

def prune(self, used_cols: COLUMN_SET) -> BigFrameNode:
pruned_assignments = tuple(i for i in self.assignments if i[1] in used_cols)
if len(pruned_assignments) == 0:
Expand Down Expand Up @@ -1378,7 +1526,7 @@ def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]):


@dataclasses.dataclass(frozen=True, eq=False)
class WindowOpNode(UnaryNode):
class WindowOpNode(UnaryNode, AdditiveNode):
expression: ex.Aggregation
window_spec: window.WindowSpec
output_name: bigframes.core.identifiers.ColumnId
Expand Down Expand Up @@ -1438,6 +1586,13 @@ def inherits_order(self) -> bool:
) and self.expression.op.implicitly_inherits_order
return op_inherits_order or self.window_spec.row_bounded

@property
def additive_base(self) -> BigFrameNode:
    # The node this one adds the window-aggregation column on top of.
    return self.child

def replace_additive_base(self, node: BigFrameNode):
    # Rebuild over a new base, keeping the same window expression.
    return dataclasses.replace(self, child=node)

def prune(self, used_cols: COLUMN_SET) -> BigFrameNode:
if self.output_name not in used_cols:
return self.child.prune(used_cols)
Expand Down
4380 Loading
0