.NET — TaskCompletionSource and CancellationTokenSource

The importance of TCS and CTS for Task-based asynchronous programming

When Microsoft released .NET Framework 4.0 in April 2010, the Task Parallel Library (TPL) was introduced to help developers replace the previously used Asynchronous Programming Model (APM) pattern for a Task-based asynchronous programming.

Before the introduction of Tasks, when implementing asynchronous code, developers had to define two variations of the same method: one to begin the operation execution (convention: BeginOperationName), that would receive an optional callback to be invoked when completed, and another to wait for the operation to complete (convention: EndOperationName) and get the result or an exception, usually used inside the callback to prevent the main thread to be blocked.


As an example, imagine a repository of cars with an asynchronous method for getting one by a given plate number, implemented using the APM pattern:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public interface ICarRepository
{
IAsyncResult BeginGetByPlateNumber(
string plateNumber,
AsyncCallback callback,
object state
);

Car EndGetByPlateNumber(IAsyncResult ar);
}

// non-blocking usage
carRepository.BeginGetByPlateNumber(plateNumber, ar =>
{
var car = carRepository.EndGetByPlateNumber(ar);
}, null);

// blocking usage
var ar = carRepository.BeginGetByPlateNumber(plateNumber, null, null);
var car = carRepository.EndGetByPlateNumber(ar);

As you can see, creating two methods per operation is one of the most obvious and annoying disadvantages of the APM pattern, while the other is the need to implement your own wrapper for IAsyncResult so you can trigger the WaitHandle and invoke the callbacks when the operation completes.

Another disadvantage was the cancellation of running operations. If it was supported — and that’s a big if — there wasn’t a standardized pattern for developers to follow. Some would create another method for canceling (i.e. CancelOperationName) that receives an IAsyncResult, others would provide a method directly into the IAsyncResult and either return their own interface or require a cast, and others would simply ignore this feature due to sheer complexity.

Because Microsoft knew asynchronous programming was very important for the future of .NET, it decided to solve these problems by introducing the Task Parallel Library, making it easier for developers to add parallelism and concurrency to applications.

The Task Parallel Library has two central pieces:

  • The CancellationToken is a structure commonly used in asynchronous methods and enables developers to register a callback that will be invoked if a cancellation is requested. This provides a standardized approach for implementing asynchronous operations that can be canceled mid execution by simply receiving a CancellationToken as a method parameter.
  • The Task and Task<T> are classes that merge both IAsyncResult and AsyncCallback concepts. Developers can return a Task and the caller would either register a callback with ContinueWith, for a non-blocking approach, or use the method Wait/Result and block the main thread until the Task was completed. This removes the need to have callback parameters and only a single method is needed instead of BeginX/EndX methods.
    If we change the previous car repository example from APM pattern to use TPL, it would be much simpler:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public interface ICarRepository
{
Task<Car> GetByPlateNumberAsync(
string plateNumber,
CancellationToken ct
);
}

// non-blocking usage
carRepository.GetByPlateNumberAsync(
plateNumber,
CancellationToken.None
).ContinueWith(t =>
{
var car = t.Result;
});

// blocking usage
var car = carRepository.GetByPlateNumberAsync(
plateNumber,
CancellationToken.None
).Result;

Now that we have an idea about Task, Task<T>, CancellationToken and some of the reasons why Microsoft created the Task Parallel Library, let’s analyze two other important classes that aren’t commonly used but make all of this possible — TaskCompletionSource and CancellationTokenSource.

TaskCompletionSource

The class TaskCompletionSource is used to create a Task and provides methods to mark it as completed in one from three possible states:

  • RanToCompletion — the methods SetResult or TrySetResult complete the task successfully and, in case of a Task<T>, the result can be retrieved;
  • Canceled — the methods SetCanceled or TrySetCanceled mark the task as cancelled mid-execution. Waiting or retrieving the result will throw a TaskCanceledException;
  • Faulted — the methods SetException or TrySetException mark the task as faulted and waiting or retrieving the result will throw the exception.

With this class developers can easily implement Task-based asynchronous programming. On later versions of the .NET Framework it was widely used to convert classes implementing the APM pattern and, with an increase adoption of tasks, developers also started to migrate their own libs.

Let’s imagine the car repository was still implemented using the APM pattern and we wanted to create some extension methods for developers that prefer to use tasks.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public interface ICarRepository
{
IAsyncResult BeginGetByPlateNumber(
string plateNumber,
AsyncCallback callback,
object state
);

Car EndGetByPlateNumber(IAsyncResult ar);
}

public static class CarRepositoryExtensions
{
public static Task<Car> GetByPlateNumberAsync(
this ICarRepository repository,
string plateNumber
)
{
var tcs = new TaskCompletionSource<Car>();

// use callback for non-blocking approach
repository.BeginGetByPlateNumber(plateNumber, ar =>
{
try
{
var car = repository.EndGetByPlateNumber(ar);

// no exception thrown, mark the task as completed successfully
tcs.SetResult(car);
}
catch (Exception e)
{
// exception thrown, mark the task as faulted
tcs.SetException(e);
}
}, null);

return tcs.Task;
}
}

As you can see, we just converted BeginX/EndX methods to tasks in just a few lines without blocking the caller thread.

The TaskCompletionSource is also perfect to convert Event-based Asynchronous Pattern (EAP). Imagine you have a class that represents a message queue and it provides two events — OnMessageReceived and OnErrorReceived — and a non-blocking method to send a message.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public interface IMessageQueue
{
event Action<object, OnMessageReceivedEventArgs> OnMessageReceived;

event Action<object, OnErrorReceivedEventArgs> OnErrorReceived;

void Send(Guid correlationId, JObject content);
}

public class OnMessageReceivedEventArgs : EventArgs
{
public Guid CorrelationId { get; init; }

public JObject Content { get; init; }
}

public class OnErrorReceivedEventArgs : EventArgs
{
public Guid CorrelationId { get; init; }

public Exception Exception { get; init; }
}

Let’s say we wanted to create an extension method that sends a message and, without blocking the current thread, waits for a correlated response or exception to be received.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public static class MessageQueueExtensions
{
public static async Task<JObject> SendAsync(
this IMessageQueue queue,
Guid correlationId,
JObject content
)
{
var tcs = new TaskCompletionSource<JObject>();

// register the events before sending the message
queue.OnMessageReceived += OnMessageReceived;
queue.OnErrorReceived += OnErrorReceived;
try
{
queue.Send(correlationId, content);

return await tcs.Task;
}
finally
{
// ensure events registration is always cleaned up
queue.OnMessageReceived -= OnMessageReceived;
queue.OnErrorReceived -= OnErrorReceived;
}

void OnMessageReceived(object _, OnMessageReceivedEventArgs args)
{
if (args.CorrelationId == correlationId)
tcs.TrySetResult(args.Content);
}

void OnErrorReceived(object _, OnErrorReceivedEventArgs args)
{
if (args.CorrelationId == correlationId)
tcs.TrySetException(args.Exception);
}
}
}

The approach is very similar to the APM pattern. We do a temporary registration into the relevant events, then invoke the action that can trigger those events and change the task state when callbacks are invoked. Just be sure registrations are properly cleaned up or else a memory leak will happen.

CancellationTokenSource

The class CancellationTokenSource is used to create a CancellationToken that can be manually marked as canceled using the method Cancel or, to support timeout implementations, the CancelAfter methods that will schedule a cancellation after the specified time as passed.

Using the previously defined GetByPlateNumberAsync extension method, let’s change it first to support cancellation using a CancellationToken. Please note that some performance improvements could be made when detecting for cancellation, but this implementation is for simplicity:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public static class CarRepositoryExtensions
{
public static async Task<Car> GetByPlateNumberAsync(
this ICarRepository repository,
string plateNumber,
CancellationToken ct
)
{
var tcs = new TaskCompletionSource<Car>();

await using var _ = ct.Register(() =>
{
// if the cancellation is triggered we try to cancel the task
tcs.TrySetCanceled(ct);
});

// use callback for non-blocking approach
repository.BeginGetByPlateNumber(plateNumber, ar =>
{
try
{
var car = repository.EndGetByPlateNumber(ar);

// because the task may have been cancelled first we
// must try to set a result or an exception will be thrown
tcs.TrySetResult(car);
}
catch (Exception e)
{
// same, task may have been cancelled first so we try
// to set an exception and mark it as faulted
tcs.TrySetException(e);
}
}, null);

return await tcs.Task;
}
}

Now that GetByPlateNumberAsync supports cancellation, if we wanted to limit the waiting for a response to a maximum of 5 seconds, a CancellationTokenSource could be used:

1
2
3
4
5
6
7
8
using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(5));

var ct = cts.Token;

// if this operation takes more than 5 seconds to run
// a TaskCanceledException will be thrown
await carRepository.GetByPlateNumberAsync(plateNumber, ct);

The CancellationTokenSource also supports to be linked to an existing CancellationToken meaning that the underlying token can either be cancelled manually or by the linked token. This is usually useful to implement methods that offer some kind of timeout parameter without the caller having to use a CancellationTokenSource directly.

Let’s change the previously defined SendAsync extension method to support both cancellation using a token or a timeout parameter that will throw a TimeoutException if the waiting time is exceeded:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public static class MessageQueueExtensions
{
public static async Task<JObject> SendAsync(
this IMessageQueue queue,
Guid correlationId,
JObject content,
TimeSpan timeout,
CancellationToken ct
)
{
var tcs = new TaskCompletionSource<JObject>();

using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
cts.CancelAfter(timeout);

await using var _ = cts.Token.Register(() =>
{
if (ct.IsCancellationRequested)
{
// if ct was cancelled we want to throw a TaskCanceledException
tcs.TrySetCanceled(ct);
return;
}

// if ct wasn't cancelled but the cts token was triggered
// it can only mean the timeout parameter was exceeded
tcs.TrySetException(new TimeoutException(
$"The queue took more than '{timeout}' to return a message"
));
});

queue.OnMessageReceived += OnMessageReceived;
queue.OnErrorReceived += OnErrorReceived;
try
{
queue.Send(correlationId, content);

return await tcs.Task;
}
finally
{
queue.OnMessageReceived -= OnMessageReceived;
queue.OnErrorReceived -= OnErrorReceived;
}

void OnMessageReceived(object _, OnMessageReceivedEventArgs args)
{
if (args.CorrelationId == correlationId)
tcs.TrySetResult(args.Content);
}

void OnErrorReceived(object _, OnErrorReceivedEventArgs args)
{
if (args.CorrelationId == correlationId)
tcs.TrySetException(args.Exception);
}
}
}

In this case, we now use the method CreateLinkedTokenSource to ensure the CancellationTokenSource token will be cancelled if the ct parameter is cancelled or if a given timeout has passed. Because we are listening for the linked cts token, inside the callback we must check if the ct parameter was the cause of cancellation so we can either mark the task as canceled or faulted with a TimeoutException.

Conclusion

In this article I explained some of the reasons why Microsoft decided to replace the Asynchronous Programming Model (APM) pattern for the Task-based Asynchronous Programming, which is possible by using classes from the Task Parallel Library (TPL).

I also demonstrated how the classes TaskCompletionSource and CancellationTokenSource could be used to convert any asynchronous implementation to tasks, even if based on Event-based Asynchronous Pattern (EAP), ensuring threads won’t be blocked waiting for responses and even supporting the cancellation of running operations.