diff --git a/bigframes/blob/_functions.py b/bigframes/blob/_functions.py index 8dd9328fb8..2a11974b8d 100644 --- a/bigframes/blob/_functions.py +++ b/bigframes/blob/_functions.py @@ -21,7 +21,13 @@ import bigframes.session import bigframes.session._io.bigquery as bf_io_bigquery -_PYTHON_TO_BQ_TYPES = {int: "INT64", float: "FLOAT64", str: "STRING", bytes: "BYTES"} +_PYTHON_TO_BQ_TYPES = { + int: "INT64", + float: "FLOAT64", + str: "STRING", + bytes: "BYTES", + bool: "BOOL", +} @dataclass(frozen=True) @@ -113,7 +119,7 @@ def udf(self): return self._session.read_gbq_function(udf_name) -def exif_func(src_obj_ref_rt: str) -> str: +def exif_func(src_obj_ref_rt: str, verbose: bool) -> str: import io import json @@ -121,25 +127,36 @@ def exif_func(src_obj_ref_rt: str) -> str: import requests from requests import adapters - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) + result_dict = {"status": "", "content": "{}"} + try: + session = requests.Session() + session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] + src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - response = session.get(src_url, timeout=30) - bts = response.content + src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - image = Image.open(io.BytesIO(bts)) - exif_data = image.getexif() - exif_dict = {} - if exif_data: - for tag, value in exif_data.items(): - tag_name = ExifTags.TAGS.get(tag, tag) - exif_dict[tag_name] = value + response = session.get(src_url, timeout=30) + bts = response.content + + image = Image.open(io.BytesIO(bts)) + exif_data = image.getexif() + exif_dict = {} + if exif_data: + for tag, value in exif_data.items(): + tag_name = ExifTags.TAGS.get(tag, tag) + # Pillow might return bytes, which are not serializable. + if isinstance(value, bytes): + value = value.decode("utf-8", "replace") + exif_dict[tag_name] = value + result_dict["content"] = json.dumps(exif_dict) + except Exception as e: + result_dict["status"] = str(e) - return json.dumps(exif_dict) + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] exif_func_def = FunctionDef(exif_func, ["pillow", "requests"]) @@ -147,82 +164,109 @@ def exif_func(src_obj_ref_rt: str) -> str: # Blur images. Takes ObjectRefRuntime as JSON string. Outputs ObjectRefRuntime JSON string. 
def image_blur_func( - src_obj_ref_rt: str, dst_obj_ref_rt: str, ksize_x: int, ksize_y: int, ext: str + src_obj_ref_rt: str, + dst_obj_ref_rt: str, + ksize_x: int, + ksize_y: int, + ext: str, + verbose: bool, ) -> str: import json - import cv2 as cv # type: ignore - import numpy as np - import requests - from requests import adapters + result_dict = {"status": "", "content": dst_obj_ref_rt} - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) + try: + import cv2 as cv # type: ignore + import numpy as np + import requests + from requests import adapters + + session = requests.Session() + session.mount("https://", adapters.HTTPAdapter(max_retries=3)) + + ext = ext or ".jpeg" - ext = ext or ".jpeg" + src_obj_ref_rt_json = json.loads(src_obj_ref_rt) + dst_obj_ref_rt_json = json.loads(dst_obj_ref_rt) - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - dst_obj_ref_rt_json = json.loads(dst_obj_ref_rt) + src_url = src_obj_ref_rt_json["access_urls"]["read_url"] + dst_url = dst_obj_ref_rt_json["access_urls"]["write_url"] - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - dst_url = dst_obj_ref_rt_json["access_urls"]["write_url"] + response = session.get(src_url, timeout=30) + bts = response.content - response = session.get(src_url, timeout=30) - bts = response.content + nparr = np.frombuffer(bts, np.uint8) + img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) + img_blurred = cv.blur(img, ksize=(ksize_x, ksize_y)) - nparr = np.frombuffer(bts, np.uint8) - img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) - img_blurred = cv.blur(img, ksize=(ksize_x, ksize_y)) + bts = cv.imencode(ext, img_blurred)[1].tobytes() - bts = cv.imencode(ext, img_blurred)[1].tobytes() + ext = ext.replace(".", "") + ext_mappings = {"jpg": "jpeg", "tif": "tiff"} + ext = ext_mappings.get(ext, ext) + content_type = "image/" + ext - ext = ext.replace(".", "") - ext_mappings = {"jpg": "jpeg", "tif": "tiff"} - ext = ext_mappings.get(ext, ext) - content_type = "image/" + ext + session.put( + url=dst_url, + data=bts, + headers={ + "Content-Type": content_type, + }, + timeout=30, + ) - session.put( - url=dst_url, - data=bts, - headers={ - "Content-Type": content_type, - }, - timeout=30, - ) + except Exception as e: + result_dict["status"] = str(e) - return dst_obj_ref_rt + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] image_blur_def = FunctionDef(image_blur_func, ["opencv-python", "numpy", "requests"]) def image_blur_to_bytes_func( - src_obj_ref_rt: str, ksize_x: int, ksize_y: int, ext: str -) -> bytes: + src_obj_ref_rt: str, ksize_x: int, ksize_y: int, ext: str, verbose: bool +) -> str: + import base64 import json - import cv2 as cv # type: ignore - import numpy as np - import requests - from requests import adapters + status = "" + content = b"" + + try: + import cv2 as cv # type: ignore + import numpy as np + import requests + from requests import adapters + + session = requests.Session() + session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) + ext = ext or ".jpeg" - ext = ext or ".jpeg" + src_obj_ref_rt_json = json.loads(src_obj_ref_rt) + src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] + response = session.get(src_url, timeout=30) + bts = response.content - response = session.get(src_url, timeout=30) - bts = response.content + nparr 
= np.frombuffer(bts, np.uint8) + img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) + img_blurred = cv.blur(img, ksize=(ksize_x, ksize_y)) + content = cv.imencode(ext, img_blurred)[1].tobytes() - nparr = np.frombuffer(bts, np.uint8) - img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) - img_blurred = cv.blur(img, ksize=(ksize_x, ksize_y)) - bts = cv.imencode(ext, img_blurred)[1].tobytes() + except Exception as e: + status = str(e) - return bts + encoded_content = base64.b64encode(content).decode("utf-8") + result_dict = {"status": status, "content": encoded_content} + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] image_blur_to_bytes_def = FunctionDef( @@ -238,49 +282,59 @@ def image_resize_func( fx: float, fy: float, ext: str, + verbose: bool, ) -> str: import json - import cv2 as cv # type: ignore - import numpy as np - import requests - from requests import adapters + result_dict = {"status": "", "content": dst_obj_ref_rt} + + try: + import cv2 as cv # type: ignore + import numpy as np + import requests + from requests import adapters + + session = requests.Session() + session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) + ext = ext or ".jpeg" - ext = ext or ".jpeg" + src_obj_ref_rt_json = json.loads(src_obj_ref_rt) + dst_obj_ref_rt_json = json.loads(dst_obj_ref_rt) - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - dst_obj_ref_rt_json = json.loads(dst_obj_ref_rt) + src_url = src_obj_ref_rt_json["access_urls"]["read_url"] + dst_url = dst_obj_ref_rt_json["access_urls"]["write_url"] - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - dst_url = dst_obj_ref_rt_json["access_urls"]["write_url"] + response = session.get(src_url, timeout=30) + bts = response.content - response = session.get(src_url, timeout=30) - bts = response.content + nparr = np.frombuffer(bts, np.uint8) + img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) + img_resized = cv.resize(img, dsize=(dsize_x, dsize_y), fx=fx, fy=fy) - nparr = np.frombuffer(bts, np.uint8) - img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) - img_resized = cv.resize(img, dsize=(dsize_x, dsize_y), fx=fx, fy=fy) + bts = cv.imencode(ext, img_resized)[1].tobytes() - bts = cv.imencode(ext, img_resized)[1].tobytes() + ext = ext.replace(".", "") + ext_mappings = {"jpg": "jpeg", "tif": "tiff"} + ext = ext_mappings.get(ext, ext) + content_type = "image/" + ext - ext = ext.replace(".", "") - ext_mappings = {"jpg": "jpeg", "tif": "tiff"} - ext = ext_mappings.get(ext, ext) - content_type = "image/" + ext + session.put( + url=dst_url, + data=bts, + headers={ + "Content-Type": content_type, + }, + timeout=30, + ) - session.put( - url=dst_url, - data=bts, - headers={ - "Content-Type": content_type, - }, - timeout=30, - ) + except Exception as e: + result_dict["status"] = str(e) - return dst_obj_ref_rt + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] image_resize_def = FunctionDef( @@ -295,31 +349,45 @@ def image_resize_to_bytes_func( fx: float, fy: float, ext: str, -) -> bytes: + verbose: bool, +) -> str: + import base64 import json - import cv2 as cv # type: ignore - import numpy as np - import requests - from requests import adapters + status = "" + content = b"" + + try: + import cv2 as cv # type: ignore + import numpy as np + import requests + from requests import adapters + + session = requests.Session() + session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - session = 
requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) + ext = ext or ".jpeg" - ext = ext or ".jpeg" + src_obj_ref_rt_json = json.loads(src_obj_ref_rt) + src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] + response = session.get(src_url, timeout=30) + bts = response.content - response = session.get(src_url, timeout=30) - bts = response.content + nparr = np.frombuffer(bts, np.uint8) + img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) + img_resized = cv.resize(img, dsize=(dsize_x, dsize_y), fx=fx, fy=fy) + content = cv.imencode(".jpeg", img_resized)[1].tobytes() - nparr = np.frombuffer(bts, np.uint8) - img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) - img_resized = cv.resize(img, dsize=(dsize_x, dsize_y), fx=fx, fy=fy) - bts = cv.imencode(".jpeg", img_resized)[1].tobytes() + except Exception as e: + status = str(e) - return bts + encoded_content = base64.b64encode(content).decode("utf-8") + result_dict = {"status": status, "content": encoded_content} + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] image_resize_to_bytes_def = FunctionDef( @@ -334,58 +402,68 @@ def image_normalize_func( beta: float, norm_type: str, ext: str, + verbose: bool, ) -> str: import json - import cv2 as cv # type: ignore - import numpy as np - import requests - from requests import adapters + result_dict = {"status": "", "content": dst_obj_ref_rt} + + try: + import cv2 as cv # type: ignore + import numpy as np + import requests + from requests import adapters + + session = requests.Session() + session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) + ext = ext or ".jpeg" - ext = ext or ".jpeg" + norm_type_mapping = { + "inf": cv.NORM_INF, + "l1": cv.NORM_L1, + "l2": cv.NORM_L2, + "minmax": cv.NORM_MINMAX, + } - norm_type_mapping = { - "inf": cv.NORM_INF, - "l1": cv.NORM_L1, - "l2": cv.NORM_L2, - "minmax": cv.NORM_MINMAX, - } + src_obj_ref_rt_json = json.loads(src_obj_ref_rt) + dst_obj_ref_rt_json = json.loads(dst_obj_ref_rt) - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - dst_obj_ref_rt_json = json.loads(dst_obj_ref_rt) + src_url = src_obj_ref_rt_json["access_urls"]["read_url"] + dst_url = dst_obj_ref_rt_json["access_urls"]["write_url"] - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - dst_url = dst_obj_ref_rt_json["access_urls"]["write_url"] + response = session.get(src_url, timeout=30) + bts = response.content - response = session.get(src_url, timeout=30) - bts = response.content + nparr = np.frombuffer(bts, np.uint8) + img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) + img_normalized = cv.normalize( + img, None, alpha=alpha, beta=beta, norm_type=norm_type_mapping[norm_type] + ) - nparr = np.frombuffer(bts, np.uint8) - img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) - img_normalized = cv.normalize( - img, None, alpha=alpha, beta=beta, norm_type=norm_type_mapping[norm_type] - ) + bts = cv.imencode(ext, img_normalized)[1].tobytes() - bts = cv.imencode(ext, img_normalized)[1].tobytes() + ext = ext.replace(".", "") + ext_mappings = {"jpg": "jpeg", "tif": "tiff"} + ext = ext_mappings.get(ext, ext) + content_type = "image/" + ext - ext = ext.replace(".", "") - ext_mappings = {"jpg": "jpeg", "tif": "tiff"} - ext = ext_mappings.get(ext, ext) - content_type = "image/" + ext + session.put( + url=dst_url, + data=bts, + headers={ + 
"Content-Type": content_type, + }, + timeout=30, + ) - session.put( - url=dst_url, - data=bts, - headers={ - "Content-Type": content_type, - }, - timeout=30, - ) + except Exception as e: + result_dict["status"] = str(e) - return dst_obj_ref_rt + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] image_normalize_def = FunctionDef( @@ -394,41 +472,59 @@ def image_normalize_func( def image_normalize_to_bytes_func( - src_obj_ref_rt: str, alpha: float, beta: float, norm_type: str, ext: str -) -> bytes: + src_obj_ref_rt: str, + alpha: float, + beta: float, + norm_type: str, + ext: str, + verbose: bool, +) -> str: + import base64 import json - import cv2 as cv # type: ignore - import numpy as np - import requests - from requests import adapters + result_dict = {"status": "", "content": ""} + + try: + import cv2 as cv # type: ignore + import numpy as np + import requests + from requests import adapters + + session = requests.Session() + session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) + ext = ext or ".jpeg" - ext = ext or ".jpeg" + norm_type_mapping = { + "inf": cv.NORM_INF, + "l1": cv.NORM_L1, + "l2": cv.NORM_L2, + "minmax": cv.NORM_MINMAX, + } - norm_type_mapping = { - "inf": cv.NORM_INF, - "l1": cv.NORM_L1, - "l2": cv.NORM_L2, - "minmax": cv.NORM_MINMAX, - } + src_obj_ref_rt_json = json.loads(src_obj_ref_rt) + src_url = src_obj_ref_rt_json["access_urls"]["read_url"] + + response = session.get(src_url, timeout=30) + bts = response.content - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] + nparr = np.frombuffer(bts, np.uint8) + img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) + img_normalized = cv.normalize( + img, None, alpha=alpha, beta=beta, norm_type=norm_type_mapping[norm_type] + ) + bts = cv.imencode(".jpeg", img_normalized)[1].tobytes() - response = session.get(src_url, timeout=30) - bts = response.content + content_b64 = base64.b64encode(bts).decode("utf-8") + result_dict["content"] = content_b64 - nparr = np.frombuffer(bts, np.uint8) - img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) - img_normalized = cv.normalize( - img, None, alpha=alpha, beta=beta, norm_type=norm_type_mapping[norm_type] - ) - bts = cv.imencode(".jpeg", img_normalized)[1].tobytes() + except Exception as e: + result_dict["status"] = str(e) - return bts + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] image_normalize_to_bytes_def = FunctionDef( @@ -437,7 +533,7 @@ def image_normalize_to_bytes_func( # Extracts all text from a PDF url -def pdf_extract_func(src_obj_ref_rt: str) -> str: +def pdf_extract_func(src_obj_ref_rt: str, verbose: bool) -> str: try: import io import json @@ -470,8 +566,10 @@ def pdf_extract_func(src_obj_ref_rt: str) -> str: except Exception as e: result_dict = {"status": str(e), "content": ""} - result_json = json.dumps(result_dict) - return result_json + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] pdf_extract_def = FunctionDef( @@ -480,7 +578,9 @@ def pdf_extract_func(src_obj_ref_rt: str) -> str: # Extracts text from a PDF url and chunks it simultaneously -def pdf_chunk_func(src_obj_ref_rt: str, chunk_size: int, overlap_size: int) -> str: +def pdf_chunk_func( + src_obj_ref_rt: str, chunk_size: int, overlap_size: int, verbose: bool +) -> str: try: import io import json @@ -526,8 +626,10 @@ def pdf_chunk_func(src_obj_ref_rt: 
str, chunk_size: int, overlap_size: int) -> s except Exception as e: result_dict = {"status": str(e), "content": []} - result_json = json.dumps(result_dict) - return result_json + if verbose: + return json.dumps(result_dict) + else: + return json.dumps(result_dict["content"]) pdf_chunk_def = FunctionDef( diff --git a/bigframes/operations/blob.py b/bigframes/operations/blob.py index d505f096f4..4da9bfee82 100644 --- a/bigframes/operations/blob.py +++ b/bigframes/operations/blob.py @@ -22,7 +22,7 @@ import pandas as pd import requests -from bigframes import clients +from bigframes import clients, dtypes from bigframes.core import log_adapter import bigframes.dataframe import bigframes.exceptions as bfe @@ -80,9 +80,20 @@ def metadata(self) -> bigframes.series.Series: Returns: bigframes.series.Series: JSON metadata of the Blob. Contains fields: content_type, md5_hash, size and updated(time).""" - details_json = self._apply_unary_op(ops.obj_fetch_metadata_op).struct.field( - "details" - ) + series_to_check = bigframes.series.Series(self._block) + # Check if it's a struct series from a verbose operation + if dtypes.is_struct_like(series_to_check.dtype): + pyarrow_dtype = series_to_check.dtype.pyarrow_dtype + if "content" in [field.name for field in pyarrow_dtype]: + content_field_type = pyarrow_dtype.field("content").type + content_bf_type = dtypes.arrow_dtype_to_bigframes_dtype( + content_field_type + ) + if content_bf_type == dtypes.OBJ_REF_DTYPE: + series_to_check = series_to_check.struct.field("content") + details_json = series_to_check._apply_unary_op( + ops.obj_fetch_metadata_op + ).struct.field("details") import bigframes.bigquery as bbq return bbq.json_extract(details_json, "$.gcs_metadata").rename("metadata") @@ -313,6 +324,7 @@ def exif( max_batching_rows: int = 8192, container_cpu: Union[float, int] = 0.33, container_memory: str = "512Mi", + verbose: bool = False, ) -> bigframes.series.Series: """Extract EXIF data. Now only support image types. @@ -322,18 +334,21 @@ def exif( max_batching_rows (int, default 8,192): Max number of rows per batch send to cloud run to execute the function. container_cpu (int or float, default 0.33): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to intergers. container_memory (str, default "512Mi"): container memory size. String of the format . Possible values are from 512Mi to 32Gi. + verbose (bool, default False): If True, returns a struct with status and content fields. If False, returns only the content. Returns: - bigframes.series.Series: JSON series of key-value pairs. + bigframes.series.Series: JSON series of key-value pairs if verbose=False, or struct with status and content if verbose=True. 
""" if engine is None or engine.casefold() != "pillow": raise ValueError("Must specify the engine, supported value is 'pillow'.") import bigframes.bigquery as bbq import bigframes.blob._functions as blob_func + import bigframes.pandas as bpd connection = self._resolve_connection(connection) df = self.get_runtime_json_str(mode="R").to_frame() + df["verbose"] = verbose exif_udf = blob_func.TransformFunction( blob_func.exif_func_def, @@ -345,9 +360,21 @@ def exif( ).udf() res = self._df_apply_udf(df, exif_udf) - res = bbq.parse_json(res) - return res + if verbose: + exif_content_series = bbq.parse_json( + res._apply_unary_op(ops.JSONValue(json_path="$.content")) + ).rename("exif_content") + exif_status_series = res._apply_unary_op( + ops.JSONValue(json_path="$.status") + ) + results_df = bpd.DataFrame( + {"status": exif_status_series, "content": exif_content_series} + ) + results_struct = bbq.struct(results_df).rename("exif_results") + return results_struct + else: + return bbq.parse_json(res) def image_blur( self, @@ -359,6 +386,7 @@ def image_blur( max_batching_rows: int = 8192, container_cpu: Union[float, int] = 0.33, container_memory: str = "512Mi", + verbose: bool = False, ) -> bigframes.series.Series: """Blurs images. @@ -374,14 +402,17 @@ def image_blur( max_batching_rows (int, default 8,192): Max number of rows per batch send to cloud run to execute the function. container_cpu (int or float, default 0.33): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to intergers. container_memory (str, default "512Mi"): container memory size. String of the format . Possible values are from 512Mi to 32Gi. + verbose (bool, default False): If True, returns a struct with status and content fields. If False, returns only the content. Returns: - bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ. + bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ. If verbose=True, returns struct with status and content. 
""" if engine is None or engine.casefold() != "opencv": raise ValueError("Must specify the engine, supported value is 'opencv'.") + import bigframes.bigquery as bbq import bigframes.blob._functions as blob_func + import bigframes.pandas as bpd connection = self._resolve_connection(connection) df = self.get_runtime_json_str(mode="R").to_frame() @@ -400,9 +431,29 @@ def image_blur( df["ksize_x"], df["ksize_y"] = ksize df["ext"] = ext # type: ignore + df["verbose"] = verbose res = self._df_apply_udf(df, image_blur_udf) - return res + if verbose: + blurred_content_b64_series = res._apply_unary_op( + ops.JSONValue(json_path="$.content") + ) + blurred_content_series = bbq.sql_scalar( + "FROM_BASE64({0})", columns=[blurred_content_b64_series] + ) + blurred_status_series = res._apply_unary_op( + ops.JSONValue(json_path="$.status") + ) + results_df = bpd.DataFrame( + {"status": blurred_status_series, "content": blurred_content_series} + ) + results_struct = bbq.struct(results_df).rename("blurred_results") + return results_struct + else: + blurred_bytes = bbq.sql_scalar( + "FROM_BASE64({0})", columns=[res] + ).rename("blurred_bytes") + return blurred_bytes if isinstance(dst, str): dst = os.path.join(dst, "") @@ -428,11 +479,27 @@ def image_blur( df = df.join(dst_rt, how="outer") df["ksize_x"], df["ksize_y"] = ksize df["ext"] = ext # type: ignore + df["verbose"] = verbose res = self._df_apply_udf(df, image_blur_udf) res.cache() # to execute the udf - return dst + if verbose: + blurred_status_series = res._apply_unary_op( + ops.JSONValue(json_path="$.status") + ) + results_df = bpd.DataFrame( + { + "status": blurred_status_series, + "content": dst.blob.uri().str.to_blob( + connection=self._resolve_connection(connection) + ), + } + ) + results_struct = bbq.struct(results_df).rename("blurred_results") + return results_struct + else: + return dst def image_resize( self, @@ -446,6 +513,7 @@ def image_resize( max_batching_rows: int = 8192, container_cpu: Union[float, int] = 0.33, container_memory: str = "512Mi", + verbose: bool = False, ): """Resize images. @@ -463,9 +531,10 @@ def image_resize( max_batching_rows (int, default 8,192): Max number of rows per batch send to cloud run to execute the function. container_cpu (int or float, default 0.33): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to intergers. container_memory (str, default "512Mi"): container memory size. String of the format . Possible values are from 512Mi to 32Gi. + verbose (bool, default False): If True, returns a struct with status and content fields. If False, returns only the content. Returns: - bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ. + bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ. If verbose=True, returns struct with status and content. """ if engine is None or engine.casefold() != "opencv": raise ValueError("Must specify the engine, supported value is 'opencv'.") @@ -477,7 +546,9 @@ def image_resize( "Only one of dsize or (fx, fy) parameters must be set. And the set values must be positive. 
" ) + import bigframes.bigquery as bbq import bigframes.blob._functions as blob_func + import bigframes.pandas as bpd connection = self._resolve_connection(connection) df = self.get_runtime_json_str(mode="R").to_frame() @@ -497,9 +568,30 @@ def image_resize( df["dsize_x"], df["dsizye_y"] = dsize df["fx"], df["fy"] = fx, fy df["ext"] = ext # type: ignore + df["verbose"] = verbose res = self._df_apply_udf(df, image_resize_udf) - return res + if verbose: + resized_content_b64_series = res._apply_unary_op( + ops.JSONValue(json_path="$.content") + ) + resized_content_series = bbq.sql_scalar( + "FROM_BASE64({0})", columns=[resized_content_b64_series] + ) + + resized_status_series = res._apply_unary_op( + ops.JSONValue(json_path="$.status") + ) + results_df = bpd.DataFrame( + {"status": resized_status_series, "content": resized_content_series} + ) + results_struct = bbq.struct(results_df).rename("resized_results") + return results_struct + else: + resized_bytes = bbq.sql_scalar( + "FROM_BASE64({0})", columns=[res] + ).rename("resized_bytes") + return resized_bytes if isinstance(dst, str): dst = os.path.join(dst, "") @@ -526,11 +618,27 @@ def image_resize( df["dsize_x"], df["dsizye_y"] = dsize df["fx"], df["fy"] = fx, fy df["ext"] = ext # type: ignore + df["verbose"] = verbose res = self._df_apply_udf(df, image_resize_udf) res.cache() # to execute the udf - return dst + if verbose: + resized_status_series = res._apply_unary_op( + ops.JSONValue(json_path="$.status") + ) + results_df = bpd.DataFrame( + { + "status": resized_status_series, + "content": dst.blob.uri().str.to_blob( + connection=self._resolve_connection(connection) + ), + } + ) + results_struct = bbq.struct(results_df).rename("resized_results") + return results_struct + else: + return dst def image_normalize( self, @@ -544,6 +652,7 @@ def image_normalize( max_batching_rows: int = 8192, container_cpu: Union[float, int] = 0.33, container_memory: str = "512Mi", + verbose: bool = False, ) -> bigframes.series.Series: """Normalize images. @@ -561,14 +670,17 @@ def image_normalize( max_batching_rows (int, default 8,192): Max number of rows per batch send to cloud run to execute the function. container_cpu (int or float, default 0.33): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to intergers. container_memory (str, default "512Mi"): container memory size. String of the format . Possible values are from 512Mi to 32Gi. + verbose (bool, default False): If True, returns a struct with status and content fields. If False, returns only the content. Returns: - bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ. + bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ. If verbose=True, returns struct with status and content. 
""" if engine is None or engine.casefold() != "opencv": raise ValueError("Must specify the engine, supported value is 'opencv'.") + import bigframes.bigquery as bbq import bigframes.blob._functions as blob_func + import bigframes.pandas as bpd connection = self._resolve_connection(connection) df = self.get_runtime_json_str(mode="R").to_frame() @@ -589,9 +701,29 @@ def image_normalize( df["beta"] = beta df["norm_type"] = norm_type df["ext"] = ext # type: ignore + df["verbose"] = verbose res = self._df_apply_udf(df, image_normalize_udf) - return res + if verbose: + normalized_content_b64_series = res._apply_unary_op( + ops.JSONValue(json_path="$.content") + ) + normalized_bytes = bbq.sql_scalar( + "FROM_BASE64({0})", columns=[normalized_content_b64_series] + ) + normalized_status_series = res._apply_unary_op( + ops.JSONValue(json_path="$.status") + ) + results_df = bpd.DataFrame( + {"status": normalized_status_series, "content": normalized_bytes} + ) + results_struct = bbq.struct(results_df).rename("normalized_results") + return results_struct + else: + normalized_bytes = bbq.sql_scalar( + "FROM_BASE64({0})", columns=[res] + ).rename("normalized_bytes") + return normalized_bytes if isinstance(dst, str): dst = os.path.join(dst, "") @@ -619,11 +751,27 @@ def image_normalize( df["beta"] = beta df["norm_type"] = norm_type df["ext"] = ext # type: ignore + df["verbose"] = verbose res = self._df_apply_udf(df, image_normalize_udf) res.cache() # to execute the udf - return dst + if verbose: + normalized_status_series = res._apply_unary_op( + ops.JSONValue(json_path="$.status") + ) + results_df = bpd.DataFrame( + { + "status": normalized_status_series, + "content": dst.blob.uri().str.to_blob( + connection=self._resolve_connection(connection) + ), + } + ) + results_struct = bbq.struct(results_df).rename("normalized_results") + return results_struct + else: + return dst def pdf_extract( self, @@ -675,19 +823,22 @@ def pdf_extract( container_memory=container_memory, ).udf() - src_rt = self.get_runtime_json_str(mode="R") - - res = src_rt.apply(pdf_extract_udf) - - content_series = res._apply_unary_op(ops.JSONValue(json_path="$.content")) + df = self.get_runtime_json_str(mode="R").to_frame() + df["verbose"] = verbose + res = self._df_apply_udf(df, pdf_extract_udf) if verbose: + extracted_content_series = res._apply_unary_op( + ops.JSONValue(json_path="$.content") + ) status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status")) - res_df = bpd.DataFrame({"status": status_series, "content": content_series}) - struct_series = bbq.struct(res_df) - return struct_series + results_df = bpd.DataFrame( + {"status": status_series, "content": extracted_content_series} + ) + results_struct = bbq.struct(results_df).rename("extracted_results") + return results_struct else: - return content_series + return res.rename("extracted_content") def pdf_chunk( self, @@ -754,21 +905,23 @@ def pdf_chunk( container_memory=container_memory, ).udf() - src_rt = self.get_runtime_json_str(mode="R") - df = src_rt.to_frame() + df = self.get_runtime_json_str(mode="R").to_frame() df["chunk_size"] = chunk_size df["overlap_size"] = overlap_size + df["verbose"] = verbose res = self._df_apply_udf(df, pdf_chunk_udf) - content_series = bbq.json_extract_string_array(res, "$.content") if verbose: + chunked_content_series = bbq.json_extract_string_array(res, "$.content") status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status")) - res_df = bpd.DataFrame({"status": status_series, "content": content_series}) - struct_series = 
bbq.struct(res_df) - return struct_series + results_df = bpd.DataFrame( + {"status": status_series, "content": chunked_content_series} + ) + results_struct = bbq.struct(results_df).rename("chunked_results") + return results_struct else: - return content_series + return bbq.json_extract_string_array(res, "$").rename("chunked_content") def audio_transcribe( self, @@ -841,4 +994,4 @@ def audio_transcribe( results_struct = bbq.struct(results_df).rename("transcription_results") return results_struct else: - return transcribed_content_series + return transcribed_content_series.rename("transcribed_content") diff --git a/tests/system/large/blob/test_function.py b/tests/system/large/blob/test_function.py index f006395d2f..7963fabd0b 100644 --- a/tests/system/large/blob/test_function.py +++ b/tests/system/large/blob/test_function.py @@ -63,7 +63,7 @@ def test_blob_exif( ) actual = exif_image_df["blob_col"].blob.exif( - engine="pillow", connection=bq_connection + engine="pillow", connection=bq_connection, verbose=False ) expected = bpd.Series( ['{"ExifOffset": 47, "Make": "MyCamera"}'], @@ -78,6 +78,31 @@ def test_blob_exif( ) +def test_blob_exif_verbose( + bq_connection: str, + session: bigframes.Session, +): + exif_image_df = session.from_glob_path( + "gs://bigframes_blob_test/images_exif/*", + name="blob_col", + connection=bq_connection, + ) + + actual = exif_image_df["blob_col"].blob.exif( + engine="pillow", connection=bq_connection, verbose=True + ) + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + assert content_series.dtype == dtypes.JSON_DTYPE + + def test_blob_image_blur_to_series( images_mm_df: bpd.DataFrame, bq_connection: str, @@ -89,8 +114,9 @@ def test_blob_image_blur_to_series( ) actual = images_mm_df["blob_col"].blob.image_blur( - (8, 8), dst=series, connection=bq_connection, engine="opencv" + (8, 8), dst=series, connection=bq_connection, engine="opencv", verbose=False ) + expected_df = pd.DataFrame( { "uri": images_output_uris, @@ -110,6 +136,33 @@ def test_blob_image_blur_to_series( assert not actual.blob.size().isna().any() +def test_blob_image_blur_to_series_verbose( + images_mm_df: bpd.DataFrame, + bq_connection: str, + images_output_uris: list[str], + session: bigframes.Session, +): + series = bpd.Series(images_output_uris, session=session).str.to_blob( + connection=bq_connection + ) + + actual = images_mm_df["blob_col"].blob.image_blur( + (8, 8), dst=series, connection=bq_connection, engine="opencv", verbose=True + ) + + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + # Content should be blob objects for GCS destination + # verify the files exist + assert not actual.blob.size().isna().any() + + def test_blob_image_blur_to_folder( images_mm_df: bpd.DataFrame, bq_connection: str, images_output_folder: str, images_output_uris: list[str], ): actual = images_mm_df["blob_col"].blob.image_blur( - (8, 8), dst=images_output_folder, connection=bq_connection, engine="opencv" + (8, 8), + dst=images_output_folder, + connection=bq_connection, + engine="opencv", + verbose=False, )
expected_df = pd.DataFrame( { @@ -138,9 +195,38 @@ def test_blob_image_blur_to_folder( assert not actual.blob.size().isna().any() +def test_blob_image_blur_to_folder_verbose( + images_mm_df: bpd.DataFrame, + bq_connection: str, + images_output_folder: str, + images_output_uris: list[str], +): + actual = images_mm_df["blob_col"].blob.image_blur( + (8, 8), + dst=images_output_folder, + connection=bq_connection, + engine="opencv", + verbose=True, + ) + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + # Content should be blob objects for GCS destination + assert hasattr(content_series, "blob") + + # verify the files exist + assert not actual.blob.size().isna().any() + + def test_blob_image_blur_to_bq(images_mm_df: bpd.DataFrame, bq_connection: str): actual = images_mm_df["blob_col"].blob.image_blur( - (8, 8), connection=bq_connection, engine="opencv" + (8, 8), connection=bq_connection, engine="opencv", verbose=False ) assert isinstance(actual, bpd.Series) @@ -148,6 +234,26 @@ def test_blob_image_blur_to_bq(images_mm_df: bpd.DataFrame, bq_connection: str): assert actual.dtype == dtypes.BYTES_DTYPE +def test_blob_image_blur_to_bq_verbose(images_mm_df: bpd.DataFrame, bq_connection: str): + actual = images_mm_df["blob_col"].blob.image_blur( + (8, 8), connection=bq_connection, engine="opencv", verbose=True + ) + + assert isinstance(actual, bpd.Series) + assert len(actual) == 2 + + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + assert content_series.dtype == dtypes.BYTES_DTYPE + + def test_blob_image_resize_to_series( images_mm_df: bpd.DataFrame, bq_connection: str, @@ -159,8 +265,13 @@ def test_blob_image_resize_to_series( ) actual = images_mm_df["blob_col"].blob.image_resize( - (200, 300), dst=series, connection=bq_connection, engine="opencv" + (200, 300), + dst=series, + connection=bq_connection, + engine="opencv", + verbose=False, ) + expected_df = pd.DataFrame( { "uri": images_output_uris, @@ -180,6 +291,40 @@ def test_blob_image_resize_to_series( assert not actual.blob.size().isna().any() +def test_blob_image_resize_to_series_verbose( + images_mm_df: bpd.DataFrame, + bq_connection: str, + images_output_uris: list[str], + session: bigframes.Session, +): + series = bpd.Series(images_output_uris, session=session).str.to_blob( + connection=bq_connection + ) + + actual = images_mm_df["blob_col"].blob.image_resize( + (200, 300), + dst=series, + connection=bq_connection, + engine="opencv", + verbose=True, + ) + + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + # Content should be blob objects for GCS destination + assert hasattr(content_series, "blob") + + # verify the files exist + assert not actual.blob.size().isna().any() + + def test_blob_image_resize_to_folder( images_mm_df: bpd.DataFrame, bq_connection: 
str, @@ -187,8 +332,13 @@ def test_blob_image_resize_to_folder( images_output_uris: list[str], ): actual = images_mm_df["blob_col"].blob.image_resize( - (200, 300), dst=images_output_folder, connection=bq_connection, engine="opencv" + (200, 300), + dst=images_output_folder, + connection=bq_connection, + engine="opencv", + verbose=False, ) + expected_df = pd.DataFrame( { "uri": images_output_uris, @@ -208,9 +358,39 @@ def test_blob_image_resize_to_folder( assert not actual.blob.size().isna().any() +def test_blob_image_resize_to_folder_verbose( + images_mm_df: bpd.DataFrame, + bq_connection: str, + images_output_folder: str, + images_output_uris: list[str], +): + actual = images_mm_df["blob_col"].blob.image_resize( + (200, 300), + dst=images_output_folder, + connection=bq_connection, + engine="opencv", + verbose=True, + ) + + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + # Content should be blob objects for GCS destination + assert hasattr(content_series, "blob") + + # verify the files exist + assert not content_series.blob.size().isna().any() + + def test_blob_image_resize_to_bq(images_mm_df: bpd.DataFrame, bq_connection: str): actual = images_mm_df["blob_col"].blob.image_resize( - (200, 300), connection=bq_connection, engine="opencv" + (200, 300), connection=bq_connection, engine="opencv", verbose=False ) assert isinstance(actual, bpd.Series) @@ -218,6 +398,28 @@ def test_blob_image_resize_to_bq(images_mm_df: bpd.DataFrame, bq_connection: str assert actual.dtype == dtypes.BYTES_DTYPE +def test_blob_image_resize_to_bq_verbose( + images_mm_df: bpd.DataFrame, bq_connection: str +): + actual = images_mm_df["blob_col"].blob.image_resize( + (200, 300), connection=bq_connection, engine="opencv", verbose=True + ) + + assert isinstance(actual, bpd.Series) + assert len(actual) == 2 + + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + assert content_series.dtype == dtypes.BYTES_DTYPE + + def test_blob_image_normalize_to_series( images_mm_df: bpd.DataFrame, bq_connection: str, @@ -235,7 +437,9 @@ def test_blob_image_normalize_to_series( dst=series, connection=bq_connection, engine="opencv", + verbose=False, ) + expected_df = pd.DataFrame( { "uri": images_output_uris, @@ -255,6 +459,39 @@ def test_blob_image_normalize_to_series( assert not actual.blob.size().isna().any() +def test_blob_image_normalize_to_series_verbose( + images_mm_df: bpd.DataFrame, + bq_connection: str, + images_output_uris: list[str], + session: bigframes.Session, +): + series = bpd.Series(images_output_uris, session=session).str.to_blob( + connection=bq_connection + ) + + actual = images_mm_df["blob_col"].blob.image_normalize( + alpha=50.0, + beta=150.0, + norm_type="minmax", + dst=series, + connection=bq_connection, + engine="opencv", + verbose=True, + ) + + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == 
dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + # Content should be blob objects for GCS destination + assert hasattr(content_series, "blob") + + def test_blob_image_normalize_to_folder( images_mm_df: bpd.DataFrame, bq_connection: str, @@ -268,7 +505,9 @@ def test_blob_image_normalize_to_folder( dst=images_output_folder, connection=bq_connection, engine="opencv", + verbose=False, ) + expected_df = pd.DataFrame( { "uri": images_output_uris, @@ -288,6 +527,35 @@ def test_blob_image_normalize_to_folder( assert not actual.blob.size().isna().any() +def test_blob_image_normalize_to_folder_verbose( + images_mm_df: bpd.DataFrame, + bq_connection: str, + images_output_folder: str, + images_output_uris: list[str], +): + actual = images_mm_df["blob_col"].blob.image_normalize( + alpha=50.0, + beta=150.0, + norm_type="minmax", + dst=images_output_folder, + connection=bq_connection, + engine="opencv", + verbose=True, + ) + + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + # Content should be blob objects for GCS destination + assert hasattr(content_series, "blob") + + def test_blob_image_normalize_to_bq(images_mm_df: bpd.DataFrame, bq_connection: str): actual = images_mm_df["blob_col"].blob.image_normalize( alpha=50.0, @@ -295,6 +563,7 @@ def test_blob_image_normalize_to_bq(images_mm_df: bpd.DataFrame, bq_connection: norm_type="minmax", connection=bq_connection, engine="opencv", + verbose=False, ) assert isinstance(actual, bpd.Series) @@ -302,21 +571,40 @@ def test_blob_image_normalize_to_bq(images_mm_df: bpd.DataFrame, bq_connection: assert actual.dtype == dtypes.BYTES_DTYPE -@pytest.mark.parametrize( - "verbose", - [ - (True), - (False), - ], -) +def test_blob_image_normalize_to_bq_verbose( + images_mm_df: bpd.DataFrame, bq_connection: str +): + actual = images_mm_df["blob_col"].blob.image_normalize( + alpha=50.0, + beta=150.0, + norm_type="minmax", + connection=bq_connection, + engine="opencv", + verbose=True, + ) + + assert isinstance(actual, bpd.Series) + assert len(actual) == 2 + + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + assert content_series.dtype == dtypes.BYTES_DTYPE + + def test_blob_pdf_extract( pdf_mm_df: bpd.DataFrame, - verbose: bool, bq_connection: str, ): actual = ( pdf_mm_df["pdf"] - .blob.pdf_extract(connection=bq_connection, verbose=verbose, engine="pypdf") + .blob.pdf_extract(connection=bq_connection, verbose=False, engine="pypdf") .explode() .to_pandas() ) @@ -325,20 +613,14 @@ def test_blob_pdf_extract( expected_text = "Sample PDF This is a testing file. Some dummy messages are used for testing purposes." 
expected_len = len(expected_text) - actual_text = "" - if verbose: - # The first entry is for a file that doesn't exist, so we check the second one - successful_results = actual[actual.apply(lambda x: x["status"] == "")] - actual_text = successful_results.apply(lambda x: x["content"]).iloc[0] - else: - actual_text = actual[actual != ""].iloc[0] + actual_text = actual[actual != ""].iloc[0] actual_len = len(actual_text) relative_length_tolerance = 0.25 min_acceptable_len = expected_len * (1 - relative_length_tolerance) max_acceptable_len = expected_len * (1 + relative_length_tolerance) assert min_acceptable_len <= actual_len <= max_acceptable_len, ( - f"Item (verbose={verbose}): Extracted text length {actual_len} is outside the acceptable range " + f"Item (verbose=False): Extracted text length {actual_len} is outside the acceptable range " f"[{min_acceptable_len:.0f}, {max_acceptable_len:.0f}]. " f"Expected reference length was {expected_len}. " ) @@ -348,24 +630,93 @@ def test_blob_pdf_extract( for keyword in major_keywords: assert ( keyword.lower() in actual_text.lower() - ), f"Item (verbose={verbose}): Expected keyword '{keyword}' not found in extracted text. " + ), f"Item (verbose=False): Expected keyword '{keyword}' not found in extracted text. " -@pytest.mark.parametrize( - "verbose", - [ - (True), - (False), - ], -) -def test_blob_pdf_chunk(pdf_mm_df: bpd.DataFrame, verbose: bool, bq_connection: str): +def test_blob_pdf_extract_verbose( + pdf_mm_df: bpd.DataFrame, + bq_connection: str, +): + actual = ( + pdf_mm_df["pdf"] + .blob.pdf_extract(connection=bq_connection, verbose=True, engine="pypdf") + .explode() + .to_pandas() + ) + + # check relative length + expected_text = "Sample PDF This is a testing file. Some dummy messages are used for testing purposes." + expected_len = len(expected_text) + + # The first entry is for a file that doesn't exist, so we check the second one + successful_results = actual[actual.apply(lambda x: x["status"] == "")] + actual_text = successful_results.apply(lambda x: x["content"]).iloc[0] + actual_len = len(actual_text) + + relative_length_tolerance = 0.25 + min_acceptable_len = expected_len * (1 - relative_length_tolerance) + max_acceptable_len = expected_len * (1 + relative_length_tolerance) + assert min_acceptable_len <= actual_len <= max_acceptable_len, ( + f"Item (verbose=True): Extracted text length {actual_len} is outside the acceptable range " + f"[{min_acceptable_len:.0f}, {max_acceptable_len:.0f}]. " + f"Expected reference length was {expected_len}. " + ) + + # check for major keywords + major_keywords = ["Sample", "PDF", "testing", "dummy", "messages"] + for keyword in major_keywords: + assert ( + keyword.lower() in actual_text.lower() + ), f"Item (verbose=True): Expected keyword '{keyword}' not found in extracted text. " + + +def test_blob_pdf_chunk(pdf_mm_df: bpd.DataFrame, bq_connection: str): + actual = ( + pdf_mm_df["pdf"] + .blob.pdf_chunk( + connection=bq_connection, + chunk_size=50, + overlap_size=10, + verbose=False, + engine="pypdf", + ) + .explode() + .to_pandas() + ) + + # check relative length + expected_text = "Sample PDF This is a testing file. Some dummy messages are used for testing purposes." 
+ expected_len = len(expected_text) + + # First entry is NA + actual_text = "".join(actual.dropna()) + actual_len = len(actual_text) + + relative_length_tolerance = 0.25 + min_acceptable_len = expected_len * (1 - relative_length_tolerance) + max_acceptable_len = expected_len * (1 + relative_length_tolerance) + assert min_acceptable_len <= actual_len <= max_acceptable_len, ( + f"Item (verbose=False): Extracted text length {actual_len} is outside the acceptable range " + f"[{min_acceptable_len:.0f}, {max_acceptable_len:.0f}]. " + f"Expected reference length was {expected_len}. " + ) + + # check for major keywords + major_keywords = ["Sample", "PDF", "testing", "dummy", "messages"] + for keyword in major_keywords: + assert ( + keyword.lower() in actual_text.lower() + ), f"Item (verbose=False): Expected keyword '{keyword}' not found in extracted text. " + + +def test_blob_pdf_chunk_verbose(pdf_mm_df: bpd.DataFrame, bq_connection: str): actual = ( pdf_mm_df["pdf"] .blob.pdf_chunk( connection=bq_connection, chunk_size=50, overlap_size=10, - verbose=verbose, + verbose=True, engine="pypdf", ) .explode() @@ -376,21 +727,16 @@ def test_blob_pdf_chunk(pdf_mm_df: bpd.DataFrame, verbose: bool, bq_connection: expected_text = "Sample PDF This is a testing file. Some dummy messages are used for testing purposes." expected_len = len(expected_text) - actual_text = "" - if verbose: - # The first entry is for a file that doesn't exist, so we check the second one - successful_results = actual[actual.apply(lambda x: x["status"] == "")] - actual_text = "".join(successful_results.apply(lambda x: x["content"]).iloc[0]) - else: - # First entry is NA - actual_text = "".join(actual.dropna()) + # The first entry is for a file that doesn't exist, so we check the second one + successful_results = actual[actual.apply(lambda x: x["status"] == "")] + actual_text = "".join(successful_results.apply(lambda x: x["content"]).iloc[0]) actual_len = len(actual_text) relative_length_tolerance = 0.25 min_acceptable_len = expected_len * (1 - relative_length_tolerance) max_acceptable_len = expected_len * (1 + relative_length_tolerance) assert min_acceptable_len <= actual_len <= max_acceptable_len, ( - f"Item (verbose={verbose}): Extracted text length {actual_len} is outside the acceptable range " + f"Item (verbose=True): Extracted text length {actual_len} is outside the acceptable range " f"[{min_acceptable_len:.0f}, {max_acceptable_len:.0f}]. " f"Expected reference length was {expected_len}. " ) @@ -400,28 +746,25 @@ def test_blob_pdf_chunk(pdf_mm_df: bpd.DataFrame, verbose: bool, bq_connection: for keyword in major_keywords: assert ( keyword.lower() in actual_text.lower() - ), f"Item (verbose={verbose}): Expected keyword '{keyword}' not found in extracted text. " + ), f"Item (verbose=True): Expected keyword '{keyword}' not found in extracted text. 
" @pytest.mark.parametrize( - "model_name, verbose", + "model_name", [ - ("gemini-2.0-flash-001", True), - ("gemini-2.0-flash-001", False), - ("gemini-2.0-flash-lite-001", True), - ("gemini-2.0-flash-lite-001", False), + "gemini-2.0-flash-001", + "gemini-2.0-flash-lite-001", ], ) def test_blob_transcribe( audio_mm_df: bpd.DataFrame, model_name: str, - verbose: bool, ): actual = ( audio_mm_df["audio"] .blob.audio_transcribe( model_name=model_name, # type: ignore - verbose=verbose, + verbose=False, ) .to_pandas() ) @@ -430,18 +773,63 @@ def test_blob_transcribe( expected_text = "Now, as all books not primarily intended as picture-books consist principally of types composed to form letterpress" expected_len = len(expected_text) - actual_text = "" - if verbose: - actual_text = actual[0]["content"] - else: - actual_text = actual[0] + actual_text = actual[0] if pd.isna(actual_text) or actual_text == "": # Ensure the tests are robust to flakes in the model, which isn't # particularly useful information for the bigframes team. - logging.warning( - f"blob_transcribe() model {model_name} verbose={verbose} failure" + logging.warning(f"blob_transcribe() model {model_name} verbose=False failure") + return + + actual_len = len(actual_text) + + relative_length_tolerance = 0.2 + min_acceptable_len = expected_len * (1 - relative_length_tolerance) + max_acceptable_len = expected_len * (1 + relative_length_tolerance) + assert min_acceptable_len <= actual_len <= max_acceptable_len, ( + f"Item (verbose=False): Transcribed text length {actual_len} is outside the acceptable range " + f"[{min_acceptable_len:.0f}, {max_acceptable_len:.0f}]. " + f"Expected reference length was {expected_len}. " + ) + + # check for major keywords + major_keywords = ["book", "picture"] + for keyword in major_keywords: + assert ( + keyword.lower() in actual_text.lower() + ), f"Item (verbose=False): Expected keyword '{keyword}' not found in transcribed text. " + + +@pytest.mark.parametrize( + "model_name", + [ + "gemini-2.0-flash-001", + "gemini-2.0-flash-lite-001", + ], +) +def test_blob_transcribe_verbose( + audio_mm_df: bpd.DataFrame, + model_name: str, +): + actual = ( + audio_mm_df["audio"] + .blob.audio_transcribe( + model_name=model_name, # type: ignore + verbose=True, ) + .to_pandas() + ) + + # check relative length + expected_text = "Now, as all books not primarily intended as picture-books consist principally of types composed to form letterpress" + expected_len = len(expected_text) + + actual_text = actual[0]["content"] + + if pd.isna(actual_text) or actual_text == "": + # Ensure the tests are robust to flakes in the model, which isn't + # particularly useful information for the bigframes team. + logging.warning(f"blob_transcribe() model {model_name} verbose=True failure") return actual_len = len(actual_text) @@ -450,7 +838,7 @@ def test_blob_transcribe( min_acceptable_len = expected_len * (1 - relative_length_tolerance) max_acceptable_len = expected_len * (1 + relative_length_tolerance) assert min_acceptable_len <= actual_len <= max_acceptable_len, ( - f"Item (verbose={verbose}): Transcribed text length {actual_len} is outside the acceptable range " + f"Item (verbose=True): Transcribed text length {actual_len} is outside the acceptable range " f"[{min_acceptable_len:.0f}, {max_acceptable_len:.0f}]. " f"Expected reference length was {expected_len}. 
" ) @@ -460,4 +848,4 @@ def test_blob_transcribe( for keyword in major_keywords: assert ( keyword.lower() in actual_text.lower() - ), f"Item (verbose={verbose}): Expected keyword '{keyword}' not found in transcribed text. " + ), f"Item (verbose=True): Expected keyword '{keyword}' not found in transcribed text. "