Reactive Extensions in .Net Core

Hello everyone, welcome back to .Net Core Central. Today I am going to integrate Reactive Extensions into the .Net Core application. I am going to use application created in my previous post, where I have implemented Kafka stream in .Net Core. This will allow a smooth transition of the Kafka stream into the in-memory Reactive Extensions stream. If you have not gone through my previous posts ASP.Net Core Streaming Application Using Kafka – Part 1 and ASP.Net Core Streaming Application Using Kafka – Part 2, I will strongly suggest you do. Because this post is the next logical continuation of the previous two posts.

Reactive Extensions in .Net Core

What is Reactive Extensions (Rx)

The Reactive Extensions or short form Rx, is a library for composing asynchronous and event-based programs using observable sequences. Using reactive extensions we represent asynchronous data streams with Observables and we query asynchronous data streams using LINQ operators.

Reactive Extensions also have Schedulers built in. And using Schedulers we can control when a subscription starts and when notifications are published.

In this post I will be covering observable sequence and LINQ operators of Reactive Extensions.

The IObservable of Reactive Extensions is considered a dual of IEnumerable. IEnumerable is Pull/Synchronous where as IObservable is Push/Asynchronous.

There are five nuget packages that are needed for Reactive Extensions:
System.Reactive
System.Reactive.Core
System.Reactive.Interfaces
System.Reactive.Linq
System.Reactive.PlatformServices
Microsoft.Reactive.Testing

Installing Reactive Extensions Nuget Package

I am going to add the Reactive Extensions nuget packages to the TimeManagement.Streaming.Consumer project. Because the Kafka Consumer is already getting streaming data from Kafka. Hence it is logical to have the Reactive Extensions implementation in this project, so that the external stream can be converted into a in-memory data stream.

First of all I will open the TimeManagement solution in Visual Studio 2017. Furthermore after the solution is opened, I am going to right click on the TimeManagement.Streaming.Consumer project and select “Manage Nuget Packages” option. And this will open up the Nuget Package Manager window. Also, In the Browse tab, I will search for System.Reactive. And this will show results for all Reactive Extensions related nuget packages.

Reactive Extensions Nuget Packages for .Net Core

Finally from the Nuget Package Manager, I will install System.Reactive, System.Reactive.Core, System.Reactive.Interfaces and System.Reactive.Linq packages. And I will install the stable version 3.1.1.

.Net Core Reactive Extensions Implementation

IBookingStream Interface

To implement Rx in the Kafka Consumer project, first of all I will create a new interface IBookingStream, which will have two methods.

First method is Publish, which is used for publishing messages into the Rx stream. The Publish method will take an instance of the BookingMessage class as a parameter.

And the second method is Subscribe, which is used to subscribe the listeners. The Subscribe method will take the name of the subscriber as the first parameter and an Action<BookingMessage> delegate as the second parameter. The Action<BookingMessage> delegate will be used by the subscriber to show the message in the console UI.

public interface IBookingStream
{
    void Publish(BookingMessage bookingMessage);

    void Subscribe(string subscriberName, Action action);
}
BookingStream Class

Furthermore I will create BookingStream class, which will host the implementation for Rx.

First of all I will create a new instance of class Subject from System.Reactive.Subjects namespace. The Subject class has implementation for of both IObservable and IObserver.

private Subject bookingMessageSubject;

public BookingStream()
{
    bookingMessageSubject = new Subject();
}
Publish Method

And next I will implement Publish method. And inside the Publish method, I will call OnNext method on bookingMessageSubject object, and pass the incoming BookingMessage object. The OnNext method will notify all subscribers arrival of the new message.

public void Publish(BookingMessage bookingMessage)
{
    bookingMessageSubject.OnNext(bookingMessage);
}
Subscribe Method

In addition to the Publish method, I will implement the Subscribe method. And Inside the Subscribe method I will call the Subscribe on the bookingMessageSubject object and pass the incoming Action<BookingMessage> delegate to it.

Since the Subscribe method returns an IDisposable, hence I will create a IDictionary object with string as key and IDisposable as the value, for storing all subscribers. Consequently I will update the constructor of BookingStream class.

private IDictionary subscribers;

public BookingStream()
{
    bookingMessageSubject = new Subject();
    subscribers = new Dictionary();
}

public void Subscribe(string subscriberName, Action action)
{
    if (!subscribers.ContainsKey(subscriberName))
    {
        subscribers.Add(subscriberName, bookingMessageSubject.Subscribe(action));
    }
}
Dispose Method

Since all the subscribers are of type IDisposable and the Subject also implements IDisposable, I will implement IDisposable to the BookingStream class and implement the Dispose method.

public void Dispose()
{
    if(bookingMessageSubject != null)
    {
        bookingMessageSubject.Dispose();
    }
    
    foreach(var subscriber in subscribers)
    {
        subscriber.Value.Dispose();
    }
}
Implement Linq inside Subscribe

And finally I will use LINQ query in the bookingMessageSubject object inside of the Subscribe method to filter all empty messages.

public void Subscribe(string subscriberName, Action action)
{
    if (!subscribers.ContainsKey(subscriberName))
    {
        subscribers.Add(subscriberName, bookingMessageSubject.Where(m => m.Message.Length > 0).Subscribe(action));
    }
}
Complete BookingStream Class

Now the BookingStream class implementation is complete.

public class BookingStream : IBookingStream, IDisposable
{
    private Subject bookingMessageSubject;
    private IDictionary subscribers;

    public BookingStream()
    {
        bookingMessageSubject = new Subject();
        subscribers = new Dictionary();
    }

    public void Dispose()
    {
        if(bookingMessageSubject != null)
        {
             bookingMessageSubject.Dispose();
        }

        foreach(var subscriber in subscribers)
        {
            subscriber.Value.Dispose();
        }
    }

    public void Publish(BookingMessage bookingMessage)
    {
        bookingMessageSubject.OnNext(bookingMessage);
    }

    public void Subscribe(string subscriberName, Action action)
    {
        if (!subscribers.ContainsKey(subscriberName))
        {
            subscribers.Add(subscriberName, bookingMessageSubject.Where(m => m.Message.Length > 0).Subscribe(action));
        }
    }
}
Using IBookingStream

Furthermore I will update BookingConsumer class to use IBookingStream. Hence instead of existing implementation where an Action delegate is passed from Main method to Listen method of the BookingConsumer class, I will update the BookingConsumer to take IBookingStream as a constructor parameter. And delete the Action delegate from parameter of the Listen method. And also I will add an Action delegate into the constructor for logging purpose.

public interface IBookingConsumer
{
    void Listen();
}

public class BookingConsumer : IBookingConsumer
{
    private readonly IBookingStream bookingStream;
    private readonly Action logger;

    public BookingConsumer(IBookingStream bookingStream, Action logger)
    {
        this.bookingStream = bookingStream;
        this.logger = logger;
    }

    public void Listen()
    {
        var config = new Dictionary;
        {
            {"group.id","booking_consumer" },
            {"bootstrap.servers", "localhost:9092" },
            { "enable.auto.commit", "false" }
        };

        using(var consumer = new Consumer(config, null, new StringDeserializer(Encoding.UTF8)))
        {
            consumer.Subscribe("timemanagement_booking");
            logger("Subscribed");

            consumer.OnMessage += (_, msg) => {
                bookingStream.Publish(new BookingMessage { Message = msg.Value });
            };
            logger("OnMessage attached");

            while (true)
            {
                consumer.Poll(100);
            }
        }
    }
}
Updating Main method

Finally I will update the Main method in the Program class to use updated BookingConsumer implementation. And I will also create two subscriber, just to show you can have multiple subscribers to the same stream. As a result different subscribers will handle the same message differently.

class Program
{
    static void Main(string[] args)
    {
        var bookingStream = new BookingStream();
        var bookingConsumer = new BookingConsumer(bookingStream, Console.WriteLine);
        
        bookingStream.Subscribe("Subscriber1", (m) => Console.WriteLine($"Subscriber1 Message : {m.Message}"));
        bookingStream.Subscribe("Subscriber2", (m) => Console.WriteLine($"Subscriber2 Message Formatted : {m.Message.Substring(0, 2)}"));

        bookingConsumer.Listen();
    }
}

Conclusion

In conclusion, implementing real-time streaming application using Reactive Extensions in .Net Core is extremely simple and easy. And Reactive Extensions would help with an easy transition from the external stream into in-memory stream. Most noteworthy is the ability to use Linq queries on the live stream. And this is extremely powerful.

Finally, I will start the Consumer and Producer window and verify the solution is working as expected.

Reactive Extensions in .Net Core test results

Furthermore, I have created a YouTube video. And in this video I have recorded the steps I followed for creating the .Net Core producer.

References:

Reactive Extensions (.Net): https://github.com/Reactive-Extensions/Rx.NET