Rx.NET, part of ReactiveX, contains the Buffer operator, which does the following:
periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time
There are several overloads that configure how items are buffered and passed to observers. You can limit a buffer by size, by the time elapsed since the last emission, or by both.
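To make the three flavours concrete, here is a minimal sketch of the count-based, time-based and combined overloads. It assumes the System.Reactive NuGet package is referenced; the class name BufferOverloads, the helper ByCount and the sample values are made up for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class BufferOverloads
{
    // Count only: a bundle is emitted for every 3 items; the shorter
    // remainder is flushed when the source completes.
    public static List<string> ByCount() =>
        Observable.Range(1, 7)
            .Buffer(3)
            .Select(bundle => string.Join(",", bundle))
            .ToEnumerable()
            .ToList();

    public static void Main()
    {
        foreach (var bundle in ByCount())
            Console.WriteLine($"count: [{bundle}]"); // [1,2,3], then [4,5,6], then [7]

        // 25 items, one every 100 ms (illustrative values).
        var source = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(25);

        // Time only: a bundle is emitted every second, however many items arrived.
        source.Buffer(TimeSpan.FromSeconds(1))
            .ForEachAsync(b => Console.WriteLine($"time: {b.Count} items"))
            .Wait();

        // Time and count: a bundle is emitted after 1 second or 5 items,
        // whichever comes first.
        source.Buffer(TimeSpan.FromSeconds(1), 5)
            .ForEachAsync(b => Console.WriteLine($"time+count: {b.Count} items"))
            .Wait();
    }
}
```

The time-based variants depend on timer precision, so the exact bundle sizes they print may vary from run to run.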
Concurrency
When it comes to concurrency, ReactiveX is single-threaded by default: it does not introduce concurrency unless asked to. This holds even for Buffer, one of the few operators that introduce multi-threading implicitly (buffered items are emitted from the timer's thread). Every operation that receives a new event (OnNext), publishes it to subscribers (Tick), or handles OnError or OnCompleted is guarded by a lock:
public override void OnError(Exception error)
{
    lock (_gate)
    {
        _s.Clear();
        ForwardOnError(error);
    }
}
This ensures that receiving events, publishing them, and handling error or completion notifications never run concurrently, which is consistent with the rest of the Rx operators.
We can easily demonstrate that even though the publisher and the consumer run on separate threads, they do not execute concurrently:
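The effect of such a gate can be sketched without Rx at all. The class below is a hypothetical, heavily simplified stand-in for the operator's internal observer (the names GatedBuffer, OnNext and Tick are chosen only to mirror the description above, and this is not the library's actual code). The point it illustrates is that the downstream callback runs while the gate is held, so a slow consumer keeps OnNext waiting.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified model of a buffering observer guarded by one lock.
public sealed class GatedBuffer<T>
{
    private readonly object _gate = new object();
    private readonly Action<IList<T>> _downstream;
    private List<T> _items = new List<T>();

    public GatedBuffer(Action<IList<T>> downstream) => _downstream = downstream;

    // Called on the publisher's thread: blocks while Tick holds the gate.
    public void OnNext(T value)
    {
        lock (_gate)
        {
            _items.Add(value);
        }
    }

    // Called on the timer's thread: swaps the list and forwards the batch
    // downstream while still holding the gate.
    public void Tick()
    {
        lock (_gate)
        {
            var batch = _items;
            _items = new List<T>();
            _downstream(batch);
        }
    }
}
```

If the callback passed to the constructor sleeps for 3 seconds, any OnNext call made from another thread during that time waits for the lock to be released.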
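using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class BufferDemo
{
    private static readonly object ConsoleGate = new object();

    public static void Log(string message, ConsoleColor colour)
    {
        // Serialize console access so colours set by different threads do not interleave.
        lock (ConsoleGate)
        {
            Console.ForegroundColor = colour;
            // "mm" is minutes ("MM" would print the month).
            Console.WriteLine($"{DateTime.Now:HH:mm:ss.fffff} [{Thread.CurrentThread.ManagedThreadId}]: {message}");
            Console.ResetColor();
        }
    }

    public static void Test()
    {
        var subject = new Subject<IEnumerable<int>>();
        subject
            .Buffer(TimeSpan.FromSeconds(5), 50)   // emit every 5 seconds or every 50 events
            .SelectMany(i => i)                    // flatten each bundle back into single events
            .Do(b => {
                Log($"[2.1] START batch {b.First()}", ConsoleColor.Yellow);
                Thread.Sleep(TimeSpan.FromSeconds(3));   // emulate slow processing
                Log($"[2.2] END batch {b.First()}", ConsoleColor.Yellow);
            })
            .Retry()
            .Subscribe();
        for (int i = 0; i < Int32.MaxValue; i++)
        {
            Log($"[1.1] START publish {i}", ConsoleColor.Green);
            subject.OnNext(Enumerable.Range(i, 10));
            Thread.Sleep(TimeSpan.FromSeconds(1));
            Log($"[1.2] END publish {i}", ConsoleColor.Green);
        }
    }
}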
The code above publishes a new event and then waits 1 second, while Buffer batches those events and emits them every 5 seconds (or sooner, once 50 events have accumulated). The consumer code (the Do action) emulates slow processing with a 3-second sleep. Running the test produces the following output:
We can clearly see that the slow subscriber blocks the publisher from pushing events into the Buffer, even though the two run on different threads: thread 1 for the publisher and thread 4 for the consumer.
Introducing concurrency with ObserveOn
If you would like events to be buffered independently of the subscribers, all you have to do is pick a Scheduler on which events will be observed:
var scheduler = NewThreadScheduler.Default;
and pass it to the ObserveOn method:
subject
    .ObserveOn(scheduler)
    .Buffer(TimeSpan.FromSeconds(3), 50)
    .SelectMany(i => i)
    .Do(b => {
        Log($"[2.1] START batch {b.First()}", ConsoleColor.Yellow);
        Thread.Sleep(TimeSpan.FromSeconds(3));
        Log($"[2.2] END batch {b.First()}", ConsoleColor.Yellow);
    })
    .Retry()
    .Subscribe();
Now the publisher side is not affected by the slow consumer:
Final notes
One thing to remember is that every event stored in the Buffer takes up memory. If the subscriber is consistently slower than the publisher, memory consumption will grow over time and may eventually result in an OutOfMemoryException.
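A rough back-of-the-envelope estimate shows how quickly the backlog can grow with the numbers used in the test above (one event published per second, 3 seconds of processing per event). The helper below is a hypothetical illustration only: it ignores the buffering delay and assumes perfectly steady rates.

```csharp
using System;

public static class BacklogEstimate
{
    // Approximate number of events still waiting after elapsedSeconds,
    // assuming a steady publisher and a steady, slower consumer.
    public static long PendingEvents(
        double publishIntervalSeconds,
        double processingSeconds,
        double elapsedSeconds)
    {
        var produced = (long)Math.Floor(elapsedSeconds / publishIntervalSeconds);
        var consumed = (long)Math.Floor(elapsedSeconds / processingSeconds);
        // The consumer cannot process more events than were produced.
        return Math.Max(0, produced - consumed);
    }

    public static void Main()
    {
        // 60 events produced, 20 consumed after one minute.
        Console.WriteLine(PendingEvents(1, 3, 60));   // prints 40
        // 3600 produced, 1200 consumed after one hour.
        Console.WriteLine(PendingEvents(1, 3, 3600)); // prints 2400
    }
}
```

Under these assumptions the backlog grows by roughly two events every three seconds, and each pending event keeps its payload alive in memory until it is processed.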
References
- ReactiveX - official website
- Buffer operator documentation
- Introduction to Rx by Lee Campbell