diff --git a/bigframes/core/local_data.py b/bigframes/core/local_data.py index fa18f00483..0735c4fc5a 100644 --- a/bigframes/core/local_data.py +++ b/bigframes/core/local_data.py @@ -253,9 +253,16 @@ def _( value_generator = iter_array( array.flatten(), bigframes.dtypes.get_array_inner_type(dtype) ) - for (start, end) in _pairwise(array.offsets): - arr_size = end.as_py() - start.as_py() - yield list(itertools.islice(value_generator, arr_size)) + offset_generator = iter_array(array.offsets, bigframes.dtypes.INT_DTYPE) + + start_offset = None + end_offset = None + for offset in offset_generator: + start_offset = end_offset + end_offset = offset + if start_offset is not None: + arr_size = end_offset - start_offset + yield list(itertools.islice(value_generator, arr_size)) @iter_array.register def _( @@ -267,8 +274,15 @@ def _( sub_generators[field_name] = iter_array(array.field(field_name), dtype) keys = list(sub_generators.keys()) - for row_values in zip(*sub_generators.values()): - yield {key: value for key, value in zip(keys, row_values)} + is_null_generator = iter_array(array.is_null(), bigframes.dtypes.BOOL_DTYPE) + + for values in zip(is_null_generator, *sub_generators.values()): + is_row_null = values[0] + row_values = values[1:] + if not is_row_null: + yield {key: value for key, value in zip(keys, row_values)} + else: + yield None for batch in table.to_batches(): sub_generators: dict[str, Generator[Any, None, None]] = {} @@ -491,16 +505,3 @@ def _schema_durations_to_ints(schema: pa.Schema) -> pa.Schema: return pa.schema( pa.field(field.name, _durations_to_ints(field.type)) for field in schema ) - - -def _pairwise(iterable): - do_yield = False - a = None - b = None - for item in iterable: - a = b - b = item - if do_yield: - yield (a, b) - else: - do_yield = True diff --git a/tests/system/small/engines/test_read_local.py b/tests/system/small/engines/test_read_local.py index abdd29c4ac..257bddd917 100644 --- a/tests/system/small/engines/test_read_local.py +++ b/tests/system/small/engines/test_read_local.py @@ -88,8 +88,9 @@ def test_engines_read_local_w_zero_row_source( assert_equivalence_execution(local_node, REFERENCE_ENGINE, engine) -# TODO: Fix sqlglot impl -@pytest.mark.parametrize("engine", ["polars", "bq", "pyarrow"], indirect=True) +@pytest.mark.parametrize( + "engine", ["polars", "bq", "pyarrow", "bq-sqlglot"], indirect=True +) def test_engines_read_local_w_nested_source( fake_session: bigframes.Session, nested_data_source: local_data.ManagedArrowTable, diff --git a/tests/unit/core/compile/sqlglot/snapshots/test_compile_readlocal/test_compile_readlocal_w_nested_structs_df/out.sql b/tests/unit/core/compile/sqlglot/snapshots/test_compile_readlocal/test_compile_readlocal_w_nested_structs_df/out.sql deleted file mode 100644 index 42b7bc7361..0000000000 --- a/tests/unit/core/compile/sqlglot/snapshots/test_compile_readlocal/test_compile_readlocal_w_nested_structs_df/out.sql +++ /dev/null @@ -1,19 +0,0 @@ -SELECT - * -FROM UNNEST(ARRAY>, `bfcol_2` INT64>>[( - 1, - STRUCT( - 'Alice' AS `name`, - 30 AS `age`, - STRUCT('New York' AS `city`, 'USA' AS `country`) AS `address` - ), - 0 -), ( - 2, - STRUCT( - 'Bob' AS `name`, - 25 AS `age`, - STRUCT('London' AS `city`, 'UK' AS `country`) AS `address` - ), - 1 -)]) \ No newline at end of file diff --git a/tests/unit/test_local_data.py b/tests/unit/test_local_data.py index dfd1cd622f..6f23036efb 100644 --- a/tests/unit/test_local_data.py +++ b/tests/unit/test_local_data.py @@ -20,20 +20,21 @@ pd_data = pd.DataFrame( { - "ints": [10, 20, 30, 40], - "nested_ints": [[1, 2], [3, 4, 5], [], [20, 30]], - "structs": [{"a": 100}, {}, {"b": 200}, {"b": 300}], + "ints": [10, 20, 30, 40, 50], + "nested_ints": [[1, 2], [], [3, 4, 5], [], [20, 30]], + "structs": [{"a": 100}, None, {}, {"b": 200}, {"b": 300}], } ) pd_data_normalized = pd.DataFrame( { - "ints": pd.Series([10, 20, 30, 40], dtype=dtypes.INT_DTYPE), + "ints": pd.Series([10, 20, 30, 40, 50], dtype=dtypes.INT_DTYPE), "nested_ints": pd.Series( - [[1, 2], [3, 4, 5], [], [20, 30]], dtype=pd.ArrowDtype(pa.list_(pa.int64())) + [[1, 2], [], [3, 4, 5], [], [20, 30]], + dtype=pd.ArrowDtype(pa.list_(pa.int64())), ), "structs": pd.Series( - [{"a": 100}, {}, {"b": 200}, {"b": 300}], + [{"a": 100}, None, {}, {"b": 200}, {"b": 300}], dtype=pd.ArrowDtype(pa.struct({"a": pa.int64(), "b": pa.int64()})), ), } @@ -122,11 +123,11 @@ def test_local_data_well_formed_round_trip_chunked(): def test_local_data_well_formed_round_trip_sliced(): pa_table = pa.Table.from_pandas(pd_data, preserve_index=False) - as_rechunked_pyarrow = pa.Table.from_batches(pa_table.slice(2, 4).to_batches()) + as_rechunked_pyarrow = pa.Table.from_batches(pa_table.slice(0, 4).to_batches()) local_entry = local_data.ManagedArrowTable.from_pyarrow(as_rechunked_pyarrow) result = pd.DataFrame(local_entry.itertuples(), columns=pd_data.columns) pandas.testing.assert_frame_equal( - pd_data_normalized[2:4].reset_index(drop=True), + pd_data_normalized[0:4].reset_index(drop=True), result.reset_index(drop=True), check_dtype=False, ) @@ -143,3 +144,25 @@ def test_local_data_not_equal_other(): local_entry2 = local_data.ManagedArrowTable.from_pandas(pd_data[::2]) assert local_entry != local_entry2 assert hash(local_entry) != hash(local_entry2) + + +def test_local_data_itertuples_struct_none(): + pd_data = pd.DataFrame( + { + "structs": [{"a": 100}, None, {"b": 200}, {"b": 300}], + } + ) + local_entry = local_data.ManagedArrowTable.from_pandas(pd_data) + result = list(local_entry.itertuples()) + assert result[1][0] is None + + +def test_local_data_itertuples_list_none(): + pd_data = pd.DataFrame( + { + "lists": [[1, 2], None, [3, 4]], + } + ) + local_entry = local_data.ManagedArrowTable.from_pandas(pd_data) + result = list(local_entry.itertuples()) + assert result[1][0] == []