Streams, Pipes and gRPC

.NET 7 introduces new methods on the Stream type: ReadExactly(...), ReadAtLeast(...) and their async counterparts. They let developers wait until 'enough' data is available before processing an input stream further.
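As a quick illustration of the two methods, here is a minimal sketch. The MemoryStream and its contents are assumptions made up for the example; any readable Stream would do.

```csharp
using System;
using System.IO;

// A MemoryStream stands in for any input stream (an assumption for illustration).
using var input = new MemoryStream(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8 });

// ReadExactly fills the whole buffer or throws EndOfStreamException.
var header = new byte[3];
input.ReadExactly(header);

// ReadAtLeast returns once at least minimumBytes have been read;
// it may read more, up to the size of the buffer.
var rest = new byte[16];
int read = input.ReadAtLeast(rest, minimumBytes: 2, throwOnEndOfStream: false);

Console.WriteLine($"last header byte: {header[2]}, bytes read afterwards: {read}");
```

ReadExactly suits fixed-size fields, while ReadAtLeast suits cases where a minimum amount is needed but reading more in the same call is welcome.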

At the time of writing this post, .NET 7 is still in preview, which means there is still a chance that these methods will not be released.

Why do we have them?

At first these methods may seem superfluous: developers have been using streams for many years, so why would we need them? There are several workloads in the software stack that fit this concept perfectly. Or rather, payloads instead of workloads.

The TCP header has a minimum size of 20 bytes and contains a Data offset field, which gives the total header length in 32-bit words and thereby tells how many bytes of options follow the fixed part. An application reading a TCP header could read the first 20 bytes (known length) and parse the Data offset field. Using its value, the application can read all the optional TCP fields before getting to the actual data section. Note that at each step it is known exactly how much data should be read from the stream.
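The two-step read described above can be sketched as follows. ReadTcpHeader is a hypothetical helper written for this post, and it assumes the stream is positioned at the first byte of a TCP header, which is rarely how application code sees TCP in practice.

```csharp
using System;
using System.IO;

// Hypothetical helper: reads one complete TCP header (fixed part + options).
static byte[] ReadTcpHeader(Stream input)
{
    const int FixedHeaderSize = 20;
    var fixedPart = new byte[FixedHeaderSize];
    input.ReadExactly(fixedPart);

    // Data offset is the upper 4 bits of byte 12, counted in 32-bit words.
    int headerLength = (fixedPart[12] >> 4) * 4;
    if (headerLength < FixedHeaderSize)
        throw new InvalidDataException("Data offset is smaller than the fixed header.");

    var header = new byte[headerLength];
    fixedPart.CopyTo(header, 0);

    // The length of the options is now known exactly.
    input.ReadExactly(header.AsSpan(FixedHeaderSize));
    return header;
}

// Usage: Data offset = 6 words, i.e. 24 bytes including 4 bytes of options.
var raw = new byte[30];
raw[12] = 6 << 4;
var tcpHeader = ReadTcpHeader(new MemoryStream(raw));
Console.WriteLine(tcpHeader.Length);
```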

Another example is an IPv6 packet, which also has a fixed header, 40 bytes in size, with a field called Payload length that gives the sum of the lengths of the extension headers and the payload. An application parsing a full IPv6 packet could read the fixed header, then parse the Payload length field to determine how much more data to read. In all steps it is known exactly how many bytes should be read from the input stream. In practice, it depends on the level of the network device handling the IPv6 packet which fields of the message require parsing.
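The same pattern for IPv6 might look like the sketch below. ReadIpv6Packet is a hypothetical helper; it assumes the stream starts at an IPv6 header and it ignores jumbograms, where the Payload length field is zero.

```csharp
using System;
using System.Buffers.Binary;
using System.IO;

// Hypothetical helper: reads the fixed header, then everything Payload length announces.
static (byte[] FixedHeader, byte[] Rest) ReadIpv6Packet(Stream input)
{
    var fixedHeader = new byte[40];
    input.ReadExactly(fixedHeader);

    // Payload length is a 16-bit big-endian value stored in bytes 4 and 5.
    int payloadLength = BinaryPrimitives.ReadUInt16BigEndian(fixedHeader.AsSpan(4, 2));

    var rest = new byte[payloadLength];
    input.ReadExactly(rest);
    return (fixedHeader, rest);
}

// Usage: a made-up packet announcing 8 bytes after the fixed header.
var packet = new byte[48];
packet[0] = 0x60; // version 6
packet[5] = 8;    // low byte of Payload length
var parsed = ReadIpv6Packet(new MemoryStream(packet));
Console.WriteLine(parsed.Rest.Length);
```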

One counterexample is the HTTP protocol, in particular HTTP/1.1. While packets at the lower layers of the network stack have a well-structured header section, an HTTP client request does not. A request line is defined as:

A request-line begins with a method token, followed by a single space (SP), the request-target, another single space (SP), the protocol version, and ends with CRLF.

In this case the reader of the stream does not know exactly how many bytes to read, as it depends on the length of the method token, the request-target, etc. One would need to keep buffering the incoming request until a single space or CRLF is read.
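To contrast with the fixed-size headers, here is a deliberately naive sketch of reading a request line. ReadRequestLine is a hypothetical helper; it reads byte by byte for clarity, which a real parser would avoid.

```csharp
using System;
using System.IO;
using System.Text;

// Hypothetical helper: buffers bytes until CRLF is seen, as the length is not known upfront.
static string ReadRequestLine(Stream input)
{
    var line = new MemoryStream();
    int previous = -1;
    while (true)
    {
        int current = input.ReadByte();
        if (current < 0)
            throw new EndOfStreamException("Stream ended before CRLF.");
        if (previous == '\r' && current == '\n')
            break; // the pending CR is dropped together with the LF
        if (previous >= 0)
            line.WriteByte((byte)previous);
        previous = current;
    }
    return Encoding.ASCII.GetString(line.ToArray());
}

// Usage with a made-up request.
var request = new MemoryStream(
    Encoding.ASCII.GetBytes("GET /index.html HTTP/1.1\r\nHost: example.com\r\n\r\n"));
var requestLine = ReadRequestLine(request);
var parts = requestLine.Split(' ');
Console.WriteLine($"{parts[0]} | {parts[1]} | {parts[2]}");
```

There is no point at which ReadExactly could be called with a meaningful length here, which is why delimiter-based protocols benefit little from the new methods.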

gRPC (gRPC Remote Procedure Calls) messages, however, have a fixed header size, which makes them an excellent candidate for testing the new APIs. A gRPC request consists of request headers (as HTTP/2 headers) and a Length-Prefixed-Message, which has a 5-byte header: 1 byte indicating whether the message is compressed and 4 bytes indicating the length of the message. This way a developer may use the ReadExactly() methods twice on the incoming data stream: first to read the header, next to read the remaining bytes of the message.
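Decoding the 5-byte header is small enough to show in full. DecodeHeader is a hypothetical name for this sketch; the 4-byte length is a big-endian unsigned integer.

```csharp
using System;
using System.Buffers.Binary;

// Hypothetical helper: splits the 5-byte prefix into its two fields.
static (bool Compressed, int Length) DecodeHeader(ReadOnlySpan<byte> header)
{
    bool compressed = header[0] == 1;
    uint length = BinaryPrimitives.ReadUInt32BigEndian(header.Slice(1, 4));
    // checked: fail loudly if the length does not fit into an int.
    return (compressed, checked((int)length));
}

// Usage: uncompressed message with a length of 0x00000102 = 258 bytes.
var prefix = new byte[] { 0, 0, 0, 0x01, 0x02 };
var (isCompressed, messageLength) = DecodeHeader(prefix);
Console.WriteLine($"compressed: {isCompressed}, length: {messageLength}");
```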

The rest of this post investigates using ReadExactly() in the GrpcProxy project.

A lap around System.IO.Pipelines

System.IO.Pipelines is designed to ease the cost of parsing data streams in a highly efficient manner. It uses the Pipe construct to enable efficient buffering of the stream. The writing side reads the input stream and pushes all incoming data into the pipe through a PipeWriter. A PipeReader may read a chunk of unparsed data to decide whether a message can be parsed or further data should be read into the pipe. A pipe is designed to help with parsing streams such as the HTTP protocol presented above.
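A minimal sketch of the two sides of a pipe follows. The payload and the single-threaded write-then-read flow are assumptions chosen to keep the example short; in real code the writer and the reader usually run concurrently.

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Text;

var pipe = new Pipe();

// Producer side: push data into the pipe in two chunks, then signal completion.
await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("hello "));
await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("pipe"));
await pipe.Writer.CompleteAsync();

// Consumer side: the reader sees everything that has not been consumed yet.
ReadResult result = await pipe.Reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
string text = Encoding.ASCII.GetString(buffer.ToArray());

// Tell the pipe that the whole buffer has been consumed so it can be reused.
pipe.Reader.AdvanceTo(buffer.End);
await pipe.Reader.CompleteAsync();

Console.WriteLine(text);
```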

GrpcProxy and Data Streams

In GrpcProxy the incoming data stream is forwarded between the server and the client, as well as parsed so it may be displayed to the user.

To parse a Length-Prefixed-Message, GrpcProxy has been using a PipeReader. This results in rather lengthy code, which uses the underlying buffer of the pipe to parse the message:

  • checks if the buffer has at least the size of the 'header' (5 bytes)

  • if it does, it parses the 4 bytes indicating length of the message

  • checks if the buffer has at least the size of the message length + the 5-byte header

  • when a full message can be read, the message is deserialized and the PipeReader marks the stream segment as read, to instruct the pipe to release and re-use the underlying buffer(s)

When the buffer is not large enough to contain a full message, the PipeReader instructs the pipe to wait for more data while marking the current buffer(s) as unread. It is possible that the next chunk of data pushed into the pipe still does not contain a full gRPC message, in which case the PipeReader needs to wait again. To handle this, PipeReader is typically used together with a while loop.
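The steps above can be sketched roughly as follows. This is not the GrpcProxy code: TryReadMessage and ReadSingleMessageAsync are hypothetical names, compression is ignored, and the payload is copied into an array instead of being deserialized from the pipe's buffers.

```csharp
using System;
using System.Buffers;
using System.Buffers.Binary;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;

// Hypothetical helper: tries to cut one full message off the front of the buffer.
static bool TryReadMessage(ref ReadOnlySequence<byte> buffer, out byte[] payload)
{
    const int HeaderSize = 5;
    payload = Array.Empty<byte>();
    if (buffer.Length < HeaderSize)
        return false; // not even a full header yet

    Span<byte> header = stackalloc byte[HeaderSize];
    buffer.Slice(0, HeaderSize).CopyTo(header);
    int length = checked((int)BinaryPrimitives.ReadUInt32BigEndian(header.Slice(1)));
    if (buffer.Length < HeaderSize + length)
        return false; // header is there, payload is not complete

    payload = buffer.Slice(HeaderSize, length).ToArray();
    buffer = buffer.Slice(HeaderSize + length);
    return true;
}

static async Task<byte[]> ReadSingleMessageAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;
        if (TryReadMessage(ref buffer, out var payload))
        {
            reader.AdvanceTo(buffer.Start); // mark the message as consumed
            return payload;
        }
        if (result.IsCompleted)
            throw new InvalidDataException("Incomplete message.");
        // Nothing consumed, everything examined: wait for more data.
        reader.AdvanceTo(buffer.Start, buffer.End);
    }
}

// Usage: the message arrives in three chunks, splitting header and payload.
var pipe = new Pipe();
var pending = ReadSingleMessageAsync(pipe.Reader);
await pipe.Writer.WriteAsync(new byte[] { 0, 0, 0 });
await pipe.Writer.WriteAsync(new byte[] { 0, 3, 10, 20 });
await pipe.Writer.WriteAsync(new byte[] { 30 });
byte[] message = await pending;
Console.WriteLine(string.Join(",", message));
```

Note how the header may be parsed several times for the same message, once per loop iteration, until the whole payload has arrived.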

Streams and ReadExactlyAsync

Parsing the Length-Prefixed-Message is a perfect opportunity to use the ReadExactly() methods. Here is a simplified version of such a method: it takes a Stream and a method that deserializes the payload as input, and returns the deserialized message of type T.

// Extension methods must be static and declared in a static class (class omitted for brevity).
public static async Task<T> ReadSingleMessageAsync<T>(this Stream input, Func<DeserializationContext, T> deserializer)
    where T : class
{
    // Rent may return a larger array than requested; only the first HeaderSize bytes are used.
    var headerBuffer = ArrayPool<byte>.Shared.Rent(HeaderSize);
    byte[]? payloadBuffer = null;
    try
    {
        // Read the 5-byte header: 1 byte compressed flag followed by 4 bytes of message length.
        await input.ReadExactlyAsync(headerBuffer, 0, HeaderSize);
        var compressed = ReadCompressedFlag(headerBuffer[0]);
        var messageLength = DecodeMessageLength(headerBuffer.AsSpan(1, HeaderSize - 1));

        if (messageLength > MaxReceiveMessageSize)
            throw new InvalidOperationException(ReceivedMessageExceedsLimitStatus.Detail);

        // The payload length is now known, so the rest of the message can be read in one call.
        payloadBuffer = ArrayPool<byte>.Shared.Rent(messageLength);
        await input.ReadExactlyAsync(payloadBuffer, 0, messageLength);
        var result = ParseMessage(deserializer, new ReadOnlySequence<byte>(payloadBuffer, 0, messageLength), compressed);

        return result;
    }
    finally
    {
        // Always hand the rented arrays back to the pool.
        ArrayPool<byte>.Shared.Return(headerBuffer);
        if (payloadBuffer != null)
            ArrayPool<byte>.Shared.Return(payloadBuffer);
    }
}

In the body of the method first an array is rented for buffering the header. Next the code reads exactly the size of the header and parses the compressed bit and message length. Knowing the length of the rest of the message a new array buffer is rented, in which the second ReadExactlyAsync() reads exactly the rest of the message. Finally, the message is deserialized and the rented arrays are returned to the pool.

Pros and Cons

Comparing this implementation to the one using PipeReader, there are a few clear advantages: the maintainability of the code seems to have improved. The lines of source code more than halved, from 62 to 27. The executable lines of code also decreased from 18 to 13, which I interpret as having fewer noisy lines in the new implementation. However, the key difference is cyclomatic complexity, which decreased from 11 to 3. Overall, the code is shorter and has less branching, which improves maintainability.

Note that the original implementation throws an exception when data is left in the pipe after the message has been read. Achieving that with streams is less clear, as viable solutions may or may not work depending on the underlying implementation of the stream.
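One possible approach, shown purely as an assumption and not as what GrpcProxy does, is to probe for one more byte after the message. EnsureNoTrailingDataAsync is a hypothetical helper; it behaves well on streams that report end of data promptly, but on a network stream the probe may wait until the peer sends something or closes the connection.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

// Hypothetical helper: throws if any byte can still be read from the stream.
static async Task EnsureNoTrailingDataAsync(Stream input)
{
    var probe = new byte[1];
    // A return value of 0 means the end of the stream has been reached.
    int extra = await input.ReadAsync(probe.AsMemory());
    if (extra != 0)
        throw new InvalidDataException("Unexpected data after the message.");
}

// Usage: a fully consumed stream passes the check...
var exact = new MemoryStream(new byte[] { 1, 2, 3 });
exact.Position = 3; // pretend the message has already been read
await EnsureNoTrailingDataAsync(exact);

// ...while a stream with a leftover byte fails it.
bool threw = false;
try
{
    await EnsureNoTrailingDataAsync(new MemoryStream(new byte[] { 9 }));
}
catch (InvalidDataException)
{
    threw = true;
}
Console.WriteLine(threw);
```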

Does cleaner code come with better performance as well?

BenchmarkDotNet=v0.13.1, OS=Windows 10.0.22000
Intel Core i5-1035G4 CPU 1.10GHz, 1 CPU, 8 logical and 4 physical cores
.NET SDK=7.0.100-preview.5.22307.18
  [Host]     : .NET 7.0.0 (7.0.22.30112), X64 RyuJIT
  DefaultJob : .NET 7.0.0 (7.0.22.30112), X64 RyuJIT


|  Method |             Message |     Mean |     Error |    StdDev |   Median |  Gen 0 | Allocated |
|-------- |-------------------- |---------:|----------:|----------:|---------:|-------:|----------:|
|   Pipes | Lor(...)us. [28796] | 6.005 us | 0.1202 us | 0.2809 us | 5.886 us | 1.1597 |      4 KB |
| Streams | Lor(...)us. [28796] | 7.417 us | 0.0552 us | 0.0431 us | 7.408 us | 1.2665 |      4 KB |

Surprisingly (or maybe less surprisingly for well-trained eyes), the ReadExactlyAsync() based solution is slower. To understand why, we need to understand how the input stream is handled. When data is available on the input stream in GrpcProxy, it is pushed into a pipe. Another choice could be a MemoryStream, but that could be less performant on streaming gRPC calls, as it would keep in memory the part of the stream that has already been processed (forwarded to the server or client and deserialized). For this reason, a pipe is a better choice for the task. PipeReader also exposes an AsStream() method that enables reading the pipe as a Stream. Hence, while the data sits in the internal buffer(s) of the pipe, it can be read through the Stream API. However, ReadExactlyAsync() requires a buffer parameter, which means the data is copied into the buffer passed as an input argument. The allocation of this buffer may be amortized by using ArrayPool<byte>, but the copy from the buffer(s) of the pipe into the buffer passed to ReadExactlyAsync() is unavoidable and comes at an extra cost.
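The copy is easy to spot in a small sketch that reads a pipe through AsStream(). The message bytes are made up, and plain arrays are used instead of ArrayPool<byte> to keep the example short.

```csharp
using System;
using System.Buffers.Binary;
using System.IO;
using System.IO.Pipelines;

var pipe = new Pipe();
// Uncompressed message with a 2 byte payload: 42, 43.
await pipe.Writer.WriteAsync(new byte[] { 0, 0, 0, 0, 2, 42, 43 });
await pipe.Writer.CompleteAsync();

// The pipe's data is exposed through the Stream API.
Stream stream = pipe.Reader.AsStream();

// Each ReadExactlyAsync call copies bytes from the pipe's buffers into the caller's array.
var header = new byte[5];
await stream.ReadExactlyAsync(header);
int length = checked((int)BinaryPrimitives.ReadUInt32BigEndian(header.AsSpan(1)));

var payload = new byte[length];
await stream.ReadExactlyAsync(payload);

Console.WriteLine(string.Join(",", payload));
```

A PipeReader based parser can instead work on the ReadOnlySequence<byte> that the pipe hands out, without moving the bytes first.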

At the same time, while the pipe-based solution may end up looping and repeatedly re-parsing the header of the same message, it does not need to copy the data itself.

All in all, the new methods on Stream provide an excellent way to improve the readability of the code. For a very tight loop of application logic, however, it might be worth the extra complexity to gain further performance by using System.IO.Pipelines.