Rx.NET, the .NET implementation of ReactiveX, provides the Buffer operator, which the ReactiveX documentation describes as follows:

periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time

There are several overloads that configure how items are buffered and passed to observers. You can bound a buffer by item count, by the time elapsed since the last emission, or by both.
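As a rough illustration of those three kinds of overload, here is a minimal sketch. It assumes the System.Reactive NuGet package is referenced; the class name, the 100 ms source and the 350 ms window are arbitrary choices made for this example, not values from the article.

```csharp
using System;
using System.Reactive.Linq;

public static class BufferOverloadsDemo
{
    public static void Main()
    {
        // By count: a list is emitted for every 3 items,
        // and the remainder is flushed when the source completes.
        Observable.Range(1, 10)
            .Buffer(3)
            .Subscribe(b => Console.WriteLine("count: " + string.Join(",", b)));
        // prints "count: 1,2,3", "count: 4,5,6", "count: 7,8,9", "count: 10"

        // A timed source for the next two cases: ten values, one every 100 ms.
        var source = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(10);

        // By time: emit whatever arrived during each 350 ms window.
        foreach (var b in source.Buffer(TimeSpan.FromMilliseconds(350)).ToEnumerable())
            Console.WriteLine("time: " + string.Join(",", b));

        // By time and count: emit after 350 ms or after 2 items, whichever comes first.
        foreach (var b in source.Buffer(TimeSpan.FromMilliseconds(350), 2).ToEnumerable())
            Console.WriteLine("time+count: " + string.Join(",", b));
    }
}
```

The contents of the time-based buffers depend on timer scheduling, so they can differ slightly from run to run; only the count-based output is deterministic.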


Concurrency

By default, ReactiveX introduces no concurrency of its own: notifications are delivered on whichever thread produces them.
The same guarantee holds for Buffer, even though it is one of the few operators that brings in another thread implicitly (buffered items are emitted from a timer thread). Receiving a new event (OnNext), publishing the buffer to subscribers (Tick), OnError and OnCompleted all run under the same lock:

public override void OnError(Exception error)
{
    lock (_gate)
    {
        _s.Clear();
        ForwardOnError(error);
    }
}

This ensures that receiving events, publishing them and handling terminal notifications (OnError, OnCompleted) never run concurrently, which is consistent with the rest of the Rx operators.
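To make the consequence of that lock concrete, here is a simplified sketch of the pattern. GatedBuffer is a hypothetical stand-in written for this illustration, not the Rx.NET source; it assumes, as in the OnError snippet above, that the downstream call is made while the lock is held.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-in for a timed buffer; NOT the Rx.NET source.
public sealed class GatedBuffer<T>
{
    private readonly object _gate = new object();
    private readonly Action<IList<T>> _downstream;
    private List<T> _items = new List<T>();

    public GatedBuffer(Action<IList<T>> downstream)
    {
        _downstream = downstream;
    }

    // Called on the producer's thread.
    public void OnNext(T value)
    {
        lock (_gate)
        {
            _items.Add(value);
        }
    }

    // Called on a timer thread. The downstream callback runs while the
    // lock is held, so OnNext has to wait until the consumer returns.
    public void Tick()
    {
        lock (_gate)
        {
            var batch = _items;
            _items = new List<T>();
            _downstream(batch);
        }
    }
}

public static class GatedBufferDemo
{
    public static void Main()
    {
        var buffer = new GatedBuffer<int>(batch =>
            Console.WriteLine($"batch: {string.Join(",", batch)}"));

        buffer.OnNext(1);
        buffer.OnNext(2);
        buffer.Tick(); // prints "batch: 1,2"
    }
}
```

Because the callback executes inside the lock, a consumer that sleeps for three seconds keeps the gate closed for three seconds, and any producer calling OnNext in the meantime blocks.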

We can easily demonstrate that, even though the publisher and the consumer run on separate threads, they never execute concurrently:

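// Requires: using System; using System.Threading;
public static void Log(string message, ConsoleColor colour)
{
    Console.ForegroundColor = colour;
    // "HH:mm:ss" is 24-hour time with minutes; "MM" would print the month instead.
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fffff} [{Thread.CurrentThread.ManagedThreadId}]: {message}");
    Console.ResetColor();
}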


public static void Test()
{
    var subject = new Subject<IEnumerable<int>>();

    subject
        .Buffer(TimeSpan.FromSeconds(5), 50)
        .SelectMany(i => i)
        .Do(b => {
            Log($"[2.1]   START batch {b.First()}", ConsoleColor.Yellow);
            Thread.Sleep(TimeSpan.FromSeconds(3));
            Log($"[2.2]   END   batch {b.First()}", ConsoleColor.Yellow);

        })
        .Retry()
        .Subscribe();

    for (int i = 0; i < Int32.MaxValue; i++)
    {
        Log($"[1.1] START publish {i}", ConsoleColor.Green);
        subject.OnNext(Enumerable.Range(i, 10));
        Thread.Sleep(TimeSpan.FromSeconds(1));
        Log($"[1.2] END   publish {i}", ConsoleColor.Green);
    }
}

The code above publishes a new event and then waits 1 second, while Buffer batches those events and emits them every 5 seconds (or sooner, once 50 events have accumulated). The consumer code (the Do action) emulates slow processing with a 3-second sleep for each item it receives. Running the test produces the following output:

[Figure: result of running the Test code]

We can clearly see that the slow subscriber blocks the publisher from pushing events into the Buffer, even though the two run on different threads: thread 1 for the publisher and thread 4 for the consumer.

Introducing concurrency with ObserveOn

If you want events to be buffered independently of how fast the subscribers process them, all you have to do is obtain a scheduler on which the events will be observed:

var scheduler = NewThreadScheduler.Default;

and pass it to the ObserveOn method:

    subject
        .ObserveOn(scheduler)
        .Buffer(TimeSpan.FromSeconds(3), 50)
        .SelectMany(i => i)
        .Do(b => {
            Log($"[2.1]   START batch {b.First()}", ConsoleColor.Yellow);
            Thread.Sleep(TimeSpan.FromSeconds(3));
            Log($"[2.2]   END   batch {b.First()}", ConsoleColor.Yellow);

        })
        .Retry()
        .Subscribe();

Now the publisher is no longer affected by the slow consumer:

[Figure: output of the test with ObserveOn]

Final notes

One thing to remember is that every event waiting in the Buffer takes up memory. If the subscriber is consistently slower than the publisher, memory consumption keeps growing over time and can eventually end in an OutOfMemoryException.
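A back-of-the-envelope estimate shows how quickly the backlog builds up. The sketch below is a deliberately crude model, not a measurement: it assumes the fixed rates from the Test method (one event published per second, three seconds to process each event) and ignores buffer windows and scheduling overhead. The class and method names are made up for this example.

```csharp
using System;

public static class BacklogEstimate
{
    // Events still waiting after `seconds`, assuming one event is published every
    // `publishEverySec` seconds and each event takes `processTakesSec` seconds to handle.
    public static long Backlog(long seconds, long publishEverySec, long processTakesSec)
    {
        long published = seconds / publishEverySec;
        long consumed = Math.Min(published, seconds / processTakesSec);
        return published - consumed;
    }

    public static void Main()
    {
        foreach (var t in new long[] { 60, 600, 3600 })
            Console.WriteLine($"{t}s -> {Backlog(t, 1, 3)} events waiting");
        // prints "60s -> 40 events waiting", "600s -> 400 events waiting",
        // "3600s -> 2400 events waiting"
    }
}
```

Under these assumptions the backlog grows by about two events every three seconds and never shrinks, which is why a consumer that is permanently slower than the publisher eventually exhausts memory.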
