Streaming and Authentication in gRPC (ASP.Net Core)

In this blog, I am going to cover Streaming and Authentication for gRPC services in ASP.Net Core. In my last blog post, I covered basic concepts of gRPC in ASP.Net core. I will strongly suggest going through the previous blog post before going through this one. As I will be using the project created on the last blog post as the starter for this post.

In today’s post on Streaming and Authentication in gRPC I will cover the following topics:

  • Server Streams in gRPC
  • Client Streams in gRPC
  • Authentication for gRPC Service

Server Streams in gRPC

As I discussed in my last blog, gRPC supports four types of remote procedure calls. Server streams are one of these types. In the last blog post, I just discussed Unary remote procedure calls.

In the case of Server streams remote procedure call implementation, the server sends a continuous stream of response back to the client. And the stream will either stop once the server is done with sending data or when the client asks to stop.

When will we use a gRPC Server Stream?

gRPC Server Stream will be very useful for building reactive service. Meaning if your service depends on a message bus or a queue for its logic, in that case, it makes a lot of sense for the server to stream response. The server will stream the processed data as and when the upstream data is available to the connected clients.

This will allow the client of the gRPC server also to react to changes in the server as and when it occurs, instead of polling the server in a time interval.

The client can decide when and how to process the server stream. This gives the client a lot of flexibility.

gRPC Streaming Server

Now, let us consider the case where we have a server that will send a continuous stream of integer to the client. This is a very simple example, but practically the server can return any data in the form of a stream.

To build this streaming server, I will add a new .proto file into my existing GrpcDemo.Server project. I will add the file into the Protos folder, just like the file I added in my last blog post. I will name the file as clientcount.proto.

In the service definition, I will have a single RPC method GetClientCount, which will not have any input. Hence in protobuf the parameter will be google.protobuf.Empty. To get this type, I will add the import statement import "google/protobuf/empty.proto"; in the clientcount.proto file.

And in the returns statement of the function, I will add the keyword stream before the ClientContent output type. This keyword will create the necessary pipeline under the hood to support the server stream.

And finally, I will create the ClientCount model which will have a single property Count of type integer.

syntax = "proto3";

import "google/protobuf/empty.proto";

option csharp_namespace = "GrpcClientCount";

service ClientCountProvider {
	rpc GetClientCount (google.protobuf.Empty) returns (stream ClientCount);
}

message ClientCount{
	int32 Count = 1;
}

Once I add the protobuf service definition to the project. Next, I will add the file in the project’s ItemGroup along with the other protobuf file.

<ItemGroup>
    <Protobuf Include="Protos\population.proto" GrpcServices="Server" />
    <Protobuf Include="Protos\clientcount.proto" GrpcServices="Server" />
</ItemGroup>

The Server code

Once the service definition protobuf file is created. It is time to create a service code.

I will create a new class ClientCountProviderService, which will derive from the autogenerated class ClientCountProvider.ClientCountProviderBase by the Grpc.Tools package.

The implementation of the class will be simple. A continuous while loop that will write an incremental number into the IServerStreamWriter provided by the gRPC infrastructure. And the loop will be exited only if the client sends a cancellation request.

using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using GrpcClientCount;
using System.Threading.Tasks;

namespace GrpcDemo.Server
{
    public class ClientCountProviderService : ClientCountProvider.ClientCountProviderBase
    {
        public override async Task GetClientCount(Empty request, IServerStreamWriter<ClientCount> responseStream, ServerCallContext context)
        {
            var count = 0;
            while (!context.CancellationToken.IsCancellationRequested)
            {
                await responseStream.WriteAsync(new ClientCount { Count = count });
                count++;
            }
        }
    }
}

Once the service code is complete, it is time to configure the service class in the Startup class. To do that I will add the following line inside of the Configure method of Startup class.

app.UseEndpoints(e => e.MapGrpcService<ClientCountProviderService>());

The Client code

For the client project, I will use the existing GrpcDemo.Client project I created in my last blog post.

To start the client, first I will copy over the clientcount.proto from the server project since it is the service contract.

syntax = "proto3";

import "google/protobuf/empty.proto";

option csharp_namespace = "GrpcClientCount";

service ClientCountProvider {
	rpc GetClientCount (google.protobuf.Empty) returns (stream ClientCount);
}

message ClientCount{
	int32 Count = 1;
}

Once the file is copied over into the Protos folder of the GrpcDemo.Client project, I will add the following in the ItemGroup of the project.

<ItemGroup>
    <Protobuf Include="Protos\population.proto" GrpcServices="Client" />
    <Protobuf Include="Protos\clientcount.proto" GrpcServices="Client" />
</ItemGroup>

Once the project update is complete, it is time to create the client code. The channel creation will remain the same as before. For the client object, now I will create an instance of the ClientCountProvider.ClientCountProviderClient class.

Since the server is sending a continuous stream of number and will be waiting on the client to send a cancellation request. Hence I will create a new instance of CancellationTokenSource with a time span of 5 seconds. And when I call the GetClientCount method from the server I will send the instance of CancellationTokenSource.

Next, inside a try-catch block, I will read the data from the ResponseStream in a loop. And after 5 seconds, the loop will break due to the CancellationTokenSource sent to the server. And it will cause an exception, which is expected. I will catch and print the exception in the console output.

using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Grpc.Net.Client;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace GrpcDemo.Client
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var channel = GrpcChannel.ForAddress("https://localhost:5001/");
            var client = new GrpcClientCount.ClientCountProvider.ClientCountProviderClient(channel);

            var token = new CancellationTokenSource(TimeSpan.FromSeconds(5));
            using var population = client.GetClientCount(new Empty(), cancellationToken: token.Token  );
            try
            {
                await foreach (var item in population.ResponseStream.ReadAllAsync(token.Token))
                { Console.WriteLine(item.Count); }
            }
            catch(RpcException exc)
            {
                Console.WriteLine(exc.Message);
            }
        }
    }
}

Running the application

Now, I will first run the server.

gRPC Server Stream
Server

In the console output, I can see ClientCountProvider is running as ServerStreaming as expected.

Since the server is now up and running, I will run the client.

gRPC Client
Client Response

In the client response, I can see the printing of the continuous number. Also, in the end, we got the expected error from the server with the message “Call canceled by the client”. We got this since we send a cancellation token which expires after 5 seconds.

Client Streams in gRPC

Now that we know how server stream works, it is time to explore Client stream. Which is another type of RPC supported by gRPC. To demonstrate that I will update the PopulationProvider service I created in my last blog post.

Firstly, I will update the population.proto file. For the method GetPopulation, instead of taking a unary request input of PopulationRequest, I will update it to be a stream.

syntax = "proto3";

option csharp_namespace = "GrpcPopulation";

service PopulationProvider {
	rpc GetPopulation (stream PopulationRequest) returns (PopulationResponse);
}

message PopulationRequest{
	string State = 1;
}

message PopulationResponse{
	int64 Count = 1;
}

Once that is completed, I will update the server code to read the request stream and process.

Change in server code

I will update the PopulationService class to read the request stream. And for each item in the request stream, it will get the state’s population. And once the request stream is completed, it will return the sum of populations of all the states in the request.

using Grpc.Core;
using GrpcPopulation;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace GrpcDemo.Server
{
    public class PopulationService : PopulationProvider.PopulationProviderBase
    {
        private readonly IStatePopulationProvider statePopulationProvider;

        public PopulationService(IStatePopulationProvider statePopulationProvider)
        {
            this.statePopulationProvider = statePopulationProvider;
        }

        public override async Task<PopulationResponse> GetPopulation(IAsyncStreamReader<PopulationRequest> requestStream, ServerCallContext context)
        {
            var statePopulations = new List<long>();
            while (await requestStream.MoveNext())
            {
                var populationRequest = requestStream.Current;
                statePopulations.Add(statePopulationProvider.Get(populationRequest.State));
            }

            return new PopulationResponse { Count = statePopulations.Sum() };
        }
    }
}

Change in client code

In the client code, I will update it from being a single request object to write it in a request stream.

I will create a new private method ClientStream to achieve this. In the implementation, once I call the GetPopulation on the client object I will get a handle to the request stream.

I will loop through an array of states and write them into the request stream using the RequestStream.WriteAsync method. And once the loop is completed, I will call the RequestStream.CompleteAsync method to notify the server of the request stream completed.

And finally, I will get the response from the server and write it in the console output.

private static async Task ClientStream()
{
    var channel = GrpcChannel.ForAddress("https://localhost:5001/");

    var client = new GrpcPopulation.PopulationProvider.PopulationProviderClient(channel);

    using var populationRequest = client.GetPopulation();

    foreach (var state in new [] { "NY", "NJ", "MD", "KY"})
    {
        await populationRequest.RequestStream.WriteAsync(new GrpcPopulation.PopulationRequest
        {
            State = state
        });
    }

    await populationRequest.RequestStream.CompleteAsync();

    var response = await populationRequest.ResponseAsync;

    Console.WriteLine(response.Count);

}

The output of the Client Stream

Next, I will update the Main method in the client code to call the newly created ClientStream method.

static async Task Main(string[] args)
{
    await ClientStream();
}

Finally, I will run both the server and client applications. And I will see the expected output on the client. I will get a sum of the values from state NY, NJ, MD and KY, which is 100000 based on the values from StatePopulationProvider class. You can check my previous blog for the details of the StatePopulationProvider class. Just for a refresher, this class contains a in-memory dictionary of the state and its population (all fake numbers).

Client request stream
Client request stream output

Authentication for gRPC service

Authentication is a critical part of any application. I have a blog post on Authentication in ASP.Net Core, which goes in-depth on authentication. For this application, I am going to use JWT Token for authentication.

Firstly, I will configure the server project for authentication. To achieve this, I will add the NuGet package Microsoft.AspNetCore.Authentication.JwtBearer to the GrpcDemo.Server project.

Update to the Startup class

Firstly, I am going to set up the Startup class to enable authentication and authorization. For that, I will call UseAuthentication and UseAuthorization methods on the IApplicationBuilder instance inside the Configure method.

Once this is done, I am going to update the ConfigureServices method to set up JWT token handler middleware. To do that I will use the AddAuthentication method on the IServiceCollection instance. And after that, I will call the AddJwtBearer method to set up the JWT token. My blog post on Authentication goes in-depth on this.

After configuring the AddAuthentication method, I will call the AddAuthorization method on the IServiceCollection instance.

app.UseAuthentication();
app.UseAuthorization();
var key = Encoding.ASCII.GetBytes("This is my test private key");
services.AddAuthentication(x =>
    {
        x.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme;
        x.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme;
    })
    .AddJwtBearer(x =>
    {
        x.RequireHttpsMetadata = false;
        x.SaveToken = true;
        x.TokenValidationParameters = new TokenValidationParameters
        {
             ValidateIssuerSigningKey = true,
             IssuerSigningKey = new SymmetricSecurityKey(key),
             ValidateIssuer = false,
             ValidateAudience = false
         };
     });

     services.AddAuthorization();

Note: Since I will use an existing Authentication service which I created as a part of my authentication article, hence I will use the same key for the encryption of the JWT token (“This is my test private key”). This will ensure that the gRPC server is able to decrypt and allow the JWT token created by the JWT Authentication server.

Update ClientCountProviderService

I will update the ClientCountProviderService call to add the Authorize attribute.

And I will also add a Console.WriteLine statement to make sure that the user id I am going to use for creating the JWT token is actually the one coming to the gRPC server.

using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using GrpcClientCount;
using Microsoft.AspNetCore.Authorization;
using System;
using System.Threading.Tasks;

namespace GrpcDemo.Server
{
    [Authorize]
    public class ClientCountProviderService : ClientCountProvider.ClientCountProviderBase
    {
        public override async Task GetClientCount(Empty request, IServerStreamWriter<ClientCount> responseStream, ServerCallContext context)
        {
            var count = 0;

            Console.WriteLine($"User name from JWT Token {context.GetHttpContext().User.Identity.Name}");

            while (!context.CancellationToken.IsCancellationRequested)
            {
                await responseStream.WriteAsync(new ClientCount { Count = count });
                count++;
            }
        }
    }
}

Update the Client code

Once the server is set up to accept JWT authentication token for authentication, it is time to update the client to send the token. I will update the existing GrpcDemo.Client with the new code.

The client can send the token one of two ways:

  • Sending token as a part of the header in the function parameter
  • Setting up the token as a part of the gRPC channel created
Token as a part of the header

Firstly, I will try the token as part of the header. To do so, I will create a new instance of Metadata. Once the instance of Metadata is created I will add Authorization as key and “Bearer token” as the value.

I will keep the token value as a fake string “token” to show that the server is sending 401 when an invalid token is passed.

var headers = new Metadata();
headers.Add("Authorization", "Bearer token");

Once that is done, I will change the call to GetClientCount method to add the header.

using var population = client.GetClientCount(
                new Empty(), 
                cancellationToken: token.Token,
                headers: headers);

Once I do that, the client is now ready to be tested out.

Testing Authentication with header token
gRPC Authentication client error
401 error

As we can see the client is sending HTTP Unauthenticated 401 response since we passed a fake token.

Now, I will run Postman to get a valid token from the JWT Authentication server.

Once I get the token, I will update the client code to set the proper token.

var headers = new Metadata();
headers.Add("Authorization", "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1bmlxdWVfbmFtZSI6InRlc3QxIiwibmJmIjoxNTgxOTYyNzI0LCJleHAiOjE1ODE5NjYzMjQsImlhdCI6MTU4MTk2MjcyNH0.VvYln0PgZQrFwBTx0Ik3TGGI43DxdVVxzHAXma-K5P0");

Now, I will run the client application again. And I will see a proper response. It will print continuous integer numbers in increments from the server.

gRPC AUthentication proper response
gRPC Authentication client successful response

I will also verify the Console on the server to ensure it is indeed getting token for the user “test1”. The user id I used to create the token in Postman.

gRPC server authentication response
gRPC Server authentication response

And as expected we can see the “test1” user is printed on the server console.

Token in gRPC Channel

Sending token as a part of the method call is repetitive. And if we have to make multiple calls to the server, it is tedious. Instead, we can pass the token as a part of the channel configuration.

To achieve this, I will create a new instance of CallCredentials, using the static method FromInterceptor of the CallCredentials class. The FromInterceptor takes an async delegate as the parameter which takes AuthInterceptorContext and Metadata object as its input. Hence I will use the Metadata instance to set the authentication token.

var credentials = CallCredentials.FromInterceptor((c, m) => { 
    m.Add("Authorization",
                    "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1bmlxdWVfbmFtZSI6InRlc3QxIiwibmJmIjoxNTgxOTYyNzI0LCJleHAiOjE1ODE5NjYzMjQsImlhdCI6MTU4MTk2MjcyNH0.VvYln0PgZQrFwBTx0Ik3TGGI43DxdVVxzHAXma-K5P0");
                return Task.CompletedTask;
    });

Once the instance of CallCredentials is created, I will pass it as a part of the GrpcChannelOptions parameter to the GrpcChannel.ForAddress call.

var channel = GrpcChannel.ForAddress("https://localhost:5001/", 
                new GrpcChannelOptions {
                    Credentials = ChannelCredentials.Create(new SslCredentials(), credentials)
                });

Once the channel is configured with authentication, I will update the client call to remove the header parameter. So that the gRPC call uses authentication header from the channel.

using var population = client.GetClientCount(
                new Empty(), 
                cancellationToken: token.Token);
Testing Authentication with channel token

Once the channel code is complete I will run the application. And I will see the expected output.

client auth
Expected client gRPC authenticated response

Conclusion

In this blog post, I walked through server and client gRPC streams. And as you can see it is trivial to configure and use both. Both of these features are extremely powerful tools for building reactive microservices.

In terms of authentication, gRPC works with standard ASP.Net Core authentication middleware. Which makes it really easy to use any authentication provider. In the blog post I have used JWT, but really anything else could be used for authentication.

Source Code: https://github.com/choudhurynirjhar/gRPC

Youtube: https://youtu.be/XHHNVvS34PU