#begin

 

 

Back in September of 2022 I got the opportunity to start a new project at Dephion. It’s a greenfield project made in Unity3D. A core architectural component that we wanted in this application was an event bus, including a reactive API to interact with it. Of course there is this thing called UniRX, but we did not really want to take UniRX as a dependency at the core of our architecture. We want to “own” the full stack we use for a project like this.

Thus I set out to research our alternatives and how exactly the internals of UniRX work, in case I needed to write my own light-weight variant of an event bus and reactive extensions. Part of my research was to investigate how reactive programming is generally done in C#. I did some Java streams a looong time ago, but never anything similar in C# since I never felt the need for it. On top of that, the reactive model and reactive extensions are not fully supported in Unity3D when you compile with IL2CPP, which really is a requirement for us.

So, long story short: I decided to roll my own because the feature-set we need for our project is rather limited and using UniRX would incur too much additional “crap”. What helped me come to this conclusion was a course on Udemy called “Mastering Reactive Extensions with C# and .NET” by Dmitri Nesteruk. I chose his course because it was rather short, about 3 hours and 30 minutes, and because I “knew” him from another course I did called “Learn Parallel Programming with C# and .NET”, which I wrote a blog about before.

So in today’s blog I want to focus on the “Mastering Reactive Extensions with C# and .NET” course. I checked out this course back in September so it’s not fresh in my mind, but I’ll have a quick look into the internals for this blog. It’s also a nice way to check if the knowledge stuck even after 3 months. So let’s dive straight in.

 

Course Overview

So this is a rather short course, which was perfect for me. Even with a 1.5x or 2x playback speed it is not too hard to follow, so I was able to shorten it even more. The course has 5 sections. The first is an introduction which doesn’t really count, I guess that’s why the first section starts with a 0 :D. Section 1 dives into the key interfaces used in reactive programming in C#. Section 2 is called Subjects, which relates to the term “Subject” in a reactive context. We will get back to what this means later. Section 3 is about the Fundamental Sequence Operators. Since we are dealing with streams of data, or sequences, we need some form of operators for filtering or creating logic, and section 3 is all about those. Section 4 sheds some light on the more advanced operators used in sequences.

 

Section 1: Key Interfaces

As with any proper OO code-base it’s really important to understand the core abstractions. In this case we are going to learn about the key interfaces used in the reactive programming model for .NET. Funnily enough, Dmitri starts off by explaining the observer design pattern. And it’s indeed a very important concept to understand in this context. The observer pattern is core to the reactive model. He then also shows the baked-in IObserver<T> interface in .NET, which interestingly I never, ever used myself. I always roll my own observer-like structures using events, System.Action<T>, or even Func<T,..>. I’ve never really seen the need for using the .NET native one, maybe I should 😛 What do you think about this!? Have you used the IObserver<T> interface for creating the observer design pattern?

Next, things continue with the observer pattern since Dmitri explains the IObservable<T> interface and how it’s different from the IObserver<T> interface. The IObservable<T> is the producing part of the observer pattern, and the IObserver<T> is the part that is interested in whatever the IObservable<T> produces. Again, I’ve never really used these since I implement my own.
The key takeaway from this section is the basics of the observer pattern. This is fundamental to understanding the reactive programming model, as it relies on pushing data instead of pulling it. The observer pattern is here to help us achieve that.

I also want to mention some important things about the contract an observable enforces. An observable can call OnNext on its observers, which hands them the next value of the sequence. If the sequence has ended, it calls OnCompleted. Once OnCompleted is called the contract is fulfilled: no more values will follow, so subscribers can unsubscribe. But it can also call OnError, which it will do once an exception is encountered, and that ends the sequence as well. So to simplify: an observable can invoke a sequence of OnNext calls, followed by either an OnCompleted or an OnError.
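To make that contract a bit more concrete, here is a minimal sketch that only uses the IObservable<T> and IObserver<T> interfaces from the System namespace. The NumberSource, LogObserver and ContractDemo names are made up for this illustration, and treating a negative number as an error is just an assumption to show the OnError path.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical producer: pushes a fixed list of numbers to whoever subscribes.
public sealed class NumberSource : IObservable<int>
{
    private readonly int[] _values;
    public NumberSource(params int[] values) => _values = values;

    public IDisposable Subscribe(IObserver<int> observer)
    {
        foreach (var value in _values)
        {
            if (value < 0)
            {
                // Terminal call #1: after OnError nothing else may be sent.
                observer.OnError(new ArgumentOutOfRangeException(nameof(value)));
                return new NoopSubscription();
            }
            observer.OnNext(value); // zero or more OnNext calls
        }
        // Terminal call #2: the sequence ended normally.
        observer.OnCompleted();
        return new NoopSubscription();
    }

    // Everything is pushed during Subscribe, so there is nothing left to cancel.
    private sealed class NoopSubscription : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical consumer: records which callbacks were invoked.
public sealed class LogObserver : IObserver<int>
{
    public List<string> Log { get; } = new List<string>();
    public void OnNext(int value) => Log.Add($"OnNext({value})");
    public void OnError(Exception error) => Log.Add("OnError");
    public void OnCompleted() => Log.Add("OnCompleted");
}

public static class ContractDemo
{
    public static List<string> Run(params int[] values)
    {
        var observer = new LogObserver();
        using (new NumberSource(values).Subscribe(observer)) { }
        return observer.Log;
    }

    public static void Main()
    {
        // Prints: OnNext(1), OnNext(2), OnNext(3), OnCompleted
        Console.WriteLine(string.Join(", ", Run(1, 2, 3)));
        // Prints: OnNext(1), OnError
        Console.WriteLine(string.Join(", ", Run(1, -1, 3)));
    }
}
```

Note how the 3 after the -1 is never delivered: once a terminal call has been made, the sequence is over.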

 

Section 2: Subjects

This second section is all about the Subject<T> class, which implements both IObservable<T> and IObserver<T>. It acts as a proxy between an observer and an observable. This may sound weird but it’s really important to understand this. Since we are dealing with streams of data and we are using a LINQ-like API, we need to chain these subjects together and thus they implement both interfaces. Because of this the Subject<T> class acts as both a listener and a producer, thus a proxy of some kind.

Another interesting and very important concept is unsubscribing from a subject. This can be done very cleanly: Subscribe returns an IDisposable, and simply disposing it ends your subscription. This is a design decision I implemented in our custom event bus as well.

I really like this way to unsubscribe. But not just for unsubbing, the IDisposable interface is really undervalued I think. I bet you can find lots of dirty hacks to free up memory, or to remove event subscriptions, in objects that shouldn’t need them. Encapsulating this nicely in objects that implement IDisposable is far cleaner and results in a simpler design.
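As a small sketch of unsubscribing through IDisposable, assuming the System.Reactive NuGet package is referenced (the SubjectDemo class is just a name for this example):

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class SubjectDemo
{
    public static List<int> Run()
    {
        var received = new List<int>();
        var subject = new Subject<int>();

        // Subscribe hands back an IDisposable that represents this one subscription.
        IDisposable subscription = subject.Subscribe(value => received.Add(value));

        subject.OnNext(1);
        subject.OnNext(2);

        subscription.Dispose(); // unsubscribe
        subject.OnNext(3);      // nobody is listening anymore, so this value is not recorded

        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints: 1, 2
    }
}
```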

Another cool subject concept is the ReplaySubject<T>, and it does exactly what you think it does. It replays all values to any new subscriber. Personally, I haven’t found the need for this in our specific use-case, but I do see why someone would like this. I once discussed with a friend how to implement an event bus in Unity and how some events should be persistent instead of “fire-and-forget”. There should be some way to check the history to make decisions. The ReplaySubject<T> would be the perfect way to do this, since you can simply hook it up once you need to observe the history, and when you are done, you dispose it. For example: you are in some menu scene and select the map, enemies, duration and whatever other settings. You catch these events on a replay subject and pass the reference to some object in the game scene that is responsible for setting this all up, and then simply dispose it. This sounds like a clean way to solve such a problem.

The next Subject Dmitri discussed is the BehaviorSubject<T>, which is simply a subject with a default value. It always holds a current value: if you subscribe before anything has been published you get this default object, and after that you get the latest value. Which is kinda handy in specific situations I suppose.

And the last cool kind of Subject described in this section is the AsyncSubject<T>, which only ever hands out the last value that was put on the subject, and only once the subject completes.
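The difference between these three subjects is easiest to see side by side. This is a rough sketch assuming the System.Reactive package; the SubjectKindsDemo class and its method names are invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class SubjectKindsDemo
{
    public static List<int> Replay()
    {
        var seen = new List<int>();
        var subject = new ReplaySubject<int>();
        subject.OnNext(1);
        subject.OnNext(2);
        // A late subscriber still receives the history (1 and 2) first.
        using (subject.Subscribe(value => seen.Add(value)))
        {
            subject.OnNext(3);
        }
        return seen;
    }

    public static List<int> Behavior()
    {
        var seen = new List<int>();
        var subject = new BehaviorSubject<int>(0); // 0 is the default value
        // Nothing published yet, so this subscriber gets the default.
        using (subject.Subscribe(value => seen.Add(value))) { }
        subject.OnNext(1);
        subject.OnNext(2);
        // A new subscriber immediately gets the latest value.
        using (subject.Subscribe(value => seen.Add(value))) { }
        return seen;
    }

    public static List<int> Async()
    {
        var seen = new List<int>();
        var subject = new AsyncSubject<int>();
        using (subject.Subscribe(value => seen.Add(value)))
        {
            subject.OnNext(1);
            subject.OnNext(2);     // nothing has been delivered at this point
            subject.OnCompleted(); // only now is the last value (2) delivered
        }
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Replay()));   // prints: 1, 2, 3
        Console.WriteLine(string.Join(", ", Behavior())); // prints: 0, 2
        Console.WriteLine(string.Join(", ", Async()));    // prints: 2
    }
}
```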

 

Section 3: Fundamental Sequence Operators

The third section provides us with some more information about the core aspects of observables. It starts off with some simple factory methods, similar to Task.Factory. For example, there is a factory method that returns a single value called Observable.Return<T>(T value), which simply creates a sequence with 1 element and completes it. And another interesting one is Observable.Empty<T>() which, yes… you guessed it, returns an observable without any values.

The important thing about Observable.Create<T> is that it’s THE idiomatic way to actually create observables yourself. Importantly, you also need to return an IDisposable from this create function. For this the reactive extensions module provides you with two cool helpers as well: Disposable.Empty and Disposable.Create(Action callback). The latter generates an IDisposable which simply calls that callback once it’s disposed. Really clean 🙂 I like it.
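Here is a small sketch of these factory methods in action, assuming the System.Reactive package. The FactoryDemo class and the log strings are only there for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class FactoryDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // One value, then completion.
        Observable.Return(42).Subscribe(
            value => log.Add($"Return: {value}"),
            () => log.Add("Return: completed"));

        // No values at all, only completion.
        Observable.Empty<int>().Subscribe(
            value => log.Add($"Empty: {value}"),
            () => log.Add("Empty: completed"));

        // A hand-written observable. The returned IDisposable is the cleanup
        // logic that runs when the subscriber unsubscribes.
        IObservable<string> custom = Observable.Create<string>(observer =>
        {
            observer.OnNext("hello");
            return Disposable.Create(() => log.Add("Create: cleaned up"));
        });

        IDisposable subscription = custom.Subscribe(value => log.Add($"Create: {value}"));
        subscription.Dispose(); // triggers the callback passed to Disposable.Create

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
        // Return: 42
        // Return: completed
        // Empty: completed
        // Create: hello
        // Create: cleaned up
    }
}
```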

The next video concerns sequence generators. The first, very simple one is the Observable.Range(start, count) method. This will simply create an observable that produces count consecutive numbers, beginning at start. More interestingly, the Observable.Generate function gives you more control. You can set the initialState, condition, iterate (function), and selector (so how to show the result). There are a couple of overloads but this first one is clear enough to form a mental model of it I think.
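A quick sketch of both generators, assuming the System.Reactive package. GeneratorDemo is a made-up name, and the blocking Wait() call is only used to keep the example short.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class GeneratorDemo
{
    // Range takes a start value and a count, not a begin and an end.
    public static IList<int> Range() =>
        Observable.Range(5, 3).ToList().Wait();

    public static IList<string> Squares() =>
        Observable.Generate(
                1,                        // initialState
                i => i <= 4,              // condition: keep going while this is true
                i => i + 1,               // iterate: how to get to the next state
                i => $"{i}^2 = {i * i}")  // selector: what to emit for each state
            .ToList()
            .Wait();

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Range()));   // prints: 5, 6, 7
        Console.WriteLine(string.Join(", ", Squares())); // prints: 1^2 = 1, 2^2 = 4, 3^2 = 9, 4^2 = 16
    }
}
```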

Another cool generator is Observable.Interval, which produces an incrementing long (number) every time the interval elapses. So it keeps generating values as long as it has not been disposed. This is very useful in specific situations I guess, although I cannot think of one off the top of my head. A variant of Observable.Interval is Observable.Timer, which simply produces a single value once the given time has passed.

The next video is one that is very interesting and important as well. This is about how to convert “stuff” into observables to be able to exploit the observable API. So there is this Observable.Start(action) method. This Start method runs your function asynchronously and wraps it in an observable that produces the function’s result. Note that even if your function doesn’t return anything, the observable still produces a value: a Unit. The Unit type is (often) used in functional programming languages to say there is no return value. So for example the WriteLine function does not return anything, since it writes to IO. But since we need some return value, we “invent” one that represents nothing, or void, and we call it Unit.

Then there is this other very cool converter and that’s Observable.FromEventPattern. You simply point this method at some EventHandler, and once the event is raised, the observable will produce a value. How cool is that!? And what is even cooler is the fact that you can do the exact same thing for Tasks. So instead of awaiting it, you can call the ToObservable function on the task and operate on it that way. Lastly Dmitri shows how to convert a simple List<T> to an observable using the ToObservable function and inspects the values. Just note that this is an eager function, not lazy!
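To give an idea of what these conversions look like, here is a sketch assuming the System.Reactive package. The Button and LabelEventArgs classes are hypothetical stand-ins for whatever event source you have.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

// Hypothetical event source, standing in for something like a UI button.
public sealed class LabelEventArgs : EventArgs
{
    public LabelEventArgs(string label) => Label = label;
    public string Label { get; }
}

public sealed class Button
{
    public event EventHandler<LabelEventArgs> Clicked;
    public void Click(string label) => Clicked?.Invoke(this, new LabelEventArgs(label));
}

public static class ConversionDemo
{
    public static List<string> Clicks()
    {
        var labels = new List<string>();
        var button = new Button();

        // The two lambdas tell Rx how to attach and detach its handler.
        IObservable<string> clicks = Observable
            .FromEventPattern<LabelEventArgs>(
                handler => button.Clicked += handler,
                handler => button.Clicked -= handler)
            .Select(pattern => pattern.EventArgs.Label);

        button.Click("too early");   // no subscriber yet, so not recorded
        using (clicks.Subscribe(label => labels.Add(label)))
        {
            button.Click("play");
            button.Click("quit");
        }
        button.Click("too late");    // the handler was detached on Dispose
        return labels;
    }

    // A Task becomes a sequence with a single value.
    public static int FromTask() =>
        Task.FromResult(42).ToObservable().Wait();

    // A List<T> becomes a sequence of its elements.
    public static IList<int> FromList() =>
        new List<int> { 1, 2, 3 }.ToObservable().ToList().Wait();

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Clicks()));   // prints: play, quit
        Console.WriteLine(FromTask());                    // prints: 42
        Console.WriteLine(string.Join(", ", FromList())); // prints: 1, 2, 3
    }
}
```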

The next topic in this section is filtering. This is very similar to LINQ. So if you know the .Where clause in LINQ, you know how to filter observables as well. It also includes other useful operators like Select, Distinct, Take, Skip, and more. But it also introduces some new ones like TakeWhile, SkipWhile, SkipLast (eager!), SkipUntil and the DistinctUntilChanged operator, which only produces a value once it actually changes. So a sequence of [0 1 1 1 2 3 3 3 4 5 1] will return the numbers: [0 1 2 3 4 5 1]. So it skips the consecutive duplicates. Really simple, and really useful as well!
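The example sequence from the paragraph above can be run through a few of these operators like this. It is a sketch that assumes the System.Reactive package; FilterDemo is just a name for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class FilterDemo
{
    private static IObservable<int> Source() =>
        new[] { 0, 1, 1, 1, 2, 3, 3, 3, 4, 5, 1 }.ToObservable();

    // Drops a value only when it equals the value right before it.
    public static IList<int> DistinctUntilChanged() =>
        Source().DistinctUntilChanged().ToList().Wait();

    // Drops every value that was seen anywhere earlier in the sequence.
    public static IList<int> Distinct() =>
        Source().Distinct().ToList().Wait();

    // Where and Take behave like their LINQ counterparts.
    public static IList<int> FirstTwoOdd() =>
        Source().Where(x => x % 2 == 1).Take(2).ToList().Wait();

    // Skips values until the predicate fails once, then lets everything through.
    public static IList<int> SkipSmall() =>
        Source().SkipWhile(x => x < 3).ToList().Wait();

    public static void Main()
    {
        Console.WriteLine(string.Join(" ", DistinctUntilChanged())); // prints: 0 1 2 3 4 5 1
        Console.WriteLine(string.Join(" ", Distinct()));             // prints: 0 1 2 3 4 5
        Console.WriteLine(string.Join(" ", FirstTwoOdd()));          // prints: 1 1
        Console.WriteLine(string.Join(" ", SkipSmall()));            // prints: 3 3 3 4 5 1
    }
}
```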

To properly use the laziness of sequences we need some proper methods to inspect the values. The functions Dmitri used so far in the video were all dependent on the fact that the sequence completed. But in practice this is not what you want, since you use these observables whenever new values are produced, or maybe chunks of values are produced. You could use the All method but that’s also eager of course, or at least it requires the sequence to be completed. Another one is Contains, which works very similarly to Contains in an Array or List context, and we also get an ElementAt(index) function to get an element at a specific index. Also make sure that if you use ElementAt, the index is in range, or the OnError function of the observer is invoked.

Then there is also this very useful equality check function called SequenceEqual, which does exactly what you expect it to do. And again, remember that equality can only be determined once the sequences turn out to differ or have both completed.
The next video is about sequence transformation. This is again very similar to how LINQ works. You can use the OfType and Cast functions to transform values. There are also a couple of new ones like the Observable.Timestamp function, which attaches a timestamp to each value that is produced. Or the Observable.TimeInterval that shows you the interval between values produced. We also get a function called Materialize, which turns every OnNext, OnError and OnCompleted call into a Notification<T> value, so the whole sequence including its ending becomes plain data you can inspect. Rx also has ToArray(), ToList() and ToDictionary(), similar to the LINQ methods. They collect the entire sequence into something new once it completes. So be careful about garbage here! Dmitri mentions you sometimes use the Materialize function for debugging purposes by the way.
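As a small sketch of what Materialize hands you in System.Reactive (assuming that package is referenced; MaterializeDemo is a made-up name), you can look at the kind of each notification:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class MaterializeDemo
{
    // Turns every callback of the source into a value and reports its kind.
    public static IList<string> Kinds(IObservable<int> source) =>
        source.Materialize()
              .Select(notification => notification.Kind.ToString())
              .ToList()
              .Wait();

    public static void Main()
    {
        // prints: OnNext, OnNext, OnCompleted
        Console.WriteLine(string.Join(", ", Kinds(Observable.Range(1, 2))));

        // The error shows up as a regular value instead of being thrown by Wait().
        // prints: OnError
        Console.WriteLine(string.Join(", ", Kinds(Observable.Throw<int>(new Exception("boom")))));
    }
}
```

This is also why it is handy for debugging: you can log a failing sequence without the exception tearing down your logging pipeline.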

And the last very important topic of this section is sequence aggregation. This works pretty similarly to the LINQ Aggregate function, which is sometimes pretty confusing to me, depending on the contents of the actual sequence. You can also use some aggregations that come included, like Min, Max and Sum.
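A minimal sketch of aggregation, assuming the System.Reactive package (AggregationDemo is an invented name). Each of these only produces its single result once the source sequence completes.

```csharp
using System;
using System.Reactive.Linq;

public static class AggregationDemo
{
    // Aggregate folds the sequence into one value: seed 0, then add every element.
    public static int SumViaAggregate() =>
        Observable.Range(1, 4).Aggregate(0, (total, value) => total + value).Wait();

    // The built-in aggregations cover the common cases.
    public static int Sum() => Observable.Range(1, 4).Sum().Wait();
    public static int Min() => Observable.Range(1, 4).Min().Wait();
    public static int Max() => Observable.Range(1, 4).Max().Wait();

    public static void Main()
    {
        Console.WriteLine(SumViaAggregate()); // prints: 10
        Console.WriteLine(Sum());             // prints: 10
        Console.WriteLine(Min());             // prints: 1
        Console.WriteLine(Max());             // prints: 4
    }
}
```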

 

Section 4: Advanced Sequence Operators

So this next section dives a bit deeper into the use of sequences and how to actually work with them and make them useful. An important concept here is error handling since software is never free of bugs. We just reduce the probability of bugs through (unit / integration / etc.) testing. So, like try/catch/finally statements, a sequence allows you to add a Catch fallback sequence for when the main sequence you are inspecting fails for some reason.

You can also attach another type of catch clause which handles errors in the classical sense. You catch it, and then continue on with the provided sequence as if it didn’t happen. Finally() can also be used in the same way as you’re used to.

Another interesting thing is the Retry(N) function, which you can call to retry listening to an observable. When the source fails it subscribes again, until the sequence either completes or has failed N times. Pretty useful stuff 🙂 I’ve implemented some retry mechanisms on many occasions. So it’s nice they provide it out of the box.
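The error handling operators described above can be sketched roughly like this, assuming the System.Reactive package. The ErrorHandlingDemo class, the Failing() helper and the “boom” exception are all invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class ErrorHandlingDemo
{
    // Hypothetical source: emits 1 and 2, then fails.
    private static IObservable<int> Failing() =>
        Observable.Range(1, 2)
                  .Concat(Observable.Throw<int>(new InvalidOperationException("boom")));

    // Catch with a fallback sequence: on failure, continue with the fallback.
    public static IList<int> WithFallback() =>
        Failing().Catch(Observable.Return(-1)).ToList().Wait();

    // Catch with a typed handler: swallow the exception and end quietly.
    public static IList<int> WithHandler() =>
        Failing()
            .Catch<int, InvalidOperationException>(error => Observable.Empty<int>())
            .ToList()
            .Wait();

    // Finally runs after the sequence has ended, whether it failed or completed.
    public static List<string> WithFinally()
    {
        var log = new List<string>();
        Failing()
            .Finally(() => log.Add("finally"))
            .Subscribe(
                value => log.Add($"value {value}"),
                error => log.Add($"error {error.Message}"));
        return log;
    }

    // Retry subscribes again after a failure. Here the source fails twice
    // and succeeds on the third subscription.
    public static (IList<int> values, int attempts) WithRetry()
    {
        int attempts = 0;
        var flaky = Observable.Defer(() =>
        {
            attempts++;
            return attempts < 3 ? Failing() : Observable.Range(1, 2);
        });
        var values = flaky.Retry(3).ToList().Wait();
        return (values, attempts);
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", WithFallback())); // prints: 1, 2, -1
        Console.WriteLine(string.Join(", ", WithHandler()));  // prints: 1, 2
        Console.WriteLine(string.Join(", ", WithFinally()));  // prints: value 1, value 2, error boom, finally
        var retry = WithRetry();
        Console.WriteLine($"{string.Join(", ", retry.values)} after {retry.attempts} attempts");
        // prints: 1, 2, 1, 2, 1, 2 after 3 attempts
    }
}
```

Note that the values from the failed attempts are not rolled back: the subscriber sees 1 and 2 again on every retry.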

Next Dmitri dives into Sequence Combinators. This is where things got really interesting. He started off with the Concat() function, which waits for the first sequence to complete and then continues with the second one, so the two sequences are concatenated. He also explained the Repeat(N) function, which does exactly what you think it does.

There’s also a StartWith(params) to prepend a sequence with a specific set of values. So if you need to pre-load some sequence you can use this. Another interesting one is the Amb function, which stands for ambiguous. This function waits on multiple sequences and takes the one that produces a value first. This is sort of like the Task.WhenAny() function. CombineLatest() is also a nice one. You can use this to combine the latest values of the selected streams whenever one of them produces a new value.
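A short sketch of Concat, StartWith, Repeat and Amb, assuming the System.Reactive package. CombinatorDemo and the subject names are made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CombinatorDemo
{
    // The second range only starts once the first one has completed,
    // and StartWith puts a 0 in front of the whole thing.
    public static IList<int> ConcatAndStartWith() =>
        Observable.Range(1, 2)
                  .Concat(Observable.Range(10, 2))
                  .StartWith(0)
                  .ToList()
                  .Wait();

    public static IList<string> Repeat() =>
        Observable.Return("hi").Repeat(3).ToList().Wait();

    // Amb picks whichever sequence produces a value first and ignores the other.
    public static List<string> Amb()
    {
        var seen = new List<string>();
        var slow = new Subject<string>();
        var fast = new Subject<string>();

        using (slow.Amb(fast).Subscribe(value => seen.Add(value)))
        {
            fast.OnNext("fast 1"); // fast wins the race here
            slow.OnNext("slow 1"); // ignored from now on
            fast.OnNext("fast 2");
        }
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", ConcatAndStartWith())); // prints: 0, 1, 2, 10, 11
        Console.WriteLine(string.Join(", ", Repeat()));             // prints: hi, hi, hi
        Console.WriteLine(string.Join(", ", Amb()));                // prints: fast 1, fast 2
    }
}
```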

For merging, things start to become a bit more complex. There is a simple Merge function if you want to merge two or more streams. But if you need some kind of pair-wise merging you can use Zip(). In the course this is shown with two streams. If you want to Zip more than two streams, Dmitri shows And-Then-When(), which I’m not even going to try to explain, since with text only it’s gonna be a mess. You really need to check the API yourself or check this for a deep-dive.
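The difference between Merge, Zip and CombineLatest is easiest to see when the same events go through all three. This is a sketch assuming the System.Reactive package; MergeDemo and the subject names are invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class MergeDemo
{
    public static (List<string> merged, List<string> zipped, List<string> latest) Run()
    {
        var merged = new List<string>();
        var zipped = new List<string>();
        var latest = new List<string>();

        var numbers = new Subject<int>();
        var letters = new Subject<string>();

        // Merge: one stream with everything, in arrival order.
        numbers.Select(n => n.ToString()).Merge(letters)
               .Subscribe(value => merged.Add(value));

        // Zip: strict pairs, the 1st number with the 1st letter, and so on.
        numbers.Zip(letters, (n, s) => $"{n}{s}")
               .Subscribe(value => zipped.Add(value));

        // CombineLatest: once both have a value, emit on every change of either.
        numbers.CombineLatest(letters, (n, s) => $"{n}{s}")
               .Subscribe(value => latest.Add(value));

        numbers.OnNext(1);
        letters.OnNext("x");
        numbers.OnNext(2);
        numbers.OnNext(3);
        letters.OnNext("y");

        return (merged, zipped, latest);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine(string.Join(", ", result.merged)); // prints: 1, x, 2, 3, y
        Console.WriteLine(string.Join(", ", result.zipped)); // prints: 1x, 2y
        Console.WriteLine(string.Join(", ", result.latest)); // prints: 1x, 2x, 3x, 3y
    }
}
```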

The latter part of the video was about some time-related sequence processing. These are pretty cool as well. So first there is the classical buffer, which can also be a sliding buffer. These are pretty useful and commonly used in a stream/channel context. Delay() is another one, which delays the entire sequence by a given time. Sample() is cool too; this returns a value to you at a specific frequency. Timeout() generates an error when there is no value in the stream before the timeout ends. This is similar to timeouts on, for example, HTTP requests. And the last one is Throttle(), which only lets a value through once the stream has been quiet for a specified wait window. If you have used UniRX before, you have probably used Throttle to catch double clicks or taps in your UI.
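Time-based operators are awkward to demo with real clocks, so this sketch (assuming the System.Reactive package) uses a count-based Buffer and drives Throttle with a HistoricalScheduler, which lets the example move virtual time forward by hand. TimingDemo and the chosen durations are made up for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class TimingDemo
{
    // Buffer(2): collect values into chunks of two. The last chunk may be smaller.
    public static IList<string> Chunks() =>
        Observable.Range(1, 5)
                  .Buffer(2)
                  .Select(chunk => string.Join("+", chunk))
                  .ToList()
                  .Wait();

    // Buffer(3, 1): a sliding buffer of size 3 that moves forward one value at a time.
    // Only the first window is returned here: it holds 1, 2 and 3, the next would start at 2.
    public static string FirstSlidingWindow() =>
        Observable.Range(1, 5)
                  .Buffer(3, 1)
                  .Select(window => string.Join("+", window))
                  .First();

    // Throttle: a value is only let through after 300 ms of silence.
    public static List<int> Throttled()
    {
        var seen = new List<int>();
        var scheduler = new HistoricalScheduler(); // virtual clock, advanced manually
        var clicks = new Subject<int>();

        using (clicks.Throttle(TimeSpan.FromMilliseconds(300), scheduler)
                     .Subscribe(value => seen.Add(value)))
        {
            clicks.OnNext(1);
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(100)); // too soon, 1 is still pending
            clicks.OnNext(2);                                    // replaces the pending 1
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500)); // quiet long enough, 2 comes out
        }
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Chunks()));    // prints: 1+2, 3+4, 5
        Console.WriteLine(FirstSlidingWindow());           // prints: 1+2+3
        Console.WriteLine(string.Join(", ", Throttled())); // prints: 2
    }
}
```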

 

Conclusion

So there we have it! Another great course on Udemy. This time about reactive programming in C#. I really enjoyed this and definitely learned a couple of cool things in spite of reactive programming not being new to me.

My goal was to get some inspiration about how to implement this logic myself. In my current implementation I have not yet needed much of the reactive extensions on my EventBus. I did add a lot of custom awaiters for certain tasks, but they don’t relate to reactive directly. But it’s fairly easy to add such functions now I think.

So in the end I think this course was totally worth it. It’s a clean and clear course. Dmitri explains everything very well and provides examples with everything that he does and tells. If you are searching for some information about reactive programming in .Net, this will surely help you a lot!

Cya!

 

#end

01010010 01110101 01100010 01100101 01101110
