Hello everyone, welcome back to .Net Core Central. This post is the continuation of the previous post ASP.Net Core Streaming Application Using Kafka – Part 1. And here I will be creating the Kafka producer in .Net Core.
Creating .Net Core Producer
Create new Producer Project
First of all I will open the TimeManagement
application solution in Visual Studio 2017. Furthermore when the solution is opened, I am going to right click on the solution and select “Add” -> “New Project“. As a result the New Project model window will open up.
In the New Project model window, I am going to select “.Net Core” -> “Console App (.NET Core)” and give the name of the project “TimeManagement.Streaming.Producer
” and click “OK“.
As a result a new project is created.
Finally I will install nuget package for Kafka “Confluent.Kafka”, (project url: https://github.com/confluentinc/confluent-kafka-dotnet/) into the new project.
IBookingProducer
After the nuget package is installed, I will create a new interface “IBookingProducer
”, which will have the contract for producing booking related messages to the Kafka stream. It will have a single method Produce
, which returns void
and takes a string
message as its single input parameter.
public interface IBookingProducer { void Produce(string message); }
BookingProducer
Finally, I will create the implementation class BookingProducer
, which will host the code for connecting and producing message to Kafka stream. The BookingProducer
will implement the IBookingProducer
interface.
And I will import the Confluent.Kafka
and Confluent.Kafka.Serialization
namespaces into this class.
For the implementation of Produce
method, I will first create a configuration object, which is an instance of Dictionary
class with key as string
and value as object
. This configuration object will be passed to the constructor of the Producer
class of the Confluent.Kafka
assembly.
After creating the Producer
object, I will publish message to the “timemanagement_booking” topic. And finally I will Flush
the message into the Kafka stream. The Flush
will wait until all outstanding produce request and delivery callback are complete.
public class BookingProducer : IBookingProducer { public void Produce(string message) { var config = new Dictionary<string, object> { {"bootstrap.servers", "localhost:9092"} }; using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8))) { producer.ProduceAsync("timemanagement_booking", null, message).GetAwaiter().GetResult(); producer.Flush(100); } } }
Invoking .Net Core Kafka Producer
For invoking the producer, I will update the Main
method of the Program.cs
to create an instance of BookingProducer
. And call the Produce
method of the BookingProducer
passing message from user captured by Console.Readline
. And this message will be published into the Kafka stream. And finally I will build the project.
class Program { static void Main(string[] args) { Console.WriteLine("Enter your message. Enter q for quitting"); var message = default(string); while((message = Console.ReadLine()) != "q") { var producer = new BookingProducer(); producer.Produce(message); } } }
Testing .Net Core Kafka Producer
First of all I will start the .Net Core Consumer. And for that I will open the command prompt and navigate to the source code location for TimeManagement.Streaming.Consumer
. Finally type the command “dotnet run” and press enter. And now the .Net Core Kafka consumer will start running in the console.
Next, I will open another command prompt. And navigate to the source code location for TimeManagement.Streaming.Producer
. Finally type the command “dotnet run” and press enter. And now the .Net Core Kafka producer will start running in the console.
Finally I will type a message “Hello from .Net Core Producer” in the TimeManagement.Streaming.Producer
console. And I can see the message appearing in the TimeManagement.Streaming.Consumer
console.
Conclusion
Just like the Kafka consumer I created in my last post, creating the Kafka producer in .Net Core was also extremely simple and straightforward. I had a roadblock with the ProduceAsync
method, as the first message published was never coming in the consumer console. And the reason for that is the Flush timeout was too short. Hence the Flush was called too early for the first message.
To solve this I had two options.
- Call the
GetWaiter
andGetResult
to make the produce call synchronous, e.g.,producer.ProduceAsync("timemanagement_booking", null, message).GetAwaiter().GetResult();
- Increase the timeout in the Flush method to 5 seconds, e.g.,
producer.Flush(5000);
In my implementation I made the produce call synchronous.
Finally here is the link to the YouTube video. And in this video I have recorded the steps I followed for creating the .Net Core producer.
References:
Kafka: https://kafka.apache.org/
Confluent.Kafka Nuget: https://github.com/confluentinc/confluent-kafka-dotnet/