ASP.Net Core Streaming Application Using Kafka – Part 2

Hello everyone, welcome back to .Net Core Central. This post continues the previous post, ASP.Net Core Streaming Application Using Kafka – Part 1. Here I will create the Kafka producer in .Net Core.

Creating .Net Core Producer

Create new Producer Project

First, I will open the TimeManagement application solution in Visual Studio 2017. Once the solution is open, I will right-click on the solution and select “Add” -> “New Project”. This opens the New Project modal window.

In the New Project modal window, I will select “.Net Core” -> “Console App (.NET Core)”, name the project “TimeManagement.Streaming.Producer” and click “OK”. As a result, a new project is created.

Next, I will install the NuGet package for Kafka, “Confluent.Kafka” (project URL: https://github.com/confluentinc/confluent-kafka-dotnet/), into the new project.

IBookingProducer

After the NuGet package is installed, I will create a new interface, “IBookingProducer”, which defines the contract for producing booking-related messages to the Kafka stream. It has a single method, Produce, which returns void and takes a string message as its only input parameter.

public interface IBookingProducer
{
    void Produce(string message);
}
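
One practical benefit of putting Produce behind an interface is that code depending on it can be exercised without a running Kafka broker. The following is only a minimal sketch of that idea: InMemoryBookingProducer is a hypothetical class that is not part of the TimeManagement solution, and the interface is repeated so that the snippet compiles on its own.

```csharp
using System;
using System.Collections.Generic;

// Repeated from the article so that this sketch compiles on its own.
public interface IBookingProducer
{
    void Produce(string message);
}

// Hypothetical test double (not part of the original solution):
// it records messages in memory instead of sending them to Kafka.
public class InMemoryBookingProducer : IBookingProducer
{
    private readonly List<string> _messages = new List<string>();

    // Messages in the order in which they were produced.
    public IReadOnlyList<string> Messages => _messages;

    public void Produce(string message)
    {
        if (message == null) throw new ArgumentNullException(nameof(message));
        _messages.Add(message);
    }
}
```

Any class that takes an IBookingProducer can be handed this fake in a unit test and the recorded Messages list inspected afterwards.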

BookingProducer

Next, I will create the implementation class BookingProducer, which hosts the code for connecting to Kafka and producing messages to the Kafka stream. BookingProducer implements the IBookingProducer interface.

I will import the Confluent.Kafka and Confluent.Kafka.Serialization namespaces into this class, along with System.Collections.Generic and System.Text for Dictionary and Encoding. Note that the Confluent.Kafka.Serialization namespace belongs to the pre-1.0 releases of the package, which is what this post uses.

For the implementation of the Produce method, I will first create a configuration object, which is a Dictionary<string, object>. This configuration object is passed to the constructor of the Producer class from the Confluent.Kafka assembly.

After creating the Producer object, I will publish the message to the “timemanagement_booking” topic. Finally, I will Flush the message into the Kafka stream. Flush waits until all outstanding produce requests and delivery callbacks have completed, or until the timeout passed to it (in milliseconds) elapses.

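using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

public class BookingProducer : IBookingProducer
{
    public void Produce(string message)
    {
        // Address of the Kafka broker to connect to.
        var config = new Dictionary<string, object> {
           {"bootstrap.servers", "localhost:9092"}
        };
        // Messages have no key (null key serializer); the value is sent as a UTF-8 string.
        using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
        {
            // Block until the delivery report for this message arrives.
            producer.ProduceAsync("timemanagement_booking", null, message).GetAwaiter().GetResult();
            // Wait up to 100 ms for anything that is still outstanding.
            producer.Flush(100);
        }
    }
}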
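
If you are following along with Confluent.Kafka 1.0 or later, the constructor and the Serialization namespace used above are no longer available. As a rough sketch only, and assuming the 1.x API (ProducerConfig, ProducerBuilder and Message are from that newer API, and BookingProducerV1 is a hypothetical name, not something from this post's solution), the same Produce method could look like this:

```csharp
using System;
using Confluent.Kafka;

// Sketch for Confluent.Kafka 1.x or later (an assumption; the post itself uses the older API).
// BookingProducerV1 is a hypothetical name chosen for illustration.
public class BookingProducerV1
{
    public void Produce(string message)
    {
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

        // For Null keys and string values the library supplies default serializers.
        using (var producer = new ProducerBuilder<Null, string>(config).Build())
        {
            // Block until the delivery report for this message arrives.
            // In this API a failed delivery surfaces as a ProduceException<Null, string>.
            var result = producer
                .ProduceAsync("timemanagement_booking", new Message<Null, string> { Value = message })
                .GetAwaiter()
                .GetResult();

            Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
        }
    }
}
```

The method signature matches IBookingProducer.Produce, so the class could implement that interface and be dropped into the rest of the code in this post.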

Invoking .Net Core Kafka Producer

To invoke the producer, I will update the Main method in Program.cs to create an instance of BookingProducer and call its Produce method with each message the user enters, captured through Console.ReadLine. Each message is then published to the Kafka stream. Finally, I will build the project.

using System;

class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine("Enter your message. Enter q for quitting");
        var producer = new BookingProducer();
        string message;
        // ReadLine returns null at end of input, so stop on null as well as on "q".
        while ((message = Console.ReadLine()) != null && message != "q")
        {
            producer.Produce(message);
        }
    }
}

Testing .Net Core Kafka Producer

First, I will start the .Net Core consumer. To do that, I will open a command prompt, navigate to the source code location for TimeManagement.Streaming.Consumer, type the command “dotnet run” and press Enter. The .Net Core Kafka consumer now starts running in the console.

Next, I will open another command prompt, navigate to the source code location for TimeManagement.Streaming.Producer, type the command “dotnet run” and press Enter. The .Net Core Kafka producer now starts running in the console.

Finally, I will type the message “Hello from .Net Core Producer” in the TimeManagement.Streaming.Producer console, and I can see the message appear in the TimeManagement.Streaming.Consumer console.

Conclusion

Just like the Kafka consumer I created in my last post, creating the Kafka producer in .Net Core was simple and straightforward. I did hit one roadblock with the ProduceAsync method: the first message I published never showed up in the consumer console. The reason was that the Flush timeout was too short. Flush returned before the first message had been delivered, and the producer was then disposed at the end of the using block.

To solve this I had two options.

  1. Call GetAwaiter and GetResult to make the produce call synchronous, e.g., producer.ProduceAsync("timemanagement_booking", null, message).GetAwaiter().GetResult();
  2. Increase the timeout in the Flush method to 5 seconds, e.g., producer.Flush(5000);

In my implementation I made the produce call synchronous.
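
For comparison, option 2 could look roughly like the sketch below. It uses the same pre-1.0 Confluent.Kafka API as the rest of this post. FlushingBookingProducer is a hypothetical name, and the check on the value returned by Flush rests on my understanding that, in this API version, Flush returns the number of messages still waiting in the queue; treat that as an assumption and verify it against the package version you have installed.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

// Hypothetical variant of BookingProducer that relies on a longer Flush
// instead of blocking on the task returned by ProduceAsync.
public class FlushingBookingProducer
{
    public void Produce(string message)
    {
        var config = new Dictionary<string, object> {
            {"bootstrap.servers", "localhost:9092"}
        };
        using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
        {
            // Queue the message without waiting for its delivery report.
            producer.ProduceAsync("timemanagement_booking", null, message);

            // Wait up to 5 seconds for the outgoing queue to drain.
            // Assumption: the return value is the number of messages still queued.
            int remaining = producer.Flush(5000);
            if (remaining > 0)
            {
                Console.WriteLine($"{remaining} message(s) still undelivered after 5 seconds");
            }
        }
    }
}
```

The trade-off is that a fixed timeout is still a guess: if the broker is slow to respond, a message can remain undelivered when the producer is disposed, whereas waiting on the ProduceAsync result ties the wait to the message itself.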

Finally, here is the link to the YouTube video, in which I have recorded the steps I followed to create the .Net Core producer.

References:
Kafka: https://kafka.apache.org/
Confluent.Kafka Nuget: https://github.com/confluentinc/confluent-kafka-dotnet/