Showing posts with label concurrent interfaces. Show all posts
Showing posts with label concurrent interfaces. Show all posts

Monday, 20 June 2011

Interfaces in java concurrency utils

Java’s concurrency library, java.util.concurrent, has a variety of useful classes to make concurrent code easier to write and more reliable. (If you’re not familiar with that library, you’ll find this post a lot more comprehensible if you read Sun’s tutorial on concurrency.)
There are many ways to put the various building blocks that java.util.concurrent (j.u.c from now on) provides to create concurrent systems. Before we begin, here’s a short reminder of what the most basic j.u.c classes and interfaces are.


Runnable
Intended to be more or less equivalent to starting a Thread. It defines a method that does not return anything and cannot throw checked exceptions.
Callable
A more flexible variant of the same idea. It defines a method that can return a value and throw a checked exception.
Future
Represents the result of a computation (e.g. a Callable). It is used in place of a value that may not be computed yet.
Executor
Accepts Runnables and returns void.
ExecutorService
Extends Executor to also provide a method that accepts a Callable and returns a Future.
Executors
Provides simple methods to get common types of ExecutorService implementations.
Some of them are:
CompletionService

Concurrency : CompletionService

CompletionService Interface do tasks done by ExecutorService more efficiently.
ExecutorCompletionService is the sole standard implementation of the interface.

Let’s take a look at some sample code. The goal is to produce Widget objects and do some arbitrary task with them as they are created.
// service that wraps a thread pool with 5 threads
CompletionService compService = new ExecutorCompletionService(
Executors.newFixedThreadPool(5));

// how many futures there are to check
int remainingFutures = 0;

for (Callable<widget> c: getCallables()) {
remainingFutures++;

compService.submit(c);
}

Future<widget> completedFuture;
Widget newWidget;

while (remainingFutures > 0) {
// block until a callable completes
completedFuture = compService.take();
remainingFutures--;

// get the Widget, if the Callable was able to create it
try {
newWidget = completedFuture.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
logger.warn("Widget creation failed", cause);
continue;
}

// a Widget was created, so do something with it
processCompletedWidget(newWidget);
}

This way, you can avoid the inefficiency of polling, as well as increasing responsiveness. This is far from the only way to use CompletionService, though. Just as an example, you could modify this code to cancel all waiting or in progress Callables if any Callable failed.
// service that wraps a thread pool with 5 threads
CompletionService compService = new ExecutorCompletionService(
Executors.newFixedThreadPool(5));

// Futures for all submitted Callables that have not yet been checked
Set<future<widget>> futures = new HashSet<future<widget>>();

for (Callable<widget> c: getCallables()) {
// keep track of the futures that get created so we can cancel them if necessary
futures.add(compService.submit(c));
}

Future<widget> completedFuture;
Widget newWidget;

while (futures.size() > 0) {
// block until a callable completes
completedFuture = compService.take();
futures.remove(completedFuture);

// get the Widget, if the Callable was able to create it
try {
newWidget = completedFuture.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
logger.warn("Widget creation failed", cause);

for (Future<widget> f: futures) {
// pass true if you wish to cancel in-progress Callables as well as
// pending Callables
f.cancel(true);
}

break;
}

// a Widget was created, so do something with it
processCompletedWidget(newWidget);
}

As you can see, a CompletionService gives you the flexibility to treat it as a standard ExecutorService by providing Futures as you submit Callables, while also providing a handy queue-like interface to the very same Futures as their corresponding Callables complete. The one important thing you can’t do via the CompletionService interface is shutdown the underlying ExecutorService, so in some situations you may wish to keep a reference to the underlying ExecutorService to shutdown when you’re done using it. Nonetheless, CompletionService is a useful class for concurrent systems, and it deserves to be part of every Java programmer’s toolbox.

Friday, 17 June 2011

The Executor Class

The most important new feature for the casual developer of multithreaded applications is the new Executor framework. However, the old way of creating threads in Java was to extend the java.lang.Thread class or to implement the java.lang.Runnable interface and pass it to Thread as argument. In this approach, the task is modeled as a Runnable and you create one or more threads for each task. There were no built in facilities to re-use threads such as thread pools. Additionally, once a task was started, there was no easy way to know when the task completed without implementing the wait/notify mechanism.

Since JDK5, another abstraction for concurrent execution of tasks is the Executor interface.

public interface Executor {
void execute(Runnable cmd) ;
}

To use the thread pooling framework, you create an Executor instance, then you pass it some runnable tasks:
class MyExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}

Now you can create objects of this executor and run the methods:
Executor executor = new MyExecutor;
executor.execute(aRunnable1);
executor.execute(aRunnable2);


 A java.util.concurrent.Executor is an object that executes Runnable tasks. It is similar to calling
new Thread (aRunnableObject).start ();
The concurrency utilities also include a ThreadPoolExecutor class that offers support for many common pooling operations. For a single new thread, there is perhaps not much reason to use an Executor. However, most multithreaded applications involve several threads. Threads need stack and heap space, and, depending on the platform, thread creation can be expensive. In addition, cancellation and shutdown of threads can be difficult.
The new Executor framework solves all those problems in a way that decouples task submission from the mechanics of how each task will be run, including the details of thread use, scheduling, etc.
An Executor can and should be used instead of explicitly creating threads. For example, rather than creating a new thread and starting it as above, you can use:
Executor executor = some Executor factory method;
exector.execute (aRunnable);
Notice that our Executor object was returned by an Executor factory. (We discuss design patterns like factory methods ; a factory is a standard name for a method that is used to obtain an instance of a class or subclass rather than making it with a constructor.)
There are several static Executor factory methods available in the java.util.concurrent.Executors class. If you have several Runnable tasks to carry out, but do not have specific requirements about the order in which they occur, then a simple thread pool arrangement is provided as follows:
Executor executor = Executors.newFixedThreadPool (5);
executor.execute (new RunnableTask1 ());
executor.execute (new RunnableTask2 ());
executor.execute (new RunnableTask3 ());
...
Here the tasks run and complete under the control of the Executor, which reuses threads from the thead pool as needed without incurring the overhead of always creating new threads. You can stop the threads with
executor.shutdown();
There are several more Executor factories in the Executors class for other needs beyond the scope of this book. Refer to the J2SE 5.0 API docs for complete information.

Thursday, 16 June 2011

The Future Interface

Future interface defined in java 5 represents the result of an asynchronous computation. Future has been provide with some really powerful methods. For Instance -
1) We can check whether a task has been completed or not.
2) We can cancel a task.
3) Check whether task was cancelled or complete normally.

The Future Interface
Future interface looks like this :
public interface Future {

//Attempts to cancel execution of this task.
boolean cancel(boolean mayInterruptIfRunning);

boolean isCancelled();

boolean isDone();

// Waits if necessary for the computation to complete,
// and then retrieves its result.
V get() throws InterruptedException, ExecutionException;

// Waits if necessary for at most the given time for the computation
// to complete, and then retrieves its result, if available.
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

Understanding the methods


boolean cancel(boolean mayInterruptIfRunning) -
This method does following -
1) If the process(Thread) has not started, then cancel the thread.
2) If the process has started, then check if
mayInterruptIfRunning = true ==> Inturrupt the thread and cancel it
mayInterruptIfRunning = false ==> Let it run
This attempt will fail if the task has already completed, has already been cancelled, or could not be cancelled for some other reason.
After this method returns, subsequent calls to isDone() will always return true.
Subsequent calls to isCancelled() will always return true if this method returned true.

boolean isCancelled()
Returns true if this task was cancelled before it completed normally

V get() throws InterruptedException, ExecutionException
This method is a blocking call. It will cause JVM to wait if necessary for the computation to complete, and then retrieves its result.

boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation - in all of these cases, this method will return true.

Here’s where the combined power of the thread pools and Callable come together. When you submit a Callable to one of the thread pools, you are provided an instance of Future that is typed to the Callable you passed in. This object substitutes for an actual Thread instance that you would have used prior to 1.5. Whereas you previously had to do Thread.join() or Thread.join(long millis), now you may use them as in this example.

Example
CallableImpl.java - Implementing the Callable interface
package com.vaani.callable;

import java.util.concurrent.Callable;

public class CallableImpl implements Callable<Integer> {

private int name;
public CallableImpl(int i){
name = i;
}

public Integer call() {
for(int i = 0; i < 10; i++) {
System.out.println("Thread : " + getName() + " I is : " + i);
}
return new Integer(getName());

}

public int getName() {
return name;
}

public void setMyName(int myName) {
this.name = myName;
}

}

CallableDemo.java - Understanding the use of future via demo
public class CallableDemo {

public static void main(String[] args) {
Callable<Integer> callable = new CallableImpl(2);

ExecutorService executor = new ScheduledThreadPoolExecutor(5);
Future<Integer> future = executor.submit(callable);

try {
System.out.println("Future value: " + future.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}

The Executor service tutorial

An asynchronous process is a process which has the probability to not finishing its execution right away, but later. If the execution finishes right away, then those are known as synchronous process. Special care is needed when designing asynchronous process, because we are used to synchronous programming model.

The java.util.concurrent.ExecutorService interface represents an asynchronous execution mechanism which is capable of executing tasks in the background. It extends Executor interface.

Implementation of Executor service
Here are the type of thread pools you can create with the Executors class, which are got from the factory class called Executors. Following are the implementation of ExecutorService :
Single Thread Executor : A thread pool with only one thread. So all the submitted task will be executed sequentially.
Method : Executors.newSingleThreadExecutor()
Cached Thread Pool : A thread pool that create as many threads it needs to execute the task in parralel. The old available threads will be reused for the new tasks. If a thread is not used during 60 seconds, it will be terminated and removed from the pool.
Method : Executors.newCachedThreadPool()
Fixed Thread Pool : A thread pool with a fixed number of threads. If a thread is not available for the task, the task is put in queue waiting for an other task to ends.
Method : Executors.newFixedThreadPool()
Scheduled Thread Pool : A thread pool made to schedule future task.
Method : Executors.newScheduledThreadPool()
Single Thread Scheduled Pool : A thread pool with only one thread to schedule future task.
Method : Executors.newSingleThreadScheduledExecutor()

So we can create our ExecutorService like this:
ExecutorService executorService1 = Executors.newSingleThreadExecutor();
ExecutorService executorService2 = Executors.newFixedThreadPool(4);
ExecutorService executorService3 = Executors.newScheduledThreadPool(4);

Methods in Executor Service
There are a few different ways to delegate tasks for execution to an ExecutorService:
  • execute(Runnable) - The execute(Runnable) method takes a java.lang.Runnable object, and executes it asynchronously.There is no way of obtaining the result of the executed Runnable.
  • submit(Runnable) - The submit(Runnable) method also takes a Runnable implementation, but returns a Future object. This Future object can be used to check if the Runnable as finished executing.
  • submit(Callable)-The submit(Callable) method is similar to the submit(Runnable) method except for the type of parameter it takes. The Callable instance is very similar to a Runnable except that its call() method can return a result. The Runnable.run() method cannot return a result.
  • invokeAny(...) - The invokeAny() method takes a collection of Callable objects, or subinterfaces of Callable. Invoking this method does not return a Future, but returns the result of one of the Callable objects. You have no guarantee about which of the Callable's results you get. Just one of the ones that finish.
  • invokeAll(...) - The invokeAny() method takes a collection of Callable objects, or subinterfaces of Callable. Invoking this method does not return a Future, but returns the result of one of the Callable objects. You have no guarantee about which of the Callable's results you get. Just one of the ones that finish.

Lets look at them one by one. First get the instance of executor service:
ExecutorService executorService = Executors.newSingleThreadExecutor();

execute()
Example to be added soon.
submit()
submit() takes callable and returns Future object.
Callable c = new Callable() {
public Object call() throws Exception {
Thread.sleep(3000);
ReturnObject rt = new ReturnObject();
rt.setReturnName(“serkan sunel”);
return rt;
}
};

Future future = executorService.submit(c);
//Now get the result 
try {
Object returnObj = future.get();
System.out.println(“returned :+ returnObj);
} catch (Exception e) {
e.printStackTrace();
}

So we can get result of the thread.

Closing the ExecutorService
You can close the executor service by calling shutdown() method. But this will not shut down immediately, but it will no longer accept new tasks, and once all threads have finished current tasks, the ExecutorService shuts down. All tasks submitted to the ExecutorService before shutdown() is called, are executed. You can use shutdownNow() to shutdown the service immediately.

Example
We are going to look into a program, which will initialize a fixed pool size ExecutorService and submit number of tasks (threads) to the executor service to execute them. To simulate asynchronous behavior each of those threads will work, and the sleep for a while, and then again work before returning result.

public class ExecutorServiceWithCallable {

public void executorServiceForCallable(int poolSize) throws
ExecutionException, InterruptedException {

ExecutorService executorService = Executors.newFixedThreadPool(poolSize);

List<Future> futures = new ArrayList<Future>();
for (int index = 0; index < poolSize; index++) {
Future future = executorService.submit(new RepeatingGreetings(index));
futures.add(future);
}

for (Future future : futures) {
System.out.println(future.get());
}
}
class RepeatingGreetings implements Callable {
private int index;

RepeatingGreetings(int index) {
this.index = index;
}

@Override
public V call() throws Exception {
System.out.println("Callable: " + index + " is working");
int sleepValue = (int) (Math.random() * 1000);
Thread.sleep(sleepValue);
return (V) ("Callable: " + index + " sleept for: " + sleepValue);
}
}
}

First we are going to talk about the Callable we are creating, and then we will look into ExecutorService.

Callable is same as java.lang.Runnable, both of them are designed to be executed by another thread. The only difference is, Callable is designed to return result, but Runnable are not. The call() method of Callable works same as run() method of Runnable.

The use of Callable on our example is pretty straight forward. The RepeatingGreetings is Callable which will print a message, sleep for a random amount of time, and then return a string message as result of execution.

The ExecutorServiceWithCallable is doing the main job here. We are creating a new executor service for supplied pool size. Then we are submitting our Callables to the ExecutorService to execute. As soon as we invoke the submit() method of ExecutorService the Callable are handed over to ExecutorService to execute. Here one thing we have to note, the submit() is not blocking. So, all of our Callables will be submitted right away to the ExecutorService, and ExecutorService will decide when to execute which callable. For each Callable we get a Future object to get the result later.

Once the Callables are submitted for execution, we are iterating through all Futures to get the result of execution. We are invoking the get() method of Future to get the result. Here we have to remember that, the get() is a blocking method. So the first invocation of get() will wait until that Callable is finished. Once we receive the result of the execution of first Callable it will wait for next Callable to finish, and so on. Since the get() is blocking, we could put the iteration on a separate thread to continue the execution, and get the result later when all Futures are ready with their results.

There is also, another useful method named isDone() on Future which let us check if the Callable for that Future is finished or not.

Output
The two random execution of our above example for five Callables produces output like

Callable: 0 is working
Callable: 1 is working
Callable: 3 is working
Callable: 2 is working
Callable: 4 is working
Callable: 4 sleept for: 18
Callable: 0 sleept for: 70
Callable: 1 sleept for: 449
Callable: 3 sleept for: 744
Callable: 2 sleept for: 844
Another Random output -
Callable: 0 is working
Callable: 1 is working
Callable: 3 is working
Callable: 2 is working
Callable: 4 is working
Callable: 0 sleept for: 270
Callable: 1 sleept for: 348
Callable: 2 sleept for: 427
Callable: 4 sleept for: 428
Callable: 3 sleept for: 564

See also - Implementing Thread Pools via ExecutorService 

Monday, 13 June 2011

The Callable Interface

One of the beautiful things about Java from its very first release was the ease with which we could write multi-threaded programs and introduce asynchronous processing into our designs. The Thread class and Runnable interface combined with Java’s memory management model meant for straightforward thread programming.

The run() method in Runnable cannot return a result (i.e. it returns void) and cannot throw a checked exception. If you try to throw an exception in a run() method, the javac compiler insists that you use a throws clause in the method signature. However, the superclass run() method doesn't throw an exception, so javac will not accept this. The lack of thrown checked exceptions was a little more serious.

Even if you were careful and you stored these for later verification, you couldn’t force all uses of the class to check the exception. You could go through all your getters and throw the Exception if it existed on each one. Besides being cumbersome, even that wasn’t foolproof. You couldn’t enforce calls to any of these. Thread programmers would correctly call join() to wait for it complete and may then have gone on their merry way.

The new java.util.concurrent.Callable interface is much like Runnable but overcomes two drawbacks with Runnable.
The interface looks like this :
public interface Callable<V> {
V call() throws Exception;
}


Implementing Runnable to get returned value
If you need a result from a Runnable task, you have to provide some external means of getting that result. A common technique is to set an instance variable in the Runnable object and provide a method to retrieve that value. For example,

public MyRunnable implements Runnable
{
private int fResult = 0;
public void run () {
...
fResult = 1;
} // run

// A getter method to provide the result of the thread.
public int getResult () { return fResult; }

} // class MyRunnable


Even if you were careful and you stored these for later verification, you couldn’t force all uses of the class to check the exception. You could go through all your getters and throw the Exception if it existed on each one. Besides being cumbersome, even that wasn’t foolproof. You couldn’t enforce calls to any of these. Thread programmers would correctly call join() to wait for it complete and may then have gone on their merry way.
Implementing the Callable
The Callable interface solves these problems. Instead of a run() method the Callable interface defines a single call() method that takes no parameters but is allowed to throw an exception. A simple example is

import java.util.concurrent.*;
public class MyCallable implements Callable
{
public Integer call () throws java.io.IOException {
return 1;
}
} // MyCallable

This call() method returns an Integer. (Note that we have conveniently used the autoboxing support in J2SE 5.0 to have the literal int 1 value automatically boxed into an Integer return value.)

Note
The call() method is the entry point into a Callable object, and it's return type is the type parameter set in the Callable object. To implement Callable with no return value, use Callable<void>. Also, note that the call() method throws a checked exception, as compared to the run() method in Runnable which does not throw any exception. The Executors class contains utility methods to convert from other common forms to Callable classes. However, Callable cannot be used in place of a Runnable. Callable objects have to be invoked by ExecutorService. The Executor framework provides the Future interface to allow handling the cancellation and returns of a Callable object.

Getting the return value from a Callable depends upon the new generics feature:

FutureTask task = new FutureTask (new MyCallable ());
ExecutorService es = Executors.newSingleThreadExecutor ();
es.submit (task);
try {
int result = task.get ();
System.out.println ("Result from task.get () = " + result);
}
catch (Exception e) {
System.err.println (e);
}
es.shutdown ();

Here, we use the FutureTask class that supports an Integer return value. Then the task is submitted using the ExecutorService submit() method, and the result is obtained from the FutureTask get() method, again using auto-unboxing to convert the Integer to an int. See the API documentation for more information on ExecutorService, and FutureTask.

 

Callable Example


CallableImpl.java


package com.vaani.callable;

import java.util.concurrent.Callable;

public class CallableImpl implements Callable<Integer> {

private int name;
public CallableImpl(int i){
name = i;
}

public Integer call() {
for(int i = 0; i < 10; i++) {
System.out.println("Thread : " + getName() + " I is : " + i);
}
return new Integer(getName());

}

public int getName() {
return name;
}

public void setMyName(int myName) {
this.name = myName;
}

}

CallableDemo.java



public class CallableDemo {

public static void main(String[] args) {
Callable<Integer> callable = new CallableImpl(2);

ExecutorService executor = new ScheduledThreadPoolExecutor(5);
Future<Integer> future = executor.submit(callable);

try {
System.out.println("Future value: " + future.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}

Also we have future which is similar to Callable except Future represents the result of an asynchronous computation. But it is discussed here.







Download the source


Source code can be downloaded from here.

Thursday, 23 September 2010

Implementing the Runnable Interface

The Clock applet shown below displays the current time and updates its display every second. You can scroll this page and perform other tasks while the clock continues to update because the code that updates the clock's display runs within its own thread.
The Clock applet uses a different technique than SimpleThread for providing the run method for its thread. Instead of subclassing Thread, Clock implements the Runnable interface (and therefore implements the run method defined in it). Clock then creates a thread with itself as the Thread's target. When created in this way, the Thread gets its run method from its target. The code that accomplishes this is shown in bold here:
import java.awt.Graphics;
import java.util.*;
import java.text.DateFormat;
import java.applet.Applet;

public class Clock extends Applet implements Runnable {
private Thread clockThread = null;
public void start() {
if (clockThread == null) {
clockThread = new Thread(this, "Clock");
clockThread.start();
}
}
public void run() {
Thread myThread = Thread.currentThread();
while (clockThread == myThread) {
repaint();
try {
Thread.sleep(1000);
} catch (InterruptedException e){
// the VM doesn't want us to sleep anymore,
// so get back to work
}
}
}
public void paint(Graphics g) {
// get the time and convert it to a date
Calendar cal = Calendar.getInstance();
Date date = cal.getTime();
// format it and display it
DateFormat dateFormatter = DateFormat.getTimeInstance();
g.drawString(dateFormatter.format(date), 5, 10);
}
// overrides Applet's stop method, not Thread's
public void stop() {
clockThread = null;
}
}


The Clock applet's run method loops until the browser asks it to stop. During each iteration of the loop, the clock repaints its display. The paint method figures out what time it is, formats it in a localized way, and displays it. You'll see more of the Clock applet in The Life Cycle of a Thread which uses it to teach you about the life of a thread.

Deciding to Use the Runnable Interface

You have now seen two ways to provide the run method for a Java thread:

  1. Subclass the Thread class defined in the java.lang package and override the run method.



    Example: See the SimpleThread class described in Subclassing Thread and Overriding run.



  2. Provide a class that implements the Runnable interface (also defined in the java.lang package) and therefore implements the run method. In this case, a Runnable object provides the run method to the thread.



    Example: See the Clock applet just shown.


There are good reasons for choosing either of these options over the other. However, for most cases, including that of the Clock applet, the following rule of thumb will guide you to the best option.



Rule of Thumb: If your class must subclass some other class (the most common example being Applet), you should use Runnable as described in option #2.


To run in a Java-enabled browser, the Clock class has to be a subclass of the Applet class. Also, the Clock applet needs a thread so that it can continuously update its display without taking over the process in which it is running. (Some browsers might create a new thread for each applet so as to prevent a misbehaved applet from taking over the main browser thread. However, you should not count on this when writing your applets; your applets should create their own threads when doing computer-intensive work.) But since the Java language does not support multiple class inheritance, the Clock class cannot be a subclass of both Thread and Applet. Thus the Clock class must use the Runnable interface to provide its threaded behavior.


Download


Download the source code from here. Also see Clock applet here (soon to be added).