Using Mediator Pipelines in ASP.NET Core Applications

Intercept commands, queries and events to apply cross-cutting behavior

In my previous article I explained how you could implement Command Query Responsibility Segregation (CQRS) and Event Sourcing (ES) patterns using a mediator instance from SimpleSoft.Mediator in ASP.NET Core applications.

This time I’m going to explain how to intercept commands, queries and events, making it possible to apply cross-cutting behavior before they reach the handlers and significantly reducing boilerplate and duplicated code.

Transaction management, logging, auditing, caching and validation are good examples of where pipelines can be very helpful. Imagine the following chains:

Commands:
Mediator
LoggingPipeline
ValidationPipeline
TransactionPipeline
AuditPipeline
Handler

Queries:
Mediator
LoggingPipeline
Handler

Events:
Mediator
LoggingPipeline
StoragePipeline
Handler(s)

Not only can you plug in any pipeline at will without changing the handler implementations, you can also be specific about which entities each pipeline is interested in.

Pipelines

Mediator pipelines are represented by the interface IPipeline (or the abstract class Pipeline) with methods to intercept each entity type:

public interface IPipeline
{
    Task OnCommandAsync<TCommand>(
        Func<TCommand, CancellationToken, Task> next,
        TCommand cmd,
        CancellationToken ct
    ) where TCommand : class, ICommand;

    Task<TResult> OnCommandAsync<TCommand, TResult>(
        Func<TCommand, CancellationToken, Task<TResult>> next,
        TCommand cmd,
        CancellationToken ct
    ) where TCommand : class, ICommand<TResult>;

    Task OnEventAsync<TEvent>(
        Func<TEvent, CancellationToken, Task> next,
        TEvent evt,
        CancellationToken ct
    ) where TEvent : class, IEvent;

    Task<TResult> OnQueryAsync<TQuery, TResult>(
        Func<TQuery, CancellationToken, Task<TResult>> next,
        TQuery query,
        CancellationToken ct
    ) where TQuery : class, IQuery<TResult>;
}

You receive a delegate for the next pipe and the entity itself, so the concept is very simple: either do some work before or after invoking the next pipe, or break the chain by returning early or throwing an exception.

public class IsItMondayAlreadyPipeline : Pipeline
{
    public override async Task<TResult> OnQueryAsync<TQuery, TResult>(
        Func<TQuery, CancellationToken, Task<TResult>> next,
        TQuery query,
        CancellationToken ct
    )
    {
        if (DateTime.Now.DayOfWeek == DayOfWeek.Monday)
            throw new InvalidOperationException("Today is Monday, no data for you!");

        return await next(query, ct);
    }
}

Pipelines are executed in the same order they are added to the container, so to keep that order deterministic, don’t scan the assemblies for classes implementing IPipeline; register each pipeline explicitly.

Commands pipeline

Intercept commands sent into the mediator by implementing the OnCommandAsync methods:
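For example, a TransactionPipeline like the one in the chains above could wrap every command in a database transaction. This is a minimal sketch assuming the EF Core ApiDbContext from the previous article; note that the in-memory provider used by the example project does not support transactions, so treat it as illustrative:

```csharp
// Illustrative sketch: wraps every command in a database transaction.
// Assumes the ApiDbContext from the previous article.
public class TransactionPipeline : Pipeline
{
    private readonly ApiDbContext _context;

    public TransactionPipeline(ApiDbContext context)
    {
        _context = context;
    }

    public override async Task OnCommandAsync<TCommand>(
        Func<TCommand, CancellationToken, Task> next,
        TCommand cmd,
        CancellationToken ct
    )
    {
        // If the handler (or any later pipe) throws, the exception unwinds
        // the chain and the transaction is disposed without being committed.
        await using var tx = await _context.Database.BeginTransactionAsync(ct);

        await next(cmd, ct);

        await _context.SaveChangesAsync(ct);
        await tx.CommitAsync(ct);
    }

    // The OnCommandAsync<TCommand, TResult> overload would be wrapped in
    // exactly the same way, returning the result of next(cmd, ct).
}
```

Because the pipeline calls SaveChangesAsync itself, handlers no longer need to remember to do it, which is exactly the kind of developer mistake pipelines help prevent.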

Queries pipeline

Intercept queries fetched from the mediator by implementing the OnQueryAsync method:
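As an example, a query pipeline could add caching. The sketch below uses IMemoryCache from Microsoft.Extensions.Caching.Memory and a deliberately naive cache key; a real implementation would include the query parameters in the key:

```csharp
// Illustrative sketch: caches query results in memory for a short period.
public class CachingPipeline : Pipeline
{
    private readonly IMemoryCache _cache;

    public CachingPipeline(IMemoryCache cache)
    {
        _cache = cache;
    }

    public override async Task<TResult> OnQueryAsync<TQuery, TResult>(
        Func<TQuery, CancellationToken, Task<TResult>> next,
        TQuery query,
        CancellationToken ct
    )
    {
        // Naive key: one entry per query type. Serialize the query's
        // parameters into the key to cache per-filter results instead.
        var key = typeof(TQuery).FullName;

        if (_cache.TryGetValue(key, out TResult cached))
            return cached;

        var result = await next(query, ct);

        return _cache.Set(key, result, TimeSpan.FromSeconds(30));
    }
}
```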

Events pipeline

Intercept events before they are broadcast by the mediator to all the handlers by implementing the OnEventAsync method:
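As an example, the StoragePipeline from the chains above could persist every event before it is broadcast, which is the basis for event sourcing. The StoredEvent entity below is a hypothetical table, not part of the example project:

```csharp
// Illustrative sketch: persists each event before the handlers run.
// StoredEvent is a hypothetical entity; define it to match your model.
public class StoragePipeline : Pipeline
{
    private readonly ApiDbContext _context;

    public StoragePipeline(ApiDbContext context)
    {
        _context = context;
    }

    public override async Task OnEventAsync<TEvent>(
        Func<TEvent, CancellationToken, Task> next,
        TEvent evt,
        CancellationToken ct
    )
    {
        await _context.Set<StoredEvent>().AddAsync(new StoredEvent
        {
            Type = typeof(TEvent).Name,
            Payload = JsonSerializer.Serialize(evt),
            CreatedOn = DateTimeOffset.UtcNow
        }, ct);

        // SaveChangesAsync is left to the handler (or a transaction pipe),
        // so the event row and its side effects are stored together.
        await next(evt, ct);
    }
}
```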


The project

To make it easier, we are going to build on the previous article and continue from there. As a reminder, we implemented an endpoint to manage products with the following:

  • GET /products — search for products using some filters (SearchProductsQuery);
  • GET /products/{id} — get a product by its unique identifier (GetProductByIdQuery);
  • POST /products — create a product (CreateProductCommand and CreatedProductEvent);
  • PUT /products/{id} — update a product by its unique identifier (UpdateProductCommand and UpdatedProductEvent);
  • DELETE /products/{id} — delete a product by its unique identifier (DeleteProductCommand and DeletedProductEvent);

The complete source code can be found on GitHub.

The pipelines

Since the objective of this article is to show how to implement pipelines in the mediator, the following should be enough:

  • LoggingPipeline — serializes every command, query, event and result as JSON into the log if Trace is enabled;
  • TimeoutPipeline — cancels the handler execution if it takes longer than a given amount of time.

Start by creating a Pipelines folder at the project root level.

LoggingPipeline

Because we don’t want logging to affect the timeout, this will be the first pipeline to be added to the chain.

Inside the Pipelines folder, create a LoggingPipeline class implementing IPipeline. This pipeline will receive an ILogger instance and use System.Text.Json to serialize the objects:

public class LoggingPipeline : IPipeline
{
    private readonly ILogger<LoggingPipeline> _logger;
    private readonly JsonSerializerOptions _serializerOptions = new JsonSerializerOptions
    {
        WriteIndented = true
    };

    public LoggingPipeline(ILogger<LoggingPipeline> logger)
    {
        _logger = logger;
    }

    public async Task OnCommandAsync<TCommand>(Func<TCommand, CancellationToken, Task> next, TCommand cmd, CancellationToken ct)
        where TCommand : class, ICommand
    {
        Log("Command: {command}", cmd);

        await next(cmd, ct);
    }

    public async Task<TResult> OnCommandAsync<TCommand, TResult>(Func<TCommand, CancellationToken, Task<TResult>> next, TCommand cmd, CancellationToken ct)
        where TCommand : class, ICommand<TResult>
    {
        Log("Command: {command}", cmd);

        var result = await next(cmd, ct);

        Log("Command.Result: {commandResult}", result);

        return result;
    }

    public async Task OnEventAsync<TEvent>(Func<TEvent, CancellationToken, Task> next, TEvent evt, CancellationToken ct)
        where TEvent : class, IEvent
    {
        Log("Event: {event}", evt);

        await next(evt, ct);
    }

    public async Task<TResult> OnQueryAsync<TQuery, TResult>(Func<TQuery, CancellationToken, Task<TResult>> next, TQuery query, CancellationToken ct)
        where TQuery : class, IQuery<TResult>
    {
        Log("Query: {query}", query);

        var result = await next(query, ct);

        Log("Query.Result: {queryResult}", result);

        return result;
    }

    private void Log<T>(string message, T instance)
    {
        if (_logger.IsEnabled(LogLevel.Trace))
            _logger.LogTrace(message, JsonSerializer.Serialize(instance, _serializerOptions));
    }
}

Open the Startup.cs file and register the pipeline into the mediator:

services.AddMediator(o =>
{
    o.AddPipeline<LoggingPipeline>();

    o.AddHandlersFromAssemblyOf<Startup>();
});

Open the appsettings.Development.json file and set the default log level to Trace:

{
  "Logging": {
    "LogLevel": {
      "Default": "Trace",
      "Microsoft": "Warning",
      "Microsoft.Hosting.Lifetime": "Information"
    }
  }
}

If you now start the server and perform some actions in the API, like creating or searching for products, you should see your objects being serialized in the log output.

As a note, if you find this helpful for your APIs, you can use the NuGet package SimpleSoft.Mediator.Microsoft.Extensions.LoggingPipeline. Just take a look at the Startup.cs file in the example API.

TimeoutPipeline

Inside the Pipelines folder, create a TimeoutPipeline class implementing IPipeline. For simplicity, the timeout value will be fixed, but it could be received as options from the container.

public class TimeoutPipeline : IPipeline
{
    private static readonly TimeSpan Timeout = TimeSpan.FromMilliseconds(500);

    public async Task OnCommandAsync<TCommand>(Func<TCommand, CancellationToken, Task> next, TCommand cmd, CancellationToken ct)
        where TCommand : class, ICommand
    {
        using var cts = CreateCancellationTokenSource(ct);

        await next(cmd, cts.Token);
    }

    public async Task<TResult> OnCommandAsync<TCommand, TResult>(Func<TCommand, CancellationToken, Task<TResult>> next, TCommand cmd, CancellationToken ct)
        where TCommand : class, ICommand<TResult>
    {
        using var cts = CreateCancellationTokenSource(ct);

        return await next(cmd, cts.Token);
    }

    public async Task OnEventAsync<TEvent>(Func<TEvent, CancellationToken, Task> next, TEvent evt, CancellationToken ct)
        where TEvent : class, IEvent
    {
        using var cts = CreateCancellationTokenSource(ct);

        await next(evt, cts.Token);
    }

    public async Task<TResult> OnQueryAsync<TQuery, TResult>(Func<TQuery, CancellationToken, Task<TResult>> next, TQuery query, CancellationToken ct)
        where TQuery : class, IQuery<TResult>
    {
        using var cts = CreateCancellationTokenSource(ct);

        return await next(query, cts.Token);
    }

    private static CancellationTokenSource CreateCancellationTokenSource(CancellationToken ct)
    {
        var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        cts.CancelAfter(Timeout);

        return cts;
    }
}

Once again, open the Startup.cs file and register the pipeline into the mediator after the logging pipe:

services.AddMediator(o =>
{
    o.AddPipeline<LoggingPipeline>();
    o.AddPipeline<TimeoutPipeline>();

    o.AddHandlersFromAssemblyOf<Startup>();
});

If you now open any of your handlers and add a Task.Delay(1000, ct) into the HandleAsync methods, you should receive a TaskCanceledException. I added one when searching for products.
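For reference, the delay looks like this inside the handler (the result type shown is an assumption; use whatever your SearchProductsQueryHandler actually returns):

```csharp
// Inside SearchProductsQueryHandler:
public async Task<SearchProductsResult> HandleAsync(SearchProductsQuery query, CancellationToken ct)
{
    // Simulates a slow operation. 1000ms exceeds the TimeoutPipeline's
    // 500ms limit, so the linked token is canceled and this line throws
    // a TaskCanceledException before the search logic runs.
    await Task.Delay(1000, ct);

    // ... original search logic ...
}
```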

If you look at the stack trace, you’ll see the invocation order was kept:

ProductsController
MicrosoftFetcher (internal class used by the mediator)
LoggingPipeline
TimeoutPipeline
SearchProductsQueryHandler

The project structure now includes the Pipelines folder with both pipeline classes.

The final Startup.cs file:

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddMvc();

        services.AddSwaggerGen();

        services.AddDbContext<ApiDbContext>(o =>
        {
            o.UseInMemoryDatabase("ApiDbContext");
        });

        services.AddMediator(o =>
        {
            o.AddPipeline<LoggingPipeline>();
            o.AddPipeline<TimeoutPipeline>();
            o.AddHandlersFromAssemblyOf<Startup>();
        });
    }

    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        app.UseDeveloperExceptionPage();

        app.UseSwagger();

        app.UseSwaggerUI(c =>
        {
            c.SwaggerEndpoint("/swagger/v1/swagger.json", "Mediator ExampleApi V1");
        });

        app.UseRouting();

        app.UseEndpoints(endpoints =>
        {
            endpoints.MapControllers();
        });
    }
}

Conclusion

I hope this article gave you a good idea about mediator pipelines and how you can use them to empower your solutions, making them more resilient to developer mistakes (like forgetting to save changes into the database).

For me, this is one of the strongest aspects of the mediator pattern and one of the main reasons I always use it, even when implementing minimum viable products.