Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 167 additions & 0 deletions Test/DurableTask.ServiceBus.Tests/SessionIdCaseInsensitiveTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

namespace DurableTask.ServiceBus.Tests
{
using System;
using System.Collections.Concurrent;
using System.Reflection;
using DurableTask.ServiceBus.Settings;
using Microsoft.VisualStudio.TestTools.UnitTesting;

/// <summary>
/// Tests that pin down the case-insensitive session ID handling that
/// ServiceBusOrchestrationService relies on.
///
/// Background: Service Bus can change the casing of session IDs during upgrades or failovers.
/// The DurableTask framework must handle session IDs case-insensitively to prevent ghost sessions,
/// orphaned orchestration state, and stuck eternal orchestrations.
///
/// Scope note: the dictionary tests below construct their own ConcurrentDictionary with
/// StringComparer.OrdinalIgnoreCase instead of reading the service's private fields, because
/// those fields are only populated by StartAsync. They therefore document the required
/// semantics; they do not by themselves prove which comparer the service passes.
/// </summary>
[TestClass]
public class SessionIdCaseInsensitiveTests
{
    /// <summary>
    /// Builds a session dictionary keyed with StringComparer.OrdinalIgnoreCase.
    /// Centralised so that every test exercises the same comparer.
    /// </summary>
    /// <returns>An empty, case-insensitive session dictionary.</returns>
    static ConcurrentDictionary<string, ServiceBusOrchestrationSession> CreateSessionDictionary()
    {
        return new ConcurrentDictionary<string, ServiceBusOrchestrationSession>(StringComparer.OrdinalIgnoreCase);
    }

    /// <summary>
    /// Validates that a session dictionary keyed with OrdinalIgnoreCase treats a lowercased
    /// session ID as the same key as the original PascalCase session ID for add, lookup and removal.
    /// </summary>
    [TestMethod]
    public void OrchestrationSessionsDictionary_ShouldBeCaseInsensitive()
    {
        // Arrange
        ConcurrentDictionary<string, ServiceBusOrchestrationSession> sessions = CreateSessionDictionary();

        string pascalCaseId = "System_BillingConsumption_8a376298-1463-4440-905f-a836774c1460";
        string lowerCaseId = "system_billingconsumption_8a376298-1463-4440-905f-a836774c1460";

        var sessionState = new ServiceBusOrchestrationSession();

        // Add with PascalCase (the casing the session was originally created with)
        Assert.IsTrue(sessions.TryAdd(pascalCaseId, sessionState));

        // Adding the lowercase form (as returned by Service Bus after an upgrade)
        // must FAIL because the comparer treats both spellings as the same key
        Assert.IsFalse(sessions.TryAdd(lowerCaseId, sessionState),
            "Lowercase session ID should be treated as duplicate of PascalCase session ID");

        // Lookup by lowercase should find the PascalCase entry
        Assert.IsTrue(sessions.TryGetValue(lowerCaseId, out var retrieved),
            "Should be able to look up session by lowercase ID");
        Assert.AreSame(sessionState, retrieved);

        // Removal by lowercase should remove the PascalCase entry
        Assert.IsTrue(sessions.TryRemove(lowerCaseId, out var removed),
            "Should be able to remove session by lowercase ID");
        Assert.AreSame(sessionState, removed);
        Assert.AreEqual(0, sessions.Count, "Dictionary should be empty after removal");
    }

    /// <summary>
    /// Validates that a message dictionary keyed with OrdinalIgnoreCase treats differently-cased
    /// message IDs as the same key, both when adding and when looking up.
    /// </summary>
    [TestMethod]
    public void OrchestrationMessagesDictionary_ShouldBeCaseInsensitive()
    {
        // Arrange
        var messages = new ConcurrentDictionary<string, DurableTask.ServiceBus.Common.Abstraction.Message>(StringComparer.OrdinalIgnoreCase);

        string messageId = "2B9C5D18F1C2416390221C250F38DF94";
        string lowerMessageId = "2b9c5d18f1c2416390221c250f38df94";

        var message = new DurableTask.ServiceBus.Common.Abstraction.Message(new byte[0]);

        // Act / Assert: the second add differs only by casing and must be rejected
        Assert.IsTrue(messages.TryAdd(messageId, message));
        Assert.IsFalse(messages.TryAdd(lowerMessageId, message),
            "Lowercase message ID should be treated as duplicate");

        // The original entry must also be reachable through the lowercased ID
        Assert.IsTrue(messages.TryGetValue(lowerMessageId, out var retrievedMessage),
            "Should be able to look up message by lowercase ID");
        Assert.AreSame(message, retrievedMessage);
        Assert.AreEqual(1, messages.Count, "Dictionary should contain exactly one message entry");
    }

    /// <summary>
    /// 1. Timer message sent with PascalCase session ID
    /// 2. Timer message received with lowercase session ID
    /// 3. With case-insensitive dictionary, the lookup should succeed
    /// </summary>
    [TestMethod]
    public void SessionLookup_WithMixedCaseSessionIds_ShouldSucceed()
    {
        // Arrange
        ConcurrentDictionary<string, ServiceBusOrchestrationSession> sessions = CreateSessionDictionary();

        // Simulate the scenario observed in production: same ID, different casing
        string originalSessionId = "System_MoveBillingEvents_a3c79b00";
        string lowercasedSessionId = "system_movebillingevents_a3c79b00";

        var sessionState = new ServiceBusOrchestrationSession();

        // Step 1: Session added during LockNextTaskOrchestrationWorkItemAsync with original casing.
        // Asserted so that a broken arrange step is reported as such rather than as a lookup failure.
        Assert.IsTrue(sessions.TryAdd(originalSessionId, sessionState),
            "Arrange step failed: the original session could not be added");

        // Step 2: After ContinueAsNew, timer fires and Service Bus returns lowercase session ID.
        // The framework looks up the session by the (now lowercased) workItem.InstanceId
        bool found = sessions.TryGetValue(lowercasedSessionId, out var retrievedSession);

        // Assert
        Assert.IsTrue(found,
            "Session lookup with lowercased ID should find the original PascalCase session. " +
            "Without this fix, a ghost session would be created and the orchestration would be stuck forever.");
        Assert.AreSame(sessionState, retrievedSession);
    }

    /// <summary>
    /// Validates that the case-insensitive dictionary prevents the ghost session scenario.
    /// In the original bug, a lowercased session ID would create a NEW entry in the dictionary,
    /// leading to a ghost session with empty state that would immediately die.
    /// </summary>
    [TestMethod]
    public void GhostSessionPrevention_DuplicateAddWithDifferentCasing_ShouldFail()
    {
        // Arrange
        ConcurrentDictionary<string, ServiceBusOrchestrationSession> sessions = CreateSessionDictionary();

        string[] casingVariants = new[]
        {
            "System_BillingConsumption_8a376298-1463-4440-905f-a836774c1460",
            "system_billingconsumption_8a376298-1463-4440-905f-a836774c1460",
            "SYSTEM_BILLINGCONSUMPTION_8A376298-1463-4440-905F-A836774C1460",
            "System_billingConsumption_8A376298-1463-4440-905f-A836774c1460",
        };

        // First add should succeed
        Assert.IsTrue(sessions.TryAdd(casingVariants[0], new ServiceBusOrchestrationSession()));

        // All other casing variants should be treated as duplicates
        for (int i = 1; i < casingVariants.Length; i++)
        {
            Assert.IsFalse(sessions.TryAdd(casingVariants[i], new ServiceBusOrchestrationSession()),
                $"Casing variant '{casingVariants[i]}' should be treated as duplicate of '{casingVariants[0]}'");
        }

        Assert.AreEqual(1, sessions.Count, "Dictionary should contain exactly one entry regardless of casing variants");
    }

    /// <summary>
    /// Verifies, via reflection, that ServiceBusOrchestrationService still declares a private
    /// instance field named 'orchestrationSessions' of type
    /// ConcurrentDictionary&lt;string, ServiceBusOrchestrationSession&gt;.
    ///
    /// Limitation: despite the method name, this test does NOT inspect the comparer. The field is
    /// only assigned inside StartAsync, and this test never starts the service, so there is no
    /// dictionary instance to examine. It only guards against the field being renamed or retyped.
    /// </summary>
    [TestMethod]
    public void StartAsync_OrchestrationSessionsDictionary_UsesCaseInsensitiveComparer()
    {
        // Act: look the field up by name; private instance fields need both binding flags.
        FieldInfo fieldInfo = typeof(ServiceBusOrchestrationService).GetField(
            "orchestrationSessions",
            BindingFlags.NonPublic | BindingFlags.Instance);

        // Assert: declaration shape only (see the limitation described in the summary).
        Assert.IsNotNull(fieldInfo,
            "Expected private field 'orchestrationSessions' on ServiceBusOrchestrationService");
        Assert.AreEqual(
            typeof(ConcurrentDictionary<string, ServiceBusOrchestrationSession>),
            fieldInfo.FieldType,
            "orchestrationSessions should be ConcurrentDictionary<string, ServiceBusOrchestrationSession>");
    }
}
}
50 changes: 47 additions & 3 deletions src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,8 @@ public ServiceBusOrchestrationService(
public async Task StartAsync()
{
this.cancellationTokenSource = new CancellationTokenSource();
this.orchestrationSessions = new ConcurrentDictionary<string, ServiceBusOrchestrationSession>();
this.orchestrationMessages = new ConcurrentDictionary<string, Message>();
this.orchestrationSessions = new ConcurrentDictionary<string, ServiceBusOrchestrationSession>(StringComparer.OrdinalIgnoreCase);
this.orchestrationMessages = new ConcurrentDictionary<string, Message>(StringComparer.OrdinalIgnoreCase);

this.orchestratorSender = new MessageSender(this.serviceBusConnection, this.orchestratorEntityName, this.workerEntityName);
this.workerSender = new MessageSender(this.serviceBusConnection, this.workerEntityName, this.orchestratorEntityName);
Expand Down Expand Up @@ -1580,13 +1580,31 @@ void LogSentMessages(IMessageSession session, string messageType, IList<MessageC
"ServiceBusOrchestrationService-SentMessageLog",
session.SessionId,
GetFormattedLog($@"{messages.Count.ToString()} messages queued for {messageType}: {
string.Join(",", messages.Select(m => $"{m.Message.MessageId} <{m.Action?.Event.EventId.ToString()}>"))}"));
string.Join(",", messages.Select(m =>
{
string scheduledTime = m.Message.ScheduledEnqueueTimeUtc > DateTime.MinValue
? $" scheduledAt:{m.Message.ScheduledEnqueueTimeUtc:o}"
: "";
string targetSession = !string.IsNullOrEmpty(m.Message.SessionId)
? $" targetSession:{m.Message.SessionId}"
: "";
return $"{m.Message.MessageId} <{m.Action?.Event.EventId.ToString()}>{scheduledTime}{targetSession}";
}))}"));
}

async Task<OrchestrationRuntimeState> GetSessionStateAsync(IMessageSession session, IOrchestrationServiceBlobStore orchestrationServiceBlobStore)
{
byte[] state = await session.GetStateAsync();

if (state == null || state.Length == 0)
{
TraceHelper.TraceSession(
TraceEventType.Information,
"ServiceBusOrchestrationService-GetSessionState-EmptyState",
session.SessionId,
$"Session '{session.SessionId}' has null or empty state ({state?.Length ?? 0} bytes).");
}

using (Stream rawSessionStream = state != null ? new MemoryStream(state) : null)
{
this.ServiceStats.OrchestrationDispatcherStats.SessionGets.Increment();
Expand Down Expand Up @@ -1615,6 +1633,32 @@ async Task<bool> TrySetSessionStateAsync(
newOrchestrationRuntimeState.ExecutionStartedEvent == null ||
newOrchestrationRuntimeState.OrchestrationStatus != OrchestrationStatus.Running)
{
string reason;
TraceEventType traceLevel;

if (newOrchestrationRuntimeState == null)
{
reason = "newOrchestrationRuntimeState is null";
traceLevel = TraceEventType.Warning;
}
else if (newOrchestrationRuntimeState.ExecutionStartedEvent == null)
{
reason = "ExecutionStartedEvent is null (possible ghost session with empty state)";
traceLevel = TraceEventType.Warning;
}
else
{
reason = $"OrchestrationStatus is {newOrchestrationRuntimeState.OrchestrationStatus}";
traceLevel = TraceEventType.Information;
}

TraceHelper.TraceSession(
traceLevel,
"ServiceBusOrchestrationService-TrySetSessionState-DeletingState",
workItem.InstanceId,
$"Setting session state to null. Reason: {reason}. " +
$"Session: '{session.SessionId}', InstanceId: '{workItem.InstanceId}'");

await session.SetStateAsync(null);
return true;
}
Expand Down
Loading