From e2db68988860ecd5a1d5b64a259b391059a60773 Mon Sep 17 00:00:00 2001 From: Andrija Stevanovic Date: Fri, 13 Feb 2026 23:40:54 +0100 Subject: [PATCH 1/4] implemented rabbitmq message broker support (publisher) --- dotnet/AipsCore/AipsCore.csproj | 1 + .../Application/Abstract/IMessagePublisher.cs | 8 ++++ .../ConfigurationEnvExtensions.cs | 7 +++ .../RabbitMQ/IRabbitMQConnection.cs | 8 ++++ .../RabbitMQ/RabbitMqConnection.cs | 43 +++++++++++++++++++ .../RabbitMQ/RabbitMqPublisher.cs | 33 ++++++++++++++ 6 files changed, 100 insertions(+) create mode 100644 dotnet/AipsCore/Application/Abstract/IMessagePublisher.cs create mode 100644 dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/IRabbitMQConnection.cs create mode 100644 dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqConnection.cs create mode 100644 dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqPublisher.cs diff --git a/dotnet/AipsCore/AipsCore.csproj b/dotnet/AipsCore/AipsCore.csproj index fbd60df..a733dcb 100644 --- a/dotnet/AipsCore/AipsCore.csproj +++ b/dotnet/AipsCore/AipsCore.csproj @@ -16,6 +16,7 @@ + diff --git a/dotnet/AipsCore/Application/Abstract/IMessagePublisher.cs b/dotnet/AipsCore/Application/Abstract/IMessagePublisher.cs new file mode 100644 index 0000000..d09651b --- /dev/null +++ b/dotnet/AipsCore/Application/Abstract/IMessagePublisher.cs @@ -0,0 +1,8 @@ +using System.Runtime.Serialization; + +namespace AipsCore.Application.Abstract; + +public interface IMessagePublisher +{ + Task PublishAsync(string exchange, string routeKey, T message, CancellationToken cancellationToken = default); +} \ No newline at end of file diff --git a/dotnet/AipsCore/Infrastructure/DI/Configuration/ConfigurationEnvExtensions.cs b/dotnet/AipsCore/Infrastructure/DI/Configuration/ConfigurationEnvExtensions.cs index 8c56742..c2b16d3 100644 --- a/dotnet/AipsCore/Infrastructure/DI/Configuration/ConfigurationEnvExtensions.cs +++ b/dotnet/AipsCore/Infrastructure/DI/Configuration/ConfigurationEnvExtensions.cs @@ -6,6 +6,8 @@ public static class ConfigurationEnvExtensions { private const string DbConnStringKey = "DB_CONN_STRING"; + private const string RabbitMqAmqpUriKey = "RABBITMQ_AMQP_URI"; + private const string JwtIssuer = "JWT_ISSUER"; private const string JwtAudience = "JWT_AUDIENCE"; private const string JwtKey = "JWT_KEY"; @@ -18,6 +20,11 @@ public static class ConfigurationEnvExtensions return configuration.GetEnvForSure(DbConnStringKey); } + public string GetEnvRabbitMqAmqpUri() + { + return configuration.GetEnvForSure(RabbitMqAmqpUriKey); + } + public string GetEnvJwtIssuer() { return configuration.GetEnvForSure(JwtIssuer); diff --git a/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/IRabbitMQConnection.cs b/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/IRabbitMQConnection.cs new file mode 100644 index 0000000..d4d0efb --- /dev/null +++ b/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/IRabbitMQConnection.cs @@ -0,0 +1,8 @@ +using RabbitMQ.Client; + +namespace AipsCore.Infrastructure.MessageBroking.RabbitMQ; + +public interface IRabbitMqConnection +{ + Task CreateChannelAsync(CancellationToken cancellationToken = default); +} \ No newline at end of file diff --git a/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqConnection.cs b/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqConnection.cs new file mode 100644 index 0000000..a07a981 --- /dev/null +++ b/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqConnection.cs @@ -0,0 +1,43 @@ +using AipsCore.Infrastructure.DI.Configuration; +using Microsoft.Extensions.Configuration; +using RabbitMQ.Client; + +namespace AipsCore.Infrastructure.MessageBroking.RabbitMQ; + +public class RabbitMqConnection : IRabbitMqConnection +{ + private readonly IConfiguration _configuration; + private IConnection? _connection; + + public RabbitMqConnection(IConfiguration configuration) + { + _configuration = configuration; + } + + public async Task CreateChannelAsync(CancellationToken cancellationToken = default) + { + if (_connection is null) + { + throw new InvalidOperationException($"RabbitMQ connection not created for {_configuration.GetEnvRabbitMqAmqpUri()}"); + } + + return await _connection.CreateChannelAsync(null, cancellationToken); + } + + public async Task CreateConnectionAsync() + { + var factory = CreateConnectionFactory(); + + _connection = await factory.CreateConnectionAsync(); + } + + private IConnectionFactory CreateConnectionFactory() + { + var factory = new ConnectionFactory + { + Uri = new Uri(_configuration.GetEnvRabbitMqAmqpUri()) + }; + + return factory; + } +} \ No newline at end of file diff --git a/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqPublisher.cs b/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqPublisher.cs new file mode 100644 index 0000000..bf3cc7c --- /dev/null +++ b/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqPublisher.cs @@ -0,0 +1,33 @@ +using System.Text; +using System.Text.Json; +using AipsCore.Application.Abstract; +using RabbitMQ.Client; + +namespace AipsCore.Infrastructure.MessageBroking.RabbitMQ; + +public class RabbitMqPublisher : IMessagePublisher +{ + private readonly IRabbitMqConnection _connection; + + public RabbitMqPublisher(IRabbitMqConnection connection) + { + _connection = connection; + } + + public async Task PublishAsync(string exchange, string routeKey, T message, CancellationToken cancellationToken = default) + { + var channel = await _connection.CreateChannelAsync(cancellationToken); + + await channel.ExchangeDeclareAsync(exchange, ExchangeType.Topic, durable: true, cancellationToken: cancellationToken); + + var bytes = Serialize(message); + await channel.BasicPublishAsync(exchange, routeKey, bytes, cancellationToken); + + await channel.CloseAsync(cancellationToken); + } + + private byte[] Serialize(T message) + { + return JsonSerializer.SerializeToUtf8Bytes(message); + } +} \ No newline at end of file From 2ea7060eefb0622d3d43ea00d1b714460f926d17 Mon Sep 17 00:00:00 2001 From: Andrija Stevanovic Date: Sat, 14 Feb 2026 00:45:35 +0100 Subject: [PATCH 2/4] namespace fix --- .../Abstract/{ => MessageBroking}/IMessagePublisher.cs | 4 +--- .../Infrastructure/DI/AipsRegistrationExtensions.cs | 5 +++++ .../MessageBroking/RabbitMQ/RabbitMqConnection.cs | 5 +++-- .../MessageBroking/RabbitMQ/RabbitMqPublisher.cs | 1 + dotnet/AipsWebApi/Controllers/UserController.cs | 8 ++++++++ 5 files changed, 18 insertions(+), 5 deletions(-) rename dotnet/AipsCore/Application/Abstract/{ => MessageBroking}/IMessagePublisher.cs (66%) diff --git a/dotnet/AipsCore/Application/Abstract/IMessagePublisher.cs b/dotnet/AipsCore/Application/Abstract/MessageBroking/IMessagePublisher.cs similarity index 66% rename from dotnet/AipsCore/Application/Abstract/IMessagePublisher.cs rename to dotnet/AipsCore/Application/Abstract/MessageBroking/IMessagePublisher.cs index d09651b..30b2998 100644 --- a/dotnet/AipsCore/Application/Abstract/IMessagePublisher.cs +++ b/dotnet/AipsCore/Application/Abstract/MessageBroking/IMessagePublisher.cs @@ -1,6 +1,4 @@ -using System.Runtime.Serialization; - -namespace AipsCore.Application.Abstract; +namespace AipsCore.Application.Abstract.MessageBroking; public interface IMessagePublisher { diff --git a/dotnet/AipsCore/Infrastructure/DI/AipsRegistrationExtensions.cs b/dotnet/AipsCore/Infrastructure/DI/AipsRegistrationExtensions.cs index 351ea14..49101de 100644 --- a/dotnet/AipsCore/Infrastructure/DI/AipsRegistrationExtensions.cs +++ b/dotnet/AipsCore/Infrastructure/DI/AipsRegistrationExtensions.cs @@ -1,5 +1,7 @@ using AipsCore.Application.Abstract; +using AipsCore.Application.Abstract.MessageBroking; using AipsCore.Application.Common.Dispatcher; +using AipsCore.Infrastructure.MessageBroking.RabbitMQ; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; @@ -15,6 +17,9 @@ public static class AipsRegistrationExtensions services.AddPersistence(configuration); services.AddUserContext(configuration); + + services.AddSingleton(); + services.AddSingleton(); return services; } diff --git a/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqConnection.cs b/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqConnection.cs index a07a981..0a41c8d 100644 --- a/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqConnection.cs +++ b/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqConnection.cs @@ -18,10 +18,11 @@ public class RabbitMqConnection : IRabbitMqConnection { if (_connection is null) { - throw new InvalidOperationException($"RabbitMQ connection not created for {_configuration.GetEnvRabbitMqAmqpUri()}"); + await CreateConnectionAsync(); + //throw new InvalidOperationException($"RabbitMQ connection not created for {_configuration.GetEnvRabbitMqAmqpUri()}"); } - return await _connection.CreateChannelAsync(null, cancellationToken); + return await _connection!.CreateChannelAsync(null, cancellationToken); } public async Task CreateConnectionAsync() diff --git a/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqPublisher.cs b/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqPublisher.cs index bf3cc7c..c9cb674 100644 --- a/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqPublisher.cs +++ b/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqPublisher.cs @@ -1,6 +1,7 @@ using System.Text; using System.Text.Json; using AipsCore.Application.Abstract; +using AipsCore.Application.Abstract.MessageBroking; using RabbitMQ.Client; namespace AipsCore.Infrastructure.MessageBroking.RabbitMQ; diff --git a/dotnet/AipsWebApi/Controllers/UserController.cs b/dotnet/AipsWebApi/Controllers/UserController.cs index 746887f..2cc97f9 100644 --- a/dotnet/AipsWebApi/Controllers/UserController.cs +++ b/dotnet/AipsWebApi/Controllers/UserController.cs @@ -1,4 +1,5 @@ using AipsCore.Application.Abstract; +using AipsCore.Application.Abstract.MessageBroking; using AipsCore.Application.Common.Authentication; using AipsCore.Application.Models.User.Command.LogIn; using AipsCore.Application.Models.User.Command.SignUp; @@ -34,4 +35,11 @@ public class UserController : ControllerBase var result = await _dispatcher.Execute(command, cancellationToken); return Ok(result.Value); } + + [AllowAnonymous] + [HttpPost("test")] + public async Task Test(IMessagePublisher publisher) + { + await publisher.PublishAsync("test", "test.test", "Test poruka"); + } } \ No newline at end of file From b0ef75e43e27b464c75a787c46d44230a305cc8c Mon Sep 17 00:00:00 2001 From: Andrija Stevanovic Date: Sat, 14 Feb 2026 16:18:03 +0100 Subject: [PATCH 3/4] rabbitmq abstracted from application --- .../MessageBroking/IMessagePublisher.cs | 2 +- .../ConfigurationEnvExtensions.cs | 6 ++++++ .../RabbitMQ/RabbitMqPublisher.cs | 18 ++++++++++++++---- .../AipsWebApi/Controllers/UserController.cs | 2 +- 4 files changed, 22 insertions(+), 6 deletions(-) diff --git a/dotnet/AipsCore/Application/Abstract/MessageBroking/IMessagePublisher.cs b/dotnet/AipsCore/Application/Abstract/MessageBroking/IMessagePublisher.cs index 30b2998..c631d91 100644 --- a/dotnet/AipsCore/Application/Abstract/MessageBroking/IMessagePublisher.cs +++ b/dotnet/AipsCore/Application/Abstract/MessageBroking/IMessagePublisher.cs @@ -2,5 +2,5 @@ namespace AipsCore.Application.Abstract.MessageBroking; public interface IMessagePublisher { - Task PublishAsync(string exchange, string routeKey, T message, CancellationToken cancellationToken = default); + Task PublishAsync(T message, CancellationToken cancellationToken = default); } \ No newline at end of file diff --git a/dotnet/AipsCore/Infrastructure/DI/Configuration/ConfigurationEnvExtensions.cs b/dotnet/AipsCore/Infrastructure/DI/Configuration/ConfigurationEnvExtensions.cs index c2b16d3..da94672 100644 --- a/dotnet/AipsCore/Infrastructure/DI/Configuration/ConfigurationEnvExtensions.cs +++ b/dotnet/AipsCore/Infrastructure/DI/Configuration/ConfigurationEnvExtensions.cs @@ -7,6 +7,7 @@ public static class ConfigurationEnvExtensions private const string DbConnStringKey = "DB_CONN_STRING"; private const string RabbitMqAmqpUriKey = "RABBITMQ_AMQP_URI"; + private const string RabbitMqExchange = "RABBITMQ_EXCHANGE"; private const string JwtIssuer = "JWT_ISSUER"; private const string JwtAudience = "JWT_AUDIENCE"; @@ -25,6 +26,11 @@ public static class ConfigurationEnvExtensions return configuration.GetEnvForSure(RabbitMqAmqpUriKey); } + public string GetEnvRabbitMqExchange() + { + return configuration.GetEnvForSure(RabbitMqExchange); + } + public string GetEnvJwtIssuer() { return configuration.GetEnvForSure(JwtIssuer); diff --git a/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqPublisher.cs b/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqPublisher.cs index c9cb674..c7d7dfa 100644 --- a/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqPublisher.cs +++ b/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqPublisher.cs @@ -2,6 +2,8 @@ using System.Text; using System.Text.Json; using AipsCore.Application.Abstract; using AipsCore.Application.Abstract.MessageBroking; +using AipsCore.Infrastructure.DI.Configuration; +using Microsoft.Extensions.Configuration; using RabbitMQ.Client; namespace AipsCore.Infrastructure.MessageBroking.RabbitMQ; @@ -9,20 +11,26 @@ namespace AipsCore.Infrastructure.MessageBroking.RabbitMQ; public class RabbitMqPublisher : IMessagePublisher { private readonly IRabbitMqConnection _connection; + private readonly IConfiguration _configuration; - public RabbitMqPublisher(IRabbitMqConnection connection) + public RabbitMqPublisher(IRabbitMqConnection connection, IConfiguration configuration) { _connection = connection; + _configuration = configuration; } - public async Task PublishAsync(string exchange, string routeKey, T message, CancellationToken cancellationToken = default) + public async Task PublishAsync(T message, CancellationToken cancellationToken = default) { var channel = await _connection.CreateChannelAsync(cancellationToken); - await channel.ExchangeDeclareAsync(exchange, ExchangeType.Topic, durable: true, cancellationToken: cancellationToken); + await channel.ExchangeDeclareAsync(GetExchange(), ExchangeType.Topic, durable: true, cancellationToken: cancellationToken); var bytes = Serialize(message); - await channel.BasicPublishAsync(exchange, routeKey, bytes, cancellationToken); + await channel.BasicPublishAsync( + GetExchange(), + typeof(T).Name, + bytes, + cancellationToken); await channel.CloseAsync(cancellationToken); } @@ -31,4 +39,6 @@ public class RabbitMqPublisher : IMessagePublisher { return JsonSerializer.SerializeToUtf8Bytes(message); } + + private string GetExchange() => _configuration.GetEnvRabbitMqExchange(); } \ No newline at end of file diff --git a/dotnet/AipsWebApi/Controllers/UserController.cs b/dotnet/AipsWebApi/Controllers/UserController.cs index 2cc97f9..7c3f705 100644 --- a/dotnet/AipsWebApi/Controllers/UserController.cs +++ b/dotnet/AipsWebApi/Controllers/UserController.cs @@ -40,6 +40,6 @@ public class UserController : ControllerBase [HttpPost("test")] public async Task Test(IMessagePublisher publisher) { - await publisher.PublishAsync("test", "test.test", "Test poruka"); + await publisher.PublishAsync("Test poruka"); } } \ No newline at end of file From 56f1db871cd041fa0bd45c19188a7b0cc2986e53 Mon Sep 17 00:00:00 2001 From: Andrija Stevanovic Date: Sat, 14 Feb 2026 16:18:12 +0100 Subject: [PATCH 4/4] bash script for starting docker --- docker/start.sh | 3 +++ 1 file changed, 3 insertions(+) create mode 100755 docker/start.sh diff --git a/docker/start.sh b/docker/start.sh new file mode 100755 index 0000000..29e508d --- /dev/null +++ b/docker/start.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +docker compose -p aips --env-file ../.env up