From 73434d3e8ec99d8439094c7d58aa380299f81dd3 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Wed, 23 Apr 2025 17:18:30 +0000 Subject: [PATCH 01/20] add error handling for one function, test is not clean --- bigframes/blob/_functions.py | 59 ++++++++++++++++++++---------------- bigframes/operations/blob.py | 59 ++++++++++++++++++++++++++++++++++-- 2 files changed, 90 insertions(+), 28 deletions(-) diff --git a/bigframes/blob/_functions.py b/bigframes/blob/_functions.py index 8dd9328fb8..9272be10a2 100644 --- a/bigframes/blob/_functions.py +++ b/bigframes/blob/_functions.py @@ -395,40 +395,47 @@ def image_normalize_func( def image_normalize_to_bytes_func( src_obj_ref_rt: str, alpha: float, beta: float, norm_type: str, ext: str -) -> bytes: - import json +) -> str: + try: + import base64 + import json - import cv2 as cv # type: ignore - import numpy as np - import requests - from requests import adapters + import cv2 as cv # type: ignore + import numpy as np + import requests + from requests import adapters - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) + session = requests.Session() + session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - ext = ext or ".jpeg" + ext = ext or ".jpeg" - norm_type_mapping = { - "inf": cv.NORM_INF, - "l1": cv.NORM_L1, - "l2": cv.NORM_L2, - "minmax": cv.NORM_MINMAX, - } + norm_type_mapping = { + "inf": cv.NORM_INF, + "l1": cv.NORM_L1, + "l2": cv.NORM_L2, + "minmax": cv.NORM_MINMAX, + } - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] + src_obj_ref_rt_json = json.loads(src_obj_ref_rt) + src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - response = session.get(src_url, timeout=30) - bts = response.content + response = session.get(src_url, timeout=30) + bts = response.content - nparr = np.frombuffer(bts, np.uint8) - img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) - img_normalized = cv.normalize( - img, None, alpha=alpha, beta=beta, norm_type=norm_type_mapping[norm_type] - ) - bts = cv.imencode(".jpeg", img_normalized)[1].tobytes() + nparr = np.frombuffer(bts, np.uint8) + img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) + img_normalized = cv.normalize( + img, None, alpha=alpha, beta=beta, norm_type=norm_type_mapping[norm_type] + ) + bts = cv.imencode(".jpeg", img_normalized)[1].tobytes() + result_dict = {"status": "", "content": base64.b64encode(bts).decode("utf-8")} - return bts + except Exception as e: + result_dict = {"status": str(e), "content": ""} + + result_json = json.dumps(result_dict) + return result_json image_normalize_to_bytes_def = FunctionDef( diff --git a/bigframes/operations/blob.py b/bigframes/operations/blob.py index d505f096f4..eed8ae8d3d 100644 --- a/bigframes/operations/blob.py +++ b/bigframes/operations/blob.py @@ -544,6 +544,7 @@ def image_normalize( max_batching_rows: int = 8192, container_cpu: Union[float, int] = 0.33, container_memory: str = "512Mi", + verbose: bool = False, ) -> bigframes.series.Series: """Normalize images. @@ -561,6 +562,10 @@ def image_normalize( max_batching_rows (int, default 8,192): Max number of rows per batch send to cloud run to execute the function. container_cpu (int or float, default 0.33): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to intergers. container_memory (str, default "512Mi"): container memory size. String of the format . Possible values are from 512Mi to 32Gi. + verbose (bool, default "False"): controls the verbosity of the output. 
+ when set to True, both error messages and the normalized image + content are displayed. Conversely, when set to False, only the + normalized image content is presented, suppressing error messages. Returns: bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ. @@ -568,7 +573,17 @@ def image_normalize( if engine is None or engine.casefold() != "opencv": raise ValueError("Must specify the engine, supported value is 'opencv'.") + bigframes.series.Series: blob Series if destination is GCS. Or + struct[str, bytes] or bytes Series if destination is BQ, + depend on the "verbose" parameter. Contains the normalized image + data. Includes error messages if verbosity is enbled. + + """ + import base64 + + import bigframes.bigquery as bbq import bigframes.blob._functions as blob_func + import bigframes.pandas as bpd connection = self._resolve_connection(connection) df = self.get_runtime_json_str(mode="R").to_frame() @@ -591,7 +606,27 @@ def image_normalize( df["ext"] = ext # type: ignore res = self._df_apply_udf(df, image_normalize_udf) - return res + bq_session = self._block.bq_session + encoded_content_series = res._apply_unary_op( + ops.JSONValue(json_path="$.content") + ) + base64_decode_udf = bq_session.register_function( + "base64_decode_bq", + lambda x: bbq.query(f"SELECT TO_BASE64(FROM_BASE64('{x}'))") + .to_dataframe() + .iloc[0, 0], + ) + decoded_content_series = encoded_content_series.apply(base64_decode_udf) + + if verbose: + status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status")) + res_df = bpd.DataFrame( + {"status": status_series, "content": decoded_content_series} + ) + struct_series = bbq.struct(res_df) + return struct_series + else: + return decoded_content_series if isinstance(dst, str): dst = os.path.join(dst, "") @@ -623,7 +658,27 @@ def image_normalize( res = self._df_apply_udf(df, image_normalize_udf) res.cache() # to execute the udf - return dst + bq_session = self._block.bq_session + encoded_content_series = res._apply_unary_op( + ops.JSONValue(json_path="$.content") + ) + base64_decode_udf = bq_session.register_function( + "base64_decode_bq", + lambda x: bbq.query(f"SELECT TO_BASE64(FROM_BASE64('{x}'))") + .to_dataframe() + .iloc[0, 0], + ) + decoded_content_series = encoded_content_series.apply(base64_decode_udf) + + if verbose: + status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status")) + res_df = bpd.DataFrame( + {"status": status_series, "content": decoded_content_series} + ) + struct_series = bbq.struct(res_df) + return struct_series + else: + return decoded_content_series def pdf_extract( self, From 47980721e668f68913f94606c57bdc02627d9ccf Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Wed, 4 Jun 2025 18:12:54 +0000 Subject: [PATCH 02/20] add verbose for image function --- bigframes/operations/blob.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/bigframes/operations/blob.py b/bigframes/operations/blob.py index eed8ae8d3d..ae2609fcf1 100644 --- a/bigframes/operations/blob.py +++ b/bigframes/operations/blob.py @@ -579,8 +579,6 @@ def image_normalize( data. Includes error messages if verbosity is enbled. 
""" - import base64 - import bigframes.bigquery as bbq import bigframes.blob._functions as blob_func import bigframes.pandas as bpd @@ -623,8 +621,8 @@ def image_normalize( res_df = bpd.DataFrame( {"status": status_series, "content": decoded_content_series} ) - struct_series = bbq.struct(res_df) - return struct_series + res_struct = bbq.struct(res_df) + return res_struct else: return decoded_content_series @@ -675,8 +673,8 @@ def image_normalize( res_df = bpd.DataFrame( {"status": status_series, "content": decoded_content_series} ) - struct_series = bbq.struct(res_df) - return struct_series + res_struct = bbq.struct(res_df) + return res_struct else: return decoded_content_series @@ -739,8 +737,8 @@ def pdf_extract( if verbose: status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status")) res_df = bpd.DataFrame({"status": status_series, "content": content_series}) - struct_series = bbq.struct(res_df) - return struct_series + res_struct = bbq.struct(res_df) + return res_struct else: return content_series @@ -820,8 +818,8 @@ def pdf_chunk( if verbose: status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status")) res_df = bpd.DataFrame({"status": status_series, "content": content_series}) - struct_series = bbq.struct(res_df) - return struct_series + res_struct = bbq.struct(res_df) + return res_struct else: return content_series From ffacceea2ae17cac8132de0efb26783d8cddab24 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Tue, 24 Jun 2025 21:30:53 +0000 Subject: [PATCH 03/20] still have decoding issue with image normarlize to bytes --- bigframes/blob/_functions.py | 124 +++++++++------ bigframes/operations/blob.py | 190 +++++++++++++++-------- tests/system/large/blob/test_function.py | 34 +++- 3 files changed, 235 insertions(+), 113 deletions(-) diff --git a/bigframes/blob/_functions.py b/bigframes/blob/_functions.py index 9272be10a2..3268575f9d 100644 --- a/bigframes/blob/_functions.py +++ b/bigframes/blob/_functions.py @@ -21,7 +21,13 @@ import bigframes.session import bigframes.session._io.bigquery as bf_io_bigquery -_PYTHON_TO_BQ_TYPES = {int: "INT64", float: "FLOAT64", str: "STRING", bytes: "BYTES"} +_PYTHON_TO_BQ_TYPES = { + int: "INT64", + float: "FLOAT64", + str: "STRING", + bytes: "BYTES", + bool: "BOOL", +} @dataclass(frozen=True) @@ -334,58 +340,70 @@ def image_normalize_func( beta: float, norm_type: str, ext: str, + verbose: bool, ) -> str: - import json + try: + import json - import cv2 as cv # type: ignore - import numpy as np - import requests - from requests import adapters + import cv2 as cv # type: ignore + import numpy as np + import requests + from requests import adapters - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) + session = requests.Session() + session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - ext = ext or ".jpeg" + ext = ext or ".jpeg" - norm_type_mapping = { - "inf": cv.NORM_INF, - "l1": cv.NORM_L1, - "l2": cv.NORM_L2, - "minmax": cv.NORM_MINMAX, - } + norm_type_mapping = { + "inf": cv.NORM_INF, + "l1": cv.NORM_L1, + "l2": cv.NORM_L2, + "minmax": cv.NORM_MINMAX, + } - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - dst_obj_ref_rt_json = json.loads(dst_obj_ref_rt) + src_obj_ref_rt_json = json.loads(src_obj_ref_rt) + dst_obj_ref_rt_json = json.loads(dst_obj_ref_rt) - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - dst_url = dst_obj_ref_rt_json["access_urls"]["write_url"] + src_url = src_obj_ref_rt_json["access_urls"]["read_url"] + dst_url = 
dst_obj_ref_rt_json["access_urls"]["write_url"] - response = session.get(src_url, timeout=30) - bts = response.content + response = session.get(src_url, timeout=30) + bts = response.content - nparr = np.frombuffer(bts, np.uint8) - img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) - img_normalized = cv.normalize( - img, None, alpha=alpha, beta=beta, norm_type=norm_type_mapping[norm_type] - ) + nparr = np.frombuffer(bts, np.uint8) + img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) + img_normalized = cv.normalize( + img, None, alpha=alpha, beta=beta, norm_type=norm_type_mapping[norm_type] + ) - bts = cv.imencode(ext, img_normalized)[1].tobytes() + bts = cv.imencode(ext, img_normalized)[1].tobytes() - ext = ext.replace(".", "") - ext_mappings = {"jpg": "jpeg", "tif": "tiff"} - ext = ext_mappings.get(ext, ext) - content_type = "image/" + ext + ext = ext.replace(".", "") + ext_mappings = {"jpg": "jpeg", "tif": "tiff"} + ext = ext_mappings.get(ext, ext) + content_type = "image/" + ext - session.put( - url=dst_url, - data=bts, - headers={ - "Content-Type": content_type, - }, - timeout=30, - ) + session.put( + url=dst_url, + data=bts, + headers={ + "Content-Type": content_type, + }, + timeout=30, + ) + if verbose: + result_dict = {"status": "", "content": dst_obj_ref_rt} + return json.dumps(result_dict) + else: + return dst_obj_ref_rt - return dst_obj_ref_rt + except Exception as e: + if verbose: + result_dict = {"status": str(e), "content": None} + return json.dumps(result_dict) + else: + return None image_normalize_def = FunctionDef( @@ -394,7 +412,12 @@ def image_normalize_func( def image_normalize_to_bytes_func( - src_obj_ref_rt: str, alpha: float, beta: float, norm_type: str, ext: str + src_obj_ref_rt: str, + alpha: float, + beta: float, + norm_type: str, + ext: str, + verbose: bool, ) -> str: try: import base64 @@ -429,13 +452,22 @@ def image_normalize_to_bytes_func( img, None, alpha=alpha, beta=beta, norm_type=norm_type_mapping[norm_type] ) bts = cv.imencode(".jpeg", img_normalized)[1].tobytes() - result_dict = {"status": "", "content": base64.b64encode(bts).decode("utf-8")} - except Exception as e: - result_dict = {"status": str(e), "content": ""} + if verbose: + content_b64 = base64.b64encode(bts).decode("utf-8") + result_dict = {"status": "", "content": content_b64} + result_json = json.dumps(result_dict) + return result_json + else: + return bts - result_json = json.dumps(result_dict) - return result_json + except Exception as e: + if verbose: + result_dict = {"status": str(e), "content": b""} + result_json = json.dumps(result_dict) + return result_json + else: + return b"" image_normalize_to_bytes_def = FunctionDef( diff --git a/bigframes/operations/blob.py b/bigframes/operations/blob.py index ae2609fcf1..68518292ad 100644 --- a/bigframes/operations/blob.py +++ b/bigframes/operations/blob.py @@ -313,6 +313,7 @@ def exif( max_batching_rows: int = 8192, container_cpu: Union[float, int] = 0.33, container_memory: str = "512Mi", + verbose: bool = False, ) -> bigframes.series.Series: """Extract EXIF data. Now only support image types. @@ -322,15 +323,17 @@ def exif( max_batching_rows (int, default 8,192): Max number of rows per batch send to cloud run to execute the function. container_cpu (int or float, default 0.33): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to intergers. container_memory (str, default "512Mi"): container memory size. String of the format . Possible values are from 512Mi to 32Gi. 
+ verbose (bool, default False): If True, returns a struct with status and content fields. If False, returns only the content. Returns: - bigframes.series.Series: JSON series of key-value pairs. + bigframes.series.Series: JSON series of key-value pairs if verbose=False, or struct with status and content if verbose=True. """ if engine is None or engine.casefold() != "pillow": raise ValueError("Must specify the engine, supported value is 'pillow'.") import bigframes.bigquery as bbq import bigframes.blob._functions as blob_func + import bigframes.pandas as bpd connection = self._resolve_connection(connection) df = self.get_runtime_json_str(mode="R").to_frame() @@ -345,9 +348,22 @@ def exif( ).udf() res = self._df_apply_udf(df, exif_udf) - res = bbq.parse_json(res) - return res + exif_content_series = bbq.parse_json( + res._apply_unary_op(ops.JSONValue(json_path="$.content")) + ).rename("exif_content") + + if verbose: + exif_status_series = res._apply_unary_op( + ops.JSONValue(json_path="$.status") + ) + results_df = bpd.DataFrame( + {"status": exif_status_series, "content": exif_content_series} + ) + results_struct = bbq.struct(results_df).rename("exif_results") + return results_struct + else: + return exif_content_series def image_blur( self, @@ -359,6 +375,7 @@ def image_blur( max_batching_rows: int = 8192, container_cpu: Union[float, int] = 0.33, container_memory: str = "512Mi", + verbose: bool = False, ) -> bigframes.series.Series: """Blurs images. @@ -374,14 +391,17 @@ def image_blur( max_batching_rows (int, default 8,192): Max number of rows per batch send to cloud run to execute the function. container_cpu (int or float, default 0.33): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to intergers. container_memory (str, default "512Mi"): container memory size. String of the format . Possible values are from 512Mi to 32Gi. + verbose (bool, default False): If True, returns a struct with status and content fields. If False, returns only the content. Returns: - bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ. + bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ. If verbose=True, returns struct with status and content. 
""" if engine is None or engine.casefold() != "opencv": raise ValueError("Must specify the engine, supported value is 'opencv'.") + import bigframes.bigquery as bbq import bigframes.blob._functions as blob_func + import bigframes.pandas as bpd connection = self._resolve_connection(connection) df = self.get_runtime_json_str(mode="R").to_frame() @@ -402,7 +422,21 @@ def image_blur( df["ext"] = ext # type: ignore res = self._df_apply_udf(df, image_blur_udf) - return res + blurred_content_series = res._apply_unary_op( + ops.JSONValue(json_path="$.content") + ).rename("blurred_content") + + if verbose: + blurred_status_series = res._apply_unary_op( + ops.JSONValue(json_path="$.status") + ) + results_df = bpd.DataFrame( + {"status": blurred_status_series, "content": blurred_content_series} + ) + results_struct = bbq.struct(results_df).rename("blurred_results") + return results_struct + else: + return blurred_content_series if isinstance(dst, str): dst = os.path.join(dst, "") @@ -432,7 +466,17 @@ def image_blur( res = self._df_apply_udf(df, image_blur_udf) res.cache() # to execute the udf - return dst + if verbose: + blurred_status_series = res._apply_unary_op( + ops.JSONValue(json_path="$.status") + ) + results_df = bpd.DataFrame( + {"status": blurred_status_series, "content": dst} + ) + results_struct = bbq.struct(results_df).rename("blurred_results") + return results_struct + else: + return dst def image_resize( self, @@ -446,6 +490,7 @@ def image_resize( max_batching_rows: int = 8192, container_cpu: Union[float, int] = 0.33, container_memory: str = "512Mi", + verbose: bool = False, ): """Resize images. @@ -463,9 +508,10 @@ def image_resize( max_batching_rows (int, default 8,192): Max number of rows per batch send to cloud run to execute the function. container_cpu (int or float, default 0.33): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to intergers. container_memory (str, default "512Mi"): container memory size. String of the format . Possible values are from 512Mi to 32Gi. + verbose (bool, default False): If True, returns a struct with status and content fields. If False, returns only the content. Returns: - bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ. + bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ. If verbose=True, returns struct with status and content. """ if engine is None or engine.casefold() != "opencv": raise ValueError("Must specify the engine, supported value is 'opencv'.") @@ -477,7 +523,9 @@ def image_resize( "Only one of dsize or (fx, fy) parameters must be set. And the set values must be positive. 
" ) + import bigframes.bigquery as bbq import bigframes.blob._functions as blob_func + import bigframes.pandas as bpd connection = self._resolve_connection(connection) df = self.get_runtime_json_str(mode="R").to_frame() @@ -499,7 +547,21 @@ def image_resize( df["ext"] = ext # type: ignore res = self._df_apply_udf(df, image_resize_udf) - return res + resized_content_series = res._apply_unary_op( + ops.JSONValue(json_path="$.content") + ).rename("resized_content") + + if verbose: + resized_status_series = res._apply_unary_op( + ops.JSONValue(json_path="$.status") + ) + results_df = bpd.DataFrame( + {"status": resized_status_series, "content": resized_content_series} + ) + results_struct = bbq.strcut(results_df).rename("resized_results") + return results_struct + else: + return resized_content_series if isinstance(dst, str): dst = os.path.join(dst, "") @@ -530,7 +592,17 @@ def image_resize( res = self._df_apply_udf(df, image_resize_udf) res.cache() # to execute the udf - return dst + if verbose: + resized_status_series = res._apply_unary_op( + ops.JSONValue(json_path="$.status") + ) + results_df = bpd.DataFrame( + {"status": resized_status_series, "content": dst} + ) + results_struct = bbq.struct(results_df).rename("resized_results") + return results_struct + else: + return dst def image_normalize( self, @@ -562,23 +634,16 @@ def image_normalize( max_batching_rows (int, default 8,192): Max number of rows per batch send to cloud run to execute the function. container_cpu (int or float, default 0.33): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to intergers. container_memory (str, default "512Mi"): container memory size. String of the format . Possible values are from 512Mi to 32Gi. - verbose (bool, default "False"): controls the verbosity of the output. - when set to True, both error messages and the normalized image - content are displayed. Conversely, when set to False, only the - normalized image content is presented, suppressing error messages. + verbose (bool, default False): If True, returns a struct with status and content fields. If False, returns only the content. Returns: - bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ. + bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ. If verbose=True, returns struct with status and content. """ if engine is None or engine.casefold() != "opencv": raise ValueError("Must specify the engine, supported value is 'opencv'.") - bigframes.series.Series: blob Series if destination is GCS. Or - struct[str, bytes] or bytes Series if destination is BQ, - depend on the "verbose" parameter. Contains the normalized image - data. Includes error messages if verbosity is enbled. 
+ import base64 - """ import bigframes.bigquery as bbq import bigframes.blob._functions as blob_func import bigframes.pandas as bpd @@ -602,29 +667,25 @@ def image_normalize( df["beta"] = beta df["norm_type"] = norm_type df["ext"] = ext # type: ignore + df["verbose"] = verbose res = self._df_apply_udf(df, image_normalize_udf) - bq_session = self._block.bq_session - encoded_content_series = res._apply_unary_op( - ops.JSONValue(json_path="$.content") - ) - base64_decode_udf = bq_session.register_function( - "base64_decode_bq", - lambda x: bbq.query(f"SELECT TO_BASE64(FROM_BASE64('{x}'))") - .to_dataframe() - .iloc[0, 0], - ) - decoded_content_series = encoded_content_series.apply(base64_decode_udf) - if verbose: - status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status")) - res_df = bpd.DataFrame( - {"status": status_series, "content": decoded_content_series} + normalized_status_series = res._apply_unary_op( + ops.JSONValue(json_path="$.status") + ) + normalized_content_b64_series = res._apply_unary_op( + ops.JSONValue(json_path="$.content") + ) + # TODO this is not allowed, I need to find another way + normalized_bytes = base64.b64decode(normalized_content_b64_series) + results_df = bpd.DataFrame( + {"status": normalized_status_series, "content": normalized_bytes} ) - res_struct = bbq.struct(res_df) - return res_struct + results_struct = bbq.struct(results_df).rename("normalized_results") + return results_struct else: - return decoded_content_series + return res if isinstance(dst, str): dst = os.path.join(dst, "") @@ -652,31 +713,22 @@ def image_normalize( df["beta"] = beta df["norm_type"] = norm_type df["ext"] = ext # type: ignore + # df["verbose"] = verbose res = self._df_apply_udf(df, image_normalize_udf) res.cache() # to execute the udf - bq_session = self._block.bq_session - encoded_content_series = res._apply_unary_op( - ops.JSONValue(json_path="$.content") - ) - base64_decode_udf = bq_session.register_function( - "base64_decode_bq", - lambda x: bbq.query(f"SELECT TO_BASE64(FROM_BASE64('{x}'))") - .to_dataframe() - .iloc[0, 0], - ) - decoded_content_series = encoded_content_series.apply(base64_decode_udf) - if verbose: - status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status")) - res_df = bpd.DataFrame( - {"status": status_series, "content": decoded_content_series} + normalized_status_series = res._apply_unary_op( + ops.JSONValue(json_path="$.status") + ) + results_df = bpd.DataFrame( + {"status": normalized_status_series, "content": dst} ) - res_struct = bbq.struct(res_df) - return res_struct + results_struct = bbq.struct(results_df).rename("normalized_results") + return results_struct else: - return decoded_content_series + return dst def pdf_extract( self, @@ -732,15 +784,19 @@ def pdf_extract( res = src_rt.apply(pdf_extract_udf) - content_series = res._apply_unary_op(ops.JSONValue(json_path="$.content")) + extracted_content_series = res._apply_unary_op( + ops.JSONValue(json_path="$.content") + ).rename("extracted_content") if verbose: status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status")) - res_df = bpd.DataFrame({"status": status_series, "content": content_series}) - res_struct = bbq.struct(res_df) - return res_struct + results_df = bpd.DataFrame( + {"status": status_series, "content": extracted_content_series} + ) + results_struct = bbq.struct(results_df).rename("extracted_results") + return results_struct else: - return content_series + return extracted_content_series def pdf_chunk( self, @@ -814,14 +870,18 @@ def pdf_chunk( res = 
self._df_apply_udf(df, pdf_chunk_udf) - content_series = bbq.json_extract_string_array(res, "$.content") + chunked_content_series = bbq.json_extract_string_array(res, "$.content").rename( + "chunked_content" + ) if verbose: status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status")) - res_df = bpd.DataFrame({"status": status_series, "content": content_series}) - res_struct = bbq.struct(res_df) - return res_struct + results_df = bpd.DataFrame( + {"status": status_series, "content": chunked_content_series} + ) + resultes_struct = bbq.struct(results_df).rename("chunked_results") + return resultes_struct else: - return content_series + return chunked_content_series def audio_transcribe( self, diff --git a/tests/system/large/blob/test_function.py b/tests/system/large/blob/test_function.py index f006395d2f..7a79fc200b 100644 --- a/tests/system/large/blob/test_function.py +++ b/tests/system/large/blob/test_function.py @@ -52,14 +52,24 @@ def images_output_uris(images_output_folder: str) -> list[str]: ] +@pytest.mark.parametrize( + "verbose, expected_type", + [ + (True, "struct"), + (False, "json"), + ], +) def test_blob_exif( bq_connection: str, session: bigframes.Session, + verbose: bool, + expected_type: str, ): exif_image_df = session.from_glob_path( "gs://bigframes_blob_test/images_exif/*", name="blob_col", connection=bq_connection, + verbose=verbose, ) actual = exif_image_df["blob_col"].blob.exif( @@ -288,18 +298,38 @@ def test_blob_image_normalize_to_folder( assert not actual.blob.size().isna().any() -def test_blob_image_normalize_to_bq(images_mm_df: bpd.DataFrame, bq_connection: str): +@pytest.mark.parametrize( + "verbose", + [True, False], +) +def test_blob_image_normalize_to_bq( + images_mm_df: bpd.DataFrame, bq_connection: str, verbose: bool +): actual = images_mm_df["blob_col"].blob.image_normalize( alpha=50.0, beta=150.0, norm_type="minmax", connection=bq_connection, engine="opencv", + verbose=verbose, ) assert isinstance(actual, bpd.Series) assert len(actual) == 2 - assert actual.dtype == dtypes.BYTES_DTYPE + + if verbose: + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + assert content_series.dtype == dtypes.BYTES_DTYPE + else: + assert actual.dtype == dtypes.BYTES_DTYPE @pytest.mark.parametrize( From a574c03b441d1184ceb105a4517e87adfca4dd76 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Tue, 1 Jul 2025 18:01:44 +0000 Subject: [PATCH 04/20] Add image normalize error handling --- bigframes/blob/_functions.py | 42 ++++++----------- bigframes/operations/blob.py | 45 +++++++++++-------- tests/system/large/blob/test_function.py | 57 +++++++++++++++--------- 3 files changed, 77 insertions(+), 67 deletions(-) diff --git a/bigframes/blob/_functions.py b/bigframes/blob/_functions.py index 3268575f9d..bc7471e3e9 100644 --- a/bigframes/blob/_functions.py +++ b/bigframes/blob/_functions.py @@ -26,7 +26,6 @@ float: "FLOAT64", str: "STRING", bytes: "BYTES", - bool: "BOOL", } @@ -340,11 +339,12 @@ def image_normalize_func( beta: float, norm_type: str, ext: str, - verbose: bool, ) -> str: - try: - import json + import json + + result_dict = {"status": "", "content": dst_obj_ref_rt} + try: import cv2 as cv # type: ignore import numpy as np import requests @@ -392,18 +392,11 @@ def image_normalize_func( }, timeout=30, ) - 
if verbose: - result_dict = {"status": "", "content": dst_obj_ref_rt} - return json.dumps(result_dict) - else: - return dst_obj_ref_rt except Exception as e: - if verbose: - result_dict = {"status": str(e), "content": None} - return json.dumps(result_dict) - else: - return None + result_dict["status"] = str(e) + + return json.dumps(result_dict) image_normalize_def = FunctionDef( @@ -417,7 +410,6 @@ def image_normalize_to_bytes_func( beta: float, norm_type: str, ext: str, - verbose: bool, ) -> str: try: import base64 @@ -453,21 +445,15 @@ def image_normalize_to_bytes_func( ) bts = cv.imencode(".jpeg", img_normalized)[1].tobytes() - if verbose: - content_b64 = base64.b64encode(bts).decode("utf-8") - result_dict = {"status": "", "content": content_b64} - result_json = json.dumps(result_dict) - return result_json - else: - return bts + content_b64 = base64.b64encode(bts).decode("utf-8") + result_dict = {"status": "", "content": content_b64} + result_json = json.dumps(result_dict) except Exception as e: - if verbose: - result_dict = {"status": str(e), "content": b""} - result_json = json.dumps(result_dict) - return result_json - else: - return b"" + result_dict = {"status": str(e), "content": b""} + result_json = json.dumps(result_dict) + + return result_json image_normalize_to_bytes_def = FunctionDef( diff --git a/bigframes/operations/blob.py b/bigframes/operations/blob.py index 68518292ad..733409463c 100644 --- a/bigframes/operations/blob.py +++ b/bigframes/operations/blob.py @@ -642,8 +642,6 @@ def image_normalize( if engine is None or engine.casefold() != "opencv": raise ValueError("Must specify the engine, supported value is 'opencv'.") - import base64 - import bigframes.bigquery as bbq import bigframes.blob._functions as blob_func import bigframes.pandas as bpd @@ -667,25 +665,25 @@ def image_normalize( df["beta"] = beta df["norm_type"] = norm_type df["ext"] = ext # type: ignore - df["verbose"] = verbose res = self._df_apply_udf(df, image_normalize_udf) + normalized_content_b64_series = res._apply_unary_op( + ops.JSONValue(json_path="$.content") + ) + normalized_bytes = bbq.sql_scalar( + "FROM_BASE64({0})", columns=[normalized_content_b64_series] + ) if verbose: normalized_status_series = res._apply_unary_op( ops.JSONValue(json_path="$.status") ) - normalized_content_b64_series = res._apply_unary_op( - ops.JSONValue(json_path="$.content") - ) - # TODO this is not allowed, I need to find another way - normalized_bytes = base64.b64decode(normalized_content_b64_series) results_df = bpd.DataFrame( {"status": normalized_status_series, "content": normalized_bytes} ) results_struct = bbq.struct(results_df).rename("normalized_results") return results_struct else: - return res + return normalized_bytes.rename("normalized_bytes") if isinstance(dst, str): dst = os.path.join(dst, "") @@ -713,22 +711,31 @@ def image_normalize( df["beta"] = beta df["norm_type"] = norm_type df["ext"] = ext # type: ignore - # df["verbose"] = verbose res = self._df_apply_udf(df, image_normalize_udf) res.cache() # to execute the udf + normalized_content_series = res._apply_unary_op( + ops.JSONValue(json_path="$.content") + ) + normalized_content_blobs = normalized_content_series.str.to_blob( + connection=connection + ) + if verbose: normalized_status_series = res._apply_unary_op( ops.JSONValue(json_path="$.status") ) results_df = bpd.DataFrame( - {"status": normalized_status_series, "content": dst} + { + "status": normalized_status_series, + "content": normalized_content_blobs, + } ) results_struct = 
bbq.struct(results_df).rename("normalized_results") return results_struct else: - return dst + return normalized_content_blobs.rename("normalized_content") def pdf_extract( self, @@ -786,7 +793,7 @@ def pdf_extract( extracted_content_series = res._apply_unary_op( ops.JSONValue(json_path="$.content") - ).rename("extracted_content") + ) if verbose: status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status")) @@ -796,7 +803,7 @@ def pdf_extract( results_struct = bbq.struct(results_df).rename("extracted_results") return results_struct else: - return extracted_content_series + return extracted_content_series.rename("extracted_content") def pdf_chunk( self, @@ -870,9 +877,8 @@ def pdf_chunk( res = self._df_apply_udf(df, pdf_chunk_udf) - chunked_content_series = bbq.json_extract_string_array(res, "$.content").rename( - "chunked_content" - ) + chunked_content_series = bbq.json_extract_string_array(res, "$.content") + if verbose: status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status")) results_df = bpd.DataFrame( @@ -881,7 +887,7 @@ def pdf_chunk( resultes_struct = bbq.struct(results_df).rename("chunked_results") return resultes_struct else: - return chunked_content_series + return chunked_content_series.rename("chunked_content") def audio_transcribe( self, @@ -939,6 +945,7 @@ def audio_transcribe( model_params={"generationConfig": {"temperature": 0.0}}, ) + transcribed_content_series = transcribed_results.struct.field("result").rename( "transcribed_content" ) @@ -954,4 +961,4 @@ def audio_transcribe( results_struct = bbq.struct(results_df).rename("transcription_results") return results_struct else: - return transcribed_content_series + return transcribed_content_series.rename("transcribed_content") diff --git a/tests/system/large/blob/test_function.py b/tests/system/large/blob/test_function.py index 7a79fc200b..8acf0b519b 100644 --- a/tests/system/large/blob/test_function.py +++ b/tests/system/large/blob/test_function.py @@ -228,11 +228,13 @@ def test_blob_image_resize_to_bq(images_mm_df: bpd.DataFrame, bq_connection: str assert actual.dtype == dtypes.BYTES_DTYPE +@pytest.mark.parametrize("verbose", [True, False]) def test_blob_image_normalize_to_series( images_mm_df: bpd.DataFrame, bq_connection: str, images_output_uris: list[str], session: bigframes.Session, + verbose: bool, ): series = bpd.Series(images_output_uris, session=session).str.to_blob( connection=bq_connection @@ -246,30 +248,48 @@ def test_blob_image_normalize_to_series( connection=bq_connection, engine="opencv", ) - expected_df = pd.DataFrame( - { - "uri": images_output_uris, - "version": [None, None], - "authorizer": [bq_connection.casefold(), bq_connection.casefold()], - "details": [None, None], - } - ) - pd.testing.assert_frame_equal( - actual.struct.explode().to_pandas(), - expected_df, - check_dtype=False, - check_index_type=False, - ) - # verify the files exist - assert not actual.blob.size().isna().any() + if verbose: + + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + # Content should be blob objects for GCS destination + assert hasattr(content_series, "blob") + + else: + expected_df = pd.DataFrame( + { + "uri": images_output_uris, + "version": [None, None], + "authorizer": [bq_connection.casefold(), bq_connection.casefold()], + 
"details": [None, None], + } + ) + pd.testing.assert_frame_equal( + actual.struct.explode().to_pandas(), + expected_df, + check_dtype=False, + check_index_type=False, + ) + + # verify the files exist + assert not actual.blob.size().isna().any() +@pytest.mark.parametrize("verbose", [True, False]) def test_blob_image_normalize_to_folder( images_mm_df: bpd.DataFrame, bq_connection: str, images_output_folder: str, images_output_uris: list[str], + verbose: bool, ): actual = images_mm_df["blob_col"].blob.image_normalize( alpha=50.0, @@ -298,10 +318,7 @@ def test_blob_image_normalize_to_folder( assert not actual.blob.size().isna().any() -@pytest.mark.parametrize( - "verbose", - [True, False], -) +@pytest.mark.parametrize("verbose", [True, False]) def test_blob_image_normalize_to_bq( images_mm_df: bpd.DataFrame, bq_connection: str, verbose: bool ): From 889cfe51f98c0a26314b9843db8120c4595ea8c2 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Thu, 31 Jul 2025 04:55:31 +0000 Subject: [PATCH 05/20] add eror handling for image functions --- bigframes/blob/_functions.py | 309 ++++++++++++++--------- tests/system/large/blob/test_function.py | 290 +++++++++++++++------ 2 files changed, 399 insertions(+), 200 deletions(-) diff --git a/bigframes/blob/_functions.py b/bigframes/blob/_functions.py index bc7471e3e9..e076019ae2 100644 --- a/bigframes/blob/_functions.py +++ b/bigframes/blob/_functions.py @@ -118,7 +118,7 @@ def udf(self): return self._session.read_gbq_function(udf_name) -def exif_func(src_obj_ref_rt: str) -> str: +def exif_func(src_obj_ref_rt: str, verbose: bool) -> str: import io import json @@ -126,25 +126,33 @@ def exif_func(src_obj_ref_rt: str) -> str: import requests from requests import adapters - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) + result_dict = {"status": "", "content": "{}"} + try: + session = requests.Session() + session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) + src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] + src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - response = session.get(src_url, timeout=30) - bts = response.content + response = session.get(src_url, timeout=30) + bts = response.content - image = Image.open(io.BytesIO(bts)) - exif_data = image.getexif() - exif_dict = {} - if exif_data: - for tag, value in exif_data.items(): - tag_name = ExifTags.TAGS.get(tag, tag) - exif_dict[tag_name] = value + image = Image.open(io.BytesIO(bts)) + exif_data = image.getexif() + exif_dict = {} + if exif_data: + for tag, value in exif_data.items(): + tag_name = ExifTags.TAGS.get(tag, tag) + exif_dict[tag_name] = value + result_dict["content"] = json.dumps(exif_dict) + except Exception as e: + result_dict["status"] = str(e) - return json.dumps(exif_dict) + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] exif_func_def = FunctionDef(exif_func, ["pillow", "requests"]) @@ -152,82 +160,110 @@ def exif_func(src_obj_ref_rt: str) -> str: # Blur images. Takes ObjectRefRuntime as JSON string. Outputs ObjectRefRuntime JSON string. 
def image_blur_func( - src_obj_ref_rt: str, dst_obj_ref_rt: str, ksize_x: int, ksize_y: int, ext: str + src_obj_ref_rt: str, + dst_obj_ref_rt: str, + ksize_x: int, + ksize_y: int, + ext: str, + verbose: bool, ) -> str: import json - import cv2 as cv # type: ignore - import numpy as np - import requests - from requests import adapters + result_dict = {"status": "", "content": dst_obj_ref_rt} - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) + try: + import cv2 as cv # type: ignore + import numpy as np + import requests + from requests import adapters - ext = ext or ".jpeg" + session = requests.Session() + session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - dst_obj_ref_rt_json = json.loads(dst_obj_ref_rt) + ext = ext or ".jpeg" - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - dst_url = dst_obj_ref_rt_json["access_urls"]["write_url"] + src_obj_ref_rt_json = json.loads(src_obj_ref_rt) + dst_obj_ref_rt_json = json.loads(dst_obj_ref_rt) - response = session.get(src_url, timeout=30) - bts = response.content + src_url = src_obj_ref_rt_json["access_urls"]["read_url"] + dst_url = dst_obj_ref_rt_json["access_urls"]["write_url"] - nparr = np.frombuffer(bts, np.uint8) - img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) - img_blurred = cv.blur(img, ksize=(ksize_x, ksize_y)) + response = session.get(src_url, timeout=30) + bts = response.content - bts = cv.imencode(ext, img_blurred)[1].tobytes() + nparr = np.frombuffer(bts, np.uint8) + img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) + img_blurred = cv.blur(img, ksize=(ksize_x, ksize_y)) - ext = ext.replace(".", "") - ext_mappings = {"jpg": "jpeg", "tif": "tiff"} - ext = ext_mappings.get(ext, ext) - content_type = "image/" + ext + bts = cv.imencode(ext, img_blurred)[1].tobytes() - session.put( - url=dst_url, - data=bts, - headers={ - "Content-Type": content_type, - }, - timeout=30, - ) + ext = ext.replace(".", "") + ext_mappings = {"jpg": "jpeg", "tif": "tiff"} + ext = ext_mappings.get(ext, ext) + content_type = "image/" + ext - return dst_obj_ref_rt + session.put( + url=dst_url, + data=bts, + headers={ + "Content-Type": content_type, + }, + timeout=30, + ) + + except Exception as e: + result_dict["status"] = str(e) + + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] image_blur_def = FunctionDef(image_blur_func, ["opencv-python", "numpy", "requests"]) def image_blur_to_bytes_func( - src_obj_ref_rt: str, ksize_x: int, ksize_y: int, ext: str -) -> bytes: + src_obj_ref_rt: str, ksize_x: int, ksize_y: int, ext: str, verbose: bool +) -> str: + import base64 import json - import cv2 as cv # type: ignore - import numpy as np - import requests - from requests import adapters + result_dict = {"status": "", "content": b""} - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) + try: + import cv2 as cv # type: ignore + import numpy as np + import requests + from requests import adapters - ext = ext or ".jpeg" + session = requests.Session() + session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] + ext = ext or ".jpeg" - response = session.get(src_url, timeout=30) - bts = response.content + src_obj_ref_rt_json = json.loads(src_obj_ref_rt) + src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - nparr = np.frombuffer(bts, np.uint8) - img = cv.imdecode(nparr, 
cv.IMREAD_UNCHANGED) - img_blurred = cv.blur(img, ksize=(ksize_x, ksize_y)) - bts = cv.imencode(ext, img_blurred)[1].tobytes() + response = session.get(src_url, timeout=30) + bts = response.content - return bts + nparr = np.frombuffer(bts, np.uint8) + img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) + img_blurred = cv.blur(img, ksize=(ksize_x, ksize_y)) + bts = cv.imencode(ext, img_blurred)[1].tobytes() + result_dict["content"] = bts + + except Exception as e: + result_dict["status"] = str(e) + + if verbose: + result_dict["content"] = base64.b64encode(result_dict["content"]).decode( + "utf-8" + ) + return json.dumps(result_dict) + else: + return result_dict["content"] image_blur_to_bytes_def = FunctionDef( @@ -243,49 +279,59 @@ def image_resize_func( fx: float, fy: float, ext: str, + verbose: bool, ) -> str: import json - import cv2 as cv # type: ignore - import numpy as np - import requests - from requests import adapters + result_dict = {"status": "", "content": dst_obj_ref_rt} - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) + try: + import cv2 as cv # type: ignore + import numpy as np + import requests + from requests import adapters - ext = ext or ".jpeg" + session = requests.Session() + session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - dst_obj_ref_rt_json = json.loads(dst_obj_ref_rt) + ext = ext or ".jpeg" - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - dst_url = dst_obj_ref_rt_json["access_urls"]["write_url"] + src_obj_ref_rt_json = json.loads(src_obj_ref_rt) + dst_obj_ref_rt_json = json.loads(dst_obj_ref_rt) - response = session.get(src_url, timeout=30) - bts = response.content + src_url = src_obj_ref_rt_json["access_urls"]["read_url"] + dst_url = dst_obj_ref_rt_json["access_urls"]["write_url"] - nparr = np.frombuffer(bts, np.uint8) - img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) - img_resized = cv.resize(img, dsize=(dsize_x, dsize_y), fx=fx, fy=fy) + response = session.get(src_url, timeout=30) + bts = response.content + + nparr = np.frombuffer(bts, np.uint8) + img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) + img_resized = cv.resize(img, dsize=(dsize_x, dsize_y), fx=fx, fy=fy) + + bts = cv.imencode(ext, img_resized)[1].tobytes() - bts = cv.imencode(ext, img_resized)[1].tobytes() + ext = ext.replace(".", "") + ext_mappings = {"jpg": "jpeg", "tif": "tiff"} + ext = ext_mappings.get(ext, ext) + content_type = "image/" + ext - ext = ext.replace(".", "") - ext_mappings = {"jpg": "jpeg", "tif": "tiff"} - ext = ext_mappings.get(ext, ext) - content_type = "image/" + ext + session.put( + url=dst_url, + data=bts, + headers={ + "Content-Type": content_type, + }, + timeout=30, + ) - session.put( - url=dst_url, - data=bts, - headers={ - "Content-Type": content_type, - }, - timeout=30, - ) + except Exception as e: + result_dict["status"] = str(e) - return dst_obj_ref_rt + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] image_resize_def = FunctionDef( @@ -300,31 +346,46 @@ def image_resize_to_bytes_func( fx: float, fy: float, ext: str, -) -> bytes: + verbose: bool, +) -> str: + import base64 import json - import cv2 as cv # type: ignore - import numpy as np - import requests - from requests import adapters + result_dict = {"status": "", "content": b""} - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) + try: + import cv2 as cv # type: ignore + import numpy as np + import requests + from requests 
import adapters - ext = ext or ".jpeg" + session = requests.Session() + session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] + ext = ext or ".jpeg" - response = session.get(src_url, timeout=30) - bts = response.content + src_obj_ref_rt_json = json.loads(src_obj_ref_rt) + src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - nparr = np.frombuffer(bts, np.uint8) - img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) - img_resized = cv.resize(img, dsize=(dsize_x, dsize_y), fx=fx, fy=fy) - bts = cv.imencode(".jpeg", img_resized)[1].tobytes() + response = session.get(src_url, timeout=30) + bts = response.content + + nparr = np.frombuffer(bts, np.uint8) + img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) + img_resized = cv.resize(img, dsize=(dsize_x, dsize_y), fx=fx, fy=fy) + bts = cv.imencode(".jpeg", img_resized)[1].tobytes() + result_dict["content"] = bts + + except Exception as e: + result_dict["status"] = str(e) - return bts + if verbose: + result_dict["content"] = base64.b64encode(result_dict["content"]).decode( + "utf-8" + ) + return json.dumps(result_dict) + else: + return result_dict["content"] image_resize_to_bytes_def = FunctionDef( @@ -339,6 +400,7 @@ def image_normalize_func( beta: float, norm_type: str, ext: str, + verbose: bool, ) -> str: import json @@ -396,7 +458,10 @@ def image_normalize_func( except Exception as e: result_dict["status"] = str(e) - return json.dumps(result_dict) + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] image_normalize_def = FunctionDef( @@ -410,11 +475,14 @@ def image_normalize_to_bytes_func( beta: float, norm_type: str, ext: str, + verbose: bool, ) -> str: - try: - import base64 - import json + import base64 + import json + result_dict = {"status": "", "content": ""} + + try: import cv2 as cv # type: ignore import numpy as np import requests @@ -446,14 +514,15 @@ def image_normalize_to_bytes_func( bts = cv.imencode(".jpeg", img_normalized)[1].tobytes() content_b64 = base64.b64encode(bts).decode("utf-8") - result_dict = {"status": "", "content": content_b64} - result_json = json.dumps(result_dict) + result_dict["content"] = content_b64 except Exception as e: - result_dict = {"status": str(e), "content": b""} - result_json = json.dumps(result_dict) + result_dict["status"] = str(e) - return result_json + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] image_normalize_to_bytes_def = FunctionDef( diff --git a/tests/system/large/blob/test_function.py b/tests/system/large/blob/test_function.py index 8acf0b519b..6033a02ad6 100644 --- a/tests/system/large/blob/test_function.py +++ b/tests/system/large/blob/test_function.py @@ -88,144 +88,257 @@ def test_blob_exif( ) +@pytest.mark.parametrize("verbose", [True, False]) def test_blob_image_blur_to_series( images_mm_df: bpd.DataFrame, bq_connection: str, images_output_uris: list[str], session: bigframes.Session, + verbose: bool, ): series = bpd.Series(images_output_uris, session=session).str.to_blob( connection=bq_connection ) actual = images_mm_df["blob_col"].blob.image_blur( - (8, 8), dst=series, connection=bq_connection, engine="opencv" - ) - expected_df = pd.DataFrame( - { - "uri": images_output_uris, - "version": [None, None], - "authorizer": [bq_connection.casefold(), bq_connection.casefold()], - "details": [None, None], - } - ) - pd.testing.assert_frame_equal( - actual.struct.explode().to_pandas(), - expected_df, - 
check_dtype=False, - check_index_type=False, + (8, 8), dst=series, connection=bq_connection, engine="opencv", verbose=verbose ) + if verbose: + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + # Content should be blob objects for GCS destination + assert hasattr(content_series, "blob") + + else: + expected_df = pd.DataFrame( + { + "uri": images_output_uris, + "version": [None, None], + "authorizer": [bq_connection.casefold(), bq_connection.casefold()], + "details": [None, None], + } + ) + pd.testing.assert_frame_equal( + actual.struct.explode().to_pandas(), + expected_df, + check_dtype=False, + check_index_type=False, + ) + # verify the files exist assert not actual.blob.size().isna().any() +@pytest.mark.parametrize("verbose", [True, False]) def test_blob_image_blur_to_folder( images_mm_df: bpd.DataFrame, bq_connection: str, images_output_folder: str, images_output_uris: list[str], + verbose: bool, ): actual = images_mm_df["blob_col"].blob.image_blur( - (8, 8), dst=images_output_folder, connection=bq_connection, engine="opencv" - ) - expected_df = pd.DataFrame( - { - "uri": images_output_uris, - "version": [None, None], - "authorizer": [bq_connection.casefold(), bq_connection.casefold()], - "details": [None, None], - } - ) - pd.testing.assert_frame_equal( - actual.struct.explode().to_pandas(), - expected_df, - check_dtype=False, - check_index_type=False, + (8, 8), + dst=images_output_folder, + connection=bq_connection, + engine="opencv", + verbose=verbose, ) + if verbose: + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + # Content should be blob objects for GCS destination + assert hasattr(content_series, "blob") + + else: + expected_df = pd.DataFrame( + { + "uri": images_output_uris, + "version": [None, None], + "authorizer": [bq_connection.casefold(), bq_connection.casefold()], + "details": [None, None], + } + ) + pd.testing.assert_frame_equal( + actual.struct.explode().to_pandas(), + expected_df, + check_dtype=False, + check_index_type=False, + ) # verify the files exist assert not actual.blob.size().isna().any() -def test_blob_image_blur_to_bq(images_mm_df: bpd.DataFrame, bq_connection: str): +@pytest.mark.parametrize("verbose", [True, False]) +def test_blob_image_blur_to_bq( + images_mm_df: bpd.DataFrame, bq_connection: str, verbose: bool +): actual = images_mm_df["blob_col"].blob.image_blur( - (8, 8), connection=bq_connection, engine="opencv" + (8, 8), connection=bq_connection, engine="opencv", verbose=verbose ) assert isinstance(actual, bpd.Series) assert len(actual) == 2 - assert actual.dtype == dtypes.BYTES_DTYPE + if verbose: + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + content_series = actual_exploded["content"] + assert content_series.dtype == dtypes.BYTES_DTYPE + + else: + assert actual.dtype == dtypes.BYTES_DTYPE + + 
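
# The verbose branches in these tests repeat one assertion pattern. A compact
# sketch of it as a shared helper (illustrative only; `assert_verbose_struct`
# is a hypothetical refactor, not part of this patch):
#
#     def assert_verbose_struct(actual, content_dtype):
#         exploded = actual.struct.explode()  # struct -> "status"/"content" columns
#         assert "status" in exploded.columns
#         assert "content" in exploded.columns
#         assert exploded["status"].dtype == dtypes.STRING_DTYPE
#         assert exploded["content"].dtype == content_dtype
#
#     # usage: assert_verbose_struct(actual, dtypes.BYTES_DTYPE)
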
+@pytest.mark.parametrize("verbose", [True, False]) def test_blob_image_resize_to_series( images_mm_df: bpd.DataFrame, bq_connection: str, images_output_uris: list[str], session: bigframes.Session, + verbose: bool, ): series = bpd.Series(images_output_uris, session=session).str.to_blob( connection=bq_connection ) actual = images_mm_df["blob_col"].blob.image_resize( - (200, 300), dst=series, connection=bq_connection, engine="opencv" - ) - expected_df = pd.DataFrame( - { - "uri": images_output_uris, - "version": [None, None], - "authorizer": [bq_connection.casefold(), bq_connection.casefold()], - "details": [None, None], - } - ) - pd.testing.assert_frame_equal( - actual.struct.explode().to_pandas(), - expected_df, - check_dtype=False, - check_index_type=False, + (200, 300), + dst=series, + connection=bq_connection, + engine="opencv", + verbose=verbose, ) + if verbose: + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + # Content should be blob objects for GCS destination + assert hasattr(content_series, "blob") + + else: + expected_df = pd.DataFrame( + { + "uri": images_output_uris, + "version": [None, None], + "authorizer": [bq_connection.casefold(), bq_connection.casefold()], + "details": [None, None], + } + ) + pd.testing.assert_frame_equal( + actual.struct.explode().to_pandas(), + expected_df, + check_dtype=False, + check_index_type=False, + ) + # verify the files exist assert not actual.blob.size().isna().any() +@pytest.mark.parametrize("verbose", [True, False]) def test_blob_image_resize_to_folder( images_mm_df: bpd.DataFrame, bq_connection: str, images_output_folder: str, images_output_uris: list[str], + verbose: bool, ): actual = images_mm_df["blob_col"].blob.image_resize( - (200, 300), dst=images_output_folder, connection=bq_connection, engine="opencv" - ) - expected_df = pd.DataFrame( - { - "uri": images_output_uris, - "version": [None, None], - "authorizer": [bq_connection.casefold(), bq_connection.casefold()], - "details": [None, None], - } - ) - pd.testing.assert_frame_equal( - actual.struct.explode().to_pandas(), - expected_df, - check_dtype=False, - check_index_type=False, + (200, 300), + dst=images_output_folder, + connection=bq_connection, + engine="opencv", + verbose=verbose, ) + if verbose: + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + # Content should be blob objects for GCS destination + assert hasattr(content_series, "blob") + + else: + expected_df = pd.DataFrame( + { + "uri": images_output_uris, + "version": [None, None], + "authorizer": [bq_connection.casefold(), bq_connection.casefold()], + "details": [None, None], + } + ) + pd.testing.assert_frame_equal( + actual.struct.explode().to_pandas(), + expected_df, + check_dtype=False, + check_index_type=False, + ) + # verify the files exist assert not actual.blob.size().isna().any() -def test_blob_image_resize_to_bq(images_mm_df: bpd.DataFrame, bq_connection: str): +@pytest.mark.parametrize("verbose", [True, False]) +def test_blob_image_resize_to_bq( + images_mm_df: bpd.DataFrame, bq_connection: 
str, verbose: bool +): actual = images_mm_df["blob_col"].blob.image_resize( - (200, 300), connection=bq_connection, engine="opencv" + (200, 300), connection=bq_connection, engine="opencv", verbose=verbose ) assert isinstance(actual, bpd.Series) assert len(actual) == 2 - assert actual.dtype == dtypes.BYTES_DTYPE + + if verbose: + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + assert content_series.dtype == dtypes.BYTES_DTYPE + + else: + assert actual.dtype == dtypes.BYTES_DTYPE @pytest.mark.parametrize("verbose", [True, False]) @@ -247,6 +360,7 @@ def test_blob_image_normalize_to_series( dst=series, connection=bq_connection, engine="opencv", + verbose=verbose, ) if verbose: @@ -298,21 +412,37 @@ def test_blob_image_normalize_to_folder( dst=images_output_folder, connection=bq_connection, engine="opencv", + verbose=verbose, ) - expected_df = pd.DataFrame( - { - "uri": images_output_uris, - "version": [None, None], - "authorizer": [bq_connection.casefold(), bq_connection.casefold()], - "details": [None, None], - } - ) - pd.testing.assert_frame_equal( - actual.struct.explode().to_pandas(), - expected_df, - check_dtype=False, - check_index_type=False, - ) + + if verbose: + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + # Content should be blob objects for GCS destination + assert hasattr(content_series, "blob") + + else: + expected_df = pd.DataFrame( + { + "uri": images_output_uris, + "version": [None, None], + "authorizer": [bq_connection.casefold(), bq_connection.casefold()], + "details": [None, None], + } + ) + pd.testing.assert_frame_equal( + actual.struct.explode().to_pandas(), + expected_df, + check_dtype=False, + check_index_type=False, + ) # verify the files exist assert not actual.blob.size().isna().any() From 44c5be26c3c890cfb8987645db04dff1c87c64b2 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Thu, 31 Jul 2025 17:15:30 +0000 Subject: [PATCH 06/20] add error handling, mypy not clean --- bigframes/blob/_functions.py | 22 +++++++----- bigframes/operations/blob.py | 25 +++++++++++-- tests/system/large/blob/test_function.py | 45 +++++++++++++++--------- 3 files changed, 64 insertions(+), 28 deletions(-) diff --git a/bigframes/blob/_functions.py b/bigframes/blob/_functions.py index e076019ae2..6d97276e33 100644 --- a/bigframes/blob/_functions.py +++ b/bigframes/blob/_functions.py @@ -263,7 +263,7 @@ def image_blur_to_bytes_func( ) return json.dumps(result_dict) else: - return result_dict["content"] + return base64.b64encode(result_dict["content"]).decode("utf-8") image_blur_to_bytes_def = FunctionDef( @@ -385,7 +385,7 @@ def image_resize_to_bytes_func( ) return json.dumps(result_dict) else: - return result_dict["content"] + return base64.b64encode(result_dict["content"]).decode("utf-8") image_resize_to_bytes_def = FunctionDef( @@ -531,7 +531,7 @@ def image_normalize_to_bytes_func( # Extracts all text from a PDF url -def pdf_extract_func(src_obj_ref_rt: str) -> str: +def pdf_extract_func(src_obj_ref_rt: str, verbose: bool) -> str: try: import io import json @@ 
-564,8 +564,10 @@ def pdf_extract_func(src_obj_ref_rt: str) -> str: except Exception as e: result_dict = {"status": str(e), "content": ""} - result_json = json.dumps(result_dict) - return result_json + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] pdf_extract_def = FunctionDef( @@ -574,7 +576,9 @@ def pdf_extract_func(src_obj_ref_rt: str) -> str: # Extracts text from a PDF url and chunks it simultaneously -def pdf_chunk_func(src_obj_ref_rt: str, chunk_size: int, overlap_size: int) -> str: +def pdf_chunk_func( + src_obj_ref_rt: str, chunk_size: int, overlap_size: int, verbose: bool +) -> str: try: import io import json @@ -620,8 +624,10 @@ def pdf_chunk_func(src_obj_ref_rt: str, chunk_size: int, overlap_size: int) -> s except Exception as e: result_dict = {"status": str(e), "content": []} - result_json = json.dumps(result_dict) - return result_json + if verbose: + return json.dumps(result_dict) + else: + return json.dumps(result_dict["content"]) pdf_chunk_def = FunctionDef( diff --git a/bigframes/operations/blob.py b/bigframes/operations/blob.py index 733409463c..f2cae0e399 100644 --- a/bigframes/operations/blob.py +++ b/bigframes/operations/blob.py @@ -427,6 +427,12 @@ def image_blur( ).rename("blurred_content") if verbose: + blurred_content_b64_series = res._apply_unary_op( + ops.JSONValue(json_path="$.content") + ) + blurred_content_series = bbq.sql_scalar( + "FROM_BASE64({0})", columns=[blurred_content_b64_series] + ) blurred_status_series = res._apply_unary_op( ops.JSONValue(json_path="$.status") ) @@ -436,7 +442,10 @@ def image_blur( results_struct = bbq.struct(results_df).rename("blurred_results") return results_struct else: - return blurred_content_series + blurred_bytes = bbq.sql_scalar( + "FROM_BASE64({0})", columns=[res] + ).rename("blurred_bytes") + return blurred_bytes if isinstance(dst, str): dst = os.path.join(dst, "") @@ -552,16 +561,26 @@ def image_resize( ).rename("resized_content") if verbose: + resized_content_b64_series = res._apply_unary_op( + ops.JSONValue(json_path="$.content") + ) + resized_content_series = bbq.sql_scalar( + "FROM_BASE64({0})", columns=[resized_content_b64_series] + ) + resized_status_series = res._apply_unary_op( ops.JSONValue(json_path="$.status") ) results_df = bpd.DataFrame( {"status": resized_status_series, "content": resized_content_series} ) - results_struct = bbq.strcut(results_df).rename("resized_results") + results_struct = bbq.struct(results_df).rename("resized_results") return results_struct else: - return resized_content_series + resized_bytes = bbq.sql_scalar( + "FROM_BASE64({0})", columns=[res] + ).rename("resized_bytes") + return resized_bytes if isinstance(dst, str): dst = os.path.join(dst, "") diff --git a/tests/system/large/blob/test_function.py b/tests/system/large/blob/test_function.py index 6033a02ad6..e41104cf8c 100644 --- a/tests/system/large/blob/test_function.py +++ b/tests/system/large/blob/test_function.py @@ -53,39 +53,50 @@ def images_output_uris(images_output_folder: str) -> list[str]: @pytest.mark.parametrize( - "verbose, expected_type", + "verbose", [ - (True, "struct"), - (False, "json"), + (True), + (False), ], ) def test_blob_exif( bq_connection: str, session: bigframes.Session, verbose: bool, - expected_type: str, ): exif_image_df = session.from_glob_path( "gs://bigframes_blob_test/images_exif/*", name="blob_col", connection=bq_connection, - verbose=verbose, ) actual = exif_image_df["blob_col"].blob.exif( - engine="pillow", connection=bq_connection - ) - expected = 
bpd.Series( - ['{"ExifOffset": 47, "Make": "MyCamera"}'], - session=session, - dtype=dtypes.JSON_DTYPE, - ) - pd.testing.assert_series_equal( - actual.to_pandas(), - expected.to_pandas(), - check_dtype=False, - check_index_type=False, + engine="pillow", connection=bq_connection, verbose=verbose ) + if verbose: + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + assert content_series.dtype == dtypes.JSON_DTYPE + + else: + expected = bpd.Series( + ['{"ExifOffset": 47, "Make": "MyCamera"}'], + session=session, + dtype=dtypes.JSON_DTYPE, + ) + pd.testing.assert_series_equal( + actual.to_pandas(), + expected.to_pandas(), + check_dtype=False, + check_index_type=False, + ) @pytest.mark.parametrize("verbose", [True, False]) From ac0a96b261710d452e0527b540751ae9f0a4c7c8 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Fri, 22 Aug 2025 20:05:35 +0000 Subject: [PATCH 07/20] clean mypy --- bigframes/blob/_functions.py | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/bigframes/blob/_functions.py b/bigframes/blob/_functions.py index 6d97276e33..d0e1b67c3b 100644 --- a/bigframes/blob/_functions.py +++ b/bigframes/blob/_functions.py @@ -229,7 +229,8 @@ def image_blur_to_bytes_func( import base64 import json - result_dict = {"status": "", "content": b""} + status = "" + content = b"" try: import cv2 as cv # type: ignore @@ -251,19 +252,16 @@ def image_blur_to_bytes_func( nparr = np.frombuffer(bts, np.uint8) img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) img_blurred = cv.blur(img, ksize=(ksize_x, ksize_y)) - bts = cv.imencode(ext, img_blurred)[1].tobytes() - result_dict["content"] = bts + content = cv.imencode(ext, img_blurred)[1].tobytes() except Exception as e: - result_dict["status"] = str(e) + status = str(e) if verbose: - result_dict["content"] = base64.b64encode(result_dict["content"]).decode( - "utf-8" - ) - return json.dumps(result_dict) + encoded_content = base64.b64encode(content).decode("utf-8") + return json.dumps({"status": status, "content": encoded_content}) else: - return base64.b64encode(result_dict["content"]).decode("utf-8") + return base64.b64encode(content).decode("utf-8") image_blur_to_bytes_def = FunctionDef( @@ -351,7 +349,8 @@ def image_resize_to_bytes_func( import base64 import json - result_dict = {"status": "", "content": b""} + status = "" + content = b"" try: import cv2 as cv # type: ignore @@ -373,19 +372,16 @@ def image_resize_to_bytes_func( nparr = np.frombuffer(bts, np.uint8) img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) img_resized = cv.resize(img, dsize=(dsize_x, dsize_y), fx=fx, fy=fy) - bts = cv.imencode(".jpeg", img_resized)[1].tobytes() - result_dict["content"] = bts + content = cv.imencode(".jpeg", img_resized)[1].tobytes() except Exception as e: - result_dict["status"] = str(e) + status = str(e) if verbose: - result_dict["content"] = base64.b64encode(result_dict["content"]).decode( - "utf-8" - ) - return json.dumps(result_dict) + encoded_content = base64.b64encode(content).decode("utf-8") + return json.dumps({"status": status, "content": encoded_content}) else: - return base64.b64encode(result_dict["content"]).decode("utf-8") + return base64.b64encode(content).decode("utf-8") image_resize_to_bytes_def = FunctionDef( From 
bbe1120072ec471ddad390c6a81ef62218d5790e Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Fri, 22 Aug 2025 22:58:57 +0000 Subject: [PATCH 08/20] image function still have bug --- bigframes/blob/_functions.py | 49 ++++++------------------ bigframes/operations/blob.py | 1 + tests/system/large/blob/test_function.py | 24 ++---------- 3 files changed, 16 insertions(+), 58 deletions(-) diff --git a/bigframes/blob/_functions.py b/bigframes/blob/_functions.py index d0e1b67c3b..586164e172 100644 --- a/bigframes/blob/_functions.py +++ b/bigframes/blob/_functions.py @@ -26,6 +26,7 @@ float: "FLOAT64", str: "STRING", bytes: "BYTES", + bool: "BOOL", } @@ -118,7 +119,7 @@ def udf(self): return self._session.read_gbq_function(udf_name) -def exif_func(src_obj_ref_rt: str, verbose: bool) -> str: +def exif_func(src_obj_ref_rt: str) -> str: import io import json @@ -149,10 +150,7 @@ def exif_func(src_obj_ref_rt: str, verbose: bool) -> str: except Exception as e: result_dict["status"] = str(e) - if verbose: - return json.dumps(result_dict) - else: - return result_dict["content"] + return json.dumps(result_dict) exif_func_def = FunctionDef(exif_func, ["pillow", "requests"]) @@ -165,7 +163,6 @@ def image_blur_func( ksize_x: int, ksize_y: int, ext: str, - verbose: bool, ) -> str: import json @@ -214,17 +211,14 @@ def image_blur_func( except Exception as e: result_dict["status"] = str(e) - if verbose: - return json.dumps(result_dict) - else: - return result_dict["content"] + return json.dumps(result_dict) image_blur_def = FunctionDef(image_blur_func, ["opencv-python", "numpy", "requests"]) def image_blur_to_bytes_func( - src_obj_ref_rt: str, ksize_x: int, ksize_y: int, ext: str, verbose: bool + src_obj_ref_rt: str, ksize_x: int, ksize_y: int, ext: str ) -> str: import base64 import json @@ -257,11 +251,8 @@ def image_blur_to_bytes_func( except Exception as e: status = str(e) - if verbose: - encoded_content = base64.b64encode(content).decode("utf-8") - return json.dumps({"status": status, "content": encoded_content}) - else: - return base64.b64encode(content).decode("utf-8") + encoded_content = base64.b64encode(content).decode("utf-8") + return json.dumps({"status": status, "content": encoded_content}) image_blur_to_bytes_def = FunctionDef( @@ -277,7 +268,6 @@ def image_resize_func( fx: float, fy: float, ext: str, - verbose: bool, ) -> str: import json @@ -326,10 +316,7 @@ def image_resize_func( except Exception as e: result_dict["status"] = str(e) - if verbose: - return json.dumps(result_dict) - else: - return result_dict["content"] + return json.dumps(result_dict) image_resize_def = FunctionDef( @@ -344,7 +331,6 @@ def image_resize_to_bytes_func( fx: float, fy: float, ext: str, - verbose: bool, ) -> str: import base64 import json @@ -377,11 +363,8 @@ def image_resize_to_bytes_func( except Exception as e: status = str(e) - if verbose: - encoded_content = base64.b64encode(content).decode("utf-8") - return json.dumps({"status": status, "content": encoded_content}) - else: - return base64.b64encode(content).decode("utf-8") + encoded_content = base64.b64encode(content).decode("utf-8") + return json.dumps({"status": status, "content": encoded_content}) image_resize_to_bytes_def = FunctionDef( @@ -396,7 +379,6 @@ def image_normalize_func( beta: float, norm_type: str, ext: str, - verbose: bool, ) -> str: import json @@ -454,10 +436,7 @@ def image_normalize_func( except Exception as e: result_dict["status"] = str(e) - if verbose: - return json.dumps(result_dict) - else: - return result_dict["content"] + return 
json.dumps(result_dict) image_normalize_def = FunctionDef( @@ -471,7 +450,6 @@ def image_normalize_to_bytes_func( beta: float, norm_type: str, ext: str, - verbose: bool, ) -> str: import base64 import json @@ -515,10 +493,7 @@ def image_normalize_to_bytes_func( except Exception as e: result_dict["status"] = str(e) - if verbose: - return json.dumps(result_dict) - else: - return result_dict["content"] + return json.dumps(result_dict) image_normalize_to_bytes_def = FunctionDef( diff --git a/bigframes/operations/blob.py b/bigframes/operations/blob.py index f2cae0e399..cb943bbf18 100644 --- a/bigframes/operations/blob.py +++ b/bigframes/operations/blob.py @@ -337,6 +337,7 @@ def exif( connection = self._resolve_connection(connection) df = self.get_runtime_json_str(mode="R").to_frame() + df["verbose"] = verbose exif_udf = blob_func.TransformFunction( blob_func.exif_func_def, diff --git a/tests/system/large/blob/test_function.py b/tests/system/large/blob/test_function.py index e41104cf8c..3be816d8ad 100644 --- a/tests/system/large/blob/test_function.py +++ b/tests/system/large/blob/test_function.py @@ -52,13 +52,7 @@ def images_output_uris(images_output_folder: str) -> list[str]: ] -@pytest.mark.parametrize( - "verbose", - [ - (True), - (False), - ], -) +@pytest.mark.parametrize("verbose", [True, False]) def test_blob_exif( bq_connection: str, session: bigframes.Session, @@ -490,13 +484,7 @@ def test_blob_image_normalize_to_bq( assert actual.dtype == dtypes.BYTES_DTYPE -@pytest.mark.parametrize( - "verbose", - [ - (True), - (False), - ], -) +@pytest.mark.parametrize("verbose", [True, False]) def test_blob_pdf_extract( pdf_mm_df: bpd.DataFrame, verbose: bool, @@ -539,13 +527,7 @@ def test_blob_pdf_extract( ), f"Item (verbose={verbose}): Expected keyword '{keyword}' not found in extracted text. " -@pytest.mark.parametrize( - "verbose", - [ - (True), - (False), - ], -) +@pytest.mark.parametrize("verbose", [True, False]) def test_blob_pdf_chunk(pdf_mm_df: bpd.DataFrame, verbose: bool, bq_connection: str): actual = ( pdf_mm_df["pdf"] From e933e81fe3371f4fcaf9a4c5873a8a43b03cb100 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Mon, 25 Aug 2025 22:39:30 +0000 Subject: [PATCH 09/20] test fix --- bigframes/blob/_functions.py | 49 ++++++++++++++++---- bigframes/operations/blob.py | 89 +++++++++++++++++------------------- 2 files changed, 83 insertions(+), 55 deletions(-) diff --git a/bigframes/blob/_functions.py b/bigframes/blob/_functions.py index 586164e172..2a11974b8d 100644 --- a/bigframes/blob/_functions.py +++ b/bigframes/blob/_functions.py @@ -119,7 +119,7 @@ def udf(self): return self._session.read_gbq_function(udf_name) -def exif_func(src_obj_ref_rt: str) -> str: +def exif_func(src_obj_ref_rt: str, verbose: bool) -> str: import io import json @@ -145,12 +145,18 @@ def exif_func(src_obj_ref_rt: str) -> str: if exif_data: for tag, value in exif_data.items(): tag_name = ExifTags.TAGS.get(tag, tag) + # Pillow might return bytes, which are not serializable. 
+ if isinstance(value, bytes): + value = value.decode("utf-8", "replace") exif_dict[tag_name] = value result_dict["content"] = json.dumps(exif_dict) except Exception as e: result_dict["status"] = str(e) - return json.dumps(result_dict) + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] exif_func_def = FunctionDef(exif_func, ["pillow", "requests"]) @@ -163,6 +169,7 @@ def image_blur_func( ksize_x: int, ksize_y: int, ext: str, + verbose: bool, ) -> str: import json @@ -211,14 +218,17 @@ def image_blur_func( except Exception as e: result_dict["status"] = str(e) - return json.dumps(result_dict) + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] image_blur_def = FunctionDef(image_blur_func, ["opencv-python", "numpy", "requests"]) def image_blur_to_bytes_func( - src_obj_ref_rt: str, ksize_x: int, ksize_y: int, ext: str + src_obj_ref_rt: str, ksize_x: int, ksize_y: int, ext: str, verbose: bool ) -> str: import base64 import json @@ -252,7 +262,11 @@ def image_blur_to_bytes_func( status = str(e) encoded_content = base64.b64encode(content).decode("utf-8") - return json.dumps({"status": status, "content": encoded_content}) + result_dict = {"status": status, "content": encoded_content} + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] image_blur_to_bytes_def = FunctionDef( @@ -268,6 +282,7 @@ def image_resize_func( fx: float, fy: float, ext: str, + verbose: bool, ) -> str: import json @@ -316,7 +331,10 @@ def image_resize_func( except Exception as e: result_dict["status"] = str(e) - return json.dumps(result_dict) + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] image_resize_def = FunctionDef( @@ -331,6 +349,7 @@ def image_resize_to_bytes_func( fx: float, fy: float, ext: str, + verbose: bool, ) -> str: import base64 import json @@ -364,7 +383,11 @@ def image_resize_to_bytes_func( status = str(e) encoded_content = base64.b64encode(content).decode("utf-8") - return json.dumps({"status": status, "content": encoded_content}) + result_dict = {"status": status, "content": encoded_content} + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] image_resize_to_bytes_def = FunctionDef( @@ -379,6 +402,7 @@ def image_normalize_func( beta: float, norm_type: str, ext: str, + verbose: bool, ) -> str: import json @@ -436,7 +460,10 @@ def image_normalize_func( except Exception as e: result_dict["status"] = str(e) - return json.dumps(result_dict) + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] image_normalize_def = FunctionDef( @@ -450,6 +477,7 @@ def image_normalize_to_bytes_func( beta: float, norm_type: str, ext: str, + verbose: bool, ) -> str: import base64 import json @@ -493,7 +521,10 @@ def image_normalize_to_bytes_func( except Exception as e: result_dict["status"] = str(e) - return json.dumps(result_dict) + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] image_normalize_to_bytes_def = FunctionDef( diff --git a/bigframes/operations/blob.py b/bigframes/operations/blob.py index cb943bbf18..94fcec8f81 100644 --- a/bigframes/operations/blob.py +++ b/bigframes/operations/blob.py @@ -350,11 +350,10 @@ def exif( res = self._df_apply_udf(df, exif_udf) - exif_content_series = bbq.parse_json( - res._apply_unary_op(ops.JSONValue(json_path="$.content")) - ).rename("exif_content") - if verbose: + exif_content_series = bbq.parse_json( + 
res._apply_unary_op(ops.JSONValue(json_path="$.content")) + ).rename("exif_content") exif_status_series = res._apply_unary_op( ops.JSONValue(json_path="$.status") ) @@ -364,7 +363,7 @@ def exif( results_struct = bbq.struct(results_df).rename("exif_results") return results_struct else: - return exif_content_series + return bbq.parse_json(res) def image_blur( self, @@ -421,12 +420,9 @@ def image_blur( df["ksize_x"], df["ksize_y"] = ksize df["ext"] = ext # type: ignore + df["verbose"] = verbose res = self._df_apply_udf(df, image_blur_udf) - blurred_content_series = res._apply_unary_op( - ops.JSONValue(json_path="$.content") - ).rename("blurred_content") - if verbose: blurred_content_b64_series = res._apply_unary_op( ops.JSONValue(json_path="$.content") @@ -472,6 +468,7 @@ def image_blur( df = df.join(dst_rt, how="outer") df["ksize_x"], df["ksize_y"] = ksize df["ext"] = ext # type: ignore + df["verbose"] = verbose res = self._df_apply_udf(df, image_blur_udf) res.cache() # to execute the udf @@ -480,13 +477,15 @@ def image_blur( blurred_status_series = res._apply_unary_op( ops.JSONValue(json_path="$.status") ) + content_series = res._apply_unary_op(ops.JSONValue(json_path="$.content")) + dst_blobs = content_series.str.to_blob(connection=connection) results_df = bpd.DataFrame( - {"status": blurred_status_series, "content": dst} + {"status": blurred_status_series, "content": dst_blobs} ) results_struct = bbq.struct(results_df).rename("blurred_results") return results_struct else: - return dst + return res.str.to_blob(connection=connection) def image_resize( self, @@ -555,12 +554,9 @@ def image_resize( df["dsize_x"], df["dsizye_y"] = dsize df["fx"], df["fy"] = fx, fy df["ext"] = ext # type: ignore + df["verbose"] = verbose res = self._df_apply_udf(df, image_resize_udf) - resized_content_series = res._apply_unary_op( - ops.JSONValue(json_path="$.content") - ).rename("resized_content") - if verbose: resized_content_b64_series = res._apply_unary_op( ops.JSONValue(json_path="$.content") @@ -608,6 +604,7 @@ def image_resize( df["dsize_x"], df["dsizye_y"] = dsize df["fx"], df["fy"] = fx, fy df["ext"] = ext # type: ignore + df["verbose"] = verbose res = self._df_apply_udf(df, image_resize_udf) res.cache() # to execute the udf @@ -616,13 +613,15 @@ def image_resize( resized_status_series = res._apply_unary_op( ops.JSONValue(json_path="$.status") ) + content_series = res._apply_unary_op(ops.JSONValue(json_path="$.content")) + dst_blobs = content_series.str.to_blob(connection=connection) results_df = bpd.DataFrame( - {"status": resized_status_series, "content": dst} + {"status": resized_status_series, "content": dst_blobs} ) results_struct = bbq.struct(results_df).rename("resized_results") return results_struct else: - return dst + return res.str.to_blob(connection=connection) def image_normalize( self, @@ -685,15 +684,16 @@ def image_normalize( df["beta"] = beta df["norm_type"] = norm_type df["ext"] = ext # type: ignore + df["verbose"] = verbose res = self._df_apply_udf(df, image_normalize_udf) - normalized_content_b64_series = res._apply_unary_op( - ops.JSONValue(json_path="$.content") - ) - normalized_bytes = bbq.sql_scalar( - "FROM_BASE64({0})", columns=[normalized_content_b64_series] - ) if verbose: + normalized_content_b64_series = res._apply_unary_op( + ops.JSONValue(json_path="$.content") + ) + normalized_bytes = bbq.sql_scalar( + "FROM_BASE64({0})", columns=[normalized_content_b64_series] + ) normalized_status_series = res._apply_unary_op( ops.JSONValue(json_path="$.status") ) @@ -703,7 +703,10 @@ 
def image_normalize( results_struct = bbq.struct(results_df).rename("normalized_results") return results_struct else: - return normalized_bytes.rename("normalized_bytes") + normalized_bytes = bbq.sql_scalar( + "FROM_BASE64({0})", columns=[res] + ).rename("normalized_bytes") + return normalized_bytes if isinstance(dst, str): dst = os.path.join(dst, "") @@ -731,31 +734,27 @@ def image_normalize( df["beta"] = beta df["norm_type"] = norm_type df["ext"] = ext # type: ignore + df["verbose"] = verbose res = self._df_apply_udf(df, image_normalize_udf) res.cache() # to execute the udf - normalized_content_series = res._apply_unary_op( - ops.JSONValue(json_path="$.content") - ) - normalized_content_blobs = normalized_content_series.str.to_blob( - connection=connection - ) - if verbose: normalized_status_series = res._apply_unary_op( ops.JSONValue(json_path="$.status") ) + content_series = res._apply_unary_op(ops.JSONValue(json_path="$.content")) + dst_blobs = content_series.str.to_blob(connection=connection) results_df = bpd.DataFrame( { "status": normalized_status_series, - "content": normalized_content_blobs, + "content": dst_blobs, } ) results_struct = bbq.struct(results_df).rename("normalized_results") return results_struct else: - return normalized_content_blobs.rename("normalized_content") + return res.str.to_blob(connection=connection) def pdf_extract( self, @@ -807,15 +806,14 @@ def pdf_extract( container_memory=container_memory, ).udf() - src_rt = self.get_runtime_json_str(mode="R") - - res = src_rt.apply(pdf_extract_udf) - - extracted_content_series = res._apply_unary_op( - ops.JSONValue(json_path="$.content") - ) + df = self.get_runtime_json_str(mode="R").to_frame() + df["verbose"] = verbose + res = self._df_apply_udf(df, pdf_extract_udf) if verbose: + extracted_content_series = res._apply_unary_op( + ops.JSONValue(json_path="$.content") + ) status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status")) results_df = bpd.DataFrame( {"status": status_series, "content": extracted_content_series} @@ -823,7 +821,7 @@ def pdf_extract( results_struct = bbq.struct(results_df).rename("extracted_results") return results_struct else: - return extracted_content_series.rename("extracted_content") + return res.rename("extracted_content") def pdf_chunk( self, @@ -890,16 +888,15 @@ def pdf_chunk( container_memory=container_memory, ).udf() - src_rt = self.get_runtime_json_str(mode="R") - df = src_rt.to_frame() + df = self.get_runtime_json_str(mode="R").to_frame() df["chunk_size"] = chunk_size df["overlap_size"] = overlap_size + df["verbose"] = verbose res = self._df_apply_udf(df, pdf_chunk_udf) - chunked_content_series = bbq.json_extract_string_array(res, "$.content") - if verbose: + chunked_content_series = bbq.json_extract_string_array(res, "$.content") status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status")) results_df = bpd.DataFrame( {"status": status_series, "content": chunked_content_series} @@ -907,7 +904,7 @@ def pdf_chunk( resultes_struct = bbq.struct(results_df).rename("chunked_results") return resultes_struct else: - return chunked_content_series.rename("chunked_content") + return bbq.json_extract_string_array(res, "$").rename("chunked_content") def audio_transcribe( self, From f6f069f09178d6b92b6764e66f65f7e8426f943e Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Wed, 17 Sep 2025 06:51:54 +0000 Subject: [PATCH 10/20] fix testcase --- bigframes/blob/_functions.py | 26 ++++++++------ bigframes/dataframe.py | 8 ++--- bigframes/operations/blob.py | 28 +++++++-------- 
tests/system/large/blob/test_function.py | 43 ++++++++++++------------ 4 files changed, 53 insertions(+), 52 deletions(-) diff --git a/bigframes/blob/_functions.py b/bigframes/blob/_functions.py index 2a11974b8d..99f5c5a95e 100644 --- a/bigframes/blob/_functions.py +++ b/bigframes/blob/_functions.py @@ -215,6 +215,8 @@ def image_blur_func( timeout=30, ) + result_dict["content"] = dst_obj_ref_rt_json["objectref"]["uri"] + except Exception as e: result_dict["status"] = str(e) @@ -233,8 +235,7 @@ def image_blur_to_bytes_func( import base64 import json - status = "" - content = b"" + result_dict = {"status": "", "content": ""} try: import cv2 as cv # type: ignore @@ -258,11 +259,12 @@ def image_blur_to_bytes_func( img_blurred = cv.blur(img, ksize=(ksize_x, ksize_y)) content = cv.imencode(ext, img_blurred)[1].tobytes() + encoded_content = base64.b64encode(content).decode("utf-8") + result_dict["content"] = encoded_content + except Exception as e: - status = str(e) + result_dict["status"] = str(e) - encoded_content = base64.b64encode(content).decode("utf-8") - result_dict = {"status": status, "content": encoded_content} if verbose: return json.dumps(result_dict) else: @@ -328,6 +330,8 @@ def image_resize_func( timeout=30, ) + result_dict["content"] = dst_obj_ref_rt_json["objectref"]["uri"] + except Exception as e: result_dict["status"] = str(e) @@ -354,8 +358,7 @@ def image_resize_to_bytes_func( import base64 import json - status = "" - content = b"" + result_dict = {"status": "", "content": ""} try: import cv2 as cv # type: ignore @@ -379,11 +382,12 @@ def image_resize_to_bytes_func( img_resized = cv.resize(img, dsize=(dsize_x, dsize_y), fx=fx, fy=fy) content = cv.imencode(".jpeg", img_resized)[1].tobytes() + encoded_content = base64.b64encode(content).decode("utf-8") + result_dict["content"] = encoded_content + except Exception as e: - status = str(e) + result_dict["status"] = str(e) - encoded_content = base64.b64encode(content).decode("utf-8") - result_dict = {"status": status, "content": encoded_content} if verbose: return json.dumps(result_dict) else: @@ -457,6 +461,8 @@ def image_normalize_func( timeout=30, ) + result_dict["content"] = dst_obj_ref_rt_json["objectref"]["uri"] + except Exception as e: result_dict["status"] = str(e) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index bc2bbb963b..3f187ab497 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -95,12 +95,8 @@ import bigframes.session - SingleItemValue = Union[ - bigframes.series.Series, int, float, str, pandas.Timedelta, Callable - ] - MultiItemValue = Union[ - "DataFrame", Sequence[int | float | str | pandas.Timedelta | Callable] - ] + SingleItemValue = Union[bigframes.series.Series, int, float, str, Callable] + MultiItemValue = Union["DataFrame", Sequence[Union[int, float, str, Callable]]] LevelType = typing.Hashable LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]] diff --git a/bigframes/operations/blob.py b/bigframes/operations/blob.py index 94fcec8f81..f1b94fb5ba 100644 --- a/bigframes/operations/blob.py +++ b/bigframes/operations/blob.py @@ -15,6 +15,7 @@ from __future__ import annotations import os +import typing from typing import cast, Literal, Optional, Union import warnings @@ -376,7 +377,10 @@ def image_blur( container_cpu: Union[float, int] = 0.33, container_memory: str = "512Mi", verbose: bool = False, - ) -> bigframes.series.Series: + ) -> typing.Union[ + bigframes.series.Series, + typing.Tuple[bigframes.series.Series, bigframes.series.Series], + ]: """Blurs images. 
Args: @@ -479,11 +483,8 @@ def image_blur( ) content_series = res._apply_unary_op(ops.JSONValue(json_path="$.content")) dst_blobs = content_series.str.to_blob(connection=connection) - results_df = bpd.DataFrame( - {"status": blurred_status_series, "content": dst_blobs} - ) - results_struct = bbq.struct(results_df).rename("blurred_results") - return results_struct + + return dst_blobs, blurred_status_series else: return res.str.to_blob(connection=connection) @@ -636,7 +637,10 @@ def image_normalize( container_cpu: Union[float, int] = 0.33, container_memory: str = "512Mi", verbose: bool = False, - ) -> bigframes.series.Series: + ) -> typing.Union[ + bigframes.series.Series, + typing.Tuple[bigframes.series.Series, bigframes.series.Series], + ]: """Normalize images. Args: @@ -745,14 +749,8 @@ def image_normalize( ) content_series = res._apply_unary_op(ops.JSONValue(json_path="$.content")) dst_blobs = content_series.str.to_blob(connection=connection) - results_df = bpd.DataFrame( - { - "status": normalized_status_series, - "content": dst_blobs, - } - ) - results_struct = bbq.struct(results_df).rename("normalized_results") - return results_struct + + return dst_blobs, normalized_status_series else: return res.str.to_blob(connection=connection) diff --git a/tests/system/large/blob/test_function.py b/tests/system/large/blob/test_function.py index 3be816d8ad..2c283ea09f 100644 --- a/tests/system/large/blob/test_function.py +++ b/tests/system/large/blob/test_function.py @@ -121,6 +121,7 @@ def test_blob_image_blur_to_series( content_series = actual_exploded["content"] # Content should be blob objects for GCS destination assert hasattr(content_series, "blob") + assert not content_series.blob.size().isna().any() else: expected_df = pd.DataFrame( @@ -132,14 +133,13 @@ def test_blob_image_blur_to_series( } ) pd.testing.assert_frame_equal( - actual.struct.explode().to_pandas(), + actual.blob.to_frame().to_pandas(), expected_df, check_dtype=False, check_index_type=False, ) - - # verify the files exist - assert not actual.blob.size().isna().any() + # verify the files exist + assert not actual.blob.size().isna().any() @pytest.mark.parametrize("verbose", [True, False]) @@ -169,6 +169,7 @@ def test_blob_image_blur_to_folder( content_series = actual_exploded["content"] # Content should be blob objects for GCS destination assert hasattr(content_series, "blob") + assert not content_series.blob.size().isna().any() else: expected_df = pd.DataFrame( @@ -180,14 +181,13 @@ def test_blob_image_blur_to_folder( } ) pd.testing.assert_frame_equal( - actual.struct.explode().to_pandas(), + actual.blob.to_frame().to_pandas(), expected_df, check_dtype=False, check_index_type=False, ) - - # verify the files exist - assert not actual.blob.size().isna().any() + # verify the files exist + assert not actual.blob.size().isna().any() @pytest.mark.parametrize("verbose", [True, False]) @@ -249,6 +249,7 @@ def test_blob_image_resize_to_series( content_series = actual_exploded["content"] # Content should be blob objects for GCS destination assert hasattr(content_series, "blob") + assert not content_series.blob.size().isna().any() else: expected_df = pd.DataFrame( @@ -260,14 +261,13 @@ def test_blob_image_resize_to_series( } ) pd.testing.assert_frame_equal( - actual.struct.explode().to_pandas(), + actual.blob.to_frame().to_pandas(), expected_df, check_dtype=False, check_index_type=False, ) - - # verify the files exist - assert not actual.blob.size().isna().any() + # verify the files exist + assert not actual.blob.size().isna().any() 
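A minimal usage sketch of the verbose struct output exercised by the tests
above; the bucket path and connection name are hypothetical, and this assumes
a configured BigQuery connection:

    import bigframes.pandas as bpd

    df = bpd.from_glob_path("gs://my-bucket/images/*", name="blob_col")
    res = df["blob_col"].blob.image_resize(
        (200, 300), connection="my-project.us.my-conn", engine="opencv", verbose=True
    )
    exploded = res.struct.explode()                # columns: "status", "content"
    failures = exploded[exploded["status"] != ""]  # rows whose UDF raised
    print(failures["status"].to_pandas())          # per-row error messages
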
@pytest.mark.parametrize("verbose", [True, False]) @@ -298,6 +298,7 @@ def test_blob_image_resize_to_folder( content_series = actual_exploded["content"] # Content should be blob objects for GCS destination assert hasattr(content_series, "blob") + assert not content_series.blob.size().isna().any() else: expected_df = pd.DataFrame( @@ -309,14 +310,13 @@ def test_blob_image_resize_to_folder( } ) pd.testing.assert_frame_equal( - actual.struct.explode().to_pandas(), + actual.blob.to_frame().to_pandas(), expected_df, check_dtype=False, check_index_type=False, ) - - # verify the files exist - assert not actual.blob.size().isna().any() + # verify the files exist + assert not actual.blob.size().isna().any() @pytest.mark.parametrize("verbose", [True, False]) @@ -369,7 +369,6 @@ def test_blob_image_normalize_to_series( ) if verbose: - assert hasattr(actual, "struct") actual_exploded = actual.struct.explode() assert "status" in actual_exploded.columns @@ -381,6 +380,7 @@ def test_blob_image_normalize_to_series( content_series = actual_exploded["content"] # Content should be blob objects for GCS destination assert hasattr(content_series, "blob") + assert not content_series.blob.size().isna().any() else: expected_df = pd.DataFrame( @@ -392,7 +392,7 @@ def test_blob_image_normalize_to_series( } ) pd.testing.assert_frame_equal( - actual.struct.explode().to_pandas(), + actual.blob.to_frame().to_pandas(), expected_df, check_dtype=False, check_index_type=False, @@ -432,6 +432,7 @@ def test_blob_image_normalize_to_folder( content_series = actual_exploded["content"] # Content should be blob objects for GCS destination assert hasattr(content_series, "blob") + assert not content_series.blob.size().isna().any() else: expected_df = pd.DataFrame( @@ -443,14 +444,14 @@ def test_blob_image_normalize_to_folder( } ) pd.testing.assert_frame_equal( - actual.struct.explode().to_pandas(), + actual.blob.to_frame().to_pandas(), expected_df, check_dtype=False, check_index_type=False, ) - # verify the files exist - assert not actual.blob.size().isna().any() + # verify the files exist + assert not actual.blob.size().isna().any() @pytest.mark.parametrize("verbose", [True, False]) From 5778a412b9e4e11ea3fdf65ba762775d8dabd045 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Wed, 17 Sep 2025 07:57:30 +0000 Subject: [PATCH 11/20] Revert "fix testcase" This reverts commit 9a5bcd241990f74f4c4a06e9e4edd6713b8985a5. 
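Note: with this revert, verbose mode goes back to returning a single
struct<status, content> column rather than a (content, status) tuple, so
callers keep plain Series semantics. A minimal sketch of that struct pattern,
with made-up values:

    import bigframes.bigquery as bbq
    import bigframes.pandas as bpd

    status = bpd.Series(["", "cv2.error: ..."])           # "" means success
    content = bpd.Series(["gs://bucket/out/a.jpeg", ""])  # hypothetical URIs
    results_struct = bbq.struct(
        bpd.DataFrame({"status": status, "content": content})
    ).rename("blurred_results")
    print(results_struct.struct.explode().to_pandas())
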
--- bigframes/blob/_functions.py | 26 ++++++-------- bigframes/dataframe.py | 2 +- bigframes/operations/blob.py | 28 ++++++++------- tests/system/large/blob/test_function.py | 43 ++++++++++++------------ 4 files changed, 47 insertions(+), 52 deletions(-) diff --git a/bigframes/blob/_functions.py b/bigframes/blob/_functions.py index 99f5c5a95e..2a11974b8d 100644 --- a/bigframes/blob/_functions.py +++ b/bigframes/blob/_functions.py @@ -215,8 +215,6 @@ def image_blur_func( timeout=30, ) - result_dict["content"] = dst_obj_ref_rt_json["objectref"]["uri"] - except Exception as e: result_dict["status"] = str(e) @@ -235,7 +233,8 @@ def image_blur_to_bytes_func( import base64 import json - result_dict = {"status": "", "content": ""} + status = "" + content = b"" try: import cv2 as cv # type: ignore @@ -259,12 +258,11 @@ def image_blur_to_bytes_func( img_blurred = cv.blur(img, ksize=(ksize_x, ksize_y)) content = cv.imencode(ext, img_blurred)[1].tobytes() - encoded_content = base64.b64encode(content).decode("utf-8") - result_dict["content"] = encoded_content - except Exception as e: - result_dict["status"] = str(e) + status = str(e) + encoded_content = base64.b64encode(content).decode("utf-8") + result_dict = {"status": status, "content": encoded_content} if verbose: return json.dumps(result_dict) else: @@ -330,8 +328,6 @@ def image_resize_func( timeout=30, ) - result_dict["content"] = dst_obj_ref_rt_json["objectref"]["uri"] - except Exception as e: result_dict["status"] = str(e) @@ -358,7 +354,8 @@ def image_resize_to_bytes_func( import base64 import json - result_dict = {"status": "", "content": ""} + status = "" + content = b"" try: import cv2 as cv # type: ignore @@ -382,12 +379,11 @@ def image_resize_to_bytes_func( img_resized = cv.resize(img, dsize=(dsize_x, dsize_y), fx=fx, fy=fy) content = cv.imencode(".jpeg", img_resized)[1].tobytes() - encoded_content = base64.b64encode(content).decode("utf-8") - result_dict["content"] = encoded_content - except Exception as e: - result_dict["status"] = str(e) + status = str(e) + encoded_content = base64.b64encode(content).decode("utf-8") + result_dict = {"status": status, "content": encoded_content} if verbose: return json.dumps(result_dict) else: @@ -461,8 +457,6 @@ def image_normalize_func( timeout=30, ) - result_dict["content"] = dst_obj_ref_rt_json["objectref"]["uri"] - except Exception as e: result_dict["status"] = str(e) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 3f187ab497..f469bb62ac 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -96,7 +96,7 @@ import bigframes.session SingleItemValue = Union[bigframes.series.Series, int, float, str, Callable] - MultiItemValue = Union["DataFrame", Sequence[Union[int, float, str, Callable]]] + MultiItemValue = Union["DataFrame", Sequence[int | float | str | Callable]] LevelType = typing.Hashable LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]] diff --git a/bigframes/operations/blob.py b/bigframes/operations/blob.py index f1b94fb5ba..94fcec8f81 100644 --- a/bigframes/operations/blob.py +++ b/bigframes/operations/blob.py @@ -15,7 +15,6 @@ from __future__ import annotations import os -import typing from typing import cast, Literal, Optional, Union import warnings @@ -377,10 +376,7 @@ def image_blur( container_cpu: Union[float, int] = 0.33, container_memory: str = "512Mi", verbose: bool = False, - ) -> typing.Union[ - bigframes.series.Series, - typing.Tuple[bigframes.series.Series, bigframes.series.Series], - ]: + ) -> bigframes.series.Series: """Blurs images. 
Args: @@ -483,8 +479,11 @@ def image_blur( ) content_series = res._apply_unary_op(ops.JSONValue(json_path="$.content")) dst_blobs = content_series.str.to_blob(connection=connection) - - return dst_blobs, blurred_status_series + results_df = bpd.DataFrame( + {"status": blurred_status_series, "content": dst_blobs} + ) + results_struct = bbq.struct(results_df).rename("blurred_results") + return results_struct else: return res.str.to_blob(connection=connection) @@ -637,10 +636,7 @@ def image_normalize( container_cpu: Union[float, int] = 0.33, container_memory: str = "512Mi", verbose: bool = False, - ) -> typing.Union[ - bigframes.series.Series, - typing.Tuple[bigframes.series.Series, bigframes.series.Series], - ]: + ) -> bigframes.series.Series: """Normalize images. Args: @@ -749,8 +745,14 @@ def image_normalize( ) content_series = res._apply_unary_op(ops.JSONValue(json_path="$.content")) dst_blobs = content_series.str.to_blob(connection=connection) - - return dst_blobs, normalized_status_series + results_df = bpd.DataFrame( + { + "status": normalized_status_series, + "content": dst_blobs, + } + ) + results_struct = bbq.struct(results_df).rename("normalized_results") + return results_struct else: return res.str.to_blob(connection=connection) diff --git a/tests/system/large/blob/test_function.py b/tests/system/large/blob/test_function.py index 2c283ea09f..3be816d8ad 100644 --- a/tests/system/large/blob/test_function.py +++ b/tests/system/large/blob/test_function.py @@ -121,7 +121,6 @@ def test_blob_image_blur_to_series( content_series = actual_exploded["content"] # Content should be blob objects for GCS destination assert hasattr(content_series, "blob") - assert not content_series.blob.size().isna().any() else: expected_df = pd.DataFrame( @@ -133,13 +132,14 @@ def test_blob_image_blur_to_series( } ) pd.testing.assert_frame_equal( - actual.blob.to_frame().to_pandas(), + actual.struct.explode().to_pandas(), expected_df, check_dtype=False, check_index_type=False, ) - # verify the files exist - assert not actual.blob.size().isna().any() + + # verify the files exist + assert not actual.blob.size().isna().any() @pytest.mark.parametrize("verbose", [True, False]) @@ -169,7 +169,6 @@ def test_blob_image_blur_to_folder( content_series = actual_exploded["content"] # Content should be blob objects for GCS destination assert hasattr(content_series, "blob") - assert not content_series.blob.size().isna().any() else: expected_df = pd.DataFrame( @@ -181,13 +180,14 @@ def test_blob_image_blur_to_folder( } ) pd.testing.assert_frame_equal( - actual.blob.to_frame().to_pandas(), + actual.struct.explode().to_pandas(), expected_df, check_dtype=False, check_index_type=False, ) - # verify the files exist - assert not actual.blob.size().isna().any() + + # verify the files exist + assert not actual.blob.size().isna().any() @pytest.mark.parametrize("verbose", [True, False]) @@ -249,7 +249,6 @@ def test_blob_image_resize_to_series( content_series = actual_exploded["content"] # Content should be blob objects for GCS destination assert hasattr(content_series, "blob") - assert not content_series.blob.size().isna().any() else: expected_df = pd.DataFrame( @@ -261,13 +260,14 @@ def test_blob_image_resize_to_series( } ) pd.testing.assert_frame_equal( - actual.blob.to_frame().to_pandas(), + actual.struct.explode().to_pandas(), expected_df, check_dtype=False, check_index_type=False, ) - # verify the files exist - assert not actual.blob.size().isna().any() + + # verify the files exist + assert not actual.blob.size().isna().any() 
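The existence check repeated at the end of these tests leans on the blob
accessor's metadata fetch; roughly, with `actual` being a blob Series that
points at the written objects:

    sizes = actual.blob.size()     # object size read from GCS metadata
    assert not sizes.isna().any()  # a null size means the object is missing
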
@pytest.mark.parametrize("verbose", [True, False]) @@ -298,7 +298,6 @@ def test_blob_image_resize_to_folder( content_series = actual_exploded["content"] # Content should be blob objects for GCS destination assert hasattr(content_series, "blob") - assert not content_series.blob.size().isna().any() else: expected_df = pd.DataFrame( @@ -310,13 +309,14 @@ def test_blob_image_resize_to_folder( } ) pd.testing.assert_frame_equal( - actual.blob.to_frame().to_pandas(), + actual.struct.explode().to_pandas(), expected_df, check_dtype=False, check_index_type=False, ) - # verify the files exist - assert not actual.blob.size().isna().any() + + # verify the files exist + assert not actual.blob.size().isna().any() @pytest.mark.parametrize("verbose", [True, False]) @@ -369,6 +369,7 @@ def test_blob_image_normalize_to_series( ) if verbose: + assert hasattr(actual, "struct") actual_exploded = actual.struct.explode() assert "status" in actual_exploded.columns @@ -380,7 +381,6 @@ def test_blob_image_normalize_to_series( content_series = actual_exploded["content"] # Content should be blob objects for GCS destination assert hasattr(content_series, "blob") - assert not content_series.blob.size().isna().any() else: expected_df = pd.DataFrame( @@ -392,7 +392,7 @@ def test_blob_image_normalize_to_series( } ) pd.testing.assert_frame_equal( - actual.blob.to_frame().to_pandas(), + actual.struct.explode().to_pandas(), expected_df, check_dtype=False, check_index_type=False, @@ -432,7 +432,6 @@ def test_blob_image_normalize_to_folder( content_series = actual_exploded["content"] # Content should be blob objects for GCS destination assert hasattr(content_series, "blob") - assert not content_series.blob.size().isna().any() else: expected_df = pd.DataFrame( @@ -444,14 +443,14 @@ def test_blob_image_normalize_to_folder( } ) pd.testing.assert_frame_equal( - actual.blob.to_frame().to_pandas(), + actual.struct.explode().to_pandas(), expected_df, check_dtype=False, check_index_type=False, ) - # verify the files exist - assert not actual.blob.size().isna().any() + # verify the files exist + assert not actual.blob.size().isna().any() @pytest.mark.parametrize("verbose", [True, False]) From 0f95d287534bab72371382b1433a622a9047f1dc Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Thu, 18 Sep 2025 07:27:59 +0000 Subject: [PATCH 12/20] Fix: Correctly handle destination blob series in image transformation functions --- bigframes/operations/blob.py | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/bigframes/operations/blob.py b/bigframes/operations/blob.py index 94fcec8f81..d55b451ddb 100644 --- a/bigframes/operations/blob.py +++ b/bigframes/operations/blob.py @@ -477,15 +477,13 @@ def image_blur( blurred_status_series = res._apply_unary_op( ops.JSONValue(json_path="$.status") ) - content_series = res._apply_unary_op(ops.JSONValue(json_path="$.content")) - dst_blobs = content_series.str.to_blob(connection=connection) results_df = bpd.DataFrame( - {"status": blurred_status_series, "content": dst_blobs} + {"status": blurred_status_series, "content": dst} ) results_struct = bbq.struct(results_df).rename("blurred_results") return results_struct else: - return res.str.to_blob(connection=connection) + return dst def image_resize( self, @@ -613,15 +611,13 @@ def image_resize( resized_status_series = res._apply_unary_op( ops.JSONValue(json_path="$.status") ) - content_series = res._apply_unary_op(ops.JSONValue(json_path="$.content")) - dst_blobs = content_series.str.to_blob(connection=connection) results_df = 
bpd.DataFrame( - {"status": resized_status_series, "content": dst_blobs} + {"status": resized_status_series, "content": dst} ) results_struct = bbq.struct(results_df).rename("resized_results") return results_struct else: - return res.str.to_blob(connection=connection) + return dst def image_normalize( self, @@ -743,18 +739,16 @@ def image_normalize( normalized_status_series = res._apply_unary_op( ops.JSONValue(json_path="$.status") ) - content_series = res._apply_unary_op(ops.JSONValue(json_path="$.content")) - dst_blobs = content_series.str.to_blob(connection=connection) results_df = bpd.DataFrame( { "status": normalized_status_series, - "content": dst_blobs, + "content": dst, } ) results_struct = bbq.struct(results_df).rename("normalized_results") return results_struct else: - return res.str.to_blob(connection=connection) + return dst def pdf_extract( self, From 62629912918436fb1a64c5e497dfa7cb08209ac2 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Thu, 2 Oct 2025 03:32:57 +0000 Subject: [PATCH 13/20] fix test --- bigframes/operations/blob.py | 16 ++++++++++++---- tests/system/large/blob/test_function.py | 9 +++++---- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/bigframes/operations/blob.py b/bigframes/operations/blob.py index d55b451ddb..a07bd303e4 100644 --- a/bigframes/operations/blob.py +++ b/bigframes/operations/blob.py @@ -22,7 +22,7 @@ import pandas as pd import requests -from bigframes import clients +from bigframes import clients, dtypes from bigframes.core import log_adapter import bigframes.dataframe import bigframes.exceptions as bfe @@ -80,9 +80,17 @@ def metadata(self) -> bigframes.series.Series: Returns: bigframes.series.Series: JSON metadata of the Blob. Contains fields: content_type, md5_hash, size and updated(time).""" - details_json = self._apply_unary_op(ops.obj_fetch_metadata_op).struct.field( - "details" - ) + series_to_check = bigframes.series.Series(self._block) + # Check if it's a struct series from a verbose operation + if ( + dtypes.is_struct_like(series_to_check.dtype) + and "content" in series_to_check.dtype.names + and dtypes.is_blob_like(series_to_check.dtype.field("content").dtype) + ): + series_to_check = series_to_check.struct.field("content") + details_json = series_to_check._apply_unary_op( + ops.obj_fetch_metadata_op + ).struct.field("details") import bigframes.bigquery as bbq return bbq.json_extract(details_json, "$.gcs_metadata").rename("metadata") diff --git a/tests/system/large/blob/test_function.py b/tests/system/large/blob/test_function.py index 3be816d8ad..6be3ba5b20 100644 --- a/tests/system/large/blob/test_function.py +++ b/tests/system/large/blob/test_function.py @@ -120,8 +120,6 @@ def test_blob_image_blur_to_series( content_series = actual_exploded["content"] # Content should be blob objects for GCS destination - assert hasattr(content_series, "blob") - else: expected_df = pd.DataFrame( { @@ -299,6 +297,9 @@ def test_blob_image_resize_to_folder( # Content should be blob objects for GCS destination assert hasattr(content_series, "blob") + # verify the files exist + assert not content_series.blob.size().isna().any() + else: expected_df = pd.DataFrame( { @@ -315,8 +316,8 @@ def test_blob_image_resize_to_folder( check_index_type=False, ) - # verify the files exist - assert not actual.blob.size().isna().any() + # verify the files exist + assert not actual.blob.size().isna().any() @pytest.mark.parametrize("verbose", [True, False]) From 8fb07c91e4da09e816939ca4c050d352e01bd6bc Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: 
Thu, 9 Oct 2025 07:20:09 +0000 Subject: [PATCH 14/20] fix the bug for test_blob_image_*_to series and to_folder --- bigframes/operations/blob.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/bigframes/operations/blob.py b/bigframes/operations/blob.py index a07bd303e4..0addbd7b92 100644 --- a/bigframes/operations/blob.py +++ b/bigframes/operations/blob.py @@ -82,12 +82,15 @@ def metadata(self) -> bigframes.series.Series: bigframes.series.Series: JSON metadata of the Blob. Contains fields: content_type, md5_hash, size and updated(time).""" series_to_check = bigframes.series.Series(self._block) # Check if it's a struct series from a verbose operation - if ( - dtypes.is_struct_like(series_to_check.dtype) - and "content" in series_to_check.dtype.names - and dtypes.is_blob_like(series_to_check.dtype.field("content").dtype) - ): - series_to_check = series_to_check.struct.field("content") + if dtypes.is_struct_like(series_to_check.dtype): + pyarrow_dtype = series_to_check.dtype.pyarrow_dtype + if "content" in pyarrow_dtype.names: + content_field_type = pyarrow_dtype.field("content").type + content_bf_type = dtypes.arrow_dtype_to_bigframes_dtype( + content_field_type + ) + if content_bf_type == dtypes.OBJ_REF_DTYPE: + series_to_check = series_to_check.struct.field("content") details_json = series_to_check._apply_unary_op( ops.obj_fetch_metadata_op ).struct.field("details") From caf49487bdecd89c02930d94ae2edff67c11d5fa Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Thu, 9 Oct 2025 07:36:13 +0000 Subject: [PATCH 15/20] fix test_blob_image_resize* and test_blob_image_normalize* --- bigframes/operations/blob.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/bigframes/operations/blob.py b/bigframes/operations/blob.py index 0addbd7b92..01b39633e4 100644 --- a/bigframes/operations/blob.py +++ b/bigframes/operations/blob.py @@ -489,7 +489,12 @@ def image_blur( ops.JSONValue(json_path="$.status") ) results_df = bpd.DataFrame( - {"status": blurred_status_series, "content": dst} + { + "status": blurred_status_series, + "content": dst.blob.uri().str.to_blob( + connection=self._resolve_connection(connection) + ), + } ) results_struct = bbq.struct(results_df).rename("blurred_results") return results_struct @@ -623,7 +628,12 @@ def image_resize( ops.JSONValue(json_path="$.status") ) results_df = bpd.DataFrame( - {"status": resized_status_series, "content": dst} + { + "status": resized_status_series, + "content": dst.blob.uri().str.to_blob( + connection=self._resolve_connection(connection) + ), + } ) results_struct = bbq.struct(results_df).rename("resized_results") return results_struct @@ -753,7 +763,9 @@ def image_normalize( results_df = bpd.DataFrame( { "status": normalized_status_series, - "content": dst, + "content": dst.blob.uri().str.to_blob( + connection=self._resolve_connection(connection) + ), } ) results_struct = bbq.struct(results_df).rename("normalized_results") From dfdf84ba0280292f1b6f86d36ee0a6ab8b30da3c Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Thu, 9 Oct 2025 08:06:40 +0000 Subject: [PATCH 16/20] fix lint error --- tests/system/large/blob/test_function.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/system/large/blob/test_function.py b/tests/system/large/blob/test_function.py index 6be3ba5b20..8bb7e3fbfc 100644 --- a/tests/system/large/blob/test_function.py +++ b/tests/system/large/blob/test_function.py @@ -118,7 +118,6 @@ def test_blob_image_blur_to_series( status_series = actual_exploded["status"] assert 
status_series.dtype == dtypes.STRING_DTYPE - content_series = actual_exploded["content"] # Content should be blob objects for GCS destination else: expected_df = pd.DataFrame( From 31651ce0c6a47451a9cf6c3d24f0b546ae998f95 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Thu, 9 Oct 2025 10:10:37 +0000 Subject: [PATCH 17/20] fix presubmit --- bigframes/operations/blob.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/operations/blob.py b/bigframes/operations/blob.py index 01b39633e4..d55e4121c3 100644 --- a/bigframes/operations/blob.py +++ b/bigframes/operations/blob.py @@ -84,7 +84,7 @@ def metadata(self) -> bigframes.series.Series: # Check if it's a struct series from a verbose operation if dtypes.is_struct_like(series_to_check.dtype): pyarrow_dtype = series_to_check.dtype.pyarrow_dtype - if "content" in pyarrow_dtype.names: + if "content" in [field.name for field in pyarrow_dtype]: content_field_type = pyarrow_dtype.field("content").type content_bf_type = dtypes.arrow_dtype_to_bigframes_dtype( content_field_type From 2c5d27bee233191420df37901e29b7b6a9a89a77 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Thu, 9 Oct 2025 10:18:47 +0000 Subject: [PATCH 18/20] fix mypy --- bigframes/dataframe.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index f469bb62ac..bc2bbb963b 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -95,8 +95,12 @@ import bigframes.session - SingleItemValue = Union[bigframes.series.Series, int, float, str, Callable] - MultiItemValue = Union["DataFrame", Sequence[int | float | str | Callable]] + SingleItemValue = Union[ + bigframes.series.Series, int, float, str, pandas.Timedelta, Callable + ] + MultiItemValue = Union[ + "DataFrame", Sequence[int | float | str | pandas.Timedelta | Callable] + ] LevelType = typing.Hashable LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]] From 53528c7ceb88c059d19781ee21699cdb9887656b Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Fri, 10 Oct 2025 03:36:00 +0000 Subject: [PATCH 19/20] cosmetic change --- bigframes/operations/blob.py | 1 - 1 file changed, 1 deletion(-) diff --git a/bigframes/operations/blob.py b/bigframes/operations/blob.py index d55e4121c3..4da9bfee82 100644 --- a/bigframes/operations/blob.py +++ b/bigframes/operations/blob.py @@ -979,7 +979,6 @@ def audio_transcribe( model_params={"generationConfig": {"temperature": 0.0}}, ) - transcribed_content_series = transcribed_results.struct.field("result").rename( "transcribed_content" ) From 07e11d77611838bb47b5d9f5f364d5ba88cc4cb6 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Tue, 14 Oct 2025 19:38:09 +0000 Subject: [PATCH 20/20] refactor testcase --- tests/system/large/blob/test_function.py | 820 ++++++++++++++--------- 1 file changed, 519 insertions(+), 301 deletions(-) diff --git a/tests/system/large/blob/test_function.py b/tests/system/large/blob/test_function.py index 8bb7e3fbfc..7963fabd0b 100644 --- a/tests/system/large/blob/test_function.py +++ b/tests/system/large/blob/test_function.py @@ -52,11 +52,9 @@ def images_output_uris(images_output_folder: str) -> list[str]: ] -@pytest.mark.parametrize("verbose", [True, False]) def test_blob_exif( bq_connection: str, session: bigframes.Session, - verbose: bool, ): exif_image_df = session.from_glob_path( "gs://bigframes_blob_test/images_exif/*", @@ -65,162 +63,202 @@ def test_blob_exif( ) actual = exif_image_df["blob_col"].blob.exif( - engine="pillow", connection=bq_connection, verbose=verbose 
+ engine="pillow", connection=bq_connection, verbose=False + ) + expected = bpd.Series( + ['{"ExifOffset": 47, "Make": "MyCamera"}'], + session=session, + dtype=dtypes.JSON_DTYPE, + ) + pd.testing.assert_series_equal( + actual.to_pandas(), + expected.to_pandas(), + check_dtype=False, + check_index_type=False, ) - if verbose: - assert hasattr(actual, "struct") - actual_exploded = actual.struct.explode() - assert "status" in actual_exploded.columns - assert "content" in actual_exploded.columns - status_series = actual_exploded["status"] - assert status_series.dtype == dtypes.STRING_DTYPE - content_series = actual_exploded["content"] - assert content_series.dtype == dtypes.JSON_DTYPE +def test_blob_exif_verbose( + bq_connection: str, + session: bigframes.Session, +): + exif_image_df = session.from_glob_path( + "gs://bigframes_blob_test/images_exif/*", + name="blob_col", + connection=bq_connection, + ) - else: - expected = bpd.Series( - ['{"ExifOffset": 47, "Make": "MyCamera"}'], - session=session, - dtype=dtypes.JSON_DTYPE, - ) - pd.testing.assert_series_equal( - actual.to_pandas(), - expected.to_pandas(), - check_dtype=False, - check_index_type=False, - ) + actual = exif_image_df["blob_col"].blob.exif( + engine="pillow", connection=bq_connection, verbose=True + ) + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + assert content_series.dtype == dtypes.JSON_DTYPE -@pytest.mark.parametrize("verbose", [True, False]) def test_blob_image_blur_to_series( images_mm_df: bpd.DataFrame, bq_connection: str, images_output_uris: list[str], session: bigframes.Session, - verbose: bool, ): series = bpd.Series(images_output_uris, session=session).str.to_blob( connection=bq_connection ) actual = images_mm_df["blob_col"].blob.image_blur( - (8, 8), dst=series, connection=bq_connection, engine="opencv", verbose=verbose - ) - - if verbose: - assert hasattr(actual, "struct") - actual_exploded = actual.struct.explode() - assert "status" in actual_exploded.columns - assert "content" in actual_exploded.columns - - status_series = actual_exploded["status"] - assert status_series.dtype == dtypes.STRING_DTYPE - - # Content should be blob objects for GCS destination - else: - expected_df = pd.DataFrame( - { - "uri": images_output_uris, - "version": [None, None], - "authorizer": [bq_connection.casefold(), bq_connection.casefold()], - "details": [None, None], - } - ) - pd.testing.assert_frame_equal( - actual.struct.explode().to_pandas(), - expected_df, - check_dtype=False, - check_index_type=False, - ) + (8, 8), dst=series, connection=bq_connection, engine="opencv", verbose=False + ) + + expected_df = pd.DataFrame( + { + "uri": images_output_uris, + "version": [None, None], + "authorizer": [bq_connection.casefold(), bq_connection.casefold()], + "details": [None, None], + } + ) + pd.testing.assert_frame_equal( + actual.struct.explode().to_pandas(), + expected_df, + check_dtype=False, + check_index_type=False, + ) # verify the files exist assert not actual.blob.size().isna().any() -@pytest.mark.parametrize("verbose", [True, False]) +def test_blob_image_blur_to_series_verbose( + images_mm_df: bpd.DataFrame, + bq_connection: str, + images_output_uris: list[str], + session: bigframes.Session, +): + series = bpd.Series(images_output_uris, session=session).str.to_blob( 
+ connection=bq_connection + ) + + actual = images_mm_df["blob_col"].blob.image_blur( + (8, 8), dst=series, connection=bq_connection, engine="opencv", verbose=True + ) + + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + # Content should be blob objects for GCS destination + # verify the files exist + assert not actual.blob.size().isna().any() + + def test_blob_image_blur_to_folder( images_mm_df: bpd.DataFrame, bq_connection: str, images_output_folder: str, images_output_uris: list[str], - verbose: bool, ): actual = images_mm_df["blob_col"].blob.image_blur( (8, 8), dst=images_output_folder, connection=bq_connection, engine="opencv", - verbose=verbose, - ) - if verbose: - assert hasattr(actual, "struct") - actual_exploded = actual.struct.explode() - assert "status" in actual_exploded.columns - assert "content" in actual_exploded.columns - - status_series = actual_exploded["status"] - assert status_series.dtype == dtypes.STRING_DTYPE - - content_series = actual_exploded["content"] - # Content should be blob objects for GCS destination - assert hasattr(content_series, "blob") - - else: - expected_df = pd.DataFrame( - { - "uri": images_output_uris, - "version": [None, None], - "authorizer": [bq_connection.casefold(), bq_connection.casefold()], - "details": [None, None], - } - ) - pd.testing.assert_frame_equal( - actual.struct.explode().to_pandas(), - expected_df, - check_dtype=False, - check_index_type=False, - ) + verbose=False, + ) + expected_df = pd.DataFrame( + { + "uri": images_output_uris, + "version": [None, None], + "authorizer": [bq_connection.casefold(), bq_connection.casefold()], + "details": [None, None], + } + ) + pd.testing.assert_frame_equal( + actual.struct.explode().to_pandas(), + expected_df, + check_dtype=False, + check_index_type=False, + ) # verify the files exist assert not actual.blob.size().isna().any() -@pytest.mark.parametrize("verbose", [True, False]) -def test_blob_image_blur_to_bq( - images_mm_df: bpd.DataFrame, bq_connection: str, verbose: bool +def test_blob_image_blur_to_folder_verbose( + images_mm_df: bpd.DataFrame, + bq_connection: str, + images_output_folder: str, + images_output_uris: list[str], ): actual = images_mm_df["blob_col"].blob.image_blur( - (8, 8), connection=bq_connection, engine="opencv", verbose=verbose + (8, 8), + dst=images_output_folder, + connection=bq_connection, + engine="opencv", + verbose=True, + ) + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + # Content should be blob objects for GCS destination + assert hasattr(content_series, "blob") + + # verify the files exist + assert not actual.blob.size().isna().any() + + +def test_blob_image_blur_to_bq(images_mm_df: bpd.DataFrame, bq_connection: str): + actual = images_mm_df["blob_col"].blob.image_blur( + (8, 8), connection=bq_connection, engine="opencv", verbose=False ) assert isinstance(actual, bpd.Series) assert len(actual) == 2 + assert actual.dtype == dtypes.BYTES_DTYPE + + +def test_blob_image_blur_to_bq_verbose(images_mm_df: bpd.DataFrame, bq_connection: str): + actual = 
images_mm_df["blob_col"].blob.image_blur( + (8, 8), connection=bq_connection, engine="opencv", verbose=True + ) - if verbose: - assert hasattr(actual, "struct") - actual_exploded = actual.struct.explode() - assert "status" in actual_exploded.columns - assert "content" in actual_exploded.columns + assert isinstance(actual, bpd.Series) + assert len(actual) == 2 - status_series = actual_exploded["status"] - assert status_series.dtype == dtypes.STRING_DTYPE + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns - content_series = actual_exploded["content"] - assert content_series.dtype == dtypes.BYTES_DTYPE + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE - else: - assert actual.dtype == dtypes.BYTES_DTYPE + content_series = actual_exploded["content"] + assert content_series.dtype == dtypes.BYTES_DTYPE -@pytest.mark.parametrize("verbose", [True, False]) def test_blob_image_resize_to_series( images_mm_df: bpd.DataFrame, bq_connection: str, images_output_uris: list[str], session: bigframes.Session, - verbose: bool, ): series = bpd.Series(images_output_uris, session=session).str.to_blob( connection=bq_connection @@ -231,128 +269,162 @@ def test_blob_image_resize_to_series( dst=series, connection=bq_connection, engine="opencv", - verbose=verbose, - ) - - if verbose: - assert hasattr(actual, "struct") - actual_exploded = actual.struct.explode() - assert "status" in actual_exploded.columns - assert "content" in actual_exploded.columns - - status_series = actual_exploded["status"] - assert status_series.dtype == dtypes.STRING_DTYPE - - content_series = actual_exploded["content"] - # Content should be blob objects for GCS destination - assert hasattr(content_series, "blob") - - else: - expected_df = pd.DataFrame( - { - "uri": images_output_uris, - "version": [None, None], - "authorizer": [bq_connection.casefold(), bq_connection.casefold()], - "details": [None, None], - } - ) - pd.testing.assert_frame_equal( - actual.struct.explode().to_pandas(), - expected_df, - check_dtype=False, - check_index_type=False, - ) + verbose=False, + ) + + expected_df = pd.DataFrame( + { + "uri": images_output_uris, + "version": [None, None], + "authorizer": [bq_connection.casefold(), bq_connection.casefold()], + "details": [None, None], + } + ) + pd.testing.assert_frame_equal( + actual.struct.explode().to_pandas(), + expected_df, + check_dtype=False, + check_index_type=False, + ) + + # verify the files exist + assert not actual.blob.size().isna().any() + + +def test_blob_image_resize_to_series_verbose( + images_mm_df: bpd.DataFrame, + bq_connection: str, + images_output_uris: list[str], + session: bigframes.Session, +): + series = bpd.Series(images_output_uris, session=session).str.to_blob( + connection=bq_connection + ) + + actual = images_mm_df["blob_col"].blob.image_resize( + (200, 300), + dst=series, + connection=bq_connection, + engine="opencv", + verbose=True, + ) + + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + # Content should be blob objects for GCS destination + assert hasattr(content_series, "blob") # verify the files exist assert not actual.blob.size().isna().any() 
-@pytest.mark.parametrize("verbose", [True, False]) def test_blob_image_resize_to_folder( images_mm_df: bpd.DataFrame, bq_connection: str, images_output_folder: str, images_output_uris: list[str], - verbose: bool, ): actual = images_mm_df["blob_col"].blob.image_resize( (200, 300), dst=images_output_folder, connection=bq_connection, engine="opencv", - verbose=verbose, + verbose=False, + ) + + expected_df = pd.DataFrame( + { + "uri": images_output_uris, + "version": [None, None], + "authorizer": [bq_connection.casefold(), bq_connection.casefold()], + "details": [None, None], + } + ) + pd.testing.assert_frame_equal( + actual.struct.explode().to_pandas(), + expected_df, + check_dtype=False, + check_index_type=False, + ) + + # verify the files exist + assert not actual.blob.size().isna().any() + + +def test_blob_image_resize_to_folder_verbose( + images_mm_df: bpd.DataFrame, + bq_connection: str, + images_output_folder: str, + images_output_uris: list[str], +): + actual = images_mm_df["blob_col"].blob.image_resize( + (200, 300), + dst=images_output_folder, + connection=bq_connection, + engine="opencv", + verbose=True, ) - if verbose: - assert hasattr(actual, "struct") - actual_exploded = actual.struct.explode() - assert "status" in actual_exploded.columns - assert "content" in actual_exploded.columns + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns - status_series = actual_exploded["status"] - assert status_series.dtype == dtypes.STRING_DTYPE + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE - content_series = actual_exploded["content"] - # Content should be blob objects for GCS destination - assert hasattr(content_series, "blob") + content_series = actual_exploded["content"] + # Content should be blob objects for GCS destination + assert hasattr(content_series, "blob") - # verify the files exist - assert not content_series.blob.size().isna().any() + # verify the files exist + assert not content_series.blob.size().isna().any() - else: - expected_df = pd.DataFrame( - { - "uri": images_output_uris, - "version": [None, None], - "authorizer": [bq_connection.casefold(), bq_connection.casefold()], - "details": [None, None], - } - ) - pd.testing.assert_frame_equal( - actual.struct.explode().to_pandas(), - expected_df, - check_dtype=False, - check_index_type=False, - ) - # verify the files exist - assert not actual.blob.size().isna().any() +def test_blob_image_resize_to_bq(images_mm_df: bpd.DataFrame, bq_connection: str): + actual = images_mm_df["blob_col"].blob.image_resize( + (200, 300), connection=bq_connection, engine="opencv", verbose=False + ) + assert isinstance(actual, bpd.Series) + assert len(actual) == 2 + assert actual.dtype == dtypes.BYTES_DTYPE -@pytest.mark.parametrize("verbose", [True, False]) -def test_blob_image_resize_to_bq( - images_mm_df: bpd.DataFrame, bq_connection: str, verbose: bool + +def test_blob_image_resize_to_bq_verbose( + images_mm_df: bpd.DataFrame, bq_connection: str ): actual = images_mm_df["blob_col"].blob.image_resize( - (200, 300), connection=bq_connection, engine="opencv", verbose=verbose + (200, 300), connection=bq_connection, engine="opencv", verbose=True ) assert isinstance(actual, bpd.Series) assert len(actual) == 2 - if verbose: - assert hasattr(actual, "struct") - actual_exploded = actual.struct.explode() - assert "status" in actual_exploded.columns - assert "content" in actual_exploded.columns - 
- status_series = actual_exploded["status"] - assert status_series.dtype == dtypes.STRING_DTYPE + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns - content_series = actual_exploded["content"] - assert content_series.dtype == dtypes.BYTES_DTYPE + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE - else: - assert actual.dtype == dtypes.BYTES_DTYPE + content_series = actual_exploded["content"] + assert content_series.dtype == dtypes.BYTES_DTYPE -@pytest.mark.parametrize("verbose", [True, False]) def test_blob_image_normalize_to_series( images_mm_df: bpd.DataFrame, bq_connection: str, images_output_uris: list[str], session: bigframes.Session, - verbose: bool, ): series = bpd.Series(images_output_uris, session=session).str.to_blob( connection=bq_connection @@ -365,50 +437,66 @@ def test_blob_image_normalize_to_series( dst=series, connection=bq_connection, engine="opencv", - verbose=verbose, + verbose=False, ) - if verbose: + expected_df = pd.DataFrame( + { + "uri": images_output_uris, + "version": [None, None], + "authorizer": [bq_connection.casefold(), bq_connection.casefold()], + "details": [None, None], + } + ) + pd.testing.assert_frame_equal( + actual.struct.explode().to_pandas(), + expected_df, + check_dtype=False, + check_index_type=False, + ) - assert hasattr(actual, "struct") - actual_exploded = actual.struct.explode() - assert "status" in actual_exploded.columns - assert "content" in actual_exploded.columns + # verify the files exist + assert not actual.blob.size().isna().any() - status_series = actual_exploded["status"] - assert status_series.dtype == dtypes.STRING_DTYPE - content_series = actual_exploded["content"] - # Content should be blob objects for GCS destination - assert hasattr(content_series, "blob") +def test_blob_image_normalize_to_series_verbose( + images_mm_df: bpd.DataFrame, + bq_connection: str, + images_output_uris: list[str], + session: bigframes.Session, +): + series = bpd.Series(images_output_uris, session=session).str.to_blob( + connection=bq_connection + ) - else: - expected_df = pd.DataFrame( - { - "uri": images_output_uris, - "version": [None, None], - "authorizer": [bq_connection.casefold(), bq_connection.casefold()], - "details": [None, None], - } - ) - pd.testing.assert_frame_equal( - actual.struct.explode().to_pandas(), - expected_df, - check_dtype=False, - check_index_type=False, - ) + actual = images_mm_df["blob_col"].blob.image_normalize( + alpha=50.0, + beta=150.0, + norm_type="minmax", + dst=series, + connection=bq_connection, + engine="opencv", + verbose=True, + ) + + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns - # verify the files exist - assert not actual.blob.size().isna().any() + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + # Content should be blob objects for GCS destination + assert hasattr(content_series, "blob") -@pytest.mark.parametrize("verbose", [True, False]) def test_blob_image_normalize_to_folder( images_mm_df: bpd.DataFrame, bq_connection: str, images_output_folder: str, images_output_uris: list[str], - verbose: bool, ): actual = images_mm_df["blob_col"].blob.image_normalize( alpha=50.0, @@ -417,45 +505,74 @@ def test_blob_image_normalize_to_folder( 
dst=images_output_folder, connection=bq_connection, engine="opencv", - verbose=verbose, - ) - - if verbose: - assert hasattr(actual, "struct") - actual_exploded = actual.struct.explode() - assert "status" in actual_exploded.columns - assert "content" in actual_exploded.columns - - status_series = actual_exploded["status"] - assert status_series.dtype == dtypes.STRING_DTYPE - - content_series = actual_exploded["content"] - # Content should be blob objects for GCS destination - assert hasattr(content_series, "blob") - - else: - expected_df = pd.DataFrame( - { - "uri": images_output_uris, - "version": [None, None], - "authorizer": [bq_connection.casefold(), bq_connection.casefold()], - "details": [None, None], - } - ) - pd.testing.assert_frame_equal( - actual.struct.explode().to_pandas(), - expected_df, - check_dtype=False, - check_index_type=False, - ) + verbose=False, + ) + + expected_df = pd.DataFrame( + { + "uri": images_output_uris, + "version": [None, None], + "authorizer": [bq_connection.casefold(), bq_connection.casefold()], + "details": [None, None], + } + ) + pd.testing.assert_frame_equal( + actual.struct.explode().to_pandas(), + expected_df, + check_dtype=False, + check_index_type=False, + ) # verify the files exist assert not actual.blob.size().isna().any() -@pytest.mark.parametrize("verbose", [True, False]) -def test_blob_image_normalize_to_bq( - images_mm_df: bpd.DataFrame, bq_connection: str, verbose: bool +def test_blob_image_normalize_to_folder_verbose( + images_mm_df: bpd.DataFrame, + bq_connection: str, + images_output_folder: str, + images_output_uris: list[str], +): + actual = images_mm_df["blob_col"].blob.image_normalize( + alpha=50.0, + beta=150.0, + norm_type="minmax", + dst=images_output_folder, + connection=bq_connection, + engine="opencv", + verbose=True, + ) + + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + # Content should be blob objects for GCS destination + assert hasattr(content_series, "blob") + + +def test_blob_image_normalize_to_bq(images_mm_df: bpd.DataFrame, bq_connection: str): + actual = images_mm_df["blob_col"].blob.image_normalize( + alpha=50.0, + beta=150.0, + norm_type="minmax", + connection=bq_connection, + engine="opencv", + verbose=False, + ) + + assert isinstance(actual, bpd.Series) + assert len(actual) == 2 + assert actual.dtype == dtypes.BYTES_DTYPE + + +def test_blob_image_normalize_to_bq_verbose( + images_mm_df: bpd.DataFrame, bq_connection: str ): actual = images_mm_df["blob_col"].blob.image_normalize( alpha=50.0, @@ -463,36 +580,106 @@ def test_blob_image_normalize_to_bq( norm_type="minmax", connection=bq_connection, engine="opencv", - verbose=verbose, + verbose=True, ) assert isinstance(actual, bpd.Series) assert len(actual) == 2 - if verbose: - assert hasattr(actual, "struct") - actual_exploded = actual.struct.explode() - assert "status" in actual_exploded.columns - assert "content" in actual_exploded.columns + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns - status_series = actual_exploded["status"] - assert status_series.dtype == dtypes.STRING_DTYPE + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE - 
content_series = actual_exploded["content"] - assert content_series.dtype == dtypes.BYTES_DTYPE - else: - assert actual.dtype == dtypes.BYTES_DTYPE + content_series = actual_exploded["content"] + assert content_series.dtype == dtypes.BYTES_DTYPE -@pytest.mark.parametrize("verbose", [True, False]) def test_blob_pdf_extract( pdf_mm_df: bpd.DataFrame, - verbose: bool, bq_connection: str, ): actual = ( pdf_mm_df["pdf"] - .blob.pdf_extract(connection=bq_connection, verbose=verbose, engine="pypdf") + .blob.pdf_extract(connection=bq_connection, verbose=False, engine="pypdf") + .explode() + .to_pandas() + ) + + # check relative length + expected_text = "Sample PDF This is a testing file. Some dummy messages are used for testing purposes." + expected_len = len(expected_text) + + actual_text = actual[actual != ""].iloc[0] + actual_len = len(actual_text) + + relative_length_tolerance = 0.25 + min_acceptable_len = expected_len * (1 - relative_length_tolerance) + max_acceptable_len = expected_len * (1 + relative_length_tolerance) + assert min_acceptable_len <= actual_len <= max_acceptable_len, ( + f"Item (verbose=False): Extracted text length {actual_len} is outside the acceptable range " + f"[{min_acceptable_len:.0f}, {max_acceptable_len:.0f}]. " + f"Expected reference length was {expected_len}. " + ) + + # check for major keywords + major_keywords = ["Sample", "PDF", "testing", "dummy", "messages"] + for keyword in major_keywords: + assert ( + keyword.lower() in actual_text.lower() + ), f"Item (verbose=False): Expected keyword '{keyword}' not found in extracted text. " + + +def test_blob_pdf_extract_verbose( + pdf_mm_df: bpd.DataFrame, + bq_connection: str, +): + actual = ( + pdf_mm_df["pdf"] + .blob.pdf_extract(connection=bq_connection, verbose=True, engine="pypdf") + .explode() + .to_pandas() + ) + + # check relative length + expected_text = "Sample PDF This is a testing file. Some dummy messages are used for testing purposes." + expected_len = len(expected_text) + + # The first entry is for a file that doesn't exist, so we check the second one + successful_results = actual[actual.apply(lambda x: x["status"] == "")] + actual_text = successful_results.apply(lambda x: x["content"]).iloc[0] + actual_len = len(actual_text) + + relative_length_tolerance = 0.25 + min_acceptable_len = expected_len * (1 - relative_length_tolerance) + max_acceptable_len = expected_len * (1 + relative_length_tolerance) + assert min_acceptable_len <= actual_len <= max_acceptable_len, ( + f"Item (verbose=True): Extracted text length {actual_len} is outside the acceptable range " + f"[{min_acceptable_len:.0f}, {max_acceptable_len:.0f}]. " + f"Expected reference length was {expected_len}. " + ) + + # check for major keywords + major_keywords = ["Sample", "PDF", "testing", "dummy", "messages"] + for keyword in major_keywords: + assert ( + keyword.lower() in actual_text.lower() + ), f"Item (verbose=True): Expected keyword '{keyword}' not found in extracted text. " + + +def test_blob_pdf_chunk(pdf_mm_df: bpd.DataFrame, bq_connection: str): + actual = ( + pdf_mm_df["pdf"] + .blob.pdf_chunk( + connection=bq_connection, + chunk_size=50, + overlap_size=10, + verbose=False, + engine="pypdf", + ) .explode() .to_pandas() ) @@ -501,20 +688,15 @@ def test_blob_pdf_extract( expected_text = "Sample PDF This is a testing file. Some dummy messages are used for testing purposes." 
expected_len = len(expected_text) - actual_text = "" - if verbose: - # The first entry is for a file that doesn't exist, so we check the second one - successful_results = actual[actual.apply(lambda x: x["status"] == "")] - actual_text = successful_results.apply(lambda x: x["content"]).iloc[0] - else: - actual_text = actual[actual != ""].iloc[0] + # First entry is NA + actual_text = "".join(actual.dropna()) actual_len = len(actual_text) relative_length_tolerance = 0.25 min_acceptable_len = expected_len * (1 - relative_length_tolerance) max_acceptable_len = expected_len * (1 + relative_length_tolerance) assert min_acceptable_len <= actual_len <= max_acceptable_len, ( - f"Item (verbose={verbose}): Extracted text length {actual_len} is outside the acceptable range " + f"Item (verbose=False): Extracted text length {actual_len} is outside the acceptable range " f"[{min_acceptable_len:.0f}, {max_acceptable_len:.0f}]. " f"Expected reference length was {expected_len}. " ) @@ -524,18 +706,17 @@ def test_blob_pdf_extract( for keyword in major_keywords: assert ( keyword.lower() in actual_text.lower() - ), f"Item (verbose={verbose}): Expected keyword '{keyword}' not found in extracted text. " + ), f"Item (verbose=False): Expected keyword '{keyword}' not found in extracted text. " -@pytest.mark.parametrize("verbose", [True, False]) -def test_blob_pdf_chunk(pdf_mm_df: bpd.DataFrame, verbose: bool, bq_connection: str): +def test_blob_pdf_chunk_verbose(pdf_mm_df: bpd.DataFrame, bq_connection: str): actual = ( pdf_mm_df["pdf"] .blob.pdf_chunk( connection=bq_connection, chunk_size=50, overlap_size=10, - verbose=verbose, + verbose=True, engine="pypdf", ) .explode() @@ -546,21 +727,16 @@ def test_blob_pdf_chunk(pdf_mm_df: bpd.DataFrame, verbose: bool, bq_connection: expected_text = "Sample PDF This is a testing file. Some dummy messages are used for testing purposes." expected_len = len(expected_text) - actual_text = "" - if verbose: - # The first entry is for a file that doesn't exist, so we check the second one - successful_results = actual[actual.apply(lambda x: x["status"] == "")] - actual_text = "".join(successful_results.apply(lambda x: x["content"]).iloc[0]) - else: - # First entry is NA - actual_text = "".join(actual.dropna()) + # The first entry is for a file that doesn't exist, so we check the second one + successful_results = actual[actual.apply(lambda x: x["status"] == "")] + actual_text = "".join(successful_results.apply(lambda x: x["content"]).iloc[0]) actual_len = len(actual_text) relative_length_tolerance = 0.25 min_acceptable_len = expected_len * (1 - relative_length_tolerance) max_acceptable_len = expected_len * (1 + relative_length_tolerance) assert min_acceptable_len <= actual_len <= max_acceptable_len, ( - f"Item (verbose={verbose}): Extracted text length {actual_len} is outside the acceptable range " + f"Item (verbose=True): Extracted text length {actual_len} is outside the acceptable range " f"[{min_acceptable_len:.0f}, {max_acceptable_len:.0f}]. " f"Expected reference length was {expected_len}. " ) @@ -570,28 +746,25 @@ def test_blob_pdf_chunk(pdf_mm_df: bpd.DataFrame, verbose: bool, bq_connection: for keyword in major_keywords: assert ( keyword.lower() in actual_text.lower() - ), f"Item (verbose={verbose}): Expected keyword '{keyword}' not found in extracted text. " + ), f"Item (verbose=True): Expected keyword '{keyword}' not found in extracted text. 
" @pytest.mark.parametrize( - "model_name, verbose", + "model_name", [ - ("gemini-2.0-flash-001", True), - ("gemini-2.0-flash-001", False), - ("gemini-2.0-flash-lite-001", True), - ("gemini-2.0-flash-lite-001", False), + "gemini-2.0-flash-001", + "gemini-2.0-flash-lite-001", ], ) def test_blob_transcribe( audio_mm_df: bpd.DataFrame, model_name: str, - verbose: bool, ): actual = ( audio_mm_df["audio"] .blob.audio_transcribe( model_name=model_name, # type: ignore - verbose=verbose, + verbose=False, ) .to_pandas() ) @@ -600,18 +773,63 @@ def test_blob_transcribe( expected_text = "Now, as all books not primarily intended as picture-books consist principally of types composed to form letterpress" expected_len = len(expected_text) - actual_text = "" - if verbose: - actual_text = actual[0]["content"] - else: - actual_text = actual[0] + actual_text = actual[0] if pd.isna(actual_text) or actual_text == "": # Ensure the tests are robust to flakes in the model, which isn't # particularly useful information for the bigframes team. - logging.warning( - f"blob_transcribe() model {model_name} verbose={verbose} failure" + logging.warning(f"blob_transcribe() model {model_name} verbose=False failure") + return + + actual_len = len(actual_text) + + relative_length_tolerance = 0.2 + min_acceptable_len = expected_len * (1 - relative_length_tolerance) + max_acceptable_len = expected_len * (1 + relative_length_tolerance) + assert min_acceptable_len <= actual_len <= max_acceptable_len, ( + f"Item (verbose=False): Transcribed text length {actual_len} is outside the acceptable range " + f"[{min_acceptable_len:.0f}, {max_acceptable_len:.0f}]. " + f"Expected reference length was {expected_len}. " + ) + + # check for major keywords + major_keywords = ["book", "picture"] + for keyword in major_keywords: + assert ( + keyword.lower() in actual_text.lower() + ), f"Item (verbose=False): Expected keyword '{keyword}' not found in transcribed text. " + + +@pytest.mark.parametrize( + "model_name", + [ + "gemini-2.0-flash-001", + "gemini-2.0-flash-lite-001", + ], +) +def test_blob_transcribe_verbose( + audio_mm_df: bpd.DataFrame, + model_name: str, +): + actual = ( + audio_mm_df["audio"] + .blob.audio_transcribe( + model_name=model_name, # type: ignore + verbose=True, ) + .to_pandas() + ) + + # check relative length + expected_text = "Now, as all books not primarily intended as picture-books consist principally of types composed to form letterpress" + expected_len = len(expected_text) + + actual_text = actual[0]["content"] + + if pd.isna(actual_text) or actual_text == "": + # Ensure the tests are robust to flakes in the model, which isn't + # particularly useful information for the bigframes team. + logging.warning(f"blob_transcribe() model {model_name} verbose=True failure") return actual_len = len(actual_text) @@ -620,7 +838,7 @@ def test_blob_transcribe( min_acceptable_len = expected_len * (1 - relative_length_tolerance) max_acceptable_len = expected_len * (1 + relative_length_tolerance) assert min_acceptable_len <= actual_len <= max_acceptable_len, ( - f"Item (verbose={verbose}): Transcribed text length {actual_len} is outside the acceptable range " + f"Item (verbose=True): Transcribed text length {actual_len} is outside the acceptable range " f"[{min_acceptable_len:.0f}, {max_acceptable_len:.0f}]. " f"Expected reference length was {expected_len}. 
" ) @@ -630,4 +848,4 @@ def test_blob_transcribe( for keyword in major_keywords: assert ( keyword.lower() in actual_text.lower() - ), f"Item (verbose={verbose}): Expected keyword '{keyword}' not found in transcribed text. " + ), f"Item (verbose=True): Expected keyword '{keyword}' not found in transcribed text. "