8000 Merge branch 'GoogleCloudPlatform:main' into main · jrmfg/functions-framework-python@4cfe6f2 · GitHub
[go: up one dir, main page]

Skip to content
10000

Commit 4cfe6f2

Browse files
authored
Merge branch 'GoogleCloudPlatform:main' into main
2 parents 99b1b0b + 04c1fdc commit 4cfe6f2

File tree

9 files changed

+381
-8
lines changed

9 files changed

+381
-8
lines changed

examples/cloud_run_cloud_events/send_cloud_event.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
16-
from cloudevents.http import CloudEvent, to_structured
1716
import requests
1817

18+
from cloudevents.http import CloudEvent, to_structured
1919

2020
# Create a cloudevent using https://github.com/cloudevents/sdk-python
2121
# Note we only need source and type because the cloudevents constructor by

playground/main.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import logging
2+
import time
3+
4+
import functions_framework
5+
6+
logger = logging.getLogger(__name__)
7+
8+
9+
@functions_framework.http
10+
def main(request):
11+
timeout = 2
12+
for _ in range(timeout * 10):
13+
time.sleep(0.1)
14+
logger.info("logging message after timeout elapsed")
15+
return "Hello, world!"
16+

src/functions_framework/_http/gunicorn.py

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2020 Google LLC
1+
# Copyright 2024 Google LLC
22
#
33
# Licensed under the Apache License, Version 2.0 (the "License");
44
# you may not use this file except in compliance with the License.
@@ -16,17 +16,43 @@
1616

1717
import gunicorn.app.base
1818

19+
from gunicorn.workers.gthread import ThreadWorker
20+
21+
from ..request_timeout import ThreadingTimeout
22+
23+
# global for use in our custom gthread worker; the gunicorn arbiter spawns these
24+
# and it's not possible to inject (and self.timeout means something different to
25+
# async workers!)
26+
# set/managed in gunicorn application init for test-friendliness
27+
TIMEOUT_SECONDS = None
28+
1929

2030
class GunicornApplication(gunicorn.app.base.BaseApplication):
2131
def __init__(self, app, host, port, debug, **options):
32+
threads = int(os.environ.get("THREADS", (os.cpu_count() or 1) * 4))
33+
34+
global TIMEOUT_SECONDS
35+
TIMEOUT_SECONDS = int(os.environ.get("CLOUD_RUN_TIMEOUT_SECONDS", 0))
36+
2237
self.options = {
2338
"bind": "%s:%s" % (host, port),
24-
"workers": os.environ.get("WORKERS", 1),
25-
"threads": os.environ.get("THREADS", (os.cpu_count() or 1) * 4),
26-
"timeout": os.environ.get("CLOUD_RUN_TIMEOUT_SECONDS", 0),
27-
"loglevel": "error",
39+
"workers": int(os.environ.get("WORKERS", 1)),
40+
"threads": threads,
41+
"loglevel": os.environ.get("GUNICORN_LOG_LEVEL", "error"),
2842
"limit_request_line": 0,
2943
}
44+
45+
if (
46+
TIMEOUT_SECONDS > 0
47+
and threads > 1
48+
and (os.environ.get("THREADED_TIMEOUT_ENABLED", "False").lower() == "true")
49+
): # pragma: no cover
50+
self.options["worker_class"] = (
51+
"functions_framework._http.gunicorn.GThreadWorkerWithTimeoutSupport"
52+
)
53+
else:
54+
self.options["timeout"] = TIMEOUT_SECONDS
55+
3056
self.options.update(options)
3157
self.app = app
3258

@@ -38,3 +64,9 @@ def load_config(self):
3864

3965
def load(self):
4066
return self.app
67+
68+
69+
class GThreadWorkerWithTimeoutSupport(ThreadWorker): # pragma: no cover
70+
def handle_request(self, req, conn):
71+
with ThreadingTimeout(TIMEOUT_SECONDS):
72+
super(GThreadWorkerWithTimeoutSupport, self).handle_request(req, conn)

src/functions_framework/exceptions.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2020 Google LLC
1+
# Copyright 2024 Google LLC
22
#
33
# Licensed under the Apache License, Version 2.0 (the "License");
44
# you may not use this file except in compliance with the License.
@@ -35,3 +35,7 @@ class MissingTargetException(FunctionsFrameworkException):
3535

3636
class EventConversionException(FunctionsFrameworkException):
3737
pass
38+
39+
40+
class RequestTimeoutException(FunctionsFrameworkException):
41+
pass
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import ctypes
2+
import logging
3+
import threading
4+
5+
from .exceptions import RequestTimeoutException
6+
7+
logger = logging.getLogger(__name__)
8+
9+
10+
class ThreadingTimeout(object): # pragma: no cover
11+
def __init__(self, seconds):
12+
self.seconds = seconds
13+
self.target_tid = threading.current_thread().ident
14+
self.timer = None
15+
16+
def __enter__(self):
17+
self.timer = threading.Timer(self.seconds, self._raise_exc)
18+
self.timer.start()
19+
return self
20+
21+
def __exit__(self, exc_type, exc_val, exc_tb):
22+
self.timer.cancel()
23+
if exc_type is RequestTimeoutException:
24+
logger.warning(
25+
"Request handling exceeded {0} seconds timeout; terminating request handling...".format(
26+
self.seconds
27+
),
28+
exc_info=(exc_type, exc_val, exc_tb),
29+
)
30+
return False
31+
32+
def _raise_exc(self):
33+
ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(
34+
ctypes.c_long(self.target_tid), ctypes.py_object(RequestTimeoutException)
35+
)
36+
if ret == 0:
37+
raise ValueError("Invalid thread ID {}".format(self.target_tid))
38+
elif ret > 1:
39+
ctypes.pythonapi.PyThreadState_SetAsyncExc(
40+
ctypes.c_long(self.target_tid), None
41+
)
42+
raise SystemError("PyThreadState_SetAsyncExc failed")

tests/test_execution_id.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -378,4 +378,5 @@ async def test_maintains_execution_id_for_concurrent_requests(monkeypatch, capsy
378378
logs = record.err.strip().split("\n")
379379
logs_as_json = tuple(json.loads(log) for log in logs)
380380

381-
assert logs_as_json == expected_logs
381+
sort_key = lambda d: d["message"]
382+
assert sorted(logs_as_json, key=sort_key) == sorted(expected_logs, key=sort_key)

tests/test_functions/timeout/main.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import logging
2+
import time
3+
4+
logger = logging.getLogger(__name__)
5+
6+
7+
def function(request):
8+
# sleep for 1200 total ms (1.2 sec)
9+
for _ in range(12):
10+
time.sleep(0.1)
11+
logger.info("some extra logging message")
12+
return "success", 200

0 commit comments

Comments
 (0)
0