Merge pull request #32 from StewKI/feature-worker

Message subscribing and Worker
This commit is contained in:
Andrija Stevanović
2026-02-15 16:24:09 +01:00
committed by GitHub
20 changed files with 381 additions and 11 deletions

3
docker/stop.sh Executable file
View File

@@ -0,0 +1,3 @@
#!/bin/bash
docker compose -p aips --env-file ../.env down

View File

@@ -1,4 +1,5 @@
using AipsCore.Application.Abstract.Command; using AipsCore.Application.Abstract.Command;
using AipsCore.Application.Abstract.MessageBroking;
using AipsCore.Application.Abstract.Query; using AipsCore.Application.Abstract.Query;
namespace AipsCore.Application.Abstract; namespace AipsCore.Application.Abstract;
@@ -9,4 +10,6 @@ public interface IDispatcher
Task<TResult> Execute<TResult>(ICommand<TResult> command, CancellationToken cancellationToken = default); Task<TResult> Execute<TResult>(ICommand<TResult> command, CancellationToken cancellationToken = default);
Task<TResult> Execute<TResult>(IQuery<TResult> query, CancellationToken cancellationToken = default); Task<TResult> Execute<TResult>(IQuery<TResult> query, CancellationToken cancellationToken = default);
public Task Execute(IMessage message, CancellationToken cancellationToken = default);
} }

View File

@@ -0,0 +1,3 @@
namespace AipsCore.Application.Abstract.MessageBroking;
public interface IMessage;

View File

@@ -0,0 +1,6 @@
namespace AipsCore.Application.Abstract.MessageBroking;
public interface IMessageHandler<TMessage> where TMessage : IMessage
{
Task Handle(TMessage message, CancellationToken cancellationToken);
}

View File

@@ -0,0 +1,8 @@
namespace AipsCore.Application.Abstract.MessageBroking;
public interface IMessageSubscriber
{
Task SubscribeAsync<T>(
Func<T, CancellationToken, Task> handler
);
}

View File

@@ -0,0 +1,7 @@
namespace AipsCore.Application.Abstract.MessageBroking;
public enum MessageTag
{
Worker,
RT
}

View File

@@ -5,7 +5,7 @@ namespace AipsCore.Application.Common.Dispatcher;
public class DispatchException : Exception public class DispatchException : Exception
{ {
public DispatchException(object commandQuery, Exception innerException) public DispatchException(object dispatchingObject, Exception innerException)
: base($"Error dispatching '{commandQuery.GetType().Name}' because of: {innerException.Message}", innerException) : base($"Error dispatching '{dispatchingObject.GetType().Name}' because of: {innerException.Message}", innerException)
{ } { }
} }

View File

@@ -1,5 +1,6 @@
using AipsCore.Application.Abstract; using AipsCore.Application.Abstract;
using AipsCore.Application.Abstract.Command; using AipsCore.Application.Abstract.Command;
using AipsCore.Application.Abstract.MessageBroking;
using AipsCore.Application.Abstract.Query; using AipsCore.Application.Abstract.Query;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
@@ -37,27 +38,34 @@ public sealed class Dispatcher : IDispatcher
return await this.HandleWithResult<TResult>(handlerType, query, cancellationToken); return await this.HandleWithResult<TResult>(handlerType, query, cancellationToken);
} }
public async Task Execute(IMessage message, CancellationToken cancellationToken = default)
{
var handlerType = typeof(IMessageHandler<>).MakeGenericType(message.GetType());
await this.Handle(handlerType, message, cancellationToken);
}
#endregion #endregion
#region Handle #region Handle
private async Task Handle(Type handlerType, object commandOrQuery, CancellationToken cancellationToken = default) private async Task Handle(Type handlerType, object dispatchingObject, CancellationToken cancellationToken = default)
{ {
dynamic handler = this.ResolveHandler(handlerType, commandOrQuery); dynamic handler = this.ResolveHandler(handlerType, dispatchingObject);
await handler.Handle((dynamic)commandOrQuery, cancellationToken); await handler.Handle((dynamic)dispatchingObject, cancellationToken);
} }
private async Task<TResult> HandleWithResult<TResult>(Type handlerType, object commandOrQuery, CancellationToken cancellationToken = default) private async Task<TResult> HandleWithResult<TResult>(Type handlerType, object dispatchingObject, CancellationToken cancellationToken = default)
{ {
dynamic handler = this.ResolveHandler(handlerType, commandOrQuery); dynamic handler = this.ResolveHandler(handlerType, dispatchingObject);
return await handler.Handle((dynamic)commandOrQuery, cancellationToken); return await handler.Handle((dynamic)dispatchingObject, cancellationToken);
} }
#endregion #endregion
private dynamic ResolveHandler(Type handlerType, object commandOrQuery) private dynamic ResolveHandler(Type handlerType, object dispatchingObject)
{ {
dynamic handler; dynamic handler;
@@ -67,7 +75,7 @@ public sealed class Dispatcher : IDispatcher
} }
catch (InvalidOperationException serviceProviderException) catch (InvalidOperationException serviceProviderException)
{ {
throw new DispatchException(commandOrQuery, serviceProviderException); throw new DispatchException(dispatchingObject, serviceProviderException);
} }
return handler; return handler;

View File

@@ -0,0 +1,5 @@
using AipsCore.Application.Abstract.MessageBroking;
namespace AipsCore.Application.Common.Message.TestMessage;
public record TestMessage(string Text) : IMessage;

View File

@@ -0,0 +1,14 @@
using AipsCore.Application.Abstract.MessageBroking;
namespace AipsCore.Application.Common.Message.TestMessage;
public class TestMessageHandler : IMessageHandler<TestMessage>
{
public Task Handle(TestMessage message, CancellationToken cancellationToken)
{
Console.WriteLine(message.Text);
return Task.CompletedTask;
}
}

View File

@@ -20,6 +20,14 @@ public static class AipsRegistrationExtensions
services.AddSingleton<IRabbitMqConnection, RabbitMqConnection>(); services.AddSingleton<IRabbitMqConnection, RabbitMqConnection>();
services.AddSingleton<IMessagePublisher, RabbitMqPublisher>(); services.AddSingleton<IMessagePublisher, RabbitMqPublisher>();
services.AddSingleton<IMessageSubscriber, RabbitMqSubscriber>();
return services;
}
public static IServiceCollection AddAipsMessageHandlers(this IServiceCollection services)
{
services.AddMessageHandlersFromAssembly(typeof(IMessage).Assembly);
return services; return services;
} }

View File

@@ -0,0 +1,36 @@
using System.Reflection;
using AipsCore.Application.Abstract.MessageBroking;
using Microsoft.Extensions.DependencyInjection;
namespace AipsCore.Infrastructure.DI;
public static class MessageHandlerRegistrationExtensions
{
public static IServiceCollection AddMessageHandlersFromAssembly(this IServiceCollection services, Assembly assembly)
{
var handlerInterface = typeof(IMessageHandler<>);
var types = assembly.GetTypes()
.Where(t => t is { IsAbstract: false, IsInterface: false });
foreach (var type in types)
{
var interfaces = type.GetInterfaces();
foreach (var @interface in interfaces)
{
if (!@interface.IsGenericType)
continue;
var genericDef = @interface.GetGenericTypeDefinition();
if (handlerInterface != genericDef)
continue;
services.AddTransient(@interface, type);
}
}
return services;
}
}

View File

@@ -0,0 +1,86 @@
using System.Text;
using System.Text.Json;
using AipsCore.Application.Abstract.MessageBroking;
using AipsCore.Infrastructure.DI.Configuration;
using Microsoft.Extensions.Configuration;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace AipsCore.Infrastructure.MessageBroking.RabbitMQ;
public class RabbitMqSubscriber : IMessageSubscriber
{
private readonly IRabbitMqConnection _connection;
private readonly IConfiguration _configuration;
public RabbitMqSubscriber(IRabbitMqConnection connection, IConfiguration configuration)
{
_connection = connection;
_configuration = configuration;
}
public async Task SubscribeAsync<T>(Func<T, CancellationToken, Task> handler)
{
var channel = await _connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync(
exchange: GetExchange(),
type: ExchangeType.Topic,
durable: true);
await channel.QueueDeclareAsync(
queue: GetQueueName<T>(),
durable: true);
await channel.QueueBindAsync(
queue: GetQueueName<T>(),
exchange: GetExchange(),
routingKey: GetQueueName<T>());
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (sender, args) =>
{
var message = Deserialize<T>(args.Body.ToArray());
try
{
await handler(message, CancellationToken.None);
await channel.BasicAckAsync(args.DeliveryTag, multiple: false);
}
catch (Exception exception)
{
Console.Error.WriteLine(exception);
await channel.BasicNackAsync(args.DeliveryTag, multiple: false, requeue: false);
}
};
await channel.BasicConsumeAsync(
queue: GetQueueName<T>(),
autoAck: false,
consumer: consumer);
}
private string GetQueueName<T>()
{
return typeof(T).Name;
}
private string GetExchange()
{
return _configuration.GetEnvRabbitMqExchange();
}
private T Deserialize<T>(byte[] bytes)
{
var message = JsonSerializer.Deserialize<T>(bytes);
if (message is null)
{
throw new Exception();
}
return message;
}
}

View File

@@ -1,11 +1,14 @@
using AipsCore.Application.Abstract; using AipsCore.Application.Abstract;
using AipsCore.Application.Common.Authentication.Dtos; using AipsCore.Application.Common.Authentication.Dtos;
using AipsCore.Application.Abstract.MessageBroking; using AipsCore.Application.Abstract.MessageBroking;
using AipsCore.Application.Common.Authentication;
using AipsCore.Application.Common.Message.TestMessage;
using AipsCore.Application.Models.User.Command.LogIn; using AipsCore.Application.Models.User.Command.LogIn;
using AipsCore.Application.Models.User.Command.LogOut; using AipsCore.Application.Models.User.Command.LogOut;
using AipsCore.Application.Models.User.Command.LogOutAll; using AipsCore.Application.Models.User.Command.LogOutAll;
using AipsCore.Application.Models.User.Command.RefreshLogIn; using AipsCore.Application.Models.User.Command.RefreshLogIn;
using AipsCore.Application.Models.User.Command.SignUp; using AipsCore.Application.Models.User.Command.SignUp;
using AipsCore.Application.Models.User.Query.GetUser;
using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
@@ -66,6 +69,7 @@ public class UserController : ControllerBase
[HttpPost("test")] [HttpPost("test")]
public async Task Test(IMessagePublisher publisher) public async Task Test(IMessagePublisher publisher)
{ {
await publisher.PublishAsync("Test poruka"); var test = new TestMessage("ovo je test poruka");
await publisher.PublishAsync(test);
} }
} }

View File

@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\AipsCore\AipsCore.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="DotNetEnv" Version="3.1.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="10.0.3" />
</ItemGroup>
</Project>

View File

@@ -0,0 +1,21 @@
using AipsCore.Infrastructure.DI;
using AipsWorker;
using AipsWorker.Utilities;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
LoadingDotEnv.TryLoad();
var builder = Host.CreateDefaultBuilder(args);
builder.ConfigureServices((context, services) =>
{
services.AddAips(context.Configuration);
services.AddAipsMessageHandlers();
services.AddHostedService<WorkerService>();
});
var app = builder.Build();
await app.RunAsync();

View File

@@ -0,0 +1,24 @@
using DotNetEnv;
namespace AipsWorker.Utilities;
public static class LoadingDotEnv
{
public static bool TryLoad()
{
string? dir = Directory.GetCurrentDirectory();
while (dir != null && !File.Exists(Path.Combine(dir, ".env")))
{
dir = Directory.GetParent(dir)?.FullName;
}
if (dir != null)
{
Env.Load(Path.Combine(dir, ".env"));
return true;
}
return false;
}
}

View File

@@ -0,0 +1,48 @@
using System.Reflection;
using AipsCore.Application.Abstract.MessageBroking;
namespace AipsWorker.Utilities;
public class SubscribeMethodUtility
{
private readonly IMessageSubscriber _subscriber;
public SubscribeMethodUtility(IMessageSubscriber subscriber)
{
_subscriber = subscriber;
}
public async Task SubscribeToMessageTypeAsync(
Type messageType,
object targetInstance,
MethodInfo handlerMethod)
{
var subscribeMethod = GetGenericSubscribeMethod(messageType);
var handlerDelegate = CreateHandlerDelegate(messageType, targetInstance, handlerMethod);
var task = (Task)subscribeMethod.Invoke(
_subscriber,
new object[] { handlerDelegate })!;
await task;
}
private MethodInfo GetGenericSubscribeMethod(Type messageType)
{
var method = typeof(IMessageSubscriber)
.GetMethod(nameof(IMessageSubscriber.SubscribeAsync))!;
return method.MakeGenericMethod(messageType);
}
private Delegate CreateHandlerDelegate(
Type messageType,
object targetInstance,
MethodInfo handlerMethod)
{
var delegateType = typeof(Func<,,>)
.MakeGenericType(messageType, typeof(CancellationToken), typeof(Task));
return Delegate.CreateDelegate(delegateType, targetInstance, handlerMethod);
}
}

View File

@@ -0,0 +1,61 @@
using System.Reflection;
using AipsCore.Application.Abstract;
using AipsCore.Application.Abstract.MessageBroking;
using AipsCore.Application.Common.Message.TestMessage;
using AipsWorker.Utilities;
using Microsoft.Extensions.Hosting;
namespace AipsWorker;
public class WorkerService : BackgroundService
{
private readonly IDispatcher _dispatcher;
private readonly SubscribeMethodUtility _subscribeMethodUtility;
public WorkerService(IMessageSubscriber subscriber, IDispatcher dispatcher)
{
_dispatcher = dispatcher;
_subscribeMethodUtility = new SubscribeMethodUtility(subscriber);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var messageTypes = GetAllMessageTypes();
foreach (var messageType in messageTypes)
{
var handleMethod = GetMessageHandleMethod(messageType);
await _subscribeMethodUtility.SubscribeToMessageTypeAsync(messageType, this, handleMethod);
}
}
private IReadOnlyCollection<Type> GetAllMessageTypes()
{
var messageInterface = typeof(IMessage);
var assembly = messageInterface.Assembly;
return assembly
.GetTypes()
.Where(t =>
!t.IsAbstract &&
!t.IsInterface &&
messageInterface.IsAssignableFrom(t))
.ToList();
}
private async Task HandleMessage<T>(T message, CancellationToken ct) where T : IMessage
{
await _dispatcher.Execute(message, ct);
}
private MethodInfo GetMessageHandleMethod(Type messageType)
{
return GetType()
.GetMethod(nameof(HandleMessage),
BindingFlags.Instance | BindingFlags.NonPublic)!
.MakeGenericMethod(messageType);
}
}

View File

@@ -6,6 +6,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AipsCore", "AipsCore\AipsCo
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AipsWebApi", "AipsWebApi\AipsWebApi.csproj", "{32BC8F43-322F-441A-9AE3-9D0D36B5D22B}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AipsWebApi", "AipsWebApi\AipsWebApi.csproj", "{32BC8F43-322F-441A-9AE3-9D0D36B5D22B}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AipsWorker", "AipsWorker\AipsWorker.csproj", "{8BEF3002-13ED-47E4-9A54-A0EF0B36CC1E}"
EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU Debug|Any CPU = Debug|Any CPU
@@ -20,5 +22,9 @@ Global
{32BC8F43-322F-441A-9AE3-9D0D36B5D22B}.Debug|Any CPU.Build.0 = Debug|Any CPU {32BC8F43-322F-441A-9AE3-9D0D36B5D22B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{32BC8F43-322F-441A-9AE3-9D0D36B5D22B}.Release|Any CPU.ActiveCfg = Release|Any CPU {32BC8F43-322F-441A-9AE3-9D0D36B5D22B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{32BC8F43-322F-441A-9AE3-9D0D36B5D22B}.Release|Any CPU.Build.0 = Release|Any CPU {32BC8F43-322F-441A-9AE3-9D0D36B5D22B}.Release|Any CPU.Build.0 = Release|Any CPU
{8BEF3002-13ED-47E4-9A54-A0EF0B36CC1E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{8BEF3002-13ED-47E4-9A54-A0EF0B36CC1E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8BEF3002-13ED-47E4-9A54-A0EF0B36CC1E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8BEF3002-13ED-47E4-9A54-A0EF0B36CC1E}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
EndGlobal EndGlobal