Hello everyone, welcome back to .Net Core Central. Today I am going to integrate Reactive Extensions into the .Net Core application. I am going to use application created in my previous post, where I have implemented Kafka stream in .Net Core. This will allow a smooth transition of the Kafka stream into the in-memory Reactive Extensions stream. If you have not gone through my previous posts ASP.Net Core Streaming Application Using Kafka – Part 1 and ASP.Net Core Streaming Application Using Kafka – Part 2, I will strongly suggest you do. Because this post is the next logical continuation of the previous two posts.
What is Reactive Extensions (Rx)
The Reactive Extensions or short form Rx, is a library for composing asynchronous and event-based programs using observable sequences. Using reactive extensions we represent asynchronous data streams with Observables and we query asynchronous data streams using LINQ operators.
Reactive Extensions also have Schedulers built in. And using Schedulers we can control when a subscription starts and when notifications are published.
In this post I will be covering observable sequence and LINQ operators of Reactive Extensions.
The IObservable
of Reactive Extensions is considered a dual of IEnumerable
. IEnumerable
is Pull/Synchronous where as IObservable
is Push/Asynchronous.
There are five nuget packages that are needed for Reactive Extensions:
System.Reactive
System.Reactive.Core
System.Reactive.Interfaces
System.Reactive.Linq
System.Reactive.PlatformServices
Microsoft.Reactive.Testing
Installing Reactive Extensions Nuget Package
I am going to add the Reactive Extensions nuget packages to the TimeManagement.Streaming.Consumer
project. Because the Kafka Consumer is already getting streaming data from Kafka. Hence it is logical to have the Reactive Extensions implementation in this project, so that the external stream can be converted into a in-memory data stream.
First of all I will open the TimeManagement
solution in Visual Studio 2017. Furthermore after the solution is opened, I am going to right click on the TimeManagement.Streaming.Consumer
project and select “Manage Nuget Packages” option. And this will open up the Nuget Package Manager window. Also, In the Browse tab, I will search for System.Reactive
. And this will show results for all Reactive Extensions related nuget packages.
Finally from the Nuget Package Manager, I will install System.Reactive
, System.Reactive.Core
, System.Reactive.Interfaces
and System.Reactive.Linq
packages. And I will install the stable version 3.1.1.
.Net Core Reactive Extensions Implementation
IBookingStream Interface
To implement Rx in the Kafka Consumer project, first of all I will create a new interface IBookingStream
, which will have two methods.
First method is Publish
, which is used for publishing messages into the Rx stream. The Publish
method will take an instance of the BookingMessage
class as a parameter.
And the second method is Subscribe
, which is used to subscribe the listeners. The Subscribe
method will take the name of the subscriber as the first parameter and an Action<BookingMessage>
delegate as the second parameter. The Action<BookingMessage>
delegate will be used by the subscriber to show the message in the console UI.
public interface IBookingStream { void Publish(BookingMessage bookingMessage); void Subscribe(string subscriberName, Actionaction); }
BookingStream Class
Furthermore I will create BookingStream
class, which will host the implementation for Rx.
First of all I will create a new instance of class Subject
from System.Reactive.Subjects
namespace. The Subject
class has implementation for of both IObservable
and IObserver
.
private SubjectbookingMessageSubject; public BookingStream() { bookingMessageSubject = new Subject (); }
Publish Method
And next I will implement Publish
method. And inside the Publish
method, I will call OnNext
method on bookingMessageSubject
object, and pass the incoming BookingMessage
object. The OnNext
method will notify all subscribers arrival of the new message.
public void Publish(BookingMessage bookingMessage) { bookingMessageSubject.OnNext(bookingMessage); }
Subscribe Method
In addition to the Publish
method, I will implement the Subscribe
method. And Inside the Subscribe
method I will call the Subscribe
on the bookingMessageSubject
object and pass the incoming Action<BookingMessage>
delegate to it.
Since the Subscribe
method returns an IDisposable
, hence I will create a IDictionary
object with string
as key and IDisposable
as the value, for storing all subscribers. Consequently I will update the constructor of BookingStream
class.
private IDictionarysubscribers; public BookingStream() { bookingMessageSubject = new Subject (); subscribers = new Dictionary (); } public void Subscribe(string subscriberName, Action action) { if (!subscribers.ContainsKey(subscriberName)) { subscribers.Add(subscriberName, bookingMessageSubject.Subscribe(action)); } }
Dispose Method
Since all the subscribers are of type IDisposable
and the Subject
also implements IDisposable
, I will implement IDisposable
to the BookingStream
class and implement the Dispose
method.
public void Dispose() { if(bookingMessageSubject != null) { bookingMessageSubject.Dispose(); } foreach(var subscriber in subscribers) { subscriber.Value.Dispose(); } }
Implement Linq inside Subscribe
And finally I will use LINQ query in the bookingMessageSubject
object inside of the Subscribe method to filter all empty messages.
public void Subscribe(string subscriberName, Actionaction) { if (!subscribers.ContainsKey(subscriberName)) { subscribers.Add(subscriberName, bookingMessageSubject.Where(m => m.Message.Length > 0).Subscribe(action)); } }
Complete BookingStream Class
Now the BookingStream
class implementation is complete.
public class BookingStream : IBookingStream, IDisposable { private SubjectbookingMessageSubject; private IDictionary subscribers; public BookingStream() { bookingMessageSubject = new Subject (); subscribers = new Dictionary (); } public void Dispose() { if(bookingMessageSubject != null) { bookingMessageSubject.Dispose(); } foreach(var subscriber in subscribers) { subscriber.Value.Dispose(); } } public void Publish(BookingMessage bookingMessage) { bookingMessageSubject.OnNext(bookingMessage); } public void Subscribe(string subscriberName, Action action) { if (!subscribers.ContainsKey(subscriberName)) { subscribers.Add(subscriberName, bookingMessageSubject.Where(m => m.Message.Length > 0).Subscribe(action)); } } }
Using IBookingStream
Furthermore I will update BookingConsumer
class to use IBookingStream
. Hence instead of existing implementation where an Action
delegate is passed from Main
method to Listen
method of the BookingConsumer
class, I will update the BookingConsumer
to take IBookingStream
as a constructor parameter. And delete the Action
delegate from parameter of the Listen
method. And also I will add an Action
delegate into the constructor for logging purpose.
public interface IBookingConsumer { void Listen(); } public class BookingConsumer : IBookingConsumer { private readonly IBookingStream bookingStream; private readonly Actionlogger; public BookingConsumer(IBookingStream bookingStream, Action logger) { this.bookingStream = bookingStream; this.logger = logger; } public void Listen() { var config = new Dictionary ; { {"group.id","booking_consumer" }, {"bootstrap.servers", "localhost:9092" }, { "enable.auto.commit", "false" } }; using(var consumer = new Consumer (config, null, new StringDeserializer(Encoding.UTF8))) { consumer.Subscribe("timemanagement_booking"); logger("Subscribed"); consumer.OnMessage += (_, msg) => { bookingStream.Publish(new BookingMessage { Message = msg.Value }); }; logger("OnMessage attached"); while (true) { consumer.Poll(100); } } } }
Updating Main method
Finally I will update the Main
method in the Program
class to use updated BookingConsumer
implementation. And I will also create two subscriber, just to show you can have multiple subscribers to the same stream. As a result different subscribers will handle the same message differently.
class Program { static void Main(string[] args) { var bookingStream = new BookingStream(); var bookingConsumer = new BookingConsumer(bookingStream, Console.WriteLine); bookingStream.Subscribe("Subscriber1", (m) => Console.WriteLine($"Subscriber1 Message : {m.Message}")); bookingStream.Subscribe("Subscriber2", (m) => Console.WriteLine($"Subscriber2 Message Formatted : {m.Message.Substring(0, 2)}")); bookingConsumer.Listen(); } }
Conclusion
In conclusion, implementing real-time streaming application using Reactive Extensions in .Net Core is extremely simple and easy. And Reactive Extensions would help with an easy transition from the external stream into in-memory stream. Most noteworthy is the ability to use Linq queries on the live stream. And this is extremely powerful.
Finally, I will start the Consumer and Producer window and verify the solution is working as expected.
Furthermore, I have created a YouTube video. And in this video I have recorded the steps I followed for creating the .Net Core producer.
References:
Reactive Extensions (.Net): https://github.com/Reactive-Extensions/Rx.NET