From e2db68988860ecd5a1d5b64a259b391059a60773 Mon Sep 17 00:00:00 2001 From: Andrija Stevanovic Date: Fri, 13 Feb 2026 23:40:54 +0100 Subject: [PATCH] implemented rabbitmq message broker support (publisher) --- dotnet/AipsCore/AipsCore.csproj | 1 + .../Application/Abstract/IMessagePublisher.cs | 8 ++++ .../ConfigurationEnvExtensions.cs | 7 +++ .../RabbitMQ/IRabbitMQConnection.cs | 8 ++++ .../RabbitMQ/RabbitMqConnection.cs | 43 +++++++++++++++++++ .../RabbitMQ/RabbitMqPublisher.cs | 33 ++++++++++++++ 6 files changed, 100 insertions(+) create mode 100644 dotnet/AipsCore/Application/Abstract/IMessagePublisher.cs create mode 100644 dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/IRabbitMQConnection.cs create mode 100644 dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqConnection.cs create mode 100644 dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqPublisher.cs diff --git a/dotnet/AipsCore/AipsCore.csproj b/dotnet/AipsCore/AipsCore.csproj index fbd60df..a733dcb 100644 --- a/dotnet/AipsCore/AipsCore.csproj +++ b/dotnet/AipsCore/AipsCore.csproj @@ -16,6 +16,7 @@ + diff --git a/dotnet/AipsCore/Application/Abstract/IMessagePublisher.cs b/dotnet/AipsCore/Application/Abstract/IMessagePublisher.cs new file mode 100644 index 0000000..d09651b --- /dev/null +++ b/dotnet/AipsCore/Application/Abstract/IMessagePublisher.cs @@ -0,0 +1,8 @@ +using System.Runtime.Serialization; + +namespace AipsCore.Application.Abstract; + +public interface IMessagePublisher +{ + Task PublishAsync(string exchange, string routeKey, T message, CancellationToken cancellationToken = default); +} \ No newline at end of file diff --git a/dotnet/AipsCore/Infrastructure/DI/Configuration/ConfigurationEnvExtensions.cs b/dotnet/AipsCore/Infrastructure/DI/Configuration/ConfigurationEnvExtensions.cs index 8c56742..c2b16d3 100644 --- a/dotnet/AipsCore/Infrastructure/DI/Configuration/ConfigurationEnvExtensions.cs +++ b/dotnet/AipsCore/Infrastructure/DI/Configuration/ConfigurationEnvExtensions.cs @@ -6,6 +6,8 @@ public static class ConfigurationEnvExtensions { private const string DbConnStringKey = "DB_CONN_STRING"; + private const string RabbitMqAmqpUriKey = "RABBITMQ_AMQP_URI"; + private const string JwtIssuer = "JWT_ISSUER"; private const string JwtAudience = "JWT_AUDIENCE"; private const string JwtKey = "JWT_KEY"; @@ -18,6 +20,11 @@ public static class ConfigurationEnvExtensions return configuration.GetEnvForSure(DbConnStringKey); } + public string GetEnvRabbitMqAmqpUri() + { + return configuration.GetEnvForSure(RabbitMqAmqpUriKey); + } + public string GetEnvJwtIssuer() { return configuration.GetEnvForSure(JwtIssuer); diff --git a/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/IRabbitMQConnection.cs b/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/IRabbitMQConnection.cs new file mode 100644 index 0000000..d4d0efb --- /dev/null +++ b/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/IRabbitMQConnection.cs @@ -0,0 +1,8 @@ +using RabbitMQ.Client; + +namespace AipsCore.Infrastructure.MessageBroking.RabbitMQ; + +public interface IRabbitMqConnection +{ + Task CreateChannelAsync(CancellationToken cancellationToken = default); +} \ No newline at end of file diff --git a/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqConnection.cs b/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqConnection.cs new file mode 100644 index 0000000..a07a981 --- /dev/null +++ b/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqConnection.cs @@ -0,0 +1,43 @@ +using AipsCore.Infrastructure.DI.Configuration; +using Microsoft.Extensions.Configuration; +using RabbitMQ.Client; + +namespace AipsCore.Infrastructure.MessageBroking.RabbitMQ; + +public class RabbitMqConnection : IRabbitMqConnection +{ + private readonly IConfiguration _configuration; + private IConnection? _connection; + + public RabbitMqConnection(IConfiguration configuration) + { + _configuration = configuration; + } + + public async Task CreateChannelAsync(CancellationToken cancellationToken = default) + { + if (_connection is null) + { + throw new InvalidOperationException($"RabbitMQ connection not created for {_configuration.GetEnvRabbitMqAmqpUri()}"); + } + + return await _connection.CreateChannelAsync(null, cancellationToken); + } + + public async Task CreateConnectionAsync() + { + var factory = CreateConnectionFactory(); + + _connection = await factory.CreateConnectionAsync(); + } + + private IConnectionFactory CreateConnectionFactory() + { + var factory = new ConnectionFactory + { + Uri = new Uri(_configuration.GetEnvRabbitMqAmqpUri()) + }; + + return factory; + } +} \ No newline at end of file diff --git a/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqPublisher.cs b/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqPublisher.cs new file mode 100644 index 0000000..bf3cc7c --- /dev/null +++ b/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqPublisher.cs @@ -0,0 +1,33 @@ +using System.Text; +using System.Text.Json; +using AipsCore.Application.Abstract; +using RabbitMQ.Client; + +namespace AipsCore.Infrastructure.MessageBroking.RabbitMQ; + +public class RabbitMqPublisher : IMessagePublisher +{ + private readonly IRabbitMqConnection _connection; + + public RabbitMqPublisher(IRabbitMqConnection connection) + { + _connection = connection; + } + + public async Task PublishAsync(string exchange, string routeKey, T message, CancellationToken cancellationToken = default) + { + var channel = await _connection.CreateChannelAsync(cancellationToken); + + await channel.ExchangeDeclareAsync(exchange, ExchangeType.Topic, durable: true, cancellationToken: cancellationToken); + + var bytes = Serialize(message); + await channel.BasicPublishAsync(exchange, routeKey, bytes, cancellationToken); + + await channel.CloseAsync(cancellationToken); + } + + private byte[] Serialize(T message) + { + return JsonSerializer.SerializeToUtf8Bytes(message); + } +} \ No newline at end of file