diff --git a/docker/stop.sh b/docker/stop.sh new file mode 100755 index 0000000..cce683c --- /dev/null +++ b/docker/stop.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +docker compose -p aips --env-file ../.env down diff --git a/dotnet/AipsCore/Application/Abstract/IDispatcher.cs b/dotnet/AipsCore/Application/Abstract/IDispatcher.cs index 4cf5e44..e5ebb60 100644 --- a/dotnet/AipsCore/Application/Abstract/IDispatcher.cs +++ b/dotnet/AipsCore/Application/Abstract/IDispatcher.cs @@ -1,4 +1,5 @@ using AipsCore.Application.Abstract.Command; +using AipsCore.Application.Abstract.MessageBroking; using AipsCore.Application.Abstract.Query; namespace AipsCore.Application.Abstract; @@ -9,4 +10,6 @@ public interface IDispatcher Task Execute(ICommand command, CancellationToken cancellationToken = default); Task Execute(IQuery query, CancellationToken cancellationToken = default); + + public Task Execute(IMessage message, CancellationToken cancellationToken = default); } \ No newline at end of file diff --git a/dotnet/AipsCore/Application/Abstract/MessageBroking/IMessage.cs b/dotnet/AipsCore/Application/Abstract/MessageBroking/IMessage.cs new file mode 100644 index 0000000..ecade00 --- /dev/null +++ b/dotnet/AipsCore/Application/Abstract/MessageBroking/IMessage.cs @@ -0,0 +1,3 @@ +namespace AipsCore.Application.Abstract.MessageBroking; + +public interface IMessage; \ No newline at end of file diff --git a/dotnet/AipsCore/Application/Abstract/MessageBroking/IMessageHandler.cs b/dotnet/AipsCore/Application/Abstract/MessageBroking/IMessageHandler.cs new file mode 100644 index 0000000..fad7f3c --- /dev/null +++ b/dotnet/AipsCore/Application/Abstract/MessageBroking/IMessageHandler.cs @@ -0,0 +1,6 @@ +namespace AipsCore.Application.Abstract.MessageBroking; + +public interface IMessageHandler where TMessage : IMessage +{ + Task Handle(TMessage message, CancellationToken cancellationToken); +} \ No newline at end of file diff --git a/dotnet/AipsCore/Application/Abstract/MessageBroking/IMessageSubscriber.cs b/dotnet/AipsCore/Application/Abstract/MessageBroking/IMessageSubscriber.cs new file mode 100644 index 0000000..af78d73 --- /dev/null +++ b/dotnet/AipsCore/Application/Abstract/MessageBroking/IMessageSubscriber.cs @@ -0,0 +1,8 @@ +namespace AipsCore.Application.Abstract.MessageBroking; + +public interface IMessageSubscriber +{ + Task SubscribeAsync( + Func handler + ); +} \ No newline at end of file diff --git a/dotnet/AipsCore/Application/Abstract/MessageBroking/MessageTag.cs b/dotnet/AipsCore/Application/Abstract/MessageBroking/MessageTag.cs new file mode 100644 index 0000000..a5b002b --- /dev/null +++ b/dotnet/AipsCore/Application/Abstract/MessageBroking/MessageTag.cs @@ -0,0 +1,7 @@ +namespace AipsCore.Application.Abstract.MessageBroking; + +public enum MessageTag +{ + Worker, + RT +} \ No newline at end of file diff --git a/dotnet/AipsCore/Application/Common/Dispatcher/DispatchException.cs b/dotnet/AipsCore/Application/Common/Dispatcher/DispatchException.cs index 6e90bca..39cae4f 100644 --- a/dotnet/AipsCore/Application/Common/Dispatcher/DispatchException.cs +++ b/dotnet/AipsCore/Application/Common/Dispatcher/DispatchException.cs @@ -5,7 +5,7 @@ namespace AipsCore.Application.Common.Dispatcher; public class DispatchException : Exception { - public DispatchException(object commandQuery, Exception innerException) - : base($"Error dispatching '{commandQuery.GetType().Name}' because of: {innerException.Message}", innerException) + public DispatchException(object dispatchingObject, Exception innerException) + : base($"Error dispatching '{dispatchingObject.GetType().Name}' because of: {innerException.Message}", innerException) { } } \ No newline at end of file diff --git a/dotnet/AipsCore/Application/Common/Dispatcher/Dispatcher.cs b/dotnet/AipsCore/Application/Common/Dispatcher/Dispatcher.cs index 3bbe44f..d3a7c18 100644 --- a/dotnet/AipsCore/Application/Common/Dispatcher/Dispatcher.cs +++ b/dotnet/AipsCore/Application/Common/Dispatcher/Dispatcher.cs @@ -1,5 +1,6 @@ using AipsCore.Application.Abstract; using AipsCore.Application.Abstract.Command; +using AipsCore.Application.Abstract.MessageBroking; using AipsCore.Application.Abstract.Query; using Microsoft.Extensions.DependencyInjection; @@ -36,28 +37,35 @@ public sealed class Dispatcher : IDispatcher return await this.HandleWithResult(handlerType, query, cancellationToken); } + + public async Task Execute(IMessage message, CancellationToken cancellationToken = default) + { + var handlerType = typeof(IMessageHandler<>).MakeGenericType(message.GetType()); + + await this.Handle(handlerType, message, cancellationToken); + } #endregion #region Handle - private async Task Handle(Type handlerType, object commandOrQuery, CancellationToken cancellationToken = default) + private async Task Handle(Type handlerType, object dispatchingObject, CancellationToken cancellationToken = default) { - dynamic handler = this.ResolveHandler(handlerType, commandOrQuery); + dynamic handler = this.ResolveHandler(handlerType, dispatchingObject); - await handler.Handle((dynamic)commandOrQuery, cancellationToken); + await handler.Handle((dynamic)dispatchingObject, cancellationToken); } - private async Task HandleWithResult(Type handlerType, object commandOrQuery, CancellationToken cancellationToken = default) + private async Task HandleWithResult(Type handlerType, object dispatchingObject, CancellationToken cancellationToken = default) { - dynamic handler = this.ResolveHandler(handlerType, commandOrQuery); + dynamic handler = this.ResolveHandler(handlerType, dispatchingObject); - return await handler.Handle((dynamic)commandOrQuery, cancellationToken); + return await handler.Handle((dynamic)dispatchingObject, cancellationToken); } #endregion - private dynamic ResolveHandler(Type handlerType, object commandOrQuery) + private dynamic ResolveHandler(Type handlerType, object dispatchingObject) { dynamic handler; @@ -67,7 +75,7 @@ public sealed class Dispatcher : IDispatcher } catch (InvalidOperationException serviceProviderException) { - throw new DispatchException(commandOrQuery, serviceProviderException); + throw new DispatchException(dispatchingObject, serviceProviderException); } return handler; diff --git a/dotnet/AipsCore/Application/Common/Message/TestMessage/TestMessage.cs b/dotnet/AipsCore/Application/Common/Message/TestMessage/TestMessage.cs new file mode 100644 index 0000000..e6559e6 --- /dev/null +++ b/dotnet/AipsCore/Application/Common/Message/TestMessage/TestMessage.cs @@ -0,0 +1,5 @@ +using AipsCore.Application.Abstract.MessageBroking; + +namespace AipsCore.Application.Common.Message.TestMessage; + +public record TestMessage(string Text) : IMessage; \ No newline at end of file diff --git a/dotnet/AipsCore/Application/Common/Message/TestMessage/TestMessageHandler.cs b/dotnet/AipsCore/Application/Common/Message/TestMessage/TestMessageHandler.cs new file mode 100644 index 0000000..5ba1246 --- /dev/null +++ b/dotnet/AipsCore/Application/Common/Message/TestMessage/TestMessageHandler.cs @@ -0,0 +1,14 @@ +using AipsCore.Application.Abstract.MessageBroking; + +namespace AipsCore.Application.Common.Message.TestMessage; + +public class TestMessageHandler : IMessageHandler +{ + + public Task Handle(TestMessage message, CancellationToken cancellationToken) + { + Console.WriteLine(message.Text); + + return Task.CompletedTask; + } +} \ No newline at end of file diff --git a/dotnet/AipsCore/Infrastructure/DI/AipsRegistrationExtensions.cs b/dotnet/AipsCore/Infrastructure/DI/AipsRegistrationExtensions.cs index 49101de..04e5a6f 100644 --- a/dotnet/AipsCore/Infrastructure/DI/AipsRegistrationExtensions.cs +++ b/dotnet/AipsCore/Infrastructure/DI/AipsRegistrationExtensions.cs @@ -20,7 +20,15 @@ public static class AipsRegistrationExtensions services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); return services; } + + public static IServiceCollection AddAipsMessageHandlers(this IServiceCollection services) + { + services.AddMessageHandlersFromAssembly(typeof(IMessage).Assembly); + + return services; + } } \ No newline at end of file diff --git a/dotnet/AipsCore/Infrastructure/DI/MessageHandlerRegistrationExtensions.cs b/dotnet/AipsCore/Infrastructure/DI/MessageHandlerRegistrationExtensions.cs new file mode 100644 index 0000000..d46edce --- /dev/null +++ b/dotnet/AipsCore/Infrastructure/DI/MessageHandlerRegistrationExtensions.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using AipsCore.Application.Abstract.MessageBroking; +using Microsoft.Extensions.DependencyInjection; + +namespace AipsCore.Infrastructure.DI; + +public static class MessageHandlerRegistrationExtensions +{ + public static IServiceCollection AddMessageHandlersFromAssembly(this IServiceCollection services, Assembly assembly) + { + var handlerInterface = typeof(IMessageHandler<>); + + var types = assembly.GetTypes() + .Where(t => t is { IsAbstract: false, IsInterface: false }); + + foreach (var type in types) + { + var interfaces = type.GetInterfaces(); + + foreach (var @interface in interfaces) + { + if (!@interface.IsGenericType) + continue; + + var genericDef = @interface.GetGenericTypeDefinition(); + + if (handlerInterface != genericDef) + continue; + + services.AddTransient(@interface, type); + } + } + + return services; + } +} \ No newline at end of file diff --git a/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqSubscriber.cs b/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqSubscriber.cs new file mode 100644 index 0000000..6002ab9 --- /dev/null +++ b/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqSubscriber.cs @@ -0,0 +1,86 @@ +using System.Text; +using System.Text.Json; +using AipsCore.Application.Abstract.MessageBroking; +using AipsCore.Infrastructure.DI.Configuration; +using Microsoft.Extensions.Configuration; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; + +namespace AipsCore.Infrastructure.MessageBroking.RabbitMQ; + +public class RabbitMqSubscriber : IMessageSubscriber +{ + private readonly IRabbitMqConnection _connection; + private readonly IConfiguration _configuration; + + public RabbitMqSubscriber(IRabbitMqConnection connection, IConfiguration configuration) + { + _connection = connection; + _configuration = configuration; + } + + public async Task SubscribeAsync(Func handler) + { + var channel = await _connection.CreateChannelAsync(); + + await channel.ExchangeDeclareAsync( + exchange: GetExchange(), + type: ExchangeType.Topic, + durable: true); + + await channel.QueueDeclareAsync( + queue: GetQueueName(), + durable: true); + + await channel.QueueBindAsync( + queue: GetQueueName(), + exchange: GetExchange(), + routingKey: GetQueueName()); + + var consumer = new AsyncEventingBasicConsumer(channel); + + consumer.ReceivedAsync += async (sender, args) => + { + var message = Deserialize(args.Body.ToArray()); + + try + { + await handler(message, CancellationToken.None); + + await channel.BasicAckAsync(args.DeliveryTag, multiple: false); + } + catch (Exception exception) + { + Console.Error.WriteLine(exception); + await channel.BasicNackAsync(args.DeliveryTag, multiple: false, requeue: false); + } + }; + + await channel.BasicConsumeAsync( + queue: GetQueueName(), + autoAck: false, + consumer: consumer); + } + + private string GetQueueName() + { + return typeof(T).Name; + } + + private string GetExchange() + { + return _configuration.GetEnvRabbitMqExchange(); + } + + private T Deserialize(byte[] bytes) + { + var message = JsonSerializer.Deserialize(bytes); + + if (message is null) + { + throw new Exception(); + } + + return message; + } +} \ No newline at end of file diff --git a/dotnet/AipsWebApi/Controllers/UserController.cs b/dotnet/AipsWebApi/Controllers/UserController.cs index 030c045..27eedd3 100644 --- a/dotnet/AipsWebApi/Controllers/UserController.cs +++ b/dotnet/AipsWebApi/Controllers/UserController.cs @@ -1,11 +1,14 @@ using AipsCore.Application.Abstract; using AipsCore.Application.Common.Authentication.Dtos; using AipsCore.Application.Abstract.MessageBroking; +using AipsCore.Application.Common.Authentication; +using AipsCore.Application.Common.Message.TestMessage; using AipsCore.Application.Models.User.Command.LogIn; using AipsCore.Application.Models.User.Command.LogOut; using AipsCore.Application.Models.User.Command.LogOutAll; using AipsCore.Application.Models.User.Command.RefreshLogIn; using AipsCore.Application.Models.User.Command.SignUp; +using AipsCore.Application.Models.User.Query.GetUser; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Mvc; @@ -66,6 +69,7 @@ public class UserController : ControllerBase [HttpPost("test")] public async Task Test(IMessagePublisher publisher) { - await publisher.PublishAsync("Test poruka"); + var test = new TestMessage("ovo je test poruka"); + await publisher.PublishAsync(test); } } \ No newline at end of file diff --git a/dotnet/AipsWorker/AipsWorker.csproj b/dotnet/AipsWorker/AipsWorker.csproj new file mode 100644 index 0000000..9b01779 --- /dev/null +++ b/dotnet/AipsWorker/AipsWorker.csproj @@ -0,0 +1,19 @@ + + + + Exe + net10.0 + enable + enable + + + + + + + + + + + + diff --git a/dotnet/AipsWorker/Program.cs b/dotnet/AipsWorker/Program.cs new file mode 100644 index 0000000..b9187cb --- /dev/null +++ b/dotnet/AipsWorker/Program.cs @@ -0,0 +1,21 @@ +using AipsCore.Infrastructure.DI; +using AipsWorker; +using AipsWorker.Utilities; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +LoadingDotEnv.TryLoad(); + +var builder = Host.CreateDefaultBuilder(args); + +builder.ConfigureServices((context, services) => +{ + services.AddAips(context.Configuration); + services.AddAipsMessageHandlers(); + + services.AddHostedService(); +}); + +var app = builder.Build(); + +await app.RunAsync(); \ No newline at end of file diff --git a/dotnet/AipsWorker/Utilities/LoadingDotEnv.cs b/dotnet/AipsWorker/Utilities/LoadingDotEnv.cs new file mode 100644 index 0000000..1286e40 --- /dev/null +++ b/dotnet/AipsWorker/Utilities/LoadingDotEnv.cs @@ -0,0 +1,24 @@ +using DotNetEnv; + +namespace AipsWorker.Utilities; + +public static class LoadingDotEnv +{ + public static bool TryLoad() + { + string? dir = Directory.GetCurrentDirectory(); + + while (dir != null && !File.Exists(Path.Combine(dir, ".env"))) + { + dir = Directory.GetParent(dir)?.FullName; + } + + if (dir != null) + { + Env.Load(Path.Combine(dir, ".env")); + return true; + } + + return false; + } +} \ No newline at end of file diff --git a/dotnet/AipsWorker/Utilities/SubscribeMethodUtility.cs b/dotnet/AipsWorker/Utilities/SubscribeMethodUtility.cs new file mode 100644 index 0000000..9f99ba8 --- /dev/null +++ b/dotnet/AipsWorker/Utilities/SubscribeMethodUtility.cs @@ -0,0 +1,48 @@ +using System.Reflection; +using AipsCore.Application.Abstract.MessageBroking; + +namespace AipsWorker.Utilities; + +public class SubscribeMethodUtility +{ + private readonly IMessageSubscriber _subscriber; + + public SubscribeMethodUtility(IMessageSubscriber subscriber) + { + _subscriber = subscriber; + } + + public async Task SubscribeToMessageTypeAsync( + Type messageType, + object targetInstance, + MethodInfo handlerMethod) + { + var subscribeMethod = GetGenericSubscribeMethod(messageType); + var handlerDelegate = CreateHandlerDelegate(messageType, targetInstance, handlerMethod); + + var task = (Task)subscribeMethod.Invoke( + _subscriber, + new object[] { handlerDelegate })!; + + await task; + } + + private MethodInfo GetGenericSubscribeMethod(Type messageType) + { + var method = typeof(IMessageSubscriber) + .GetMethod(nameof(IMessageSubscriber.SubscribeAsync))!; + + return method.MakeGenericMethod(messageType); + } + + private Delegate CreateHandlerDelegate( + Type messageType, + object targetInstance, + MethodInfo handlerMethod) + { + var delegateType = typeof(Func<,,>) + .MakeGenericType(messageType, typeof(CancellationToken), typeof(Task)); + + return Delegate.CreateDelegate(delegateType, targetInstance, handlerMethod); + } +} \ No newline at end of file diff --git a/dotnet/AipsWorker/WorkerService.cs b/dotnet/AipsWorker/WorkerService.cs new file mode 100644 index 0000000..dea0790 --- /dev/null +++ b/dotnet/AipsWorker/WorkerService.cs @@ -0,0 +1,61 @@ +using System.Reflection; +using AipsCore.Application.Abstract; +using AipsCore.Application.Abstract.MessageBroking; +using AipsCore.Application.Common.Message.TestMessage; +using AipsWorker.Utilities; +using Microsoft.Extensions.Hosting; + +namespace AipsWorker; + +public class WorkerService : BackgroundService +{ + private readonly IDispatcher _dispatcher; + private readonly SubscribeMethodUtility _subscribeMethodUtility; + + public WorkerService(IMessageSubscriber subscriber, IDispatcher dispatcher) + { + _dispatcher = dispatcher; + _subscribeMethodUtility = new SubscribeMethodUtility(subscriber); + } + + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + + var messageTypes = GetAllMessageTypes(); + + foreach (var messageType in messageTypes) + { + var handleMethod = GetMessageHandleMethod(messageType); + + await _subscribeMethodUtility.SubscribeToMessageTypeAsync(messageType, this, handleMethod); + } + } + + private IReadOnlyCollection GetAllMessageTypes() + { + var messageInterface = typeof(IMessage); + var assembly = messageInterface.Assembly; + + return assembly + .GetTypes() + .Where(t => + !t.IsAbstract && + !t.IsInterface && + messageInterface.IsAssignableFrom(t)) + .ToList(); + } + + private async Task HandleMessage(T message, CancellationToken ct) where T : IMessage + { + await _dispatcher.Execute(message, ct); + } + + private MethodInfo GetMessageHandleMethod(Type messageType) + { + return GetType() + .GetMethod(nameof(HandleMessage), + BindingFlags.Instance | BindingFlags.NonPublic)! + .MakeGenericMethod(messageType); + } +} \ No newline at end of file diff --git a/dotnet/dotnet.sln b/dotnet/dotnet.sln index 5b24945..43274ba 100644 --- a/dotnet/dotnet.sln +++ b/dotnet/dotnet.sln @@ -6,6 +6,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AipsCore", "AipsCore\AipsCo EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AipsWebApi", "AipsWebApi\AipsWebApi.csproj", "{32BC8F43-322F-441A-9AE3-9D0D36B5D22B}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AipsWorker", "AipsWorker\AipsWorker.csproj", "{8BEF3002-13ED-47E4-9A54-A0EF0B36CC1E}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -20,5 +22,9 @@ Global {32BC8F43-322F-441A-9AE3-9D0D36B5D22B}.Debug|Any CPU.Build.0 = Debug|Any CPU {32BC8F43-322F-441A-9AE3-9D0D36B5D22B}.Release|Any CPU.ActiveCfg = Release|Any CPU {32BC8F43-322F-441A-9AE3-9D0D36B5D22B}.Release|Any CPU.Build.0 = Release|Any CPU + {8BEF3002-13ED-47E4-9A54-A0EF0B36CC1E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {8BEF3002-13ED-47E4-9A54-A0EF0B36CC1E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8BEF3002-13ED-47E4-9A54-A0EF0B36CC1E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {8BEF3002-13ED-47E4-9A54-A0EF0B36CC1E}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection EndGlobal