Skip to content

Commit

Permalink
Added advanced exercise together with kafka configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
oskardudycz committed Aug 27, 2019
1 parent b3aca3d commit 013dbf7
Show file tree
Hide file tree
Showing 26 changed files with 418 additions and 58 deletions.
2 changes: 1 addition & 1 deletion .gitattributes
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
* text=auto *.sh eol=lf
* text=auto eol=lf
*.png binary
*.ttf binary
*.jpg binary
Expand Down
5 changes: 4 additions & 1 deletion Workshop/01-EventStoreBasics/EventStoreBasics/IRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ namespace EventStoreBasics
public interface IRepository<T> where T : IAggregate
{
T Find(Guid id);

void Add(T aggregate);

void Update(T aggregate);

void Delete(T aggregate);
}
}
}
35 changes: 35 additions & 0 deletions Workshop/02-EventSourcingAdvanced/Core/Aggregates/Aggregate.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Newtonsoft.Json;

namespace Core.Aggregates
{
public abstract class Aggregate: IAggregate
{
public Guid Id { get; protected set; }

public int Version { get; protected set; }

[JsonIgnore]
private readonly List<object> uncommittedEvents = new List<object>();

//for serialization purposes
protected Aggregate() { }

IEnumerable<object> IAggregate.DequeueUncommittedEvents()
{
var dequeuedEvents = uncommittedEvents.ToList();

uncommittedEvents.Clear();

return dequeuedEvents;
}

protected void Enqueue(object @event)
{
Version++;
uncommittedEvents.Add(@event);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
using System;
using System.Collections.Generic;

namespace Core.Aggregates
{
public interface IAggregate
{
Guid Id { get; }
int Version { get; }

IEnumerable<object> DequeueUncommittedEvents();
}
}
17 changes: 17 additions & 0 deletions Workshop/02-EventSourcingAdvanced/Core/Config.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using Core.Commands;
using Core.Events;
using Core.Queries;
using Microsoft.Extensions.DependencyInjection;

namespace Core
{
public static class Config
{
public static void AddCoreServices(this IServiceCollection services)
{
services.AddScoped<ICommandBus, CommandBus>();
services.AddScoped<IQueryBus, QueryBus>();
services.AddScoped<IEventBus, EventBus>();
}
}
}
1 change: 1 addition & 0 deletions Workshop/02-EventSourcingAdvanced/Core/Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@
<ItemGroup>
<PackageReference Include="MediatR" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.2.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" />
</ItemGroup>
</Project>
9 changes: 6 additions & 3 deletions Workshop/02-EventSourcingAdvanced/Core/Events/EventBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,21 @@ namespace Core.Events
{
public class EventBus: IEventBus
{
private readonly IMediator _mediator;
private readonly IMediator mediator;
private readonly KafkaProducer producer;

public EventBus(IMediator mediator)
{
_mediator = mediator;
this.mediator = mediator;
producer = new KafkaProducer("localhost:9092", "meetingsmanagement");
}

public async Task Publish<TEvent>(params TEvent[] events) where TEvent : IEvent
{
foreach (var @event in events)
{
await _mediator.Publish(@event);
await mediator.Publish(@event);
await producer.Publish(@event);
}
}
}
Expand Down

This file was deleted.

This file was deleted.

44 changes: 44 additions & 0 deletions Workshop/02-EventSourcingAdvanced/Core/Events/KafkaProducer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using Newtonsoft.Json;

namespace Core.Events
{
public class KafkaProducer
{
private readonly string endpoint;
private readonly string topic;
private readonly string partitionKey;

public KafkaProducer(
string endpoint,
string topic
)
{
this.endpoint = endpoint ?? throw new ArgumentNullException(nameof(endpoint));
this.topic = topic ?? throw new ArgumentNullException(nameof(topic));
}

public async Task Publish(object @event)
{
var producerConfig = new ProducerConfig { BootstrapServers = endpoint };

using (var p = new ProducerBuilder<Null, string>(producerConfig).Build())
{
while (true)
{
try
{
await p.ProduceAsync(topic, new Message<Null, string> { Value = JsonConvert.SerializeObject(@event) });
}
catch (Exception e)
{
Console.WriteLine(e);
throw e;
}
}
}
}
}
}
18 changes: 18 additions & 0 deletions Workshop/02-EventSourcingAdvanced/Core/Storage/IRepository.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Core.Aggregates;

namespace Core.Storage
{
public interface IRepository<T> where T : IAggregate
{
Task<T> Find(Guid id, CancellationToken cancellationToken);

Task Add(T aggregate, CancellationToken cancellationToken);

Task Update(T aggregate, CancellationToken cancellationToken);

Task Delete(T aggregate, CancellationToken cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Core.Commands;
using Core.Queries;
using MeetingsManagement.Meetings.Commands;
using MeetingsManagement.Meetings.Queries;
using MeetingsManagement.Meetings.ValueObjects;
using Microsoft.AspNetCore.Mvc;

namespace MeetingsManagement.Api.Controllers
{
Expand All @@ -21,14 +21,12 @@ public MeetingsController(ICommandBus commandBus, IQueryBus queryBus)
_queryBus = queryBus;
}

// GET api/values
[HttpGet]
public Task<MeetingSummary> Get(Guid meetingId)
[HttpGet("{id}")]
public Task<MeetingSummary> Get(Guid id)
{
return _queryBus.Send<GetMeeting, MeetingSummary>(new GetMeeting());
return _queryBus.Send<GetMeeting, MeetingSummary>(new GetMeeting(id));
}

// POST api/values
[HttpPost]
public Task Post([FromBody]CreateMeeting command)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using MeetingsManagement.Configuration;
using Core;
using MeetingsManagement.Configuration;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
Expand All @@ -15,6 +16,7 @@ public Startup(IConfiguration config)
{
this.config = config;
}

// This method gets called by the runtime. Use this method to add services to the container.
// For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
public void ConfigureServices(IServiceCollection services)
Expand All @@ -26,9 +28,11 @@ public void ConfigureServices(IServiceCollection services)
c.SwaggerDoc("v1", new Info { Title = "Meeting Management", Version = "v1" });
});

services.AddMarten(config);

services.AddMediatR();

services.AddCoreServices();

services.AddMeetingsManagement(config);
}

// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
},
},
"EventStore": {
"ConnectionString": "PORT = 5432; HOST = postgres; TIMEOUT = 15; POOLING = True; MINPOOLSIZE = 1; MAXPOOLSIZE = 100; COMMANDTIMEOUT = 20; DATABASE = 'postgres'; PASSWORD = 'Password12!'; USER ID = 'postgres'",
"ConnectionString": "PORT = 5432; HOST = localhost; TIMEOUT = 15; POOLING = True; MINPOOLSIZE = 1; MAXPOOLSIZE = 100; COMMANDTIMEOUT = 20; DATABASE = 'postgres'; PASSWORD = 'Password12!'; USER ID = 'postgres'",
"Schema": "MeetingsManagement"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
},
},
"EventStore": {
"ConnectionString": "PORT = 5432; HOST = postgres; TIMEOUT = 15; POOLING = True; MINPOOLSIZE = 1; MAXPOOLSIZE = 100; COMMANDTIMEOUT = 20; DATABASE = 'postgres'; PASSWORD = 'Password12!'; USER ID = 'postgres'",
"ConnectionString": "PORT = 5432; HOST = localhost; TIMEOUT = 15; POOLING = True; MINPOOLSIZE = 1; MAXPOOLSIZE = 100; COMMANDTIMEOUT = 20; DATABASE = 'postgres'; PASSWORD = 'Password12!'; USER ID = 'postgres'",
"Schema": "MeetingsManagement"
},
"AllowedHosts": "*"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

namespace MeetingsManagement.Configuration
{
public static class MartenConfig
internal static class MartenConfig
{
public static void AddMarten(this IServiceCollection services, IConfiguration config)
internal static void AddMarten(this IServiceCollection services, IConfiguration config)
{
services.AddSingleton<IDocumentStore>(sp => DocumentStore.For(options => SetStoreOptions(options, config)));
services.AddSingleton(sp => sp.GetRequiredService<IDocumentStore>().OpenSession());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using MeetingsManagement.Meetings;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

namespace MeetingsManagement.Configuration
{
public static class ModuleConfig
{
public static void AddMeetingsManagement(this IServiceCollection services, IConfiguration config)
{
services.AddMarten(config);
services.AddMeeting();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@

namespace MeetingsManagement.Meetings.Commands
{
public class CreateMeeting : ICommand
public class CreateMeeting: ICommand
{
public string Name { get; }

public CreateMeeting(string name)
{
Name = name;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using Core.Storage;
using MediatR;
using MeetingsManagement.Meetings.Commands;
using MeetingsManagement.Meetings.Queries;
using MeetingsManagement.Meetings.ValueObjects;
using MeetingsManagement.Storage;
using Microsoft.Extensions.DependencyInjection;

namespace MeetingsManagement.Meetings
{
public static class Config
{
public static void AddMeeting(this IServiceCollection services)
{
services.AddScoped<IRepository<Meeting>, MartenRepository<Meeting>>();

services.AddScoped<IRequestHandler<CreateMeeting, Unit>, MeetingCommandHandler>();

services.AddScoped<IRequestHandler<GetMeeting, MeetingSummary>, MeetingQueryHandler>();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@

namespace MeetingsManagement.Meetings
{
public class Meeting : EventSourcedAggregate
internal class Meeting: Aggregate
{
public string Name { get; private set; }

public Meeting(){}
public Meeting()
{
}

private Meeting(Guid id, string name)
{
var @event = new MeetingCreated(id, name);
Append(@event);

Enqueue(@event);
Apply(@event);
}

Expand Down
Loading

0 comments on commit 013dbf7

Please sign in to comment.