8000 Merge pull request #812 from dhermes/add-future-type · googleapis/google-cloud-python@73a341c · GitHub
[go: up one dir, main page]

Skip to content

Commit 73a341c

Browse files
committed
Merge pull request #812 from dhermes/add-future-type
Using futures in batched requests
2 parents 2cdaeb2 + 4564992 commit 73a341c

File tree

8 files changed

+442
-125
lines changed

8 files changed

+442
-125
lines changed

gcloud/connection.py

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ def build_api_url(cls, path, query_params=None,
160160
return url
161161

162162
def _make_request(self, method, url, data=None, content_type=None,
163-
headers=None):
163+
headers=None, target_object=None):
164164
"""A low level method to send a request to the API.
165165
166166
Typically, you shouldn't need to use this method.
@@ -180,6 +180,12 @@ def _make_request(self, method, url, data=None, content_type=None,
180180
:type headers: dict
181181
:param headers: A dictionary of HTTP headers to send with the request.
182182
183+
:type target_object: object or :class:`NoneType`
184+
:param target_object: Argument to be used by library callers.
185+
This can allow custom behavior, for example, to
186+
defer an HTTP request and complete initialization
187+
of the object at a later time.
188+
183189
:rtype: tuple of ``response`` (a dictionary of sorts)
184190
and ``content`` (a string).
185191
:returns: The HTTP response object and the content of the response,
@@ -200,9 +206,10 @@ def _make_request(self, method, url, data=None, content_type=None,
200206

201207
headers['User-Agent'] = self.USER_AGENT
202208

203-
return self._do_request(method, url, headers, data)
209+
return self._do_request(method, url, headers, data, target_object)
204210

205-
def _do_request(self, method, url, headers, data):
211+
def _do_request(self, method, url, headers, data,
212+
target_object): # pylint: disable=unused-argument
206213
"""Low-level helper: perform the actual API request over HTTP.
207214
208215
Allows batch context managers to override and defer a request.
@@ -219,6 +226,10 @@ def _do_request(self, method, url, headers, data):
219226
:type data: string
220227
:param data: The data to send as the body of the request.
221228
229+
:type target_object: object or :class:`NoneType`
230+
:param target_object: Unused ``target_object`` here but may be used
231+
by a superclass.
232+
222233
:rtype: tuple of ``response`` (a dictionary of sorts)
223234
and ``content`` (a string).
224235
:returns: The HTTP response object and the content of the response.
@@ -229,7 +240,7 @@ def _do_request(self, method, url, headers, data):
229240
def api_request(self, method, path, query_params=None,
230241
data=None, content_type=None,
231242
api_base_url=None, api_version=None,
232-
expect_json=True):
243+
expect_json=True, _target_object=None):
233244
"""Make a request over the HTTP transport to the API.
234245
235246
You shouldn't need to use this method, but if you plan to
@@ -274,6 +285,12 @@ def api_request(self, method, path, query_params=None,
274285
response as JSON and raise an exception if
275286
that cannot be done. Default is True.
276287
288+
:type _target_object: object or :class:`NoneType`
289+
:param _target_object: Protected argument to be used by library
290+
callers. This can allow custom behavior, for
291+
example, to defer an HTTP request and complete
292+
initialization of the object at a later time.
293+
277294
:raises: Exception if the response code is not 200 OK.
278295
"""
279296
url = self.build_api_url(path=path, query_params=query_params,
@@ -287,12 +304,14 @@ def api_request(self, method, path, query_params=None,
287304
content_type = 'application/json'
288305

289306
response, content = self._make_request(
290-
method=method, url=url, data=data, content_type=content_type)
307+
method=method, url=url, data=data, content_type=content_type,
308+
target_object=_target_object)
291309

292310
if not 200 <= response.status < 300:
293311
raise make_exception(response, content)
294312

295-
if content and expect_json:
313+
string_or_bytes = (six.binary_type, six.text_type)
314+
if content and expect_json and isinstance(content, string_or_bytes):
296315
content_type = response.get('content-type', '')
297316
if not content_type.startswith('application/json'):
298317
raise TypeError('Expected JSON, got %s' % content_type)

gcloud/storage/_helpers.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ def reload(self, connection=None):
6060
# are handled via custom endpoints.
6161
query_params = {'projection': 'noAcl'}
6262
api_response = connection.api_request(
63-
method='GET', path=self.path, query_params=query_params)
63+
method='GET', path=self.path, query_params=query_params,
64+
_target_object=self)
6465
self._set_properties(api_response)
6566

6667
def _patch_property(self, name, value):
@@ -84,7 +85,7 @@ def _patch_property(self, name, value):
8485
def _set_properties(self, value):
8586
"""Set the properties for the current object.
8687
87-
:type value: dict
88+
:type value: dict or :class:`gcloud.storage.batch._FutureDict`
8889
:param value: The properties to be set.
8990
"""
9091
self._properties = value
@@ -108,7 +109,7 @@ def patch(self, connection=None):
108109
for key in self._changes)
109110
api_response = connection.api_request(
110111
method='PATCH', path=self.path, data=update_properties,
111-
query_params={'projection': 'full'})
112+
query_params={'projection': 'full'}, _target_object=self)
112113
self._set_properties(api_response)
113114

114115

gcloud/storage/batch.py

Lines changed: 128 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@
2020
from email.mime.application import MIMEApplication
2121
from email.mime.multipart import MIMEMultipart
2222
from email.parser import Parser
23+
import httplib2
2324
import io
2425
import json
2526

2627
import six
2728

2829
from gcloud._helpers import _LocalStack
30+
from gcloud.exceptions import make_exception
2931
from gcloud.storage import _implicit_environ
3032
from gcloud.storage.connection import Connection
3133

@@ -71,6 +73,54 @@ class NoContent(object):
7173
status = 204
7274

7375

76+
class _FutureDict(object):
77+
"""Class to hold a future value for a deferred request.
78+
79+
Used by for requests that get sent in a :class:`Batch`.
80+
"""
81+
82+
@staticmethod
83+
def get(key, default=None):
84+
"""Stand-in for dict.get.
85+
86+
:type key: object
87+
:param key: Hashable dictionary key.
88+
89+
:type default: object
90+
:param default: Fallback value to dict.get.
91+
92+
:raises: :class:`KeyError` always since the future is intended to fail
93+
as a dictionary.
94+
"""
95+
raise KeyError('Cannot get(%r, default=%r) on a future' % (
96+
key, default))
97+
98+
def __getitem__(self, key):
99+
"""Stand-in for dict[key].
100+
101+
:type key: object
102+
:param key: Hashable dictionary key.
103+
104+
:raises: :class:`KeyError` always since the future is intended to fail
105+
as a dictionary.
106+
"""
107+
raise KeyError('Cannot get item %r from a future' % (key,))
108+
109+
def __setitem__(self, key, value):
110+
"""Stand-in for dict[key] = value.
111+
112+
:type key: object
113+
:param key: Hashable dictionary key.
114+
115+
:type value: object
116+
:param value: Dictionary value.
117+
118+
:raises: :class:`KeyError` always since the future is intended to fail
119+
as a dictionary.
120+
"""
121+
raise KeyError('Cannot set %r -> %r on a future' % (key, value))
122+
123+
74124
class Batch(Connection):
75125
"""Proxy an underlying connection, batching up change operations.
76126
@@ -86,9 +136,9 @@ def __init__(self, connection=None):
86136
super(Batch, self).__init__()
87137
self._connection = connection
88138
self._requests = []
89-
self._responses = []
139+
self._target_objects = []
90140

91-
def _do_request(self, method, url, headers, data):
141+
def _do_request(self, method, url, headers, data, target_object):
92142
"""Override Connection: defer actual HTTP request.
93143
94144
Only allow up to ``_MAX_BATCH_SIZE`` requests to be deferred.
@@ -109,22 +159,22 @@ def _do_request(self, method, url, headers, data):
109159
and ``content`` (a string).
110160
:returns: The HTTP response object and the content of the response.
111161
"""
112-
if method == 'GET':
113-
_req = self._connection.http.request
114-
return _req(method=method, uri=url, headers=headers, body=data)
115-
116162
if len(self._requests) >= self._MAX_BATCH_SIZE:
117163
raise ValueError("Too many deferred requests (max %d)" %
118164
self._MAX_BATCH_SIZE)
119165
self._requests.append((method, url, headers, data))
120-
return NoContent(), ''
121-
122-
def finish(self):
123-
"""Submit a single `multipart/mixed` request w/ deferred requests.
124-
125-
:rtype: list of tuples
126-
:returns: one ``(status, reason, payload)`` tuple per deferred request.
127-
:raises: ValueError if no requests have been deferred.
166+
result = _FutureDict()
167+
self._target_objects.append(target_object)
168+
if target_object is not None:
169+
target_object._properties = result
170+
return NoContent(), result
171+
172+
def _prepare_batch_request(self):
173+
"""Prepares headers and body for a batch request.
174+
175+
:rtype: tuple (dict, string)
176+
:returns: The pair of headers and body of the batch request to be sent.
177+
:raises: :class:`ValueError` if no requests have been deferred.
128178
"""
129179
if len(self._requests) == 0:
130180
raise ValueError("No deferred requests")
@@ -146,14 +196,51 @@ def finish(self):
146196

147197
# Strip off redundant header text
148198
_, body = payload.split('\n\n', 1)
149-
headers = dict(multi._headers)
199+
return dict(multi._headers), body
200+
201+
def _finish_futures(self, responses):
202+
"""Apply all the batch responses to the futures created.
203+
204+
:type responses: list of (headers, payload) tuples.
205+
:param responses: List of headers and payloads from each response in
206+
the batch.
207+
208+
:raises: :class:`ValueError` if no requests have been deferred.
209+
"""
210+
# If a bad status occurs, we track it, but don't raise an exception
211+
# until all futures have been populated.
212+
exception_args = None
213+
214+
if len(self._target_objects) != len(responses):
215+
raise ValueError('Expected a response for every request.')
216+
217+
for target_object, sub_response in zip(self._target_objects,
218+
responses):
219+
resp_headers, sub_payload = sub_response
220+
if not 200 <= resp_headers.status < 300:
221+
exception_args = exception_args or (resp_headers,
222+
sub_payload)
223+
elif target_object is not None:
224+
target_object._properties = sub_payload
225+
226+
if exception_args is not None:
227+
raise make_exception(*exception_args)
228+
229+
def finish(self):
230+
"""Submit a single `multipart/mixed` request w/ deferred requests.
231+
232+
:rtype: list of tuples
233+
:returns: one ``(headers, payload)`` tuple per deferred request.
234+
"""
235+
headers, body = self._prepare_batch_request()
150236

151237
url = '%s/batch' % self.API_BASE_URL
152238

153-
_req = self._connection._make_request
154-
response, content = _req('POST', url, data=body, headers=headers)
155-
self._responses = list(_unpack_batch_response(response, content))
156-
return self._responses
239+
response, content = self._connection._make_request(
240+
'POST', url, data=body, headers=headers)
241+
responses = list(_unpack_batch_response(response, content))
242+
self._finish_futures(responses)
243+
return responses
157244

158245
@staticmethod
159246
def current():
@@ -199,7 +286,20 @@ def _generate_faux_mime_message(parser, response, content):
199286

200287

201288
def _unpack_batch_response(response, content):
202-
"""Convert response, content -> [(status, reason, payload)]."""
289+
"""Convert response, content -> [(headers, payload)].
290+
291+
Creates a generator of tuples of emulating the responses to
292+
:meth:`httplib2.Http.request` (a pair of headers and payload).
293+
294+
:type response: :class:`httplib2.Response`
295+
:param response: HTTP response / headers from a request.
296+
297+
:type content: string
298+
:param content: Response payload with a batch response.
299+
300+
:rtype: generator
301+
:returns: A generator of header, payload pairs.
302+
"""
203303
parser = Parser()
204304
message = _generate_faux_mime_message(parser, response, content)
205305

@@ -208,10 +308,13 @@ def _unpack_batch_response(response, content):
208308

209309
for subrequest in message._payload:
210310
status_line, rest = subrequest._payload.split('\n', 1)
211-
_, status, reason = status_line.split(' ', 2)
212-
message = parser.parsestr(rest)
213-
payload = message._payload
214-
ctype = message['Content-Type']
311+
_, status, _ = status_line.split(' ', 2)
312+
sub_message = parser.parsestr(rest)
313+
payload = sub_message._payload
314+
ctype = sub_message['Content-Type']
315+
msg_headers = dict(sub_message._headers)
316+
msg_headers['status'] = status
317+
headers = httplib2.Response(msg_headers)
215318
if ctype and ctype.startswith('application/json'):
216319
payload = json.loads(payload)
217-
yield status, reason, payload
320+
yield headers, payload

gcloud/storage/blob.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,8 @@ def exists(self, connection=None):
227227
# minimize the returned payload.
228228
query_params = {'fields': 'name'}
229229
connection.api_request(method='GET', path=self.path,
230-
query_params=query_params)
230+
query_params=query_params,
231+
_target_object=self)
231232
return True
232233
except NotFound:
233234
return False

0 commit comments

Comments
 (0)
0