Rate Limiting Streaming gRPC Calls

In this post I investigate how to rate limit the messages of gRPC streaming calls with .NET 7. In my previous post I looked into how to apply rate limiting to gRPC requests (unary or streaming).

When we apply the ASP.NET Core rate limiting middleware to gRPC services, we can limit the rate of incoming requests. However, if a request is long-running (client streaming, server streaming or duplex streaming), the middleware cannot limit the rate of the messages sent within that call.

In this post I create a sample rate limiter for ASP.NET Core gRPC streaming services. The limiter is built on top of .NET 7's rate limiting API.

When would I need to rate limit the incoming messages? For example, when clients send long-running requests with streaming data, the messages can be rate limited per endpoint or per resource accessed.

Interceptor

To interact with the incoming request and its messages, the request needs to be intercepted. Server interceptors provide access to incoming requests for server-side calls. In the sample server-side rate limiter I will limit the rate of the messages that a client may send.

The interceptor needs to derive from the Interceptor base class. The sample interceptor below uses a fixed window rate limiter. The FixedWindowRateLimiter limits the number of resource accesses within a fixed time window. When a window ends, a new window starts and the limit is reset.
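To see the fixed window behavior in isolation, here is a small console sketch that uses FixedWindowRateLimiter from System.Threading.RateLimiting (part of .NET 7) without any gRPC involvement. The limit of 2 permits and the 1-second window are arbitrary values chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.RateLimiting;
using System.Threading.Tasks;

// At most 2 permits per 1-second window; QueueLimit = 0 means requests over the
// limit are rejected instead of waiting for the next window.
using var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions
{
    PermitLimit = 2,
    Window = TimeSpan.FromSeconds(1),
    QueueLimit = 0,
    AutoReplenishment = true // an internal timer starts a new window every Window interval
});

var results = new List<bool>();
for (var i = 0; i < 3; i++)
{
    using RateLimitLease lease = limiter.AttemptAcquire();
    results.Add(lease.IsAcquired);
}
Console.WriteLine(string.Join(", ", results)); // True, True, False

// Once the window has ended, the limit is reset and permits are available again.
await Task.Delay(TimeSpan.FromSeconds(1.5));
using RateLimitLease afterReset = limiter.AttemptAcquire();
Console.WriteLine(afterReset.IsAcquired); // True
```

The third attempt fails because both permits of the current window are used; waiting past the window boundary makes the limiter grant permits again.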

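using System;
using System.Threading.Tasks;
using System.Threading.RateLimiting;
using Grpc.Core;
using Grpc.Core.Interceptors;

public class RateLimitingInterceptor<TPartitionKey> : Interceptor where TPartitionKey : notnull
{
    // One limiter per interceptor instance; the interceptor is registered as a
    // singleton so that all calls share the same partitions.
    private readonly PartitionedRateLimiter<ServerCallContext> _limiter;

    public RateLimitingInterceptor(Func<ServerCallContext, TPartitionKey> partitionKeyGenerator, FixedWindowRateLimiterOptions options)
    {
        _limiter = PartitionedRateLimiter.Create<ServerCallContext, TPartitionKey>(callContext =>
        {
            // Calls that produce the same key share the same fixed window limiter.
            var key = partitionKeyGenerator(callContext);
            var partition = RateLimitPartition.GetFixedWindowLimiter(key, _ => options);
            return partition;
        });
    }

    public override Task<TResponse> ClientStreamingServerHandler<TRequest, TResponse>(
        IAsyncStreamReader<TRequest> requestStream,
        ServerCallContext context,
        ClientStreamingServerMethod<TRequest, TResponse> continuation)
    {
        // Wrap the request stream so that every read has to acquire a lease first.
        return continuation(new RateLimitingAsyncStreamReader<TRequest>(requestStream, context, _limiter), context);
    }
}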

The constructor of the above class creates a PartitionedRateLimiter. The first type argument of the Create method sets the type of the limited resource: the type of the object passed to the AttemptAcquire and AcquireAsync methods. The TPartitionKey generic parameter is the type of the key used to retrieve a rate limiter. Rate limiters are separated (partitioned) by this key. In the example above, the user of this class provides a function Func<ServerCallContext, TPartitionKey> that creates a key from the resource.

Every time the AttemptAcquire or AcquireAsync method is invoked, the lambda passed to the Create method is executed: a key is generated from the resource, and a (cached) rate limiter is retrieved for that key. If two requests or messages share the same key, they are limited by the same limiter. If the keys are different, independent limits are applied.
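The partitioning behavior can be observed without a gRPC server. In this sketch the resource is a plain string instead of a ServerCallContext, and the partition key is the part before the slash; both the "tenant/request" strings and this key scheme are hypothetical stand-ins for a real key generator.

```csharp
using System;
using System.Threading.RateLimiting;

// Resources are "tenant/request" strings standing in for ServerCallContext;
// the part before the slash is the (hypothetical) partition key.
using PartitionedRateLimiter<string> limiter = PartitionedRateLimiter.Create<string, string>(resource =>
{
    string key = resource.Split('/')[0];
    // The options factory runs when a limiter is created for a key;
    // that limiter is then cached and reused for the same key.
    return RateLimitPartition.GetFixedWindowLimiter(key, _ => new FixedWindowRateLimiterOptions
    {
        PermitLimit = 1,
        Window = TimeSpan.FromMinutes(1),
        QueueLimit = 0
    });
});

bool Attempt(string resource)
{
    using RateLimitLease lease = limiter.AttemptAcquire(resource);
    return lease.IsAcquired;
}

bool firstA = Attempt("tenant-a/1");  // True: partition "tenant-a" still has its permit
bool secondA = Attempt("tenant-a/2"); // False: same key, same limiter, permit already used
bool firstB = Attempt("tenant-b/1");  // True: different key, independent limiter
Console.WriteLine($"{firstA} {secondA} {firstB}"); // True False True
```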

For example, when the user passes a function that uses the Host property of the ServerCallContext, the rate limiter acts as a global limiter, as requests to a single service typically share the same host name. However, when the user passes a function that uses the IP address of the client making the requests, all clients sending requests from the same IP address share one limit, while clients from different addresses are limited independently.
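ServerCallContext.Peer is one way to obtain the client address, but it also contains the client's port, so using it directly as the key would create a partition per connection rather than per IP address. The hypothetical helper below strips the prefix and the port, assuming the `ipv4:<address>:<port>` and `ipv6:[<address>]:<port>` formats produced by Grpc.AspNetCore; check the actual Peer values of your server before relying on it.

```csharp
using System;

// Hypothetical helper: turns a gRPC peer string into a per-IP partition key.
// Assumes the "ipv4:<address>:<port>" / "ipv6:[<address>]:<port>" formats.
static string PeerToIpKey(string peer)
{
    int prefixEnd = peer.IndexOf(':');
    if (prefixEnd < 0)
        return peer; // unexpected format: fall back to the whole string

    string addressAndPort = peer[(prefixEnd + 1)..];  // drop "ipv4:" / "ipv6:"
    int portStart = addressAndPort.LastIndexOf(':');  // the port follows the last colon
    string address = portStart < 0 ? addressAndPort : addressAndPort[..portStart];
    return address.Trim('[', ']');                    // remove IPv6 brackets
}

// Two connections from the same address map to the same key; the port is ignored.
Console.WriteLine(PeerToIpKey("ipv4:10.0.0.5:51234")); // 10.0.0.5
Console.WriteLine(PeerToIpKey("ipv4:10.0.0.5:51888")); // 10.0.0.5
Console.WriteLine(PeerToIpKey("ipv6:[::1]:51234"));    // ::1
```

As a key generator it could be passed as `ctx => PeerToIpKey(ctx.Peer)`. In ASP.NET Core, `ctx.GetHttpContext().Connection.RemoteIpAddress` is an alternative source of the same information that avoids string parsing.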

Note that adding further constructors to the example above would allow other rate limiting algorithms (sliding window, token bucket, etc.) to be used, as the rest of the code depends only on the abstract PartitionedRateLimiter, not on the concrete rate limiter implementation.
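A standalone sketch of that idea: the hypothetical CreateLimiter factory receives the partition factory, so choosing between a sliding window and a token bucket only changes the argument, while the consuming code (CountAcquired, also hypothetical) works with any PartitionedRateLimiter<string>. The option values are illustrative.

```csharp
using System;
using System.Threading.RateLimiting;

// Hypothetical factory: the algorithm is decided entirely by the partition factory argument.
static PartitionedRateLimiter<string> CreateLimiter(Func<string, RateLimitPartition<string>> partitionFactory) =>
    PartitionedRateLimiter.Create<string, string>(partitionFactory);

// Sliding window: 2 permits per 10 s, window split into 5 segments.
using var slidingWindow = CreateLimiter(key => RateLimitPartition.GetSlidingWindowLimiter(key, _ =>
    new SlidingWindowRateLimiterOptions
    {
        PermitLimit = 2,
        Window = TimeSpan.FromSeconds(10),
        SegmentsPerWindow = 5,
        QueueLimit = 0
    }));

// Token bucket: a bucket of 2 tokens, refilled with 1 token every 10 s.
using var tokenBucket = CreateLimiter(key => RateLimitPartition.GetTokenBucketLimiter(key, _ =>
    new TokenBucketRateLimiterOptions
    {
        TokenLimit = 2,
        TokensPerPeriod = 1,
        ReplenishmentPeriod = TimeSpan.FromSeconds(10),
        QueueLimit = 0
    }));

// The consuming code only sees PartitionedRateLimiter<string>.
static int CountAcquired(PartitionedRateLimiter<string> limiter, string key, int attempts)
{
    int acquired = 0;
    for (int i = 0; i < attempts; i++)
    {
        using RateLimitLease lease = limiter.AttemptAcquire(key);
        if (lease.IsAcquired)
            acquired++;
    }
    return acquired;
}

int slidingAcquired = CountAcquired(slidingWindow, "client-1", 3);
int bucketAcquired = CountAcquired(tokenBucket, "client-1", 3);
Console.WriteLine($"sliding window: {slidingAcquired}/3, token bucket: {bucketAcquired}/3"); // sliding window: 2/3, token bucket: 2/3
```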

The sample interceptor above intercepts only client streaming requests by overriding the ClientStreamingServerHandler method. The requestStream argument is an IAsyncEnumerable-like type for reading the request messages. The ServerCallContext context represents the context of the streaming call. ClientStreamingServerMethod<TRequest, TResponse> is a delegate that invokes the actual service implementation or the next interceptor in the pipeline.

Rate Limiting Stream Reader

Inside the handler a RateLimitingAsyncStreamReader is instantiated and passed to the continuation. This type implements IAsyncStreamReader<TRequest> and decorates the underlying stream reader. Its Current property returns the Current property of the underlying stream reader. The MoveNext method performs the rate limiting logic. First, it tries to acquire a lease synchronously. If that does not succeed, it tries to acquire one asynchronously. If the limiter has a queue, the read operation may be queued until the window resets; in this case AcquireAsync completes once a permit becomes available. With an acquired lease the execution fetches the next message from the stream. If queuing is not enabled for the rate limiter, or the queue is full, AcquireAsync may return a lease that is not acquired. In that case an RpcException with the ResourceExhausted status code is thrown.
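The acquire strategy can be tried on a plain, non-partitioned FixedWindowRateLimiter. This sketch follows the same try-synchronously-then-asynchronously pattern in a hypothetical Acquire helper and compares a limiter without a queue to one with a queue; the window lengths are arbitrary.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.RateLimiting;
using System.Threading.Tasks;

// Hypothetical helper following the same pattern: synchronous attempt first,
// then AcquireAsync, which may wait in the limiter's queue.
static ValueTask<RateLimitLease> Acquire(RateLimiter limiter, CancellationToken cancellationToken)
{
    RateLimitLease lease = limiter.AttemptAcquire();
    if (lease.IsAcquired)
        return ValueTask.FromResult(lease);

    lease.Dispose(); // release the failed lease before falling back
    return limiter.AcquireAsync(cancellationToken: cancellationToken);
}

// No queue: 1 permit per (long) window, excess requests are rejected immediately.
using var unqueued = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions
{
    PermitLimit = 1, Window = TimeSpan.FromSeconds(10), QueueLimit = 0
});
using (await Acquire(unqueued, CancellationToken.None)) { } // takes the only permit
using RateLimitLease rejected = await Acquire(unqueued, CancellationToken.None);
Console.WriteLine($"without queue: acquired = {rejected.IsAcquired}"); // without queue: acquired = False

// With a queue: 1 permit per 200 ms window, one request may wait for the next window.
using var queued = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions
{
    PermitLimit = 1, Window = TimeSpan.FromMilliseconds(200), QueueLimit = 1
});
using (await Acquire(queued, CancellationToken.None)) { } // takes the only permit
var stopwatch = Stopwatch.StartNew();
using RateLimitLease waited = await Acquire(queued, CancellationToken.None);
Console.WriteLine($"with queue: acquired = {waited.IsAcquired} after {stopwatch.ElapsedMilliseconds} ms");
```

With a queue, the second request waits until the next window starts; without one, the lease comes back not acquired, which is the situation where MoveNext throws ResourceExhausted.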

using System.Threading;
using System.Threading.RateLimiting;
using System.Threading.Tasks;
using Grpc.Core;

public struct RateLimitingAsyncStreamReader<TRequest> : IAsyncStreamReader<TRequest>
{
    private readonly IAsyncStreamReader<TRequest> _requestStream;
    private readonly ServerCallContext _serverCallContext;
    private readonly PartitionedRateLimiter<ServerCallContext> _limiter;

    public RateLimitingAsyncStreamReader(IAsyncStreamReader<TRequest> requestStream,
        ServerCallContext serverCallContext,
        PartitionedRateLimiter<ServerCallContext> limiter)
    {
        _requestStream = requestStream;
        _serverCallContext = serverCallContext;
        _limiter = limiter;
    }

    public TRequest Current => _requestStream.Current;

    public async Task<bool> MoveNext(CancellationToken cancellationToken)
    {
        // Every read needs a lease from the partition of the current call.
        using var lease = await TryAcquire(cancellationToken);
        if (!lease.IsAcquired)
            ThrowResourceExhausted();

        var result = await _requestStream.MoveNext(cancellationToken);
        return result;
    }

    private void ThrowResourceExhausted() =>
        throw new RpcException(new Status(StatusCode.ResourceExhausted, "Request rate exhausted"));

    private ValueTask<RateLimitLease> TryAcquire(CancellationToken cancellationToken)
    {
        // Fast path: take a permit synchronously if one is available.
        var lease = _limiter.AttemptAcquire(_serverCallContext);
        if (lease.IsAcquired)
            return ValueTask.FromResult(lease);

        // Slow path: release the failed lease and wait in the queue (if one is configured).
        lease.Dispose();
        return _limiter.AcquireAsync(_serverCallContext, cancellationToken: cancellationToken);
    }
}

Service Registrations

To set up the rate limiter with a gRPC server implementation, a few changes are needed in Program.cs. The way the types are registered affects the overall rate limiting behavior. In the sample below the RateLimitingInterceptor is added to SuperServiceImpl, which is the type implementing the gRPC service.

The RateLimitingInterceptor is also registered separately with the service container as a singleton. Otherwise, every call made by clients would instantiate a new interceptor and a corresponding limiter, giving each call its own set of partitions, so no limit would be shared across calls. The interceptor has two constructor arguments:

  • Func<ServerCallContext, string> is a method that returns a rate limiter partition key, given the ServerCallContext as input. In this case, using the Host property makes the limiter act as a global limiter, while FixedWindowRateLimiterOptions.QueueLimit limits the number of pending reads (and therefore streams) that may wait for a permit at the same time.

  • FixedWindowRateLimiterOptions customizes the behavior of the built-in fixed window rate limiter. This type is provided by .NET 7.

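using Grpc.Core;
using System.Threading.RateLimiting;

builder.Services.AddGrpc().AddServiceOptions<SuperServiceImpl>(options =>
{
    // Resolved from the container, so all calls share the singleton registered below.
    options.Interceptors.Add<RateLimitingInterceptor<string>>();
});
builder.Services.AddSingleton(
    new RateLimitingInterceptor<string>(
        (ServerCallContext ctx) => ctx.Host, // one partition per host name: effectively global
        // 2 permits per 10-second window; up to 2 reads may queue for the next window.
        new FixedWindowRateLimiterOptions() { AutoReplenishment = true, PermitLimit = 2, QueueLimit = 2, Window = TimeSpan.FromSeconds(10) }));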
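The effect of QueueLimit in these options can be checked with a standalone FixedWindowRateLimiter configured with the same values. QueueLimit counts queued permits; since every MoveNext asks for a single permit, it caps how many reads can wait for the next window. This sketch simulates five reads arriving at once.

```csharp
using System;
using System.Linq;
using System.Threading.RateLimiting;
using System.Threading.Tasks;

// Same values as the registration above: 2 permits per 10 s window, up to 2 queued permits.
using var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions
{
    AutoReplenishment = true,
    PermitLimit = 2,
    QueueLimit = 2,
    Window = TimeSpan.FromSeconds(10)
});

// Five reads ask for a permit at (almost) the same moment.
Task<RateLimitLease>[] reads = Enumerable.Range(0, 5)
    .Select(_ => limiter.AcquireAsync().AsTask())
    .ToArray();

int granted = reads.Count(t => t.IsCompleted && t.Result.IsAcquired);   // served from the current window
int waiting = reads.Count(t => !t.IsCompleted);                         // queued for the next window
int rejected = reads.Count(t => t.IsCompleted && !t.Result.IsAcquired); // queue already full
Console.WriteLine($"granted = {granted}, waiting = {waiting}, rejected = {rejected}"); // granted = 2, waiting = 2, rejected = 1
```

The fifth read is the one that would surface as a ResourceExhausted error in the interceptor, while the two queued reads would resume when the next 10-second window starts.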

With the above rate limiter, one might wonder how gRPC applies backpressure to the clients. The interceptor rate limits the reading of messages and, above the limit, throws an exception that terminates the call. However, it does not directly control the number of messages a client may send. Messages that have been sent but not yet read wait in the server's HTTP/2 receive buffer and consume memory while the reader waits for a lease. The size of this buffer can be controlled with Kestrel's InitialConnectionWindowSize and InitialStreamWindowSize HTTP/2 limits. In the sample below I set the minimum allowed values, but for an application deployed to production, careful measurements should be done to determine the optimal values.

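builder.WebHost.ConfigureKestrel(options =>
{
    // HTTP/2 flow-control windows: how much unread request data Kestrel buffers per
    // connection (across all streams) and per stream. 65,535 bytes is the lowest accepted value.
    options.Limits.Http2.InitialConnectionWindowSize = 65_535;
    options.Limits.Http2.InitialStreamWindowSize = 65_535;
});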

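With these settings, once the flow-control window is full, the client cannot send further messages until the server reads the buffered ones and grants more window space with a WINDOW_UPDATE frame. This is how backpressure is applied to the clients.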
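For a rough feel of how much a single stream can buffer with a stream window of about 64 KB (65,535 bytes), the arithmetic below divides the window by the size of a gRPC-framed message (the payload plus a 5-byte prefix). The message sizes are hypothetical, and the estimate ignores compression as well as the connection-level window, which is shared by all streams on the connection and becomes the tighter bound when several streams are active.

```csharp
using System;

// Rough estimate: how many complete request messages fit into one stream's HTTP/2 receive window.
// Every gRPC message is prefixed with 5 bytes (1-byte compressed flag + 4-byte length).
static int MessagesPerWindow(int windowSizeBytes, int messageSizeBytes) =>
    windowSizeBytes / (messageSizeBytes + 5);

const int streamWindowBytes = 65_535;
foreach (int messageSize in new[] { 100, 1_024, 16_384 }) // hypothetical payload sizes
{
    Console.WriteLine($"{messageSize} B messages: ~{MessagesPerWindow(streamWindowBytes, messageSize)} per stream window");
}
// 100 B messages: ~624 per stream window
// 1024 B messages: ~63 per stream window
// 16384 B messages: ~3 per stream window
```

Small messages can therefore pile up by the hundreds per stream before flow control stops the client, which is worth keeping in mind when sizing the windows against the permit limit.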