refactor: Extract data loading logic into class by TrevorBergeron · Pull Request #913 · googleapis/python-bigquery-dataframes
Merged
3 changes: 2 additions & 1 deletion bigframes/core/blocks.py
@@ -2542,7 +2542,8 @@ def _get_rows_as_json_values(self) -> Block:
         SELECT {select_columns_csv} FROM T1
         """
         # The only way this code is used is through the df.apply(axis=1) code path
-        destination, query_job = self.session._query_to_destination(
+        # TODO: Stop using internal API
+        destination, query_job = self.session._loader._query_to_destination(
             json_sql, index_cols=[ordering_column_name], api_name="apply"
         )
         if not destination:
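The pattern in this hunk is the heart of the refactor: the Session no longer implements _query_to_destination itself; a dedicated loader object owns that logic, and callers reach it via session._loader. A minimal sketch of the extraction, using simplified stand-in names (FakeJob/Loader/Session here are illustrations, not the library's real classes, which carry far more state):

    # Minimal sketch of the extraction pattern; FakeJob/Loader/Session are
    # simplified stand-ins, not the library's real classes.
    from dataclasses import dataclass

    @dataclass
    class FakeJob:
        destination: str  # table the query results were written to

    class Loader:
        """Owns query-to-destination logic that used to live on Session."""

        def __init__(self, client):
            self._client = client

        def query_to_destination(self, sql: str) -> FakeJob:
            # The real method submits a BigQuery job and returns a
            # destination table reference; this stub just fabricates one.
            return FakeJob(destination=f"tmp_table_{hash(sql) & 0xFFFF:x}")

    class Session:
        def __init__(self, client=None):
            # Session delegates instead of implementing the logic itself.
            self._loader = Loader(client)

    job = Session()._loader.query_to_destination("SELECT 1")
    print(job.destination)

Moving the logic behind one attribute keeps existing call sites working with a one-line change (session._query_to_destination → session._loader._query_to_destination), which is exactly what the blocks.py hunk above does.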
79 changes: 26 additions & 53 deletions bigframes/dataframe.py
@@ -2956,17 +2956,20 @@ def to_csv(
         if "*" not in path_or_buf:
             raise NotImplementedError(ERROR_IO_REQUIRES_WILDCARD)

-        result_table = self._run_io_query(
-            index=index, ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID
-        )
-        export_data_statement = bigframes.session._io.bigquery.create_export_csv_statement(
-            f"{result_table.project}.{result_table.dataset_id}.{result_table.table_id}",
-            uri=path_or_buf,
-            field_delimiter=sep,
-            header=header,
+        export_array, id_overrides = self._prepare_export(
+            index=index and self._has_index,
+            ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID,
         )
-        _, query_job = self._block.expr.session._start_query(
-            export_data_statement, api_name="dataframe-to_csv"
+        options = {
+            "field_delimiter": sep,
+            "header": header,
+        }
+        query_job = self._session._executor.export_gcs(
+            export_array,
+            id_overrides,
+            path_or_buf,
+            format="csv",
+            export_options=options,
         )
         self._set_internal_query_job(query_job)
         return None
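This to_csv hunk (and the to_json/to_parquet hunks below) follows the same two-step shape: _prepare_export yields the array to execute plus column-id overrides, then the executor's export_gcs writes it to Cloud Storage. A rough sketch of that shape, where prepare_export and Executor.export_gcs are hypothetical stand-ins for the internal API rather than its real signatures:

    # Rough sketch of the prepare-then-export shape; prepare_export and
    # Executor.export_gcs are hypothetical stand-ins, not the real API.
    from typing import Dict, List, Tuple

    def prepare_export(
        columns: List[str], index: bool
    ) -> Tuple[List[str], Dict[str, str]]:
        # Return internal column ids plus a mapping back to user-facing
        # names (the "id_overrides" handed to the executor).
        ids = [f"col_{i}" for i in range(len(columns))]
        overrides = dict(zip(ids, columns))
        if index:
            ids.insert(0, "idx_0")
            overrides["idx_0"] = "index"
        return ids, overrides

    class Executor:
        def export_gcs(self, ids, id_overrides, uri, format, export_options):
            # The real executor issues an EXPORT DATA job; this stub only
            # renders what such a statement would cover.
            cols = ", ".join(id_overrides.get(i, i) for i in ids)
            print(f"EXPORT ({cols}) AS {format.upper()} TO {uri} "
                  f"WITH {export_options}")

    ids, overrides = prepare_export(["a", "b"], index=True)
    Executor().export_gcs(
        ids, overrides, "gs://bucket/out-*.csv", "csv",
        {"field_delimiter": ",", "header": True},
    )

The payoff is that each to_* method shrinks to building format-specific options and delegating, instead of assembling its own EXPORT DATA SQL and running it through session internals.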
@@ -3006,17 +3009,12 @@ def to_json(
                 "'lines' keyword is only valid when 'orient' is 'records'."
             )

-        result_table = self._run_io_query(
-            index=index, ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID
-        )
-        export_data_statement = bigframes.session._io.bigquery.create_export_data_statement(
-            f"{result_table.project}.{result_table.dataset_id}.{result_table.table_id}",
-            uri=path_or_buf,
-            format="JSON",
-            export_options={},
+        export_array, id_overrides = self._prepare_export(
+            index=index and self._has_index,
+            ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID,
         )
-        _, query_job = self._block.expr.session._start_query(
-            export_data_statement, api_name="dataframe-to_json"
+        query_job = self._session._executor.export_gcs(
+            export_array, id_overrides, path_or_buf, format="json", export_options={}
         )
         self._set_internal_query_job(query_job)
         return None
@@ -3145,18 +3143,17 @@ def to_parquet(
         if compression:
             export_options["compression"] = compression.upper()

-        result_table = self._run_io_query(
-            index=index, ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID
+        export_array, id_overrides = self._prepare_export(
+            index=index and self._has_index,
+            ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID,
         )
-        export_data_statement = bigframes.session._io.bigquery.create_export_data_statement(
-            f"{result_table.project}.{result_table.dataset_id}.{result_table.table_id}",
-            uri=path,
-            format="PARQUET",
+        query_job = self._session._executor.export_gcs(
+            export_array,
+            id_overrides,
+            path,
+            format="parquet",
             export_options=export_options,
         )
-        _, query_job = self._block.expr.session._start_query(
-            export_data_statement, api_name="dataframe-to_parquet"
-        )
         self._set_internal_query_job(query_job)
         return None

@@ -3386,30 +3383,6 @@ def _prepare_export(
             array_value = array_value.promote_offsets(ordering_id)
         return array_value, id_overrides

-    def _run_io_query(
-        self,
-        index: bool,
-        ordering_id: Optional[str] = None,
-    ) -> bigquery.TableReference:
-        """Executes a query job presenting this dataframe and returns the destination
-        table."""
-        session = self._block.expr.session
-        export_array, id_overrides = self._prepare_export(
-            index=index and self._has_index, ordering_id=ordering_id
-        )
-
-        _, query_job = session._execute(
-            export_array,
-            ordered=False,
-            col_id_overrides=id_overrides,
-        )
-        self._set_internal_query_job(query_job)
-
-        # The query job should have finished, so there should always be a result table.
-        result_table = query_job.destination
-        assert result_table is not None
-        return result_table
-
     def map(self, func, na_action: Optional[str] = None) -> DataFrame:
         if not callable(func):
             raise TypeError("the first argument must be callable")
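The deleted _run_io_query captured the old two-step export: materialize the frame into a temporary destination table, then export that table. For contrast with the direct export_gcs path above, here is a sketch of that removed shape using stub objects (run_query, StubJob, and StubSession are stand-ins for illustration, not the real API):

    # Sketch of the removed materialize-then-export shape; run_query and
    # the stub classes are illustrative stand-ins only.
    class StubJob:
        destination = "proj.dataset.tmp_table"

    class StubSession:
        def run_query(self, sql):
            print("running:", sql[:60])
            return StubJob()

    def export_via_temp_table(session, sql, uri):
        job = session.run_query(sql)   # step 1: materialize the frame
        table = job.destination        # query finished, destination is set
        assert table is not None
        # step 2: export the materialized table to GCS
        session.run_query(
            f"EXPORT DATA OPTIONS(uri='{uri}', format='CSV') AS "
            f"SELECT * FROM `{table}`"
        )

    export_via_temp_table(StubSession(), "SELECT 1 AS x", "gs://bucket/out-*.csv")

Collapsing the two steps into one executor call removes the intermediate table reference and the assertion on it, which is why the helper could be deleted outright.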
9 changes: 8 additions & 1 deletion bigframes/functions/_remote_function_client.py
@@ -37,6 +37,8 @@
 import google.api_core.retry
 from google.cloud import bigquery, functions_v2

+import bigframes.session._io.bigquery
+
 from . import _utils

 logger = logging.getLogger(__name__)
@@ -142,7 +144,12 @@ def create_bq_remote_function(
         self._bq_client.create_dataset(dataset, exists_ok=True)

         # TODO(swast): plumb through the original, user-facing api_name.
-        _, query_job = self._session._start_query(create_function_ddl)
+        _, query_job = bigframes.session._io.bigquery.start_query_with_client(
+            self._session.bqclient,
+            create_function_ddl,
+            job_config=bigquery.QueryJobConfig(),
+        )
+
         logger.info(f"Created remote function {query_job.ddl_target_routine}")

     def get_cloud_function_fully_qualified_parent(self):
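In this last hunk the remote-function client stops going through the session's _start_query and instead calls a module-level helper with an explicit BigQuery client. A sketch of what such a helper can reduce to, assuming only the public google-cloud-bigquery API (the real start_query_with_client signature and return shape may differ):

    # Sketch assuming only public google-cloud-bigquery calls; the real
    # helper's signature and return shape may differ.
    from google.cloud import bigquery

    def start_query_with_client(
        client: bigquery.Client,
        sql: str,
        job_config: bigquery.QueryJobConfig,
    ):
        # Execute with an explicitly supplied client so callers need no
        # Session object at all.
        query_job = client.query(sql, job_config=job_config)
        rows = query_job.result()  # block until the job (e.g. DDL) finishes
        return rows, query_job

Taking the client as a parameter keeps the helper usable from components, like the remote-function client, that hold a bqclient but should not depend on the whole session.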