feat: implement query profiling (#542) · googleapis/python-datastore@1500f70 · GitHub
[go: up one dir, main page]

Skip to content

Commit 1500f70

Browse files
feat: implement query profiling (#542)
1 parent ba20019 commit 1500f70

File tree

12 files changed

+1170
-100
lines changed

12 files changed

+1170
-100
lines changed

google/cloud/datastore/__init__.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,16 @@
6161
from google.cloud.datastore.entity import Entity
6262
from google.cloud.datastore.key import Key
6363
from google.cloud.datastore.query import Query
64+
from google.cloud.datastore.query_profile import ExplainOptions
6465
from google.cloud.datastore.transaction import Transaction
6566

66-
__all__ = ["__version__", "Batch", "Client", "Entity", "Key", "Query", "Transaction"]
67+
__all__ = [
68+
"__version__",
69+
"Batch",
70+
"Client",
71+
"Entity",
72+
"Key",
73+
"Query",
74+
"ExplainOptions",
75+
"Transaction",
76+
]

google/cloud/datastore/aggregation.py

Lines changed: 67 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,11 @@
2323
from google.cloud.datastore import helpers
2424
from google.cloud.datastore.query import _pb_from_query
2525

26+
from google.cloud.datastore.query_profile import ExplainMetrics
27+
from google.cloud.datastore.query_profile import QueryExplainError
2628

27-
_NOT_FINISHED = query_pb2.QueryResultBatch.MoreResultsType.NOT_FINISHED
28-
_NO_MORE_RESULTS = query_pb2.QueryResultBatch.MoreResultsType.NO_MORE_RESULTS
29-
30-
_FINISHED = (
31-
_NO_MORE_RESULTS,
32-
query_pb2.QueryResultBatch.MoreResultsType.MORE_RESULTS_AFTER_LIMIT,
33-
query_pb2.QueryResultBatch.MoreResultsType.MORE_RESULTS_AFTER_CURSOR,
34-
)
29+
from google.cloud.datastore.query import _NOT_FINISHED
30+
from google.cloud.datastore.query import _FINISHED
3531

3632

3733
class BaseAggregation(ABC):
@@ -159,16 +155,25 @@ class AggregationQuery(object):
159155
160156
:type query: :class:`google.cloud.datastore.query.Query`
161157
:param query: The query used for aggregations.
158+
159+
:type explain_options: :class:`~google.cloud.datastore.ExplainOptions`
160+
:param explain_options: (Optional) Options to enable query profiling for
161+
this query. When set, explain_metrics will be available on the iterator
162+
returned by query.fetch().
163+
If not passed, will use value from given query.
162164
"""
163165

164166
def __init__(
165167
self,
166168
client,
167169
query,
170+
explain_options=None,
168171
):
169172
self._client = client
170173
self._nested_query = query
171174
self._aggregations = []
175+
# fallback to query._explain_options if not set
176+
self._explain_options = explain_options or query._explain_options
172177

173178
@property
174179
def project(self):
@@ -391,6 +396,7 @@ def __init__(
391396
self._read_time = read_time
392397
self._limit = limit
393398
# The attributes below will change over the life of the iterator.
399+
self._explain_metrics = None
394400
self._more_results = True
395401

396402
def _build_protobuf(self):
@@ -441,7 +447,6 @@ def _next_page(self):
441447
if not self._more_results:
442448
return None
443449

444-
query_pb = self._build_protobuf()
445450
transaction_id, new_transaction_options = helpers.get_transaction_options(
446451
self.client.current_transaction
447452
)
@@ -466,38 +471,68 @@ def _next_page(self):
466471
"project_id": self._aggregation_query.project,
467472
"partition_id": partition_id,
468473
"read_options": read_options,
469-
"aggregation_query": query_pb,
474+
"aggregation_query": self._build_protobuf(),
470475
}
476+
if self._aggregation_query._explain_options:
477+
request[
478+
"explain_options"
479+
] = self._aggregation_query._explain_options._to_dict()
471480
helpers.set_database_id_to_request(request, self.client.database)
472-
response_pb = self.client._datastore_api.run_aggregation_query(
473-
request=request,
474-
**kwargs,
475-
)
476481

477-
while response_pb.batch.more_results == _NOT_FINISHED:
478-
# We haven't finished processing. A likely reason is we haven't
479-
# skipped all of the results yet. Don't return any results.
480-
# Instead, rerun query, adjusting offsets. Datastore doesn't process
481-
# more than 1000 skipped results in a query.
482-
old_query_pb = query_pb
483-
query_pb = query_pb2.AggregationQuery()
484-
query_pb._pb.CopyFrom(old_query_pb._pb) # copy for testability
485-
486-
request = {
487-
"project_id": self._aggregation_query.project,
488-
"partition_id": partition_id,
489-
"read_options": read_options,
490-
"aggregation_query": query_pb,
491-
}
492-
helpers.set_database_id_to_request(request, self.client.database)
482+
response_pb = None
483+
484+
while response_pb is None or response_pb.batch.more_results == _NOT_FINISHED:
485+
if response_pb is not None:
486+
# We haven't finished processing. A likely reason is we haven't
487+
# skipped all of the results yet. Don't return any results.
488+
# Instead, rerun query, adjusting offsets. Datastore doesn't process
489+
# more than 1000 skipped results in a query.
490+
new_query_pb = query_pb2.AggregationQuery()
491+
new_query_pb._pb.CopyFrom(
492+
request["aggregation_query"]._pb
493+
) # copy for testability
494+
request["aggregation_query"] = new_query_pb
495+
493496
response_pb = self.client._datastore_api.run_aggregation_query(
494-
request=request,
495-
**kwargs,
497+
request=request.copy(), **kwargs
496498
)
499+
# capture explain metrics if present in response
500+
# should only be present in last response, and only if explain_options was set
501+
if response_pb.explain_metrics:
502+
self._explain_metrics = ExplainMetrics._from_pb(
503+
response_pb.explain_metrics
504+
)
497505

498506
item_pbs = self._process_query_results(response_pb)
499507
return page_iterator.Page(self, item_pbs, self.item_to_value)
500508

509+
@property
510+
def explain_metrics(self) -> ExplainMetrics:
511+
"""
512+
Get the metrics associated with the query execution.
513+
Metrics are only available when explain_options is set on the query. If
514+
ExplainOptions.analyze is False, only plan_summary is available. If it is
515+
True, execution_stats is also available.
516+
517+
:rtype: :class:`~google.cloud.datastore.query_profile.ExplainMetrics`
518+
:returns: The metrics associated with the query execution.
519+
:raises: :class:`~google.cloud.datastore.query_profile.QueryExplainError`
520+
if explain_metrics is not available on the query.
521+
"""
522+
if self._explain_metrics is not None:
523+
return self._explain_metrics
524+
elif self._aggregation_query._explain_options is None:
525+
raise QueryExplainError("explain_options not set on query.")
526+
elif self._aggregation_query._explain_options.analyze is False:
527+
# we need to run the query to get the explain_metrics
528+
# analyze=False only returns explain_metrics, no results
529+
self._next_page()
530+
if self._explain_metrics is not None:
531+
return self._explain_metrics
532+
raise QueryExplainError(
533+
"explain_metrics not available until query is complete."
534+
)
535+
501536

502537
# pylint: disable=unused-argument
503538
def _item_to_aggregation_result(iterator, pb):

google/cloud/datastore/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -875,7 +875,7 @@ def do_something_with(entity):
875875
kwargs["namespace"] = self.namespace
876876
return Query(self, **kwargs)
877877

878-
def aggregation_query(self, query):
878+
def aggregation_query(self, query, **kwargs):
879879
"""Proxy to :class:`google.cloud.datastore.aggregation.AggregationQuery`.
880880
881881
Using aggregation_query to count over a query:
@@ -953,7 +953,7 @@ def do_something_with(entity):
953953
:rtype: :class:`~google.cloud.datastore.aggregation.AggregationQuery`
954954
:returns: An AggregationQuery object.
955955
"""
956-
return AggregationQuery(self, query)
956+
return AggregationQuery(self, query, **kwargs)
957957

958958
def reserve_ids_sequential(self, complete_key, num_ids, retry=None, timeout=None):
959959
"""Reserve a list of IDs sequentially from a complete key.

google/cloud/datastore/query.py

Lines changed: 63 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,21 @@
1313
# limitations under the License.
1414

1515
"""Create / interact with Google Cloud Datastore queries."""
16-
1716
import base64
1817
import warnings
1918

20-
2119
from google.api_core import page_iterator
2220
from google.cloud._helpers import _ensure_tuple_or_list
2321

24-
2522
from google.cloud.datastore_v1.types import entity as entity_pb2
2623
from google.cloud.datastore_v1.types import query as query_pb2
2724
from google.cloud.datastore import helpers
2825
from google.cloud.datastore.key import Key
2926

27+
28+
from google.cloud.datastore.query_profile import ExplainMetrics
29+
from google.cloud.datastore.query_profile import QueryExplainError
30+
3031
import abc
3132
from abc import ABC
3233

@@ -38,6 +39,7 @@
3839
_NO_MORE_RESULTS,
3940
query_pb2.QueryResultBatch.MoreResultsType.MORE_RESULTS_AFTER_LIMIT,
4041
query_pb2.QueryResultBatch.MoreResultsType.MORE_RESULTS_AFTER_CURSOR,
42+
query_pb2.QueryResultBatch.MoreResultsType.MORE_RESULTS_TYPE_UNSPECIFIED, # received when explain_options(analyze=False)
4143
)
4244

4345
KEY_PROPERTY_NAME = "__key__"
@@ -176,6 +178,11 @@ class Query(object):
176178
:type distinct_on: sequence of string
177179
:param distinct_on: field names used to group query results.
178180
181+
:type explain_options: :class:`~google.cloud.datastore.ExplainOptions`
182+
:param explain_options: (Optional) Options to enable query profiling for
183+
this query. When set, explain_metrics will be available on the iterator
184+
returned by query.fetch().
185+
179186
:raises: ValueError if ``project`` is not passed and no implicit
180187
default is set.
181188
"""
@@ -203,6 +210,7 @@ def __init__(
203210
projection=(),
204211
order=(),
205212
distinct_on=(),
213+
explain_options=None,
206214
):
207215
self._client = client
208216
self._kind = kind
@@ -221,6 +229,7 @@ def __init__(
221229
else:
222230
self._namespace = None
223231

232+
self._explain_options = explain_options
224233
self._ancestor = ancestor
225234
self._filters = []
226235

@@ -704,6 +713,7 @@ def __init__(
704713
self._timeout = timeout
705714
self._read_time = read_time
706715
# The attributes below will change over the life of the iterator.
716+
self._explain_metrics = None
707717
self._more_results = True
708718
self._skipped_results = 0
709719

@@ -777,7 +787,6 @@ def _next_page(self):
777787
if not self._more_results:
778788
return None
779789

780-
query_pb = self._build_protobuf()
781790
new_transaction_options = None
782791
transaction_id, new_transaction_options = helpers.get_transaction_options(
783792
self.client.current_transaction
@@ -804,46 +813,70 @@ def _next_page(self):
804813
"project_id": self._query.project,
805814
"partition_id": partition_id,
806815
"read_options": read_options,
807-
"query": query_pb,
816+
"query": self._build_protobuf(),
808817
}
818+
if self._query._explain_options:
819+
request["explain_options"] = self._query._explain_options._to_dict()
809820

810821
helpers.set_database_id_to_request(request, self.client.database)
811822

812-
response_pb = self.client._datastore_api.run_query(
813-
request=request,
814-
**kwargs,
815-
)
823+
response_pb = None
816824

817-
while (
825+
while response_pb is None or (
818826
response_pb.batch.more_results == _NOT_FINISHED
819-
and response_pb.batch.skipped_results < query_pb.offset
827+
and response_pb.batch.skipped_results < request["query"].offset
820828
):
821-
# We haven't finished processing. A likely reason is we haven't
822-
# skipped all of the results yet. Don't return any results.
823-
# Instead, rerun query, adjusting offsets. Datastore doesn't process
824-
# more than 1000 skipped results in a query.
825-
old_query_pb = query_pb
826-
query_pb = query_pb2.Query()
827-
query_pb._pb.CopyFrom(old_query_pb._pb) # copy for testability
828-
query_pb.start_cursor = response_pb.batch.end_cursor
829-
query_pb.offset -= response_pb.batch.skipped_results
830-
831-
request = {
832-
"project_id": self._query.project,
833-
"partition_id": partition_id,
834-
"read_options": read_options,
835-
"query": query_pb,
836-
}
837-
helpers.set_database_id_to_request(request, self.client.database)
829+
if response_pb is not None:
830+
# We haven't finished processing. A likely reason is we haven't
831+
# skipped all of the results yet. Don't return any results.
832+
# Instead, rerun query, adjusting offsets. Datastore doesn't process
833+
# more than 1000 skipped results in a query.
834+
new_query_pb = query_pb2.Query()
835+
new_query_pb._pb.CopyFrom(request["query"]._pb) # copy for testability
836+
new_query_pb.start_cursor = response_pb.batch.end_cursor
837+
new_query_pb.offset -= response_pb.batch.skipped_results
838+
request["query"] = new_query_pb
838839

839840
response_pb = self.client._datastore_api.run_query(
840-
request=request,
841-
**kwargs,
841+
request=request.copy(), **kwargs
842842
)
843+
# capture explain metrics if present in response
844+
# should only be present in last response, and only if explain_options was set
845+
if response_pb and response_pb.explain_metrics:
846+
self._explain_metrics = ExplainMetrics._from_pb(
847+
response_pb.explain_metrics
848+
)
843849

844850
entity_pbs = self._process_query_results(response_pb)
845851
return page_iterator.Page(self, entity_pbs, self.item_to_value)
846852

853+
@property
854+
def explain_metrics(self) -> ExplainMetrics:
855+
"""
856+
Get the metrics associated with the query execution.
857+
Metrics are only available when explain_options is set on the query. If
858+
ExplainOptions.analyze is False, only plan_summary is available. If it is
859+
True, execution_stats is also available.
860+
861+
:rtype: :class:`~google.cloud.datastore.query_profile.ExplainMetrics`
862+
:returns: The metrics associated with the query execution.
863+
:raises: :class:`~google.cloud.datastore.query_profile.QueryExplainError`
864+
if explain_metrics is not available on the query.
865+
"""
866+
if self._explain_metrics is not None:
867+
return self._explain_metrics
868+
elif self._query._explain_options is None:
869+
raise QueryExplainError("explain_options not set on query.")
870+
elif self._query._explain_options.analyze is False:
871+
# we need to run the query to get the explain_metrics
872+
# analyze=False only returns explain_metrics, no results
873+
self._next_page()
874+
if self._explain_metrics is not None:
875+
return self._explain_metrics
876+
raise QueryExplainError(
877+
"explain_metrics not available until query is complete."
878+
)
879+
847880

848881
def _pb_from_query(query):
849882
"""Convert a Query instance to the corresponding protobuf.

0 commit comments

Comments
 (0)
0