Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
// Explicit topology for Send pattern demo
t.DeclareQueue("process-order");
t.Endpoint("process-order-ep").Queue("process-order").Handler<ProcessOrderCommandHandler>();
// Alternative: bind by message type instead of handler type
// t.Endpoint("process-order-ep").Queue("process-order").Receives<ProcessOrderCommand>();
t.DispatchEndpoint("send-demo").ToQueue("process-order").Send<ProcessOrderCommand>();
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ public interface IInMemoryReceiveEndpointDescriptor : IReceiveEndpointDescriptor
/// <inheritdoc />
new IInMemoryReceiveEndpointDescriptor Consumer<TConsumer>() where TConsumer : class, IConsumer;

/// <inheritdoc />
new IInMemoryReceiveEndpointDescriptor Receives<TMessage>();

/// <inheritdoc />
new IInMemoryReceiveEndpointDescriptor Receives(Type messageType);

/// <inheritdoc />
new IInMemoryReceiveEndpointDescriptor Kind(ReceiveEndpointKind kind);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,20 @@ internal InMemoryReceiveEndpointDescriptor(IMessagingConfigurationContext discov
return this;
}

public new IInMemoryReceiveEndpointDescriptor Receives<TMessage>()
{
base.Receives<TMessage>();

return this;
}

public new IInMemoryReceiveEndpointDescriptor Receives(Type messageType)
{
base.Receives(messageType);

return this;
}

public new IInMemoryReceiveEndpointDescriptor Kind(ReceiveEndpointKind kind)
{
base.Kind(kind);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ public interface IPostgresReceiveEndpointDescriptor : IReceiveEndpointDescriptor
/// <inheritdoc cref="IReceiveEndpointDescriptor{T}.Consumer{TConsumer}"/>
new IPostgresReceiveEndpointDescriptor Consumer<TConsumer>() where TConsumer : class, IConsumer;

/// <inheritdoc cref="IReceiveEndpointDescriptor{T}.Receives{TMessage}"/>
new IPostgresReceiveEndpointDescriptor Receives<TMessage>();

/// <inheritdoc cref="IReceiveEndpointDescriptor{T}.Receives(Type)"/>
new IPostgresReceiveEndpointDescriptor Receives(Type messageType);

/// <inheritdoc cref="IReceiveEndpointDescriptor{T}.Kind(ReceiveEndpointKind)"/>
new IPostgresReceiveEndpointDescriptor Kind(ReceiveEndpointKind kind);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,22 @@ internal PostgresReceiveEndpointDescriptor(IMessagingConfigurationContext discov
return this;
}

/// <inheritdoc />
public new IPostgresReceiveEndpointDescriptor Receives<TMessage>()
{
base.Receives<TMessage>();

return this;
}

/// <inheritdoc />
public new IPostgresReceiveEndpointDescriptor Receives(Type messageType)
{
base.Receives(messageType);

return this;
}

/// <inheritdoc />
public new IPostgresReceiveEndpointDescriptor Kind(ReceiveEndpointKind kind)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ public interface IRabbitMQReceiveEndpointDescriptor : IReceiveEndpointDescriptor
/// <inheritdoc cref="IReceiveEndpointDescriptor{TConfiguration}.Consumer{TConsumer}" />
new IRabbitMQReceiveEndpointDescriptor Consumer<TConsumer>() where TConsumer : class, IConsumer;

/// <inheritdoc cref="IReceiveEndpointDescriptor{TConfiguration}.Receives{TMessage}" />
new IRabbitMQReceiveEndpointDescriptor Receives<TMessage>();

/// <inheritdoc cref="IReceiveEndpointDescriptor{TConfiguration}.Receives(Type)" />
new IRabbitMQReceiveEndpointDescriptor Receives(Type messageType);

/// <inheritdoc cref="IReceiveEndpointDescriptor{TConfiguration}.Kind" />
new IRabbitMQReceiveEndpointDescriptor Kind(ReceiveEndpointKind kind);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,22 @@ private RabbitMQReceiveEndpointDescriptor(IMessagingConfigurationContext discove
return this;
}

/// <inheritdoc />
public new IRabbitMQReceiveEndpointDescriptor Receives<TMessage>()
{
base.Receives<TMessage>();

return this;
}

/// <inheritdoc />
public new IRabbitMQReceiveEndpointDescriptor Receives(Type messageType)
{
base.Receives(messageType);

return this;
}

/// <inheritdoc />
public new IRabbitMQReceiveEndpointDescriptor Kind(ReceiveEndpointKind kind)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public class ReceiveEndpointConfiguration : MessagingConfiguration
/// </summary>
public List<Type> ConsumerIdentities { get; set; } = [];

/// <summary>
/// Gets or sets the list of message types that this endpoint receives, used to bind all handlers for those types.
/// </summary>
public List<Type> ReceivedMessageTypes { get; set; } = [];

/// <summary>
/// Gets or sets whether this is a temporary (auto-delete) endpoint.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,20 @@ public interface IReceiveEndpointDescriptor<out TConfiguration>
/// <returns>The descriptor instance for method chaining.</returns>
IReceiveEndpointDescriptor<TConfiguration> Consumer<TConsumer>() where TConsumer : class, IConsumer;

/// <summary>
/// Binds all handlers for the specified message type to this receive endpoint.
/// </summary>
/// <typeparam name="TMessage">The message type to receive.</typeparam>
/// <returns>The descriptor instance for method chaining.</returns>
IReceiveEndpointDescriptor<TConfiguration> Receives<TMessage>();

/// <summary>
/// Binds all handlers for the specified message type to this receive endpoint.
/// </summary>
/// <param name="messageType">The message type to receive.</param>
/// <returns>The descriptor instance for method chaining.</returns>
IReceiveEndpointDescriptor<TConfiguration> Receives(Type messageType);

/// <summary>
/// Sets the kind of this receive endpoint (e.g., default, temporary).
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,28 @@ public IReceiveEndpointDescriptor<T> Consumer(Type consumerType)
return this;
}

/// <summary>
/// Binds all handlers for the specified message type to this receive endpoint.
/// </summary>
/// <typeparam name="TMessage">The message type to receive.</typeparam>
/// <returns>The descriptor instance for method chaining.</returns>
public IReceiveEndpointDescriptor<T> Receives<TMessage>()
{
Configuration.ReceivedMessageTypes.Add(typeof(TMessage));
return this;
}

/// <summary>
/// Binds all handlers for the specified message type to this receive endpoint.
/// </summary>
/// <param name="messageType">The message type to receive.</param>
/// <returns>The descriptor instance for method chaining.</returns>
public IReceiveEndpointDescriptor<T> Receives(Type messageType)
{
Configuration.ReceivedMessageTypes.Add(messageType);
return this;
}

public IReceiveEndpointDescriptor<T> Kind(ReceiveEndpointKind kind)
{
Configuration.Kind = kind;
Expand Down
5 changes: 5 additions & 0 deletions src/Mocha/src/Mocha/ThrowHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public static Exception RouteRequiresConsumer()
public static Exception RouteNotInitialized()
=> new InvalidOperationException("Route is not initialized");

public static Exception NoHandlerForMessageType(Type messageType, string? endpointName)
=> new InvalidOperationException(
$"No handler or consumer handles message type '{messageType.FullName}' "
+ $"declared on receive endpoint '{endpointName}'.");

public static Exception TransportConfigurationMissing()
=> new InvalidOperationException("Could not create configuration for transport");

Expand Down
88 changes: 59 additions & 29 deletions src/Mocha/src/Mocha/Transport/MessagingTransport.Lifecyle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,39 +46,31 @@ internal void Initialize(IMessagingSetupContext context)
// TODO maybe we should move this to endpoint initialize - not sure yet
foreach (var handlerType in endpointConfiguration.ConsumerIdentities)
{
var consumer = context.Consumers.FirstOrDefault(h => h.Identity == handlerType);
var consumer = context.Consumers.FirstOrDefault(h => h.Identity == handlerType)
?? throw new InvalidOperationException(
$"Handler type {handlerType.FullName} not found for endpoint {Configuration.Name}");

if (consumer is not null)
foreach (var route in context.Router.GetInboundByConsumer(consumer))
{
var routes = context.Router.GetInboundByConsumer(consumer);
var applied = false;
foreach (var route in routes)
{
if (route.Endpoint is null)
{
route.ConnectEndpoint(context, endpoint);
applied = true;
}
}

// in this case the user has explicilty mapped this consumer to multiple
// endpoints and we are currently missing an additional route
if (!applied)
{
var route = new InboundRoute();
var configuration = new InboundRouteConfiguration
{
MessageType = route.MessageType,
Consumer = consumer
};
route.Initialize(context, configuration);
route.ConnectEndpoint(context, endpoint);
}
BindRouteToEndpoint(context, route, endpoint);
}
}

foreach (var messageRuntimeType in endpointConfiguration.ReceivedMessageTypes)
{
var matched = context.Router.InboundRoutes
.Where(r => r.Kind is Subscribe or Send or Request
&& r.MessageType?.RuntimeType == messageRuntimeType)
.ToList();

if (matched.Count == 0)
{
throw ThrowHelper.NoHandlerForMessageType(messageRuntimeType, endpointConfiguration.Name);
}
else

foreach (var route in matched)
{
throw new InvalidOperationException(
$"Handler type {handlerType.FullName} not found for endpoint {Configuration.Name}");
BindRouteToEndpoint(context, route, endpoint);
}
}
}
Expand Down Expand Up @@ -130,6 +122,44 @@ internal void Initialize(IMessagingSetupContext context)
OnAfterInitialized(context);
}

private static void BindRouteToEndpoint(
IMessagingSetupContext context, InboundRoute route, ReceiveEndpoint endpoint)
{
if (route.Endpoint is null)
{
route.ConnectEndpoint(context, endpoint);
return;
}

if (route.Endpoint == endpoint)
{
return;
}

// The route is bound to another endpoint, so fan it out by adding an equivalent route to
// this endpoint. Skip when an equivalent route is already present so binding the same
// message type or consumer across three or more endpoints stays idempotent.
foreach (var existing in context.Router.GetInboundByEndpoint(endpoint))
{
if (existing.Consumer == route.Consumer
&& existing.Kind == route.Kind
&& Equals(existing.MessageType, route.MessageType))
{
return;
}
}

var clone = new InboundRoute();
clone.Initialize(context, new InboundRouteConfiguration
{
MessageType = route.MessageType,
Consumer = route.Consumer,
Kind = route.Kind,
Condition = route.Condition
});
clone.ConnectEndpoint(context, endpoint);
}

protected virtual void OnBeforeInitialize(IMessagingSetupContext context) { }

protected virtual void OnAfterInitialized(IMessagingSetupContext context) { }
Expand Down
Loading
Loading