Transaction Management With Mediator Pipelines in ASP.NET Core

Manage Entity Framework Core transactions when handling commands

When implementing a simplified CQRS service, which usually has a single database but still separates commands from queries, the mediator pipeline can be used to manage transactions when processing commands, ensuring changes will be implicitly committed unless an exception is thrown.

Implementing a pipeline for transactions has some key advantages, even when using Entity Framework Core or another ORM that tracks changes and flushes them at a single point in time.

Because the command handler executes inside an implicit transaction, developers are free to flush changes whenever they see fit, knowing that any exception will roll them back. This is very helpful when the flow must be cancelled because some business rule isn’t met and the rule can only be checked after changing the system state. Optimistic concurrency is a good example: you may want to flush your changes before leaving the handler so you know exactly which entity failed and can send a more detailed message to the user, which is the usual approach when implementing a dedicated Backend for Frontend service.

Also, ORMs are very good at abstracting database access but can’t cover every edge case, especially when performance is a requirement. Being able to use a more lightweight library (e.g. Dapper) for some bulk operations, reusing the ORM context's connection inside the same transaction, removes the need for TransactionScope and reduces application complexity.

Ensuring transactions are properly managed around the handler execution makes the application architecture more robust and able to handle more edge cases.


The project

This article will be based on my previous ones that explained how to use the mediator and implement transversal behavior with pipelines. As a reminder, we implemented an endpoint to manage products with the following:

  • GET /products — search for products using some filters (SearchProductsQuery);
  • GET /products/{id} — get a product by its unique identifier (GetProductByIdQuery);
  • POST /products — create a product (CreateProductCommand and CreatedProductEvent);
  • PUT /products/{id} — update a product by its unique identifier (UpdateProductCommand and UpdatedProductEvent);
  • DELETE /products/{id} — delete a product by its unique identifier (DeleteProductCommand and DeletedProductEvent);

The source code is available on GitHub; feel free to take a look.

Entity Framework Core transactions

Entity Framework Core has support for explicit transaction management, which works in a similar way to ADO.NET or the TransactionScope class.

You can get a disposable instance of IDbContextTransaction by invoking the BeginTransactionAsync method, available from the context's Database property. The transaction can be committed or rolled back by explicit invocation; if the instance is disposed before any commit, it is rolled back.

As stated before, this can be very helpful if SaveChanges must be invoked multiple times inside the unit of work operation and you want to ensure exceptions will roll back data changes.

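// Open an explicit transaction; disposing it without a commit rolls everything back
await using var tx = await _context.Database.BeginTransactionAsync(ct);

var products = _context.Set<ProductEntity>();

// First batch of changes: flag every product whose code starts with "9"
await products.Where(p => p.Code.StartsWith("9")).ForEachAsync(product =>
{
    product.Name += " [DEPRECATED]";
}, ct);

await _context.SaveChangesAsync(ct);

// Second batch of changes: halve the price of every product
await products.ForEachAsync(product =>
{
    product.Price /= 2;
}, ct);

await _context.SaveChangesAsync(ct);

// Both saves only become permanent here
await tx.CommitAsync(ct);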

In the example above, if an exception were thrown after the first SaveChangesAsync, every change would be reverted because the transaction would be disposed without an explicit call to commit.
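The commit-or-rollback flow can also be observed without a database. The following is only a minimal sketch under stated assumptions: FakeTransaction, TxState and Demo are hypothetical names, not Entity Framework Core types, and FakeTransaction merely records how it ended, mimicking the rollback-on-dispose behavior described for IDbContextTransaction.

```csharp
using System;
using System.Threading.Tasks;

// Commit path: CommitAsync runs before the scope ends, so disposal has nothing to undo.
var committed = await Demo.RunAsync(fail: false);
Console.WriteLine($"no exception -> {committed.State}");

// Failure path: the exception skips CommitAsync, so disposal rolls back.
var rolledBack = await Demo.RunAsync(fail: true);
Console.WriteLine($"exception    -> {rolledBack.State}");

public enum TxState { Active, Committed, RolledBack }

// Hypothetical stand-in for IDbContextTransaction (not an EF Core type).
public sealed class FakeTransaction : IAsyncDisposable
{
    public TxState State { get; private set; } = TxState.Active;

    public Task CommitAsync()
    {
        State = TxState.Committed;
        return Task.CompletedTask;
    }

    // Disposing a transaction that was never committed reverts the work.
    public ValueTask DisposeAsync()
    {
        if (State == TxState.Active)
        {
            State = TxState.RolledBack;
        }
        return default;
    }
}

public static class Demo
{
    public static async Task<FakeTransaction> RunAsync(bool fail)
    {
        var tx = new FakeTransaction();
        try
        {
            await using (tx)
            {
                // the first SaveChangesAsync would happen here
                if (fail)
                {
                    throw new InvalidOperationException("business rule violated");
                }
                // the second SaveChangesAsync would happen here
                await tx.CommitAsync();
            }
        }
        catch (InvalidOperationException)
        {
            // Swallowed only so the demo can inspect the final state.
        }
        return tx;
    }
}
```

Running it prints Committed for the first call and RolledBack for the second, because the exception leaves the await using scope before CommitAsync is reached.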

We are going to leverage this behavior to implement the pipeline.

The pipeline

Since only commands mutate system state, we are going to use the command pipeline to enforce a transaction for the next pipelines in the chain up until the handler.

The pipeline behavior will be:

  1. Intercept any command;
  2. Create a new IDbContextTransaction with BeginTransactionAsync;
  3. Invoke the next pipeline inside the using scope;
  4. If no exception is thrown, flush all changes and commit;
  5. Dispose the transaction, which rolls back anything left uncommitted.

I’ll also include some commented-out code in case you want to be sure queries never mutate the system state. Prevention is better than cure…

Inside the Pipelines folder create a TransactionPipeline class extending Pipeline (because we only need some of the methods):

public class TransactionPipeline : Pipeline
{
    private readonly ApiDbContext _context;

    public TransactionPipeline(ApiDbContext context)
    {
        _context = context;
    }

    // Commands without a result
    public override async Task OnCommandAsync<TCommand>(Func<TCommand, CancellationToken, Task> next, TCommand cmd, CancellationToken ct)
    {
        await using var tx = await _context.Database.BeginTransactionAsync(ct);

        // An exception thrown here skips the commit and the dispose rolls back
        await next(cmd, ct);

        await _context.SaveChangesAsync(ct);
        await tx.CommitAsync(ct);
    }

    // Commands with a result
    public override async Task<TResult> OnCommandAsync<TCommand, TResult>(Func<TCommand, CancellationToken, Task<TResult>> next, TCommand cmd, CancellationToken ct)
    {
        await using var tx = await _context.Database.BeginTransactionAsync(ct);

        var result = await next(cmd, ct);

        await _context.SaveChangesAsync(ct);
        await tx.CommitAsync(ct);

        return result;
    }

    // Optional: uncomment to guarantee queries never mutate the system state
    //public override async Task<TResult> OnQueryAsync<TQuery, TResult>(Func<TQuery, CancellationToken, Task<TResult>> next, TQuery query, CancellationToken ct)
    //{
    //    await using var tx = await _context.Database.BeginTransactionAsync(ct);
    //
    //    var result = await next(query, ct);
    //
    //    await tx.RollbackAsync(ct);
    //
    //    return result;
    //}
}

Open the Startup.cs file and add the TransactionPipeline after the ValidationPipeline, preventing the opening of transactions that could be immediately closed if the command had invalid data.

Since these examples use the in-memory database provider for Entity Framework Core, which does not support explicit transactions, we are also going to ignore the resulting transaction warning for test purposes.

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddMvc();

        services.AddSwaggerGen();

        services.AddDbContext<ApiDbContext>(o =>
        {
            o.UseInMemoryDatabase("ApiDbContext").ConfigureWarnings(warn =>
            {
                warn.Ignore(InMemoryEventId.TransactionIgnoredWarning);
            });
        });

        services.AddMediator(o =>
        {
            o.AddPipeline<LoggingPipeline>();
            o.AddPipeline<TimeoutPipeline>();
            o.AddPipeline<ValidationPipeline>();
            o.AddPipeline<TransactionPipeline>();
            foreach (var implementationType in typeof(Startup)
                .Assembly
                .ExportedTypes
                .Where(t => t.IsClass && !t.IsAbstract))
            {
                foreach (var serviceType in implementationType
                    .GetInterfaces()
                    .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IValidator<>)))
                {
                    o.Services.Add(new ServiceDescriptor(serviceType, implementationType, ServiceLifetime.Transient));
                }
            }
            o.AddHandlersFromAssemblyOf<Startup>();
        });
    }

    // ...
}
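The registration order matters because each pipeline wraps the ones registered after it. The snippet below is a rough, dependency-free sketch of that idea, assuming pipelines run in registration order as described above: Chain, Validation and transaction are hypothetical names that model each step as a plain delegate, and none of it is the mediator library's API. It shows that an invalid command is rejected before any transaction is opened.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();

// Hypothetical validation step: rejects the command before calling the rest of the chain.
Func<Func<Task>, Task> Validation(bool isValid) => async next =>
{
    log.Add("validate");
    if (!isValid) throw new ArgumentException("invalid command");
    await next();
};

// Hypothetical transaction step: wraps the rest of the chain in begin/commit.
Func<Func<Task>, Task> transaction = async next =>
{
    log.Add("begin");
    await next();
    log.Add("commit");
};

Task Handler()
{
    log.Add("handle");
    return Task.CompletedTask;
}

// Valid command: every step runs, validation first.
await Chain.RunAsync(new[] { Validation(true), transaction }, Handler);
Console.WriteLine(string.Join(" > ", log)); // validate > begin > handle > commit

// Invalid command: validation throws, so no transaction is ever opened.
log.Clear();
try
{
    await Chain.RunAsync(new[] { Validation(false), transaction }, Handler);
}
catch (ArgumentException)
{
    // Expected for the invalid command.
}
Console.WriteLine(string.Join(" > ", log)); // validate

public static class Chain
{
    // Runs the steps in registration order: the first step is the outermost wrapper.
    public static Task RunAsync(IReadOnlyList<Func<Func<Task>, Task>> steps, Func<Task> handler)
    {
        Func<Task> next = handler;
        for (var i = steps.Count - 1; i >= 0; i--)
        {
            var step = steps[i];
            var inner = next;
            next = () => step(inner);
        }
        return next();
    }
}
```

Swapping the two steps in the array would make "begin" appear before "validate", which is exactly the wasted transaction the ordering is meant to avoid.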

Now that we have a transversal behavior that manages transactions and flushes all context changes to the database, we can remove from all command handlers the explicit calls to SaveChangesAsync:

public class DeleteProductCommandHandler : ICommandHandler<DeleteProductCommand>
{
    private readonly ApiDbContext _context;
    private readonly IMediator _mediator;

    public DeleteProductCommandHandler(ApiDbContext context, IMediator mediator)
    {
        _context = context;
        _mediator = mediator;
    }

    public async Task HandleAsync(DeleteProductCommand cmd, CancellationToken ct)
    {
        var products = _context.Set<ProductEntity>();

        var product = await products.SingleOrDefaultAsync(p => p.ExternalId == cmd.ProductId, ct);

        if (product == null)
        {
            throw new InvalidOperationException($"Product '{cmd.ProductId}' not found");
        }

        products.Remove(product);

        // No longer needed: the TransactionPipeline flushes and commits after the handler returns
        //await _context.SaveChangesAsync(ct);

        await _mediator.BroadcastAsync(new DeletedProductEvent
        {
            ProductId = cmd.ProductId
        }, ct);
    }
}

Speeding things up!

Because this is a pipeline we use in most of our APIs, a NuGet package is already available that can open an explicit Entity Framework Core transaction for commands, events or queries, depending on your needs.

To use it, just open the NuGet package manager and install the package SimpleSoft.Mediator.Microsoft.Extensions.EFCoreTransactionPipeline.

Open the Startup.cs file and register the pipeline with the extension method AddPipelineForEFCoreTransaction<TContext>. Explicit transactions are disabled by default, so you only need to enable them for commands.

When using all the NuGet packages mentioned in my previous articles, the Startup.cs file should be similar to this:

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddMvc();

        services.AddSwaggerGen();

        services.AddDbContext<ApiDbContext>(o =>
        {
            o.UseInMemoryDatabase("ApiDbContext").ConfigureWarnings(warn =>
            {
                // since InMemoryDatabase does not support transactions
                // for test purposes we are going to ignore this exception
                warn.Ignore(InMemoryEventId.TransactionIgnoredWarning);
            });
        });

        services.AddMediator(o =>
        {
            o.AddPipelineForLogging(options =>
            {
                options.LogCommandResult = true;
                options.LogQueryResult = true;
            });
            o.AddPipeline<TimeoutPipeline>();
            o.AddPipelineForValidation(options =>
            {
                options.ValidateCommand = true;
                options.ValidateEvent = true;
            });
            o.AddPipelineForEFCoreTransaction<ApiDbContext>(options =>
            {
                options.BeginTransactionOnCommand = true;
            });
            o.AddValidatorsFromAssemblyOf<Startup>();
            o.AddHandlersFromAssemblyOf<Startup>();
        });
    }

    // ...
}

Conclusion

I hope this article gave you a good idea of how to use mediator pipelines to simplify the management of Entity Framework Core transactions, making the implementation of command handlers less error-prone since developers can no longer forget to flush changes to the database (that behavior is implicit unless an exception is thrown).

Even if your application is using another ORM or other lightweight libraries (e.g. Dapper), you can easily implement your own pipeline.

I also wrote an article about auditing user actions with pipelines, which you may find helpful.