Using Mediator Pipelines in ASP.NET Core Applications
Intercept commands, queries and events to apply transversal behavior
In my previous article I explained how you could implement Command Query Responsibility Segregation (CQRS) and Event Sourcing (ES) patterns using a mediator instance from SimpleSoft.Mediator in ASP.NET Core applications.
This time I’m going to explain how to intercept commands, queries and events, making it possible to apply transversal behavior before they reach the handlers, reducing significantly the amount of boilerplate and duplicated code.
Transaction management, logging, auditing, caching or validations are some of the good examples where pipelines can be very helpful. Imagine the following chains:
Not only you can plugin any pipeline at will without changing the handlers implementation, you can also be specific to the entities each pipeline is interested.
Pipelines
Mediator pipelines are represented by the interface IPipeline (or the abstract class Pipeline) with methods to intercept each entity type:
You receive a delegate for the next pipe and the entity, so the concept is very simple: either do some work before or after invoking the next pipe, or break the chain by returning or throwing an exception.
1 2 3 4 5 6 7 8 9 10 11 12 13 14
publicclassIsItMondayAlreadyPipeline : Pipeline { publicoverrideasyncTask<TResult> OnQueryAsync<TQuery, TResult>( Func<TQuery, CancellationToken, Task<TResult>> next, TQuery query, CancellationToken ct ) { if(DateTime.Now.DayOfWeek == DayOfWeek.Monday) thrownew InvalidOperationException("Today is monday, no data for you!");
returnawait next(query, ct); } }
Pipelines are executed by the same order they are added to the container so, to be deterministic, don’t scan the assemblies for classes implementing IPipeline.
Commands pipeline
Intercept commands sent into the mediator by implementing the methods OnCommandAsync:
Queries pipeline
Intercept queries being fetched from the mediator by implementing the methods OnQueryAsync:
Events pipeline
Intercept events before being broadcasted by the mediator into all the handlers by implementing the method OnEventAsync:
The project
To make it easier, we are going to leverage on the previous article and continue from there. As a reminder, we implemented an endpoint to manage products with the following:
GET /products — search for products using some filters (SearchProductsQuery);
GET /products/{id} — get a product by its unique identifier (GetProductByIdQuery);
POST /products — create a product (CreateProductCommand and CreatedProductEvent);
PUT /products/{id} — update a product by its unique identifier (UpdateProductCommand and UpdatedProductEvent);
DELETE /products/{id} — delete a product by its unique identifier (DeleteProductCommand and DeletedProductEvent);
Since the objective of this article is to show how to implement pipelines in the mediator, the following should be enough:
LoggingPipeline — serializes every command, queries, events and results as JSON into the log if Trace is enabled;
TimeoutPipeline — cancels the handler execution if it takes more than a given set of time;
Start by creating a Pipelines folder at the project root level.
LoggingPipeline
Because we don’t want logging to affect the timeout, this will be the first pipeline to be added to the chain.
Inside the Pipelines folder, create a LoggingPipeline class implementing IPipeline. This pipeline will receive an ILogger instance and use System.Text.Json to serialize the objects:
If you now start the server and do some actions in the API, like creating or searching for products, you should start seeing your objects being serialized:
Inside the Pipelines folder, create a TimeoutPipeline class implementing IPipeline. For simplicity, the timeout value will be fixed, but could be received as an options from the container.
If you now open any of your handlers and add an Task.Delay(1000, ct) into the HandleAsync methods you should receive a TaskCanceledException. I added one when searching for products:
If you look at the stack trace, you’ll see the invocation order was kept:
1 2 3 4 5
ProductsController MicrosoftFetcher (internal class used by the mediator) LoggingPipeline TimeoutPipeline SearchProductsQueryHandler
I hope this article gave you a good idea about mediator pipelines and how you can use them to empower your solutions, making them more resilient to developer mistakes (like forgetting to save changes into the database).
For me, this is one of the strongest aspects of the mediator pattern and one of the main reason I always use this pattern even when implementing minimum viable products.