implement message subscribing and worker

This commit is contained in:
2026-02-14 19:17:05 +01:00
parent 4cdfa1e096
commit 0dadaf1280
19 changed files with 376 additions and 11 deletions

View File

@@ -1,4 +1,5 @@
using AipsCore.Application.Abstract.Command;
using AipsCore.Application.Abstract.MessageBroking;
using AipsCore.Application.Abstract.Query;
namespace AipsCore.Application.Abstract;
@@ -9,4 +10,6 @@ public interface IDispatcher
Task<TResult> Execute<TResult>(ICommand<TResult> command, CancellationToken cancellationToken = default);
Task<TResult> Execute<TResult>(IQuery<TResult> query, CancellationToken cancellationToken = default);
public Task Execute(IMessage message, CancellationToken cancellationToken = default);
}

View File

@@ -0,0 +1,3 @@
namespace AipsCore.Application.Abstract.MessageBroking;
public interface IMessage;

View File

@@ -0,0 +1,6 @@
namespace AipsCore.Application.Abstract.MessageBroking;
public interface IMessageHandler<TMessage> where TMessage : IMessage
{
Task Handle(TMessage message, CancellationToken cancellationToken);
}

View File

@@ -0,0 +1,8 @@
namespace AipsCore.Application.Abstract.MessageBroking;
public interface IMessageSubscriber
{
Task SubscribeAsync<T>(
Func<T, CancellationToken, Task> handler
);
}

View File

@@ -0,0 +1,7 @@
namespace AipsCore.Application.Abstract.MessageBroking;
public enum MessageTag
{
Worker,
RT
}

View File

@@ -5,7 +5,7 @@ namespace AipsCore.Application.Common.Dispatcher;
public class DispatchException : Exception
{
public DispatchException(object commandQuery, Exception innerException)
: base($"Error dispatching '{commandQuery.GetType().Name}' because of: {innerException.Message}", innerException)
public DispatchException(object dispatchingObject, Exception innerException)
: base($"Error dispatching '{dispatchingObject.GetType().Name}' because of: {innerException.Message}", innerException)
{ }
}

View File

@@ -1,5 +1,6 @@
using AipsCore.Application.Abstract;
using AipsCore.Application.Abstract.Command;
using AipsCore.Application.Abstract.MessageBroking;
using AipsCore.Application.Abstract.Query;
using Microsoft.Extensions.DependencyInjection;
@@ -36,28 +37,35 @@ public sealed class Dispatcher : IDispatcher
return await this.HandleWithResult<TResult>(handlerType, query, cancellationToken);
}
public async Task Execute(IMessage message, CancellationToken cancellationToken = default)
{
var handlerType = typeof(IMessageHandler<>).MakeGenericType(message.GetType());
await this.Handle(handlerType, message, cancellationToken);
}
#endregion
#region Handle
private async Task Handle(Type handlerType, object commandOrQuery, CancellationToken cancellationToken = default)
private async Task Handle(Type handlerType, object dispatchingObject, CancellationToken cancellationToken = default)
{
dynamic handler = this.ResolveHandler(handlerType, commandOrQuery);
dynamic handler = this.ResolveHandler(handlerType, dispatchingObject);
await handler.Handle((dynamic)commandOrQuery, cancellationToken);
await handler.Handle((dynamic)dispatchingObject, cancellationToken);
}
private async Task<TResult> HandleWithResult<TResult>(Type handlerType, object commandOrQuery, CancellationToken cancellationToken = default)
private async Task<TResult> HandleWithResult<TResult>(Type handlerType, object dispatchingObject, CancellationToken cancellationToken = default)
{
dynamic handler = this.ResolveHandler(handlerType, commandOrQuery);
dynamic handler = this.ResolveHandler(handlerType, dispatchingObject);
return await handler.Handle((dynamic)commandOrQuery, cancellationToken);
return await handler.Handle((dynamic)dispatchingObject, cancellationToken);
}
#endregion
private dynamic ResolveHandler(Type handlerType, object commandOrQuery)
private dynamic ResolveHandler(Type handlerType, object dispatchingObject)
{
dynamic handler;
@@ -67,7 +75,7 @@ public sealed class Dispatcher : IDispatcher
}
catch (InvalidOperationException serviceProviderException)
{
throw new DispatchException(commandOrQuery, serviceProviderException);
throw new DispatchException(dispatchingObject, serviceProviderException);
}
return handler;

View File

@@ -0,0 +1,5 @@
using AipsCore.Application.Abstract.MessageBroking;
namespace AipsCore.Application.Common.Message.TestMessage;
public record TestMessage(string Text) : IMessage;

View File

@@ -0,0 +1,14 @@
using AipsCore.Application.Abstract.MessageBroking;
namespace AipsCore.Application.Common.Message.TestMessage;
public class TestMessageHandler : IMessageHandler<TestMessage>
{
public Task Handle(TestMessage message, CancellationToken cancellationToken)
{
Console.WriteLine(message.Text);
return Task.CompletedTask;
}
}

View File

@@ -20,7 +20,15 @@ public static class AipsRegistrationExtensions
services.AddSingleton<IRabbitMqConnection, RabbitMqConnection>();
services.AddSingleton<IMessagePublisher, RabbitMqPublisher>();
services.AddSingleton<IMessageSubscriber, RabbitMqSubscriber>();
return services;
}
public static IServiceCollection AddAipsMessageHandlers(this IServiceCollection services)
{
services.AddMessageHandlersFromAssembly(typeof(IMessage).Assembly);
return services;
}
}

View File

@@ -0,0 +1,36 @@
using System.Reflection;
using AipsCore.Application.Abstract.MessageBroking;
using Microsoft.Extensions.DependencyInjection;
namespace AipsCore.Infrastructure.DI;
public static class MessageHandlerRegistrationExtensions
{
public static IServiceCollection AddMessageHandlersFromAssembly(this IServiceCollection services, Assembly assembly)
{
var handlerInterface = typeof(IMessageHandler<>);
var types = assembly.GetTypes()
.Where(t => t is { IsAbstract: false, IsInterface: false });
foreach (var type in types)
{
var interfaces = type.GetInterfaces();
foreach (var @interface in interfaces)
{
if (!@interface.IsGenericType)
continue;
var genericDef = @interface.GetGenericTypeDefinition();
if (handlerInterface != genericDef)
continue;
services.AddTransient(@interface, type);
}
}
return services;
}
}

View File

@@ -0,0 +1,86 @@
using System.Text;
using System.Text.Json;
using AipsCore.Application.Abstract.MessageBroking;
using AipsCore.Infrastructure.DI.Configuration;
using Microsoft.Extensions.Configuration;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace AipsCore.Infrastructure.MessageBroking.RabbitMQ;
public class RabbitMqSubscriber : IMessageSubscriber
{
private readonly IRabbitMqConnection _connection;
private readonly IConfiguration _configuration;
public RabbitMqSubscriber(IRabbitMqConnection connection, IConfiguration configuration)
{
_connection = connection;
_configuration = configuration;
}
public async Task SubscribeAsync<T>(Func<T, CancellationToken, Task> handler)
{
var channel = await _connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync(
exchange: GetExchange(),
type: ExchangeType.Topic,
durable: true);
await channel.QueueDeclareAsync(
queue: GetQueueName<T>(),
durable: true);
await channel.QueueBindAsync(
queue: GetQueueName<T>(),
exchange: GetExchange(),
routingKey: GetQueueName<T>());
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (sender, args) =>
{
var message = Deserialize<T>(args.Body.ToArray());
try
{
await handler(message, CancellationToken.None);
await channel.BasicAckAsync(args.DeliveryTag, multiple: false);
}
catch (Exception exception)
{
Console.Error.WriteLine(exception);
await channel.BasicNackAsync(args.DeliveryTag, multiple: false, requeue: false);
}
};
await channel.BasicConsumeAsync(
queue: GetQueueName<T>(),
autoAck: false,
consumer: consumer);
}
private string GetQueueName<T>()
{
return typeof(T).Name;
}
private string GetExchange()
{
return _configuration.GetEnvRabbitMqExchange();
}
private T Deserialize<T>(byte[] bytes)
{
var message = JsonSerializer.Deserialize<T>(bytes);
if (message is null)
{
throw new Exception();
}
return message;
}
}