Blog

Selenium WebDriver – Get Browser Hwnd

While designing a UI automation framework API that works with browsers as well as other systems such as native windows, I had to control the z-order of the IExplore instance launched by Selenium WebDriver.

 

However, out of the box, Selenium does not expose the driver process ID or the browser hwnd.

The attached solution, with some extensions, is able to get the browser hwnd.

The basic logic is:

  • When the driver is initialized, get the URL of the hub and extract the port number.
  • From the port number, find the ID of the process that is listening on this port, i.e. the PID of the driver.
  • After navigation, search all instances of iexplore for the one whose parent PID matches the PID of the driver, i.e. the browser PID.
  • Get the hwnd of the browser process.
Below are some code highlights:
// Interface to extend the web driver and expose ProcessId of driver
public interface IWebDriverEx: IWebDriver
{
/// <summary>
/// Get the process id of driver
/// </summary>
/// <returns>process id</returns>
int ProcessId { get; }
}
and a few extension methods:
public static IntPtr GetBrowserHwnd(this IWebDriverEx driver);
public static void BringToFront(this IWebDriverEx driver);
For more details, see the attached sample (download). After launch, go to the console and press any key to bring the browser to the front.
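The second step of the logic above (port number to driver PID) can be sketched as follows. This is only an illustration and not necessarily how the attached sample does it: it assumes the text comes from running `netstat -ano` on an English-language Windows install, and `PortOwnerLookup` and `FindListeningPid` are hypothetical names. Launching netstat and walking parent PIDs are left out.

```csharp
using System;
using System.Globalization;

// Hypothetical helper: maps a listening TCP port to the owning process ID.
public static class PortOwnerLookup
{
    // Parses text in the layout printed by "netstat -ano" (assumed columns:
    // Proto, Local Address, Foreign Address, State, PID) and returns the PID
    // of the process listening on the given TCP port, or null if none is found.
    public static int? FindListeningPid(string netstatOutput, int port)
    {
        string suffix = ":" + port.ToString(CultureInfo.InvariantCulture);
        string[] lines = netstatOutput.Split(
            new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries);

        foreach (string line in lines)
        {
            string[] cols = line.Split(
                new[] { ' ', '\t' }, StringSplitOptions.RemoveEmptyEntries);

            // Skip headers, UDP rows, and anything without the five expected columns.
            if (cols.Length != 5) continue;
            if (!cols[0].Equals("TCP", StringComparison.OrdinalIgnoreCase)) continue;
            if (!cols[3].Equals("LISTENING", StringComparison.OrdinalIgnoreCase)) continue;

            // The local address ends with ":<port>" for both IPv4 and IPv6 rows.
            if (!cols[1].EndsWith(suffix, StringComparison.Ordinal)) continue;

            if (int.TryParse(cols[4], NumberStyles.None,
                             CultureInfo.InvariantCulture, out int pid))
            {
                return pid;
            }
        }

        return null;
    }
}
```

Matching on the ":<port>" suffix means that port 5555 does not accidentally match a row for port 15555, because the character before the digits must be a colon.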

Testing with Dispatcher

The threading model in the Windows Presentation Foundation (WPF) is based on the concept of a “dispatcher”. Every thread has a Dispatcher object associated with it. Dispatcher is responsible for policing cross-thread operations and provides a way for threads to marshal method invocations to each other, via its BeginInvoke and Invoke methods.

Dispatcher contains the WPF version of a Windows message queue and message pump. It provides a prioritized message queue, from which the message pump can pull messages and process them. Most WPF classes have an affinity for the thread on which they are instantiated. That thread affinity is based on an association between the object and the thread’s Dispatcher.

Using IDispatcher in your view models

 /// <summary>
 /// The Dispatcher interface.
 /// </summary>
 public interface IDispatcher
 {
 /// <summary>
 /// The invoke.
 /// </summary>
 /// <param name="callback">
 /// The callback.
 /// </param>
 void Invoke(Delegate callback);

 /// <summary>
 /// The invoke.
 /// </summary>
 /// <param name="method">
 /// The method.
 /// </param>
 /// <param name="priority">
 /// The priority.
 /// </param>
 /// <param name="args">
 /// The args.
 /// </param>
 /// <returns>
 /// The <see cref="object"/>.
 /// </returns>
 object Invoke(Delegate method, DispatcherPriority priority, params object[] args);

 /// <summary>
 /// The begin invoke.
 /// </summary>
 /// <param name="callback">
 /// The callback.
 /// </param>
 /// <returns>
 /// The <see cref="DispatcherOperation"/>.
 /// </returns>
 DispatcherOperation BeginInvoke(Delegate callback);

 /// <summary>
 /// The begin invoke.
 /// </summary>
 /// <param name="method">
 /// The method.
 /// </param>
 /// <param name="args">
 /// The args.
 /// </param>
 /// <returns>
 /// The <see cref="DispatcherOperation"/>.
 /// </returns>
 DispatcherOperation BeginInvoke(Delegate method, params object[] args);

 /// <summary>
 /// The begin invoke.
 /// </summary>
 /// <param name="method">
 /// The method.
 /// </param>
 /// <param name="priority">
 /// The priority.
 /// </param>
 /// <param name="args">
 /// The args.
 /// </param>
 /// <returns>
 /// The <see cref="DispatcherOperation"/>.
 /// </returns>
 DispatcherOperation BeginInvoke(Delegate method, DispatcherPriority priority, params object[] args);

 /// <summary>
 /// The invoke async.
 /// </summary>
 /// <param name="callback">
 /// The callback.
 /// </param>
 /// <returns>
 /// The <see cref="DispatcherOperation"/>.
 /// </returns>
 DispatcherOperation InvokeAsync(Action callback);

 /// <summary>
 /// The invoke async.
 /// </summary>
 /// <param name="callback">
 /// The callback.
 /// </param>
 /// <param name="priority">
 /// The priority.
 /// </param>
 /// <returns>
 /// The <see cref="DispatcherOperation"/>.
 /// </returns>
 DispatcherOperation InvokeAsync(Action callback, DispatcherPriority priority);

 /// <summary>
 /// The invoke async.
 /// </summary>
 /// <param name="callback">
 /// The callback.
 /// </param>
 /// <typeparam name="TResult">type of result
 /// </typeparam>
 /// <returns>
 /// The <see cref="DispatcherOperation"/>.
 /// </returns>
 DispatcherOperation<TResult> InvokeAsync<TResult>(Func<TResult> callback);

 /// <summary>
 /// The invoke async.
 /// </summary>
 /// <param name="callback">
 /// The callback.
 /// </param>
 /// <param name="priority">
 /// The priority.
 /// </param>
 /// <typeparam name="TResult">type of result
 /// </typeparam>
 /// <returns>
 /// The <see cref="DispatcherOperation"/>.
 /// </returns>
 DispatcherOperation<TResult> InvokeAsync<TResult>(Func<TResult> callback, DispatcherPriority priority);
 }

The real dispatcher

public class DispatcherFactory
 {
 public static IDispatcher GetDispatcher()
 {
 return Application.Current != null ? new AppDispatcher(Application.Current.Dispatcher) : new AppDispatcher();
 }

 private class AppDispatcher : IDispatcher
 {
 private readonly Dispatcher _safeDispatcher;
 private readonly Dispatcher _currentDispatcher;

 public AppDispatcher(Dispatcher dispatcher):this()
 {
 this._safeDispatcher = dispatcher ?? this._currentDispatcher;
 }

 public AppDispatcher()
 {
 this._currentDispatcher = Dispatcher.CurrentDispatcher;
 this._safeDispatcher = this._currentDispatcher;
 }

 private Dispatcher Dispatcher
 {
 get { return this._currentDispatcher.CheckAccess() ? this._currentDispatcher : this._safeDispatcher; }
 }

 void IDispatcher.Invoke(Delegate callback)
 {
 this.Dispatcher.Invoke(callback);
 }

 object IDispatcher.Invoke(Delegate method, DispatcherPriority priority, params object[] args)
 {
 return this.Dispatcher.Invoke(method, priority, args);
 }

 DispatcherOperation IDispatcher.BeginInvoke(Delegate callback)
 {
 return this.Dispatcher.BeginInvoke(callback);
 }

 DispatcherOperation IDispatcher.BeginInvoke(Delegate method, params object[] args)
 {
 return this.Dispatcher.BeginInvoke(method, args);
 }

 DispatcherOperation IDispatcher.BeginInvoke(Delegate method, DispatcherPriority priority, params object[] args)
 {
 return this.Dispatcher.BeginInvoke(method, priority, args);
 }


 DispatcherOperation IDispatcher.InvokeAsync(Action callback)
 {
 return this.Dispatcher.InvokeAsync(callback);
 }

 DispatcherOperation IDispatcher.InvokeAsync(Action callback, DispatcherPriority priority)
 {
 return this.Dispatcher.InvokeAsync(callback, priority);
 }

 DispatcherOperation<TResult> IDispatcher.InvokeAsync<TResult>(Func<TResult> callback)
 {
 return this.Dispatcher.InvokeAsync(callback);
 }

 DispatcherOperation<TResult> IDispatcher.InvokeAsync<TResult>(Func<TResult> callback, DispatcherPriority priority)
 {
 return this.Dispatcher.InvokeAsync(callback, priority);
 }
 }
 }

The testing dispatcher

 /// <summary>
 /// The stub dispatcher.
 /// </summary>
 public class TestingDispatcher : IDispatcher
 {
 private readonly Dispatcher currentDispatcher;

 /// <summary>
 /// Initializes a new instance of the <see cref="TestingDispatcher"/> class.
 /// </summary>
 public TestingDispatcher()
 {
 this.currentDispatcher = Dispatcher.CurrentDispatcher;
 }

 /// <summary>
 /// The invoke.
 /// </summary>
 /// <param name="callback">
 /// The callback.
 /// </param>
 public void Invoke(Delegate callback)
 {
 try
 {
 this.currentDispatcher.Invoke(callback);
 }
 finally
 {
 this.DoEvents();
 }
 }

 /// <summary>
 /// The invoke.
 /// </summary>
 /// <param name="method">
 /// The method.
 /// </param>
 /// <param name="priority">
 /// The priority.
 /// </param>
 /// <param name="args">
 /// The args.
 /// </param>
 /// <returns>
 /// The <see cref="object"/>.
 /// </returns>
 public object Invoke(Delegate method, DispatcherPriority priority, params object[] args)
 {
 try
 {
 return this.currentDispatcher.Invoke(method, priority, args);
 }
 finally
 {
 this.DoEvents();
 }
 }

 /// <summary>
 /// The begin invoke.
 /// </summary>
 /// <param name="callback">
 /// The callback.
 /// </param>
 /// <returns>
 /// The <see cref="DispatcherOperation"/>.
 /// </returns>
 public DispatcherOperation BeginInvoke(Delegate callback)
 {
 try
 {
 return this.currentDispatcher.BeginInvoke(callback);
 }
 finally
 {
 this.DoEvents();
 }
 }

 /// <summary>
 /// The begin invoke.
 /// </summary>
 /// <param name="method">
 /// The method.
 /// </param>
 /// <param name="args">
 /// The args.
 /// </param>
 /// <returns>
 /// The <see cref="DispatcherOperation"/>.
 /// </returns>
 public DispatcherOperation BeginInvoke(Delegate method, params object[] args)
 {
 try
 {
 return this.currentDispatcher.BeginInvoke(method, args);
 }
 finally
 {
 this.DoEvents();
 }
 }

 /// <summary>
 /// The begin invoke.
 /// </summary>
 /// <param name="method">
 /// The method.
 /// </param>
 /// <param name="priority">
 /// The priority.
 /// </param>
 /// <param name="args">
 /// The args.
 /// </param>
 /// <returns>
 /// The <see cref="DispatcherOperation"/>.
 /// </returns>
 public DispatcherOperation BeginInvoke(Delegate method, DispatcherPriority priority, params object[] args)
 {
 try
 {
 return this.currentDispatcher.BeginInvoke(method, priority, args);
 }
 finally
 {
 this.DoEvents();
 }
 }

 /// <summary>
 /// The invoke async.
 /// </summary>
 /// <param name="callback">
 /// The callback.
 /// </param>
 /// <returns>
 /// The <see cref="DispatcherOperation"/>.
 /// </returns>
 public DispatcherOperation InvokeAsync(Action callback)
 {
 try
 {
 return this.currentDispatcher.InvokeAsync(callback);
 }
 finally
 {
 this.DoEvents();
 }
 }

 /// <summary>
 /// The invoke async.
 /// </summary>
 /// <param name="callback">
 /// The callback.
 /// </param>
 /// <param name="priority">
 /// The priority.
 /// </param>
 /// <returns>
 /// The <see cref="DispatcherOperation"/>.
 /// </returns>
 public DispatcherOperation InvokeAsync(Action callback, DispatcherPriority priority)
 {
 try
 {
 return this.currentDispatcher.InvokeAsync(callback, priority);
 }
 finally
 {
 this.DoEvents();
 }
 }

 /// <summary>
 /// The invoke async.
 /// </summary>
 /// <param name="callback">
 /// The callback.
 /// </param>
 /// <typeparam name="TResult">type of result
 /// </typeparam>
 /// <returns>
 /// The <see cref="DispatcherOperation"/>.
 /// </returns>
 public DispatcherOperation<TResult> InvokeAsync<TResult>(Func<TResult> callback)
 {
 try
 {
 return this.currentDispatcher.InvokeAsync(callback);
 }
 finally
 {
 this.DoEvents();
 }
 }

 /// <summary>
 /// The invoke async.
 /// </summary>
 /// <param name="callback">
 /// The callback.
 /// </param>
 /// <param name="priority">
 /// The priority.
 /// </param>
 /// <typeparam name="TResult">type of result
 /// </typeparam>
 /// <returns>
 /// The <see cref="DispatcherOperation"/>.
 /// </returns>
 public DispatcherOperation<TResult> InvokeAsync<TResult>(Func<TResult> callback, DispatcherPriority priority)
 {
 try
 {
 return this.currentDispatcher.InvokeAsync(callback, priority);
 }
 finally
 {
 this.DoEvents();
 }
 }

 private void DoEvents()
 {
 DispatcherFrame frame = new DispatcherFrame();
 this.currentDispatcher.BeginInvoke(new Action(() => frame.Continue = false), DispatcherPriority.ContextIdle);
 Dispatcher.PushFrame(frame);
 }
 }
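To show why the abstraction helps in tests, here is a minimal sketch of a view model that receives its dispatcher through the constructor. To stay self-contained and runnable without WPF it uses a hypothetical, cut-down interface (`IUiInvoker`) instead of the full `IDispatcher` above, and `ImmediateInvoker` and `StatusViewModel` are also made-up names. In real code you would inject `IDispatcher` and use `TestingDispatcher` in the test.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, cut-down stand-in for the IDispatcher interface shown above.
public interface IUiInvoker
{
    void Invoke(Action callback);
}

// Test double: runs every callback inline on the calling thread
// and counts how often the view model went through the dispatcher.
public sealed class ImmediateInvoker : IUiInvoker
{
    public int InvokeCount { get; private set; }

    public void Invoke(Action callback)
    {
        if (callback == null) throw new ArgumentNullException(nameof(callback));
        InvokeCount++;
        callback();
    }
}

// Hypothetical view model: it never touches a concrete Dispatcher,
// it only marshals state changes through the injected abstraction.
public sealed class StatusViewModel
{
    private readonly IUiInvoker _invoker;
    private readonly List<string> _messages = new List<string>();

    public StatusViewModel(IUiInvoker invoker)
    {
        _invoker = invoker ?? throw new ArgumentNullException(nameof(invoker));
    }

    public IReadOnlyList<string> Messages => _messages;

    // May be called from any thread; the list is only mutated via the invoker.
    public void Report(string message)
    {
        _invoker.Invoke(() => _messages.Add(message));
    }
}
```

A unit test can then construct `new StatusViewModel(new ImmediateInvoker())`, call `Report`, and assert on `Messages` without starting a message pump.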

ViewModel First – ViewCaching

With the view model first approach, views are usually created via a DataTemplate, and each time a view model is injected into the content control, the corresponding view is recreated.

If you have a complex view that takes some time to create, you may see a performance hit. To avoid it, you may want to cache the view and use the cached view when available. This can be done by creating a CacheContentControl and using your choice of view factory.

The basic idea is to wrap the view inside a CacheContentControl and delegate the view creation logic to your view factory. The attached sample uses a WeakReference-based view factory: if the GC has not collected the view, the cached view is used instead of creating a new view each time. This will work with controls such as tab controls, docking controls, etc.

The CacheContentControl code

public class CacheContentControl : ContentControl
    {
        public CacheContentControl()
        {
            Unloaded += ViewCache_Unloaded;
            ViewFactory = WeakReferenceViewFactory.Instance;
        }

        void ViewCache_Unloaded(object sender, RoutedEventArgs e)
        {
            Content = null;
        }

        private Type _contentType;
        public Type ContentType
        {
            get { return _contentType; }
            set
            {
                _contentType = value;
               // use your favorite factory
                Content = ViewFactory.GetView(value);
            }
        }

        public IViewFactory ViewFactory { get; private set; }
    }

In the DataTemplate, use the CacheContentControl and pass the type of the real view you want to use:

<DataTemplate DataType="{x:Type moduleB:Panel3Vm}">
    <Border Background="Green">
        <local:CacheContentControl ContentType="{x:Type moduleB:Pane3}" Margin="5"/>
    </Border>
</DataTemplate>

The example uses a standard data template (red option) to demonstrate that each time you navigate to a pane, the corresponding view is recreated with a new hash code.

If you select CacheContentControl (green option) and navigate, each view is created only once. If you click the button to force a garbage collection, views are recreated on navigation only if they have been reclaimed by the GC.
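The weak-reference caching idea can be sketched without any WPF types. The class below is an assumption about how such a factory might look, not the code of the attached sample: `WeakReferenceCache` and `GetOrCreate` are hypothetical names, and the creation delegate stands in for whatever builds the real view.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of a weak-reference cache keyed by type.
// Not thread-safe: it is meant to be used from a single (UI) thread.
public sealed class WeakReferenceCache
{
    private readonly Dictionary<Type, WeakReference<object>> _cache =
        new Dictionary<Type, WeakReference<object>>();
    private readonly Func<Type, object> _create;

    public WeakReferenceCache(Func<Type, object> create)
    {
        _create = create ?? throw new ArgumentNullException(nameof(create));
    }

    // Counts how many times the creation delegate actually ran.
    public int CreatedCount { get; private set; }

    public object GetOrCreate(Type type)
    {
        if (type == null) throw new ArgumentNullException(nameof(type));

        // Reuse the cached instance if the GC has not collected it yet.
        if (_cache.TryGetValue(type, out WeakReference<object> weak) &&
            weak.TryGetTarget(out object cached))
        {
            return cached;
        }

        // Otherwise create a fresh instance and remember it weakly,
        // so the cache itself never keeps the instance alive.
        object created = _create(type);
        CreatedCount++;
        _cache[type] = new WeakReference<object>(created);
        return created;
    }
}
```

Because only a weak reference is stored, an instance survives between calls only while something else still references it or the GC has not run yet, which matches the behavior described for the green option.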

Why ViewModel first?

I prefer the view model first approach, for many reasons:

  • VMs are your application: they contain most of the logic, apart from glue code in the form of behaviors or triggers.
  • If you create the views, you are responsible for their lifetime and cleanup code. You have to deal with threading and other issues that are difficult to test. If, on the other hand, you create the VMs and leave the view creation logic to WPF via data templates, you don’t have to worry about threading issues, and you get a better separation of concerns.
  • With the VM first approach you can have zero code-behind.
  • With project-level isolation of views and VMs, you can prevent developers from using view-specific things such as the dispatcher in the view model, leaving a cleaner and more testable code base. That is, the view project references the VM project, and the VM project should not refer to any presentation library.
  • If there is a clear boundary between view and VM, both can evolve and will be less fragile.
  • It allows more complete testing of the logic that opens new Views and ViewModels.
  • It tends to be DRYer (don’t repeat yourself) as applications get larger.
  • View and ViewModel are more independent and can be worked on separately more easily.

For more details see attached source code (download and rename docx to zip)

ViewCache Source Code

RadDocking With Prism + Mvvm

Features

  • Support MVVM with Prism
  • Supports ViewModel First or ViewFirst Approach
  • Supports styling RadPane in XAML rather than in the DockingPanesFactory
  • Code RadDocking – SourceCode

The XAML

  • Define the region names at the RadPaneGroup level
  • Set the RadPane style via ItemContainerStyle
  • Bind the Header property of RadPane to the Title property in the view model
 <Window.Resources>
 <Style x:Key="RadPaneFactoryStyle" TargetType="{x:Type telerik:RadPane}">
 <Setter Property="Header" Value="{Binding Title}"/>
 </Style>
 </Window.Resources>
 <Grid>
 <telerikDocking:RadDocking>
 <telerikDocking:RadDocking.DocumentHost>
 <telerikDocking:RadSplitContainer >
 <telerikDocking:RadPaneGroup regions:RegionManager.RegionName="MainRegion"
 ItemContainerStyle="{StaticResource RadPaneFactoryStyle}"
 >
 
 </telerikDocking:RadPaneGroup>
 </telerikDocking:RadSplitContainer>
 </telerikDocking:RadDocking.DocumentHost>

 <telerikDocking:RadSplitContainer InitialPosition="DockedRight">
 <telerikDocking:RadPaneGroup regions:RegionManager.RegionName="MainRegion1" >
 
 </telerikDocking:RadPaneGroup>
 </telerikDocking:RadSplitContainer>
 
 </telerikDocking:RadDocking> 
 </Grid>

Configure Region Adapter Mappings in Bootstrap

protected override RegionAdapterMappings ConfigureRegionAdapterMappings()
 {
 var mappings = base.ConfigureRegionAdapterMappings();

 mappings.RegisterMapping(typeof(RadPaneGroup), this.Container.Resolve<RadPaneGroupRegionAdapter>());

 return mappings;
 }

Define the RadPaneGroup RegionAdapter

 
Behavior
public class RadPaneGroupRegionAdapter : RegionAdapterBase<RadPaneGroup>
 {
 public RadPaneGroupRegionAdapter(IRegionBehaviorFactory regionBehaviorFactory)
 : base(regionBehaviorFactory)
 {
 }

 protected override void Adapt(IRegion region, RadPaneGroup regionTarget)
 {
 var docking = regionTarget.ParentOfType<RadDocking>();
 if (docking.DockingPanesFactory == null)
 {
 docking.DockingPanesFactory = new CustomDockingPanesFactory();
 }

 var store = DocumentsStoreAttachment.GetPanesStore(docking);
 if (store == null)
 {
 store = new PanesStore();
 DocumentsStoreAttachment.SetPanesStore(docking, store);
 BindingOperations.SetBinding(
 docking,
 RadDocking.PanesSourceProperty,
 new Binding("PanesSource") { Source = store });
 }
 }

 protected override IRegion CreateRegion()
 {
 return new SingleActiveRegion();
 }

 protected override void AttachBehaviors(IRegion region, RadPaneGroup regionTarget)
 {
 if (region == null)
 throw new System.ArgumentNullException("region");

 var docking = regionTarget.ParentOfType<RadDocking>();
 
 // Add the behavior that syncs the items source items with the rest of the items
 region.Behaviors.Add(RadPaneGroupSyncBehavior.BehaviorKey, new RadPaneGroupSyncBehavior()
 {
 HostControl = regionTarget,
 Docking = docking
 });

 base.AttachBehaviors(region, regionTarget);
 }

 private class PanesStore
 {
 public readonly ObservableCollection<object> Panes = new ObservableCollection<object>();
 ReadOnlyObservableCollection<object> _readonlyDocumentsList;
 public ReadOnlyObservableCollection<object> PanesSource
 {
 get
 {
 return _readonlyDocumentsList ??
 (_readonlyDocumentsList = new ReadOnlyObservableCollection<object>(Panes));
 }
 }
 }

 private static class DocumentsStoreAttachment
 {
 public static readonly DependencyProperty PanesStoreProperty = DependencyProperty.RegisterAttached(
 "PanesStore",
 typeof(PanesStore),
 typeof(DocumentsStoreAttachment),
 new PropertyMetadata(null)
 );
 public static void SetPanesStore(UIElement element, PanesStore value)
 {
 element.SetValue(PanesStoreProperty, value);
 }
 public static PanesStore GetPanesStore(UIElement element)
 {
 return (PanesStore)element.GetValue(PanesStoreProperty);
 }
 }

 private class CustomDockingPanesFactory : DockingPanesFactory
 {
 public RadPaneGroup ActiveGroup { get; internal set; }

 protected override void AddPane(RadDocking radDocking, RadPane pane)
 {
 var group = ActiveGroup;

 pane.SetValue(RadPane.HeaderProperty, DependencyProperty.UnsetValue);

 // ActiveGroup may not be set yet; guard the dereference like the Items.Add call below.
 var style = group?.ItemContainerStyle;
 if (style != null)
 {
 pane.SetValue(RadPane.StyleProperty, style);
 }
 group?.Items.Add(pane);
 }
 }

 private class RadPaneGroupSyncBehavior : RegionBehavior, IHostAwareRegionBehavior
 {
 public static readonly string BehaviorKey = "RadPaneGroupSyncBehavior";
 private bool _updatingActiveViewsInManagerActiveContentChanged;
 private RadPaneGroup _radPaneGroup;
 private PanesStore _documentStore;

 public DependencyObject HostControl
 {
 get
 {
 return _radPaneGroup;
 }

 set
 {
 _radPaneGroup = value as RadPaneGroup;
 }
 }



 public RadDocking Docking { get; internal set; }

 /// <summary>
 /// Starts to monitor the <see cref="IRegion"/> to keep it in synch with the items of the <see cref="HostControl"/>.
 /// </summary>
 protected override void OnAttach()
 {
 bool itemsSourceIsSet = _radPaneGroup.ItemsSource != null;


 if (itemsSourceIsSet)
 {
 throw new InvalidOperationException();
 }

 SynchronizeItems();

 Docking.ActivePaneChanged += ManagerActiveContentChanged;
 Region.ActiveViews.CollectionChanged += ActiveViews_CollectionChanged;
 Region.Views.CollectionChanged += Views_CollectionChanged;
 Docking.Close += _dockingManager_DocumentClosed;

 }

 private void _dockingManager_DocumentClosed(object sender, StateChangeEventArgs e)
 {
 foreach (var pane in e.Panes)
 {
 if (Region.Views.Contains(pane.Content))
 {
 Region.Remove(pane.Content);
 }
 }

 }


 private void Views_CollectionChanged(object sender, NotifyCollectionChangedEventArgs e)
 {
 if (e.Action == NotifyCollectionChangedAction.Add)
 {
 int startIndex = e.NewStartingIndex;

 foreach (var newItem in e.NewItems)
 {
 var panesFactory = ((CustomDockingPanesFactory)Docking.DockingPanesFactory);
 panesFactory.ActiveGroup = _radPaneGroup;
 _documentStore.Panes.Insert(0, newItem);
 }

 }
 else if (e.Action == NotifyCollectionChangedAction.Remove)
 {
 foreach (object oldItem in e.OldItems)
 {
 _documentStore.Panes.Remove(oldItem);
 }
 }
 }

 private void SynchronizeItems()
 {
 var store = DocumentsStoreAttachment.GetPanesStore(Docking);



 _documentStore = store;
 foreach (object view in Region.Views)
 {
 ((CustomDockingPanesFactory)Docking.DockingPanesFactory).ActiveGroup = _radPaneGroup;
 _documentStore.Panes.Insert(0, view);

 }
 }

 private void ActiveViews_CollectionChanged(object sender, NotifyCollectionChangedEventArgs e)
 {
 if (_updatingActiveViewsInManagerActiveContentChanged)
 {
 return;
 }

 if (e.Action == NotifyCollectionChangedAction.Add)
 {
 // Compare view with view: ActivePane is a RadPane, while e.NewItems holds the region's views.
 if (Docking.ActivePane != null
 && !Equals(Docking.ActivePane.Content, e.NewItems[0])
 && Region.ActiveViews.Contains(Docking.ActivePane.Content))
 {
 Region.Deactivate(Docking.ActivePane.Content);
 }

 var pane = Docking.Panes.FirstOrDefault(x => Equals(x.Content, e.NewItems[0]));
 Docking.ActivePane = pane;
 }
 else if (e.Action == NotifyCollectionChangedAction.Remove &&
 Docking.ActivePane != null &&
 e.OldItems.Contains(Docking.ActivePane.Content))
 {
 // The removed items are views, so match them against the active pane's content.
 Docking.ActivePane = null;
 }
 }

 private void ManagerActiveContentChanged(object sender, EventArgs e)
 {
 try
 {
 _updatingActiveViewsInManagerActiveContentChanged = true;

 if (Docking.Equals(sender))
 {
 var activePane = Docking.ActivePane;
 if (activePane != null)
 {
 var activeViews = Region.ActiveViews.Where(it => it != activePane.Content).ToList();
 foreach (var item in activeViews)
 {
 Region.Deactivate(item);
 }

 if (Region.Views.Contains(activePane.Content) && !Region.ActiveViews.Contains(activePane.Content))
 {
 Region.Activate(activePane.Content);
 }
 }



 }
 }
 finally
 {
 _updatingActiveViewsInManagerActiveContentChanged = false;
 }
 }

 }
 }

TPL Dataflow – Concurrent Programming

TPL DataFlow

TPL Dataflow is an in-process actor library on top of the Task Parallel Library enabling more robust concurrent programming.

Parallel computing is a form of computation in which multiple operations are carried out simultaneously. It is closely related to asynchronous programming and uses many of the same core concepts and support. Asynchronous programming is an approach to writing code that invokes operations in such a way that they don’t block the current thread of execution. Many personal computers and workstations have two, four, or eight cores (that is, CPUs) that enable multiple threads to be executed simultaneously, and computers in the near future are expected to have significantly more. To take advantage of the hardware of today and tomorrow, you can parallelize your code to distribute work across multiple processors. In the past, parallelization required low-level manipulation of threads and locks.

The purpose of the TPL is to make developers more productive by simplifying the process of adding parallelism and concurrency to applications. The TPL scales the degree of concurrency dynamically to most efficiently use all the processors that are available. In addition, the TPL handles the partitioning of the work, the scheduling of threads on the ThreadPool, cancellation support, state management, and other low-level details. By using TPL, you can maximize the performance of your code while focusing on the work that your program is designed to accomplish.

Data parallelism refers to scenarios in which the same operation is performed concurrently (that is, in parallel) on elements in a source collection or array. In data parallel operations, the source collection is partitioned so that multiple threads can operate on different segments concurrently.
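A minimal sketch of data parallelism with `Parallel.For`: the same operation is applied to every element, and the runtime decides how to partition the index range across threads. `DataParallelDemo` and `SquareAll` are hypothetical names used only for this illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class DataParallelDemo
{
    // Squares every element of the source array in parallel.
    // Each iteration writes only to its own slot of the result array,
    // so no locking is needed.
    public static long[] SquareAll(int[] source)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));

        var result = new long[source.Length];
        Parallel.For(0, source.Length, i =>
        {
            result[i] = (long)source[i] * source[i];
        });
        return result;
    }
}
```

The iterations may run in any order, which is safe here because they are independent of each other.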

The Task Parallel Library (TPL) is based on the concept of a task, which represents an asynchronous operation. In some ways, a task resembles a thread or ThreadPool work item, but at a higher level of abstraction. The term task parallelism refers to one or more independent tasks running concurrently. Tasks provide two primary benefits: more efficient and more scalable use of system resources, and more programmatic control than is possible with a thread or work item.
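As a small illustration of task parallelism, the sketch below starts several independent pieces of work as tasks and waits for all of them. `TaskParallelDemo` and `RunAllAsync` are hypothetical names for this example.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class TaskParallelDemo
{
    // Starts each delegate as its own task on the thread pool and
    // returns the results in the same order as the delegates were passed.
    public static async Task<int[]> RunAllAsync(params Func<int>[] work)
    {
        if (work == null) throw new ArgumentNullException(nameof(work));

        Task<int>[] tasks = work.Select(w => Task.Run(w)).ToArray();
        return await Task.WhenAll(tasks);
    }
}
```

`Task.WhenAll` preserves the order of the input tasks in its result array, regardless of which task finishes first.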

The concurrency models we have discussed so far have the notion of shared state (data) in common. Shared state can be accessed by multiple threads at the same time and must therefore be protected, either by locking or by using transactions. Mutability and sharing of state are not just inherent to these models, they are also the source of their complexity. Unfortunately, programmers have found it very difficult to reliably build robust multi-threaded applications using the shared data and locks model, especially as applications grow in size and complexity. Making things worse, testing is not reliable with multi-threaded code. Since threads are non-deterministic, you might successfully test a program one thousand times, yet the program could still go wrong the first time it runs on a customer’s machine.

We now have a look at an entirely different approach that bans the notion of shared state altogether. State is still mutable, but it is exclusively coupled to the single entities that are allowed to alter it, the so-called actors. The actor model in computer science is a mathematical model of concurrent computation that treats “actors” as the universal primitives of concurrent digital computation: in response to a message that it receives, an actor can make local decisions, create more actors, send more messages, and determine how to respond to the next message received. For communication, the actor model uses asynchronous message passing. In particular, it does not use any intermediate entities such as channels. Instead, each actor possesses a mailbox and can be addressed. These addresses are not to be confused with identities, and each actor can have zero, one, or multiple addresses. When an actor sends a message, it must know the address of the recipient. In addition, actors are allowed to send messages to themselves, which they will receive and handle later in a future step.
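The mailbox idea can be approximated in-process with an `ActionBlock<T>` from the TPL Dataflow library this post is about. The sketch below is only an approximation of an actor (there is no addressing or supervision), and `CounterActor`, `Tell`, and `StopAsync` are hypothetical names. It relies on the default behavior of `ActionBlock<T>`, which processes one message at a time.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical actor-like class: private state plus a mailbox.
public sealed class CounterActor
{
    // Mutable state that is only ever touched inside the mailbox delegate,
    // so it needs no lock even when many threads send messages.
    private int _total;

    private readonly ActionBlock<int> _mailbox;

    public CounterActor()
    {
        // Default options: messages are handled sequentially, in arrival order.
        _mailbox = new ActionBlock<int>(amount => { _total += amount; });
    }

    // Asynchronous message send: enqueues the message and returns immediately.
    public bool Tell(int amount)
    {
        return _mailbox.Post(amount);
    }

    // Stops accepting messages, waits for the mailbox to drain,
    // and returns the final state.
    public async Task<int> StopAsync()
    {
        _mailbox.Complete();
        await _mailbox.Completion;
        return _total;
    }
}
```

Callers on any number of threads can call `Tell` concurrently; the increments never race because only the block's single processing loop mutates `_total`.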

The Task Parallel Library (TPL) provides dataflow components to help increase the robustness of concurrency-enabled applications. These dataflow components are collectively referred to as the TPL Dataflow Library. This dataflow model promotes actor-based programming by providing in-process message passing for coarse-grained dataflow and pipelining tasks. The TPL Dataflow Library provides a foundation for message passing and for parallelizing CPU-intensive and I/O-intensive applications that need high throughput and low latency. It also gives you explicit control over how data is buffered and how it moves around the system.

If you want to scale your application beyond a single machine or process, then a service bus (NServiceBus, Microsoft Azure, etc.) is the best candidate. These are designed around a message-oriented architecture, and with them you can achieve a highly reliable, available, and scalable application. As an architect I always focus on reliability; after all, a highly available and scalable service that produces unreliable results isn’t very valuable. We will need another post to cover service buses in depth.

TPL Dataflow (TDF) is a library for building concurrent applications. It promotes actor/agent-oriented designs through primitives for in-process message passing, dataflow, and pipelining. I have been playing with Dataflow since its CTP was released, and I have found it very useful in cases where you have to process data in the form of a pipeline. With just a few built-in blocks you can easily and quickly build a concurrent app.

The primitive blocks provided by Dataflow are:

  • Buffering blocks – hold data for use by data consumers.
    • BufferBlock(T) – a FIFO queue of messages that can be written to by multiple sources and read from by multiple targets.
    • BroadcastBlock(T) – used when you must pass multiple messages to another component that needs only the most recent value, or when you want to broadcast a message to multiple components.
    • WriteOnceBlock(T) – similar to BroadcastBlock, except that it can be written to one time only.
  • Execution blocks – call a user-provided delegate for each piece of received data.
    • ActionBlock(T) – calls a delegate when it receives data; accepts synchronous or asynchronous delegates.
    • TransformBlock(TInput, TOutput) – calls a function delegate to transform the incoming message into another type; accepts synchronous or asynchronous delegates.
    • TransformManyBlock(TInput, TOutput) – similar to TransformBlock, except that it can produce zero or more output values for each input value instead of exactly one; accepts synchronous or asynchronous delegates.
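A small sketch of two of the buffering blocks listed above may help. It assumes the `System.Threading.Tasks.Dataflow` package is referenced; `BufferingDemo` and its methods are hypothetical names for this illustration.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class BufferingDemo
{
    // Posts the items to a BufferBlock and reads them back.
    // The block behaves as a FIFO queue, so the order is preserved.
    public static int[] DrainInOrder(params int[] items)
    {
        var buffer = new BufferBlock<int>();
        foreach (int item in items)
        {
            buffer.Post(item);
        }

        var received = new List<int>();
        while (buffer.TryReceive(out int value))
        {
            received.Add(value);
        }
        return received.ToArray();
    }

    // Offers two values to a WriteOnceBlock. Only the first one is accepted;
    // the block keeps that value and hands it out on every receive.
    public static (bool FirstAccepted, bool SecondAccepted, int Stored) WriteTwice(int first, int second)
    {
        // No cloning function is needed for a value type, so null is passed.
        var once = new WriteOnceBlock<int>(null);
        bool firstAccepted = once.Post(first);
        bool secondAccepted = once.Post(second);
        return (firstAccepted, secondAccepted, once.Receive());
    }
}
```

Unlike `BufferBlock<T>`, receiving from a `WriteOnceBlock<T>` does not remove the value, so every consumer sees the same single value.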

Degree of Parallelism

Every ActionBlock<TInput>, TransformBlock<TInput, TOutput>, and TransformManyBlock<TInput, TOutput> object buffers input messages until the block is ready to process them. By default, these classes process messages in the order in which they are received, one message at a time. You can also specify the degree of parallelism to enable ActionBlock<TInput>, TransformBlock<TInput, TOutput> and TransformManyBlock<TInput, TOutput> objects to process multiple messages concurrently.
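The degree of parallelism is set through `ExecutionDataflowBlockOptions.MaxDegreeOfParallelism`. The sketch below measures how many messages an `ActionBlock<T>` handles at the same time for a given setting; `ParallelismDemo` and `MeasurePeakConcurrencyAsync` are hypothetical names, and the 20 ms delay merely simulates work.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ParallelismDemo
{
    // Posts messageCount messages to an ActionBlock configured with the given
    // degree of parallelism and returns the highest number of handlers
    // that were observed running at the same time.
    public static async Task<int> MeasurePeakConcurrencyAsync(int messageCount, int maxDegreeOfParallelism)
    {
        var gate = new object();
        int running = 0;
        int peak = 0;

        var block = new ActionBlock<int>(async _ =>
        {
            lock (gate)
            {
                running++;
                if (running > peak) peak = running;
            }

            await Task.Delay(20); // simulated work

            lock (gate)
            {
                running--;
            }
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism });

        for (int i = 0; i < messageCount; i++)
        {
            block.Post(i);
        }

        // Signal that no more messages will arrive, then wait for the queue to drain.
        block.Complete();
        await block.Completion;
        return peak;
    }
}
```

With a degree of 1 (the default) the peak is always 1; with a higher value the peak never exceeds the configured limit.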

Now let’s look at the implementation details of a web crawler.

   Request Buffer
         |
    PageDownload
      /       \
   Save     ParseLink
                |
          RaiseLinkFound

A message to download a URL is received in the request buffer. The URL is downloaded by a TransformBlock and converted into a type-safe page message. The page message is then broadcast using a BroadcastBlock and received by both the save ActionBlock and the ParseLink block. The ParseLink block parses the URLs in the page and, if they belong to the same site, raises an event for each URL. The consumer of the engine receives the URL and, if it is a new URL, posts it back to the engine. The save ActionBlock saves the page to disk.
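The pipeline described above can be wired up roughly as follows. This is a simplified sketch and not the downloadable code: `Page` and `CrawlerPipeline` are hypothetical names, the download and save steps are injected as delegates so the example runs without network or disk access, link extraction is a naive regular expression, and the same-site check is omitted.

```csharp
using System;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Type-safe page message passed between the blocks.
public sealed class Page
{
    public Page(string url, string html) { Url = url; Html = html; }
    public string Url { get; }
    public string Html { get; }
}

public sealed class CrawlerPipeline
{
    private readonly BufferBlock<string> _requests = new BufferBlock<string>();
    private readonly Task _completion;

    // Raised for every link found in a downloaded page.
    public event Action<string> LinkFound;

    public CrawlerPipeline(Func<string, Task<string>> download, Action<Page> save)
    {
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        // Request buffer -> download -> broadcast -> (save, parse links)
        var downloader = new TransformBlock<string, Page>(
            async url => new Page(url, await download(url)));
        var broadcast = new BroadcastBlock<Page>(page => page);
        var saver = new ActionBlock<Page>(save);
        var parser = new ActionBlock<Page>(page =>
        {
            foreach (Match match in Regex.Matches(page.Html, "href=\"([^\"]+)\""))
            {
                LinkFound?.Invoke(match.Groups[1].Value);
            }
        });

        _requests.LinkTo(downloader, link);
        downloader.LinkTo(broadcast, link);
        broadcast.LinkTo(saver, link);
        broadcast.LinkTo(parser, link);

        _completion = Task.WhenAll(saver.Completion, parser.Completion);
    }

    // Queues a URL for download.
    public bool Post(string url)
    {
        return _requests.Post(url);
    }

    // Stops accepting URLs and waits until all queued pages are processed.
    public Task CompleteAsync()
    {
        _requests.Complete();
        return _completion;
    }
}
```

A consumer would subscribe to `LinkFound`, keep its own set of visited URLs, and call `Post` for each new one. Note that no block shares mutable state with another; each one only reacts to the messages it receives.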

This is a very basic example, but you can see that the message-based approach is much simpler than a shared resource plus threading approach.

TPL Dataflow is a good fit when you don’t need to scale the solution beyond a single machine, as it offers an in-process message-based approach. With a proper service bus like NServiceBus you can scale out the solution beyond a single machine, so that multiple servers process the requests to achieve high throughput, and of course with a clean code base that is easy to build and maintain.

In production there are more things you have to take care of, such as logging, error handling, transactions, recovery from unexpected failures, dependency injection, loosely coupled components, extensibility, scalability, and so on. Frameworks like NServiceBus provide all these features along with an API to handle more complex business problems.

Download Code: Here (partially finished but working POC)