Mixing TPL Dataflow with Reactive Extensions

The TPL Dataflow Library (TDF) from Microsoft DevLabs provides another way to manage asynchronous execution in .NET. Fortunately, like many of the other options, it’s built on the same underlying constructs (primarily the Task object), which allows integration with existing solutions. One point of integration is the IObservable and IObserver interfaces, which form the basis of the Reactive Extensions (Rx).

The basic concept of TDF involves chaining together blocks that do some processing on each data item as it flows through the chain. Blocks can be sources (ISourceBlock) or targets (ITargetBlock) or a combination of both (IPropagatorBlock).
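
As a minimal sketch of such a chain, a propagator block can be linked to a target block with LinkTo. The names `parse`, `sink` and `results`, and the doubling step, are made up for illustration; the sketch assumes the System.Threading.Tasks.Dataflow package is available and that C# top-level statements can be used.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Collects what reaches the end of the chain (illustrative helper, not part of TDF).
var results = new List<int>();

// A propagator block: a target for strings and a source of ints.
var parse = new TransformBlock<string, int>(s => int.Parse(s));
// A pure target block: consumes each int, one at a time by default.
var sink = new ActionBlock<int>(i => results.Add(i * 2));

// Link source to target; PropagateCompletion forwards completion down the chain.
parse.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

parse.Post("1");
parse.Post("2");
parse.Complete();         // signal that no more input is coming
sink.Completion.Wait();   // block until the chain has drained

Console.WriteLine(string.Join(",", results)); // prints "2,4"
```

Completion is propagated explicitly here so that the program can wait for the last block before reading the results.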

The most straightforward combination of Rx and TDF involves transforming the output of a source block into an observable sequence by using the AsObservable extension method available on ISourceBlock. This allows construction of an arbitrary Dataflow chain that just needs to have some propagating output at the end. This output can then flow smoothly into an arbitrary set of Rx fluent extension methods for further processing. The chain generally terminates with a Subscribe call that executes some action on the resulting data.

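    // Needs: using System.Diagnostics; using System.Reactive.Linq; using System.Threading.Tasks.Dataflow;
    // TDF side: a propagator block that accepts ints and emits strings.
    IPropagatorBlock<int, string> source = new TransformBlock<int, string>(i => i.ToString());
    // Rx side: expose the block's output as an observable and keep processing with Rx operators.
    IObservable<int> observable = source.AsObservable().Select(Int32.Parse);
    IDisposable subscription = observable.Subscribe(i => Debug.WriteLine(i));
    // Send some data into TDF.
    source.Post(138);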
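
Because a block hands its output to subscribers asynchronously, a console program built from this snippet can exit before anything is written. The following is one hedged way to wait for the data to arrive. It assumes the System.Reactive and System.Threading.Tasks.Dataflow packages, and it relies on AsObservable signalling OnCompleted after the source block has completed and its output has been delivered. The `received` list and `done` event are illustrative additions.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Illustrative helpers: a place to store values and a signal for completion.
var received = new List<int>();
var done = new ManualResetEventSlim();

var source = new TransformBlock<int, string>(i => i.ToString());

IDisposable subscription = source.AsObservable()
    .Select(s => int.Parse(s))
    .Subscribe(
        i => received.Add(i),   // OnNext: record each value
        () => done.Set());      // OnCompleted: the block has finished

source.Post(138);
source.Complete();   // completing the block ends the observable sequence
done.Wait();         // wait until the subscriber has seen OnCompleted

Console.WriteLine(string.Join(",", received));
subscription.Dispose();
```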

A transformation in the opposite direction is also possible, but it’s a little more roundabout. The AsObserver extension method on ITargetBlock allows a TDF chain to make itself available for input as an IObserver instance. Since an IObserver can be passed directly to IObservable.Subscribe, any Rx sequence can set up a subscription to flow its data into the target block. The integration doesn’t feel quite as smooth in this direction because the target block has to be set up ahead of the subscription, but it’s still a pretty clean switch.

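    // TDF side: parse each string and hand the result to an ActionBlock.
    IPropagatorBlock<string, int> target = new TransformBlock<string, int>(s => Int32.Parse(s));
    IDisposable link = target.LinkTo(new ActionBlock<int>(i => Debug.WriteLine(i)));
    // Expose the head of the chain as an observer.
    IObserver<string> observer = target.AsObserver();

    // Rx side: any observable of strings can now feed the block.
    IObservable<string> observable = Observable.Range(1, 10).Select(i => i.ToString());

    observable.Subscribe(observer);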
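
In this direction the completion signal travels with the data: the observer returned by AsObserver completes the target block when the sequence completes. A sketch that uses this to wait for the whole pipeline is shown here. The `sink` block, the `sum` variable and the PropagateCompletion link option are illustrative additions to the snippet above, and the System.Reactive and System.Threading.Tasks.Dataflow packages are assumed.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks.Dataflow;

// Illustrative accumulator, updated by the last block in the chain.
int sum = 0;

var target = new TransformBlock<string, int>(s => int.Parse(s));
var sink = new ActionBlock<int>(i => sum += i);

// Forward completion so that finishing the Rx sequence finishes the whole chain.
target.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

Observable.Range(1, 10)
    .Select(i => i.ToString())
    .Subscribe(target.AsObserver());   // OnCompleted completes the target block

sink.Completion.Wait();   // wait for every item to pass through both blocks
Console.WriteLine(sum);
```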

Now that the transition can be made in either direction, the possibilities are wide open. Data can start out in either the Rx or TDF world and jump back and forth as needed to leverage features of both.

    IObservable<int> originalInts = Observable.Range(1, 10);

    // Rx -> TDF: group the ints into pairs with a BatchBlock.
    IPropagatorBlock<int, int[]> batch = new BatchBlock<int>(2);
    IObservable<int[]> batched = batch.AsObservable();
    originalInts.Subscribe(batch.AsObserver());

    // TDF -> Rx: apply a timeout and sum each pair (Sum needs using System.Linq).
    IObservable<int> added = batched.Timeout(TimeSpan.FromMilliseconds(50)).Select(a => a.Sum());

    // Rx -> TDF: convert each sum to a string.
    IPropagatorBlock<int, string> toString = new TransformBlock<int, string>(i => i.ToString());
    added.Subscribe(toString.AsObserver());

    // Pair each string with an int from a second subscription to the original sequence.
    JoinBlock<string, int> join = new JoinBlock<string, int>();
    toString.LinkTo(join.Target1);

    IObserver<int> joinIn2 = join.Target2.AsObserver();
    originalInts.Subscribe(joinIn2);

    // TDF -> Rx: observe the joined tuples.
    IObservable<Tuple<string, int>> joined = join.AsObservable();

    joined.Subscribe(t => Debug.WriteLine("{0};{1}", t.Item1, t.Item2));
