Intercepting Service Fabric remoting calls
01/14/2018
6 minutes
In the previous post I described how we can update remoting calls from V1 to V2 in Service Fabric. In this post, I am describing how we can intercept V2 remoting calls and add custom headers. I will share the related code samples for Stateful and Stateless services as-well-as Actors.
The goal will be to add a custom header entry on the client side, while the service side would read (and possibly use) the value of the header value.
Background
Services
Firstly, we want a service to add some kind of a header to the outgoing request. To implement this we can create a custom IServiceRemotingClient
public class CustomServiceRemotingClient : IServiceRemotingClient { private readonly IServiceRemotingClient _wrapped; public ResolvedServicePartition ResolvedServicePartition { get => _wrapped.ResolvedServicePartition; set => _wrapped.ResolvedServicePartition = value; } public string ListenerName { get => _wrapped.ListenerName; set => _wrapped.ListenerName = value; } public ResolvedServiceEndpoint Endpoint { get => _wrapped.Endpoint; set => _wrapped.Endpoint = value; } public CustomServiceRemotingClient(IServiceRemotingClient wrapped) { _wrapped = wrapped ?? throw new ArgumentNullException(nameof(wrapped)); } public Task<IServiceRemotingResponseMessage> RequestResponseAsync(IServiceRemotingRequestMessage requestRequestMessage) { byte[] headerValue = Guid.NewGuid().ToByteArray(); requestRequestMessage.GetHeader().AddHeader("HeaderName", headerValue); return _wrapped.RequestResponseAsync(requestRequestMessage); } public void SendOneWay(IServiceRemotingRequestMessage requestMessage) { byte[] headerValue = Guid.NewGuid().ToByteArray(); requestMessage.GetHeader().AddHeader("HeaderName", headerValue); _wrapped.SendOneWay(requestMessage); } }
The new client simply wraps another IServiceRemotingClient client, but when a service call is made it gets to the header of the message and adds a custom Guid value for the HeaderName
header item.
To use this client we will need to create a client factory which can create our client. The client factory will be passed to the Service Fabric framework.
The same approach is taken as for the remoting client, we wrap an IServiceRemotingClientFactory in our factory.
public class CustomTransportServiceRemotingClientFactory : IServiceRemotingClientFactory { private readonly IServiceRemotingClientFactory _wrapped; public event EventHandler<CommunicationClientEventArgs<IServiceRemotingClient>> ClientConnected; public event EventHandler<CommunicationClientEventArgs<IServiceRemotingClient>> ClientDisconnected; public CustomTransportServiceRemotingClientFactory(IServiceRemotingClientFactory wrapped) { _wrapped = wrapped ?? throw new ArgumentNullException(nameof(wrapped)); _wrapped.ClientConnected += WrappedClientConnected; _wrapped.ClientDisconnected += WrappedClientDisconnected; } public async Task<IServiceRemotingClient> GetClientAsync(Uri serviceUri, ServicePartitionKey partitionKey, TargetReplicaSelector targetReplicaSelector, string listenerName, OperationRetrySettings retrySettings, CancellationToken cancellationToken) { var client = await _wrapped.GetClientAsync(serviceUri, partitionKey, targetReplicaSelector, listenerName, retrySettings, cancellationToken); return new CustomServiceRemotingClient(client); } public async Task<IServiceRemotingClient> GetClientAsync(ResolvedServicePartition previousRsp, TargetReplicaSelector targetReplicaSelector, string listenerName, OperationRetrySettings retrySettings, CancellationToken cancellationToken) { var client = await _wrapped.GetClientAsync(previousRsp, targetReplicaSelector, listenerName, retrySettings, cancellationToken); return new CustomServiceRemotingClient(client); } public IServiceRemotingMessageBodyFactory GetRemotingMessageBodyFactory() { return _wrapped.GetRemotingMessageBodyFactory(); } public Task<OperationRetryControl> ReportOperationExceptionAsync(IServiceRemotingClient client, ExceptionInformation exceptionInformation, OperationRetrySettings retrySettings, CancellationToken cancellationToken) { var customClient = client as CustomServiceRemotingClient; if(customClient != null) { client = customClient.Wrapped; } return _wrapped.ReportOperationExceptionAsync(client, exceptionInformation, retrySettings, cancellationToken); } private void WrappedClientDisconnected(object sender, CommunicationClientEventArgs<iserviceremotingclient> e) { ClientDisconnected?.Invoke(sender, e); } private void WrappedClientConnected(object sender, CommunicationClientEventArgs<iserviceremotingclient> e) { ClientConnected?.Invoke(sender, e); } }
The wrapping in this case is even simpler. The only thing we do is get the wrapped ClientFactory create a client which we can pass into our CustomServiceRemotingClient to be wrapped. Then our new client is being returned.
To handle exceptions properly, ReportOperationExceptionAsync needs to cast the client back to the wrapped one.
Finally, when we create our ServiceProxyCache we can pass our new client factory:
new ServiceProxyFactory(c => new CustomTransportServiceRemotingClientFactory(new FabricTransportServiceRemotingClientFactory(remotingCallbackMessageHandler: c)));
Note, that FabricTransportServiceRemotingClientFactory is in this V2 namespace.
As a header is now added to the calls, let us switch to the service side of the call, and read the attached header from the request.
This time, we create a message handler, by deriving from ServiceRemotingMessageDispatcher
public class CustomServiceMessageHandler : ServiceRemotingMessageDispatcher, IServiceRemotingMessageHandler { public CustomServiceMessageHandler(IEnumerable<Type> remotingTypes, System.Fabric.ServiceContext serviceContext, object serviceImplementation, IServiceRemotingMessageBodyFactory serviceRemotingMessageBodyFactory = null) : base(remotingTypes, serviceContext, serviceImplementation, serviceRemotingMessageBodyFactory) { } public CustomServiceMessageHandler(System.Fabric.ServiceContext serviceContext, IService service, IServiceRemotingMessageBodyFactory serviceRemotingMessageBodyFactory = null) : base(serviceContext, service, serviceRemotingMessageBodyFactory) { } public override void HandleOneWayMessage(IServiceRemotingRequestMessage requestMessage) { if(requestMessage.GetHeader().TryGetHeaderValue("HeaderName", out byte[] headerValue)) { var myValue = new Guid(headerValue); } base.HandleOneWayMessage(requestMessage); } public override Task<IServiceRemotingResponseMessage> HandleRequestResponseAsync(IServiceRemotingRequestContext requestContext, IServiceRemotingRequestMessage requestMessage) { if(requestMessage.GetHeader().TryGetHeaderValue("HeaderName", out byte[] headerValue)) { var myValue = new Guid(headerValue); } return base.HandleRequestResponseAsync(requestContext, requestMessage); } }
In the HandleOneWayMessage and HandleRequestResponseAsync overrides we can access the header and read the header name item's value we attached at the client. Then we call the base to handle the rest for us.
Note, that in this case, if the header is not present, we simply pass the call along to the base.
return new[] { new ServiceReplicaListener(context=> new FabricTransportServiceRemotingListener(context, new CustomServiceMessageHandler(context, this))) };
When creating the ServiceReplicaListener/ServiceInstanceListener we pass in our custom message handler.
Actors
Actors solution on the client side is very similar to Stateful/Stateless message calls. We need to use an ActorProxyFactory
, where we can actually reuse our previously created wrapping client factory, but this time we wrap a FabricTransportActorRemotingClientFactory
. Make sure to the cache the ActorProxyFactory and reuse it across the application.
new ActorProxyFactory(c => new CustomTransportServiceRemotingClientFactory(new FabricTransportActorRemotingClientFactory(c)));
Actor calls on the 'server' side (on the side of the actor being requested) are different though. First, we create a message handler, similar to the one we already had for services, but this time we derive from ActorServiceRemotingDispatcher
:
public class CustomActorMessageHandler : ActorServiceRemotingDispatcher, IServiceRemotingMessageHandler { public CustomActorMessageHandler(ActorService actorService, IServiceRemotingMessageBodyFactory serviceRemotingRequestMessageBodyFactory) : base(actorService, serviceRemotingRequestMessageBodyFactory) { } public override void HandleOneWayMessage(IServiceRemotingRequestMessage requestMessage) { if(requestMessage.GetHeader().TryGetHeaderValue("HeaderName", out byte[] headerValue)) { var myHeaderValue = new Guid(headerValue); } base.HandleOneWayMessage(requestMessage); } public override Task<IServiceRemotingResponseMessage> HandleRequestResponseAsync(IServiceRemotingRequestContext requestContext, IServiceRemotingRequestMessage requestMessage) { if(requestMessage.GetHeader().TryGetHeaderValue("HeaderName", out byte[] headerValue)) { var myHeaderValue = new Guid(headerValue); } return base.HandleRequestResponseAsync(requestContext, requestMessage); } }
After this, a new ActorService type will need to be created, so we can pass our message handler to the actor service. Our new actor service will derive (and reuse) the ActorService type, and will need to be used typically in Program.cs where it is being registered by ActorRuntime.RegisterActorAsync
method call. This class is probably the less straightforward part:
public class CustomActorService : ActorService { public CustomActorService(StatefulServiceContext context, ActorTypeInformation actorTypeInfo, Func<ActorService, ActorId, ActorBase> actorFactory = null, Func<ActorBase, IActorStateProvider, IActorStateManager> stateManagerFactory = null, IActorStateProvider stateProvider = null, ActorServiceSettings settings = null) : base(context, actorTypeInfo, actorFactory, stateManagerFactory, stateProvider, settings) { } protected override IEnumerable<ServiceReplicaListener> CreateServiceReplicaListeners() { return new[] { new ServiceReplicaListener( context => { var serializationProvider = new ServiceRemotingDataContractSerializationProvider(); var messageDispatcher = new CustomActorMessageHandler(this, serializationProvider.CreateMessageBodyFactory()); return new FabricTransportActorServiceRemotingListener(context, messageDispatcher, serializationProvider: serializationProvider); }, "V2Listener")}; } }
Here, we override the CreateServiceReplicaListeners
method (similar to services) and pass in our message handler to the actor remoting listener. The only extra thing we do compared to serives, that our message handler depends on a message body factory which is provided by ServiceRemotingDataContractSerializationProvider
.