8000 make websockets have 2 apis (#163) · Raptorly1/client-python@603d05b · GitHub
[go: up one dir, main page]

Skip to content

Commit 603d05b

Browse files
author
clickingbuttons
authored
make websockets have 2 apis (polygon-io#163)
* make websockets have 2 apis * lint
1 parent 2f0da95 commit 603d05b

File tree

10000

5 files changed

+255
-52
lines changed

5 files changed

+255
-52
lines changed

README.md

Lines changed: 77 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@
55

66
Python client for the [Polygon.io API](https://polygon.io).
77

8-
## Getting started
9-
### Install
8+
## Install
109

1110
`pip install polygon-api-client`
1211

1312
Requires python version >= 3.7
1413

14+
## REST getting started
1515
### Getting aggs
1616
```python
1717
from polygon import RESTClient
@@ -42,19 +42,86 @@ client = RESTClient() # Uses POLYGON_API_KEY env var. Can optionally supply your
4242
response = client.get_aggs("AAPL", 1, "day", "2005-04-01", "2005-04-04", raw=True)
4343
```
4444

45-
### Streaming websockets
45+
## WebSocket getting started
4646

47+
### Simple synchronous callback
4748
```python
4849
from polygon import WebSocketClient
49-
from polygon.websocket.models import Market, Feed, WebSocketMessage
50-
import asyncio
50+
from polygon.websocket.models import WebSocketMessage
51+
from typing import List
5152

52-
client = WebSocketClient(market=Market.Stocks, feed=Feed.RealTime) # Uses POLYGON_API_KEY env var. Can optionally supply your key.
53-
client.subscribe('T.AAPL')
53+
c = WebSocketClient(subscriptions=['T.AAPL']) # Uses POLYGON_API_KEY env var. Can optionally supply your key.
5454

55-
def handle_msg(msg: WebSocketMessage):
56-
print(msg)
55+
def handle_msg(msgs: List[WebSocketMessage]):
56+
for m in msgs:
57+
print(m)
5758

58-
asyncio.run(client.connect(handle_msg))
59+
c.run(handle_msg)
5960
```
6061

62+
### Synchronous aggregates
63+
```python
64+
from polygon import WebSocketClient
65+
from polygon.websocket.models import WebSocketMessage
66+
from typing import List
67+
68+
class MessageHandler:
69+
count = 0
70+
71+
def handle_msg(self, msgs: List[WebSocketMessage]):
72+
for m in msgs:
73+
if type(m) == EquityTrade:
74+
print(self.count, m)
75+
self.count += 1
76+
77+
h = MessageHandler()
78+
79+
def handle_msg(msgs: List[WebSocketMessage]):
80+
h.handle_msg(msgs)
81+
82+
c.run(handle_msg)
83+
```
84+
85+
### Asynchronous callback
86+
```python
87+
from polygon import WebSocketClient
88+
from polygon.websocket.models import WebSocketMessage
89+
from typing import List
90+
91+
c = WebSocketClient(subscriptions=['T.AAPL']) # Uses POLYGON_API_KEY env var. Can optionally supply your key.
92+
93+
async def handle_msg(msgs: List[WebSocketMessage]):
94+
for m in msgs:
95+
print(m)
96+
97+
async def timeout():
98+
await asyncio.sleep(1)
99+
print('unsubscribe_all')
100+
c.unsubscribe_all()
101+
await asyncio.sleep(1)
102+
print('close')
103+
await c.close()
104+
105+
async def main():
106+
await asyncio.gather(
107+
c.connect(handle_msg),
108+
timeout()
109+
)
110+
111+
asyncio.run(main())
112+
```
113+
114+
### Getting raw response
115+
```python
116+
from polygon import WebSocketClient
117+
from polygon.websocket.models import WebSocketMessage
118+
from typing import Union
119+
import json
120+
121+
c = WebSocketClient(subscriptions=['T.*'], raw=True)
122+
123+
def handle_msg(msgs: Union[str, bytes]):
124+
print(json.loads(msgs))
125+
126+
c.run(handle_msg)
127+
```

docs/source/Getting-Started.rst

Lines changed: 86 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -175,23 +175,100 @@ If it is a paginated :code:`list_*` response it's up to you to handle the "next_
175175
# }'
176176
177177
178-
Websocket client usage
178+
WebSocket client usage
179179
----------------------
180180

181+
.. automethod:: polygon.WebSocketClient.__init__
182+
183+
The simplest way to use the websocket client is to just provide a callback:
184+
181185
.. code-block:: python
182186
183187
from polygon import WebSocketClient
184-
from polygon.websocket.models import Market, Feed, WebSocketMessage
188+
from polygon.websocket.models import WebSocketMessage
185189
from typing import List
186-
import asyncio
187190
188-
client = WebSocketClient(market=Market.Stocks, feed=Feed.RealTime) # Uses POLYGON_API_KEY env var. Can optionally supply your key.
189-
client.subscribe('T.AAPL')
191+
c = WebSocketClient(subscriptions=['T.AAPL'])
190192
191-
async def handle_msg(msg: List[WebSocketMessage]):
192-
print(msg)
193+
def handle_msg(msgs: List[WebSocketMessage]):
194+
for m in msgs:
195+
print(m)
193196
194-
asyncio.run(client.connect(handle_msg))
197+
c.run(handle_msg)
195198
196199
.. note::
197-
Raises :code:`AuthError` if invalid API key is provided.
200+
Raises :code:`AuthError` if invalid API key is provided.
201+
202+
If you want to capture state you can use a global variable inside the callback.
203+
Alternatively, you can wrap a class method in a closure.
204+
205+
.. code-block:: python
206+
207+
from polygon import WebSocketClient
208+
from polygon.websocket.models import WebSocketMessage
209+
from typing import List
210+
211+
class MessageHandler:
212+
count = 0
213+
214+
def handle_msg(self, msgs: List[WebSocketMessage]):
215+
for m in msgs:
216+
if type(m) == EquityTrade:
217+
print(self.count, m)
218+
self.count += 1
219+
220+
h = MessageHandler()
221+
222+
def handle_msg(msgs: List[WebSocketMessage]):
223+
h.handle_msg(msgs)
224+
225+
c.run(handle_msg)
226+
227+
Under the hood our client uses an asynchronous runtime. To manage the runtime
228+
yourself (including unsubscribing and subscribing) you'll need to use asyncio
229+
and the :code:`.connect` method:
230+
231+
.. code-block:: python
232+
233+
from polygon import WebSocketClient
234+
from polygon.websocket.models import WebSocketMessage
235+
from typing import List
236+
237+
c = WebSocketClient(subscriptions=['T.AAPL']) # Uses POLYGON_API_KEY env var. Can optionally supply your key.
238+
239+
async def handle_msg(msgs: List[WebSocketMessage]):
240+
for m in msgs:
241+
print(m)
242+
243+
async def timeout():
244+
await asyncio.sleep(1)
245+
print('unsubscribe_all')
246+
c.unsubscribe_all()
247+
await asyncio.sleep(1)
248+
print('close')
249+
await c.close()
250+
251+
async def main():
252+
await asyncio.gather(
253+
c.connect(handle_msg),
254+
timeout()
255+
)
256+
257+
asyncio.run(main())
258+
259+
To handle raw messages yourself pass `raw=True`:
260+
261+
.. code-block:: python
262+
263+
from polygon import WebSocketClient
264+
from polygon.websocket.models import WebSocketMessage
265+
from typing import Union
266+
import json
267+
268+
c = WebSocketClient(subscriptions=['T.*'], raw=True)
269+
270+
def handle_msg(msgs: Union[str, bytes]):
271+
print(json.loads(msgs))
272+
273+
c.run(handle_msg)
274+

docs/source/WebSocket.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ Connect
1313
============================
1414
.. automethod:: polygon.WebSocketClient.connect
1515

16+
============================
17+
Run
18+
============================
19+
.. automethod:: polygon.WebSocketClient.run
20+
1621
============================
1722
Subscribe
1823
============================

polygon/websocket/__init__.py

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,12 @@
22
from enum import Enum
33
from typing import Optional, Union, List, Set, Callable, Awaitable
44
import json
5-
import inspect
5+
import asyncio
66
import ssl
77
import certifi
88
from .models import *
99
from websockets.client import connect, WebSocketClientProtocol
1010
from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError
11-
from websockets.typing import Data
1211

1312
env_key = "POLYGON_API_KEY"
1413

@@ -66,7 +65,10 @@ def __init__(
6665
# https://websockets.readthedocs.io/en/stable/reference/client.html#opening-a-connection
6766
async def connect(
6867
self,
69-
processor: Callable[[Union[List[WebSocketMessage], Data]], Optional[Awaitable]],
68+
processor: Union[
69+
Callable[[List[WebSocketMessage]], Awaitable],
70+
Callable[[Union[str, bytes]], Awaitable],
71+
],
7072
close_timeout: int = 1,
7173
**kwargs,
7274
):
@@ -78,7 +80,6 @@ async def connect(
7880
:raises AuthError: If invalid API key is supplied.
7981
"""
8082
reconnects = 0
81-
isasync = inspect.iscoroutinefunction(processor)
8283
if self.verbose:
8384
print("connect:", self.url)
8485
# darwin needs some extra <3
@@ -116,7 +117,9 @@ async def connect(
116117
self.subs = set(self.scheduled_subs)
117118
self.schedule_resub = False
118119

119-
cmsg: Union[List[WebSocketMessage], Data] = await s.recv()
120+
cmsg: Union[
121+
List[WebSocketMessage], Union[str, bytes]
122+
] = await s.recv()
120123
# we know cmsg is Data
121124
10000 msgJson = json.loads(cmsg) # type: ignore
122125
for m in msgJson:
@@ -128,11 +131,7 @@ async def connect(
128131
cmsg = parse(msgJson)
129132

130133
if len(cmsg) > 0:
131-
if isasync:
132-
# we know processor is Awaitable
133-
await processor(cmsg) # type: ignore
134-
else:
135-
processor(cmsg)
134+
await processor(cmsg) # type: ignore
136135
except ConnectionClosedOK:
137136
if self.verbose:
138137
print("connection closed (OK)")
@@ -145,6 +144,28 @@ async def connect(
145144
return
146145
continue
147146

147+
def run(
148+
self,
149+
handle_msg: Union[
150+
Callable[[List[WebSocketMessage]], None],
151+
Callable[[Union[str, bytes]], None],
152+
],
153+
close_timeout: int = 1,
154+
**kwargs,
155+
):
156+
"""
157+
Connect to websocket server and run `processor(msg)` on every new `msg`. Synchronous version of `.connect`.
158+
159+
:param processor: The callback to process messages.
160+
:param close_timeout: How long to wait for handshake when calling .close.
161+
:raises AuthError: If invalid API key is supplied.
162+
"""
163+
164+
async def handle_msg_wrapper(msgs):
165+
handle_msg(msgs)
166+
167+
asyncio.run(self.connect(handle_msg_wrapper, close_timeout, **kwargs))
168+
148169
async def _subscribe(self, topics: Union[List[str], Set[str]]):
149170
if self.websocket is None or len(topics) == 0:
150171
return

0 commit comments

Comments
 (0)
0