From acf3dde084381cc155f454141b9664599e9f5bd1 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Tue, 24 Feb 2026 13:26:21 +0200 Subject: [PATCH] test(scho oss): add smigrating checks for new connections --- .../smart-client-handoffs-oss.e2e.ts | 21 +++++- .../tests/test-scenario/test-scenario.util.ts | 68 ++++++++++++++++++- 2 files changed, 86 insertions(+), 3 deletions(-) diff --git a/packages/client/lib/tests/test-scenario/smart-client-handoffs-oss.e2e.ts b/packages/client/lib/tests/test-scenario/smart-client-handoffs-oss.e2e.ts index c7f8e35863..bfad16b103 100644 --- a/packages/client/lib/tests/test-scenario/smart-client-handoffs-oss.e2e.ts +++ b/packages/client/lib/tests/test-scenario/smart-client-handoffs-oss.e2e.ts @@ -31,9 +31,9 @@ import diagnostics_channel from "node:diagnostics_channel"; import testUtils from "../../test-utils"; import { DiagnosticsEvent } from "../../client/enterprise-maintenance-manager"; -import { FaultInjectorClient, ActionTrigger, ActionType, ActionRequest } from "@redis/test-utils/lib/fault-injector"; +import { FaultInjectorClient, ActionTrigger, ActionRequest } from "@redis/test-utils/lib/fault-injector"; import { REClusterTestOptions } from "@redis/test-utils"; -import { blockCommand, filterTriggersByArgs } from "./test-scenario.util"; +import { blockCommand, newConnectionReceivedSmigraging, filterTriggersByArgs } from "./test-scenario.util"; type TestOptions = REClusterTestOptions<{}, {}, {}, 3, {}> @@ -167,9 +167,12 @@ const KEYS = [ "should not have received any notifications yet" ); + const newConnReceivedSmigratingPromise = newConnectionReceivedSmigraging(cluster); await faultInjectorClient.triggerAction(ACTION, ACTION_OPTIONS); + assert.ok(await newConnReceivedSmigratingPromise, 'Did not receive SMIGRATING on new connection'); + // Verify notifications were received const sMigratingEventCount = diagnosticEvents.filter( (event) => event.type === "SMIGRATING" @@ -460,9 +463,13 @@ const KEYS = [ "should not have received any notifications yet" ); + const newConnReceivedSmigratingPromise = newConnectionReceivedSmigraging(cluster); + // Trigger migration await faultInjectorClient.triggerAction(ACTION, ACTION_OPTIONS); + assert.ok(await newConnReceivedSmigratingPromise, 'Did not receive SMIGRATING on new connection'); + // Verify notifications were received const sMigratingEventCount = diagnosticEvents.filter( (event) => event.type === "SMIGRATING" @@ -752,9 +759,15 @@ const KEYS = [ "should not have received any notifications yet" ); + + const newConnReceivedSmigratingPromise = newConnectionReceivedSmigraging(cluster); + // Trigger migration await faultInjectorClient.triggerAction(ACTION, ACTION_OPTIONS); + assert.ok(await newConnReceivedSmigratingPromise, 'Did not receive SMIGRATING on new connection'); + + // Verify notifications were received const sMigratingEventCount = diagnosticEvents.filter( (event) => event.type === "SMIGRATING" @@ -1044,9 +1057,13 @@ const KEYS = [ "should not have received any notifications yet" ); + const newConnReceivedSmigratingPromise = newConnectionReceivedSmigraging(cluster); + // Trigger migration await faultInjectorClient.triggerAction(ACTION, ACTION_OPTIONS); + assert.ok(await newConnReceivedSmigratingPromise, 'Did not receive SMIGRATING on new connection'); + // Verify notifications were received const sMigratingEventCount = diagnosticEvents.filter( (event) => event.type === "SMIGRATING" diff --git a/packages/client/lib/tests/test-scenario/test-scenario.util.ts b/packages/client/lib/tests/test-scenario/test-scenario.util.ts index 1465aae0f6..4df6b47d4d 100644 --- a/packages/client/lib/tests/test-scenario/test-scenario.util.ts +++ b/packages/client/lib/tests/test-scenario/test-scenario.util.ts @@ -1,7 +1,11 @@ import { readFileSync } from "fs"; -import { createClient, RedisClientOptions } from "../../.."; +import { createClient, RedisClientOptions } from "../../.." import { stub } from "sinon"; import { ActionTrigger } from "@redis/test-utils/lib/fault-injector"; +import { DiagnosticsEvent } from "../../client/enterprise-maintenance-manager"; +import diagnostics_channel from "node:diagnostics_channel"; +import { setTimeout } from "timers/promises"; +import { RedisClusterType } from "../../cluster"; type DatabaseEndpoint = { addr: string[]; @@ -233,3 +237,65 @@ export function filterTriggersByArgs( slotShuffleTriggers: filterTriggers(result.slotShuffleTriggers), }; } + + +/** + * Waits for SMIGRATING on all existing master connections, then opens one new + * standalone connection and checks if it also receives SMIGRATING. + * + * Assumptions/limitations: + * - Uses a fixed timeout; resolves false on timeout regardless of progress. + * - Assumes one connection per master (no replicas, no pubsub/sharded pubsub). + * - Assumes the new connection observes SMIGRATING within a small wait window. + */ +export function newConnectionReceivedSmigraging(cluster: RedisClusterType<{}, {}, {}, 3, {}>, timeout = 500) { + const mastersCount = cluster.masters.length; + let received = 0; + let settled = false; + let resolve: (value: boolean) => void; + + const finish = (value: boolean) => { + if (settled) return; + settled = true; + diagnostics_channel.unsubscribe("redis.maintenance", onEvent); + resolve(value); + }; + + const onEvent = async (message: unknown) => { + const event = message as DiagnosticsEvent; + if (event.type !== "SMIGRATING") return; + + received++; + if (received !== mastersCount) return; + + const [host, port] = cluster.masters[0].address.split(":"); + const username = cluster.masters[0].client?.options.username; + const password = cluster.masters[0].client?.options.password; + const client = createClient({ + socket: { host, port: Number(port) }, + username, + password, + RESP: 3, + }); + + try { + await client.connect(); + } catch { + finish(false); + return; + } + + await setTimeout(50); + await client.close().catch(() => {}); + finish(received === mastersCount + 1); + }; + + const promise = new Promise((res) => { + resolve = res; + }); + + diagnostics_channel.subscribe("redis.maintenance", onEvent); + setTimeout(timeout, () => finish(false)); + + return promise; +}