Await events

Some legacy APIs provide an event-based mechanism to achieve asynchronous behavior. In this case, typically a synchronous method starts a longer-running asynchronous operation, and an event notifies the consumer when the operation succeeds or fails.

For example, the hypothetical code below sends a message to a queue and also subscribes to an event for completion.

_queue.Completed += (sender, args) => { /* message is sent */ };
_queue.Send(msg);

The code quickly gets complicated when the caller wants to wait for the completion of the send operation. In more modern C# one would represent this operation with a Task, which could then be awaited at the call site:

await _queue.SendAsync(msg);

TaskCompletionSource

To convert a legacy API to a more modern Task-based representation, one could apply the pattern described in the post about using TaskCompletionSource.

The idea is that the user of the API creates a TaskCompletionSource, calls the synchronous API, then returns the Task of the TaskCompletionSource to the caller. The call site can await the completion of the returned task. When the event callback is triggered, it sets the result (or exception) on the TaskCompletionSource, which completes the Task and enables the continuations to execute.
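A minimal sketch of this pattern, assuming the same hypothetical MagicQueue API as above (the SendAsync wrapper and the Message type are illustrative placeholders, not part of any real library):

public Task SendAsync(Message msg)
{
    var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    // Subscribe before sending so a fast completion is not missed.
    // A production implementation would also unsubscribe the handler
    // and propagate failures via tcs.TrySetException(...).
    _queue.Completed += (sender, args) => tcs.TrySetResult();
    _queue.Send(msg);
    return tcs.Task;
}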

While this is the approach I would use in production code, the post about How Async/Await Really Works in C# got me thinking about alternative implementations. One such implementation could try to avoid the allocations related to the async nature of the code.

IValueTaskSource

This blog post explores a theoretical approach. Please do not use this code in production. Why would I still explore it then? Out of personal curiosity, to understand the workings of async-await internals in C# and .NET 7. Why would I suggest this topic is controversial? The goal is to get rid of an allocation (or rather two) that is required when using the TaskCompletionSource and Task types. However, the example described above sends some data to a message broker or a queue. Typically, this data is a byte[] or a string, which means serialization and allocation are involved. Saving two objects on such paths is usually a needle-in-a-haystack situation. Considering that the GC cleans up these objects efficiently before they get promoted to higher generations, optimizing for the async allocation without further measurements is premature.

The above described 'hypothetical code' example still seems to offer a good opportunity to explore, though. To elaborate further on it, there is a crucial property of this API that drives us down the path of considering IValueTaskSource. The queue (of type MagicQueue) in my example is single-threaded, and creating new instances of this type is expensive, which lends them to pooling. Single-threadedness in itself does not necessarily mean that the event callback has to be awaited for every message sent. That is why we have the event callback in the first place: otherwise the Send method could simply block until the delivery acknowledgement is received. Multiple messages could be sent without waiting for confirmation on them one-by-one; for example, we could employ a data structure to store all the async operations in flight. However, there can be reasons to wait for confirmation of each message before sending the next one: one may not want to maintain such a data structure, one may not want outstanding queue acknowledgements if the queue gets disconnected, etc. For this post, I assume that only a single outstanding message may exist at any given time per queue instance.

To avoid the allocation of a Task, the code returns a ValueTask. However, when Send actually completes asynchronously, something must still be allocated on the heap. A ValueTask can be backed by a Task or an IValueTaskSource, and to avoid the allocation we can employ a single ValueTaskSource instance corresponding to each MagicQueue. Notice that the queue and the corresponding IValueTaskSource can now be pooled together. In the example below the user may pool the MessagePublisher instance:

class MessagePublisher
{
    private readonly MagicQueue _queue = new MagicQueue();
    private readonly ValueTaskSource _source = new ValueTaskSource();

    public MessagePublisher()
    {
        _queue.Completed += (_, __) => _source.SetCompleted();
    }

    public ValueTask PublishAsync()
    {
        // TODO: check if there is already an operation in progress
        _source.Reset();
        _queue.Send();
        return new ValueTask(_source, _source.Token);
    }
}
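As a sketch for the TODO above (with a hypothetical _pending field added purely for illustration), an interlocked flag can detect a publish that starts while a previous one is still in flight. The check is best-effort misuse detection: a caller that never awaits the previous ValueTask may still slip through the brief window between the completion event and the flag reset.

private int _pending; // 0 = idle, 1 = a publish is in flight

public ValueTask PublishAsync()
{
    if (Interlocked.CompareExchange(ref _pending, 1, 0) != 0)
        throw new InvalidOperationException("A previous publish is still in progress.");
    _source.Reset();
    _queue.Send();
    return new ValueTask(_source, _source.Token);
}

// The constructor clears the flag once the queue signals completion.
// A correctly awaiting caller can only reach PublishAsync again after
// its continuation has run, so Reset cannot race with GetResult.
_queue.Completed += (_, __) =>
{
    _source.SetCompleted();
    Volatile.Write(ref _pending, 0);
};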

However, implementing an IValueTaskSource is far from trivial, especially given that it needs to be reset between requests. This reset-ability is what makes the ValueTaskSource a viable solution to avoid the allocations, but it is also what imposes the constraints on the way a user may publish messages. Otherwise we would need to maintain separate object pools.

Implementing ValueTaskSource by hand is not required, as .NET has a built-in struct that comes to the rescue: ManualResetValueTaskSourceCore<T>. With this struct, implementing ValueTaskSource boils down to forwarding calls to ManualResetValueTaskSourceCore.

class ValueTaskSource : IValueTaskSource
{
    // Mutable struct: must not be marked readonly, otherwise the
    // forwarded calls below would mutate a defensive copy.
    private ManualResetValueTaskSourceCore<bool> _sourceCore =
        new ManualResetValueTaskSourceCore<bool>() { RunContinuationsAsynchronously = true };

    public short Token => _sourceCore.Version;

    public void Reset()
    {
        _sourceCore.Reset();
    }

    public void SetCompleted()
    {
        _sourceCore.SetResult(true);
    }

    public void GetResult(short token)
    {
        _sourceCore.GetResult(token);
    }

    public ValueTaskSourceStatus GetStatus(short token)
    {
        return _sourceCore.GetStatus(token);
    }

    public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags)
    {
        _sourceCore.OnCompleted(continuation, state, token, flags);
    }
}
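To see how the pieces fit together, a hypothetical call site could look as follows (messages is an assumed collection). Note that the returned ValueTask must be awaited exactly once, and must complete before the next PublishAsync call, because the single ValueTaskSource instance is reused:

var publisher = new MessagePublisher(); // typically rented from a pool
foreach (var msg in messages)
{
    // Awaiting each publish before starting the next one satisfies
    // the single-outstanding-message assumption of this post.
    await publisher.PublishAsync();
}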

Conclusion

While the above solution may work, and based on the measurements it reduces the Task and TaskCompletionSource allocations, it has a limit on throughput: a given queue instance can handle only a single outstanding message at a time, so overall throughput is significantly limited. As described earlier, the goal of this post is not to provide an off-the-shelf solution, but to explore whether the Task allocations may be amortized when wrapping event-based legacy APIs.