refactor: add output type annotations to scalar ops by TrevorBergeron · Pull Request #338 · googleapis/python-bigquery-dataframes · GitHub
Merged
43 changes: 34 additions & 9 deletions bigframes/core/expression.py
@@ -18,16 +18,13 @@
import dataclasses
import itertools
import typing
from typing import Optional

import bigframes.dtypes
import bigframes.dtypes as dtypes
import bigframes.operations


def const(
value: typing.Hashable, dtype: Optional[bigframes.dtypes.Dtype] = None
) -> Expression:
return ScalarConstantExpression(value, dtype)
def const(value: typing.Hashable, dtype: dtypes.ExpressionType = None) -> Expression:
return ScalarConstantExpression(value, dtype or dtypes.infer_literal_type(value))


def free_var(id: str) -> Expression:
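With this change, const() resolves a literal's dtype eagerly instead of leaving it as None. A minimal sketch of the expected behavior (results noted as comments):

import bigframes.core.expression as ex

ex.const(3)     # expected ScalarConstantExpression(value=3, dtype=Int64Dtype())
ex.const(None)  # expected dtype None: a NULL literal has no definite type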
@@ -45,9 +42,16 @@ def unbound_variables(self) -> typing.Tuple[str, ...]:
def rename(self, name_mapping: dict[str, str]) -> Expression:
return self

@abc.abstractproperty
@property
@abc.abstractmethod
def is_const(self) -> bool:
return False
...

@abc.abstractmethod
def output_type(
self, input_types: dict[str, dtypes.ExpressionType]
) -> dtypes.ExpressionType:
...


@dataclasses.dataclass(frozen=True)
@@ -56,12 +60,17 @@ class ScalarConstantExpression(Expression):

# TODO: Further constrain?
value: typing.Hashable
dtype: Optional[bigframes.dtypes.Dtype] = None
dtype: dtypes.ExpressionType = None

@property
def is_const(self) -> bool:
return True

def output_type(
self, input_types: dict[str, bigframes.dtypes.Dtype]
) -> dtypes.ExpressionType:
return self.dtype


@dataclasses.dataclass(frozen=True)
class UnboundVariableExpression(Expression):
@@ -83,6 +92,14 @@ def rename(self, name_mapping: dict[str, str]) -> Expression:
def is_const(self) -> bool:
return False

def output_type(
self, input_types: dict[str, bigframes.dtypes.Dtype]
) -> dtypes.ExpressionType:
if self.id in input_types:
return input_types[self.id]
else:
raise ValueError("Type of variable has not been fixed.")


@dataclasses.dataclass(frozen=True)
class OpExpression(Expression):
@@ -110,3 +127,11 @@ def rename(self, name_mapping: dict[str, str]) -> Expression:
@property
def is_const(self) -> bool:
return all(child.is_const for child in self.inputs)

def output_type(
self, input_types: dict[str, dtypes.ExpressionType]
) -> dtypes.ExpressionType:
operand_types = tuple(
map(lambda x: x.output_type(input_types=input_types), self.inputs)
)
return self.op.output_type(*operand_types)
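A minimal sketch of how the new output_type hook composes across an expression tree, assuming the OpExpression(op, inputs) field order implied by this hunk and using a hypothetical stand-in op (real ops live in bigframes.operations); lcd_etype comes from bigframes.dtypes, shown in the next file:

import pandas as pd

import bigframes.core.expression as ex
import bigframes.dtypes as dtypes


class _FakeAddOp:
    # Hypothetical op: implements only the output_type hook used by OpExpression.
    def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType:
        return dtypes.lcd_etype(*input_types)


expr = ex.OpExpression(_FakeAddOp(), (ex.free_var("x"), ex.const(1.5)))
# Types resolve bottom-up: free_var("x") looks itself up in input_types,
# const(1.5) carries the inferred FLOAT64, and the op combines the two.
expr.output_type(input_types={"x": pd.Int64Dtype()})  # expected Float64Dtype()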
65 changes: 62 additions & 3 deletions bigframes/dtypes.py
@@ -23,7 +23,9 @@
import geopandas as gpd # type: ignore
import google.cloud.bigquery as bigquery
import ibis
from ibis.backends.bigquery.datatypes import BigQueryType
import ibis.expr.datatypes as ibis_dtypes
from ibis.expr.datatypes.core import dtype as python_type_to_bigquery_type
import ibis.expr.types as ibis_types
import numpy as np
import pandas as pd
@@ -42,6 +44,14 @@
pd.ArrowDtype,
gpd.array.GeometryDtype,
]
# Represents both column types (dtypes) and local-only types
# None represents the type of a None scalar.
ExpressionType = typing.Optional[Dtype]

INT_DTYPE = pd.Int64Dtype()
FLOAT_DTYPE = pd.Float64Dtype()
BOOL_DTYPE = pd.BooleanDtype()
STRING_DTYPE = pd.StringDtype(storage="pyarrow")

# On BQ side, ARRAY, STRUCT, GEOGRAPHY, JSON are not orderable
UNORDERED_DTYPES = [gpd.array.GeometryDtype()]
@@ -539,31 +549,80 @@ def is_compatible(scalar: typing.Any, dtype: Dtype) -> typing.Optional[Dtype]:
return lcd_type(pd.Int64Dtype(), dtype)
if isinstance(scalar, decimal.Decimal):
# TODO: Check context to see if can use NUMERIC instead of BIGNUMERIC
return lcd_type(pd.ArrowDtype(pa.decimal128(76, 38)), dtype)
return lcd_type(pd.ArrowDtype(pa.decimal256(76, 38)), dtype)
return None


def lcd_type(dtype1: Dtype, dtype2: Dtype) -> typing.Optional[Dtype]:
def lcd_type(dtype1: Dtype, dtype2: Dtype) -> Dtype:
if dtype1 == dtype2:
return dtype1
# Implicit conversion currently only supported for numeric types
hierarchy: list[Dtype] = [
pd.BooleanDtype(),
pd.Int64Dtype(),
pd.Float64Dtype(),
pd.ArrowDtype(pa.decimal128(38, 9)),
pd.ArrowDtype(pa.decimal256(76, 38)),
pd.Float64Dtype(),
]
if (dtype1 not in hierarchy) or (dtype2 not in hierarchy):
return None
lcd_index = max(hierarchy.index(dtype1), hierarchy.index(dtype2))
return hierarchy[lcd_index]


def lcd_etype(etype1: ExpressionType, etype2: ExpressionType) -> ExpressionType:
if etype1 is None:
return etype2
if etype2 is None:
return etype1
return lcd_type_or_throw(etype1, etype2)


def lcd_type_or_throw(dtype1: Dtype, dtype2: Dtype) -> Dtype:
result = lcd_type(dtype1, dtype2)
if result is None:
raise NotImplementedError(
f"BigFrames cannot upcast {dtype1} and {dtype2} to common type. {constants.FEEDBACK_LINK}"
)
return result


def infer_literal_type(literal) -> typing.Optional[Dtype]:
if pd.isna(literal):
return None # Null value without a definite type
# Temporary logic, use ibis inferred type
ibis_literal = literal_to_ibis_scalar(literal)
return ibis_dtype_to_bigframes_dtype(ibis_literal.type())


# Input and output types supported by BigQuery DataFrames remote functions.
# TODO(shobs): Extend the support to all types supported by BQ remote functions
# https://cloud.google.com/bigquery/docs/remote-functions#limitations
SUPPORTED_IO_PYTHON_TYPES = {bool, float, int, str}
SUPPORTED_IO_BIGQUERY_TYPEKINDS = {
"BOOLEAN",
"BOOL",
"FLOAT",
"FLOAT64",
"INT64",
"INTEGER",
"STRING",
}


class UnsupportedTypeError(ValueError):
def __init__(self, type_, supported_types):
self.type = type_
self.supported_types = supported_types


def ibis_type_from_python_type(t: type) -> ibis_dtypes.DataType:
if t not in SUPPORTED_IO_PYTHON_TYPES:
raise UnsupportedTypeError(t, SUPPORTED_IO_PYTHON_TYPES)
return python_type_to_bigquery_type(t)


def ibis_type_from_type_kind(tk: bigquery.StandardSqlTypeNames) -> ibis_dtypes.DataType:
if tk not in SUPPORTED_IO_BIGQUERY_TYPEKINDS:
raise UnsupportedTypeError(tk, SUPPORTED_IO_BIGQUERY_TYPEKINDS)
return BigQueryType.to_ibis(tk)
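A brief usage sketch of the new and relocated dtype helpers; expected results are noted as comments and exact reprs may differ:

import pandas as pd

import bigframes.dtypes as dtypes

# lcd_type walks the numeric hierarchy BOOLEAN -> INT64 -> NUMERIC -> BIGNUMERIC -> FLOAT64
dtypes.lcd_type(pd.Int64Dtype(), pd.Float64Dtype())  # expected Float64Dtype()

# lcd_etype treats None (an untyped NULL) as compatible with any type
dtypes.lcd_etype(None, dtypes.INT_DTYPE)  # expected Int64Dtype()

# infer_literal_type maps a Python literal to a BigQuery DataFrames dtype
dtypes.infer_literal_type("hello")  # expected STRING_DTYPE
dtypes.infer_literal_type(None)     # expected None: NULL has no definite type

# The helpers moved from remote_function.py keep their type checks
dtypes.ibis_type_from_python_type(int)     # expected ibis int64
dtypes.ibis_type_from_type_kind("STRING")  # expected ibis string
# dtypes.ibis_type_from_python_type(bytes) would raise UnsupportedTypeError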
50 changes: 12 additions & 38 deletions bigframes/functions/remote_function.py
@@ -46,33 +46,19 @@
from ibis.backends.bigquery.compiler import compiles
from ibis.backends.bigquery.datatypes import BigQueryType
from ibis.expr.datatypes.core import DataType as IbisDataType
from ibis.expr.datatypes.core import dtype as python_type_to_bigquery_type
import ibis.expr.operations as ops
import ibis.expr.rules as rlz

from bigframes import clients
import bigframes.constants as constants
import bigframes.dtypes

logger = logging.getLogger(__name__)

# Protocol version 4 is available in python version 3.4 and above
# https://docs.python.org/3/library/pickle.html#data-stream-format
_pickle_protocol_version = 4

# Input and output types supported by BigQuery DataFrames remote functions.
# TODO(shobs): Extend the support to all types supported by BQ remote functions
# https://cloud.google.com/bigquery/docs/remote-functions#limitations
SUPPORTED_IO_PYTHON_TYPES = {bool, float, int, str}
SUPPORTED_IO_BIGQUERY_TYPEKINDS = {
"BOOLEAN",
"BOOL",
"FLOAT",
"FLOAT64",
"INT64",
"INTEGER",
"STRING",
}


def get_remote_function_locations(bq_location):
"""Get BQ location and cloud functions region given a BQ client."""
@@ -558,33 +544,17 @@ def f(*args, **kwargs):
return f


class UnsupportedTypeError(ValueError):
def __init__(self, type_, supported_types):
self.type = type_
self.supported_types = supported_types


def ibis_type_from_python_type(t: type) -> IbisDataType:
if t not in SUPPORTED_IO_PYTHON_TYPES:
raise UnsupportedTypeError(t, SUPPORTED_IO_PYTHON_TYPES)
return python_type_to_bigquery_type(t)


def ibis_type_from_type_kind(tk: bigquery.StandardSqlTypeNames) -> IbisDataType:
if tk not in SUPPORTED_IO_BIGQUERY_TYPEKINDS:
raise UnsupportedTypeError(tk, SUPPORTED_IO_BIGQUERY_TYPEKINDS)
return BigQueryType.to_ibis(tk)


def ibis_signature_from_python_signature(
signature: inspect.Signature,
input_types: Sequence[type],
output_type: type,
) -> IbisSignature:
return IbisSignature(
parameter_names=list(signature.parameters.keys()),
input_types=[ibis_type_from_python_type(t) for t in input_types],
output_type=ibis_type_from_python_type(output_type),
input_types=[
bigframes.dtypes.ibis_type_from_python_type(t) for t in input_types
],
output_type=bigframes.dtypes.ibis_type_from_python_type(output_type),
)
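For reference, a minimal sketch of the delegating signature helper after this change (expected result noted as a comment):

import inspect

import bigframes.functions.remote_function as rf


def add_one(x: int) -> int:
    return x + 1


sig = rf.ibis_signature_from_python_signature(
    inspect.signature(add_one), input_types=[int], output_type=int
)
# expected: sig.parameter_names == ["x"], with int64 ibis input and output types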


@@ -599,10 +569,14 @@ def ibis_signature_from_routine(routine: bigquery.Routine) -> IbisSignature:
return IbisSignature(
parameter_names=[arg.name for arg in routine.arguments],
input_types=[
ibis_type_from_type_kind(arg.data_type.type_kind) if arg.data_type else None
bigframes.dtypes.ibis_type_from_type_kind(arg.data_type.type_kind)
if arg.data_type
else None
for arg in routine.arguments
],
output_type=ibis_type_from_type_kind(routine.return_type.type_kind),
output_type=bigframes.dtypes.ibis_type_from_type_kind(
routine.return_type.type_kind
),
)


@@ -908,7 +882,7 @@ def read_gbq_function(
raise ValueError(
"Function return type must be specified. {constants.FEEDBACK_LINK}"
)
except UnsupportedTypeError as e:
except bigframes.dtypes.UnsupportedTypeError as e:
raise ValueError(
f"Type {e.type} not supported, supported types are {e.supported_types}. "
f"{constants.FEEDBACK_LINK}"