diff --git a/dotnet/AipsCore/AipsCore.csproj b/dotnet/AipsCore/AipsCore.csproj index fbd60df..a733dcb 100644 --- a/dotnet/AipsCore/AipsCore.csproj +++ b/dotnet/AipsCore/AipsCore.csproj @@ -16,6 +16,7 @@ + diff --git a/dotnet/AipsCore/Application/Abstract/IMessagePublisher.cs b/dotnet/AipsCore/Application/Abstract/IMessagePublisher.cs new file mode 100644 index 0000000..d09651b --- /dev/null +++ b/dotnet/AipsCore/Application/Abstract/IMessagePublisher.cs @@ -0,0 +1,8 @@ +using System.Runtime.Serialization; + +namespace AipsCore.Application.Abstract; + +public interface IMessagePublisher +{ + Task PublishAsync(string exchange, string routeKey, T message, CancellationToken cancellationToken = default); +} \ No newline at end of file diff --git a/dotnet/AipsCore/Infrastructure/DI/Configuration/ConfigurationEnvExtensions.cs b/dotnet/AipsCore/Infrastructure/DI/Configuration/ConfigurationEnvExtensions.cs index 8c56742..c2b16d3 100644 --- a/dotnet/AipsCore/Infrastructure/DI/Configuration/ConfigurationEnvExtensions.cs +++ b/dotnet/AipsCore/Infrastructure/DI/Configuration/ConfigurationEnvExtensions.cs @@ -6,6 +6,8 @@ public static class ConfigurationEnvExtensions { private const string DbConnStringKey = "DB_CONN_STRING"; + private const string RabbitMqAmqpUriKey = "RABBITMQ_AMQP_URI"; + private const string JwtIssuer = "JWT_ISSUER"; private const string JwtAudience = "JWT_AUDIENCE"; private const string JwtKey = "JWT_KEY"; @@ -18,6 +20,11 @@ public static class ConfigurationEnvExtensions return configuration.GetEnvForSure(DbConnStringKey); } + public string GetEnvRabbitMqAmqpUri() + { + return configuration.GetEnvForSure(RabbitMqAmqpUriKey); + } + public string GetEnvJwtIssuer() { return configuration.GetEnvForSure(JwtIssuer); diff --git a/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/IRabbitMQConnection.cs b/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/IRabbitMQConnection.cs new file mode 100644 index 0000000..d4d0efb --- /dev/null +++ b/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/IRabbitMQConnection.cs @@ -0,0 +1,8 @@ +using RabbitMQ.Client; + +namespace AipsCore.Infrastructure.MessageBroking.RabbitMQ; + +public interface IRabbitMqConnection +{ + Task CreateChannelAsync(CancellationToken cancellationToken = default); +} \ No newline at end of file diff --git a/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqConnection.cs b/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqConnection.cs new file mode 100644 index 0000000..a07a981 --- /dev/null +++ b/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqConnection.cs @@ -0,0 +1,43 @@ +using AipsCore.Infrastructure.DI.Configuration; +using Microsoft.Extensions.Configuration; +using RabbitMQ.Client; + +namespace AipsCore.Infrastructure.MessageBroking.RabbitMQ; + +public class RabbitMqConnection : IRabbitMqConnection +{ + private readonly IConfiguration _configuration; + private IConnection? _connection; + + public RabbitMqConnection(IConfiguration configuration) + { + _configuration = configuration; + } + + public async Task CreateChannelAsync(CancellationToken cancellationToken = default) + { + if (_connection is null) + { + throw new InvalidOperationException($"RabbitMQ connection not created for {_configuration.GetEnvRabbitMqAmqpUri()}"); + } + + return await _connection.CreateChannelAsync(null, cancellationToken); + } + + public async Task CreateConnectionAsync() + { + var factory = CreateConnectionFactory(); + + _connection = await factory.CreateConnectionAsync(); + } + + private IConnectionFactory CreateConnectionFactory() + { + var factory = new ConnectionFactory + { + Uri = new Uri(_configuration.GetEnvRabbitMqAmqpUri()) + }; + + return factory; + } +} \ No newline at end of file diff --git a/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqPublisher.cs b/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqPublisher.cs new file mode 100644 index 0000000..bf3cc7c --- /dev/null +++ b/dotnet/AipsCore/Infrastructure/MessageBroking/RabbitMQ/RabbitMqPublisher.cs @@ -0,0 +1,33 @@ +using System.Text; +using System.Text.Json; +using AipsCore.Application.Abstract; +using RabbitMQ.Client; + +namespace AipsCore.Infrastructure.MessageBroking.RabbitMQ; + +public class RabbitMqPublisher : IMessagePublisher +{ + private readonly IRabbitMqConnection _connection; + + public RabbitMqPublisher(IRabbitMqConnection connection) + { + _connection = connection; + } + + public async Task PublishAsync(string exchange, string routeKey, T message, CancellationToken cancellationToken = default) + { + var channel = await _connection.CreateChannelAsync(cancellationToken); + + await channel.ExchangeDeclareAsync(exchange, ExchangeType.Topic, durable: true, cancellationToken: cancellationToken); + + var bytes = Serialize(message); + await channel.BasicPublishAsync(exchange, routeKey, bytes, cancellationToken); + + await channel.CloseAsync(cancellationToken); + } + + private byte[] Serialize(T message) + { + return JsonSerializer.SerializeToUtf8Bytes(message); + } +} \ No newline at end of file