Sunday, August 17, 2014

Asynchronous Programming in C# – Part 2: Tasks


As discussed in the previous article, though Thread pool helps us up to certain extent of reusing and managing the threads, still it lacks the built in mechanism to know when the operation has finished and what did the thread return back.

So to help programmers to concentrate on the job Microsoft has introduced Task, which is an object that represents some work that should be done. Also Task has the capability to let you know if the work is completed and if a operation returns a result, Task returns you a result.

A Task Scheduler is responsible for starting the Task and managing it. By default the Task Scheduler uses threads from the thread pool to execute the Task.

Basic usage of Task is to make your application being responsive all the time. We make the main thread or thread running the UI free from any background time consuming jobs by creating a Task which internally will be using another thread from the Thread pool to complete the operation. But, note that it helps only in making the application responsive or parallelize your work in order to use the multiple processors available in the computer but does not scale the application.

Task is a part of System.Threading.Tasks name space and it has two forms. One which returns a parameter and another that returns nothing or void.
  • Task<T> which returns a value
  • Task which does not return any value
You can create instances of Task object using a new Task(Action) or using Factory class that creates a instance of Task.

Here is the program for a task (Developed using .NET 4.0) that shows
1) Creation of Task objects
2) Task that returns no value and a Task that returns a value:

using System;
using System.Threading;
using System.Threading.Tasks;
namespace TaskTest
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Main Thread {0}", Thread.CurrentThread.ManagedThreadId);
            //Create a task using new keyword
            Task t = new Task(() =>
            {
                Console.WriteLine("Task created using instantiation of Task class and passing Action as a parameter");
            });
t.Start();
            //Create a task using Factory
            //Task.Factory gives access to Factory methods to create instances of Task and Task<T>
            Task taskFactory = Task.Factory.StartNew(() =>
            {
                Console.WriteLine("Task created using Task.Factory");
            });
            Console.WriteLine("**********************TASK WITH NO RETURN VALUE***********************");
            Console.WriteLine("Before creating a new task that does not return a value");
            Task taskWithNoReturnValue = Task.Factory.StartNew(() =>
                {
                    Console.WriteLine("TASK WITH NO RETURN VALUE RUNNING ON THREAD {0}", Thread.CurrentThread.ManagedThreadId);
                });
            Console.WriteLine("After creating  a task that does not return a value");
            Console.WriteLine("**********************TASK WITH RETURN VALUE***********************");
            Console.WriteLine("Before creating a new task that returns a value");
            Task<int> taskWithReturnValue = Task.Factory.StartNew(() =>
            {
                Console.WriteLine("TASK WITH NO RETURN VALUE RUNNING ON THREAD {0}", Thread.CurrentThread.ManagedThreadId);
                return 6;
            });
            //Console.WriteLine("The return value from the task is {0}", taskWithReturnValue.Result);
            Console.WriteLine("After creating  a task that returns a value");
            Console.Read();
        }
    }
}
Output:
image

new Task(Action action) / Task.Factory.StartNew(Action action) queues the specified work to run on the thread pool and returns a task handle for that work.

Observe from the output of the program that Main Thread Id is 10 and the tasks are running on 12 and 14. Also notice that the messages “Before…” and “after..” are printed first than the operation performed by the Task. This tells that UI thread or Main thread is running in parallel to the Tasks. Also notice that we are not reading the Task return value in the program. We cannot control the flow of Tasks execution as they each run in a parallel mode on a new thread.

Let us see what happens if we read the value 6 from the Task. Uncomment the commented line that reads and displays the Task’s return value. You can get the result of the Task by property “Result”. Check the output now:

image

The main thread has to wait till the Task’s operation is completed and so you find that first, Task return value is displayed and after that “After..” message is displayed. If the Task is not finished, this call will block the current thread.

So, Task.Result will block the thread that is trying to read the result to wait until the task completes its operation and then the next line of code execution takes place.
There is one more method that makes the current thread to wait and this is similar to Thread.Join(). The method is Task.Wait().

Continuation operation to the Task’s operation:
System.Threading.Tasks class has another method called ContinueWith which creates a continuation task that runs another operation to execute as soon as the Task finishes. There are couple of overloads available for the ContinueWith method that you can configure when the continuation operation will run. It works both with the Task and Task<T> in which the prior case it takes Action as a parameter and in the later case the method accepts Func<<Task<T>,T> as a parameter. For example, in the above program if you want to continue with another operation once the Task<int> is finished, you can add the continuation task as below:

Task<int> taskWithReturnValue = Task.Factory.StartNew(() =>
           {
               Console.WriteLine("TASK WITH NO RETURN VALUE RUNNING ON THREAD {0}", Thread.CurrentThread.ManagedThreadId);
               return 6;
           }).ContinueWith((i) => {return i.Result*5;});

In this case, once the Task<int> completes the operation, then it continues with another operation that takes the result of Task<int> and multiplies the result with 5. So, the return value will be 30 instead of 6 as in the above program. Here is the output:

image

You can use the continuation method for different scenarios like:
  • When the Task is cancelled, if a certain operation is to be performed.
  • If you want to perform some operation if a Task is faulted.
  • If you want to perform some operation if a Task is finished its operation.
Here is a sample program to demonstrate the continuation operations:

static void Main(string[] args)
        {
            Task<string> task = Task.Factory.StartNew(() =>
                {
                    return "Hello EveryOne";
                });
            task.ContinueWith((s) =>
                {
                    Console.WriteLine(" I am completed!!!");
                },TaskContinuationOptions.OnlyOnRanToCompletion);
            task.ContinueWith((s) =>
                {
                   Console.WriteLine(" I am cancelled!!!");
                }, TaskContinuationOptions.OnlyOnCanceled);
            task.ContinueWith((s) =>
            {
                Console.WriteLine(" I am faulted!!!");
            }, TaskContinuationOptions.OnlyOnFaulted);
            Console.WriteLine(task.Result);
            Console.Read();
        }

Output:

image

Since in the program, the Task successfully completed its operation, only continuation option when Task is completed is executed. Had the Task got cancelled or faulted then the other operations would have been displayed.

The option OnlyOnFaulted executes only if the antecedent throws an unhandled exception. Change the above program code as below:

class Program
    {
        static void Main(string[] args)
        {
           Task<string> task = Task.Factory.StartNew(() =>
                {
                    throw new AggregateException("This task cannot execute!!!");
                    return "Hello EveryOne";

                });
            task.ContinueWith((s) =>
                {
                    Console.WriteLine(" I am completed!!!");
                },TaskContinuationOptions.OnlyOnRanToCompletion);
            task.ContinueWith((s) =>
                {
                   Console.WriteLine(" I am cancelled!!!");
                }, TaskContinuationOptions.OnlyOnCanceled);
            task.ContinueWith((s) =>
            {
                Console.WriteLine(" I am faulted!!!");
            }, TaskContinuationOptions.OnlyOnFaulted);
           //Console.WriteLine(task.Result);
            Console.Read();
        }

Note that I have commented out the line to display the result as it goes to exception and you cannot display the result. Also observe that in this program antecedent is the task created in the below line:


Task<string> task = Task.Factory.StartNew(() =>
                {
                    throw new AggregateException("This task cannot execute!!!");
                    return "Hello EveryOne";

                });

With ContinueWith is going to take the result of task and then creates a new Task to perform another operation. If an exception occurs in task operation, then it will propagated to the ContinueWith task’s operation and displays the message “I am faulted”.

  task.ContinueWith((s) =>
                  task.ContinueWith((s) =>
            {
                Console.WriteLine(" I am faulted!!!");
            }, TaskContinuationOptions.OnlyOnFaulted); Output:
image

Press continue and you will see the message below:
image

Task Cancellation:

Here is a simple program to demonstrate the task cancellation:

class Program
    {
        /// <summary>
        /// Method that runs time consuming task.
        /// This method accepts CancellationToken as a parameter.
        /// This program loops through 1 to 2000 and before the count reaches 2000
        /// if user hits any of the key then the tokensource is set to cancel and the token
        /// gets the cancellation request and the loop will get break and control returns to the
        /// Main program.
        /// </summary>
        /// <param name="ct"></param>
        static void RunTimeConsumingTask(CancellationToken ct)
        {
            if (ct.IsCancellationRequested)
            {
                Console.WriteLine("Cancellation request is sent and the operation is still in the execution");
                return;
            }
            for (int i=1;i<=2000;i++)
            {
                Console.WriteLine("Operation code {0} is completed", i);
                Thread.Sleep(1000);
                if (ct.IsCancellationRequested)
                {
                    Console.WriteLine("Cancelled before completing operation code 2000");
                    break;
                }
            }
        }
        static void Main(string[] args)
        {
            var tokenSource = new CancellationTokenSource();
            var token = tokenSource.Token;
            Console.WriteLine("Press enter or any key to cancel");
            Task<string> task = Task.Factory.StartNew(() =>
                {
                    RunTimeConsumingTask(token);
                    return "Hello EveryOne";
                });
            Console.ReadLine();
            tokenSource.Cancel();
            task.Wait();
            Console.Read();
        }
Output:
image

Here when the loop is reached to 4, if any key is hit then the task is cancelled.

Another program to demonstrate the “TaskContinuation.OnlyOnCanceled”:

In the below program task is the instance of Task class that always looks for the CancellationToken. The CancellationToken propagates the notification that operations should be cancelled.  The CancellationTokenSource is used to signal that the Task should cancel itself.
Another statement to note is below:
token.ThrowIfCancellationRequested();

This statement throws System.OperationCanceledException if token has cancellation requested and this lets the outside the Task code to let the program know that task is getting cancelled. The “ContinueWith” is added to display the message when task is cancelled by creating another task and adding TaskContinuation.OnlyOnCanceled.

static void Main(string[] args)
        {
            var cancellationTokenSource = new CancellationTokenSource();
            CancellationToken token = cancellationTokenSource.Token;
            Task task = Task.Factory.StartNew(() =>
                {
                    while (!token.IsCancellationRequested)
                    {
                        Console.WriteLine("Executing tasks");
                        Thread.Sleep(1000);
                    }
                    token.ThrowIfCancellationRequested();
                }, token).ContinueWith((t) =>
                    {
                        Console.WriteLine("Task is cancelled");
                    }, TaskContinuationOptions.OnlyOnCanceled);
                Console.WriteLine("Press any key to end the task");
                Console.ReadLine();
                cancellationTokenSource.Cancel();
                task.Wait();
            Console.WriteLine("Press enter to end the application");
            Console.ReadLine();
        }
Output:
image
When enter key is pressed the task is cancelled and the message “Task is cancelled” is displayed.

Nested Tasks:
One task can encapsulate another task or a number of tasks. Nested tasks does not necessarily ensure that outer task waits for the inner tasks to complete.
A simple program to demonstrate the nested tasks:
/// <summary>
        /// Program to show dependencies between Parent
        /// and child tasks
        /// </summary>
        /// <param name="args"></param>
        static void Main(string[] args)
        {
            //parent task created with Task.Factory.StartNew
            //This task is the outer task which is going to encapsulate
            //inner tasks or child tasks
            var parent = Task.Factory.StartNew(() =>
            {
                Console.WriteLine("Parent Task statement1");
                //Create a TaskFactory. Once you create a TaskFactory instance, you can create as many tasks
                //as needed.
                TaskFactory tf = new TaskFactory(TaskCreationOptions.None, TaskContinuationOptions.ExecuteSynchronously);
                //Create Child1
                tf.StartNew(() =>
                    {
                        Console.WriteLine("Child1 statement1");
                        Thread.Sleep(5000);
                        Console.WriteLine("Child1 statement2");
                    });
                //Create Child2
                tf.StartNew(() =>
                {
                    Console.WriteLine("Child2 statement1");
                    Thread.Sleep(5000);
                    Console.WriteLine("Child2 statement2");
                });
                Console.WriteLine("Parent Task statement2");
            });
            //Here final task is going to wait for parent task
            var finalTask = parent.ContinueWith(parentTask =>
                {
                    Console.WriteLine("Final statement");
                });
            Console.ReadLine();
        }
image

In this program, we have:
1. One task “parent” is encapsulating two child tasks.
2. Then created a task which is going to execute when the parent task is completed.
3. We cannot control the flow of execution here in the case of child tasks. In the output, you can find that parent task statements are executed first then child’s statements are executed. By the time another statement is executed, final task statement is displayed.

If you want parent to wait till the child tasks to complete, System.Threading provides enum TaskContinuationOptions with value “AttachedToParent”. This will bind the child tasks to the parent task which makes the parent task to wait till the child tasks are completed.

image

Now modify the statement in your program as below:

TaskFactory tf = new TaskFactory(TaskCreationOptions.AttachedToParent, TaskContinuationOptions.ExecuteSynchronously);

After this code change you will observe that parent task is going to wait for the child tasks to complete.

Check the following output:
image

Now you observe that final task waits till parent task is completed execution and parent task is going to wait till child tasks are completed.

Now if you see the output, you will get a question as I got when I learnt the basics “From the output it seems all the statements in the parent task are displayed first and then child task statements were displayed. Then how can you support your statement that parent task is going to wait for child tasks to complete?

Here is my answer for the question:

Because the output displays statements with “Parent Task…” are displayed first and then followed by statements with “Child…” does not mean parent has completed the execution and is not waiting for the child to complete. Here is the confirmation with the change highlighted to support:

/// <summary>
        /// Program to show dependencies between Parent
        /// and child tasks
        /// </summary>
        /// <param name="args"></param>
        static void Main(string[] args)
        {
            //parent task created with Task.Factory.StartNew
            //This task is the outer task which is going to encapsulate
            //inner tasks or child tasks
            var parent = Task.Factory.StartNew(() =>
            {
                Console.WriteLine("Parent Task statement1");
                //Create a TaskFactory. Once you create a TaskFactory instance, you can create as many tasks
                //as needed.
                TaskFactory tf = new TaskFactory(TaskCreationOptions.AttachedToParent, TaskContinuationOptions.ExecuteSynchronously);
                //Create Child1
                tf.StartNew(() =>
                    {
                        Console.WriteLine("Child1 statement1");
                        Thread.Sleep(5000);
                        Console.WriteLine("Child1 statement2");
                    }).ContinueWith((t) =>
                    {
                        Console.WriteLine("Child1 task is completed");
                    }, TaskContinuationOptions.OnlyOnRanToCompletion);
                //Create Child2
                tf.StartNew(() =>
                {
                    Console.WriteLine("Child2 statement1");
                    Thread.Sleep(5000);
                    Console.WriteLine("Child2 statement2");
                }).ContinueWith((t) =>
                {
                    Console.WriteLine("Child2 task is completed");
                }, TaskContinuationOptions.OnlyOnRanToCompletion);
                Console.WriteLine("Parent Task statement2");
            }).ContinueWith((t)=>
                {
                    Console.WriteLine("Parent task is completed");
                }, TaskContinuationOptions.OnlyOnRanToCompletion);
            //Here final task is going to wait for parent task
            var finalTask = parent.ContinueWith(parentTask =>
                {
                    Console.WriteLine("Final statement");
                });
            Console.ReadLine();
        }

Now we have added continue tasks once the task has completed the execution with “TaskContinuationOptions.OnlyOnRanToCompletion”. With this addition of confirmation statements, now check the output that confirms that parent task has completed only after child tasks are completed.
image

Additional methods:
- WaitAll() method can be used to wait for all multiple tasks to finish before continuation.
- WhenAll() method can be used to schedule a continuation method after all Tasks have completed.
- WaitAny() method can be used to wait until one of the Tasks is finished.

Hope you like this article. The programs are coded in C# 4.0. In the next article I will touch the same programs rewritten in C# 5.0 and then we will discuss the new features added in C# 5.0.



No comments:

Post a Comment