Mediator Pattern in ASP.NET Core Applications

Send commands, fetch queries, and broadcast events using a mediator pattern for CQRS+ES

Over the last few years, Command Query Responsibility Segregation (CQRS) and Event Sourcing (ES) have emerged as patterns that can help implement large-scale systems, at the cost of some risky complexity, by using different models to read and mutate data while treating events as the single source of truth.

Since there are already great articles explaining CQRS and ES in depth, I’m going to focus on showing how you can decouple your application layers so that they only need to know about POCOs and a mediator instance from SimpleSoft.Mediator.

Remember that, even if you are not using CQRS or ES to their full extent, simply ensuring a logical model segregation inside the project can make your life easier in the long run, even when implementing a simple Backend for Frontend (BFF) or a minimum viable product (MVP).

Commands, Queries, Events, Handlers and Mediator

Before showing some code, let’s briefly review some core concepts of these patterns and of the library we are going to use:

  • Commands - each action intended to change the system state, by creating, updating or deleting information, should be represented by a class implementing ICommand or, if you are using the mediator just for in-process decoupling and want to return a result synchronously, ICommand<TResult>. Examples: CreateProductCommand or DeleteUserCommand;
  • Queries - classes implementing IQuery<TResult> represent data reads that shouldn’t change the system state; if they do, they must be considered commands. Examples: GetProductByIdQuery or SearchUsersQuery;
  • Events - representing system changes over time, these classes implement IEvent. Examples: ProductCreatedEvent or UpdatedUserEmailEvent;
  • Handlers - responsible for receiving the commands (ICommandHandler), queries (IQueryHandler) or events (IEventHandler), they implement the business behavior to be run. Examples: CreateProductCommandHandler or GetProductByIdQueryHandler;
  • Mediator - decouples the caller from the executor by exposing generic methods to send commands, fetch queries or broadcast events. It is responsible for finding the correct handler (or handlers, in the case of an event) and is represented by the interface IMediator.
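
To make these roles more concrete, here is a minimal, self-contained sketch of the idea. It is not how SimpleSoft.Mediator is implemented: ToyMediator, AddCommandHandler, AddEventHandler, ProductCreatedLogger and the simplified interfaces are hypothetical names made up for illustration, and handlers are registered by hand instead of being resolved from a container.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, simplified contracts (the real ones come from SimpleSoft.Mediator)
public interface ICommand<TResult> { }
public interface IEvent { }

public interface ICommandHandler<TCommand, TResult> where TCommand : ICommand<TResult>
{
    Task<TResult> HandleAsync(TCommand cmd, CancellationToken ct);
}

public interface IEventHandler<TEvent> where TEvent : IEvent
{
    Task HandleAsync(TEvent evt, CancellationToken ct);
}

// Toy mediator: one handler per command type, any number of handlers per event type
public class ToyMediator
{
    private readonly Dictionary<Type, Func<object, CancellationToken, Task<object>>> _commands =
        new Dictionary<Type, Func<object, CancellationToken, Task<object>>>();
    private readonly Dictionary<Type, List<Func<object, CancellationToken, Task>>> _events =
        new Dictionary<Type, List<Func<object, CancellationToken, Task>>>();

    public void AddCommandHandler<TCommand, TResult>(ICommandHandler<TCommand, TResult> handler)
        where TCommand : ICommand<TResult>
    {
        _commands[typeof(TCommand)] = async (cmd, ct) =>
        {
            TResult result = await handler.HandleAsync((TCommand)cmd, ct);
            return result;
        };
    }

    public void AddEventHandler<TEvent>(IEventHandler<TEvent> handler) where TEvent : IEvent
    {
        if (!_events.TryGetValue(typeof(TEvent), out var handlers))
        {
            handlers = new List<Func<object, CancellationToken, Task>>();
            _events[typeof(TEvent)] = handlers;
        }
        handlers.Add((evt, ct) => handler.HandleAsync((TEvent)evt, ct));
    }

    // The caller only knows the command; the mediator finds the single matching handler
    public async Task<TResult> SendAsync<TResult>(ICommand<TResult> cmd, CancellationToken ct)
    {
        if (!_commands.TryGetValue(cmd.GetType(), out var handler))
            throw new InvalidOperationException($"No handler registered for {cmd.GetType().Name}");
        var result = await handler(cmd, ct);
        return (TResult)result;
    }

    // Events go to every registered handler; having none is not an error
    public async Task BroadcastAsync(IEvent evt, CancellationToken ct)
    {
        if (!_events.TryGetValue(evt.GetType(), out var handlers)) return;
        foreach (var handler in handlers)
            await handler(evt, ct);
    }
}

public class CreateProductCommand : ICommand<Guid> { public string Code { get; set; } }
public class ProductCreatedEvent : IEvent { public Guid ProductId { get; set; } }

public class CreateProductCommandHandler : ICommandHandler<CreateProductCommand, Guid>
{
    private readonly ToyMediator _mediator;
    public CreateProductCommandHandler(ToyMediator mediator) { _mediator = mediator; }

    public async Task<Guid> HandleAsync(CreateProductCommand cmd, CancellationToken ct)
    {
        var id = Guid.NewGuid();
        await _mediator.BroadcastAsync(new ProductCreatedEvent { ProductId = id }, ct);
        return id;
    }
}

public class ProductCreatedLogger : IEventHandler<ProductCreatedEvent>
{
    public List<Guid> Seen { get; } = new List<Guid>();
    public Task HandleAsync(ProductCreatedEvent evt, CancellationToken ct)
    {
        Seen.Add(evt.ProductId);
        return Task.CompletedTask;
    }
}

public static class Program
{
    public static async Task Main()
    {
        var mediator = new ToyMediator();
        var logger = new ProductCreatedLogger();
        mediator.AddCommandHandler<CreateProductCommand, Guid>(new CreateProductCommandHandler(mediator));
        mediator.AddEventHandler<ProductCreatedEvent>(logger);

        var id = await mediator.SendAsync(new CreateProductCommand { Code = "P-001" }, CancellationToken.None);
        Console.WriteLine(logger.Seen[0] == id); // prints "True"
    }
}
```

Notice that the code calling SendAsync never references CreateProductCommandHandler directly, which is the decoupling we are after in the rest of this article.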

I also made articles explaining how pipelines work, how to enforce validations, how to manage transactions and how to audit user actions.


The project

The source code for this article can be found on GitHub.

Start by opening Visual Studio and creating an ASP.NET Core 3.1 Web Application with any name, at any location.

Choose an empty project since this is just a demo.

Install the NuGet package Swashbuckle.AspNetCore so we can use Swagger to test the API.

Open the file Startup.cs and configure both MVC and Swagger, making it easier to test our web API.

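using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        // Registers the MVC services, including controllers
        services.AddMvc();

        // Registers the Swagger document generator
        services.AddSwaggerGen();
    }

    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        app.UseDeveloperExceptionPage();

        // Serves the generated Swagger document as JSON
        app.UseSwagger();

        // Serves the Swagger UI pointing at that document
        app.UseSwaggerUI(c =>
        {
            c.SwaggerEndpoint("/swagger/v1/swagger.json", "Mediator ExampleApi V1");
        });

        app.UseRouting();

        app.UseEndpoints(endpoints =>
        {
            endpoints.MapControllers();
        });
    }
}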

The web API

Since the objective of this article is to show the mediator pattern, a simple endpoint to manage products should be enough:

  • GET /products — search for products using some filters;
  • GET /products/{id} — get a product by its unique identifier;
  • POST /products — create a product;
  • PUT /products/{id} — update a product by its unique identifier;
  • DELETE /products/{id} — delete a product by its unique identifier;

Create a Controllers folder with another Products folder inside, a ProductsController and the following models:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;

// The actions are placeholders for now; they will be implemented with the mediator later
[Route("products")]
public class ProductsController : ControllerBase
{
    [HttpGet]
    public async Task<IEnumerable<ProductModel>> SearchAsync([FromQuery] string filterQ, [FromQuery] int? skip, [FromQuery] int? take, CancellationToken ct)
    {
        throw new NotImplementedException();
    }

    [HttpGet("{id:guid}")]
    public async Task<ProductModel> GetByIdAsync([FromRoute] Guid id, CancellationToken ct)
    {
        throw new NotImplementedException();
    }

    [HttpPost]
    public async Task<CreateProductResultModel> CreateAsync([FromBody] CreateProductModel model, CancellationToken ct)
    {
        throw new NotImplementedException();
    }

    [HttpPut("{id:guid}")]
    public async Task UpdateAsync([FromRoute] Guid id, [FromBody] UpdateProductModel model, CancellationToken ct)
    {
        throw new NotImplementedException();
    }

    [HttpDelete("{id:guid}")]
    public async Task DeleteAsync([FromRoute] Guid id, CancellationToken ct)
    {
        throw new NotImplementedException();
    }
}

public class CreateProductModel
{
    public string Code { get; set; }
    public string Name { get; set; }
    public decimal Price { get; set; }
}

public class CreateProductResultModel
{
    public Guid Id { get; set; }
}

public class ProductModel
{
    public Guid Id { get; set; }
    public string Code { get; set; }
    public string Name { get; set; }
    public decimal Price { get; set; }
}

public class UpdateProductModel
{
    public string Code { get; set; }
    public string Name { get; set; }
    public decimal Price { get; set; }
}

The project should look as follows:

Open the Swagger endpoint (e.g. https://localhost:44380/swagger/index.html) and you should see your products endpoint with all the actions.

The database model

For demo purposes we are going to use Entity Framework Core with data stored in-memory.

Install the NuGet package Microsoft.EntityFrameworkCore.InMemory.

Create a Database folder containing the following database context and the entities for products and events, which store both the business data and its mutation history:

using System;
using Microsoft.EntityFrameworkCore;

public class ApiDbContext : DbContext
{
    public ApiDbContext(DbContextOptions<ApiDbContext> options) : base(options)
    {

    }

    protected override void OnModelCreating(ModelBuilder builder)
    {
        base.OnModelCreating(builder);

        // Business data: Id is the internal key, ExternalId is the one exposed by the API
        builder.Entity<ProductEntity>(cfg =>
        {
            cfg.HasKey(e => e.Id);
            cfg.HasAlternateKey(e => e.ExternalId);
            cfg.HasIndex(e => e.Code).IsUnique();
            cfg.Property(e => e.ExternalId)
                .IsRequired();
            cfg.Property(e => e.Code)
                .IsRequired()
                .HasMaxLength(8);
            cfg.Property(e => e.Name)
                .IsRequired()
                .HasMaxLength(128);
            cfg.Property(e => e.Price)
                .IsRequired();
        });

        // Mutation history: each event is stored with its serialized payload
        builder.Entity<EventEntity>(cfg =>
        {
            cfg.HasKey(e => e.Id);
            cfg.HasAlternateKey(e => e.ExternalId);
            cfg.HasIndex(e => e.CreatedOn);
            cfg.Property(e => e.Name)
                .IsRequired()
                .HasMaxLength(128);
            cfg.Property(e => e.Payload)
                .IsRequired();
            cfg.Property(e => e.CreatedOn)
                .IsRequired();
            cfg.Property(e => e.CreatedBy)
                .IsRequired()
                .HasMaxLength(128);
        });
    }
}

public class EventEntity
{
    public long Id { get; set; }
    public Guid ExternalId { get; set; }
    public string Name { get; set; }
    public string Payload { get; set; }
    public DateTimeOffset CreatedOn { get; set; }
    public string CreatedBy { get; set; }
}

public class ProductEntity
{
    public long Id { get; set; }
    public Guid ExternalId { get; set; }
    public string Code { get; set; }
    public string Name { get; set; }
    public decimal Price { get; set; }
}

Open the Startup.cs file and add the context into the container as an in-memory database:

services.AddDbContext<ApiDbContext>(o =>
{
    o.UseInMemoryDatabase("ApiDbContext");
});

The project should look as follows:

The mediator

Now that we have created the demo endpoint, models and database, it’s time to configure the mediator.

Install the NuGet package SimpleSoft.Mediator.Microsoft.Extensions, which is a specialized package for projects using Microsoft.Extensions.*.

Open the Startup.cs file and add the mediator into the container, scanning the current assembly for all classes implementing ICommandHandler, IQueryHandler and IEventHandler:

services.AddMediator(o =>
{
    o.AddHandlersFromAssemblyOf<Startup>();
});
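
If you are curious about what scanning an assembly for handlers involves, the sketch below shows the general reflection technique. It is only an illustration under my own assumptions and not the library's actual code: IHandler<TMessage>, HandlerScanner, Ping, Pong and the handler classes are hypothetical names standing in for contracts such as ICommandHandler.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical stand-in for a generic handler contract
public interface IHandler<TMessage> { }

public class Ping { }
public class Pong { }

public class PingHandler : IHandler<Ping> { }
public class PongHandler : IHandler<Pong> { }
public abstract class BaseHandler : IHandler<Ping> { } // ignored by the scan: abstract

public static class HandlerScanner
{
    // Finds every concrete class implementing a closed version of the given open generic
    // interface and returns (closed interface, implementing class) pairs
    public static IEnumerable<(Type Service, Type Implementation)> Scan(Assembly assembly, Type openHandlerInterface)
    {
        return
            from type in assembly.GetTypes()
            where type.IsClass && !type.IsAbstract
            from service in type.GetInterfaces()
            where service.IsGenericType && service.GetGenericTypeDefinition() == openHandlerInterface
            select (Service: service, Implementation: type);
    }
}

public static class Program
{
    public static void Main()
    {
        var found = HandlerScanner.Scan(typeof(Program).Assembly, typeof(IHandler<>));
        foreach (var (service, implementation) in found)
        {
            // A container registration for the pair would go here
            Console.WriteLine($"{service.Name}<{service.GetGenericArguments()[0].Name}> -> {implementation.Name}");
        }
    }
}
```

Each pair found this way can then be registered into the container, so that asking for the closed handler interface returns the matching class.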

Open the ProductsController.cs file and inject the IMediator instance through the constructor:

private readonly IMediator _mediator;

public ProductsController(IMediator mediator)
{
    _mediator = mediator;
}

The handlers

We now have everything we need to start implementing the business logic.

To keep it simple, we are not going to use the mediator pipelines, only focusing on commands, queries, events and their handlers. As stated before, I made some articles more specific for those use cases.

Start by creating a folder named Handlers with a Products folder inside, in which we are going to put our POCOs and their handlers.

As a note, remember this is a demo article and this project structure should not be seen as a valid way to organize your solution in your private projects. Commands, queries, events and their handlers should be, at minimum, in different folders.

CreateProductCommand

We need data, so let’s create a command for that, which will be sent when a POST /products request is received.

Create a class CreateProductResult, which will be returned by the handler with the product unique identifier, and a CreateProductCommand extending the class Command<CreateProductResult>:

public class CreateProductCommand : Command<CreateProductResult>
{
    public string Code { get; set; }
    public string Name { get; set; }
    public decimal Price { get; set; }
}

public class CreateProductResult
{
    public Guid Id { get; set; }
}

Open the ProductsController.cs file and inside the method CreateAsync send the command into the mediator and map its result:

[Route("products")]
public class ProductsController : ControllerBase
{
    private readonly IMediator _mediator;

    public ProductsController(IMediator mediator)
    {
        _mediator = mediator;
    }

    // ...

    [HttpPost]
    public async Task<CreateProductResultModel> CreateAsync([FromBody] CreateProductModel model, CancellationToken ct)
    {
        var result = await _mediator.SendAsync(new CreateProductCommand
        {
            Code = model.Code,
            Name = model.Name,
            Price = model.Price
        }, ct);

        return new CreateProductResultModel
        {
            Id = result.Id
        };
    }

    // ...
}

If we now invoked the endpoint, we would receive an exception saying that no ICommandHandler<CreateProductCommand, CreateProductResult> was found by the container.

We will fix that by creating the class CreateProductCommandHandler with the business logic for this action. For this demo, an InvalidOperationException with a custom message will be thrown when a duplicated code is received:

public class CreateProductCommandHandler : ICommandHandler<CreateProductCommand, CreateProductResult>
{
    private readonly ApiDbContext _context;

    public CreateProductCommandHandler(ApiDbContext context)
    {
        _context = context;
    }

    public async Task<CreateProductResult> HandleAsync(CreateProductCommand cmd, CancellationToken ct)
    {
        var products = _context.Set<ProductEntity>();

        if (await products.AnyAsync(p => p.Code == cmd.Code, ct))
        {
            throw new InvalidOperationException($"Product code '{cmd.Code}' already exists");
        }

        var externalId = Guid.NewGuid();
        await products.AddAsync(new ProductEntity
        {
            ExternalId = externalId,
            Code = cmd.Code,
            Name = cmd.Name,
            Price = cmd.Price
        }, ct);

        await _context.SaveChangesAsync(ct);

        return new CreateProductResult
        {
            Id = externalId
        };
    }
}

If you now try to create a product using the Swagger endpoint, you will receive the response with the unique identifier or an internal server error indicating a duplicated code.

The project should look as follows:

GetProductByIdQuery

Time to return a product when a GET /products/{id} request is received by the web API.

Inside the Handlers/Products folder, create a Product class, the GetProductByIdQuery and its corresponding handler, GetProductByIdQueryHandler. Like in the previous handler, for this demo, we are going to throw an InvalidOperationException if the id is unknown:

public class GetProductByIdQuery : Query<Product>
{
    public Guid ProductId { get; set; }
}

public class Product
{
    public Guid Id { get; set; }
    public string Code { get; set; }
    public string Name { get; set; }
    public decimal Price { get; set; }
}

public class GetProductByIdQueryHandler : IQueryHandler<GetProductByIdQuery, Product>
{
    private readonly IQueryable<ProductEntity> _products;

    public GetProductByIdQueryHandler(ApiDbContext context)
    {
        _products = context.Set<ProductEntity>();
    }

    public async Task<Product> HandleAsync(GetProductByIdQuery query, CancellationToken ct)
    {
        var product = await _products.SingleOrDefaultAsync(p => p.ExternalId == query.ProductId, ct);

        if (product == null)
        {
            throw new InvalidOperationException($"Product '{query.ProductId}' not found");
        }

        return new Product
        {
            Id = product.ExternalId,
            Code = product.Code,
            Name = product.Name,
            Price = product.Price
        };
    }
}

Open the ProductsController.cs file and inside the method GetByIdAsync fetch the query from the mediator and map its result:

[Route("products")]
public class ProductsController : ControllerBase
{
    private readonly IMediator _mediator;

    public ProductsController(IMediator mediator)
    {
        _mediator = mediator;
    }

    // ...

    [HttpGet("{id:guid}")]
    public async Task<ProductModel> GetByIdAsync([FromRoute] Guid id, CancellationToken ct)
    {
        var result = await _mediator.FetchAsync(new GetProductByIdQuery
        {
            ProductId = id
        }, ct);

        return new ProductModel
        {
            Id = result.Id,
            Code = result.Code,
            Name = result.Name,
            Price = result.Price
        };
    }

    // ...
}

If you now try to get a product using the Swagger endpoint, you will receive the product or an internal server error indicating an unknown identifier.

CreatedProductEvent

Finally, we are going to create an event that will be broadcast when a product is successfully created.

Just like the previous ones, create a CreatedProductEvent and the handler CreatedProductEventHandler, which will serialize the event and store it in the events table:

public class CreatedProductEvent : Event
{
    public Guid ExternalId { get; set; }
    public string Code { get; set; }
    public string Name { get; set; }
    public decimal Price { get; set; }
}

public class CreatedProductEventHandler : IEventHandler<CreatedProductEvent>
{
    private readonly ApiDbContext _context;

    public CreatedProductEventHandler(ApiDbContext context)
    {
        _context = context;
    }

    public async Task HandleAsync(CreatedProductEvent evt, CancellationToken ct)
    {
        // Only adds the entity to the context; nothing is saved here, that is left to the caller
        await _context.Set<EventEntity>().AddAsync(new EventEntity
        {
            ExternalId = evt.Id,
            Name = nameof(CreatedProductEvent),
            // JsonSerializer comes from the System.Text.Json namespace
            Payload = JsonSerializer.Serialize(evt),
            CreatedOn = evt.CreatedOn,
            CreatedBy = evt.CreatedBy
        }, ct);
    }
}

Open the CreateProductCommandHandler file, inject the IMediator instance and broadcast the event before flushing all changes into the database:

public class CreateProductCommandHandler : ICommandHandler<CreateProductCommand, CreateProductResult>
{
    private readonly ApiDbContext _context;
    private readonly IMediator _mediator;

    public CreateProductCommandHandler(ApiDbContext context, IMediator mediator)
    {
        _context = context;
        _mediator = mediator;
    }

    public async Task<CreateProductResult> HandleAsync(CreateProductCommand cmd, CancellationToken ct)
    {
        var products = _context.Set<ProductEntity>();

        if (await products.AnyAsync(p => p.Code == cmd.Code, ct))
        {
            throw new InvalidOperationException($"Product code '{cmd.Code}' already exists");
        }

        var externalId = Guid.NewGuid();
        await products.AddAsync(new ProductEntity
        {
            ExternalId = externalId,
            Code = cmd.Code,
            Name = cmd.Name,
            Price = cmd.Price
        }, ct);

        await _mediator.BroadcastAsync(new CreatedProductEvent
        {
            ExternalId = externalId,
            Code = cmd.Code,
            Name = cmd.Name,
            Price = cmd.Price
        }, ct);

        await _context.SaveChangesAsync(ct);

        return new CreateProductResult
        {
            Id = externalId
        };
    }
}
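
The order of the calls in this handler matters. The event handler only adds an EventEntity to the context and never saves, so the event is persisted by the SaveChangesAsync call that comes after the broadcast, together with the product. This assumes both handlers receive the same scoped ApiDbContext instance within a request. The toy below illustrates that idea with a hypothetical ToyContext class of my own, not Entity Framework Core itself:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a scoped DbContext: changes are tracked until they are saved
public class ToyContext
{
    private readonly List<string> _pending = new List<string>();

    public List<string> Saved { get; } = new List<string>();

    public void Add(string entity) => _pending.Add(entity);

    // Writes everything tracked so far in one go and returns how many entries were written
    public int SaveChanges()
    {
        var count = _pending.Count;
        Saved.AddRange(_pending);
        _pending.Clear();
        return count;
    }
}

public static class Program
{
    public static void Main()
    {
        var context = new ToyContext();            // one instance shared by both handlers

        context.Add("ProductEntity");              // done by the command handler
        context.Add("EventEntity");                // done by the event handler during the broadcast

        Console.WriteLine(context.SaveChanges());  // prints 2
    }
}
```

If the broadcast happened after the save instead, the event would stay pending in the context until something else saved it.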

The project should look as follows:

Conclusion

I hope this article gave you a good idea of how to use the mediator to implement the CQRS and ES patterns, even if you are building an MVP and can’t spend much time thinking about the architecture, but still want something that can be maintained and extended for some time.

Soon I’ll be explaining more advanced scenarios, like using the mediator pipeline to validate commands before they reach the handler, managing database transactions, or even implementing transversal auditing in your application.