Shown below is the requirement, expressed as a unit test.
    using System.Linq;
    using System.Threading;
    using NUnit.Framework;

    [TestFixture]
    public class TestEventThrottling
    {
        [Test]
        public void EventsAreThrottled()
        {
            var fq = new Frequent();
            int counter = 0;
            fq.PropertyChanged += (s, e) => { counter++; };
            // Simulate a burst of 200 rapid changes to the same property.
            foreach (var prop in Enumerable.Range(0, 200))
                fq.Property = prop.ToString();
            // Give the throttler time to flush its buffer.
            Thread.Sleep(1000);
            Assert.AreEqual(1, counter, "Since throttled at 1/sec");
        }
    }
Let me explain a little about what I want to achieve. I have an entity called “Frequent” which implements the INotifyPropertyChanged interface, so I can subscribe to its change notifications; each time the event fires, I simply increment a counter. The problem is that PropertyChanged can fire far too often, as simulated by the loop in the test. In UI applications it doesn’t make sense to raise property-changed events that frequently; in a real-world, intensive WPF application this can become your biggest bottleneck (among several other things).
So my requirement is that, within 1 second, I receive at most one change notification per property, and only if the property has actually changed. A simple implementation would push the names of the changed properties into a queue, process the queue once a second, and then raise the change notification events. That is easy enough to do, but it is tedious plumbing for what is really just a requirement to stop an event from firing more than X times a second.
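For comparison, here is a minimal sketch of that queue-based approach without Rx. The class name `QueueingNotifier` and its members are hypothetical (they are not part of the code in this post), and the sketch assumes a `System.Threading.Timer` is an acceptable way to drive the periodic flush.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper (not from the post): collects changed property names
// and raises one notification per distinct name on a fixed interval.
public sealed class QueueingNotifier : IDisposable
{
    private readonly object _gate = new object();
    private readonly HashSet<string> _pending = new HashSet<string>();
    private readonly Action<string> _raise;
    private readonly Timer _timer;

    public QueueingNotifier(Action<string> raise, TimeSpan interval)
    {
        _raise = raise;
        // The timer callback runs on a thread-pool thread every `interval`.
        _timer = new Timer(_ => Flush(), null, interval, interval);
    }

    // Record a change; duplicates within one interval collapse into one entry.
    public void Enqueue(string propertyName)
    {
        lock (_gate) { _pending.Add(propertyName); }
    }

    // Drain the pending set and raise one notification per distinct name.
    public void Flush()
    {
        string[] batch;
        lock (_gate)
        {
            batch = new string[_pending.Count];
            _pending.CopyTo(batch);
            _pending.Clear();
        }
        // Invoke the callback outside the lock so handlers cannot block Enqueue.
        foreach (var name in batch) _raise(name);
    }

    public void Dispose() { _timer.Dispose(); }
}
```

It works, but the locking, the timer lifetime, and the de-duplication all have to be written and maintained by hand, which is the tedium I wanted to avoid.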
So I was looking at the Reactive Extensions (Rx Framework), and it occurred to me that it should support the Observer pattern out of the box. I started playing with it but hit a roadblock immediately. There are numerous resources that show how to generate observables from timers, time intervals, enumerables, etc., but what I want is to take advantage of the extension methods that Rx provides on IObservable<T>, such as Throttle(), Buffer(), Window(), etc., in a simple pub-sub system. Maybe I did not look hard enough, but I could not find a simple example. So I thought a blog post would be helpful for other people like me.
In my implementation, the Frequent object is itself both a publisher and a subscriber. In Rx terms: IObservable and IObserver. Instead of implementing these interfaces, I want an observable of strings on which I publish the name of a property that has changed, so that the subscriber on the observable receives it. For this kind of simple message passing between observers and observables, you can use a Subject<T>. There are other variants like ReplaySubject<T>, BehaviorSubject<T>, AsyncSubject<T>, etc., but those are out of scope for this post. I publish on the Subject<T>, and the subscribers that subscribed to it earlier are notified of these messages.
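As a minimal sketch of that message passing, assuming the System.Reactive package is referenced, a Subject<string> can be used like this. The class name `SubjectDemo` and the property names are only illustrative.

```csharp
using System;
using System.Reactive.Subjects;

public static class SubjectDemo
{
    public static void Main()
    {
        // The subject is both an IObserver<string> (we push into it)
        // and an IObservable<string> (others subscribe to it).
        var bus = new Subject<string>();

        // Subscribe returns an IDisposable; disposing it unsubscribes.
        using (bus.Subscribe(name => Console.WriteLine("changed: " + name)))
        {
            bus.OnNext("Property");      // prints "changed: Property"
            bus.OnNext("OtherProperty"); // prints "changed: OtherProperty"
        }

        // Nobody is subscribed any more, so this message is dropped.
        bus.OnNext("Ignored");
    }
}
```

A plain Subject<T> does not replay anything, so a subscriber only sees messages published after it subscribed.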
Shown below is the Frequent class implementation.
    /// Krishna Vangapandu
    using System.ComponentModel;

    public class Frequent : INotifyPropertyChanged
    {
        private string _property;
        private readonly StringMessageThrottler _stringMessageThrottler;

        public Frequent()
        {
            // Create a new message throttler. Its implementation is shown later.
            _stringMessageThrottler = new StringMessageThrottler();
            // Subscribe to the throttler; when a property name arrives, raise the PropertyChanged event.
            _stringMessageThrottler.Subscribe(s => PropertyChanged(this, new PropertyChangedEventArgs(s)));
        }

        public string Property
        {
            get { return _property; }
            set
            {
                _property = value;
                OnPropertyChanged("Property");
            }
        }

        // Initialized with an empty handler so it can be invoked without a null check.
        public event PropertyChangedEventHandler PropertyChanged = delegate { };

        protected void OnPropertyChanged(string property)
        {
            // When a property changes, simply publish its name on the throttler.
            _stringMessageThrottler.Publish(property);
        }
    }
Take a minute or two to go through this simple class; it is self-explanatory. When a property changes, instead of raising the event immediately, I simply notify my throttler about the property that has changed. The throttler’s responsibility is to determine when to notify the subscribers about this change. Shown below is the implementation.
    using System;
    using System.Linq;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    public class StringMessageThrottler
    {
        // Subject acts as the message bus. It is an IObservable<string> in our case.
        private readonly ISubject<string> _eventObservables = new Subject<string>();
        private readonly IObservable<string> _distinctPropertyChanged;

        public StringMessageThrottler()
        {
            // Collect everything published within each 500 ms window into one list.
            var eventsAggregatedForHalfSecond = _eventObservables.Buffer(TimeSpan.FromMilliseconds(500), Scheduler.ThreadPool);
            // Get the unique properties that were changed within the buffered time.
            _distinctPropertyChanged = eventsAggregatedForHalfSecond.SelectMany(s => s.Distinct());
        }

        public IDisposable Subscribe(Action<string> invoke)
        {
            // When you subscribe, subscribe on the final observable chain.
            return _distinctPropertyChanged.Subscribe(invoke);
        }

        public void Publish(string s)
        {
            // This is stupid that _eventObservables.Publish is not what is expected.
            // Instead, to publish, do an OnNext(s) call. Publish does something entirely different I guess!
            // But come on guys, IObserver.Publish() -> I expect to publish the damn message
            _eventObservables.OnNext(s);
        }
    }
You can read the comments that I wrote out of frustration. All I do in this simple façade is wrap an observable (a Subject<string>), buffer the messages received for half a second, take the unique messages from each buffer, and publish them to the subscribers. This is all set up in the constructor. The other methods are simply my take on making things simple for the consumer of this class.
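The buffer-then-distinct chain described above can also be tried in isolation. The following is only a sketch, assuming the System.Reactive package is referenced; `BufferDemo` and `DistinctPerWindow` are hypothetical names, and it uses the `Buffer(TimeSpan)` overload, which runs on Rx's default scheduler rather than the one chosen in the class above.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class BufferDemo
{
    // Hypothetical helper: at most one notification per distinct value per window.
    public static IObservable<string> DistinctPerWindow(IObservable<string> source, TimeSpan window)
    {
        return source
            .Buffer(window)                          // one IList<string> per time window
            .SelectMany(batch => batch.Distinct());  // empty windows produce no output
    }

    public static void Main()
    {
        var source = new Subject<string>();
        var throttled = DistinctPerWindow(source, TimeSpan.FromMilliseconds(500));

        using (throttled.Subscribe(name => Console.WriteLine("changed: " + name)))
        {
            // A burst of 200 changes to one property, plus one change to another.
            for (int i = 0; i < 200; i++) source.OnNext("Property");
            source.OnNext("Other");

            // Wait long enough for the current window to close and flush.
            Thread.Sleep(1000);
        }
    }
}
```

Two behaviors are worth knowing. A window in which nothing was published yields an empty list, so no notification is raised for it. And because the windows are fixed in time, a burst that happens to straddle a window boundary can produce one notification in each of the two windows, so "once per window" is not quite the same as "once per burst".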
Rx is complex and powerful, but in my opinion it has a very steep learning curve. Still, it surely saved my team a good few days of implementation. Again, I am very new to Rx, so if you know a better way to do this, please let me know.