In this blog, I am going to cover Streaming and Authentication for gRPC services in ASP.Net Core. In my last blog post, I covered basic concepts of gRPC in ASP.Net core. I will strongly suggest going through the previous blog post before going through this one. As I will be using the project created on the last blog post as the starter for this post.
In today’s post on Streaming and Authentication in gRPC I will cover the following topics:
- Server Streams in gRPC
- Client Streams in gRPC
- Authentication for gRPC Service
Server Streams in gRPC
As I discussed in my last blog, gRPC supports four types of remote procedure calls. Server streams are one of these types. In the last blog post, I just discussed Unary remote procedure calls.
In the case of Server streams remote procedure call implementation, the server sends a continuous stream of response back to the client. And the stream will either stop once the server is done with sending data or when the client asks to stop.
When will we use a gRPC Server Stream?
gRPC Server Stream will be very useful for building reactive service. Meaning if your service depends on a message bus or a queue for its logic, in that case, it makes a lot of sense for the server to stream response. The server will stream the processed data as and when the upstream data is available to the connected clients.
This will allow the client of the gRPC server also to react to changes in the server as and when it occurs, instead of polling the server in a time interval.
The client can decide when and how to process the server stream. This gives the client a lot of flexibility.
gRPC Streaming Server
Now, let us consider the case where we have a server that will send a continuous stream of integer to the client. This is a very simple example, but practically the server can return any data in the form of a stream.
To build this streaming server, I will add a new .proto
file into my existing GrpcDemo.Server
project. I will add the file into the Protos
folder, just like the file I added in my last blog post. I will name the file as clientcount.proto
.
In the service definition, I will have a single RPC method GetClientCount
, which will not have any input. Hence in protobuf the parameter will be google.protobuf.Empty
. To get this type, I will add the import statement import "google/protobuf/empty.proto";
in the clientcount.proto
file.
And in the returns
statement of the function, I will add the keyword stream
before the ClientContent
output type. This keyword will create the necessary pipeline under the hood to support the server stream.
And finally, I will create the ClientCount
model which will have a single property Count
of type integer
.
syntax = "proto3";
import "google/protobuf/empty.proto";
option csharp_namespace = "GrpcClientCount";
service ClientCountProvider {
rpc GetClientCount (google.protobuf.Empty) returns (stream ClientCount);
}
message ClientCount{
int32 Count = 1;
}
Once I add the protobuf service definition to the project. Next, I will add the file in the project’s ItemGroup
along with the other protobuf file.
<ItemGroup>
<Protobuf Include="Protos\population.proto" GrpcServices="Server" />
<Protobuf Include="Protos\clientcount.proto" GrpcServices="Server" />
</ItemGroup>
The Server code
Once the service definition protobuf file is created. It is time to create a service code.
I will create a new class ClientCountProviderService
, which will derive from the autogenerated class ClientCountProvider.ClientCountProviderBase
by the Grpc.Tools
package.
The implementation of the class will be simple. A continuous while
loop that will write an incremental number into the IServerStreamWriter
provided by the gRPC infrastructure. And the loop will be exited only if the client sends a cancellation request.
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using GrpcClientCount;
using System.Threading.Tasks;
namespace GrpcDemo.Server
{
public class ClientCountProviderService : ClientCountProvider.ClientCountProviderBase
{
public override async Task GetClientCount(Empty request, IServerStreamWriter<ClientCount> responseStream, ServerCallContext context)
{
var count = 0;
while (!context.CancellationToken.IsCancellationRequested)
{
await responseStream.WriteAsync(new ClientCount { Count = count });
count++;
}
}
}
}
Once the service code is complete, it is time to configure the service class in the Startup
class. To do that I will add the following line inside of the Configure
method of Startup
class.
app.UseEndpoints(e => e.MapGrpcService<ClientCountProviderService>());
The Client code
For the client project, I will use the existing GrpcDemo.Client
project I created in my last blog post.
To start the client, first I will copy over the clientcount.proto
from the server project since it is the service contract.
syntax = "proto3";
import "google/protobuf/empty.proto";
option csharp_namespace = "GrpcClientCount";
service ClientCountProvider {
rpc GetClientCount (google.protobuf.Empty) returns (stream ClientCount);
}
message ClientCount{
int32 Count = 1;
}
Once the file is copied over into the Protos
folder of the GrpcDemo.Client
project, I will add the following in the ItemGroup
of the project.
<ItemGroup>
<Protobuf Include="Protos\population.proto" GrpcServices="Client" />
<Protobuf Include="Protos\clientcount.proto" GrpcServices="Client" />
</ItemGroup>
Once the project update is complete, it is time to create the client code. The channel creation will remain the same as before. For the client object, now I will create an instance of the ClientCountProvider.ClientCountProviderClient
class.
Since the server is sending a continuous stream of number and will be waiting on the client to send a cancellation request. Hence I will create a new instance of CancellationTokenSource
with a time span of 5 seconds. And when I call the GetClientCount
method from the server I will send the instance of CancellationTokenSource
.
Next, inside a try-catch block, I will read the data from the ResponseStream
in a loop. And after 5 seconds, the loop will break due to the CancellationTokenSource
sent to the server. And it will cause an exception, which is expected. I will catch and print the exception in the console output.
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Grpc.Net.Client;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace GrpcDemo.Client
{
class Program
{
static async Task Main(string[] args)
{
var channel = GrpcChannel.ForAddress("https://localhost:5001/");
var client = new GrpcClientCount.ClientCountProvider.ClientCountProviderClient(channel);
var token = new CancellationTokenSource(TimeSpan.FromSeconds(5));
using var population = client.GetClientCount(new Empty(), cancellationToken: token.Token );
try
{
await foreach (var item in population.ResponseStream.ReadAllAsync(token.Token))
{ Console.WriteLine(item.Count); }
}
catch(RpcException exc)
{
Console.WriteLine(exc.Message);
}
}
}
}
Running the application
Now, I will first run the server.
In the console output, I can see ClientCountProvider
is running as ServerStreaming
as expected.
Since the server is now up and running, I will run the client.
In the client response, I can see the printing of the continuous number. Also, in the end, we got the expected error from the server with the message “Call canceled by the client”. We got this since we send a cancellation token which expires after 5 seconds.
Client Streams in gRPC
Now that we know how server stream works, it is time to explore Client stream. Which is another type of RPC supported by gRPC. To demonstrate that I will update the PopulationProvider
service I created in my last blog post.
Firstly, I will update the population.proto
file. For the method GetPopulation
, instead of taking a unary request input of PopulationRequest
, I will update it to be a stream
.
syntax = "proto3";
option csharp_namespace = "GrpcPopulation";
service PopulationProvider {
rpc GetPopulation (stream PopulationRequest) returns (PopulationResponse);
}
message PopulationRequest{
string State = 1;
}
message PopulationResponse{
int64 Count = 1;
}
Once that is completed, I will update the server code to read the request stream and process.
Change in server code
I will update the PopulationService
class to read the request stream. And for each item in the request stream, it will get the state’s population. And once the request stream is completed, it will return the sum of populations of all the states in the request.
using Grpc.Core;
using GrpcPopulation;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace GrpcDemo.Server
{
public class PopulationService : PopulationProvider.PopulationProviderBase
{
private readonly IStatePopulationProvider statePopulationProvider;
public PopulationService(IStatePopulationProvider statePopulationProvider)
{
this.statePopulationProvider = statePopulationProvider;
}
public override async Task<PopulationResponse> GetPopulation(IAsyncStreamReader<PopulationRequest> requestStream, ServerCallContext context)
{
var statePopulations = new List<long>();
while (await requestStream.MoveNext())
{
var populationRequest = requestStream.Current;
statePopulations.Add(statePopulationProvider.Get(populationRequest.State));
}
return new PopulationResponse { Count = statePopulations.Sum() };
}
}
}
Change in client code
In the client code, I will update it from being a single request object to write it in a request stream.
I will create a new private method ClientStream
to achieve this. In the implementation, once I call the GetPopulation
on the client object I will get a handle to the request stream.
I will loop through an array of states and write them into the request stream using the RequestStream.WriteAsync
method. And once the loop is completed, I will call the RequestStream.CompleteAsync
method to notify the server of the request stream completed.
And finally, I will get the response from the server and write it in the console output.
private static async Task ClientStream()
{
var channel = GrpcChannel.ForAddress("https://localhost:5001/");
var client = new GrpcPopulation.PopulationProvider.PopulationProviderClient(channel);
using var populationRequest = client.GetPopulation();
foreach (var state in new [] { "NY", "NJ", "MD", "KY"})
{
await populationRequest.RequestStream.WriteAsync(new GrpcPopulation.PopulationRequest
{
State = state
});
}
await populationRequest.RequestStream.CompleteAsync();
var response = await populationRequest.ResponseAsync;
Console.WriteLine(response.Count);
}
The output of the Client Stream
Next, I will update the Main
method in the client code to call the newly created ClientStream
method.
static async Task Main(string[] args)
{
await ClientStream();
}
Finally, I will run both the server and client applications. And I will see the expected output on the client. I will get a sum of the values from state NY, NJ, MD and KY, which is 100000 based on the values from StatePopulationProvider
class. You can check my previous blog for the details of the StatePopulationProvider
class. Just for a refresher, this class contains a in-memory dictionary of the state and its population (all fake numbers).
Authentication for gRPC service
Authentication is a critical part of any application. I have a blog post on Authentication in ASP.Net Core, which goes in-depth on authentication. For this application, I am going to use JWT Token for authentication.
Firstly, I will configure the server project for authentication. To achieve this, I will add the NuGet package Microsoft.AspNetCore.Authentication.JwtBearer to the GrpcDemo.Server
project.
Update to the Startup class
Firstly, I am going to set up the Startup
class to enable authentication and authorization. For that, I will call UseAuthentication
and UseAuthorization
methods on the IApplicationBuilder
instance inside the Configure
method.
Once this is done, I am going to update the ConfigureServices
method to set up JWT token handler middleware. To do that I will use the AddAuthentication
method on the IServiceCollection
instance. And after that, I will call the AddJwtBearer
method to set up the JWT token. My blog post on Authentication goes in-depth on this.
After configuring the AddAuthentication
method, I will call the AddAuthorization
method on the IServiceCollection
instance.
app.UseAuthentication();
app.UseAuthorization();
var key = Encoding.ASCII.GetBytes("This is my test private key");
services.AddAuthentication(x =>
{
x.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme;
x.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme;
})
.AddJwtBearer(x =>
{
x.RequireHttpsMetadata = false;
x.SaveToken = true;
x.TokenValidationParameters = new TokenValidationParameters
{
ValidateIssuerSigningKey = true,
IssuerSigningKey = new SymmetricSecurityKey(key),
ValidateIssuer = false,
ValidateAudience = false
};
});
services.AddAuthorization();
Note: Since I will use an existing Authentication service which I created as a part of my authentication article, hence I will use the same key for the encryption of the JWT token (“This is my test private key”). This will ensure that the gRPC server is able to decrypt and allow the JWT token created by the JWT Authentication server.
Update ClientCountProviderService
I will update the ClientCountProviderService
call to add the Authorize
attribute.
And I will also add a Console.WriteLine
statement to make sure that the user id I am going to use for creating the JWT token is actually the one coming to the gRPC server.
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using GrpcClientCount;
using Microsoft.AspNetCore.Authorization;
using System;
using System.Threading.Tasks;
namespace GrpcDemo.Server
{
[Authorize]
public class ClientCountProviderService : ClientCountProvider.ClientCountProviderBase
{
public override async Task GetClientCount(Empty request, IServerStreamWriter<ClientCount> responseStream, ServerCallContext context)
{
var count = 0;
Console.WriteLine($"User name from JWT Token {context.GetHttpContext().User.Identity.Name}");
while (!context.CancellationToken.IsCancellationRequested)
{
await responseStream.WriteAsync(new ClientCount { Count = count });
count++;
}
}
}
}
Update the Client code
Once the server is set up to accept JWT authentication token for authentication, it is time to update the client to send the token. I will update the existing GrpcDemo.Client
with the new code.
The client can send the token one of two ways:
- Sending token as a part of the header in the function parameter
- Setting up the token as a part of the gRPC channel created
Token as a part of the header
Firstly, I will try the token as part of the header. To do so, I will create a new instance of Metadata
. Once the instance of Metadata
is created I will add Authorization as key and “Bearer token” as the value.
I will keep the token value as a fake string “token” to show that the server is sending 401 when an invalid token is passed.
var headers = new Metadata();
headers.Add("Authorization", "Bearer token");
Once that is done, I will change the call to GetClientCount
method to add the header.
using var population = client.GetClientCount(
new Empty(),
cancellationToken: token.Token,
headers: headers);
Once I do that, the client is now ready to be tested out.
Testing Authentication with header token
As we can see the client is sending HTTP Unauthenticated 401 response since we passed a fake token.
Now, I will run Postman to get a valid token from the JWT Authentication server.
Once I get the token, I will update the client code to set the proper token.
var headers = new Metadata();
headers.Add("Authorization", "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1bmlxdWVfbmFtZSI6InRlc3QxIiwibmJmIjoxNTgxOTYyNzI0LCJleHAiOjE1ODE5NjYzMjQsImlhdCI6MTU4MTk2MjcyNH0.VvYln0PgZQrFwBTx0Ik3TGGI43DxdVVxzHAXma-K5P0");
Now, I will run the client application again. And I will see a proper response. It will print continuous integer numbers in increments from the server.
I will also verify the Console on the server to ensure it is indeed getting token for the user “test1”. The user id I used to create the token in Postman.
And as expected we can see the “test1” user is printed on the server console.
Token in gRPC Channel
Sending token as a part of the method call is repetitive. And if we have to make multiple calls to the server, it is tedious. Instead, we can pass the token as a part of the channel configuration.
To achieve this, I will create a new instance of CallCredentials
, using the static method FromInterceptor
of the CallCredentials
class. The FromInterceptor
takes an async delegate as the parameter which takes AuthInterceptorContext
and Metadata
object as its input. Hence I will use the Metadata
instance to set the authentication token.
var credentials = CallCredentials.FromInterceptor((c, m) => {
m.Add("Authorization",
"Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1bmlxdWVfbmFtZSI6InRlc3QxIiwibmJmIjoxNTgxOTYyNzI0LCJleHAiOjE1ODE5NjYzMjQsImlhdCI6MTU4MTk2MjcyNH0.VvYln0PgZQrFwBTx0Ik3TGGI43DxdVVxzHAXma-K5P0");
return Task.CompletedTask;
});
Once the instance of CallCredentials
is created, I will pass it as a part of the GrpcChannelOptions
parameter to the GrpcChannel.ForAddress
call.
var channel = GrpcChannel.ForAddress("https://localhost:5001/",
new GrpcChannelOptions {
Credentials = ChannelCredentials.Create(new SslCredentials(), credentials)
});
Once the channel is configured with authentication, I will update the client call to remove the header parameter. So that the gRPC call uses authentication header from the channel.
using var population = client.GetClientCount(
new Empty(),
cancellationToken: token.Token);
Testing Authentication with channel token
Once the channel code is complete I will run the application. And I will see the expected output.
Conclusion
In this blog post, I walked through server and client gRPC streams. And as you can see it is trivial to configure and use both. Both of these features are extremely powerful tools for building reactive microservices.
In terms of authentication, gRPC works with standard ASP.Net Core authentication middleware. Which makes it really easy to use any authentication provider. In the blog post I have used JWT, but really anything else could be used for authentication.
Source Code: https://github.com/choudhurynirjhar/gRPC
Youtube: https://youtu.be/XHHNVvS34PU