Reactive Extensions (Rx) Data Streaming

You have probably heard about Reactive Extensions, a library from Microsoft that greatly simplifies working with asynchronous data streams and lets you query them with LINQ operators. In my previous post I briefly discussed loading data asynchronously via Entity Framework; however, that approach did not deliver the data in chunks. This post demonstrates how to use Reactive Extensions to load data from a database asynchronously in chunks, after a brief overview of the Reactive Extensions library.

Reactive Extensions (Rx)
Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators. Using Rx, developers represent asynchronous data streams with Observables, query asynchronous data streams using LINQ operators, and parameterize the concurrency in the asynchronous data streams using Schedulers. Simply put, Rx = Observables + LINQ + Schedulers.

Data sequences can take many forms, such as a stream of data from a file or web service, web service requests, system notifications, or a series of events such as user input. Reactive Extensions represents all of these data sequences as observable sequences. An application can subscribe to these observable sequences to receive asynchronous notifications as new data arrives. The Rx library is available for desktop application development in .NET. It has also been released for Silverlight, Windows Phone 7 and JavaScript.
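To make "Rx = Observables + LINQ + Schedulers" concrete, here is a minimal console sketch. It is not from the original post: it assumes the current System.Reactive NuGet package, and the class and method names (RxBasics, EvenSquares) are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class RxBasics
{
    // Hypothetical helper: squares of the even numbers in 1..10,
    // produced through an observable pipeline.
    public static IList<int> EvenSquares()
    {
        // Observable: a sequence that pushes the values 1..10 to its subscribers.
        IObservable<int> source = Observable.Range(1, 10);

        // LINQ: the same query syntax used on IEnumerable works on IObservable.
        IObservable<int> query =
            from n in source
            where n % 2 == 0
            select n * n;

        // Scheduler: run the subscription on the task pool instead of the caller's thread.
        // ToList() emits a single list on completion; Wait() blocks until it arrives.
        return query
            .SubscribeOn(TaskPoolScheduler.Default)
            .ToList()
            .Wait();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", EvenSquares())); // prints "4, 16, 36, 64, 100"
    }
}
```

Blocking with Wait() is only for the sake of a short console demo; in a UI application you would subscribe and observe on the UI thread instead.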

Reactive programming allows you to turn those aspects of your code that are currently imperative into something much more event-driven and flexible. Reactive programming can be applied to a range of situations, from WPF applications to Windows Phone apps, to improve coding efficiency and boost performance.

The following code snippet loads data asynchronously via Entity Framework in batches of 200 records.

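public void LoadPostCodes()
{
    btnStatus.Content = "Started";
    listBox1.Items.Clear();

    // Enumerate the Entity Framework query on a new thread so the UI stays responsive.
    (from p in cx.MAS_PostCode select p)
        .ToObservable(Scheduler.NewThread)
        // Group the records into batches of 200.
        .Buffer(200)
        // Deliver each batch on the UI thread (this method must be called from it).
        .ObserveOn(SynchronizationContext.Current)
        .Subscribe(
            ld =>
            {
                // Add every record of the current batch to the list box.
                foreach (var item in ld)
                {
                    ListBoxItem litem = new ListBoxItem();
                    litem.Content = string.Format("{0} {1}", item.PC_PostCode, item.PC_Address1);
                    listBox1.Items.Add(litem);
                    listBox1.ScrollIntoView(litem);
                }
                // Show the running total of loaded records.
                button1.Content = listBox1.Items.Count.ToString();
            },
            // Called once, after the last batch has been delivered.
            () => { btnStatus.Content = "Finished"; }
        );
}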
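To see the batching on its own, without a database or a WPF window, the ToObservable + Buffer part of the pipeline can be tried in a console application. This is only a sketch under assumptions: it uses the current System.Reactive NuGet package (where the new-thread scheduler is exposed as NewThreadScheduler.Default), a plain range of integers stands in for the post code table, and BufferSketch and LoadInBatches are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class BufferSketch
{
    // Hypothetical helper: enumerates rows on a new thread and groups them
    // into batches of batchSize, mirroring ToObservable(...).Buffer(200).
    public static IList<IList<int>> LoadInBatches(IEnumerable<int> rows, int batchSize)
    {
        return rows
            .ToObservable(NewThreadScheduler.Default) // enumerate off the calling thread
            .Buffer(batchSize)                        // emit one IList<int> per full batch
            .ToList()                                 // collect all batches into one list
            .Wait();                                  // block until the sequence completes (demo only)
    }

    public static void Main()
    {
        // 450 stand-in "records" in batches of 200.
        var batches = LoadInBatches(Enumerable.Range(1, 450), 200);
        foreach (var batch in batches)
        {
            Console.WriteLine(batch.Count); // prints 200, 200, 50
        }
    }
}
```

The last batch holds whatever is left over when the source completes, which is why the final count is 50 rather than 200.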

I have finished writing an MVVM (Nano View model) based scenario and Rx implementations …

I will write about it and publish the code soon.

Regards
Rajnish
