Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ import diagnostics_channel from "node:diagnostics_channel";

import testUtils from "../../test-utils";
import { DiagnosticsEvent } from "../../client/enterprise-maintenance-manager";
import { FaultInjectorClient, ActionTrigger, ActionType, ActionRequest } from "@redis/test-utils/lib/fault-injector";
import { FaultInjectorClient, ActionTrigger, ActionRequest } from "@redis/test-utils/lib/fault-injector";
import { REClusterTestOptions } from "@redis/test-utils";
import { blockCommand, filterTriggersByArgs } from "./test-scenario.util";
import { blockCommand, newConnectionReceivedSmigraging, filterTriggersByArgs } from "./test-scenario.util";

type TestOptions = REClusterTestOptions<{}, {}, {}, 3, {}>

Expand Down Expand Up @@ -167,9 +167,12 @@ const KEYS = [
"should not have received any notifications yet"
);

const newConnReceivedSmigratingPromise = newConnectionReceivedSmigraging(cluster);

await faultInjectorClient.triggerAction(ACTION, ACTION_OPTIONS);

assert.ok(await newConnReceivedSmigratingPromise, 'Did not receive SMIGRATING on new connection');

// Verify notifications were received
const sMigratingEventCount = diagnosticEvents.filter(
(event) => event.type === "SMIGRATING"
Expand Down Expand Up @@ -460,9 +463,13 @@ const KEYS = [
"should not have received any notifications yet"
);

const newConnReceivedSmigratingPromise = newConnectionReceivedSmigraging(cluster);

// Trigger migration
await faultInjectorClient.triggerAction(ACTION, ACTION_OPTIONS);

assert.ok(await newConnReceivedSmigratingPromise, 'Did not receive SMIGRATING on new connection');

// Verify notifications were received
const sMigratingEventCount = diagnosticEvents.filter(
(event) => event.type === "SMIGRATING"
Expand Down Expand Up @@ -752,9 +759,15 @@ const KEYS = [
"should not have received any notifications yet"
);


const newConnReceivedSmigratingPromise = newConnectionReceivedSmigraging(cluster);

// Trigger migration
await faultInjectorClient.triggerAction(ACTION, ACTION_OPTIONS);

assert.ok(await newConnReceivedSmigratingPromise, 'Did not receive SMIGRATING on new connection');


// Verify notifications were received
const sMigratingEventCount = diagnosticEvents.filter(
(event) => event.type === "SMIGRATING"
Expand Down Expand Up @@ -1044,9 +1057,13 @@ const KEYS = [
"should not have received any notifications yet"
);

const newConnReceivedSmigratingPromise = newConnectionReceivedSmigraging(cluster);

// Trigger migration
await faultInjectorClient.triggerAction(ACTION, ACTION_OPTIONS);

assert.ok(await newConnReceivedSmigratingPromise, 'Did not receive SMIGRATING on new connection');

// Verify notifications were received
const sMigratingEventCount = diagnosticEvents.filter(
(event) => event.type === "SMIGRATING"
Expand Down
68 changes: 67 additions & 1 deletion packages/client/lib/tests/test-scenario/test-scenario.util.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import { readFileSync } from "fs";
import { createClient, RedisClientOptions } from "../../..";
import { createClient, RedisClientOptions } from "../../.."
import { stub } from "sinon";
import { ActionTrigger } from "@redis/test-utils/lib/fault-injector";
import { DiagnosticsEvent } from "../../client/enterprise-maintenance-manager";
import diagnostics_channel from "node:diagnostics_channel";
import { setTimeout } from "timers/promises";
import { RedisClusterType } from "../../cluster";

type DatabaseEndpoint = {
addr: string[];
Expand Down Expand Up @@ -233,3 +237,65 @@ export function filterTriggersByArgs(
slotShuffleTriggers: filterTriggers(result.slotShuffleTriggers),
};
}


/**
* Waits for SMIGRATING on all existing master connections, then opens one new
* standalone connection and checks if it also receives SMIGRATING.
*
* Assumptions/limitations:
* - Uses a fixed timeout; resolves false on timeout regardless of progress.
* - Assumes one connection per master (no replicas, no pubsub/sharded pubsub).
* - Assumes the new connection observes SMIGRATING within a small wait window.
*/
export function newConnectionReceivedSmigraging(cluster: RedisClusterType<{}, {}, {}, 3, {}>, timeout = 500) {
const mastersCount = cluster.masters.length;
let received = 0;
let settled = false;
let resolve: (value: boolean) => void;

const finish = (value: boolean) => {
if (settled) return;
settled = true;
diagnostics_channel.unsubscribe("redis.maintenance", onEvent);
resolve(value);
};

const onEvent = async (message: unknown) => {
const event = message as DiagnosticsEvent;
if (event.type !== "SMIGRATING") return;

received++;
Copy link
Contributor

@PavelPashov PavelPashov Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With a counter, one client could end up receiving 2 events while another receives none.

if (received !== mastersCount) return;

const [host, port] = cluster.masters[0].address.split(":");
const username = cluster.masters[0].client?.options.username;
const password = cluster.masters[0].client?.options.password;
const client = createClient({
socket: { host, port: Number(port) },
username,
password,
RESP: 3,
});

try {
await client.connect();
} catch {
finish(false);
return;
}

await setTimeout(50);
await client.close().catch(() => {});
finish(received === mastersCount + 1);
};

const promise = new Promise<boolean>((res) => {
resolve = res;
});

diagnostics_channel.subscribe("redis.maintenance", onEvent);
setTimeout(timeout, () => finish(false));
Copy link
Contributor

@PavelPashov PavelPashov Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this setTimeout be awaited?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, this is basically an abort signal that we run in the background.


return promise;
}
Loading