refactor: ArrayValue is now a tree that defers conversion to ibis by TrevorBergeron · Pull Request #110 · googleapis/python-bigquery-dataframes · GitHub
Merged

Changes from all commits (26 commits)
2e51559
refactor: ArrayValue is now a tree that defers conversion to ibis
TrevorBergeron Oct 13, 2023
29d10c1
fix test failures
TrevorBergeron Oct 13, 2023
2425e60
make tree hashable
TrevorBergeron Oct 13, 2023
ea7483a
cleanup code
TrevorBergeron Oct 13, 2023
dda9bab
fix unit-test using list instead of tuple
TrevorBergeron Oct 13, 2023
a8fbd05
Merge branch 'main' into thetree
TrevorBergeron Oct 13, 2023
6f4eef8
make ReadLocalNode hashable
TrevorBergeron Oct 13, 2023
69623c5
more unit test fixes, remove session from CompiledArrayValue
TrevorBergeron Oct 13, 2023
4a614e6
allow shape computation without session for local data
TrevorBergeron Oct 13, 2023
776633d
make window_spec hashable
TrevorBergeron Oct 13, 2023
730641f
Merge branch 'main' into thetree
TrevorBergeron Oct 13, 2023
95e6feb
further alter no-session shape logic
TrevorBergeron Oct 16, 2023
9a6af92
Merge branch 'main' into thetree
TrevorBergeron Oct 17, 2023
03ca19c
Merge remote-tracking branch 'github/main' into thetree
TrevorBergeron Oct 26, 2023
75eda7f
pr comments
TrevorBergeron Oct 26, 2023
ac4ef6e
more pr comments
TrevorBergeron Oct 26, 2023
6e4df09
Merge branch 'main' into thetree
TrevorBergeron Oct 26, 2023
06aec95
Merge branch 'main' into thetree
TrevorBergeron Oct 26, 2023
24ba2f7
Merge branch 'main' into thetree
TrevorBergeron Oct 26, 2023
5ce0895
Merge remote-tracking branch 'github/main' into thetree
TrevorBergeron Oct 26, 2023
daf2ffe
fix bug from merge
TrevorBergeron Oct 26, 2023
13e77db
Merge branch 'main' into thetree
TrevorBergeron Oct 26, 2023
f268f38
fix another merge bug
TrevorBergeron Oct 26, 2023
efbdf3e
Merge branch 'main' into thetree
TrevorBergeron Oct 26, 2023
733eb3c
more merge failures...
TrevorBergeron Oct 26, 2023
48d0794
Merge remote-tracking branch 'github/main' into thetree
TrevorBergeron Oct 26, 2023
1,221 changes: 216 additions & 1,005 deletions bigframes/core/__init__.py

Large diffs are not rendered by default.

25 changes: 13 additions & 12 deletions bigframes/core/block_transforms.py
@@ -21,6 +21,7 @@
import bigframes.core as core
import bigframes.core.blocks as blocks
import bigframes.core.ordering as ordering
+import bigframes.core.window_spec as windows
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops

@@ -68,21 +69,21 @@ def indicate_duplicates(
if keep == "first":
# Count how many copies occur up to current copy of value
# Discard this value if there are copies BEFORE
-window_spec = core.WindowSpec(
+window_spec = windows.WindowSpec(
grouping_keys=tuple(columns),
following=0,
)
elif keep == "last":
# Count how many copies occur up to current copy of values
# Discard this value if there are copies AFTER
-window_spec = core.WindowSpec(
+window_spec = windows.WindowSpec(
grouping_keys=tuple(columns),
preceding=0,
)
else: # keep == False
# Count how many copies of the value occur in entire series.
# Discard this value if there are copies ANYWHERE
-window_spec = core.WindowSpec(grouping_keys=tuple(columns))
+window_spec = windows.WindowSpec(grouping_keys=tuple(columns))
block, dummy = block.create_constant(1)
block, val_count_col_id = block.apply_window_op(
dummy,
@@ -131,7 +132,7 @@ def value_counts(
)
count_id = agg_ids[0]
if normalize:
-unbound_window = core.WindowSpec()
+unbound_window = windows.WindowSpec()
block, total_count_id = block.apply_window_op(
count_id, agg_ops.sum_op, unbound_window
)
@@ -153,7 +154,7 @@

def pct_change(block: blocks.Block, periods: int = 1) -> blocks.Block:
column_labels = block.column_labels
-window_spec = core.WindowSpec(
+window_spec = windows.WindowSpec(
preceding=periods if periods > 0 else None,
following=-periods if periods < 0 else None,
)
@@ -195,7 +196,7 @@ def rank(
ops.isnull_op,
)
nullity_col_ids.append(nullity_col_id)
-window = core.WindowSpec(
+window = windows.WindowSpec(
# BigQuery has syntax to reorder nulls with "NULLS FIRST/LAST", but that is unavailable through ibis presently, so must order on a separate nullity expression first.
ordering=(
ordering.OrderingColumnReference(
@@ -229,7 +230,7 @@ def rank(
block, result_id = block.apply_window_op(
rownum_col_ids[i],
agg_op,
-window_spec=core.WindowSpec(grouping_keys=[columns[i]]),
+window_spec=windows.WindowSpec(grouping_keys=(columns[i],)),
skip_reproject_unsafe=(i < (len(columns) - 1)),
)
post_agg_rownum_col_ids.append(result_id)
@@ -311,7 +312,7 @@ def nsmallest(
block, counter = block.apply_window_op(
column_ids[0],
agg_ops.rank_op,
-window_spec=core.WindowSpec(ordering=order_refs),
+window_spec=windows.WindowSpec(ordering=tuple(order_refs)),
)
block, condition = block.apply_unary_op(
counter, ops.partial_right(ops.le_op, n)
@@ -343,7 +344,7 @@ def nlargest(
block, counter = block.apply_window_op(
column_ids[0],
agg_ops.rank_op,
-window_spec=core.WindowSpec(ordering=order_refs),
+window_spec=windows.WindowSpec(ordering=tuple(order_refs)),
)
block, condition = block.apply_unary_op(
counter, ops.partial_right(ops.le_op, n)
@@ -440,14 +441,14 @@ def _mean_delta_to_power(
grouping_column_ids: typing.Sequence[str],
) -> typing.Tuple[blocks.Block, typing.Sequence[str]]:
"""Calculate (x-mean(x))^n. Useful for calculating moment statistics such as skew and kurtosis."""
-window = core.WindowSpec(grouping_keys=grouping_column_ids)
+window = windows.WindowSpec(grouping_keys=tuple(grouping_column_ids))
block, mean_ids = block.multi_apply_window_op(column_ids, agg_ops.mean_op, window)
delta_ids = []
cube_op = ops.partial_right(ops.pow_op, n_power)
for val_id, mean_val_id in zip(column_ids, mean_ids):
block, delta_id = block.apply_binary_op(val_id, mean_val_id, ops.sub_op)
block, delta_power_id = block.apply_unary_op(delta_id, cube_op)
-block = block.drop_columns(delta_id)
+block = block.drop_columns([delta_id])
delta_ids.append(delta_power_id)
return block, delta_ids

@@ -645,7 +646,7 @@ def _idx_extrema(
for idx_col in original_block.index_columns
],
]
-window_spec = core.WindowSpec(ordering=order_refs)
+window_spec = windows.WindowSpec(ordering=tuple(order_refs))
idx_col = original_block.index_columns[0]
block, result_col = block.apply_window_op(
idx_col, agg_ops.first_op, window_spec
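The recurring change in this file (grouping_keys=tuple(columns), ordering=tuple(order_refs), and so on) supports the "make window_spec hashable" commit: a frozen dataclass is hashable only if every field value is hashable, and tuples qualify where lists do not. A simplified stand-in, not the real bigframes.core.window_spec definition, demonstrating the failure mode:

```python
# Sketch of why the call sites above switch from lists to tuples. The field
# names mirror WindowSpec, but this class is a simplified stand-in.
import dataclasses
import typing


@dataclasses.dataclass(frozen=True)
class WindowSpec:
    grouping_keys: typing.Tuple[str, ...] = ()
    preceding: typing.Optional[int] = None
    following: typing.Optional[int] = None


hash(WindowSpec(grouping_keys=("col_a", "col_b")))  # fine: every field hashable
try:
    hash(WindowSpec(grouping_keys=["col_a", "col_b"]))  # type: ignore[arg-type]
except TypeError as exc:
    print(exc)  # unhashable type: 'list'
```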
34 changes: 18 additions & 16 deletions bigframes/core/blocks.py
@@ -35,7 +35,6 @@
import bigframes.core as core
import bigframes.core.guid as guid
import bigframes.core.indexes as indexes
-import bigframes.core.joins as joins
import bigframes.core.joins.name_resolution as join_names
import bigframes.core.ordering as ordering
import bigframes.core.utils
@@ -378,7 +377,7 @@ def _to_dataframe(self, result) -> pd.DataFrame:
"""Convert BigQuery data to pandas DataFrame with specific dtypes."""
dtypes = dict(zip(self.index_columns, self.index_dtypes))
dtypes.update(zip(self.value_columns, self.dtypes))
-return self._expr._session._rows_to_dataframe(result, dtypes)
+return self._expr.session._rows_to_dataframe(result, dtypes)

def to_pandas(
self,
@@ -422,7 +421,7 @@ def to_pandas_batches(self):
dtypes.update(zip(self.value_columns, self.dtypes))
results_iterator, _ = self._expr.start_query()
for arrow_table in results_iterator.to_arrow_iterable(
-bqstorage_client=self._expr._session.bqstoragereadclient
+bqstorage_client=self._expr.session.bqstoragereadclient
):
df = bigframes.session._io.pandas.arrow_to_pandas(arrow_table, dtypes)
self._copy_index_to_pandas(df)
@@ -454,7 +453,9 @@ def _compute_and_count(

results_iterator, query_job = expr.start_query(max_results=max_results)

-table_size = expr._get_table_size(query_job.destination) / _BYTES_TO_MEGABYTES
+table_size = (
+    expr.session._get_table_size(query_job.destination) / _BYTES_TO_MEGABYTES
+)
fraction = (
max_download_size / table_size
if (max_download_size is not None) and (table_size != 0)
@@ -819,7 +820,9 @@ def aggregate_all_and_stack(
axis: int | str = 0,
value_col_id: str = "values",
dropna: bool = True,
-dtype=pd.Float64Dtype(),
+dtype: typing.Union[
+    bigframes.dtypes.Dtype, typing.Tuple[bigframes.dtypes.Dtype, ...]
+] = pd.Float64Dtype(),
) -> Block:
axis_n = utils.get_axis_number(axis)
if axis_n == 0:
@@ -829,7 +832,7 @@
result_expr = self.expr.aggregate(aggregations, dropna=dropna).unpivot(
row_labels=self.column_labels.to_list(),
index_col_ids=["index"],
-unpivot_columns=[(value_col_id, self.value_columns)],
+unpivot_columns=tuple([(value_col_id, tuple(self.value_columns))]),
dtype=dtype,
)
return Block(result_expr, index_columns=["index"], column_labels=[None])
@@ -841,7 +844,7 @@ def aggregate_all_and_stack(
stacked_expr = expr_with_offsets.unpivot(
row_labels=self.column_labels.to_list(),
index_col_ids=[guid.generate_guid()],
-unpivot_columns=[(value_col_id, self.value_columns)],
+unpivot_columns=[(value_col_id, tuple(self.value_columns))],
passthrough_columns=[*self.index_columns, offset_col],
dtype=dtype,
)
@@ -1029,13 +1032,13 @@ def summarize(
for col_id in column_ids
]
columns = [
(col_id, [f"{col_id}-{stat.name}" for stat in stats])
(col_id, tuple(f"{col_id}-{stat.name}" for stat in stats))
for col_id in column_ids
]
expr = self.expr.aggregate(aggregations).unpivot(
labels,
-unpivot_columns=columns,
-index_col_ids=[label_col_id],
+unpivot_columns=tuple(columns),
+index_col_ids=tuple([label_col_id]),
)
labels = self._get_labels_for_columns(column_ids)
return Block(expr, column_labels=labels, index_columns=[label_col_id])
@@ -1342,7 +1345,7 @@ def stack(self, how="left", levels: int = 1):
passthrough_columns=self.index_columns,
unpivot_columns=unpivot_columns,
index_col_ids=added_index_columns,
-dtype=dtypes,
+dtype=tuple(dtypes),
how=how,
)
new_index_level_names = self.column_labels.names[-levels:]
@@ -1382,7 +1385,7 @@ def _create_stack_column(
dtype = self._column_type(input_id)
input_columns.append(input_id)
# Input column i is the first one that
-return input_columns, dtype or pd.Float64Dtype()
+return tuple(input_columns), dtype or pd.Float64Dtype()

def _column_type(self, col_id: str) -> bigframes.dtypes.Dtype:
col_offset = self.value_columns.index(col_id)
@@ -1497,8 +1500,7 @@ def merge(
sort: bool,
suffixes: tuple[str, str] = ("_x", "_y"),
) -> Block:
-joined_expr = joins.join_by_column(
-    self.expr,
+joined_expr = self.expr.join(
left_join_ids,
other.expr,
right_join_ids,
@@ -1708,7 +1710,7 @@ def _is_monotonic(
return result


-def block_from_local(data, session=None) -> Block:
+def block_from_local(data) -> Block:
pd_data = pd.DataFrame(data)
columns = pd_data.columns

@@ -1730,7 +1732,7 @@ def block_from_local(data, session=None) -> Block:
)
index_ids = pd_data.columns[: len(index_labels)]

-keys_expr = core.ArrayValue.mem_expr_from_pandas(pd_data, session)
+keys_expr = core.ArrayValue.from_pandas(pd_data)
return Block(
keys_expr,
column_labels=columns,
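A consequence visible in the last hunk: block_from_local no longer takes a session, because ArrayValue.from_pandas captures the local DataFrame as a leaf of the deferred tree, and a session is only needed once the expression actually executes (compare the "allow shape computation without session for local data" commit). A hedged usage sketch; the call signature comes from the diff, while everything else about Block construction is assumed:

```python
import pandas as pd

import bigframes.core.blocks as blocks

# No session argument: the local data becomes a deferred leaf node, and a
# session is bound later, only when the expression is executed.
block = blocks.block_from_local(pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]}))
```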
21 changes: 21 additions & 0 deletions bigframes/core/compile/__init__.py
@@ -0,0 +1,21 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from bigframes.core.compile.compiled import CompiledArrayValue
from bigframes.core.compile.compiler import compile_node

__all__ = [
"compile_node",
"CompiledArrayValue",
]
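
This new package is the other half of the refactor: the deferred tree built by ArrayValue is lowered by compile_node into a CompiledArrayValue, the ibis-backed form that ArrayValue itself used to be. A trivial check of the module's public surface, assuming a bigframes checkout at this commit:

```python
import bigframes.core.compile as compiling

# The package's exports at this commit: a compile entry point plus the
# compiled (ibis-backed) value type it produces.
print(compiling.__all__)  # ['compile_node', 'CompiledArrayValue']
```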