Asynchronous Streams in C# 8/.NET Core 3.0

Asynchronous streams are a new feature that was recently added to the C# language and presented at the .NET Conference as part of C# 8. It is one of the C# 8 features I am most excited about. I introduced it when I discussed my top 5 features of C# 8/.NET Core 3.0 in my previous blog post. In this post, I am going to take a deep dive into asynchronous streams and walk through a hands-on code example of implementing one. So, let’s dig in.

Three main high-level topics I want to cover in this blog are the following:

  • What are Asynchronous Streams
  • How they differ from Reactive Extensions
  • An implementation example

What are Asynchronous Streams?

An asynchronous stream is a continuous stream of data whose arrival is not deterministic. That is, we are not guaranteed to get data at the moment we start reading the stream. Rather, we wait on the stream, and each item is delivered to us as and when it becomes available. As I mentioned earlier, the feature was introduced with C# 8 and .NET Core 3.0. It is supported by a new interface, IAsyncEnumerable<T>. An iterator function returning IAsyncEnumerable<T> carries the async modifier, just like any other asynchronous function today. An example signature:

private async IAsyncEnumerable<string> ReturnAsyncString()
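
To make the signature above concrete, here is a minimal sketch of what a body and a consumer could look like. The class name AsyncStreamSketch, the static modifier, the item text and the Task.Delay call (standing in for real asynchronous I/O) are my assumptions for illustration; they are not part of the application built later in this post.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncStreamSketch
{
    // Hypothetical producer: Task.Delay stands in for real asynchronous I/O.
    public static async IAsyncEnumerable<string> ReturnAsyncString()
    {
        for (var count = 0; count < 3; count++)
        {
            await Task.Delay(100);         // wait until the next item is available
            yield return $"Item {count}";  // hand it to the consumer right away
        }
    }

    public static async Task Main()
    {
        // await foreach pulls one item at a time and awaits between items.
        await foreach (var item in ReturnAsyncString())
        {
            Console.WriteLine(item);
        }
    }
}
```

Each line is printed as soon as its item is produced, roughly 100 ms apart, rather than all at once at the end.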

The question we might have is: what is the difference between an asynchronous stream and an async function? An async function always returns a single, scalar result, whereas an asynchronous stream returns a continuous sequence of items. The next doubt might be: an async function can return a Task<IEnumerable<T>>, so does that not count as an asynchronous stream? It is not the same thing. You can write a function like the one below, but it is not really an asynchronous stream.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Async.IEnumerable.Test
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var result = await Test();
            foreach (var item in result)
            {
                Console.WriteLine(item);
            }
        }

        private static Task<IEnumerable<string>> Test()
        {
            var result = new[] { "First", "Second" }.AsEnumerable();
            return Task.FromResult(result);
        }
    }
}

That is because we are not doing any asynchronous operation inside the function Test; we are just returning an enumerable wrapped in a Task. There is no yield return statement here either. Now, let’s change the implementation to use a yield return statement. For that I will create a new function, GetStrings, which returns an IEnumerable of strings.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Async.IEnumerable.Test
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var result = await Test();
            foreach (var item in result)
            {
                Console.WriteLine(item);
            }
        }

        private static Task<IEnumerable<string>> Test()
        {
            return Task.FromResult(GetStrings());
        }

        private static IEnumerable<string> GetStrings()
        {
            for(var count = 0; count < 5; count++)
            {
                yield return $"Count {count}";
            }
        }
    }
}

Even in this case, it is not a real asynchronous stream, because inside the GetStrings function I am not awaiting any task before the yield return statement. I only wrapped the result in an already completed Task inside the Test function; no asynchronous I/O operation is actually taking place.
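
For contrast, here is a rough sketch of the closest thing an async function returning Task<IEnumerable<T>> can do when the work is genuinely asynchronous. The names BufferedSketch, FetchAsync and GetAllStrings are hypothetical, and Task.Delay stands in for an I/O call. The awaits are real, but every result has to be buffered in a list before the task completes, so the caller sees nothing until the last item is ready.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BufferedSketch
{
    // Hypothetical stand-in for an asynchronous I/O call.
    private static async Task<string> FetchAsync(int count)
    {
        await Task.Delay(100);
        return $"Count {count}";
    }

    // Real awaits, but all results are collected before the task completes.
    public static async Task<IEnumerable<string>> GetAllStrings()
    {
        var results = new List<string>();
        for (var count = 0; count < 5; count++)
        {
            results.Add(await FetchAsync(count));
        }
        return results;
    }

    public static async Task Main()
    {
        // Nothing is printed until all five fetches have finished.
        foreach (var item in await GetAllStrings())
        {
            Console.WriteLine(item);
        }
    }
}
```

This is an asynchronous call that returns a collection, not a stream: the consumer cannot start processing the first item while the second one is still being fetched.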

After I implement an example for IAsyncEnumerable, I am going to discuss why we can never have a real asynchronous stream that returns a Task<IEnumerable<T>> in detail.

What is the difference between Rx and Asynchronous streams?

Well, the fundamental difference lies with the producer of the data. In the case of Rx or Reactive Extensions, which I wrote about in one of my previous blog posts (https://dotnetcorecentral.com/blog/reactive-extensions-in-net-core/), the consumer is not aware of the producer. The consumer will subscribe to a stream and receive data as and when they arrive. Whereas for asynchronous streams, the consumer will directly call the producer to get data. The consumer in case of an asynchronous stream will be getting data from a stream as well, but the consumer and producer are not decoupled.

We implement Rx using IObservable<T>, whereas we implement an asynchronous stream using IAsyncEnumerable<T>. One is push, and the other is pull.
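
The push versus pull distinction can be sketched with nothing but the interfaces that ship in the base class library. This is a deliberately simplified illustration, not how Rx itself is implemented: the Numbers observable, the Collector observer and the NumbersAsync iterator are hypothetical names, and the observable here pushes all of its values synchronously as soon as someone subscribes.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PushPullSketch
{
    // Push: the producer calls into whoever subscribed.
    private sealed class Numbers : IObservable<int>
    {
        public IDisposable Subscribe(IObserver<int> observer)
        {
            for (var i = 0; i < 3; i++) observer.OnNext(i);
            observer.OnCompleted();
            return new Unsubscriber();
        }

        private sealed class Unsubscriber : IDisposable
        {
            public void Dispose() { }
        }
    }

    // The observer only reacts to whatever arrives.
    private sealed class Collector : IObserver<int>
    {
        private readonly List<string> _log;
        public Collector(List<string> log) => _log = log;
        public void OnNext(int value) => _log.Add($"pushed {value}");
        public void OnError(Exception error) => _log.Add($"error {error.Message}");
        public void OnCompleted() => _log.Add("push completed");
    }

    // Pull: the consumer asks the producer for the next item.
    private static async IAsyncEnumerable<int> NumbersAsync()
    {
        for (var i = 0; i < 3; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    public static async Task<List<string>> Run()
    {
        var log = new List<string>();
        using (new Numbers().Subscribe(new Collector(log))) { }
        await foreach (var value in NumbersAsync()) log.Add($"pulled {value}");
        return log;
    }

    public static async Task Main()
    {
        foreach (var line in await Run()) Console.WriteLine(line);
    }
}
```

In the push half the observer never asks for anything; in the pull half the await foreach loop drives the producer, which only runs when the consumer requests the next item.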

Creating an application

Now that we have a good understanding of what asynchronous streams are, let us create an application to demonstrate this feature. The idea of the application is simple. We will have an ASP.NET Core Web API application, which returns weather data for a zip code, and a .NET Core Console client application, which will call this Web API to get weather data for a list of zip codes in a state.

The .Net Core Console client application code will use an asynchronous stream to expose the weather data from the service proxy.

New Web API

Firstly, I will create a Web API project in Visual Studio 2019. This project will make a call out to OpenWeather to get current weather data for a zipcode. In Visual Studio, select “ASP.NET Core Web Application”, and select “API” and .Net Core version as “ASP.NET Core 3.0”. I will keep HTTPS and Docker enabled.

Secondly, once the project is created, I will update the Startup class to configure an HttpClient for the weather Web API call. I will also update the class WeatherForecastController, first removing the existing mock implementation and then making a call to the OpenWeather API to get weather information for a zip code.

Thirdly, I will add the Newtonsoft.Json NuGet package for deserializing the data coming back from the OpenWeather API.

using System;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace WebApplication3
{
    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers();

            services.AddHttpClient("weather", c => c.BaseAddress = new Uri("http://samples.openweathermap.org/data/2.5/"));
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseHttpsRedirection();

            app.UseRouting();

            app.UseAuthorization();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
        }
    }
}

using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace WebApplication3.Controllers
{
    [ApiController]
    [Route("[controller]")]
    public class WeatherForecastController : ControllerBase
    {
        private readonly IHttpClientFactory _httpClientFactory;
        private readonly ILogger<WeatherForecastController> _logger;

        public WeatherForecastController(IHttpClientFactory httpClientFactory, ILogger<WeatherForecastController> logger)
        {
            _httpClientFactory = httpClientFactory;
            _logger = logger;
        }

        [HttpGet]
        public async Task<WeatherModel> Get([FromQuery]string zip)
        {
            using (var client = _httpClientFactory.CreateClient("weather"))
            {
                var response = await client.GetAsync($"weather?zip={zip},us&appid=YOUR_APP_ID_HERE");
                if (response.StatusCode == HttpStatusCode.OK)
                {
                    var data = await response.Content.ReadAsStringAsync();
                    return JsonConvert.DeserializeObject<WeatherModel>(data);
                }
                return null;
            }
        }
    }

    public class WeatherModel
    {
        public MainModel Main { get; set; }
    }

    public class MainModel
    {
        public decimal Temp { get; set; }
        public int Pressure { get; set; }
        public int Humidity { get; set; }
    }
}

The Weather API Client

Finally, I will create a .NET Core 3.0 Console application. This application will consume the ASP.NET Core Web API application I created earlier. The Console application will create an asynchronous stream to expose the weather data to the calling Main function.

The client will keep a list of 6 zip codes in a static array. In a real-life example, these values would come from a database or another API. Based on these zip codes, it will make async calls to the Web API and expose the results as an asynchronous stream, which the Main method reads in an await foreach loop.

To this project as well, I will add the Newtonsoft.Json NuGet package, which I will use to deserialize the response from the Web API call.

using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;

namespace WeatherClient
{
    class Program
    {
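        // C# has no octal literals, so 08816 is simply the integer 8816.
        private static readonly int[] zips = new int[] { 08816, 08536, 08810, 08812, 08817, 08832 };
        static async Task Main(string[] args)
        {
            await foreach(var humidity in GetHumidity())
            {
                // D5 restores the leading zero that the int representation drops.
                Console.WriteLine($"Humidity for zip {humidity.Zip:D5} is {humidity.Humidity}");
            }
        }

        private static async IAsyncEnumerable<MainModel> GetHumidity()
        {
            // Disposed when the enumeration finishes or the enumerator is disposed.
            using var client = new HttpClient
            {
                BaseAddress = new Uri("https://localhost:44363/")
            };

            foreach (var zip in zips)
            {
                // Send the zip as five digits; a plain {zip} would send "8816" instead of "08816".
                var response = await client.GetAsync($"weatherforecast?zip={zip:D5}");
                var data = await response.Content.ReadAsStringAsync();
                var weather = JsonConvert.DeserializeObject<WeatherModel>(data);
                // weather is null when the response body is empty, e.g. when the Web API had no data.
                if (weather?.Main == null)
                {
                    continue;
                }
                weather.Main.Zip = zip;
                yield return weather.Main;
            }
        }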
    }

    public class WeatherModel
    {
        public MainModel Main { get; set; }
    }

    public class MainModel
    {
        public decimal Temp { get; set; }
        public int Pressure { get; set; }
        public int Humidity { get; set; }
        public int Zip { get; set; }
    }
}

Once I run this application, I can see the response as expected. Humidity for each of the zip codes will print in the console.

Why IAsyncEnumerable<T> is not the same as Task<IEnumerable<T>>

The short answer is that a function containing yield return cannot return Task<IEnumerable<T>>, with or without the async modifier; it will not even compile. To demonstrate that, I will update the GetHumidity function above and change it to return Task<IEnumerable<T>> instead of IAsyncEnumerable<T>.

using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;

namespace WeatherClient
{
    class Program
    {
        private static int[] zips = new int[] { 08816, 08536, 08810, 08812, 08817, 08832 };
        static async Task Main(string[] args)
        {
            await foreach (var humidity in GetHumidity())
            {
                Console.WriteLine($"Humidity for zip {humidity.Zip} is {humidity.Humidity}");
            }
        }

        private static async Task<IEnumerable<MainModel>> GetHumidity()
        {
            var client = new HttpClient
            {
                BaseAddress = new Uri("https://localhost:44363/")
            };

            foreach (var zip in zips)
            {
                var response = await client.GetAsync($"weatherforecast?zip={zip}");
                var data = await response.Content.ReadAsStringAsync();
                var weather = JsonConvert.DeserializeObject<WeatherModel>(data);
                weather.Main.Zip = zip;
                yield return weather.Main;
            }
        }
    }

    public class WeatherModel
    {
        public MainModel Main { get; set; }
    }

    public class MainModel
    {
        public decimal Temp { get; set; }
        public int Pressure { get; set; }
        public int Humidity { get; set; }
        public int Zip { get; set; }
    }
}

Once I have this implementation, I will get two errors. Firstly, the GetHumidity function will produce a compile-time error: Error CS1624 The body of 'Program.GetHumidity()' cannot be an iterator block because 'Task<IEnumerable<MainModel>>' is not an iterator interface type. Secondly, the await foreach inside the Main method, where I am calling GetHumidity, will produce the following compiler error: Error CS8411 Asynchronous foreach statement cannot operate on variables of type 'Task<IEnumerable<MainModel>>' because 'Task<IEnumerable<MainModel>>' does not contain a suitable public instance definition for 'GetAsyncEnumerator'.
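
The second error mentions GetAsyncEnumerator because that is the method await foreach looks for on the thing it iterates. As a rough sketch of what the loop does on our behalf (the compiler-generated code differs in its details), an asynchronous stream can also be consumed by hand. The names ManualEnumerationSketch, Numbers and SumManually are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ManualEnumerationSketch
{
    // Hypothetical stream; Task.Delay stands in for asynchronous I/O.
    public static async IAsyncEnumerable<int> Numbers()
    {
        for (var i = 1; i <= 3; i++)
        {
            await Task.Delay(50);
            yield return i;
        }
    }

    // Roughly what "await foreach (var n in Numbers()) sum += n;" boils down to.
    public static async Task<int> SumManually()
    {
        var sum = 0;
        IAsyncEnumerator<int> enumerator = Numbers().GetAsyncEnumerator();
        try
        {
            // Each MoveNextAsync call asks the producer for the next item.
            while (await enumerator.MoveNextAsync())
            {
                sum += enumerator.Current;
            }
        }
        finally
        {
            await enumerator.DisposeAsync();
        }
        return sum;
    }

    public static async Task Main()
    {
        Console.WriteLine(await SumManually()); // prints 6
    }
}
```

Task<IEnumerable<T>> has no GetAsyncEnumerator method, which is why the compiler refuses to run await foreach over it.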

Conclusion

In conclusion, asynchronous streams are a neat feature for working with asynchronous collections, something that was not available before. In my opinion, the feature complements Reactive Extensions: Rx is a push mechanism, whereas an asynchronous stream is a pull mechanism. The use case I see for it is where we make multiple asynchronous I/O operations; we can expose those operations as a single asynchronous stream, instead of the caller making individual calls to a scalar function in a loop.

YouTube video for the post: https://youtu.be/XvPcMHmW1Zw