diff --git a/src/OpenClaw.Shared/OpenClawGatewayClient.cs b/src/OpenClaw.Shared/OpenClawGatewayClient.cs index f93b3905..57a31e02 100644 --- a/src/OpenClaw.Shared/OpenClawGatewayClient.cs +++ b/src/OpenClaw.Shared/OpenClawGatewayClient.cs @@ -47,6 +47,16 @@ public class OpenClawGatewayClient : WebSocketClientBase, IOperatorGatewayClient private string? _mainSessionKey; private bool _hasHandshakeSnapshot; + // Application-level heartbeat — see WebSocketClientBase.ApplicationPingInterval. + private readonly Dictionary> _pendingHeartbeats = new(); + private readonly object _heartbeatLock = new(); + private long _heartbeatSeq; + // Bounds the pre-handshake heartbeat skip — if the gateway accepts the WS upgrade but + // never sends hello-ok (stuck auth handler, partial deploy), the WS protocol keep-alive + // won't catch it as long as the server still pongs at the protocol layer. After N + // consecutive pre-handshake intervals we abort the socket and force reconnect. + private int _consecutivePreHandshakeSkips; + /// /// The gateway's canonical main session key as published in the hello-ok /// snapshot (preferring the canonical sessionDefaults.mainSessionKey @@ -154,6 +164,13 @@ protected override Task ProcessMessageAsync(string json) protected override Task OnConnectedAsync() { ResetUnsupportedMethodFlags(); + // Reset the pre-handshake skip counter at the start of each new connection, + // not just on OnDisconnected. OnDisconnected is only fired from two of the five + // listen-loop exit paths (Close frame, ConnectionClosedPrematurely); the others + // (OperationCanceledException, ObjectDisposedException, generic Exception — which + // includes KeepAliveTimeout aborts surfaced as plain WebSocketException) silently + // skip it. Resetting on connect makes the 3-strike budget robust to all exit paths. + Interlocked.Exchange(ref _consecutivePreHandshakeSkips, 0); return Task.CompletedTask; } @@ -166,6 +183,10 @@ protected override bool ShouldAutoReconnect() protected override void OnDisconnected() { ClearPendingRequests(); + ClearPendingHeartbeats(); + // Reset the pre-handshake skip counter so a fresh reconnect starts a fresh budget; + // otherwise a previously-stalled connection could prematurely fail the next one. + Interlocked.Exchange(ref _consecutivePreHandshakeSkips, 0); // Invalidate the handshake snapshot — the next hello-ok must // re-establish the canonical session key, scopes, etc. Without this, // a reconnect-after-server-restart could leave the tray sending to a @@ -178,6 +199,99 @@ protected override void OnDisconnected() protected override void OnDisposing() { ClearPendingRequests(); + ClearPendingHeartbeats(); + } + + // Application ping cadence — 45s sits between the WS protocol keep-alive (30s) and its + // 60s timeout so the two layers don't fire in lockstep. The point is to verify the + // gateway is still processing application-level requests, not just answering pings. + protected override TimeSpan? ApplicationPingInterval => TimeSpan.FromSeconds(45); + + protected override async Task SendApplicationPingAsync(CancellationToken ct) + { + // Skip pings until the application-level handshake (hello-ok) has completed — + // sending a `req` before then would either be rejected or fail silently. BUT, + // if we sit pre-handshake for too long (gateway WS-pongs but never sends hello-ok), + // force a reconnect so we don't hang forever in a half-handshaken state. + if (!Volatile.Read(ref _hasHandshakeSnapshot)) + { + var skipped = Interlocked.Increment(ref _consecutivePreHandshakeSkips); + const int maxPreHandshakeSkips = 3; // ~135s at 45s cadence + if (skipped >= maxPreHandshakeSkips) + { + _logger.Warn($"gateway hello-ok not received after {skipped} heartbeat intervals; forcing reconnect"); + return false; + } + return true; + } + Interlocked.Exchange(ref _consecutivePreHandshakeSkips, 0); + + var id = $"hb-{Interlocked.Increment(ref _heartbeatSeq)}"; + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + lock (_heartbeatLock) { _pendingHeartbeats[id] = tcs; } + + try + { + var msg = new { type = "req", id, method = "ping" }; + // Pass ct through to the SendRawAsync(string, CancellationToken) overload. + // Wrapping the parameterless SendRawAsync in .WaitAsync(ct) only short-circuits + // the OUTER task; the underlying SendAsync keeps running, still holding + // _sendLock, still blocking on a half-dead socket — and may later land on a + // successor socket after reconnect. The overload links ct with the client + // lifetime token and propagates it into both _sendLock.WaitAsync and + // ws.SendAsync so cancellation actually unwinds the send. + await SendRawAsync(JsonSerializer.Serialize(msg), ct); + + using var reg = ct.Register(() => tcs.TrySetResult(false)); + return await tcs.Task; + } + catch (Exception ex) + { + _logger.Debug($"gateway heartbeat send failed: {ex.Message}"); + return false; + } + finally + { + lock (_heartbeatLock) { _pendingHeartbeats.Remove(id); } + } + } + + private bool TryCompleteHeartbeat(string? requestId) + { + if (requestId == null) return false; + TaskCompletionSource? tcs; + lock (_heartbeatLock) + { + if (!_pendingHeartbeats.TryGetValue(requestId, out tcs)) return false; + } + tcs.TrySetResult(true); + return true; + } + + private void ClearPendingHeartbeats() + { + TaskCompletionSource[] pending; + lock (_heartbeatLock) + { + pending = _pendingHeartbeats.Values.ToArray(); + _pendingHeartbeats.Clear(); + } + foreach (var tcs in pending) tcs.TrySetResult(false); + } + + private async Task SendPongAsync(string? requestId) + { + if (requestId == null) return; + var msg = new { type = "res", id = requestId, ok = true, payload = new { pong = true } }; + try { await SendRawAsync(JsonSerializer.Serialize(msg)); } + catch (Exception ex) { _logger.Debug($"gateway pong send failed: {ex.Message}"); } + } + + private async Task SendErrorResAsync(string requestId, string error) + { + var msg = new { type = "res", id = requestId, ok = false, error }; + try { await SendRawAsync(JsonSerializer.Serialize(msg)); } + catch (Exception ex) { _logger.Debug($"gateway error-res send failed: {ex.Message}"); } } // Events @@ -1441,6 +1555,9 @@ private void ProcessMessage(string json) case "res": HandleResponse(root); break; + case "req": + HandleIncomingRequest(root); + break; case "event": HandleEvent(root, json.Length); break; @@ -1456,13 +1573,51 @@ private void ProcessMessage(string json) } } + private void HandleIncomingRequest(JsonElement root) + { + // JsonElement.GetString() throws InvalidOperationException for non-string kinds (Number, + // Object, etc.). Guard with ValueKind so a malformed gateway req only logs and doesn't + // bubble into the read loop's catch-all where it would spam errors. + string? method = (root.TryGetProperty("method", out var mProp) && mProp.ValueKind == JsonValueKind.String) + ? mProp.GetString() : null; + string? id = (root.TryGetProperty("id", out var idProp) && idProp.ValueKind == JsonValueKind.String) + ? idProp.GetString() : null; + + // The gateway may also send application-level pings to verify the operator socket + // is alive. Until we added this handler the gateway would (a) get no response and + // (b) eventually time us out and drop the socket — visible to users as an + // unexplained "reconnecting" toast at the gateway's heartbeat cadence. + switch (method) + { + case "ping": + _ = SendPongAsync(id); + break; + default: + _logger.Debug($"gateway sent unsupported req method: {method ?? ""}"); + // Reply with an error res so the gateway doesn't sit waiting on its own request + // timeout if it adds new server-initiated methods we don't yet support. Only + // reply when we have a valid id — otherwise there's no correlator to set. + if (id != null) _ = SendErrorResAsync(id, "unsupported_method"); + break; + } + } + private void HandleResponse(JsonElement root) { - string? requestMethod = null; string? requestId = null; if (root.TryGetProperty("id", out var idProp)) { requestId = idProp.GetString(); + } + + // Heartbeats are tracked separately from normal request bookkeeping; complete and + // return before any other processing — a heartbeat response carries no useful + // payload for the rest of the pipeline. + if (TryCompleteHeartbeat(requestId)) return; + + string? requestMethod = null; + if (requestId != null) + { requestMethod = TakePendingRequestMethod(requestId); } diff --git a/src/OpenClaw.Shared/WebSocketClientBase.cs b/src/OpenClaw.Shared/WebSocketClientBase.cs index 713574c6..cdb47a58 100644 --- a/src/OpenClaw.Shared/WebSocketClientBase.cs +++ b/src/OpenClaw.Shared/WebSocketClientBase.cs @@ -18,9 +18,18 @@ public abstract class WebSocketClientBase : IDisposable private readonly string _gatewayUrl; private readonly string? _credentials; private CancellationTokenSource _cts; - private bool _disposed; + private CancellationTokenSource? _connectionCts; + private volatile bool _disposed; + // Set via Interlocked.Exchange at the top of Dispose so only the first caller proceeds; + // any concurrent Dispose call observes 1 and returns. Read by ConnectAsync/reconnect + // gates as the intent-to-dispose signal so they bail out before installing a new socket + // or calling subclass OnConnectedAsync. _disposed (flipped after OnDisposing returns) + // preserves the existing "OnDisposing observes live state" contract for subclasses. + private int _disposingFlag; + private bool _disposing => Volatile.Read(ref _disposingFlag) != 0; private int _reconnectAttempts; private int _reconnectLoopActive; + private int _connectInProgress; private readonly SemaphoreSlim _sendLock = new(1, 1); private static readonly int[] BackoffMs = { 1000, 2000, 4000, 8000, 15000, 30000, 60000 }; @@ -88,6 +97,27 @@ protected virtual void OnDisposing() { } /// protected virtual bool ShouldAutoReconnect() => true; + /// + /// Interval between client-initiated application-level ping requests. Return null + /// (the default) to disable. The base class drives the loop and calls + /// on each tick; if that returns false (or throws, + /// or times out per ) the WebSocket is aborted to + /// force reconnect. This is the belt-and-braces companion to KeepAliveTimeout: + /// it catches cases where the protocol is alive but the gateway has stopped processing + /// application traffic. + /// + protected virtual TimeSpan? ApplicationPingInterval => null; + + /// Maximum time to wait for a pong before declaring the connection dead. + protected virtual TimeSpan ApplicationPingTimeout => TimeSpan.FromSeconds(15); + + /// + /// Send a single application-level ping and await its response. Return true on success, + /// false (or throw) to signal a failed liveness check. Default implementation is a no-op. + /// Subclasses opt in by overriding both this and . + /// + protected virtual Task SendApplicationPingAsync(CancellationToken ct) => Task.FromResult(true); + protected WebSocketClientBase(string gatewayUrl, string token, IOpenClawLogger? logger = null) { if (string.IsNullOrEmpty(gatewayUrl)) @@ -105,35 +135,104 @@ protected WebSocketClientBase(string gatewayUrl, string token, IOpenClawLogger? public async Task ConnectAsync() { - if (_disposed) + if (_disposed || _disposing) { - _logger.Debug($"Skipping {ClientRole} connect: client already disposed"); + _logger.Debug($"Skipping {ClientRole} connect: client disposed or disposing"); return; } + // Serialize ConnectAsync. Without this guard, concurrent connects (auto-reconnect + // loop + manual UI "Reconnect") would race on the _webSocket field assignment and the + // listen loop could observe a partly-constructed socket — surfacing as + // "Already one outstanding ReceiveAsync" thrown into the generic catch in + // ListenForMessagesAsync (which does NOT call OnDisconnected). + if (Interlocked.CompareExchange(ref _connectInProgress, 1, 0) != 0) + { + _logger.Debug($"{ClientRole} connect already in progress; skipping concurrent call"); + return; + } + + ClientWebSocket? newWebSocket = null; + CancellationTokenSource? connectionCts = null; + bool ownershipTransferred = false; try { - RaiseStatusChanged(ConnectionStatus.Connecting); + // Subscriber exceptions here must not skip the connect attempt or surface to the + // caller, which would otherwise look like a connect failure and trigger reconnect. + try { RaiseStatusChanged(ConnectionStatus.Connecting); } catch { } _logger.Info($"Connecting to {ClientRole}: {GatewayUrlForDisplay}"); - _webSocket = new ClientWebSocket(); - _webSocket.Options.KeepAliveInterval = TimeSpan.FromSeconds(30); + // Replace any prior per-connection token — cancelling it terminates the + // previous socket's heartbeat task before we spawn a new one for the new socket. + // Use Interlocked.Exchange to make the swap atomic against a concurrent ConnectAsync — + // without it, two connects could race the field assignment and the loser's + // locally-captured CTS could be cancelled/disposed by the winner. + connectionCts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token); + var priorCts = Interlocked.Exchange(ref _connectionCts, connectionCts); + try { priorCts?.Cancel(); priorCts?.Dispose(); } catch { } + var connectionToken = connectionCts.Token; + + // Re-check _disposed after installing per-connection state. A racing + // Dispose between the early _disposed check and now would otherwise leak the + // freshly-constructed CTS (and below, the freshly-constructed _webSocket). + if (_disposed || _disposing) + { + _logger.Debug($"{ClientRole} connect aborted: disposed/disposing during setup"); + return; // finally cleans up local CTS / socket + } + + newWebSocket = new ClientWebSocket(); + newWebSocket.Options.KeepAliveInterval = TimeSpan.FromSeconds(30); + // KeepAliveInterval alone only schedules outbound WS protocol pings; it does not + // enforce that pongs arrive. Without KeepAliveTimeout (.NET 9+) a half-open TCP + // (NAT idle drop, VPN reconnect, captive proxy) can leave the socket "Open" + // indefinitely with ReceiveAsync blocking forever and no reconnect ever firing. + // This timeout tells .NET to abort the socket if a pong is not received in time. + newWebSocket.Options.KeepAliveTimeout = TimeSpan.FromSeconds(60); // Set Origin header (convert ws/wss to http/https) var uri = new Uri(_gatewayUrl); var originScheme = uri.Scheme == "wss" ? "https" : "http"; var origin = $"{originScheme}://{uri.Host}:{uri.Port}"; - _webSocket.Options.SetRequestHeader("Origin", origin); + newWebSocket.Options.SetRequestHeader("Origin", origin); if (!string.IsNullOrEmpty(_credentials)) { var credentialsToEncode = GatewayUrlHelper.DecodeCredentials(_credentials); - _webSocket.Options.SetRequestHeader( + newWebSocket.Options.SetRequestHeader( "Authorization", $"Basic {Convert.ToBase64String(Encoding.UTF8.GetBytes(credentialsToEncode))}"); } - await _webSocket.ConnectAsync(uri, _cts.Token); + // Swap _webSocket atomically and dispose the prior socket. The + // re-entrancy guard above prevents two ConnectAsync calls from racing here, + // but ReconnectWithBackoffAsync also pre-disposes the old socket — that's + // intentional (no-op double-dispose is harmless) and keeps the old code + // path's behavior unchanged for non-ConnectAsync callers. + var priorSocket = Interlocked.Exchange(ref _webSocket, newWebSocket); + try { priorSocket?.Dispose(); } catch { } + ownershipTransferred = true; + + // Final disposed check after socket install — Dispose may have run after the + // earlier check and Cleared _connectionCts; in that case bail without spawning loops. + // If _disposing is set but Dispose has not yet reached _webSocket Exchange, the + // newWebSocket we just installed will not be picked up by Dispose; reverse the + // install ourselves so it does not orphan in the field. Abort first (TCP RST if + // past handshake; no-op pre-handshake) then Dispose, mirroring the orphan-clean + // else-if branch below. + if (_disposed || _disposing) + { + _logger.Debug($"{ClientRole} connect aborted: disposed/disposing after socket install"); + var leaked = Interlocked.CompareExchange(ref _webSocket, null, newWebSocket); + if (ReferenceEquals(leaked, newWebSocket)) + { + try { newWebSocket.Abort(); } catch { } + try { newWebSocket.Dispose(); } catch { } + } + return; + } + + await newWebSocket.ConnectAsync(uri, connectionToken); // Don't reset _reconnectAttempts here — TCP connect succeeding doesn't mean // auth will succeed. Reset only after the full application-level handshake @@ -142,7 +241,30 @@ public async Task ConnectAsync() await OnConnectedAsync(); - _ = Task.Run(() => ListenForMessagesAsync(), _cts.Token); + // OnConnectedAsync can yield long enough for Dispose to set _disposing and run + // synchronous OnDisposing. _cts.Cancel() happens only AFTER OnDisposing returns, + // so connectionToken may still be live here even though the client is mid-teardown. + // Check the dispose intent flag explicitly before scheduling background loops so we + // do not start a listener/heartbeat that would race the dispose path. + if (_disposed || _disposing) + { + _logger.Debug($"{ClientRole} connect aborted: disposed/disposing after OnConnectedAsync"); + return; + } + + // Capture both the socket and the per-connection CTS as locals and pass them + // into the loops. Inside the loops, code must use ONLY these captures — never + // re-read _webSocket or _cts. A reconnect can Interlocked.Exchange a successor + // socket into the field while this loop is still running against the prior one; + // reading the field would make us Abort the successor or observe its state. + var capturedSocket = newWebSocket; + var capturedConnectionCts = connectionCts; + _ = Task.Run(() => ListenForMessagesAsync(capturedSocket, capturedConnectionCts), connectionToken); + + if (ApplicationPingInterval is { } pingInterval && pingInterval > TimeSpan.Zero) + { + _ = Task.Run(() => HeartbeatLoopAsync(capturedSocket, pingInterval, connectionToken), connectionToken); + } } catch (OperationCanceledException) { @@ -154,29 +276,77 @@ public async Task ConnectAsync() } catch (Exception ex) { - _logger.Error($"{ClientRole} connection failed", ex); - RaiseStatusChanged(ConnectionStatus.Error); - - if (!_disposed && !_cts.Token.IsCancellationRequested && ShouldAutoReconnect()) + // Each event raise is individually guarded so a throwing logger or subscriber + // can't skip the others and, more importantly, can't skip the reconnect kickoff + // below — that would leave the client permanently disconnected with no further + // retry. Symmetric to the listener finally's hardening. + try { _logger.Error($"{ClientRole} connection failed", ex); } catch { } + try { RaiseStatusChanged(ConnectionStatus.Error); } catch { } + + if (!_disposed && !_disposing && !_cts.Token.IsCancellationRequested && ShouldAutoReconnect()) { _ = ReconnectWithBackoffAsync(); } + else if (ownershipTransferred && !_disposed) + { + // No reconnect will run, so the failed socket would otherwise orphan in + // _webSocket — no listener was started (Task.Run lines never reached) and + // no future ConnectAsync will Interlocked.Exchange it away. Clear and + // dispose it now if it's still ours. Same for the per-connection CTS. + var orphaned = Interlocked.CompareExchange(ref _webSocket, null, newWebSocket); + if (ReferenceEquals(orphaned, newWebSocket)) + { + // Abort first so a post-handshake socket sends a TCP RST instead of leaving + // the gateway holding a phantom session entry until its own idle timeout. + // ClientWebSocket.Dispose() on an open socket aborts the transport but does + // not send a WebSocket close frame; Abort() is the explicit-intent equivalent. + try { newWebSocket?.Abort(); } catch { } + try { newWebSocket?.Dispose(); } catch { } + } + var orphanCts = Interlocked.CompareExchange(ref _connectionCts, null, connectionCts); + if (ReferenceEquals(orphanCts, connectionCts)) + { + try { connectionCts?.Cancel(); } catch { } + try { connectionCts?.Dispose(); } catch { } + } + } + } + finally + { + // If we aborted before transferring ownership to fields, dispose the + // local socket/CTS so they don't leak. After ownership transfer, Dispose() or + // the next ConnectAsync owns them. + if (!ownershipTransferred) + { + try { newWebSocket?.Dispose(); } catch { } + // The CTS was already installed into the field via Exchange; only dispose + // it locally if it's still the current field value (meaning Dispose hasn't + // already swung past it). + var current = Interlocked.CompareExchange(ref _connectionCts, null, connectionCts); + if (ReferenceEquals(current, connectionCts)) + { + try { connectionCts?.Cancel(); } catch { } + try { connectionCts?.Dispose(); } catch { } + } + } + Interlocked.Exchange(ref _connectInProgress, 0); } } - private async Task ListenForMessagesAsync() + private async Task ListenForMessagesAsync(ClientWebSocket ws, CancellationTokenSource listenerCts) { - // Rent a pooled buffer — consistent with the SendRawAsync hot path; avoids a large - // (16–64 KB) heap allocation per connection that would otherwise land on the LOH. + // Use ONLY the captured ws and listenerCts. Re-reading _webSocket / _cts here + // would TOCTOU against a reconnect that has already swapped in a successor socket. + var listenerToken = listenerCts.Token; var buffer = ArrayPool.Shared.Rent(ReceiveBufferSize); var sb = new StringBuilder(); try { - while (_webSocket?.State == WebSocketState.Open && !_cts.Token.IsCancellationRequested) + while (ws.State == WebSocketState.Open && !listenerToken.IsCancellationRequested) { - var result = await _webSocket.ReceiveAsync( - new ArraySegment(buffer, 0, ReceiveBufferSize), _cts.Token); + var result = await ws.ReceiveAsync( + new ArraySegment(buffer, 0, ReceiveBufferSize), listenerToken); if (result.MessageType == WebSocketMessageType.Text) { @@ -198,11 +368,9 @@ private async Task ListenForMessagesAsync() } else if (result.MessageType == WebSocketMessageType.Close) { - var closeStatus = _webSocket.CloseStatus?.ToString() ?? "unknown"; - var closeDesc = _webSocket.CloseStatusDescription ?? "no description"; + var closeStatus = ws.CloseStatus?.ToString() ?? "unknown"; + var closeDesc = ws.CloseStatusDescription ?? "no description"; _logger.Info($"Server closed connection: {closeStatus} - {closeDesc}"); - OnDisconnected(); - RaiseStatusChanged(ConnectionStatus.Disconnected); break; } } @@ -210,33 +378,61 @@ private async Task ListenForMessagesAsync() catch (WebSocketException ex) when (ex.WebSocketErrorCode == WebSocketError.ConnectionClosedPrematurely) { _logger.Warn("Connection closed prematurely"); - OnDisconnected(); - RaiseStatusChanged(ConnectionStatus.Disconnected); } catch (OperationCanceledException) { } - catch (ObjectDisposedException) { /* CTS or WebSocket disposed during shutdown */ } + catch (ObjectDisposedException) { /* WebSocket disposed during shutdown */ } catch (Exception ex) { - _logger.Error($"{ClientRole} listen error", ex); - OnError(ex); - RaiseStatusChanged(ConnectionStatus.Error); + try { _logger.Error($"{ClientRole} listen error", ex); } catch { } + try { OnError(ex); } catch { } + try { RaiseStatusChanged(ConnectionStatus.Error); } catch { } } finally { + // Atomically claim ownership of this connection's teardown. All exit paths + // (Close frame, premature close, swallowed OCE/ODE, generic Exception) funnel + // through this single CAS so OnDisconnected / status / reconnect cannot fire + // for a stale listener whose socket was already replaced by a successor + // ConnectAsync. The CAS atomically null-out is the source of truth: only the + // path that swings _webSocket from `ws` to null owns the teardown. + // + // The entire teardown lives in finally so a throwing event handler + // (OnError / RaiseStatusChanged) cannot leak the buffer or skip the CAS, + // which would otherwise leave _webSocket pointing at a dead socket with + // no Disconnected event and no reconnect ever scheduled. ArrayPool.Shared.Return(buffer); - } - - // Auto-reconnect if not intentionally disposed - if (!_disposed) - { - try + try { listenerCts.Cancel(); } catch { } + + // Gate event emission and reconnect on both _disposed AND _disposing so a + // listener exiting mid-Dispose (e.g. subclass OnDisposing blocks on a graceful + // close handshake that the server responds to before _disposed flips) does not + // fire spurious OnDisconnected/Disconnected callbacks into a subclass that is + // already in teardown, or schedule a reconnect that ReconnectWithBackoffAsync + // would only bail out of after the loop entry check. The CAS still runs so the + // socket reference is cleared either by us here or by Dispose's later Exchange. + bool ownedExit = false; + if (!_disposed && !_disposing + && Interlocked.CompareExchange(ref _webSocket, null, ws) == ws) { - if (!_cts.Token.IsCancellationRequested && ShouldAutoReconnect()) + ownedExit = true; + try { ws.Dispose(); } catch { } + if (!_disposed && !_disposing) { - await ReconnectWithBackoffAsync(); + try { OnDisconnected(); } catch { } + try { RaiseStatusChanged(ConnectionStatus.Disconnected); } catch { } } } - catch (ObjectDisposedException) { /* CTS disposed during check */ } + + // Only the owning listener may trigger reconnect. A stale listener whose CAS + // failed must not spawn a reconnect loop against the healthy successor. + // Fire-and-forget so the listener Task completes promptly; awaiting here + // would make any escape from the reconnect loop an unobserved Task exception. + if (ownedExit && !_disposed && !_disposing + && !_cts.Token.IsCancellationRequested + && ShouldAutoReconnect()) + { + _ = ReconnectWithBackoffAsync(); + } } } @@ -249,29 +445,31 @@ protected async Task ReconnectWithBackoffAsync() try { - while (!_disposed && !_cts.Token.IsCancellationRequested && ShouldAutoReconnect()) + while (!_disposed && !_disposing && !_cts.Token.IsCancellationRequested && ShouldAutoReconnect()) { - var delay = BackoffMs[Math.Min(_reconnectAttempts, BackoffMs.Length - 1)]; - // Add 0-25% jitter to prevent thundering herd when multiple clients - // (operator + node) reconnect on the same schedule - var jitter = Random.Shared.Next(0, delay / 4); - delay += jitter; + var baseDelay = BackoffMs[Math.Min(_reconnectAttempts, BackoffMs.Length - 1)]; + // Centered ±25% jitter (was additive 0-25%, i.e. always >= base). With operator + // and node sharing this schedule, a tight one-sided jitter window means both + // reconnect in near-lockstep after every drop, amplifying load on the gateway + // during incidents. Centering around the base value spreads the retry storm + // without materially extending the reconnect time. + var delay = (int)(baseDelay * (0.75 + Random.Shared.NextDouble() * 0.5)); _reconnectAttempts++; _logger.Warn($"{ClientRole} reconnecting in {delay}ms (attempt {_reconnectAttempts})"); - RaiseStatusChanged(ConnectionStatus.Connecting); + // Guard the raise — a throwing subscriber must not abort the reconnect loop. + try { RaiseStatusChanged(ConnectionStatus.Connecting); } catch { } await Task.Delay(delay, _cts.Token); - if (_cts.Token.IsCancellationRequested || _disposed || !ShouldAutoReconnect()) + if (_cts.Token.IsCancellationRequested || _disposed || _disposing || !ShouldAutoReconnect()) { break; } - // Safely dispose old socket - var oldSocket = _webSocket; - _webSocket = null; - try { oldSocket?.Dispose(); } catch { /* ignore dispose errors */ } - + // Don't manually null/dispose _webSocket here — ConnectAsync's + // Interlocked.Exchange handles atomic replacement and prior-socket disposal. + // The old pre-disposal pattern raced against a concurrent ConnectAsync that + // had already swapped in a new socket: we'd null out the brand-new field. await ConnectAsync(); if (IsConnected) @@ -281,11 +479,10 @@ protected async Task ReconnectWithBackoffAsync() } } catch (OperationCanceledException) { } - catch (ObjectDisposedException) { } catch (Exception ex) { - _logger.Error($"{ClientRole} reconnect failed", ex); - RaiseStatusChanged(ConnectionStatus.Error); + try { _logger.Error($"{ClientRole} reconnect failed", ex); } catch { } + try { RaiseStatusChanged(ConnectionStatus.Error); } catch { } } finally { @@ -293,61 +490,161 @@ protected async Task ReconnectWithBackoffAsync() } } - /// Send a text message over the WebSocket. Thread-safe. - protected async Task SendRawAsync(string message) + private async Task HeartbeatLoopAsync(ClientWebSocket ws, TimeSpan interval, CancellationToken ct) { + // Use ONLY the captured ws — never re-read _webSocket. A reconnect can have already + // installed a successor in the field; aborting that would kill a healthy connection. try { - await _sendLock.WaitAsync(_cts.Token); + while (!ct.IsCancellationRequested && ws.State == WebSocketState.Open) + { + try { await Task.Delay(interval, ct); } + catch (OperationCanceledException) { return; } + + if (ct.IsCancellationRequested || ws.State != WebSocketState.Open) + return; + + using var pingCts = CancellationTokenSource.CreateLinkedTokenSource(ct); + pingCts.CancelAfter(ApplicationPingTimeout); + + bool ok; + try + { + ok = await SendApplicationPingAsync(pingCts.Token); + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + return; + } + catch (OperationCanceledException) + { + ok = false; + } + catch (Exception ex) + { + _logger.Warn($"{ClientRole} application ping threw: {ex.Message}"); + ok = false; + } + + if (ct.IsCancellationRequested) return; + + if (!ok) + { + _logger.Warn($"{ClientRole} application ping failed; aborting socket to force reconnect"); + // Only abort if the field still references OUR socket. If a reconnect has + // already replaced it, the new socket is healthy and we must not touch it. + if (ReferenceEquals(Volatile.Read(ref _webSocket), ws)) + { + try { ws.Abort(); } catch { } + } + return; + } + } } - catch (OperationCanceledException) + catch (ObjectDisposedException) { } + } + + /// Send a text message over the WebSocket, observing only the client lifetime. + protected Task SendRawAsync(string message) => SendRawAsync(message, CancellationToken.None); + + /// + /// Send a text message over the WebSocket. Thread-safe. is + /// linked with the client lifetime token; cancellation propagates to BOTH the + /// wait and the underlying . + /// Callers that need per-send cancellation must use this overload — wrapping the + /// parameterless version in .WaitAsync(ct) only observes the outer task and + /// leaves the underlying send orphaned (still holding the lock, still blocking on a + /// half-dead socket); the orphan can later land on a successor socket after reconnect. + /// + protected async Task SendRawAsync(string message, CancellationToken ct) + { + CancellationTokenSource? linked = null; + CancellationToken token; + if (ct.CanBeCanceled) { - return; + linked = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token, ct); + token = linked.Token; } - catch (ObjectDisposedException) + else { - return; + token = _cts.Token; } try { - // Serialize sends; reconnect/dispose can still close the captured socket, - // so the send below keeps the existing state-change guards. - var ws = _webSocket; - if (ws?.State != WebSocketState.Open) return; + try + { + await _sendLock.WaitAsync(token); + } + catch (OperationCanceledException) + { + // ct timed out waiting for lock OR shutdown raced. Propagate so heartbeat + // path can treat this as a failed ping (returns false, triggers Abort) — + // earlier code swallowed this and looked like a success. + if (ct.IsCancellationRequested) throw; + return; + } + catch (ObjectDisposedException) + { + return; + } try { - // Rent a pooled buffer to avoid per-send heap allocations on the hot send path. - var byteCount = Encoding.UTF8.GetByteCount(message); - var buffer = ArrayPool.Shared.Rent(byteCount); + // Serialize sends; reconnect/dispose can still close the captured socket, + // so the send below keeps the existing state-change guards. + var ws = _webSocket; + if (ws is null) return; + try { - var written = Encoding.UTF8.GetBytes(message, buffer); - await ws.SendAsync(buffer.AsMemory(0, written), - WebSocketMessageType.Text, true, _cts.Token); + // Read State inside the ODE-catching try: WebSocket.State is virtual and + // not contractually safe against a concurrent Dispose; reading it outside + // this catch would propagate ObjectDisposedException to the caller. + if (ws.State != WebSocketState.Open) return; + + // Rent a pooled buffer to avoid per-send heap allocations on the hot send path. + var byteCount = Encoding.UTF8.GetByteCount(message); + var buffer = ArrayPool.Shared.Rent(byteCount); + try + { + var written = Encoding.UTF8.GetBytes(message, buffer); + await ws.SendAsync(buffer.AsMemory(0, written), + WebSocketMessageType.Text, true, token); + } + finally + { + ArrayPool.Shared.Return(buffer); + } } - finally + catch (OperationCanceledException) when (ct.IsCancellationRequested) { - ArrayPool.Shared.Return(buffer); + // Per-send timeout fired. Caller's ct takes precedence over shutdown so + // the heartbeat sees the cancellation rather than a phantom "send ok". + // If both are canceled simultaneously, the caller still gets a clean OCE. + throw; + } + catch (OperationCanceledException) when (_cts.Token.IsCancellationRequested) + { + // Shutdown/reconnect canceled an in-flight send (caller's ct was not the cause). + } + catch (ObjectDisposedException) + { + // WebSocket was disposed between state check and send. + } + catch (WebSocketException ex) when (ex.WebSocketErrorCode == WebSocketError.InvalidState) + { + _logger.Warn($"WebSocket send failed (state changed): {ex.Message}"); } } - catch (OperationCanceledException) when (_cts.Token.IsCancellationRequested) - { - // Shutdown/reconnect canceled an in-flight send. - } - catch (ObjectDisposedException) - { - // WebSocket was disposed between state check and send. - } - catch (WebSocketException ex) when (ex.WebSocketErrorCode == WebSocketError.InvalidState) + finally { - _logger.Warn($"WebSocket send failed (state changed): {ex.Message}"); + _sendLock.Release(); } } finally { - _sendLock.Release(); + linked?.Dispose(); } } @@ -355,10 +652,19 @@ await ws.SendAsync(buffer.AsMemory(0, written), protected async Task CloseWebSocketAsync() { var ws = _webSocket; - if (ws?.State == WebSocketState.Open) + if (ws is null) return; + try { - await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "Disconnecting", System.Threading.CancellationToken.None); + if (ws.State != WebSocketState.Open) return; + // Pass _cts.Token so a Dispose-driven cancel surfaces as OperationCanceledException + // instead of letting CloseAsync block indefinitely on a half-open connection. + await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "Disconnecting", _cts.Token); } + // The socket can be disposed or torn down concurrently — Close on a graceful shutdown + // path must not throw, otherwise subclass shutdown sequences see phantom errors. + catch (ObjectDisposedException) { } + catch (OperationCanceledException) { } + catch (WebSocketException) { } } /// Fire the StatusChanged event. Use this instead of directly invoking the event. @@ -367,18 +673,39 @@ protected void RaiseStatusChanged(ConnectionStatus status) public void Dispose() { - if (_disposed) return; - _disposed = true; + // Atomic claim — only the first caller proceeds. A plain "if (_disposed) return" + // would let two concurrent disposers both pass the guard before either set the + // flag, and OnDisposing would then run twice. Interlocked.Exchange returning the + // previous value is the canonical single-winner pattern. It also doubles as the + // intent-to-dispose signal read by the ConnectAsync / reconnect gates below + // (via the _disposing property), before _disposed is flipped after OnDisposing. + // _disposed is intentionally flipped only after OnDisposing returns so subclass + // graceful-shutdown logic (sending a "bye" frame, IsDisposed-guarded helpers) + // still observes a live state. OnDisposing is wrapped in try/catch so a throwing + // subclass cannot skip the cleanup below — otherwise _cts and _webSocket would + // leak until process exit. + if (Interlocked.Exchange(ref _disposingFlag, 1) != 0) return; + + try { OnDisposing(); } catch { } - OnDisposing(); + _disposed = true; try { _cts.Cancel(); } catch { } - - var ws = _webSocket; - _webSocket = null; + // Snapshot _connectionCts once. Reading the field twice (Cancel then Dispose) + // races with a concurrent ConnectAsync that Exchange-installs a new CTS between the + // two reads — we'd Cancel the old CTS but Dispose the new one, leaking the old and + // tripping ObjectDisposedException for consumers of the new. + var connectionCts = Interlocked.Exchange(ref _connectionCts, null); + try { connectionCts?.Cancel(); } catch { } + try { connectionCts?.Dispose(); } catch { } + + var ws = Interlocked.Exchange(ref _webSocket, null); try { ws?.Dispose(); } catch { } - // Don't dispose _cts immediately — listen loop or reconnect may still reference it. - // It will be GC'd after all pending tasks complete. + // Intentionally do NOT Dispose(_cts). The CancellationToken property and SendRawAsync's + // CreateLinkedTokenSource(_cts.Token, ct) setup read _cts.Token outside try/catch; if + // _cts were disposed, those reads throw ObjectDisposedException to callers instead of + // observing cancellation. _cts.Cancel() above is sufficient to unblock all consumers, + // and the CancellationTokenSource finalizer releases native resources at GC. } } diff --git a/tests/OpenClaw.Shared.Tests/WebSocketClientBaseTests.cs b/tests/OpenClaw.Shared.Tests/WebSocketClientBaseTests.cs index cc24793c..f086266e 100644 --- a/tests/OpenClaw.Shared.Tests/WebSocketClientBaseTests.cs +++ b/tests/OpenClaw.Shared.Tests/WebSocketClientBaseTests.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Xunit; @@ -17,7 +18,8 @@ public class TestWebSocketClient : WebSocketClientBase public int OnDisconnectedCallCount { get; private set; } public int OnErrorCallCount { get; private set; } public Exception? LastError { get; private set; } - public int OnDisposingCallCount { get; private set; } + private int _onDisposingCallCount; + public int OnDisposingCallCount => Volatile.Read(ref _onDisposingCallCount); public bool AutoReconnectEnabled { get; set; } = true; protected override int ReceiveBufferSize => 8192; @@ -51,7 +53,7 @@ protected override void OnError(Exception ex) protected override void OnDisposing() { - OnDisposingCallCount++; + Interlocked.Increment(ref _onDisposingCallCount); } protected override bool ShouldAutoReconnect() => AutoReconnectEnabled; @@ -155,6 +157,28 @@ public void Dispose_IsIdempotent() Assert.Equal(1, client.OnDisposingCallCount); // hook called only once } + [Fact] + public void Dispose_ConcurrentCallers_RunOnDisposingExactlyOnce() + { + // Two threads racing into Dispose() must not both pass the entry guard and + // double-invoke OnDisposing. The atomic Interlocked.Exchange guard is what + // enforces this; a plain "if (_disposed) return" would allow both threads + // through before either set the flag. + var client = new TestWebSocketClient("ws://localhost:18789", "token", _logger); + using var start = new System.Threading.ManualResetEventSlim(false); + var threads = new System.Threading.Thread[8]; + for (int i = 0; i < threads.Length; i++) + { + threads[i] = new System.Threading.Thread(() => { start.Wait(); client.Dispose(); }); + threads[i].Start(); + } + start.Set(); + foreach (var t in threads) t.Join(); + + Assert.True(client.TestIsDisposed); + Assert.Equal(1, client.OnDisposingCallCount); + } + [Fact] public void Dispose_CallsOnDisposingHook() { @@ -251,7 +275,7 @@ await WaitForConditionAsync( Assert.Contains(ConnectionStatus.Error, statuses); Assert.True(statuses.Count(s => s == ConnectionStatus.Connecting) >= 2); - Assert.Contains(_logger.Logs, line => line.Contains("reconnecting in 1", StringComparison.OrdinalIgnoreCase) && line.Contains("ms (attempt 1)", StringComparison.OrdinalIgnoreCase)); + Assert.Contains(_logger.Logs, line => line.Contains("reconnecting in ", StringComparison.OrdinalIgnoreCase) && line.Contains("ms (attempt 1)", StringComparison.OrdinalIgnoreCase)); client.Dispose(); }