Hitless upgrades #3142
base: master
Force-pushed: a6aa6bd to 9e052d3
❌ The following Jit checks failed to run:
- secret-detection-trufflehog
- static-code-analysis-semgrep-pro
#jit_bypass_commit in this PR to bypass, Jit Admin privileges required.
More info in the Jit platform.
Force-pushed: 9e49969 to 9e052d3
❌ Security scan failed: Branch hitless-upgrades does not exist in the remote repository. 💡 Need to bypass this check? Comment |
Force-pushed: 84962df to 34835d1
    try {
      this.#onSMigrated(push);
    } catch (e) {
      throw e;
Should we handle this error?
adjusted
    if ('pubSub' in sourceNode) {
      sourceNode.pubSub?.client._unpause();
    }
    throw err;
Will this result in an unhandled promise rejection? Should we just emit it instead?
adjusted
    // Prepend to the last destination (or could distribute - for now keeping simple)
    const lastDestNode = destinationNodes[destinationNodes.length - 1];
    lastDestNode.client?._getQueue().prependCommandsToWrite(normalCommandsToMove);
Is this the correct behavior? What if we have more than one destination?
    inflightPromises.push(sourceNode.client!._getQueue().waitForInflightCommandsToComplete(inflightOptions));
    if ('pubSub' in sourceNode) {
      inflightPromises.push(sourceNode.pubSub!.client._getQueue().waitForInflightCommandsToComplete(inflightOptions));
Should we add guards to check that sourceNode.client and sourceNode.pubSub exist, instead of always assuming they do with sourceNode.client! and sourceNode.pubSub! ?
adjusted
    import { RedisClusterOptions, RedisClusterClientOptions } from './index';
    import { RedisClusterClientOptions } from './index';
    import RedisClusterSlots from './cluster-slots';
    import TestUtils, { GLOBAL } from '../test-utils'
Should we remove this? It is not used anywhere.
adjusted
    // describe("Notifications", () => {
    //
    //   assert(slotShuffleTriggers.length > 0, "slotShuffleTriggers should have at least one trigger");
    //
    //   for (const trigger of slotShuffleTriggers) {
    //     describe(`[${trigger.name}]`, () => {
    //       const dbConfig = trigger.requirements[0].dbconfig;
    //       // dbConfig.name = 'foo';
    //       const testOptions = {
    //         clusterConfiguration: {
    //           defaults: {
    //             maintNotifications: "enabled",
    //           },
    //           RESP: 3 as const,
    //         },
    //         dbConfig,
    //       } as const;
    //
    //       testUtils.testWithProxiedCluster(
    //         `should NOT receive notifications when maintNotifications is disabled`,
    //         async (cluster, faultInjectorClient) => {
    //           assert.equal(
    //             diagnosticEvents.length,
    //             0,
    //             "should not have received any notifications yet"
    //           );
    //
    //           await cluster.set('key', 'value');
    //           const key = await cluster.get('key');
    //           assert.equal(key, 'value');
    //           console.log(key);
    //
    //           // Trigger migration
    //           await faultInjectorClient.triggerAction({
    //             type: "slot_migrate",
    //             parameters: {
    //               effect: "slot-shuffle",
    //               cluster_index: 0,
    //               trigger: trigger.name,
    //             },
    //           });
    //
    //           // Verify NO maintenance notifications received
    //           assert.strictEqual(
    //             diagnosticEvents.length,
    //             0,
    //             "should NOT receive any SMIGRATING/SMIGRATED notifications when disabled"
    //           );
    //         },
    //         testOptions
    //       );
    //     });
    //   }
    // });
Should we uncomment or delete this?
adjusted
    beforeEach(function() {
      // Clear FIRST before subscribing to avoid capturing stale events
      const staleEventCount = diagnosticEvents.length;
This is not used anywhere
adjusted
SMIGRATING notification should result in an increased command and socket timeout for the given connection
Buffer.equals() was failing when reply[0] was a string instead of a Buffer, causing hangs on push notifications. Now converts strings to Buffers before comparison in PubSub and commands-queue handlers.
Changes:
- PubSub.isStatusReply: convert reply[0] to Buffer if string
- PubSub.isShardedUnsubscribe: convert reply[0] to Buffer if string
- PubSub.handleMessageReply: convert reply[0] to Buffer if string
- commands-queue PONG handler: convert reply[0] to Buffer if string
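The normalization described in this commit can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: `toBuffer`, `isPongReply`, and the `PONG` constant are hypothetical names, and the real handlers may compare against different values.

```typescript
// Hypothetical helper (not from the PR): normalize a reply element to a Buffer
// so that Buffer.equals() always receives a Buffer argument.
function toBuffer(value: string | Buffer): Buffer {
  return typeof value === "string" ? Buffer.from(value) : value;
}

// Assumed constant for illustration only.
const PONG = Buffer.from("pong");

// Hypothetical check modeled on the "PONG handler" mentioned in the commit.
function isPongReply(reply: ReadonlyArray<string | Buffer>): boolean {
  const first = reply[0];
  if (first === undefined) return false;
  // Convert first, then compare: works for both string and Buffer replies.
  return PONG.equals(toBuffer(first));
}
```

With this shape, `isPongReply(["pong"])` and `isPongReply([Buffer.from("pong")])` take the same comparison path.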
Test Infrastructure:
- Migrate maintenance tests from maintenance.spec.ts to dedicated e2e test files
- Add maintenance.e2e.ts for direct RE cluster testing with testWithRECluster helper
- Add maintenance.proxy.e2e.ts for proxy-based cluster testing
- Dynamically generate tests based on available action triggers from fault injector API
Fault Injector Client:
- Add listActionTriggers() to query available triggers by action and effect
- Add selectDbConfig() and createAndSelectDatabase() for database context management
- Auto-resolve bdb_id from selected database when not explicitly provided
- Support trigger-specific database configurations from requirements
Test Utils:
- Export REClusterTestOptions interface
- Refactor testWithRECluster to reset cluster state before each test
- Add cluster reset and cleanup between tests for isolation
RESP Decoder & Socket:
- Add wire-level debug logging for troubleshooting
Cluster:
- Add debug logging for command execution and MOVED error handling
- Add debug logging for slot discovery and client routing
Enterprise Maintenance Manager:
- Add debug logging for push message handling
…e tests
- Extract parseSMigratedPush into static method with proper type definitions
- Add Address, Destination, and SMigratedEntry interfaces for better type safety
- Support multiple source entries in SMIGRATED events (previously only handled single source)
- Add comprehensive test suite covering single slots, ranges, multiple sources/destinations
- Update cluster-slots to iterate over all entries in SMIGRATED event
- Remove debug console.log statements from production code
…loop
- Track all moving slots and destination nodes during destinations loop
- Wait for inflight commands AFTER all destinations are processed
- Extract commands and handle source cleanup once per entry, not per destination
- Unpause all destination nodes at the end of entry processing
This fixes an issue where source nodes were being unpaused prematurely when multiple destinations existed, potentially allowing new commands to queue before all slot migrations were complete.
- Wrap entry processing in try-catch to handle async operation failures
- Unpause source node in catch block to prevent deadlock on error
- Move destination unpause to finally block to ensure cleanup always runs
- Re-throw error after cleanup to propagate failures
- Remove debug console.log statements
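The cleanup ordering listed in this commit can be illustrated with a small sketch. It assumes a simplified `Pausable` node with a boolean flag and a hypothetical `processEntry` function; the PR's real code works on cluster nodes and command queues, and the review thread above suggests the re-throw may later have been changed to emitting the error.

```typescript
// Hypothetical stand-in for a cluster node that can be paused.
interface Pausable {
  paused: boolean;
}

// Sketch: pause everything, run the migration step, and guarantee cleanup.
// `migrate` is an assumed callback standing in for the real entry processing;
// on success it is assumed to have dealt with the source node itself.
async function processEntry(
  source: Pausable,
  destinations: Pausable[],
  migrate: () => Promise<void>
): Promise<void> {
  source.paused = true;
  for (const dest of destinations) dest.paused = true;
  try {
    await migrate();
  } catch (err) {
    // Unpause the source on failure so its queue does not deadlock.
    source.paused = false;
    // Re-throw after cleanup so the caller still sees the failure.
    throw err;
  } finally {
    // Runs on both success and failure: destinations are always unpaused.
    for (const dest of destinations) dest.paused = false;
  }
}
```

Putting the destination unpause in `finally` means no code path can leave a destination paused, which is the property the commit message is after.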
dbs are deleted as part of the reset_cluster action
- Handle pubSubNode replacement BEFORE destroying source connections to ensure subscriptions are resubscribed on a new node while we can still read listeners from the old client
- Create new pubSubClient before destroying old one to prevent window where pubSubNode is undefined
- Use destroy() instead of close() for source node connections since close() can hang when the server is unresponsive during removal
The publish loops in PubSub tests were using a fire-and-forget pattern, creating promises without awaiting them. During slot migration, this caused unbounded accumulation of pending promises which blocked the Node.js event loop, preventing fault injector polling from continuing.
Changes:
- Fix all 8 publish loops to use await Promise.all(batchPromises)
- Add 30-second timeout to fault injector fetch requests
- Fix misleading assertion message in enterprise-maintenance-manager
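A rough sketch of the batched pattern this commit describes, in contrast to fire-and-forget. The function name `publishInBatches` and its parameters are assumptions for illustration; only the `await Promise.all(batchPromises)` step comes from the commit message.

```typescript
// Sketch: publish `total` messages in batches, awaiting each batch before
// starting the next, so pending promises never exceed `batchSize`.
// `publish` is an assumed callback standing in for the client's publish call.
async function publishInBatches(
  publish: (index: number) => Promise<void>,
  total: number,
  batchSize: number
): Promise<void> {
  for (let start = 0; start < total; start += batchSize) {
    const batchPromises: Promise<void>[] = [];
    const end = Math.min(start + batchSize, total);
    for (let i = start; i < end; i++) {
      batchPromises.push(publish(i));
    }
    // Awaiting here bounds the number of in-flight publishes and yields
    // to the event loop between batches.
    await Promise.all(batchPromises);
  }
}
```

A fire-and-forget loop would call `publish(i)` without collecting or awaiting the promises, which is the accumulation the commit removes.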
Two bugs fixed:
1. extractCommandsForSlots infinite loop: When iterating through the linked list, if a command's slot was NOT in the moving slots set, the code never advanced 'current' to 'current.next', causing an infinite loop. Added the missing else branch.
2. Commands stuck in waitingForReply: During slot migration, commands sent to the source node could get stuck waiting for replies that never come. Added timeout and flushOnTimeout options to waitForInflightCommandsToComplete() - when timeout fires with flushOnTimeout=true, pending commands are rejected with TimeoutError instead of blocking forever.
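The first bug is a common linked-list traversal mistake, and a minimal sketch may make it concrete. The `CommandList` class below is a hypothetical singly linked queue written for illustration; it is not the library's queue implementation, and `extractForSlots` only mirrors the idea behind extractCommandsForSlots.

```typescript
// Hypothetical queue node: a command name tagged with its hash slot.
interface QueuedCommand {
  name: string;
  slot: number;
  next: QueuedCommand | null;
}

class CommandList {
  private head: QueuedCommand | null = null;
  private tail: QueuedCommand | null = null;

  push(name: string, slot: number): void {
    const node: QueuedCommand = { name, slot, next: null };
    if (this.tail === null) this.head = node;
    else this.tail.next = node;
    this.tail = node;
  }

  // Remove and return the names of all commands whose slot is moving.
  extractForSlots(movingSlots: Set<number>): string[] {
    const extracted: string[] = [];
    let prev: QueuedCommand | null = null;
    let current = this.head;
    while (current !== null) {
      const next: QueuedCommand | null = current.next;
      if (movingSlots.has(current.slot)) {
        // Unlink the node and keep `prev` where it is.
        if (prev === null) this.head = next;
        else prev.next = next;
        if (current === this.tail) this.tail = prev;
        extracted.push(current.name);
        current = next;
      } else {
        // The branch the commit describes as missing: without advancing
        // here, the loop would spin forever on a non-matching command.
        prev = current;
        current = next;
      }
    }
    return extracted;
  }

  names(): string[] {
    const out: string[] = [];
    for (let n = this.head; n !== null; n = n.next) out.push(n.name);
    return out;
  }
}
```

Capturing `next` before any unlinking keeps the traversal valid in both branches.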
Force-pushed: 1852290 to f24e98a
Description
Checklist
Does npm test pass with this change (including linting)?