
Commit 2601975

feat: (opt-in): terminate handling of work when the request has already timed out (GoogleCloudPlatform#328)
Overhead-free (or at least very cheap). The “timeout” gunicorn config means drastically different things for sync and non-sync workers:

> Workers silent for more than this many seconds are killed and restarted.
>
> Value is a positive number or 0. Setting it to 0 has the effect of
> infinite timeouts by disabling timeouts for all workers entirely.
>
> Generally, the default of thirty seconds should suffice. Only set this
> noticeably higher if you’re sure of the repercussions for sync workers.
> For the non sync workers it just means that the worker process is still
> communicating and is not tied to the length of time required to handle a
> single request.

So. For cases where threads = 1 (user set or our defaults), we’ll use the sync worker and let the regular timeout functionality do its thing.

For cases where threads > 1, we’re using the gthread worker, and timeout means something completely different and not really user-observable. So we’ll leave the communication timeout (gunicorn’s default “timeout”) at 30 seconds, but create our own gthread-derived worker class to use instead, which terminates request handling (with no mind to gunicorn’s “graceful shutdown” config), to emulate GCFv1. The arbiter spawns these workers, so we have to maintain some sort of global timeout state for us to read in our custom gthread worker.

In the future, we should consider letting the user adjust the graceful-shutdown seconds. But the default of 30 seems to have worked fine historically, so it’s hard to argue for changing it.

IIUC, this means that on gen 2 there’s a small behavior difference for sync workers compared to gen 1: gen 2 sync-worker workloads get an extra 30 seconds of timeout to gracefully shut down. I don’t think monkeying with this config and opting in to sync workers is very common, though, so let’s not worry about it here; everyone should be on the gthread path outlined above.

* fix tests

* small test fixes

  Give up on coverage support for things that are tested in different processes or in gthread, because it looks like pytest-cov gave up on support for these, whereas coverage has out-of-the-box support.

* format

* isort everything

* skip tests on mac

  There's something test-specific about how macOS pickles functions for execution in multiprocessing.Process which is causing problems; it seems to be somewhere in the innards of flask, gunicorn, and macOS. Since this feature is opt-in anyway, let's just skip testing darwin.

* sort tuple of dicts in async tests before asserting

  Causes flakes sometimes in workflows.

* use double-quotes

* also skip tests on windows

  This is all built for gunicorn; there's no value in adding it for windows anyway.

* skip import on windows

* easy stuff

* add a few tests for sync worker timeouts

  These shouldn't have changed with this commit.

1 parent fff38ae commit 2601975
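
As a quick orientation for the opt-in path described above, here is a minimal, hypothetical sketch of enabling it programmatically. The environment variable names (THREADS, CLOUD_RUN_TIMEOUT_SECONDS, THREADED_TIMEOUT_ENABLED) come from the gunicorn.py change in this commit; the target function, file path, host, and port are illustrative only, and gunicorn (hence a POSIX platform) is assumed.

# Minimal sketch (not part of this commit) of opting in to the threaded timeout.
import os

from functions_framework import create_app
from functions_framework._http.gunicorn import GunicornApplication

# The env var names come from the gunicorn.py change below; values are examples.
os.environ["THREADS"] = "8"                      # threads > 1 selects the gthread path
os.environ["CLOUD_RUN_TIMEOUT_SECONDS"] = "2"    # per-request timeout, in seconds
os.environ["THREADED_TIMEOUT_ENABLED"] = "true"  # the opt-in flag

# create_app loads the decorated "main" function from the playground file added
# in this commit; GunicornApplication reads the env vars in __init__ and, with
# all three conditions met, selects GThreadWorkerWithTimeoutSupport.
app = create_app(target="main", source="playground/main.py")
GunicornApplication(app, "0.0.0.0", "8080", debug=False).run()

With settings like these, a request to the playground function should be cut off after roughly two seconds instead of running to completion.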

File tree

9 files changed, +381 -8 lines changed


examples/cloud_run_cloud_events/send_cloud_event.py

Lines changed: 1 addition & 1 deletion
@@ -13,9 +13,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from cloudevents.http import CloudEvent, to_structured
 import requests
 
+from cloudevents.http import CloudEvent, to_structured
 
 # Create a cloudevent using https://github.com/cloudevents/sdk-python
 # Note we only need source and type because the cloudevents constructor by

playground/main.py

Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
+import logging
+import time
+
+import functions_framework
+
+logger = logging.getLogger(__name__)
+
+
+@functions_framework.http
+def main(request):
+    timeout = 2
+    for _ in range(timeout * 10):
+        time.sleep(0.1)
+    logger.info("logging message after timeout elapsed")
+    return "Hello, world!"
+

src/functions_framework/_http/gunicorn.py

Lines changed: 37 additions & 5 deletions
@@ -1,4 +1,4 @@
-# Copyright 2020 Google LLC
+# Copyright 2024 Google LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -16,17 +16,43 @@
 
 import gunicorn.app.base
 
+from gunicorn.workers.gthread import ThreadWorker
+
+from ..request_timeout import ThreadingTimeout
+
+# global for use in our custom gthread worker; the gunicorn arbiter spawns these
+# and it's not possible to inject (and self.timeout means something different to
+# async workers!)
+# set/managed in gunicorn application init for test-friendliness
+TIMEOUT_SECONDS = None
+
 
 class GunicornApplication(gunicorn.app.base.BaseApplication):
     def __init__(self, app, host, port, debug, **options):
+        threads = int(os.environ.get("THREADS", (os.cpu_count() or 1) * 4))
+
+        global TIMEOUT_SECONDS
+        TIMEOUT_SECONDS = int(os.environ.get("CLOUD_RUN_TIMEOUT_SECONDS", 0))
+
         self.options = {
             "bind": "%s:%s" % (host, port),
-            "workers": os.environ.get("WORKERS", 1),
-            "threads": os.environ.get("THREADS", (os.cpu_count() or 1) * 4),
-            "timeout": os.environ.get("CLOUD_RUN_TIMEOUT_SECONDS", 0),
-            "loglevel": "error",
+            "workers": int(os.environ.get("WORKERS", 1)),
+            "threads": threads,
+            "loglevel": os.environ.get("GUNICORN_LOG_LEVEL", "error"),
             "limit_request_line": 0,
         }
+
+        if (
+            TIMEOUT_SECONDS > 0
+            and threads > 1
+            and (os.environ.get("THREADED_TIMEOUT_ENABLED", "False").lower() == "true")
+        ):  # pragma: no cover
+            self.options["worker_class"] = (
+                "functions_framework._http.gunicorn.GThreadWorkerWithTimeoutSupport"
+            )
+        else:
+            self.options["timeout"] = TIMEOUT_SECONDS
+
         self.options.update(options)
         self.app = app
 
@@ -38,3 +64,9 @@ def load_config(self):
 
     def load(self):
        return self.app
+
+
+class GThreadWorkerWithTimeoutSupport(ThreadWorker):  # pragma: no cover
+    def handle_request(self, req, conn):
+        with ThreadingTimeout(TIMEOUT_SECONDS):
+            super(GThreadWorkerWithTimeoutSupport, self).handle_request(req, conn)
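
A side note on the module-level TIMEOUT_SECONDS global above: the gunicorn arbiter forks worker processes after GunicornApplication has been constructed in the master, so a value assigned during __init__ is inherited by every worker. The sketch below is illustrative only (names are made up, not from the repo) and shows the same fork-inherits-module-state behavior with plain os.fork, which is POSIX-only, just like gunicorn.

# Illustrative sketch of why a module-level global set in the master process
# is visible to forked workers.
import os

TIMEOUT_SECONDS = None


def master_init():
    # Stands in for GunicornApplication.__init__ parsing CLOUD_RUN_TIMEOUT_SECONDS.
    global TIMEOUT_SECONDS
    TIMEOUT_SECONDS = 2


def worker_body():
    # A forked worker sees the value the master assigned before forking.
    print("worker sees TIMEOUT_SECONDS =", TIMEOUT_SECONDS)


if __name__ == "__main__":
    master_init()
    pid = os.fork()  # POSIX-only, like gunicorn's arbiter
    if pid == 0:
        worker_body()
        os._exit(0)
    os.waitpid(pid, 0)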

src/functions_framework/exceptions.py

Lines changed: 5 additions & 1 deletion
@@ -1,4 +1,4 @@
-# Copyright 2020 Google LLC
+# Copyright 2024 Google LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -35,3 +35,7 @@ class MissingTargetException(FunctionsFrameworkException):
 
 class EventConversionException(FunctionsFrameworkException):
     pass
+
+
+class RequestTimeoutException(FunctionsFrameworkException):
+    pass
src/functions_framework/request_timeout.py

Lines changed: 42 additions & 0 deletions

@@ -0,0 +1,42 @@
+import ctypes
+import logging
+import threading
+
+from .exceptions import RequestTimeoutException
+
+logger = logging.getLogger(__name__)
+
+
+class ThreadingTimeout(object):  # pragma: no cover
+    def __init__(self, seconds):
+        self.seconds = seconds
+        self.target_tid = threading.current_thread().ident
+        self.timer = None
+
+    def __enter__(self):
+        self.timer = threading.Timer(self.seconds, self._raise_exc)
+        self.timer.start()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.timer.cancel()
+        if exc_type is RequestTimeoutException:
+            logger.warning(
+                "Request handling exceeded {0} seconds timeout; terminating request handling...".format(
+                    self.seconds
+                ),
+                exc_info=(exc_type, exc_val, exc_tb),
+            )
+        return False
+
+    def _raise_exc(self):
+        ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(
+            ctypes.c_long(self.target_tid), ctypes.py_object(RequestTimeoutException)
+        )
+        if ret == 0:
+            raise ValueError("Invalid thread ID {}".format(self.target_tid))
+        elif ret > 1:
+            ctypes.pythonapi.PyThreadState_SetAsyncExc(
+                ctypes.c_long(self.target_tid), None
+            )
+            raise SystemError("PyThreadState_SetAsyncExc failed")
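
As a usage illustration (not part of the commit), the context manager above can be exercised on its own. The module path is inferred from the `from ..request_timeout import ThreadingTimeout` import in gunicorn.py; the 1-second timeout and the sleep loop are made up for the example.

# Hypothetical standalone demo of ThreadingTimeout: the timer thread asks
# CPython to raise RequestTimeoutException asynchronously in the thread that
# entered the block, so the loop is cut off at the next bytecode boundary.
import time

from functions_framework.exceptions import RequestTimeoutException
from functions_framework.request_timeout import ThreadingTimeout

try:
    with ThreadingTimeout(1):
        for _ in range(50):          # would take ~5 seconds if left alone
            time.sleep(0.1)
        print("never reached")
except RequestTimeoutException:
    print("request handling was interrupted after roughly 1 second")

Because __exit__ returns False, the exception still propagates out of the with block after the warning is logged; in GThreadWorkerWithTimeoutSupport.handle_request earlier in this commit, that propagation is what cuts request handling short.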

tests/test_execution_id.py

Lines changed: 2 additions & 1 deletion
@@ -378,4 +378,5 @@ async def test_maintains_execution_id_for_concurrent_requests(monkeypatch, capsys):
     logs = record.err.strip().split("\n")
     logs_as_json = tuple(json.loads(log) for log in logs)
 
-    assert logs_as_json == expected_logs
+    sort_key = lambda d: d["message"]
+    assert sorted(logs_as_json, key=sort_key) == sorted(expected_logs, key=sort_key)

tests/test_functions/timeout/main.py

Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
+import logging
+import time
+
+logger = logging.getLogger(__name__)
+
+
+def function(request):
+    # sleep for 1200 total ms (1.2 sec)
+    for _ in range(12):
+        time.sleep(0.1)
+        logger.info("some extra logging message")
+    return "success", 200

0 commit comments
