diff --git a/polygon/websocket/websocket_client.py b/polygon/websocket/websocket_client.py index 3c046485..9b18bf7d 100644 --- a/polygon/websocket/websocket_client.py +++ b/polygon/websocket/websocket_client.py @@ -1,4 +1,3 @@ -import signal import threading from typing import Optional, Callable @@ -9,44 +8,30 @@ CRYPTO_CLUSTER = "crypto" -class WebSocketClient: - DEFAULT_HOST = "socket.polygon.io" +def _format_params(params): + return ','.join(params) + - # TODO: Either an instance of the client couples 1:1 with the cluster or an instance of the Client couples 1:3 with - # the 3 possible clusters (I think I like client per, but then a problem is the user can make multiple clients for - # the same cluster and that's not desirable behavior, - # somehow keeping track with multiple Client instances will be the difficulty) - def __init__(self, cluster: str, auth_key: str, process_message: Optional[Callable[[str], None]] = None, +class WebSocketClient: + def __init__(self, cluster: str, auth_key: str, service: str = None, + process_message: Optional[Callable[[websocket.WebSocketApp, str], None]] = None, on_close: Optional[Callable[[websocket.WebSocketApp], None]] = None, on_error: Optional[Callable[[websocket.WebSocketApp, str], None]] = None): - self._host = self.DEFAULT_HOST - self.url = f"wss://{self._host}/{cluster}" - self.ws: websocket.WebSocketApp = websocket.WebSocketApp(self.url, on_open=self._default_on_open(), - on_close=self._default_on_close, - on_error=self._default_on_error, - on_message=self._default_on_message()) + self.url = f'wss://{service or "socket"}.polygon.io/{cluster}' self.auth_key = auth_key - self.process_message = process_message - self.ws.on_close = on_close - self.ws.on_error = on_error + self.ws: websocket.WebSocketApp = websocket.WebSocketApp(self.url, + on_close=on_close, + on_error=on_error, + on_message=process_message) - # being authenticated is an event that must occur before any other action is sent to the server - self._authenticated = threading.Event() - # self._run_thread is only set if the client is run asynchronously self._run_thread: Optional[threading.Thread] = None - # TODO: this probably isn't great design. - # If the user defines their own signal handler then this will gets overwritten. - # We still need to make sure that killing, terminating, interrupting the program closes the connection - signal.signal(signal.SIGINT, self._cleanup_signal_handler()) - signal.signal(signal.SIGTERM, self._cleanup_signal_handler()) - - def run(self): - self.ws.run_forever() + def run(self, **kwargs): + self.ws.run_forever(**kwargs) - def run_async(self): - self._run_thread = threading.Thread(target=self.run) + def run_async(self, **kwargs): + self._run_thread = threading.Thread(target=self.run, kwargs=kwargs) self._run_thread.start() def close_connection(self): @@ -55,57 +40,13 @@ def close_connection(self): self._run_thread.join() def subscribe(self, *params): - # TODO: make this a decorator or context manager - self._authenticated.wait() - - sub_message = '{"action":"subscribe","params":"%s"}' % self._format_params(params) - self.ws.send(sub_message) + fparams = _format_params(params) + self.ws.send(f'{{"action":"subscribe","params":"{fparams}"}}') def unsubscribe(self, *params): - # TODO: make this a decorator or context manager - self._authenticated.wait() - - sub_message = '{"action":"unsubscribe","params":"%s"}' % self._format_params(params) - self.ws.send(sub_message) - - def _cleanup_signal_handler(self): - return lambda signalnum, frame: self.close_connection() - - def _authenticate(self, ws): - ws.send('{"action":"auth","params":"%s"}' % self.auth_key) - self._authenticated.set() - - @staticmethod - def _format_params(params): - return ",".join(params) - - @property - def process_message(self): - return self.__process_message - - @process_message.setter - def process_message(self, pm): - if pm: - self.__process_message = pm - self.ws.on_message = lambda ws, message: self.__process_message(message) - - def _default_on_message(self): - return lambda ws, message: self._default_process_message(message) - - @staticmethod - def _default_process_message(message): - print(message) - - def _default_on_open(self): - def f(ws): - self._authenticate(ws) - - return f + fparams = _format_params(params) + self.ws.send(f'{{"action":"unsubscribe","params":"{fparams}"}}') - @staticmethod - def _default_on_error(ws, error): - print("error:", error) + def authenticate(self): + self.ws.send(f'{{"action":"auth","params":"{self.auth_key}"}}') - @staticmethod - def _default_on_close(ws): - print("### closed ###") diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 00000000..242e366b --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,5 @@ +# pyproject.toml +[build-system] +# XXX: If your project needs other packages to build properly, add them to this list. +requires = ["setuptools >= 42.0.0"] +build-backend = "setuptools.build_meta" diff --git a/setup.py b/setup.py index 45095315..9a1a28a2 100644 --- a/setup.py +++ b/setup.py @@ -2,13 +2,7 @@ from setuptools import setup, find_packages -import os -import sys - -version = os.getenv("VERSION") -if not version: - print("no version supplied") - sys.exit(1) +VERSION = "0.4" def get_readme_md_contents(): """read the contents of your README file""" @@ -18,7 +12,7 @@ def get_readme_md_contents(): setup( name="polygon-api-client", - version=version, + version=VERSION, description="Polygon API client", long_description=get_readme_md_contents(), long_description_content_type="text/markdown", diff --git a/websocket_example/polygon.py b/websocket_example/polygon.py index 760a27c0..3d8c6f1e 100644 --- a/websocket_example/polygon.py +++ b/websocket_example/polygon.py @@ -5,7 +5,7 @@ from polygon import WebSocketClient, STOCKS_CLUSTER -def my_custom_process_message(message): +def my_custom_process_message(ws, message): print("this is my custom message processing", message)