diff --git a/src/cryptoadvance/spectrum/elsock.py b/src/cryptoadvance/spectrum/elsock.py index 5b7dc98..216b57f 100644 --- a/src/cryptoadvance/spectrum/elsock.py +++ b/src/cryptoadvance/spectrum/elsock.py @@ -17,6 +17,14 @@ logger = logging.getLogger(__name__) +def _ver(version_str: str) -> tuple: + """Parse a "X.Y" version string into a comparable tuple.""" + try: + return tuple(int(x) for x in str(version_str).split(".")) + except ValueError: + return (0,) + + class ElSockTimeoutException(Exception): """Called in different contexts where a timeout is relevant""" @@ -54,6 +62,10 @@ class ElectrumSocket: sleep_recv_loop = 0.01 # seconds , the shorter the better performance but 0.001 might be much worse sleep_write_loop = 0.01 # seconds , the shorter the better performance but 0.001 might be much worse socket_timeout = 10 # seconds for self._socket.recv(2048) (won't show up in the logs) + + CLIENT_NAME = "Spectrum/1.0" + PROTOCOL_MIN = "1.3" # blockchain.block.header was added in 1.3 + PROTOCOL_MAX = "1.4" # fmt: on def __init__( @@ -108,6 +120,8 @@ def __init__( self._requests = [] self._notifications = [] self._wanted_status = "ok" # "ok" or "down" + self.server_software_version = None # set during version negotiation, e.g. "/libbitcoin:4.0.0/" + self.negotiated_protocol = None # set during version negotiation, e.g. 
"1.4" # The monitor-thread will create the other threads self._monitor_thread = create_and_start_bg_thread(self._monitor_loop) while not (self.status == "ok" or self.status.startswith("broken_")): @@ -231,11 +245,10 @@ def _create_threads(self) -> bool: try: self._recv_thread = create_and_start_bg_thread(self.recv_loop) self._write_thread = create_and_start_bg_thread(self._write_loop) - self._ping_thread = create_and_start_bg_thread(self._ping_loop) self._notify_thread = create_and_start_bg_thread(self._notify_loop) return True except Exception as e: - logger.exception() + logger.exception(e) return False def is_socket_closed(self) -> bool: @@ -257,14 +270,15 @@ def _monitor_loop(self): If the ping thread is not alive, the socket connection and threads will be recreated via walking through this state-machine: - [![](https://mermaid.ink/img/pako:eNqdkj9vAyEMxb_KyWOVWzre0KkZM3VrqU4OOAmCwxFn-kdRvnu5I0mVKmEok-H9eDbiHUCzIehgFBR6triNOLQfjyo0eb09vDdt-9ToSCg2bPuRtSMpIrtZW0d2FHpnvZ8I2WXWjAW5rd23rPCGP0OBpuq-xZ_Da_BquFvkaYDacP9oUH13haUv0kmoj1Rkzt3R-zVqV25VgNmAHSxgoDigNfmPD9MtBbKjgRR0uTS0weRFgQrHjKa9ySlYGiscodugH2kBmIRfvoOGTmKiM3SKyoXaY3hl_t3TbLIq4ZozdvwB373ZuA?type=png)](https://mermaid-js.github.io/mermaid-live-editor/edit#pako:eNqdkj9vAyEMxb_KyWOVWzre0KkZM3VrqU4OOAmCwxFn-kdRvnu5I0mVKmEok-H9eDbiHUCzIehgFBR6triNOLQfjyo0eb09vDdt-9ToSCg2bPuRtSMpIrtZW0d2FHpnvZ8I2WXWjAW5rd23rPCGP0OBpuq-xZ_Da_BquFvkaYDacP9oUH13haUv0kmoj1Rkzt3R-zVqV25VgNmAHSxgoDigNfmPD9MtBbKjgRR0uTS0weRFgQrHjKa9ySlYGiscodugH2kBmIRfvoOGTmKiM3SKyoXaY3hl_t3TbLIq4ZozdvwB373ZuA) + 
[![](https://mermaid.ink/img/pako:eNqdkk0OwiAQha9CWBq7cenClbdQ04wwFsJfQqmaNL27FFxYQzGRFXnvm5lHmJEyx5HuCe0DBDxK6DyY5r47WxLPaXMhTXMgzCMEabu2d0xhyKZTybt6p9C2Smo9E0FElvcZKXvrLSs8dw-bofm23uJLXIKLcCXyHaAW7o8BP99dJAVY3gtQUc_Uh5AAfCIbArYec72L4UDrKzBVLqh9VaVZqnWKbgk16A1IHhdmpEGgSavD8QaDDnSaXhlMzyQ?type=png)](https://mermaid-js.github.io/mermaid-live-editor/edit#pako:eNqdkk0OwiAQha9CWBq7cenClbdQ04wwFsJfQqmaNL27FFxYQzGRFXnvm5lHmJEyx5HuCe0DBDxK6DyY5r47WxLPaXMhTXMgzCMEabu2d0xhyKZTybt6p9C2Smo9E0FElvcZKXvrLSs8dw-bofm23uJLXIKLcCXyHaAW7o8BP99dJAVY3gtQUc_Uh5AAfCIbArYec72L4UDrKzBVLqh9VaVZqnWKbgk16A1IHhdmpEGgSavD8QaDDnSaXhlMzyQ) The states are stored in the `ElectrumSocket.state` property. The Constructor of the `ElectrumSocket` is hardly doing more than just setting up the `_monitor_thread` which is an endless loop going through these states: * `creating_sockets` will create the sockets and pass to `creating_threads` or to `broken_creating_sockets` if that fails * `broken_creating_sockets` will try to create the socket and sleep for some time if that fails (and endlessly try to do that) - * `creating_threads` will create the write/recv/ping/notify threads and start them + * `creating_threads` will create the write/recv/notify threads (not ping yet) and start them + * `handshaking` calls server.version synchronously. On success it stores the negotiated protocol version and starts the ping thread. On failure it transitions to `broken_killing_threads`. 
* `execute_recreation_callback` will call that callback after setting the status to `ok` - * the `ok` state will now simply check the other thready and if one of them is no longer alive (probably the ping-thread as he will exit if ping fails for 4 times) it will transition to `broken_killing_threads` + * the `ok` state will now simply check the other threads and if one of them is no longer alive (probably the ping-thread, as it will exit if ping fails 4 times) it will transition to `broken_killing_threads` * `broken_killing_threads` will set `self.running` to false and wait for the threads to terminate. Especially the `recv` thread might not terminate until he get internet connection (again). This might take forever. If all threads are terminated, it will transition to `creating_socket` """ @@ -296,7 +310,47 @@ def _monitor_loop(self): if not self._create_threads(): time.sleep(10) continue - self.status = "execute_recreation_callback" + self.status = "handshaking" + + if self.status == "handshaking": + self.server_software_version = None + self.negotiated_protocol = None + _handshake_ok = False + try: + result = self.call( + "server.version", + [self.CLIENT_NAME, [self.PROTOCOL_MIN, self.PROTOCOL_MAX]], + ) + if isinstance(result, list) and len(result) == 2: + self.server_software_version, self.negotiated_protocol = result + logger.info( + f"Electrum handshake complete: server={self.server_software_version!r} " + f"protocol={self.negotiated_protocol!r}" + ) + if _ver(self.negotiated_protocol) < _ver(self.PROTOCOL_MIN): + logger.error( + f"Negotiated protocol {self.negotiated_protocol!r} is below " + f"required minimum {self.PROTOCOL_MIN!r}. Reconnecting." + ) + else: + _handshake_ok = True + else: + logger.error(f"Unexpected server.version response: {result!r}. Reconnecting.") + except RPCError as e: + logger.error( + f"server.version rejected (code={e.code}): {e.message!r}. 
" + f"Server may not support protocol range " + f"[{self.PROTOCOL_MIN}, {self.PROTOCOL_MAX}]. Reconnecting." + ) + except ElSockTimeoutException: + logger.error("server.version timed out. Reconnecting.") + except Exception as e: + logger.error(f"server.version failed: {e}. Reconnecting.") + if not _handshake_ok: + self.status = "broken_killing_threads" + else: + self._ping_thread = create_and_start_bg_thread(self._ping_loop) + self.status = "execute_recreation_callback" if self.status == "execute_recreation_callback": # set the new status here before we call the callback @@ -502,7 +556,7 @@ def _ping_loop(self): try: self.ping() tries = 0 - except ElSockTimeoutException as e: + except ElSockTimeoutException: tries = tries + 1 logger.error( f"Timeout in ping-loop ({tries}th time, next try in {self.sleep_ping_loop} seconds if threshold not met" diff --git a/tests/integration/elsock_test.py b/tests/integration/elsock_test.py index 5b5dbde..2ce1a33 100644 --- a/tests/integration/elsock_test.py +++ b/tests/integration/elsock_test.py @@ -54,7 +54,13 @@ def callback(something): ) assert ( caplog.text.count( - "ElectrumSocket Status changed from creating_threads to execute_recreation_callback" + "ElectrumSocket Status changed from creating_threads to handshaking" + ) + == 1 + ) + assert ( + caplog.text.count( + "ElectrumSocket Status changed from handshaking to execute_recreation_callback" ) == 1 ) @@ -74,6 +80,14 @@ def callback(something): f"...................................... timer: {i} seconds passed (elsock.is_socket_closed() returns {elsock.is_socket_closed()})" ) time.sleep(1) + for i in range(0, 20): + if elsock.status == "ok": + break + logger.info( + f"...................................... 
waiting for socket recovery: {i} seconds passed (status={elsock.status})" + ) + time.sleep(1) + assert elsock.status == "ok" logger.info( f"{datetime.now()}========================The socket connection should now work properly again================================" ) @@ -81,7 +95,7 @@ def callback(something): ts = elsock.ping() logger.info(f"second working ping in {ts} ms") assert ts < 10 - assert caplog.text.count("ElectrumSocket Status changed") == 9 + assert caplog.text.count("ElectrumSocket Status changed") == 11 assert ( caplog.text.count( @@ -103,7 +117,13 @@ def callback(something): ) assert ( caplog.text.count( - "ElectrumSocket Status changed from creating_threads to execute_recreation_callback" + "ElectrumSocket Status changed from creating_threads to handshaking" + ) + == 2 + ) + assert ( + caplog.text.count( + "ElectrumSocket Status changed from handshaking to execute_recreation_callback" ) == 2 )