Distributed Tracing with gRPC and OpenTelemetry

This blog post demonstrates distributed tracing for gRPC calls using OpenTelemetry. Application Insights and Zipkin exporters and services are used for visualizing spans.

gRPC is an HTTP/2-based protocol. The client uses HttpClient to send requests, while the ASP.NET Core service uses the standard HTTP pipeline to process requests and return responses. Both HttpClient and the HTTP pipeline are instrumented with System.Diagnostics.DiagnosticListener. A DiagnosticListener lets us subscribe to the events and activities it emits. While an event is a point-in-time notification, an activity describes a time interval. Both may carry additional metadata attributes that describe the event or the time span.
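To make the difference between events and activities concrete, here is a minimal sketch that uses only the System.Diagnostics types from the base class library. The listener name "Demo.Source" and the event and activity names are made up for illustration; they are not the names emitted by HttpClient or ASP.NET Core.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

public static class DiagnosticsDemo
{
    // Records the name of every notification the listener emits.
    private sealed class NameRecorder : IObserver<KeyValuePair<string, object>>
    {
        public List<string> Names { get; } = new List<string>();
        public void OnNext(KeyValuePair<string, object> value) => Names.Add(value.Key);
        public void OnError(Exception error) { }
        public void OnCompleted() { }
    }

    public static List<string> Run()
    {
        // Hypothetical listener name, chosen for this sketch only.
        using var listener = new DiagnosticListener("Demo.Source");
        var recorder = new NameRecorder();
        using IDisposable subscription = listener.Subscribe(recorder);

        // An event: a point-in-time notification with an arbitrary payload.
        if (listener.IsEnabled("Demo.CacheMiss"))
            listener.Write("Demo.CacheMiss", new { Key = "user:42" });

        // An activity: a time interval, optionally described by tags.
        var activity = new Activity("Demo.Work").AddTag("Msg", "Pocak");
        listener.StartActivity(activity, null); // emits "Demo.Work.Start"
        listener.StopActivity(activity, null);  // emits "Demo.Work.Stop"

        return recorder.Names;
    }

    public static void Main()
    {
        foreach (var name in Run())
            Console.WriteLine(name);
    }
}
```

The subscriber sees one notification for the event and a Start/Stop pair for the activity; the elapsed time is available afterwards as activity.Duration.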

For collecting these events, I am going to use OpenTelemetry's C# client, version 0.2.0-alpha.179 at the time of writing this post.

In this post, I will have a .NET Core console application as the client and an ASP.NET Core 3.1 service as the server.

Enabling gRPC

gRPC on Server Side

Let's start by creating a gRPC proto file with two operations: one for a standard request-response unary call, DoWork, and another for streaming, StreamWork. In this post, only the client streams data to the server. As the data types in the messages are less of a concern, each message has only a single string field.

syntax = "proto3";

option csharp_namespace = "Super";

service SuperService {
  rpc DoWork(RequestData) returns (ResponseData) {}
  rpc StreamWork(stream RequestData) returns (ResponseData) {}
}

message RequestData {
  string message = 1;
}

message ResponseData {
  string message = 1;
}

I added the Grpc.AspNetCore NuGet package and declared the proto file in the service project's csproj:

<ItemGroup>
    <Protobuf Include="SuperService.proto" GrpcServices="Server" />
</ItemGroup>

To complete the service, extend Startup.cs by registering the gRPC dependencies and endpoint mappings:

// In ConfigureServices:
services.AddGrpc();

// In Configure:
app.UseEndpoints(endpoints =>
{
    endpoints.MapGrpcService<SuperServiceImpl>();
    endpoints.MapControllers();
});

Let's implement the service. It does nothing useful other than echoing back the request message with a "Hello" prefix in the unary call, and returning the number of streamed messages received in the streaming call.

public class SuperServiceImpl : SuperService.SuperServiceBase
{
    // Unary call: echoes the request message back with a "Hello" prefix.
    public override async Task<ResponseData> DoWork(RequestData request, ServerCallContext context)
    {
        await Task.Delay(50);
        return new ResponseData { Message = $"Hello {request.Message}" };
    }

    // Client streaming call: counts the non-empty messages received.
    public override async Task<ResponseData> StreamWork(IAsyncStreamReader<RequestData> requestStream, ServerCallContext context)
    {
        int i = 0;
        await foreach (var item in requestStream.ReadAllAsync())
            if (!string.IsNullOrWhiteSpace(item.Message))
                i++;
        return new ResponseData { Message = $"Received {i}" };
    }
}

A delay is added in DoWork to emulate work on the server side.

gRPC on Client Side

Implementing a simple console client application is even simpler: create a new console app, and use Add -> Service Reference to generate a client proxy for the proto file. This adds a couple of NuGet packages (Google.Protobuf, Grpc.Net.ClientFactory, Grpc.Tools) and a reference to the proto file:

<Protobuf Include="..\Service\SuperService.proto" GrpcServices="Client">
    <Link>Protos\Contract.proto</Link>
</Protobuf>

There are other ways to create a client proxy as well; here I chose the simplest one for this demonstration.

Next, add some code to Program.cs to invoke the service endpoint:

var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = new SuperService.SuperServiceClient(channel);

// Unary call
var request = new RequestData() { Message = "Pocak" };
var response = await client.DoWorkAsync(request);
Console.WriteLine(response.Message);

// Streaming data
using var streamOfWork = client.StreamWork();
for (int i = 0; i < 5; i++)
{
    // Named differently from the unary 'request' above, which is still in scope.
    var streamedRequest = new RequestData() { Message = "Pocak" };
    await Task.Delay(100);
    await streamOfWork.RequestStream.WriteAsync(streamedRequest);
}
await streamOfWork.RequestStream.CompleteAsync();
Console.WriteLine((await streamOfWork).Message);

First, a gRPC channel and a client are created. Then I use the client to invoke the DoWork method. To stream data, I call the StreamWork method, which returns an AsyncClientStreamingCall object that can be used to access the request stream and to write messages into it. In the above sample, after sending 5 messages, the stream is completed and the response is written to the console. A delay is added between the messages to emulate work on the client side.

Enabling OpenTelemetry

In the next step, I am going to add OpenTelemetry to the server and the client applications. OpenTelemetry collects diagnostics by default through the HttpClient and the ASP.NET Core gRPC endpoint mapping, so there is not much work to enable those sources (in .NET Core 3 and above). I will also add some custom diagnostic sources to enrich the collected data.

OpenTelemetry on the Server Side

On the server side, add the OpenTelemetry NuGet packages, and extend Startup.cs by adding and configuring the OpenTelemetry dependencies:

<PackageReference Include="OpenTelemetry.Collector.AspNetCore" Version="0.2.0-alpha.179" />
<PackageReference Include="OpenTelemetry.Collector.Dependencies" Version="0.2.0-alpha.179" />
<PackageReference Include="OpenTelemetry.Exporter.ApplicationInsights" Version="0.2.0-alpha.179" />
<PackageReference Include="OpenTelemetry.Exporter.Zipkin" Version="0.2.0-alpha.179" />
<PackageReference Include="OpenTelemetry.Hosting" Version="0.2.0-alpha.179" />

Notice that I am adding both the Zipkin and the Application Insights (AppInsights) exporters. Typically, an application would register multiple collectors but only one exporter.

services.AddOpenTelemetry(builder =>
{
    builder
    .SetSampler(new AlwaysSampleSampler())
    .AddRequestCollector()
    .AddDependencyCollector()
    .AddCollector(tracer => new TelemetryDiagnosticSourceCollector(tracer, nameof(Service)))
    .SetResource(Resources.CreateServiceResource("Super Service"))
    .UseApplicationInsights(aiOptions =>
    {
        aiOptions.InstrumentationKey = "20acdfa2-some-guid-key-returned";
    })
    .UseZipkin(zipkinOptions =>
    {
        zipkinOptions.Endpoint = new Uri("http://myzipkinservice:9411/api/v2/spans");
        zipkinOptions.ServiceName = "Super Service";
    });
});

Configuring the telemetry consists of 3 steps:

  • configuring a sampler

  • registering collectors

  • registering exporters

Here I use AlwaysSampleSampler, which samples every event and activity. There are other samplers as well, but this one is perfect for the demonstration. A request collector is added to enable the collection of incoming requests. A dependency collector is added to enable the collection of outgoing requests (we will see later how this affects the server side). I added an Application Insights exporter and configured it with the instrumentation key of my AppInsights Azure resource, then added a Zipkin exporter and configured it with its endpoint URL and a service name. Finally, there is one additional collector (TelemetryDiagnosticSourceCollector) to collect data from my own DiagnosticListeners. To enable this, a custom collector and a trace handler are created:

public class TelemetryDiagnosticSourceCollector : IDisposable
{
    private readonly DiagnosticSourceSubscriber diagnosticSourceSubscriber;
    public TelemetryDiagnosticSourceCollector(Tracer tracer, string diagnosticsListenerName)
    {
        diagnosticSourceSubscriber = new DiagnosticSourceSubscriber(n => new TelemetryDiagnosticSourceHandler(n, tracer),
            x => x.Name.StartsWith(diagnosticsListenerName), (_, __, ___) => true);
        diagnosticSourceSubscriber.Subscribe();
    }
    public void Dispose() => diagnosticSourceSubscriber?.Dispose();
}

The collector instantiates a DiagnosticSourceSubscriber, a built-in OpenTelemetry type for subscribing to diagnostic events. A factory for TelemetryDiagnosticSourceHandler and two Func<..., bool> predicates are passed to its constructor. The first predicate returns whether a given DiagnosticListener is enabled; here the DiagnosticListener's name is matched against the name passed to the TelemetryDiagnosticSourceCollector constructor. The second predicate, (_, __, ___) => true, returns whether a given diagnostic event within the listener is enabled; here I enable all events. TelemetryDiagnosticSourceHandler derives from the built-in ListenerHandler type:

public class TelemetryDiagnosticSourceHandler : ListenerHandler
{
    public TelemetryDiagnosticSourceHandler(string name, Tracer tracer) : base(name, tracer) { }

    public override void OnStartActivity(Activity activity, object payload) => Tracer.StartSpanFromActivity(activity.OperationName, activity);

    public override void OnStopActivity(Activity activity, object payload)
    {
        base.OnStopActivity(activity, payload);
        Tracer.CurrentSpan.End();
    }
}

All it does is use OpenTelemetry's Tracer to start and stop spans based on the OnStartActivity and OnStopActivity calls. The call to base.OnStopActivity(activity, payload) sets the span attributes based on the tags added to the activity.
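The two predicates passed to DiagnosticSourceSubscriber, one selecting listeners by name and one selecting events, have close counterparts in the base class library. The following is a rough sketch of the same filtering idea built directly on DiagnosticListener.AllListeners; it illustrates the concept and is not OpenTelemetry's implementation. The class names and the "Seen"/"Ignored" event names are invented for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

public static class ListenerFilterDemo
{
    // Records the names of the events it receives.
    private sealed class EventRecorder : IObserver<KeyValuePair<string, object>>
    {
        public List<string> Names { get; } = new List<string>();
        public void OnNext(KeyValuePair<string, object> value) => Names.Add(value.Key);
        public void OnError(Exception error) { }
        public void OnCompleted() { }
    }

    // Called once for every DiagnosticListener in the process,
    // both those that already exist and those created later.
    private sealed class ListenerObserver : IObserver<DiagnosticListener>
    {
        private readonly string prefix;
        private readonly IObserver<KeyValuePair<string, object>> events;

        public ListenerObserver(string prefix, IObserver<KeyValuePair<string, object>> events)
        {
            this.prefix = prefix;
            this.events = events;
        }

        public void OnNext(DiagnosticListener listener)
        {
            // First filter: subscribe only to listeners whose name matches.
            if (listener.Name.StartsWith(prefix, StringComparison.Ordinal))
            {
                // Second filter: answers IsEnabled checks per event; here all are enabled.
                listener.Subscribe(events, (name, arg1, arg2) => true);
            }
        }

        public void OnError(Exception error) { }
        public void OnCompleted() { }
    }

    public static List<string> Run()
    {
        var recorder = new EventRecorder();
        using IDisposable all = DiagnosticListener.AllListeners.Subscribe(
            new ListenerObserver("Service", recorder));

        using var matching = new DiagnosticListener("Service");
        using var other = new DiagnosticListener("Unrelated");

        matching.Write("Seen", null);   // delivered to the recorder
        other.Write("Ignored", null);   // no subscriber, so it is dropped
        return recorder.Names;
    }

    public static void Main() => Console.WriteLine(string.Join(", ", Run()));
}
```

Only the listener whose name starts with "Service" is observed, which mirrors how the collector above is limited to the listener named nameof(Service).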

I created a disposable ActivityEx type that wraps an Activity and stops it when disposed (not shown here for brevity). I also added a Timer(Activity, object) extension method to DiagnosticListener, so we can easily create scopes of operations to measure with the C# using pattern shown later.

public static ActivityEx Timer(this DiagnosticListener listener, Activity activity, object arg = null)
{
    if (listener.IsEnabled())
        listener.StartActivity(activity, arg);
    return new ActivityEx(activity, listener);
}
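Since ActivityEx is omitted from the post, the following is only a guess at what such a wrapper might look like, written to match the constructor call used by the Timer extension. The null checks, the double-dispose guard and the demo class are my own assumptions, not the author's code.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical sketch: the original ActivityEx is not shown in the post.
public sealed class ActivityEx : IDisposable
{
    private readonly Activity activity;
    private readonly DiagnosticListener listener;
    private bool disposed;

    public ActivityEx(Activity activity, DiagnosticListener listener)
    {
        this.activity = activity ?? throw new ArgumentNullException(nameof(activity));
        this.listener = listener ?? throw new ArgumentNullException(nameof(listener));
    }

    public void Dispose()
    {
        if (disposed) return;
        disposed = true;

        // Activity.Id stays null until the activity is started. Timer starts it
        // only when the listener is enabled, so skip the stop otherwise.
        if (activity.Id != null)
            listener.StopActivity(activity, null); // emits "<OperationName>.Stop"
    }
}

public static class ActivityExDemo
{
    public static TimeSpan Run()
    {
        using var listener = new DiagnosticListener("Demo");
        var activity = new Activity("Work");
        listener.StartActivity(activity, null);

        using (new ActivityEx(activity, listener))
        {
            Thread.Sleep(20); // the operation being measured
        }

        return activity.Duration; // set when the activity was stopped
    }

    public static void Main() => Console.WriteLine(Run());
}
```

Checking Id before stopping avoids stopping an activity that was never started, which is what happens in the Timer extension when nothing subscribes to the listener.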

How can we use such an extension? By simply adding a using block around the operations to measure:

private static readonly DiagnosticListener _diagnostics = new DiagnosticListener(nameof(Service));

public override async Task<ResponseData> DoWork(RequestData request, ServerCallContext context)
{
    using (_diagnostics.Timer(new Activity("ValidatingRequest")))
    {
        await Task.Delay(50);
    }
    return new ResponseData { Message = $"Hello {request.Message}" };
}

public override async Task<ResponseData> StreamWork(IAsyncStreamReader<RequestData> requestStream, ServerCallContext context)
{
    int i = 0;
    using (_diagnostics.Timer(new Activity("CountingRequests")))
    {
        await foreach (var item in requestStream.ReadAllAsync())
            if (!string.IsNullOrWhiteSpace(item.Message))
                i++;
    }
    return new ResponseData { Message = $"Received {i}" };
}

The name given to the DiagnosticListener (the _diagnostics field) is the same name that was enabled with TelemetryDiagnosticSourceCollector during application startup.

It is an antipattern to wrap every call within a using block. Instead, we can use an ASP.NET Core middleware or a gRPC interceptor, and we can rely on the diagnostics of HttpClient and the HTTP pipeline as well. I am going to use this last option and utilize the built-in HTTP diagnostics.

OpenTelemetry on the Client Side

The client-side code follows the same pattern as the server side. During startup, configure telemetry:

using var tracerFactory = TracerFactory.Create(builder => builder
    .SetResource(Resources.CreateServiceResource("NameClient"))
    .AddDependencyCollector()
    .AddCollector(tracer => new TelemetryDiagnosticSourceCollector(tracer, nameof(NameClient)))
    .UseApplicationInsights(config => config.InstrumentationKey = "20acdfa2-some-guid-key-returned")
    .UseZipkin(zipkinOptions =>
    {
        zipkinOptions.Endpoint = new Uri("http://myzipkinservice:9411/api/v2/spans");
        zipkinOptions.ServiceName = nameof(NameClient);
    }));

We do not need to add request collectors here, as we do not expect any incoming requests.

I add a DiagnosticListener to Program.cs and use the same Timer extension method as the server to measure the duration of operations:

// Field on the Program class:
private static readonly DiagnosticListener _diagnostics = new DiagnosticListener("NameClient");

// Inside Main, replacing the plain unary call:
using (_diagnostics.Timer(new Activity("SendingRequest")))
{
    RequestData request;
    using (_diagnostics.Timer(new Activity("PreparingRequest").AddTag("Msg", "Pocak")))
    {
        request = new RequestData() { Message = "Pocak" };
        await Task.Delay(100);
    }
    var response = await client.DoWorkAsync(request);
    Console.WriteLine(response.Message);
}

Using Application Insights Collector

Let's see how request-response and streaming calls look in Application Insights. I created a new AppInsights resource in Azure and used its instrumentation key to configure the client and server applications. I then started both the server and the client apps on localhost and sent some requests. The telemetry was captured and exported to AppInsights by the OpenTelemetry exporter.

Navigate to the created Application Insights resource on the Azure Portal. Select the Performance tab and adjust the observed time frame appropriately. By navigating to the Dependencies tab, we can select one operation and drill into the Samples. The Azure Portal offers a Suggested Sample.

AppInsights selecting dependencies

Selecting the suggested sample draws an End-to-end transaction visualization. Metrics visualization for the unary gRPC call:

AppInsights Unary call

Here we can see two custom activities on the client side: SendingRequest and PreparingRequest. PreparingRequest is the inner activity; it is selected, and its details are displayed on the right-hand side. Note that the details show the custom property Msg, which was added to the telemetry metadata through the activity tags in the code. Then a call is made to the server running on localhost:5001. The server is instrumented with one operation, ValidatingRequest. Also notice that the HTTP instrumentation is collected as well: the two rows with localhost come from the default HTTP dependency and request collectors that were registered.

In a similar way we can take a look at the streaming diagnostics as well:

AppInsights streaming calls

It is worth pointing out that a single HTTP call stays open for all 5 messages, so further instrumentation would have to be added around the individual writes to see the individual messages.

AppInsights offers an Application Map view, where services are visualized as graph nodes, and edges are the calls made between the services.

AppInsights Application Map

Note that the Zipkin exporter was disabled when the above screenshot was taken.

Using Zipkin Collector

OpenTelemetry supports multiple exporters; the other one used in this post is Zipkin. Zipkin can be started as a process on localhost or in a Docker container. For convenience, I used Azure Container Instances to start the Zipkin Docker container, but it could just as easily be hosted on localhost with Docker. For the demonstration, I used the default configuration with in-memory storage.

Zipkin is purpose-built to visualize distributed traces, so in some sense it offers fewer features than AppInsights, although this does not make it any worse at distributed tracing.

On the Zipkin opening page, we see all distributed traces in a list ordered by duration. I selected one of the unary gRPC calls; the image below shows the visualization of the trace. Note that the activity tag Msg appears among the tags the same way as it appears in AppInsights.

Zipkin Unary call

Streamed requests visualization by Zipkin:

Zipkin streaming calls

Finally, Zipkin offers an application map visualization as well:

Zipkin Application Map

Conclusion

By combining gRPC's low cost for service-to-service communication with OpenTelemetry's collectors and exporters, we can easily set up distributed tracing for applications. Instrumenting our code with further telemetry has become a lot easier as well with the help of the OpenTelemetry standards.

A careful reader may ask: if the service and the client are configured to send data to Zipkin over HTTP, should those requests also be collected by the DependencyCollector? That is indeed the case. When using Zipkin and AppInsights together, the calls to Zipkin show up on the Application Map:

AppInsights Application Map with Zipkin