Skip to content

Commit 884aa80

Browse files
authored
feat: MassTransit (#94)
2 parents 67c1de4 + 18ba6d5 commit 884aa80

31 files changed

+178
-506
lines changed

CleanArchitecture.Api/CleanArchitecture.Api.csproj

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,18 @@
1414
<PackageReference Include="AspNetCore.HealthChecks.Redis" Version="9.0.0" />
1515
<PackageReference Include="AspNetCore.HealthChecks.SqlServer" Version="9.0.0" />
1616
<PackageReference Include="AspNetCore.HealthChecks.UI.Client" Version="9.0.0" />
17-
<PackageReference Include="Grpc.AspNetCore.Server.Reflection" Version="2.67.0" />
18-
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="9.0.2" />
19-
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.2" />
20-
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="9.0.2">
17+
<PackageReference Include="Grpc.AspNetCore.Server.Reflection" Version="2.70.0" />
18+
<PackageReference Include="MassTransit.Newtonsoft" Version="8.3.7" />
19+
<PackageReference Include="MassTransit.RabbitMQ" Version="8.3.7" />
20+
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="9.0.3" />
21+
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.3" />
22+
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="9.0.3">
2123
<PrivateAssets>all</PrivateAssets>
2224
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
2325
</PackageReference>
24-
<PackageReference Include="Microsoft.EntityFrameworkCore.Proxies" Version="9.0.2" />
26+
<PackageReference Include="Microsoft.EntityFrameworkCore.Proxies" Version="9.0.3" />
2527
<PackageReference Include="Microsoft.Extensions.Caching.StackExchangeRedis" Version="9.0.2" />
26-
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks.EntityFrameworkCore" Version="9.0.2" />
28+
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks.EntityFrameworkCore" Version="9.0.3" />
2729
<PackageReference Include="Swashbuckle.AspNetCore" Version="7.3.1" />
2830
<PackageReference Include="Swashbuckle.AspNetCore.Annotations" Version="7.3.1" />
2931
</ItemGroup>

CleanArchitecture.Api/Extensions/ConfigurationExtensions.cs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
using System;
2-
using CleanArchitecture.Domain.Rabbitmq;
2+
using CleanArchitecture.Domain.Settings;
33
using Microsoft.Extensions.Configuration;
44

55
namespace CleanArchitecture.Api.Extensions;
@@ -11,15 +11,13 @@ public static RabbitMqConfiguration GetRabbitMqConfiguration(
1111
{
1212
var isAspire = configuration["ASPIRE_ENABLED"] == "true";
1313

14-
var rabbitEnabled = configuration["RabbitMQ:Enabled"];
1514
var rabbitHost = configuration["RabbitMQ:Host"];
1615
var rabbitPort = configuration["RabbitMQ:Port"];
1716
var rabbitUser = configuration["RabbitMQ:Username"];
1817
var rabbitPass = configuration["RabbitMQ:Password"];
1918

2019
if (isAspire)
2120
{
22-
rabbitEnabled = "true";
2321
var connectionString = configuration["ConnectionStrings:RabbitMq"];
2422

2523
var rabbitUri = new Uri(connectionString!);
@@ -33,7 +31,6 @@ public static RabbitMqConfiguration GetRabbitMqConfiguration(
3331
{
3432
Host = rabbitHost ?? "",
3533
Port = int.Parse(rabbitPort ?? "0"),
36-
Enabled = bool.Parse(rabbitEnabled ?? "false"),
3734
Username = rabbitUser ?? "",
3835
Password = rabbitPass ?? ""
3936
};

CleanArchitecture.Api/Program.cs

Lines changed: 61 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,21 @@
44
using CleanArchitecture.Api.Extensions;
55
using CleanArchitecture.Application.Extensions;
66
using CleanArchitecture.Application.gRPC;
7+
using CleanArchitecture.Domain.Consumers;
78
using CleanArchitecture.Domain.Extensions;
8-
using CleanArchitecture.Domain.Rabbitmq.Extensions;
99
using CleanArchitecture.Infrastructure.Database;
1010
using CleanArchitecture.Infrastructure.Extensions;
1111
using CleanArchitecture.ServiceDefaults;
1212
using HealthChecks.ApplicationStatus.DependencyInjection;
1313
using HealthChecks.UI.Client;
14+
using MassTransit;
1415
using Microsoft.AspNetCore.Builder;
1516
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
1617
using Microsoft.EntityFrameworkCore;
1718
using Microsoft.Extensions.DependencyInjection;
1819
using Microsoft.Extensions.Hosting;
1920
using Microsoft.Extensions.Logging;
21+
using Newtonsoft.Json;
2022
using RabbitMQ.Client;
2123

2224
var builder = WebApplication.CreateBuilder(args);
@@ -33,11 +35,6 @@
3335
builder.Services.AddZenFirewall();
3436
}
3537

36-
builder.Services
37-
.AddHealthChecks()
38-
.AddDbContextCheck<ApplicationDbContext>()
39-
.AddApplicationStatus();
40-
4138
var isAspire = builder.Configuration["ASPIRE_ENABLED"] == "true";
4239

4340
var rabbitConfiguration = builder.Configuration.GetRabbitMqConfiguration();
@@ -47,23 +44,22 @@
4744
? builder.Configuration["ConnectionStrings:Database"]
4845
: builder.Configuration["ConnectionStrings:DefaultConnection"];
4946

50-
if (builder.Environment.IsProduction())
51-
{
52-
builder.Services
53-
.AddHealthChecks()
54-
.AddSqlServer(dbConnectionString!)
55-
.AddRedis(redisConnectionString!, "Redis")
56-
.AddRabbitMQ(
57-
async _ =>
47+
builder.Services
48+
.AddHealthChecks()
49+
.AddDbContextCheck<ApplicationDbContext>()
50+
.AddApplicationStatus()
51+
.AddSqlServer(dbConnectionString!)
52+
.AddRedis(redisConnectionString!, "Redis")
53+
.AddRabbitMQ(
54+
async _ =>
55+
{
56+
var factory = new ConnectionFactory
5857
{
59-
var factory = new ConnectionFactory
60-
{
61-
Uri = new Uri(rabbitConfiguration.ConnectionString),
62-
};
63-
return await factory.CreateConnectionAsync();
64-
},
65-
name: "RabbitMQ");
66-
}
58+
Uri = new Uri(rabbitConfiguration.ConnectionString),
59+
};
60+
return await factory.CreateConnectionAsync();
61+
},
62+
name: "RabbitMQ");
6763

6864
builder.Services.AddDbContext<ApplicationDbContext>(options =>
6965
{
@@ -82,7 +78,48 @@
8278
builder.Services.AddNotificationHandlers();
8379
builder.Services.AddApiUser();
8480

85-
builder.Services.AddRabbitMqHandler(rabbitConfiguration);
81+
builder.Services.AddMassTransit(x =>
82+
{
83+
x.AddConsumer<FanoutEventConsumer>();
84+
x.AddConsumer<TenantUpdatedEventConsumer>();
85+
86+
x.UsingRabbitMq((context, cfg) =>
87+
{
88+
cfg.ConfigureNewtonsoftJsonSerializer(settings =>
89+
{
90+
settings.TypeNameHandling = TypeNameHandling.Objects;
91+
settings.NullValueHandling = NullValueHandling.Ignore;
92+
return settings;
93+
});
94+
cfg.UseNewtonsoftJsonSerializer();
95+
cfg.ConfigureNewtonsoftJsonDeserializer(settings =>
96+
{
97+
settings.TypeNameHandling = TypeNameHandling.Objects;
98+
settings.NullValueHandling = NullValueHandling.Ignore;
99+
return settings;
100+
});
101+
102+
cfg.Host(rabbitConfiguration.Host, (ushort)rabbitConfiguration.Port, "/", h => {
103+
h.Username(rabbitConfiguration.Username);
104+
h.Password(rabbitConfiguration.Password);
105+
});
106+
107+
// Every instance of the service will receive the message
108+
cfg.ReceiveEndpoint("clean-architecture-fanout-event-" + Guid.NewGuid(), e =>
109+
{
110+
e.Durable = false;
111+
e.AutoDelete = true;
112+
e.ConfigureConsumer<FanoutEventConsumer>(context);
113+
e.DiscardSkippedMessages();
114+
});
115+
cfg.ReceiveEndpoint("clean-architecture-fanout-events", e =>
116+
{
117+
e.ConfigureConsumer<TenantUpdatedEventConsumer>(context);
118+
e.DiscardSkippedMessages();
119+
});
120+
cfg.ConfigureEndpoints(context);
121+
});
122+
});
86123

87124
builder.Services.AddHostedService<SetInactiveUsersService>();
88125

@@ -148,7 +185,7 @@
148185
app.MapGrpcService<UsersApiImplementation>();
149186
app.MapGrpcService<TenantsApiImplementation>();
150187

151-
app.Run();
188+
await app.RunAsync();
152189

153190
// Needed for integration tests web application factory
154191
public partial class Program

CleanArchitecture.AppHost/CleanArchitecture.AppHost.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
<PackageReference Include="Aspire.Hosting.AppHost" Version="9.1.0" />
1616
<PackageReference Include="Aspire.Hosting.RabbitMQ" Version="9.1.0" />
1717
<PackageReference Include="Aspire.Hosting.Redis" Version="9.1.0" />
18-
<PackageReference Include="Aspire.Hosting.SqlServer" Version="9.0.0" />
18+
<PackageReference Include="Aspire.Hosting.SqlServer" Version="9.1.0" />
1919
</ItemGroup>
2020

2121
<ItemGroup>

CleanArchitecture.Application/CleanArchitecture.Application.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
</PropertyGroup>
77

88
<ItemGroup>
9-
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="9.0.2" />
9+
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="9.0.3" />
1010
</ItemGroup>
1111

1212
<ItemGroup>

CleanArchitecture.Domain/CleanArchitecture.Domain.csproj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@
88
<ItemGroup>
99
<PackageReference Include="BCrypt.Net-Next" Version="4.0.3" />
1010
<PackageReference Include="FluentValidation" Version="11.11.0" />
11+
<PackageReference Include="MassTransit" Version="8.3.7" />
1112
<PackageReference Include="MediatR" Version="12.4.1" />
12-
<PackageReference Include="Microsoft.Extensions.Options" Version="9.0.2" />
13+
<PackageReference Include="Microsoft.Extensions.Options" Version="9.0.3" />
1314
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
14-
<PackageReference Include="RabbitMQ.Client" Version="7.1.1" />
15-
<PackageReference Include="System.IdentityModel.Tokens.Jwt" Version="8.6.0" />
15+
<PackageReference Include="System.IdentityModel.Tokens.Jwt" Version="8.6.1" />
1616
</ItemGroup>
1717

1818
<ItemGroup>

CleanArchitecture.Domain/Constants/Messaging.cs

Lines changed: 0 additions & 6 deletions
This file was deleted.
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
using System.Threading.Tasks;
2+
using CleanArchitecture.Shared.Events;
3+
using MassTransit;
4+
using Microsoft.Extensions.Logging;
5+
6+
namespace CleanArchitecture.Domain.Consumers;
7+
8+
public sealed class FanoutEventConsumer : IConsumer<FanoutDomainEvent>
9+
{
10+
private readonly ILogger<FanoutEventConsumer> _logger;
11+
12+
public FanoutEventConsumer(ILogger<FanoutEventConsumer> logger)
13+
{
14+
_logger = logger;
15+
}
16+
17+
public Task Consume(ConsumeContext<FanoutDomainEvent> context)
18+
{
19+
_logger.LogInformation("FanoutDomainEventConsumer: {FanoutDomainEvent}", context.Message);
20+
return Task.CompletedTask;
21+
}
22+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
using System.Threading.Tasks;
2+
using CleanArchitecture.Shared.Events.Tenant;
3+
using MassTransit;
4+
using Microsoft.Extensions.Logging;
5+
6+
namespace CleanArchitecture.Domain.Consumers;
7+
8+
public sealed class TenantUpdatedEventConsumer : IConsumer<TenantUpdatedEvent>
9+
{
10+
private readonly ILogger<TenantUpdatedEventConsumer> _logger;
11+
12+
public TenantUpdatedEventConsumer(ILogger<TenantUpdatedEventConsumer> logger)
13+
{
14+
_logger = logger;
15+
}
16+
17+
public Task Consume(ConsumeContext<TenantUpdatedEvent> context)
18+
{
19+
_logger.LogInformation("TenantUpdatedEventConsumer: {TenantId}", context.Message.AggregateId);
20+
return Task.CompletedTask;
21+
}
22+
}
Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,33 @@
11
using System.Threading.Tasks;
2-
using CleanArchitecture.Domain.Constants;
3-
using CleanArchitecture.Domain.Rabbitmq;
2+
using CleanArchitecture.Domain.Interfaces;
43
using CleanArchitecture.Shared.Events;
4+
using MassTransit;
55

66
namespace CleanArchitecture.Domain.EventHandler.Fanout;
77

88
public sealed class FanoutEventHandler : IFanoutEventHandler
99
{
10-
private readonly RabbitMqHandler _rabbitMqHandler;
10+
private readonly IPublishEndpoint _massTransit;
11+
private readonly IUser _user;
1112

1213
public FanoutEventHandler(
13-
RabbitMqHandler rabbitMqHandler)
14+
IPublishEndpoint massTransit, IUser user)
1415
{
15-
_rabbitMqHandler = rabbitMqHandler;
16-
_rabbitMqHandler.InitializeExchange(Messaging.ExchangeNameNotifications);
16+
_massTransit = massTransit;
17+
_user = user;
1718
}
1819

19-
public Task<DomainEvent> HandleDomainEventAsync(DomainEvent @event)
20+
public async Task<T> HandleDomainEventAsync<T>(T @event) where T : DomainEvent
2021
{
21-
_rabbitMqHandler.EnqueueExchangeMessage(
22-
Messaging.ExchangeNameNotifications,
23-
@event);
22+
var fanoutDomainEvent =
23+
new FanoutDomainEvent(
24+
@event.AggregateId,
25+
@event,
26+
_user.GetUserId());
27+
28+
await _massTransit.Publish(fanoutDomainEvent);
29+
await _massTransit.Publish(@event);
2430

25-
return Task.FromResult(@event);
31+
return @event;
2632
}
2733
}

0 commit comments

Comments
 (0)