Monday, 20 June 2011

Concurrency : CompletionService

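The CompletionService interface makes it easier to consume the results of tasks run by an ExecutorService: it hands you each Future as its task completes, rather than in the order the tasks were submitted.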
ExecutorCompletionService is the sole standard implementation of the interface.
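
To make the "results in completion order" idea concrete, here is a minimal, self-contained sketch. The class name CompletionOrderDemo, the helper sleepThenReturn, and the sleep durations are made up for illustration, and the lambda syntax assumes Java 8 or later. Three tasks are submitted slowest first, and take() returns them fastest first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionOrderDemo {

    // Hypothetical helper: a task that sleeps, then returns its own name.
    static Callable<String> sleepThenReturn(String name, long millis) {
        return () -> {
            Thread.sleep(millis);
            return name;
        };
    }

    // Submits three tasks, slowest first, and collects results as they complete.
    static List<String> runInCompletionOrder() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        CompletionService<String> service = new ExecutorCompletionService<>(pool);
        List<String> results = new ArrayList<>();
        try {
            service.submit(sleepThenReturn("slow", 400));
            service.submit(sleepThenReturn("medium", 200));
            service.submit(sleepThenReturn("fast", 10));

            for (int i = 0; i < 3; i++) {
                // take() blocks until the next task has finished
                results.add(service.take().get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(runInCompletionOrder());
    }
}
```

Because each task gets its own thread and the sleeps are far apart, the results should come back as fast, medium, slow, even though the tasks were submitted in the opposite order.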

Let’s take a look at some sample code. The goal is to produce Widget objects and do some arbitrary task with them as they are created.
// service that wraps a thread pool with 5 threads
CompletionService<Widget> compService = new ExecutorCompletionService<Widget>(
        Executors.newFixedThreadPool(5));

// how many futures there are to check
int remainingFutures = 0;

for (Callable<Widget> c : getCallables()) {
    remainingFutures++;

    compService.submit(c);
}

Future<Widget> completedFuture;
Widget newWidget;

// note: take() and get() can also throw InterruptedException, which the
// enclosing method must declare or handle
while (remainingFutures > 0) {
    // block until a callable completes
    completedFuture = compService.take();
    remainingFutures--;

    // get the Widget, if the Callable was able to create it
    try {
        newWidget = completedFuture.get();
    } catch (ExecutionException e) {
        Throwable cause = e.getCause();
        logger.warn("Widget creation failed", cause);
        continue;
    }

    // a Widget was created, so do something with it
    processCompletedWidget(newWidget);
}

This way, you avoid the inefficiency of polling each Future and you can respond to each Widget as soon as it is ready. This is far from the only way to use CompletionService, though. As an example, you could modify this code to cancel all waiting or in-progress Callables if any Callable fails.
// service that wraps a thread pool with 5 threads
CompletionService<Widget> compService = new ExecutorCompletionService<Widget>(
        Executors.newFixedThreadPool(5));

// Futures for all submitted Callables that have not yet been checked
Set<Future<Widget>> futures = new HashSet<Future<Widget>>();

for (Callable<Widget> c : getCallables()) {
    // keep track of the futures that get created so we can cancel them if necessary
    futures.add(compService.submit(c));
}

Future<Widget> completedFuture;
Widget newWidget;

// note: take() and get() can also throw InterruptedException, which the
// enclosing method must declare or handle
while (!futures.isEmpty()) {
    // block until a callable completes
    completedFuture = compService.take();
    futures.remove(completedFuture);

    // get the Widget, if the Callable was able to create it
    try {
        newWidget = completedFuture.get();
    } catch (ExecutionException e) {
        Throwable cause = e.getCause();
        logger.warn("Widget creation failed", cause);

        for (Future<Widget> f : futures) {
            // pass true if you wish to cancel in-progress Callables as well as
            // pending Callables
            f.cancel(true);
        }

        break;
    }

    // a Widget was created, so do something with it
    processCompletedWidget(newWidget);
}
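
The fragment above depends on Widget, getCallables() and a logger defined elsewhere. As a rough, runnable sketch of the same cancel-on-failure idea, the following uses made-up stand-ins: the class FailFastDemo, String results instead of Widgets, one task that fails at once, and three tasks that sleep for five seconds. It assumes Java 8 or later for the lambdas.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FailFastDemo {

    // Returns how many outstanding tasks were cancelled after the first failure.
    static int cancelRestAfterFirstFailure() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> service = new ExecutorCompletionService<>(pool);
        List<Future<String>> pending = new ArrayList<>();
        try {
            // one task that fails immediately (stand-in for a failed Widget build)
            pending.add(service.submit(() -> {
                throw new IllegalStateException("boom");
            }));
            // three slow tasks that will still be running or queued when it fails
            for (int i = 0; i < 3; i++) {
                pending.add(service.submit(() -> {
                    Thread.sleep(5_000);
                    return "widget";
                }));
            }

            // the failing task is the first to complete, so take() returns it
            Future<String> first = service.take();
            pending.remove(first);
            try {
                first.get();
                return 0; // not reached in this sketch: the first task always fails
            } catch (ExecutionException e) {
                int cancelled = 0;
                for (Future<String> f : pending) {
                    // true: also interrupt tasks that are already running
                    if (f.cancel(true)) {
                        cancelled++;
                    }
                }
                return cancelled;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(cancelRestAfterFirstFailure());
    }
}
```

Removing the completed Future from the pending list works here because ExecutorCompletionService hands back from take() the same Future object that submit() returned.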

As you can see, a CompletionService gives you the flexibility to treat it much like a standard ExecutorService by providing Futures as you submit Callables, while also providing a handy queue-like interface to the very same Futures as their corresponding Callables complete. The one important thing you can’t do via the CompletionService interface is shut down the underlying ExecutorService, so in some situations you may wish to keep a reference to the underlying ExecutorService so that you can shut it down when you’re done using it. Nonetheless, CompletionService is a useful tool for concurrent systems, and it deserves to be part of every Java programmer’s toolbox.
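
One way to handle the shutdown point is to hold the ExecutorService in its own variable and shut it down in a finally block. The sketch below is illustrative only: the class ShutdownDemo and the sum-of-squares workload are invented for the example, and the lambdas assume Java 8 or later.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ShutdownDemo {

    // Computes 1^2 + 2^2 + ... + n^2, one task per term.
    static int sumOfSquares(int n) {
        // keep our own reference: CompletionService has no shutdown method
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<Integer> service = new ExecutorCompletionService<>(pool);
        try {
            for (int i = 1; i <= n; i++) {
                final int value = i;
                service.submit(() -> value * value);
            }
            int sum = 0;
            for (int i = 0; i < n; i++) {
                // results arrive in completion order; addition does not care
                sum += service.take().get();
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            // without this, the pool's non-daemon threads keep the JVM alive
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(4)); // prints 30
    }
}
```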
