Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 156 additions & 1 deletion src/OpenClaw.Shared/OpenClawGatewayClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ public class OpenClawGatewayClient : WebSocketClientBase, IOperatorGatewayClient
private string? _mainSessionKey;
private bool _hasHandshakeSnapshot;

// Application-level heartbeat — see WebSocketClientBase.ApplicationPingInterval.
// In-flight heartbeat pings keyed by their request id ("hb-N"). Each entry's task is completed
// with true when a response carrying that id arrives, and with false on cancellation or when
// the map is cleared.
private readonly Dictionary<string, TaskCompletionSource<bool>> _pendingHeartbeats = new();
// Held around the accesses to _pendingHeartbeats in the heartbeat helpers.
private readonly object _heartbeatLock = new();
// Counter used to build heartbeat ids; advanced with Interlocked.Increment per ping.
private long _heartbeatSeq;
// Bounds the pre-handshake heartbeat skip — if the gateway accepts the WS upgrade but
// never sends hello-ok (stuck auth handler, partial deploy), the WS protocol keep-alive
// won't catch it as long as the server still pongs at the protocol layer. After N
// consecutive pre-handshake intervals we abort the socket and force reconnect.
private int _consecutivePreHandshakeSkips;

/// <summary>
/// The gateway's canonical main session key as published in the hello-ok
/// snapshot (preferring the canonical <c>sessionDefaults.mainSessionKey</c>
Expand Down Expand Up @@ -154,6 +164,13 @@ protected override Task ProcessMessageAsync(string json)
/// <summary>
/// Connection hook: calls <c>ResetUnsupportedMethodFlags</c> and zeroes the
/// pre-handshake heartbeat-skip counter so the new connection starts with a full budget.
/// </summary>
/// <returns>An already-completed task; the method does no asynchronous work.</returns>
protected override Task OnConnectedAsync()
{
ResetUnsupportedMethodFlags();
// Reset the pre-handshake skip counter at the start of each new connection,
// not just on OnDisconnected. OnDisconnected is only fired from two of the five
// listen-loop exit paths (Close frame, ConnectionClosedPrematurely); the others
// (OperationCanceledException, ObjectDisposedException, generic Exception — which
// includes KeepAliveTimeout aborts surfaced as plain WebSocketException) silently
// skip it. Resetting on connect makes the 3-strike budget robust to all exit paths.
Interlocked.Exchange(ref _consecutivePreHandshakeSkips, 0);
return Task.CompletedTask;
}

Expand All @@ -166,6 +183,10 @@ protected override bool ShouldAutoReconnect()
protected override void OnDisconnected()
{
ClearPendingRequests();
ClearPendingHeartbeats();
// Reset the pre-handshake skip counter so a fresh reconnect starts a fresh budget;
// otherwise a previously-stalled connection could prematurely fail the next one.
Interlocked.Exchange(ref _consecutivePreHandshakeSkips, 0);
// Invalidate the handshake snapshot — the next hello-ok must
// re-establish the canonical session key, scopes, etc. Without this,
// a reconnect-after-server-restart could leave the tray sending to a
Expand All @@ -178,6 +199,99 @@ protected override void OnDisconnected()
/// <summary>
/// Disposal hook: calls <c>ClearPendingRequests</c> and then
/// <c>ClearPendingHeartbeats</c>, which completes every in-flight heartbeat waiter
/// with <c>false</c>.
/// </summary>
protected override void OnDisposing()
{
ClearPendingRequests();
// Unblocks any SendApplicationPingAsync call still awaiting a heartbeat response.
ClearPendingHeartbeats();
}

/// <summary>
/// Cadence of the application-level ping: 45 seconds.
/// </summary>
/// <remarks>
/// 45s is chosen to fall between the WS protocol keep-alive (30s) and that keep-alive's
/// 60s timeout, so the two layers do not fire in lockstep. The application ping exists to
/// confirm the gateway is still processing application-level requests, not merely
/// answering protocol pings.
/// </remarks>
protected override TimeSpan? ApplicationPingInterval
{
    get { return TimeSpan.FromSeconds(45); }
}

/// <summary>
/// Sends one application-level heartbeat (<c>req</c>/<c>ping</c>) and waits for the
/// matching response.
/// </summary>
/// <param name="ct">Cancels both the send and the wait for the response.</param>
/// <returns>
/// <c>true</c> when the ping was answered, or when the ping was skipped because the
/// hello-ok handshake has not completed yet and the skip budget is not exhausted;
/// <c>false</c> when the skip budget is exhausted, the send throws, the wait is cancelled,
/// or the pending heartbeat is cleared.
/// </returns>
protected override async Task<bool> SendApplicationPingAsync(CancellationToken ct)
{
    // ~135s at the 45s cadence.
    const int maxPreHandshakeSkips = 3;

    // No pings before hello-ok: a `req` sent that early would be rejected or fail silently.
    // The skips are bounded, though — a gateway that keeps WS-ponging but never sends
    // hello-ok must not leave us parked in a half-handshaken state forever.
    if (!Volatile.Read(ref _hasHandshakeSnapshot))
    {
        var skipCount = Interlocked.Increment(ref _consecutivePreHandshakeSkips);
        if (skipCount < maxPreHandshakeSkips)
        {
            return true;
        }

        _logger.Warn($"gateway hello-ok not received after {skipCount} heartbeat intervals; forcing reconnect");
        return false;
    }

    // Handshake is done, so the pre-handshake budget starts over.
    Interlocked.Exchange(ref _consecutivePreHandshakeSkips, 0);

    var heartbeatId = $"hb-{Interlocked.Increment(ref _heartbeatSeq)}";
    var completion = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    // Register the waiter before sending so a fast response always finds it.
    lock (_heartbeatLock)
    {
        _pendingHeartbeats[heartbeatId] = completion;
    }

    try
    {
        var payload = JsonSerializer.Serialize(new { type = "req", id = heartbeatId, method = "ping" });

        // Hand ct to the SendRawAsync(string, CancellationToken) overload instead of wrapping
        // the parameterless SendRawAsync in .WaitAsync(ct). WaitAsync would only abandon the
        // OUTER task: the underlying SendAsync would keep running, keep holding _sendLock,
        // keep blocking on a half-dead socket, and could later land on a successor socket
        // after reconnect. The overload links ct with the client lifetime token and passes
        // it into both _sendLock.WaitAsync and ws.SendAsync, so cancelling really unwinds
        // the send.
        await SendRawAsync(payload, ct);

        // Cancellation while waiting resolves the heartbeat as "not answered".
        using var cancellation = ct.Register(() => completion.TrySetResult(false));
        return await completion.Task;
    }
    catch (Exception ex)
    {
        _logger.Debug($"gateway heartbeat send failed: {ex.Message}");
        return false;
    }
    finally
    {
        // Always drop the bookkeeping entry, whichever way the ping ended.
        lock (_heartbeatLock)
        {
            _pendingHeartbeats.Remove(heartbeatId);
        }
    }
}

/// <summary>
/// Resolves the pending heartbeat registered under <paramref name="requestId"/>, if any.
/// </summary>
/// <param name="requestId">Id taken from an incoming response; may be <c>null</c>.</param>
/// <returns>
/// <c>true</c> when the id belongs to a pending heartbeat (its waiter is signalled with
/// <c>true</c>); <c>false</c> when the id is <c>null</c> or not a pending heartbeat.
/// </returns>
private bool TryCompleteHeartbeat(string? requestId)
{
    if (requestId is null)
    {
        return false;
    }

    TaskCompletionSource<bool>? waiter;
    lock (_heartbeatLock)
    {
        _pendingHeartbeats.TryGetValue(requestId, out waiter);
    }

    if (waiter is null)
    {
        return false;
    }

    // Signal outside the lock; the entry itself is removed by the sender's finally block.
    waiter.TrySetResult(true);
    return true;
}

/// <summary>
/// Empties the pending-heartbeat map and completes every waiter it held with <c>false</c>.
/// </summary>
private void ClearPendingHeartbeats()
{
    List<TaskCompletionSource<bool>> drained;
    lock (_heartbeatLock)
    {
        // Snapshot under the lock, then clear, so waiters are signalled outside the lock.
        drained = new List<TaskCompletionSource<bool>>(_pendingHeartbeats.Values);
        _pendingHeartbeats.Clear();
    }

    foreach (var waiter in drained)
    {
        waiter.TrySetResult(false);
    }
}

/// <summary>
/// Answers a gateway-initiated ping with a successful <c>res</c> carrying
/// <c>{ pong = true }</c>. Does nothing when <paramref name="requestId"/> is <c>null</c>.
/// Send failures are logged at debug level and swallowed.
/// </summary>
/// <param name="requestId">Id of the ping request being answered.</param>
private async Task SendPongAsync(string? requestId)
{
    if (requestId is null)
    {
        return;
    }

    var reply = new { type = "res", id = requestId, ok = true, payload = new { pong = true } };
    try
    {
        var json = JsonSerializer.Serialize(reply);
        await SendRawAsync(json);
    }
    catch (Exception ex)
    {
        _logger.Debug($"gateway pong send failed: {ex.Message}");
    }
}

/// <summary>
/// Sends a failed <c>res</c> (<c>ok = false</c>) for the given request id. Send failures
/// are logged at debug level and swallowed.
/// </summary>
/// <param name="requestId">Id of the request being rejected.</param>
/// <param name="error">Error value placed in the response's <c>error</c> field.</param>
private async Task SendErrorResAsync(string requestId, string error)
{
    var reply = new { type = "res", id = requestId, ok = false, error };
    try
    {
        var json = JsonSerializer.Serialize(reply);
        await SendRawAsync(json);
    }
    catch (Exception ex)
    {
        _logger.Debug($"gateway error-res send failed: {ex.Message}");
    }
}

// Events
Expand Down Expand Up @@ -1441,6 +1555,9 @@ private void ProcessMessage(string json)
case "res":
HandleResponse(root);
break;
case "req":
HandleIncomingRequest(root);
break;
case "event":
HandleEvent(root, json.Length);
break;
Expand All @@ -1456,13 +1573,51 @@ private void ProcessMessage(string json)
}
}

/// <summary>
/// Handles a gateway-initiated <c>req</c> frame: answers <c>ping</c> with a pong and
/// rejects every other method with an <c>unsupported_method</c> error response.
/// </summary>
/// <param name="root">Root element of the parsed incoming message.</param>
private void HandleIncomingRequest(JsonElement root)
{
    // JsonElement.GetString() throws InvalidOperationException on non-string kinds
    // (Number, Object, ...). Checking ValueKind first means a malformed gateway req is
    // treated as "missing" instead of bubbling into the read loop's catch-all and
    // spamming errors.
    static string? ReadStringOrNull(JsonElement element, string propertyName)
    {
        if (!element.TryGetProperty(propertyName, out var value))
        {
            return null;
        }

        return value.ValueKind == JsonValueKind.String ? value.GetString() : null;
    }

    var method = ReadStringOrNull(root, "method");
    var id = ReadStringOrNull(root, "id");

    // The gateway may send its own application-level pings to check that the operator
    // socket is alive. Before this handler existed such pings went unanswered, and the
    // gateway would eventually time us out and drop the socket — which users saw as an
    // unexplained "reconnecting" toast at the gateway's heartbeat cadence.
    if (method == "ping")
    {
        _ = SendPongAsync(id);
        return;
    }

    _logger.Debug($"gateway sent unsupported req method: {method ?? "<null>"}");

    // Answer with an error res so the gateway is not left waiting on its own request
    // timeout when it introduces server-initiated methods we do not support yet. Without
    // a valid id there is no correlator to reply to, so nothing is sent.
    if (id is not null)
    {
        _ = SendErrorResAsync(id, "unsupported_method");
    }
}

private void HandleResponse(JsonElement root)
{
string? requestMethod = null;
string? requestId = null;
if (root.TryGetProperty("id", out var idProp))
{
requestId = idProp.GetString();
}

// Heartbeats are tracked separately from normal request bookkeeping; complete and
// return before any other processing — a heartbeat response carries no useful
// payload for the rest of the pipeline.
if (TryCompleteHeartbeat(requestId)) return;

string? requestMethod = null;
if (requestId != null)
{
requestMethod = TakePendingRequestMethod(requestId);
}

Expand Down
Loading
Loading