-
-
Notifications
You must be signed in to change notification settings - Fork 223
Keep connection open and lock to prevent duplicate requests #213
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Keep connection open and lock to prevent duplicate requests #213
Conversation
Codecov Report
@@ Coverage Diff @@
## master #213 +/- ##
==========================================
+ Coverage 80.53% 81.68% +1.15%
==========================================
Files 12 12
Lines 1269 1316 +47
Branches 170 173 +3
==========================================
+ Hits 1022 1075 +53
+ Misses 219 210 -9
- Partials 28 31 +3
Continue to review full report at Codecov.
|
home-assistant/core#56512 seems stable after this |
Before: 8.475s
After: 3.387s
Test code import asyncio
from kasa import SmartStrip
import logging
# logging.basicConfig(level=logging.DEBUG)
import pprint
async def print_eme():
strip = SmartStrip("192.168.211.210")
await strip.update()
pprint.pprint(["emeter_this_month", strip.emeter_this_month])
pprint.pprint(["emeter_today", strip.emeter_today])
pprint.pprint(["emeter_realtime", strip.emeter_realtime])
asyncio.ensure_future(strip.update())
res = await strip.get_emeter_realtime()
pprint.pprint(dict(res))
res = await strip.get_emeter_daily(year=2021, month=9)
pprint.pprint(dict(res))
res = await strip.get_emeter_monthly(year=2021)
pprint.pprint(dict(res))
for plug in strip.children:
pprint.pprint(await plug.get_emeter_daily(year=2021, month=9))
pprint.pprint(strip)
asyncio.run(print_eme()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense, this is a great improvement! Could you add a couple of tests (e.g., to make sure that the reconnection logic works)?
The tests on a real device are now failing, either with RuntimeError: Event loop is closed
or Unable to query the device: 0 bytes read on a total of 4 expected bytes
. Test with pytest kasa --ip 192.168...
, looks like it's failing on the follow-up request for emeter-enabled devices:
DEBUG kasa.protocol:protocol.py:79 > (33) {"system": {"get_sysinfo": null}}
DEBUG kasa.protocol:protocol.py:87 < (593) {'system': {'get_sysinfo': {'active_mode': 'schedule',
<discovery snip>
DEBUG kasa.smartdevice:smartdevice.py:199 Initializing 192.168.250.188 of type <class 'kasa.smartplug.SmartPlug'>
DEBUG kasa.smartdevice:smartdevice.py:287 Performing the initial update to obtain sysinfo
DEBUG kasa.protocol:protocol.py:79 > (33) {"system": {"get_sysinfo": null}}
DEBUG kasa.protocol:protocol.py:87 < (593) {'system': {'get_sysinfo': {'active_mode': 'schedule',
<initial update snip>
DEBUG kasa.smartdevice:smartdevice.py:297 The device has emeter, querying its information along sysinfo
DEBUG kasa.protocol:protocol.py:79 > (143) {"system": {"get_sysinfo": null}, "emeter": {"get_realtime": null, "get_monthstat": {"year": 2021}, "get_daystat": {"month": 9, "year": 2021}}}
DEBUG kasa.protocol:protocol.py:119 Unable to query the device, retrying: 0 bytes read on a total of 4 expected bytes
<failure on emeter followup>
――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――― ERROR at setup of test_get_light_state[KL130(EU)_1.0_1.8.8.json] ―――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――
self = <kasa.protocol.TPLinkSmartHomeProtocol object at 0x7f7c6dcf7910>, request = '{"system": {"get_sysinfo": null}, "emeter": {"get_realtime": null, "get_monthstat": {"year": 2021}, "get_daystat": {"month": 9, "year": 2021}}}'
retry_count = 3, timeout = 5
async def _query(self, request: str, retry_count: int, timeout: int) -> Dict:
"""Try to query a device."""
for retry in range(retry_count + 1):
if not await self._connect(timeout):
continue
try:
assert self.reader is not None
assert self.writer is not None
> return await asyncio.wait_for(
self._execute_query(request), timeout=timeout
)
kasa/protocol.py:108:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
fut = <Task finished name='Task-278' coro=<TPLinkSmartHomeProtocol._execute_query() done, defined at /home/tpr/code/python-kasa/kasa/protocol.py:75> exception=IncompleteReadError('0 bytes read on a total of 4 expected bytes')>
timeout = 5
async def wait_for(fut, timeout, *, loop=None):
"""Wait for the single Future or coroutine to complete, with timeout.
Coroutine will be wrapped in Task.
Returns result of the Future or coroutine. When a timeout occurs,
it cancels the task and raises TimeoutError. To avoid the task
cancellation, wrap it in shield().
If the wait is cancelled, the task is also cancelled.
This function is a coroutine.
"""
if loop is None:
loop = events.get_running_loop()
else:
warnings.warn("The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
if timeout is None:
return await fut
if timeout <= 0:
fut = ensure_future(fut, loop=loop)
if fut.done():
return fut.result()
await _cancel_and_wait(fut, loop=loop)
try:
fut.result()
except exceptions.CancelledError as exc:
raise exceptions.TimeoutError() from exc
else:
raise exceptions.TimeoutError()
waiter = loop.create_future()
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
cb = functools.partial(_release_waiter, waiter)
fut = ensure_future(fut, loop=loop)
fut.add_done_callback(cb)
try:
# wait until the future completes or the timeout
try:
await waiter
except exceptions.CancelledError:
if fut.done():
return fut.result()
else:
fut.remove_done_callback(cb)
# We must ensure that the task is not running
# after wait_for() returns.
# See https://bugs.python.org/issue32751
await _cancel_and_wait(fut, loop=loop)
raise
if fut.done():
> return fut.result()
/usr/lib/python3.9/asyncio/tasks.py:481:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <kasa.protocol.TPLinkSmartHomeProtocol object at 0x7f7c6dcf7910>, request = '{"system": {"get_sysinfo": null}, "emeter": {"get_realtime": null, "get_monthstat": {"year": 2021}, "get_daystat": {"month": 9, "year": 2021}}}'
async def _execute_query(self, request: str) -> Dict:
"""Execute a query on the device and wait for the response."""
assert self.writer is not None
assert self.reader is not None
_LOGGER.debug("> (%i) %s", len(request), request)
self.writer.write(TPLinkSmartHomeProtocol.encrypt(request))
await self.writer.drain()
> packed_block_size = await self.reader.readexactly(self.BLOCK_SIZE)
kasa/protocol.py:82:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <StreamReader eof transport=<_SelectorSocketTransport closing fd=36>>, n = 4
async def readexactly(self, n):
"""Read exactly `n` bytes.
Raise an IncompleteReadError if EOF is reached before `n` bytes can be
read. The IncompleteReadError.partial attribute of the exception will
contain the partial read bytes.
if n is zero, return empty bytes object.
Returned value is not limited with limit, configured at stream
creation.
If stream was paused, this function will automatically resume it if
needed.
"""
if n < 0:
raise ValueError('readexactly size can not be less than zero')
if self._exception is not None:
raise self._exception
if n == 0:
return b''
while len(self._buffer) < n:
if self._eof:
incomplete = bytes(self._buffer)
self._buffer.clear()
> raise exceptions.IncompleteReadError(incomplete, n)
E asyncio.exceptions.IncompleteReadError: 0 bytes read on a total of 4 expected bytes
/usr/lib/python3.9/asyncio/streams.py:721: IncompleteReadError
During handling of the above exception, another exception occurred:
request = <SubRequest 'dev' for <Function test_bulb_sysinfo[KL130(EU)_1.0_1.8.8.json]>>
@pytest.fixture(params=SUPPORTED_DEVICES, scope="session")
def dev(request):
"""Device fixture.
Provides a device (given --ip) or parametrized fixture for the supported devices.
The initial update is called automatically before returning the device.
"""
file = request.param
ip = request.config.getoption("--ip")
if ip:
d = asyncio.run(Discover.discover_single(ip))
> asyncio.run(d.update())
kasa/tests/conftest.py:168:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/lib/python3.9/asyncio/runners.py:44: in run
return loop.run_until_complete(main)
/usr/lib/python3.9/asyncio/base_events.py:642: in run_until_complete
return future.result()
kasa/smartdevice.py:302: in update
self._last_update = await self.protocol.query(req)
kasa/protocol.py:61: in query
return await self._query(request, retry_count, timeout)
kasa/protocol.py:112: in _query
await self._close()
kasa/protocol.py:93: in _close
self.writer.close()
/usr/lib/python3.9/asyncio/streams.py:353: in close
return self._transport.close()
/usr/lib/python3.9/asyncio/selector_events.py:700: in close
self._loop.call_soon(self._call_connection_lost, None)
/usr/lib/python3.9/asyncio/base_events.py:746: in call_soon
self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_UnixSelectorEventLoop running=False closed=True debug=False>
def _check_closed(self):
if self._closed:
> raise RuntimeError('Event loop is closed')
E RuntimeError: Event loop is closed
/usr/lib/python3.9/asyncio/base_events.py:510: RuntimeError
Looks like the test are failing because the connection is being created in one event loop and then used again in a different event loop. I'll take a look in a bit |
Co-authored-by: Teemu R. <tpr@iki.fi>
Co-authored-by: Teemu R. <tpr@iki.fi>
Co-authored-by: Teemu R. <tpr@iki.fi>
Co-authored-by: Teemu R. <tpr@iki.fi>
717d3e3
to
e617d16
Compare
Added coverage for the reconnect, and covered the protocol tests quite a bit more |
This looks good now as it is and is a huge improvement on all devices when used on long-running processes like homeassistant, thanks! 👍 |
This release introduces re-using the device connection to get rid of (sometimes slow) connection establishment. This is especially useful for emeter-enabled smart strips or any other usecases requiring consecutive I/O requests. [Full Changelog](python-kasa/python-kasa@0.4.0.dev4...0.4.0.dev5) **Merged pull requests:** - Add KL130 fixture, initial lightstrip tests [\python-kasa#214](python-kasa#214) ([rytilahti](https://github.com/rytilahti)) - Keep connection open and lock to prevent duplicate requests [\python-kasa#213](python-kasa#213) ([bdraco](https://github.com/bdraco)) - Cleanup discovery & add tests [\python-kasa#212](python-kasa#212) ([rytilahti](https://github.com/rytilahti))
This release introduces re-using the device connection to get rid of (sometimes slow) connection establishment. This is especially useful for emeter-enabled smart strips or any other usecases requiring consecutive I/O requests. [Full Changelog](0.4.0.dev4...0.4.0.dev5) **Merged pull requests:** - Add KL130 fixture, initial lightstrip tests [\#214](#214) ([rytilahti](https://github.com/rytilahti)) - Keep connection open and lock to prevent duplicate requests [\#213](#213) ([bdraco](https://github.com/bdraco)) - Cleanup discovery & add tests [\#212](#212) ([rytilahti](https://github.com/rytilahti))
…asa#213) * Keep connection open and lock to prevent duplicate requests * option to not update children * tweaks * typing * tweaks * run tests in the same event loop * memorize model * Update kasa/protocol.py Co-authored-by: Teemu R. <tpr@iki.fi> * Update kasa/protocol.py Co-authored-by: Teemu R. <tpr@iki.fi> * Update kasa/protocol.py Co-authored-by: Teemu R. <tpr@iki.fi> * Update kasa/protocol.py Co-authored-by: Teemu R. <tpr@iki.fi> * dry * tweaks * warn when the event loop gets switched out from under us * raise on unable to connect multiple times * fix patch target * tweaks * isrot * reconnect test * prune * fix mocking * fix mocking * fix test under python 3.7 * fix test under python 3.7 * less patching * isort * use mocker to patch * disable on old python since mocking doesnt work * avoid disconnect/reconnect cycles * isort * Fix hue validation * Fix latitude_i/longitude_i units Co-authored-by: Teemu R. <tpr@iki.fi>
This release introduces re-using the device connection to get rid of (sometimes slow) connection establishment. This is especially useful for emeter-enabled smart strips or any other usecases requiring consecutive I/O requests. [Full Changelog](python-kasa/python-kasa@0.4.0.dev4...0.4.0.dev5) **Merged pull requests:** - Add KL130 fixture, initial lightstrip tests [\python-kasa#214](python-kasa#214) ([rytilahti](https://github.com/rytilahti)) - Keep connection open and lock to prevent duplicate requests [\python-kasa#213](python-kasa#213) ([bdraco](https://github.com/bdraco)) - Cleanup discovery & add tests [\python-kasa#212](python-kasa#212) ([rytilahti](https://github.com/rytilahti))
The devices didn't behave well with competing requests. We now lock and keep the connection open to ensure only one request (update or change) is in flight at a time.
Polling is MUCH faster with this strategy which seems to mirror what the iOS app is doing (except its polling more frequently)
The constant disconnect and reconnect seemed to put a strain on the device.
2021-09-23 23:03:01 DEBUG (MainThread) [homeassistant.components.tplink] Finished fetching Bulb3 data in 0.027 seconds (success: True)
Also we probably can drop all the retry logic in Home Assistant with this.