Audit commands and store events with cross-cutting behavior
When implementing a web application, it is often a good idea to audit all of your client interactions, whether to track behavior over time, to ensure any security breach is properly logged, or simply to help analyze system bugs.
Previously we talked about using the mediator pattern to implement Command Query Responsibility Segregation (CQRS) and Event Sourcing (ES), and how pipelines can be used to add cross-cutting behavior to your application.
Since commands are responsible for mutating the system state, in this article I’m going to demonstrate how you can implement an audit pipeline that ensures every command is stored in a table. Because a variable number of events can be broadcast when the state changes, the pipeline will also store them in another table with a reference to the command, so any correlation can be analyzed.
The project
Continuing from my previous articles, where I explained how to use the mediator and implement cross-cutting behavior with pipelines, we are going to expand the source code to audit commands and store into the events table anything broadcast by the mediator, without needing a specific handler for each event.
As a reminder, we implemented the following endpoints to manage products:
GET /products — search for products using some filters (SearchProductsQuery);
GET /products/{id} — get a product by its unique identifier (GetProductByIdQuery);
POST /products — create a product (CreateProductCommand and CreatedProductEvent);
PUT /products/{id} — update a product by its unique identifier (UpdateProductCommand and UpdatedProductEvent);
DELETE /products/{id} — delete a product by its unique identifier (DeleteProductCommand and DeletedProductEvent);
The source code is available on GitHub, so feel free to take a look.
Auditing
Since in this article we will only audit API actions that mutate state, we are going to intercept commands and store information we find relevant into a specific table:
ExternalId — the unique identifier for each command, available via Command.Id or Command<TResult>.Id;
Name — the command type name from typeof(TCommand).Name;
Payload — the command serialized as JSON;
Result — if available, the command result serialized as JSON;
CreatedOn — date and time when the command was sent into the mediator, available via Command.CreatedOn or Command<TResult>.CreatedOn;
CreatedBy — username from the current request user property, available via Command.CreatedBy or Command<TResult>.CreatedBy;
ExecutionTime — elapsed time the handler spent processing the command;
Because events are broadcast by commands, which are now audited into the database, we are also going to extend the events table with a foreign key, CommandId, referencing the commands table.
The Database Model
Inside the Database folder create a CommandEntity class and add the new CommandId property to the existing EventEntity:
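A minimal sketch of what those entities could look like, where the key and property types are assumptions based on the audit fields listed above (the actual classes are in the GitHub repository):

```csharp
public class CommandEntity
{
    public long Id { get; set; }

    // the unique identifier of the command (Command.Id)
    public Guid ExternalId { get; set; }

    // the command type name, e.g. nameof(CreateProductCommand)
    public string Name { get; set; }

    // the command serialized as JSON
    public string Payload { get; set; }

    // the command result serialized as JSON, if any
    public string Result { get; set; }

    public DateTimeOffset CreatedOn { get; set; }
    public string CreatedBy { get; set; }

    // elapsed time the handler spent processing the command
    public TimeSpan? ExecutionTime { get; set; }
}

public class EventEntity
{
    public long Id { get; set; }
    public Guid ExternalId { get; set; }
    public string Name { get; set; }
    public string Payload { get; set; }
    public DateTimeOffset CreatedOn { get; set; }
    public string CreatedBy { get; set; }

    // new foreign key referencing the command that broadcast this event
    public long? CommandId { get; set; }
    public CommandEntity Command { get; set; }
}
```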
By design, all POCOs provided by this library are immutable and only expose a protected setter for the Id, CreatedOn and CreatedBy properties. This leaves the developer free to choose between fully immutable commands, queries and events, initializing all properties in the constructor, or exposing a public setter instead.
Since we haven’t made our POCOs immutable, and for demo purposes, we are going to expose a public setter for the CreatedBy property by implementing our own command, query and event classes.
Inside the Handlers folder create Command.cs, Query.cs and Event.cs files and extend the corresponding Command, Command<TResult>, Query<TResult> and Event classes, adding a public setter and getter for CreatedBy. Since your classes have the same names as the ones provided by SimpleSoft.Mediator, your existing classes will automatically extend them and expose the new setters without a single change:
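A sketch of those wrapper classes, assuming the base classes expose CreatedBy as a string with a protected setter as described above (the namespace is hypothetical, adjust it to your project):

```csharp
using SimpleSoft.Mediator;

namespace WebApi.Handlers
{
    public abstract class Command : SimpleSoft.Mediator.Command
    {
        // re-expose CreatedBy with a public setter
        public new string CreatedBy
        {
            get => base.CreatedBy;
            set => base.CreatedBy = value;
        }
    }

    public abstract class Command<TResult> : SimpleSoft.Mediator.Command<TResult>
    {
        public new string CreatedBy
        {
            get => base.CreatedBy;
            set => base.CreatedBy = value;
        }
    }

    public abstract class Query<TResult> : SimpleSoft.Mediator.Query<TResult>
    {
        public new string CreatedBy
        {
            get => base.CreatedBy;
            set => base.CreatedBy = value;
        }
    }

    public abstract class Event : SimpleSoft.Mediator.Event
    {
        public new string CreatedBy
        {
            get => base.CreatedBy;
            set => base.CreatedBy = value;
        }
    }
}
```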
We also want to pass the same username to all of our events, so open the command handlers and set each event’s CreatedBy property to the same value as the command’s, as shown in the following handler:
```csharp
        await _mediator.BroadcastAsync(new CreatedProductEvent
        {
            ExternalId = externalId,
            Code = cmd.Code,
            Name = cmd.Name,
            Price = cmd.Price,
            CreatedBy = cmd.CreatedBy // use the same value
        }, ct);

        return new CreateProductResult
        {
            Id = externalId
        };
    }
}
```
The Audit Pipeline
Now that we are passing the user information into the mediator we can create the audit pipeline that will have the following behavior when intercepting commands:
Serialize and insert a new entry into the commands table;
Add both the command and entry ids into an AsyncLocal<T> scope to be used if an event is broadcast;
Invoke the next pipe;
If available, serialize the result, calculate the execution time and update the table entry;
When intercepting events, which are sent by commands, it will do the following:
Get the command id from the current AsyncLocal<T> scope;
Serialize the event and insert a new entry into the events table, referencing the command entry;
Invoke the next pipe;
Inside the Pipelines folder, create an AuditPipeline class extending Pipeline. The implementation should be similar to the following:
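Below is a minimal sketch of such a pipeline. The OnCommandAsync and OnEventAsync override signatures are assumptions based on the SimpleSoft.Mediator Pipeline base class (the overload for commands without a result is analogous and omitted for brevity), Newtonsoft.Json is assumed for serialization, and events are assumed to expose Id, CreatedOn and CreatedBy just like commands. The complete implementation is available in the GitHub repository:

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using SimpleSoft.Mediator;

public class AuditPipeline : Pipeline
{
    // static scope shared across the asynchronous logical flow
    private static readonly AsyncLocal<CommandScope> Scope = new AsyncLocal<CommandScope>();

    private readonly ApiDbContext _context;

    public AuditPipeline(ApiDbContext context)
    {
        _context = context;
    }

    public override async Task<TResult> OnCommandAsync<TCommand, TResult>(
        Func<TCommand, CancellationToken, Task<TResult>> next, TCommand cmd, CancellationToken ct)
    {
        // 1. serialize and insert a new entry into the commands table
        var entity = new CommandEntity
        {
            ExternalId = cmd.Id,
            Name = typeof(TCommand).Name,
            Payload = JsonConvert.SerializeObject(cmd),
            CreatedOn = cmd.CreatedOn,
            CreatedBy = cmd.CreatedBy
        };
        await _context.Set<CommandEntity>().AddAsync(entity, ct);
        await _context.SaveChangesAsync(ct);

        // 2. expose both ids to any event broadcast by this command
        Scope.Value = new CommandScope(cmd.Id, entity.Id);

        // 3. invoke the next pipe, measuring the execution time
        var watch = Stopwatch.StartNew();
        var result = await next(cmd, ct);
        watch.Stop();

        // 4. serialize the result and update the audit entry
        entity.Result = JsonConvert.SerializeObject(result);
        entity.ExecutionTime = watch.Elapsed;
        await _context.SaveChangesAsync(ct);

        return result;
    }

    public override async Task OnEventAsync<TEvent>(
        Func<TEvent, CancellationToken, Task> next, TEvent evt, CancellationToken ct)
    {
        // 1. get the command entry id from the current scope, if any
        var scope = Scope.Value;

        // 2. serialize and insert the event, referencing the command entry
        await _context.Set<EventEntity>().AddAsync(new EventEntity
        {
            ExternalId = evt.Id,
            Name = typeof(TEvent).Name,
            Payload = JsonConvert.SerializeObject(evt),
            CreatedOn = evt.CreatedOn,
            CreatedBy = evt.CreatedBy,
            CommandId = scope?.EntryId
        }, ct);
        await _context.SaveChangesAsync(ct);

        // 3. invoke the next pipe
        await next(evt, ct);
    }

    private sealed class CommandScope
    {
        public CommandScope(Guid commandId, long entryId)
        {
            CommandId = commandId;
            EntryId = entryId;
        }

        public Guid CommandId { get; }
        public long EntryId { get; }
    }
}
```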
In Startup, the DbContext registration ignores the transaction warning, since the InMemory provider used for this demo does not support transactions:

```csharp
services.AddDbContext<ApiDbContext>(o =>
{
    o.UseInMemoryDatabase("ApiDbContext").ConfigureWarnings(warn =>
    {
        // since InMemoryDatabase does not support transactions,
        // for test purposes we are going to ignore this exception
        warn.Ignore(InMemoryEventId.TransactionIgnoredWarning);
    });
});
```
Because this pipeline now serializes and stores all events, the existing handlers for CreatedProductEvent, DeletedProductEvent and UpdatedProductEvent can either be deleted or stop storing their events into the table, preventing duplicate data.
When comparing the audit pipeline with the previous ones we implemented, the biggest difference is the use of AsyncLocal<T> to store an instance of the CommandScope class, holding both the command external id and the primary key of the audit entry in the commands table.
If you aren’t familiar with this class, it has been available since .NET Framework 4.6 and .NET Standard 1.3, and was introduced to help share flow-global state when writing asynchronous code with the Task Parallel Library (TPL). Because TPL relies on the thread pool and, by default, asynchronous code in ASP.NET Core applications can be resumed by any available thread, we can’t rely on mechanisms like the ThreadLocal<T> class to store this kind of state.
Simply put, the idea of AsyncLocal<T> is to create a static instance that can hold some T value; as long as you use the async and await keywords, the runtime will treat your code execution as a logical flow and, even though it is asynchronous, will ensure the value is shared even if the flow is resumed by a different thread.
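To make that concrete, here is a tiny self-contained example showing the value surviving an await that may resume on a different thread:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Program
{
    // one static instance shared by the whole logical flow
    private static readonly AsyncLocal<string> Current = new AsyncLocal<string>();

    public static async Task Main()
    {
        Current.Value = "flow-1";

        // the continuation may be resumed by a different thread pool thread...
        await Task.Delay(100);

        // ...but the value still follows the logical flow, not the thread
        Console.WriteLine(Current.Value); // prints "flow-1"
    }
}
```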
Because we want to share data between the command and event interceptor code, the flow is asynchronous, and only commands broadcast events, the AsyncLocal<T> class is an elegant way to avoid changing all the events to include a CommandId property that would have to be set on every broadcast.
As an example, this is usually how some logging frameworks implement scopes, enabling some information to be written on every log entry without having to pass it every time, like the Microsoft ILogger façade’s BeginScope("X:{x} Y:{y}", x, y).
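For instance, assuming an injected ILogger<T> field named _logger and two local variables x and y, everything logged inside the scope below carries both values:

```csharp
using (_logger.BeginScope("X:{x} Y:{y}", x, y))
{
    // both entries automatically carry the X and Y values
    _logger.LogInformation("starting the operation");
    _logger.LogInformation("operation completed");
}
```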
For more details and examples, take a look at the AsyncLocal<T> class documentation.
Audits Controller
To make it easier to test and check our system audits, we are going to implement the following endpoints:
GET /audits — search for command audits using some filters (SearchAuditsQuery);
GET /audits/{id} — get a command audit by its unique identifier and all the associated events (GetAuditByIdQuery);
Inside the Handlers folder create an Audits folder and create the queries for searching or getting an audit by its external id:
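A sketch of what those queries could look like; the filter properties and result models are assumptions for illustration, so check the repository for the real versions:

```csharp
using System;
using System.Collections.Generic;

// searches command audits using some filters (paging included)
public class SearchAuditsQuery : Query<IReadOnlyCollection<AuditSearchItem>>
{
    public string Name { get; set; }
    public int Skip { get; set; }
    public int Take { get; set; }
}

// gets a command audit by its unique identifier, including all associated events
public class GetAuditByIdQuery : Query<AuditModel>
{
    public Guid Id { get; set; }
}

// hypothetical result models
public class AuditSearchItem
{
    public Guid Id { get; set; }
    public string Name { get; set; }
    public DateTimeOffset CreatedOn { get; set; }
    public string CreatedBy { get; set; }
    public TimeSpan? ExecutionTime { get; set; }
}

public class AuditModel : AuditSearchItem
{
    public string Payload { get; set; }
    public string Result { get; set; }
    public IReadOnlyCollection<AuditEventModel> Events { get; set; }
}

public class AuditEventModel
{
    public Guid Id { get; set; }
    public string Name { get; set; }
    public string Payload { get; set; }
    public DateTimeOffset CreatedOn { get; set; }
}
```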
Create, update or delete some products with the help of Swagger UI, then use GET /audits to check that all the commands and events have been properly audited, and GET /audits/{id} to see the details of a specific audit.
Conclusion
I hope this article gave you a good idea of how mediator pipelines can simplify the auditing of user actions without having to replicate code across all commands.
We also ensured events are always stored before being broadcast, keeping a reference to the originating command without adding properties to our POCOs, which is a cleaner approach.
Soon I’ll be explaining how we can inject more specialized interfaces, like ISender<TCommand>, to make our dependencies clearer and help with unit testing.