8000 Threading helper workaround · rusty0209/botbuilder-python@ebbb4f7 · GitHub
[go: up one dir, main page]

Skip to content

Commit ebbb4f7

Browse files
committed
Threading helper workaround
1 parent 2e4d8c9 commit ebbb4f7

File tree

3 files changed

+169
-9
lines changed

3 files changed

+169
-9
lines changed

libraries/botbuilder-core/tests/teams/message-reactions/app.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
# Copyright (c) Microsoft Corporation. All rights reserved.
22
# Licensed under the MIT License.
33

4-
import asyncio
54
import sys
65
from datetime import datetime
76
from types import MethodType
@@ -16,9 +15,9 @@
1615
from botbuilder.schema import Activity, ActivityTypes
1716
from activity_log import ActivityLog
1817
from bots import MessageReactionBot
18+
from threading_helper import run_coroutine
1919

20-
# Create the loop and Flask app
21-
LOOP = asyncio.get_event_loop()
20+
# Create the Flask app
2221
APP = Flask(__name__, instance_relative_config=True)
2322
APP.config.from_object("config.DefaultConfig")
2423

@@ -80,11 +79,8 @@ def messages():
8079

8180
try:
8281
print("about to create task")
83-
task = LOOP.create_task(
84-
ADAPTER.process_activity(activity, auth_header, BOT.on_turn)
85-
)
8682
print("about to run until complete")
87-
LOOP.run_until_complete(task)
83+
run_coroutine(ADAPTER.process_activity(activity, auth_header, BOT.on_turn))
8884
print("is now complete")
8985
return Response(status=201)
9086
except Exception as exception:

libraries/botbuilder-core/tests/teams/message-reactions/config.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@ class DefaultConfig:
99
""" Bot Configuration """
1010

1111
PORT = 3978
12-
APP_ID = os.environ.get("MicrosoftAppId", "")
13-
APP_PASSWORD = os.environ.get("MicrosoftAppPassword", "")
12+
APP_ID = os.environ.get("MicrosoftAppId", "e4c570ca-189d-4fee-a81b-5466be24a557")
13+
APP_PASSWORD = os.environ.get("MicrosoftAppPassword", "bghqYKJV3709;creKFP8$@@")
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
import asyncio
2+
import itertools
3+
import logging
4+
import threading
5+
6+
# pylint: disable=invalid-name
7+
# pylint: disable=global-statement
8+
try:
9+
# Python 3.8 or newer has a suitable process watcher
10+
asyncio.ThreadedChildWatcher
11+
except AttributeError:
12+
# backport the Python 3.8 threaded child watcher
13+
import os
14+
import warnings
15+
16+
# Python 3.7 preferred API
17+
_get_running_loop = getattr(asyncio, "get_running_loop", asyncio.get_event_loop)
18+
19+
class _Py38ThreadedChildWatcher(asyncio.AbstractChildWatcher):
20+
def __init__(self):
21+
self._pid_counter = itertools.count(0)
22+
self._threads = {}
23+
24+
def is_active(self):
25+
return True
26+
27+
def close(self):
28+
pass
29+
30+
def __enter__(self):
31+
return self
32+
33+
def __exit__(self, exc_type, exc_val, exc_tb):
34+
pass
35+
36+
def __del__(self, _warn=warnings.warn):
37+
threads = [t for t in list(self._threads.values()) if t.is_alive()]
38+
if threads:
39+
_warn(
40+
f"{self.__class__} has registered but not finished child processes",
41+
ResourceWarning,
42+
source=self,
43+
)
44+
45+
def add_child_handler(self, pid, callback, *args):
46+
loop = _get_running_loop()
47+
thread = threading.Thread(
48+
target=self._do_waitpid,
49+
name=f"waitpid-{next(self._pid_counter)}",
50+
args=(loop, pid, callback, args),
51+
daemon=True,
52+
)
53+
self._threads[pid] = thread
54+
thread.start()
55+
56+
def remove_child_handler(self, pid):
57+
# asyncio never calls remove_child_handler() !!!
58+
# The method is no-op but is implemented because
59+
# abstract base class requires it
60+
return True
61+
62+
def attach_loop(self, loop):
63+
pass
64+
65+
def _do_waitpid(self, loop, expected_pid, callback, args):
66+
assert expected_pid > 0
67+
68+
try:
69+
pid, status = os.waitpid(expected_pid, 0)
70+
except ChildProcessError:
71+
# The child process is already reaped
72+
# (may happen if waitpid() is called elsewhere).
73+
pid = expected_pid
74+
returncode = 255
75+
logger.warning(
76+
"Unknown child process pid %d, will report returncode 255", pid
77+
)
78+
else:
79+
if os.WIFSIGNALED(status):
80+
returncode = -os.WTERMSIG(status)
81+
elif os.WIFEXITED(status):
82+
returncode = os.WEXITSTATUS(status)
83+
else:
84+
returncode = status
85+
86+
if loop.get_debug():
87+
logger.debug(
88+
"process %s exited with returncode %s", expected_pid, returncode
89+
)
90+
91+
if loop.is_closed():
92+
logger.warning("Loop %r that handles pid %r is closed", loop, pid)
93+
else:
94+
loop.call_soon_threadsafe(callback, pid, returncode, *args)
95+
96+
self._threads.pop(expected_pid)
97+
98+
# add the watcher to the loop policy
99+
asyncio.get_event_loop_policy().set_child_watcher(_Py38ThreadedChildWatcher())
100+
101+
__all__ = ["EventLoopThread", "get_event_loop", "stop_event_loop", "run_coroutine"]
102+
103+
logger = logging.getLogger(__name__)
104+
105+
class EventLoopThread(threading.Thread):
106+
loop = None
107+
_count = itertools.count(0)
108+
109+
def __init__(self):
110+
name = f"{type(self).__name__}-{next(self._count)}"
111+
super().__init__(name=name, daemon=True)
112+
113+
def __repr__(self):
114+
loop, r, c, d = self.loop, False, True, False
115+
if loop is not None:
116+
r, c, d = loop.is_running(), loop.is_closed(), loop.get_debug()
117+
return (
118+
f"<{type(self).__name__} {self.name} id={self.ident} "
119+
f"running={r} closed={c} debug={d}>"
120+
)
121+
122+
def run(self):
123+
self.loop = loop = asyncio.new_event_loop()
124+
asyncio.set_event_loop(loop)
125+
126+
try:
127+
loop.run_forever()
128+
finally:
129+
try:
130+
shutdown_asyncgens = loop.shutdown_asyncgens()
131+
except AttributeError:
132+
pass
133+
else:
134+
loop.run_until_complete(shutdown_asyncgens)
135+
loop.close()
136+
asyncio.set_event_loop(None)
137+
138+
def stop(self):
139+
loop, self.loop = self.loop, None
140+
if loop is None:
141+
return
142+
loop.call_soon_threadsafe(loop.stop)
143+
self.join()
144+
145+
_lock = threading.Lock()
146+
_loop_thread = None
147+
148+
def get_event_loop():
149+
global _loop_thread
150+
with _lock:
151+
if _loop_thread is None:
152+
_loop_thread = EventLoopThread()
153+
_loop_thread.start()
154+
return _loop_thread.loop
155+
156+
def stop_event_loop():
157+
global _loop_thread
158+
with _lock:
159+
if _loop_thread is not None:
160+
_loop_thread.stop()
161+
_loop_thread = None
162+
163+
def run_coroutine(coro):
164+
return asyncio.run_coroutine_threadsafe(coro, get_event_loop())

0 commit comments

Comments
 (0)
0