8000 [DRAFT] feat: add test proxy for v3 client by daniel-sanche · Pull Request #747 · googleapis/python-bigtable · GitHub
[go: up one dir, main page]

Skip to content

[DRAFT] feat: add test proxy for v3 client #747

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 503 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
503 commits
Select commit Hold shift + click to select a range
2e50c51
added tests for request stats
daniel-sanche Apr 6, 2023
0b63b2b
added tests for exceptions
daniel-sanche Apr 6, 2023
de102bb
clean up on_error
daniel-sanche Apr 6, 2023
bbdb8e6
await sleep
daniel-sanche Apr 6, 2023
83472dc
got tests working
daniel-sanche Apr 6, 2023
bef40bd
updated api-core
daniel-sanche Apr 6, 2023
29a98ed
Merge branch 'v3' into read_rows_retries
daniel-sanche Apr 6, 2023
534005a
ran blacken
daniel-sanche Apr 6, 2023
6f1c781
made invalid chunk a server error
daniel-sanche Apr 6, 2023
38f66e5
moved invalid chunk with other exceptions
daniel-sanche Apr 6, 2023
bf24c25
made row merger and classes private
daniel-sanche Apr 6, 2023
4dbacb5
added read_rows
daniel-sanche Apr 6, 2023
6e6978e
ran blacken
daniel-sanche Apr 6, 2023
21f7846
added comments
daniel-sanche Apr 6, 2023
52e9dbf
added test for revise rowset
daniel-sanche Apr 6, 2023
715be51
fixed lint issues
daniel-sanche Apr 6, 2023
2f50cb7
moved ReadRowsIterator into new file
daniel-sanche Apr 6, 2023
1486d5a
Merge branch 'v3' into add_new_transport
daniel-sanche Apr 6, 2023
28d5a7a
fixed lint issues
daniel-sanche Apr 6, 2023
3b11580
Merge branch 'add_new_transport' into read_rows_retries
daniel-sanche Apr 6, 2023
d47c941
changed comment
daniel-sanche Apr 6, 2023
d1bd128
added comments to iterator
daniel-sanche Apr 6, 2023
039d623
added var for idle timeout
daniel-sanche Apr 6, 2023
3d34dcd
sped up acceptance tests
daniel-sanche Apr 6, 2023
0125474
Merge branch 'read_rows_retries' into test_proxy
daniel-sanche Apr 6, 2023
a764fef
made adjustments to use live client
daniel-sanche Apr 6, 2023
738ae4b
added empty implementations for all functions
daniel-sanche Apr 6, 2023
70fbff9
reduced size of template by making subclass
daniel-sanche Apr 7, 2023
383d8eb
reverted unintentional gapic generation changes
daniel-sanche Apr 7, 2023
018fe03
updated submodule
daniel-sanche Apr 7, 2023
3764a98
added default timeouts to table surface
daniel-sanche Apr 7, 2023
95e178b
Merge branch 'add_new_transport' into test_proxy
daniel-sanche Apr 7, 2023
3d98c3e
allow connecting to emulator
daniel-sanche Apr 7, 2023
2e38464
fixed function name
daniel-sanche Apr 7, 2023
1a1a365
return row data from proxy
daniel-sanche Apr 7, 2023
4ed6621
return pbs
daniel-sanche Apr 7, 2023
f028617
add default timeout
daniel-sanche Apr 7, 2023
8e27c1d
fixed issue with exception handling
daniel-sanche Apr 7, 2023
9f20df7
added missing methods
daniel-sanche Apr 7, 2023
8e9cf9b
return proto from remove client
daniel-sanche Apr 12, 2023
01169e6
include error message in status
daniel-sanche Apr 12, 2023
a0c199f
improve json formatting to include ints
daniel-sanche Apr 12, 2023
78994ac
set project id in client
daniel-sanche Apr 12, 2023
b334821
changed default timezone
daniel-sanche Apr 12, 2023
4f40cc7
improved json parsing
daniel-sanche Apr 12, 2023
c285d51
run tests in parallel
daniel-sanche Apr 12, 2023
ea1433d
improved dict formatting
daniel-sanche Apr 12, 2023
276ddd7
set headers in line with conformance tests
daniel-sanche Apr 13, 2023
78e2c3b
filter out warnings
daniel-sanche Apr 13, 2023
5c1be38
added extra prints
daniel-sanche Apr 13, 2023
745ae38
end after row_limit rows
daniel-sanche Apr 13, 2023
3d11d55
changed retryable exceptions
daniel-sanche Apr 13, 2023
f0403e7
changed warning stack level
daniel-sanche Apr 13, 2023
84a775a
changed retryable errors
daniel-sanche Apr 13, 2023
15a9d23
improved comments
daniel-sanche Apr 13, 2023
8636654
improved idle timeouts
daniel-sanche Apr 13, 2023
1aca392
changed retry parameters
daniel-sanche Apr 13, 2023
45fef1e
added limit revision to each retry
daniel-sanche Apr 13, 2023
951a77b
removed unneeded check
daniel-sanche Apr 13, 2023
e3a0b66
fixed idle timeout test
daniel-sanche Apr 13, 2023
6089934
removed tracking of emitted rows
daniel-sanche Apr 13, 2023
f5fb36d
Merge branch 'read_rows_retries' into test_proxy
daniel-sanche Apr 13, 2023
8a2cd17
core restructuring
daniel-sanche Apr 13, 2023
918072d
reorganizing files
daniel-sanche Apr 13, 2023
15682a5
renamed file
daniel-sanche Apr 13, 2023
f1d57af
use argparse for arguments
daniel-sanche Apr 13, 2023
2f5c232
added legacy client implementation
daniel-sanche Apr 13, 2023
d243d9c
fixed legacy client formatting
daniel-sanche Apr 13, 2023
fb4b0ca
removed revise_on_retry flag
daniel-sanche Apr 14, 2023
83b908c
changed initial sleep
daniel-sanche Apr 14, 2023
5688561
added extra timeout check
daniel-sanche Apr 14, 2023
7f57e7c
implemented sample_keys
daniel-sanche Apr 14, 2023
a6a140b
initial implementation of query.shard
daniel-sanche Apr 14, 2023
0f03aea
added read_rows_sharded implementation
daniel-sanche Apr 14, 2023
cfa181d
fixed bugs in implementation
daniel-sanche Apr 14, 2023
e8007c8
added str and equal to query and range
daniel-sanche Apr 14, 2023
e190dc6
added a test for sharding
daniel-sanche Apr 14, 2023
5aa89da
got first set of tests passing
daniel-sanche Apr 15, 2023
a565f47
added table scan test
daniel-sanche Apr 15, 2023
7041dfd
added more tests
daniel-sanche Apr 15, 2023
2f7973a
made row ranges into set
daniel-sanche Apr 15, 2023
872480f
added unsorted test
daniel-sanche Apr 15, 2023
47be958
ran blacken
daniel-sanche Apr 15, 2023
c945687
fixed mypy issues
daniel-sanche Apr 15, 2023
b0dbaed
fixed lint issues
daniel-sanche Apr 15, 2023
53878a9
fixed bug in from_dict
daniel-sanche Apr 15, 2023
ff2dfca
fixed tests
daniel-sanche Apr 17, 2023
ff3724d
removed outdated test
daniel-sanche Apr 17, 2023
78a309c
fixed type annotations
daniel-sanche Apr 17, 2023
c50ae18
added slots
daniel-sanche Apr 17, 2023
d73121b
renamed cache to buffer
daniel-sanche Apr 17, 2023
14d8527
renamed errors
daniel-sanche Apr 17, 2023
4b89c86
replaced type check with None check
daniel-sanche Apr 17, 2023
9f89577
added comment for last_scanned_row heartbeat
daniel-sanche Apr 17, 2023
4b229b9
added early return
daniel-sanche Apr 17, 2023
152bccf
moved validation
daniel-sanche Apr 17, 2023
67c2911
added close call to ReadRowsIterator
daniel-sanche Apr 18, 2023
ff11ad3
removed del
daniel-sanche Apr 18, 2023
78bd5d3
pull out buffer control logic
daniel-sanche Apr 18, 2023
ca4a16d
got buffering working
daniel-sanche Apr 18, 2023
0dba121
check for full table scan revision
daniel-sanche Apr 18, 2023
3537566
renamed and added underscores
daniel-sanche Apr 18, 2023
981f169
added extra check
daniel-sanche Apr 18, 2023
d3d4c76
removed unneeded validation
daniel-sanche Apr 18, 2023
1901094
renamed RowMerger to ReadRowsOperation
daniel-sanche Apr 18, 2023
947fe9b
changed _read_rows test file name
daniel-sanche Apr 18, 2023
773d4e5
added row builder tests
daniel-sanche Apr 18, 2023
cbb0513
added revise_row tests
daniel-sanche Apr 19, 2023
2bec693
ran blacken
daniel-sanche Apr 19, 2023
5cd8e00
added constructor tests
daniel-sanche Apr 19, 2023
d6f3ae1
upgraded submodule
daniel-sanche Apr 19, 2023
f2d7e71
added tests
daniel-sanche Apr 19, 2023
cb23d32
update docstring
daniel-sanche Apr 19, 2023
bc31ab8
update docstring
daniel-sanche Apr 19, 2023
f54dfde
fix typo
daniel-sanche Apr 19, 2023
46cfc49
docstring improvements
daniel-sanche Apr 19, 2023
573bbd1
made creating table outside loop into error
daniel-sanche Apr 19, 2023
4f2657d
make tables own active instances, and remove instances when tables close
daniel-sanche Apr 19, 2023
59955be
added pool_size and channels as public properties
daniel-sanche Apr 19, 2023
377a8c9
fixed typo
daniel-sanche Apr 19, 2023
8a29898
simplified pooled multicallable
daniel-sanche Apr 20, 2023
50aa5ba
ran blacken
daniel-sanche Apr 20, 2023
42a52a3
associate ids with instances, instead of Table objects
daniel-sanche Apr 20, 2023
abc7a2d
fixed tests
daniel-sanche Apr 20, 2023
836af0f
made sure that empty strings are valid family and qualifier inputs
daniel-sanche Apr 20, 2023
e73551d
added tests for state machine
daniel-sanche Apr 20, 2023
792aba1
added state machine tests
daniel-sanche Apr 20, 2023
e57c510
fixed broken mock
daniel-sanche Apr 20, 2023
88748a9
added additional tests
daniel-sanche Apr 20, 2023
0c38981
ran blacken
daniel-sanche Apr 20, 2023
50dc608
reverted pooled multicallable changes
daniel-sanche Apr 20, 2023
b116755
pass scopes to created channels
daniel-sanche Apr 21, 2023
ec5eb07
added basic ping system test
daniel-sanche Apr 21, 2023
55cdcc2
keep both the names and ids in table object
daniel-sanche Apr 21, 2023
0253692
Merge branch 'add_new_transport' into read_rows_retries
daniel-sanche Apr 21, 2023
3855333
added api-core to noxfile tests
daniel-sanche Apr 21, 2023
213519e
added basic read rows stream to system tests
daniel-sanche Apr 21, 2023
9e3b411
pull project details out of env vars
daniel-sanche Apr 21, 2023
d8cf158
added automatic row creation for system tests
daniel-sanche Apr 21, 2023
c9b8217
added read_rows non stream
daniel-sanche Apr 21, 2023
500eff0
added range query system test
daniel-sanche Apr 21, 2023
27130f0
added logic for temporary test tables and instances
daniel-sanche Apr 21, 2023
f4f4fac
made iterator active into a property
daniel-sanche Apr 21, 2023
06dee54
added more read_rows system tests
daniel-sanche Apr 21, 2023
9e11f88
fixed lint issues
daniel-sanche Apr 21, 2023
794c55a
added iterator tests
daniel-sanche Apr 21, 2023
ccd9545
added tests for timeouts
daniel-sanche Apr 21, 2023
ca84b96
ran black
daniel-sanche Apr 21, 2023
eb936cf
fixed lint issues
daniel-sanche Apr 21, 2023
ab43138
restructured test_client
daniel-sanche Apr 21, 2023
cb1884d
changed how random is mocked
daniel-sanche Apr 21, 2023
9a89d74
ran black
daniel-sanche Apr 21, 2023
7f783fc
restructred test_client
daniel-sanche Apr 21, 2023
6a6d219
Merge branch 'add_new_transport' into read_rows_retries
daniel-sanche Apr 21, 2023
72eca75
restructured test_client_read_rows
daniel-sanche Apr 21, 2023
ad42436
moved read rows tests in test_client
daniel-sanche Apr 21, 2023
7606e3a
update submodules in nox
daniel-sanche Apr 22, 2023
829e68f
ran black
daniel-sanche Apr 22, 2023
e8eff39
Merge branch 'v3' into read_rows_retries
daniel-sanche Apr 24, 2023
6a58e86
removed submodule update
daniel-sanche Apr 24, 2023
9be5b07
removed unneeded import
daniel-sanche Apr 24, 2023
1cafbd6
Merge branch 'read_rows_retries' into test_proxy
daniel-sanche Apr 24, 2023
b9f4309
fixed tests
daniel-sanche Apr 24, 2023
966389f
ran blacken
daniel-sanche Apr 24, 2023
4f819b2
Merge branch 'read_rows_retries' into sharded_read_rows
daniel-sanche Apr 24, 2023
1d02154
added initial implementation of mutate_rows
daniel-sanche Apr 24, 2023
ab63cba
implemented mutation models
daniel-sanche Apr 24, 2023
cf9daa5
added retries to mutate_row
daniel-sanche Apr 24, 2023
1247da4
return exception group if possible
daniel-sanche Apr 24, 2023
3b3ed8c
check for idempotence
daniel-sanche Apr 24, 2023
5d20037
initial implementation for bulk_mutations
daniel-sanche Apr 24, 2023
3d322a1
include successes in bulk mutation error message
daniel-sanche Apr 24, 2023
a31232b
fixed style checks
daniel-sanche Apr 24, 2023
8da2d65
added basic system tests
daniel-sanche Apr 24, 2023
2b89d9c
added unit tests for mutate_row
daniel-sanche Apr 25, 2023
47c5985
ran blacken
daniel-sanche Apr 25, 2023
38fdcd7
improved exceptions
daniel-sanche Apr 25, 2023
504d2d8
added bulk_mutate_rows unit tests
daniel-sanche Apr 25, 2023
b16067f
ran blacken
daniel-sanche Apr 25, 2023
3ab1405
support __new___ for exceptions for python3.11+
daniel-sanche Apr 25, 2023
0a6c0c6
added exception unit tests
daniel-sanche Apr 25, 2023
ec043cf
makde exceptions tuple
daniel-sanche Apr 26, 2023
518530e
got exceptions to print consistently across versions
daniel-sanche Apr 26, 2023
9624729
added test for 311 rich traceback
daniel-sanche Apr 27, 2023
3087081
moved retryable row mutations to new file
daniel-sanche Apr 27, 2023
9df588f
use index map
daniel-sanche Apr 27, 2023
7ed8be3
added docstring
daniel-sanche Apr 27, 2023
2536cc4
added predicate check to failed mutations
daniel-sanche Apr 27, 2023
1f6875c
added _mutate_rows tests
daniel-sanche Apr 27, 2023
1ea24e6
improved client tests
daniel-sanche Apr 27, 2023
25ca2d2
refactored to loop by raising exception
daniel-sanche Apr 28, 2023
c0787db
refactored retry deadline logic into shared wrapper
daniel-sanche Apr 28, 2023
3ed5c3d
ran black
daniel-sanche Apr 28, 2023
a91fbcb
pulled in table default timeouts
daniel-sanche Apr 28, 2023
df8a058
added tests for shared deadline parsing function
daniel-sanche Apr 28, 2023
b866b57
added tests for mutation models
daniel-sanche Apr 28, 2023
54a4d43
fixed linter errors
daniel-sanche Apr 28, 2023
bd51dc4
added tests for BulkMutationsEntry
daniel-sanche Apr 28, 2023
921b05a
improved mutations documentation
daniel-sanche Apr 28, 2023
82ea61f
refactored mutate_rows logic into helper function
daniel-sanche May 2, 2023
fa42b86
implemented callbacks for mutate_rows
daniel-sanche May 2, 2023
01a16f3
made exceptions into a tuple
daniel-sanche May 5, 2023
6140acb
remove aborted from retryable errors
daniel-sanche May 22, 2023
36ba2b6
improved SetCell mutation
daniel-sanche May 22, 2023
b3c9017
fixed mutations tests
daniel-sanche May 22, 2023
cac9e2d
SetCell timestamps use millisecond precision
daniel-sanche May 22, 2023
34b051f
renamed BulkMutationsEntry to RowMutationEntry
daniel-sanche May 22, 2023
2956700
Merge branch 'v3' into test_proxy
daniel-sanche May 24, 2023
f476ad7
Merge branch 'v3' into sharded_read_rows
daniel-sanche May 24, 2023
63ac35c
Merge branch 'v3' into mutate_rows
daniel-sanche May 24, 2023
653cb2e
Merge branch 'mutate_rows' into test_proxy
daniel-sanche May 25, 2023
f8d13b7
test proxy supports mutate row and bulk mutate rows
daniel-sanche May 25, 2023
2324638
improved error propagation
daniel-sanche May 25, 2023
822a2d8
decode row keys
daniel-sanche May 25, 2023
4c97759
use grpc_code where possible
daniel-sanche May 25, 2023
8eb2e57
fixed proto field
daniel-sanche May 25, 2023
c8d85be
report status as expected for BulkMutate
daniel-sanche May 25, 2023
c1f2736
updated legacy handler for mutate rows
daniel-sanche May 26, 2023
62dcbb5
cleaned up read_rows_sharded function
daniel-sanche May 30, 2023
b4a95b3
refactored read_rows_query tests to match other files
daniel-sanche May 30, 2023
5972722
finished read_rows_query tests
daniel-sanche May 30, 2023
7c1643c
fixed issue with ping and warm
daniel-sanche May 30, 2023
a39d931
added tests for sharded queries
daniel-sanche May 30, 2023
482eed9
added new exception type for sharded rpcs
daniel-sanche May 30, 2023
faec93e
added test for concurrency
daniel-sanche May 30, 2023 8000
6f6e010
removed subclass for sharded tests
daniel-sanche May 30, 2023
a005ec8
added sample_key samples
daniel-sanche May 30, 2023
dd10624
added system tests
daniel-sanche May 30, 2023
82789ec
refactoring shard function
daniel-sanche May 31, 2023
d39fd0f
added extra checks to query class
daniel-sanche May 31, 2023
7e26d40
fixed comment
daniel-sanche May 31, 2023
34aea1a
added extra docstring
daniel-sanche May 31, 2023
42cac01
renamed sample_keys to sample_row_keys
daniel-sanche May 31, 2023
632a106
Merge branch 'v3' into sharded_read_rows
daniel-sanche Jun 6, 2023
05a311e
added metadata to sample_row_keys
daniel-sanche Jun 6, 2023
f53af32
changed shard points to be range ends instead of starts
daniel-sanche Jun 6, 2023
ac4378d
added concurrency limit
daniel-sanche Jun 7, 2023
9eaa279
added retries for sample_keys
daniel-sanche Jun 7, 2023
6cca7cf
cleaned up code block
daniel-sanche Jun 15, 2023
88e88d4
documented and simplified sharding function
daniel-sanche Jun 15, 2023
26ffe0c
Merge branch 'v3' into sharded_read_rows
daniel-sanche Jun 18, 2023
9302286
split row_range sharding into own helper
daniel-sanche Jun 18, 2023
ad1e1c4
Merge branch 'v3' into test_proxy
daniel-sanche Jun 19, 2023
9025c0f
implemented check and mutate tests
daniel-sanche Jun 19, 2023
967e48f
added read_row
daniel-sanche Jun 19, 2023
75b029f
added read_modify_write tests
daniel-sanche Jun 19, 2023
a4efd7d
Merge branch 'sharded_read_rows' into test_proxy
daniel-sanche Jun 19, 2023
8a2556a
don't start background processes when in emulator mode
daniel-sanche Jun 19, 2023
d72caed
implemented sample_row_keys tests
daniel-sanche Jun 19, 2023
ec85af2
fixed bytes handling
daniel-sanche Jun 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 149 additions & 35 deletions google/cloud/bigtable/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
Any,
Optional,
Set,
Callable,
Coroutine,
TYPE_CHECKING,
)

Expand All @@ -31,15 +29,20 @@
import warnings
import sys
import random
import os
from itertools import chain

from google.cloud.bigtable_v2.services.bigtable.client import BigtableClientMeta
from google.cloud.bigtable_v2.services.bigtable.async_client import BigtableAsyncClient
from google.cloud.bigtable_v2.services.bigtable.async_client import DEFAULT_CLIENT_INFO
from google.cloud.bigtable_v2.services.bigtable.transports.pooled_grpc_asyncio import (
PooledBigtableGrpcAsyncIOTransport,
PooledChannel,
)
from google.cloud.bigtable_v2.types.bigtable import PingAndWarmRequest
from google.cloud.client import ClientWithProject
from google.api_core.exceptions import GoogleAPICallError
from google.cloud.environment_vars import BIGTABLE_EMULATOR # type: ignore
from google.api_core import retry_async as retries
from google.api_core import exceptions as core_exceptions
from google.cloud.bigtable._read_rows import _ReadRowsOperation
Expand All @@ -50,10 +53,14 @@
from google.cloud.bigtable.row import Row
from google.cloud.bigtable.read_rows_query import ReadRowsQuery
from google.cloud.bigtable.iterators import ReadRowsIterator
from google.cloud.bigtable.exceptions import FailedQueryShardError
from google.cloud.bigtable.exceptions import ShardedReadRowsExceptionGroup

from google.cloud.bigtable.mutations import Mutation, RowMutationEntry
from google.cloud.bigtable._mutate_rows import _MutateRowsOperation
from google.cloud.bigtable._helpers import _make_metadata
from google.cloud.bigtable._helpers import _convert_retry_deadline
from google.cloud.bigtable._helpers import _attempt_timeout_generator

from google.cloud.bigtable.read_modify_write_rules import ReadModifyWriteRule
from google.cloud.bigtable.row_filters import RowFilter
Expand All @@ -65,6 +72,9 @@
from google.cloud.bigtable.mutations_batcher import MutationsBatcher
from google.cloud.bigtable import RowKeySamples

# used by read_rows_sharded to limit how many requests are attempted in parallel
CONCURRENCY_LIMIT = 10


class BigtableDataClient(ClientWithProject):
def __init__(
Expand Down Expand Up @@ -107,6 +117,7 @@ def __init__(
# set up client info headers for veneer library
client_info = DEFAULT_CLIENT_INFO
client_info.client_library_version = client_info.gapic_version
client_info.user_agent = f"bigtable-python/{client_info.client_library_version}"
# parse client options
if type(client_options) is dict:
client_options = client_options_lib.from_dict(client_options)
Expand Down Expand Up @@ -137,23 +148,41 @@ def __init__(
# attempt to start background tasks
self._channel_init_time = time.time()
self._channel_refresh_tasks: list[asyncio.Task[None]] = []
try:
self.start_background_channel_refresh()
except RuntimeError:
self._emulator_host = os.getenv(BIGTABLE_EMULATOR)
if self._emulator_host is not None:
# connect to an emulator host
warnings.warn(
f"{self.__class__.__name__} should be started in an "
"asyncio event loop. Channel refresh will not be started",
"Connecting to Bigtable emulator at {}".format(self._emulator_host),
RuntimeWarning,
stacklevel=2,
)
self.transport._grpc_channel = PooledChannel(
pool_size=pool_size,
host=self._emulator_host,
insecure=True,
)
# refresh cached stubs to use emulator pool
self.transport._stubs = {}
self.transport._prep_wrapped_messages(client_info)
else:
# attempt to start background channel refresh tasks
try:
self.start_background_channel_refresh()
except RuntimeError:
warnings.warn(
f"{self.__class__.__name__} should be started in an "
"asyncio event loop. Channel refresh will not be started",
RuntimeWarning,
stacklevel=2,
)

def start_background_channel_refresh(self) -> None:
"""
Starts a background task to ping and warm each channel in the pool
Raises:
- RuntimeError if not called in an asyncio event loop
"""
if not self._channel_refresh_tasks:
if not self._channel_refresh_tasks and not self._emulator_host:
# raise RuntimeError if there is no event loop
asyncio.get_running_loop()
for channel_idx in range(self.transport.pool_size):
Expand Down Expand Up @@ -190,10 +219,13 @@ async def _ping_and_warm_instances(
- sequence of results or exceptions from the ping requests
"""
ping_rpc = channel.unary_unary(
"/google.bigtable.v2.Bigtable/PingAndWarmChannel"
"/google.bigtable.v2.Bigtable/PingAndWarm",
request_serializer=PingAndWarmRequest.serialize,
)
tasks = [ping_rpc({"name": n}) for n in self._active_instances]
return await asyncio.gather(*tasks, return_exceptions=True)
result = await asyncio.gather(*tasks, return_exceptions=True)
# return None in place of empty successful responses
return [r or None for r in result]

async def _manage_channel(
self,
Expand Down Expand Up @@ -534,20 +566,59 @@ async def read_rows_sharded(
self,
query_list: list[ReadRowsQuery] | list[dict[str, Any]],
*,
limit: int | None,
operation_timeout: int | float | None = 60,
operation_timeout: int | float | None = None,
per_request_timeout: int | float | None = None,
) -> ReadRowsIterator:
) -> list[Row]:
"""
Runs a sharded query in parallel
Runs a sharded query in parallel, then return the results in a single list.
Results will be returned in the order of the input queries.

Each query in query list will be run concurrently, with results yielded as they are ready
yielded results may be out of order
This function is intended to be run on the results on a query.shard() call:

```
table_shard_keys = await table.sample_row_keys()
query = ReadRowsQuery(...)
shard_queries = query.shard(table_shard_keys)
results = await table.read_rows_sharded(shard_queries)
```

Args:
- query_list: a list of queries to run in parallel
Raises:
- ShardedReadRowsExceptionGroup: if any of the queries failed
- ValueError: if the query_list is empty
"""
raise NotImplementedError
if not query_list:
raise ValueError("query_list must contain at least one query")
routine_list = [
self.read_rows(
query,
operation_timeout=operation_timeout,
per_request_timeout=per_request_timeout,
)
for query in query_list
]
# submit requests in batches to limit concurrency
batched_routines = [
routine_list[i : i + CONCURRENCY_LIMIT]
for i in range(0, len(routine_list), CONCURRENCY_LIMIT)
]
# run batches and collect results
results_list = []
for batch in batched_routines:
batch_result = await asyncio.gather(*batch, return_exceptions=True)
results_list.extend(batch_result)
# collect exceptions
exception_list = [
FailedQueryShardError(idx, query_list[idx], e)
for idx, e in enumerate(results_list)
if isinstance(e, Exception)
]
if exception_list:
# if any sub-request failed, raise an exception instead of returning results
raise ShardedReadRowsExceptionGroup(exception_list, len(query_list))
combined_list = list(chain.from_iterable(results_list))
return combined_list

async def row_exists(
self,
Expand Down Expand Up @@ -577,32 +648,81 @@ async def row_exists(
)
return len(results) > 0

async def sample_keys(
async def sample_row_keys(
self,
*,
operation_timeout: int | float | None = 60,
per_sample_timeout: int | float | None = 10,
per_request_timeout: int | float | None = None,
operation_timeout: float | None = None,
per_request_timeout: float | None = None,
) -> RowKeySamples:
"""
Return a set of RowKeySamples that delimit contiguous sections of the table of
approximately equal size

RowKeySamples output can be used with ReadRowsQuery.shard() to create a sharded query that
can be parallelized across multiple backend nodes read_rows and read_rows_stream
requests will call sample_keys internally for this purpose when sharding is enabled
requests will call sample_row_keys internally for this purpose when sharding is enabled

RowKeySamples is simply a type alias for list[tuple[bytes, int]]; a list of
row_keys, along with offset positions in the table

Returns:
- a set of RowKeySamples the delimit contiguous sections of the table
Raises:
- DeadlineExceeded: raised after operation timeout
will be chained with a RetryExceptionGroup containing all GoogleAPIError
exceptions from any retries that failed
- GoogleAPICallError: if the sample_row_keys request fails
"""
raise NotImplementedError
# prepare timeouts
operation_timeout = operation_timeout or self.default_operation_timeout
per_request_timeout = per_request_timeout or self.default_per_request_timeout

if operation_timeout <= 0:
raise ValueError("operation_timeout must be greater than 0")
if per_request_timeout is not None and per_request_timeout <= 0:
raise ValueError("per_request_timeout must be greater than 0")
if per_request_timeout is not None and per_request_timeout > operation_timeout:
raise ValueError(
"per_request_timeout must not be greater than operation_timeout"
)
attempt_timeout_gen = _attempt_timeout_generator(
per_request_timeout, operation_timeout
)
# prepare retryable
predicate = retries.if_exception_type(
core_exceptions.DeadlineExceeded,
core_exceptions.ServiceUnavailable,
)
transient_errors = []

def on_error_fn(exc):
# add errors to list if retryable
if predicate(exc):
transient_errors.append(exc)

retry = retries.AsyncRetry(
predicate=predicate,
timeout=operation_timeout,
initial=0.01,
multiplier=2,
maximum=60,
on_error=on_error_fn,
is_stream=False,
)

# prepare request
metadata = _make_metadata(self.table_name, self.app_profile_id)

async def execute_rpc():
results = await self.client._gapic_client.sample_row_keys(
table_name=self.table_name,
app_profile_id=self.app_profile_id,
timeout=next(attempt_timeout_gen),
metadata=metadata,
)
return [(s.row_key, s.offset_bytes) async for s in results]

wrapped_fn = _convert_retry_deadline(
retry(execute_rpc), operation_timeout, transient_errors
)
return await wrapped_fn()

def mutations_batcher(self, **kwargs) -> MutationsBatcher:
"""
Expand Down Expand Up @@ -712,10 +832,6 @@ async def bulk_mutate_rows(
*,
operation_timeout: float | None = 60,
per_request_timeout: float | None = None,
on_success: Callable[
[int, RowMutationEntry], None | Coroutine[None, None, None]
]
| None = None,
):
"""
Applies mutations for multiple rows in a single batched request.
Expand All @@ -741,9 +857,6 @@ async def bulk_mutate_rows(
in seconds. If it takes longer than this time to complete, the request
will be cancelled with a DeadlineExceeded exception, and a retry will
be attempted if within operation_timeout budget
- on_success: a callback function that will be called when each mutation
entry is confirmed to be applied successfully. Will be passed the
index and the entry itself.
Raises:
- MutationsExceptionGroup if one or more mutations fails
Contains details about any failed entries in .exceptions
Expand Down Expand Up @@ -896,16 +1009,17 @@ async def close(self):
"""
Called to close the Table instance and release any resources held by it.
"""
self._register_instance_task.cancel()
await self.client._remove_instance_registration(self.instance_id, self)

async def __aenter__(self):
"""
Implement async context manager protocol

Register this instance with the client, so that
Ensure registration task has time to run, so that
grpc channels will be warmed for the specified instance
"""
await self.client._register_instance(self.instance_id, self)
await self._register_instance_task
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
Expand Down
39 changes: 38 additions & 1 deletion google/cloud/bigtable/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@

import sys

from typing import TYPE_CHECKING
from typing import Any, TYPE_CHECKING

from google.api_core import exceptions as core_exceptions

is_311_plus = sys.version_info >= (3, 11)

if TYPE_CHECKING:
from google.cloud.bigtable.mutations import RowMutationEntry
from google.cloud.bigtable.read_rows_query import ReadRowsQuery


class IdleTimeout(core_exceptions.DeadlineExceeded):
Expand Down Expand Up @@ -137,3 +138,39 @@ def __init__(self, excs: list[Exception]):

def __new__(cls, excs: list[Exception]):
return super().__new__(cls, cls._format_message(excs), excs)


class ShardedReadRowsExceptionGroup(BigtableExceptionGroup):
"""
Represents one or more exceptions that occur during a sharded read rows operation
"""

@staticmethod
def _format_message(excs: list[FailedQueryShardError], total_queries: int):
query_str = "query" if total_queries == 1 else "queries"
plural_str = "" if len(excs) == 1 else "s"
return f"{len(excs)} sub-exception{plural_str} (from {total_queries} {query_str} attempted)"

def __init__(self, excs: list[FailedQueryShardError], total_queries: int):
super().__init__(self._format_message(excs, total_queries), excs)

def __new__(cls, excs: list[FailedQueryShardError], total_queries: int):
return super().__new__(cls, cls._format_message(excs, total_queries), excs)


class FailedQueryShardError(Exception):
"""
Represents an individual failed query in a sharded read rows operation
"""

def __init__(
self,
failed_index: int,
failed_query: "ReadRowsQuery" | dict[str, Any],
cause: Exception,
):
message = f"Failed query at index {failed_index} with cause: {cause!r}"
super().__init__(message)
self.index = failed_index
self.query = failed_query
self.__cause__ = cause
Loading
0