Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 61 additions & 7 deletions src/cryptoadvance/spectrum/elsock.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@
logger = logging.getLogger(__name__)


def _ver(version_str: str) -> tuple:
"""Parse a "X.Y" version string into a comparable tuple."""
try:
return tuple(int(x) for x in str(version_str).split("."))
except ValueError:
return (0,)


class ElSockTimeoutException(Exception):
"""Called in different contexts where a timeout is relevant"""

Expand Down Expand Up @@ -54,6 +62,10 @@ class ElectrumSocket:
sleep_recv_loop = 0.01 # seconds , the shorter the better performance but 0.001 might be much worse
sleep_write_loop = 0.01 # seconds , the shorter the better performance but 0.001 might be much worse
socket_timeout = 10 # seconds for self._socket.recv(2048) (won't show up in the logs)

CLIENT_NAME = "Spectrum/1.0"
PROTOCOL_MIN = "1.3" # blockchain.block.header was added in 1.3
PROTOCOL_MAX = "1.4"
# fmt: on

def __init__(
Expand Down Expand Up @@ -108,6 +120,8 @@ def __init__(
self._requests = []
self._notifications = []
self._wanted_status = "ok" # "ok" or "down"
self.server_software_version = None # set during version negotiation, e.g. "/libbitcoin:4.0.0/"
self.negotiated_protocol = None # set during version negotiation, e.g. "1.4"
# The monitor-thread will create the other threads
self._monitor_thread = create_and_start_bg_thread(self._monitor_loop)
while not (self.status == "ok" or self.status.startswith("broken_")):
Expand Down Expand Up @@ -231,11 +245,10 @@ def _create_threads(self) -> bool:
try:
self._recv_thread = create_and_start_bg_thread(self.recv_loop)
self._write_thread = create_and_start_bg_thread(self._write_loop)
self._ping_thread = create_and_start_bg_thread(self._ping_loop)
self._notify_thread = create_and_start_bg_thread(self._notify_loop)
return True
except Exception as e:
logger.exception()
logger.exception(e)
return False

def is_socket_closed(self) -> bool:
Expand All @@ -257,14 +270,15 @@ def _monitor_loop(self):
If the ping thread is not alive, the socket connection and threads will be recreated via walking through
this state-machine:

[![](https://mermaid.ink/img/pako:eNqdkj9vAyEMxb_KyWOVWzre0KkZM3VrqU4OOAmCwxFn-kdRvnu5I0mVKmEok-H9eDbiHUCzIehgFBR6triNOLQfjyo0eb09vDdt-9ToSCg2bPuRtSMpIrtZW0d2FHpnvZ8I2WXWjAW5rd23rPCGP0OBpuq-xZ_Da_BquFvkaYDacP9oUH13haUv0kmoj1Rkzt3R-zVqV25VgNmAHSxgoDigNfmPD9MtBbKjgRR0uTS0weRFgQrHjKa9ySlYGiscodugH2kBmIRfvoOGTmKiM3SKyoXaY3hl_t3TbLIq4ZozdvwB373ZuA?type=png)](https://mermaid-js.github.io/mermaid-live-editor/edit#pako:eNqdkj9vAyEMxb_KyWOVWzre0KkZM3VrqU4OOAmCwxFn-kdRvnu5I0mVKmEok-H9eDbiHUCzIehgFBR6triNOLQfjyo0eb09vDdt-9ToSCg2bPuRtSMpIrtZW0d2FHpnvZ8I2WXWjAW5rd23rPCGP0OBpuq-xZ_Da_BquFvkaYDacP9oUH13haUv0kmoj1Rkzt3R-zVqV25VgNmAHSxgoDigNfmPD9MtBbKjgRR0uTS0weRFgQrHjKa9ySlYGiscodugH2kBmIRfvoOGTmKiM3SKyoXaY3hl_t3TbLIq4ZozdvwB373ZuA)
[![](https://mermaid.ink/img/pako:eNqdkk0OwiAQha9CWBq7cenClbdQ04wwFsJfQqmaNL27FFxYQzGRFXnvm5lHmJEyx5HuCe0DBDxK6DyY5r47WxLPaXMhTXMgzCMEabu2d0xhyKZTybt6p9C2Smo9E0FElvcZKXvrLSs8dw-bofm23uJLXIKLcCXyHaAW7o8BP99dJAVY3gtQUc_Uh5AAfCIbArYec72L4UDrKzBVLqh9VaVZqnWKbgk16A1IHhdmpEGgSavD8QaDDnSaXhlMzyQ?type=png)](https://mermaid-js.github.io/mermaid-live-editor/edit#pako:eNqdkk0OwiAQha9CWBq7cenClbdQ04wwFsJfQqmaNL27FFxYQzGRFXnvm5lHmJEyx5HuCe0DBDxK6DyY5r47WxLPaXMhTXMgzCMEabu2d0xhyKZTybt6p9C2Smo9E0FElvcZKXvrLSs8dw-bofm23uJLXIKLcCXyHaAW7o8BP99dJAVY3gtQUc_Uh5AAfCIbArYec72L4UDrKzBVLqh9VaVZqnWKbgk16A1IHhdmpEGgSavD8QaDDnSaXhlMzyQ)

The states are stored in the `ElectrumSocket.state` property. The Constructor of the `ElectrumSocket` is hardly doing more than just setting up the `_monitor_thread` which is an endless loop going through these states:
* `creating_sockets` will create the sockets and pass to `creating_threads` or to `broken_creating_sockets` if that fails
* `broken_creating_sockets` will try to create the socket and sleep for some time if that fails (and endlessly try to do that)
* `creating_threads` will create the write/recv/ping/notify threads and start them
* `creating_threads` will create the write/recv/notify threads (not ping yet) and start them
* `handshaking` calls server.version synchronously. On success it stores the negotiated protocol version and starts the ping thread. On failure it transitions to `broken_killing_threads`.
* `execute_recreation_callback` will call that callback after setting the status to `ok`
* the `ok` state will now simply check the other thready and if one of them is no longer alive (probably the ping-thread as he will exit if ping fails for 4 times) it will transition to `broken_killing_threads`
* the `ok` state will now simply check the other threads and if one of them is no longer alive (probably the ping-thread as he will exit if ping fails for 4 times) it will transition to `broken_killing_threads`
* `broken_killing_threads` will set `self.running` to false and wait for the threads to terminate. Especially the `recv` thread might not terminate until he get internet connection (again). This might take forever. If all threads are terminated, it will transition to `creating_socket`

"""
Expand Down Expand Up @@ -296,7 +310,47 @@ def _monitor_loop(self):
if not self._create_threads():
time.sleep(10)
continue
self.status = "execute_recreation_callback"
self.status = "handshaking"

if self.status == "handshaking":
self.server_software_version = None
self.negotiated_protocol = None
_handshake_ok = False
try:
result = self.call(
"server.version",
[self.CLIENT_NAME, [self.PROTOCOL_MIN, self.PROTOCOL_MAX]],
)
if isinstance(result, list) and len(result) == 2:
self.server_software_version, self.negotiated_protocol = result
logger.info(
f"Electrum handshake complete: server={self.server_software_version!r} "
f"protocol={self.negotiated_protocol!r}"
)
if _ver(self.negotiated_protocol) < _ver(self.PROTOCOL_MIN):
logger.error(
f"Negotiated protocol {self.negotiated_protocol!r} is below "
f"required minimum {self.PROTOCOL_MIN!r}. Reconnecting."
)
else:
_handshake_ok = True
else:
logger.error(f"Unexpected server.version response: {result!r}. Reconnecting.")
except RPCError as e:
logger.error(
f"server.version rejected (code={e.code}): {e.message!r}. "
f"Server may not support protocol range "
f"[{self.PROTOCOL_MIN}, {self.PROTOCOL_MAX}]. Reconnecting."
)
except ElSockTimeoutException:
logger.error("server.version timed out. Reconnecting.")
except Exception as e:
logger.error(f"server.version failed: {e}. Reconnecting.")
if not _handshake_ok:
self.status = "broken_killing_threads"
else:
self._ping_thread = create_and_start_bg_thread(self._ping_loop)
self.status = "execute_recreation_callback"

if self.status == "execute_recreation_callback":
# set the new status here before we call the callback
Expand Down Expand Up @@ -502,7 +556,7 @@ def _ping_loop(self):
try:
self.ping()
tries = 0
except ElSockTimeoutException as e:
except ElSockTimeoutException:
tries = tries + 1
logger.error(
f"Timeout in ping-loop ({tries}th time, next try in {self.sleep_ping_loop} seconds if threshold not met"
Expand Down