Showing posts with label thread pool. Show all posts
Showing posts with label thread pool. Show all posts

Wednesday, 29 June 2011

How to make your own Thread Pool

Following are the steps to create your own thread pool :
  • create your ThreadPoolExecutor that includes specific BlockingQueue and Comparator.
  • override the beforeExecute and afterExecute
  • create your ThreadFactory that implement uncaughtException to prevent thread dead problem
Creating the ThreadPoolExecutor

class MyThreadPoolExecutor extends ThreadPoolExecutor{
protected MyThreadPoolExecutor(){
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue(maxCapacity,new MyComparator()),new MyThreadFactory());
}
@Override
protected void beforeExecute(Thread t,Runnable r){
/*do your stuff*/
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
/*do your stuff*/
super.afterExecute(r, t);
}

}

Creating our ThreadFactory:

class MyThreadFactory implements ThreadFactory {
private final MyThreadGroup tg = new MyThreadGroup();
public Thread newThread(Runnable r) {
return new Thread(tg,r);
}
}
private class MyThreadGroup extends ThreadGroup{
private MyThreadGroup(){
super("MyThreadGroup");
}
public void uncaughtException(Thread t, Throwable e){
log.debug(t);
/*do something*/
}
}

Sunday, 19 June 2011

FixedThreadPool example

Creates a fixed-size thread pool. Here is the syntax:
public static ExecutorService 
newFixedThreadPool(int nThreads)

Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available. If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks. The threads in the pool will exist until it is explicitly shutdown.

Example
Here is a runnable task called WorkerThread (in a .java source file). The task performs some work and then periodically reports what percent of the work it has completed.
public class WorkerThread implements Runnable {
private int workerNumber;

WorkerThread(int number) {
workerNumber = number;
}

public void run() {
for (int i=0;i<=100;i+=20) {
//Perform some work...
System.out.format("Worker number: %d, percent complete: %d%n",
workerNumber, i);
try {
Thread.sleep((int)(Math.random() * 1000));
} catch (InterruptedException e) { }
}
}
}

In FixedThreadPoolDemo (in a .java source file), you can specify the number of worker threads to create and the size of the thread pool that will be used to run the threads. The following example uses a fixed thread pool so that you can observe the effect of running the program with fewer threads than tasks.
import java.util.concurrent.*;
public class FixedThreadPoolDemo {
public static void main(String[] args) {
int numWorkers = Integer.parseInt(args[0]);
int threadPoolSize = Integer.parseInt(args[1]);
ExecutorService tpes =
Executors.newFixedThreadPool(threadPoolSize);
WorkerThread[] workers = new WorkerThread[numWorkers];
for (int i = 0; i < numWorkers; i++) {
workers[i] = new WorkerThread(i);
tpes.execute(workers[i]);
}
tpes.shutdown();
}
}

Here is the result of running the test with 4 workers and a pool of 2 threads.
% java ThreadPoolTest 4 2
Worker number: 0, percent complete: 0
Worker number: 1, percent complete: 0
Worker number: 0, percent complete: 20
Worker number: 0, percent complete: 40
Worker number: 1, percent complete: 20
Worker number: 0, percent complete: 60
Worker number: 0, percent complete: 80
Worker number: 0, percent complete: 100
Worker number: 1, percent complete: 40
Worker number: 1, percent complete: 60
Worker number: 2, percent complete: 0
Worker number: 1, percent complete: 80
Worker number: 2, percent complete: 20
Worker number: 2, percent complete: 40
Worker number: 1, percent complete: 100
Worker number: 2, percent complete: 60
Worker number: 2, percent complete: 80
Worker number: 2, percent complete: 100
Worker number: 3, percent complete: 0
Worker number: 3, percent complete: 20
Worker number: 3, percent complete: 40
Worker number: 3, percent complete: 60
Worker number: 3, percent complete: 80
Worker number: 3, percent complete: 100

Notice how workers 0 and 1 are assigned to the two threads in the pool and that they alternately run to completion, then tasks 2 and 3 are assigned to the threads.

Like most of the other tasks in this chapter, WorkerThread implements the Runnable (in the API reference documentation) interface. Another way to create a task is to implement the Callable (in the API reference documentation) interface, as shown in the following example, CallableWorkerThread (in a .java source file). A Callable is more flexible than a Runnable because it can return a value and throw an exception. To implement a Callable, you provide the call method, which returns a value, in this case, an Integer that represents the task's number.

import java.util.concurrent.*;
public class CallableWorkerThread implements Callable<Integer> {
private int workerNumber;

CallableWorkerThread(int number) {
workerNumber = number;
}

public Integer call() {
for (int i = 0; i <= 100; i += 20) {
//Perform some work...
System.out.format("Worker number: %d, percent complete: %d%n",
workerNumber, i);
try {
Thread.sleep((int)(Math.random() * 1000));
} catch (InterruptedException e) {}
}
return(workerNumber);
}
}

Monday, 13 June 2011

Implementing ThreadPools via ExecutorService


A thread pool is represented by an instance of the class ExecutorService. With an ExecutorService, you can submit task that will be completed in the future. Here are the type of thread pools you can create with the Executors class :
  • Single Thread Executor : A thread pool with only one thread. So all the submitted task will be executed sequentially. Method : Executors.newSingleThreadExecutor()
  • Cached Thread Pool : A thread pool that create as many threads it needs to execute the task in parralel. The old available threads will be reused for the new tasks. If a thread is not used during 60 seconds, it will be terminated and removed from the pool. Method : Executors.newCachedThreadPool()
  • Fixed Thread Pool : A thread pool with a fixed number of threads. If a thread is not available for the task, the task is put in queue waiting for an other task to ends. Method : Executors.newFixedThreadPool()
  • Scheduled Thread Pool : A thread pool made to schedule future task. Method : Executors.newScheduledThreadPool()
  • Single Thread Scheduled Pool : A thread pool with only one thread to schedule future task. Method : Executors.newSingleThreadScheduledExecutor()
Once you have a thread pool, you can submit task to it using the different submit methods. You can submit a Runnable or a Callable to the thread pool. The method return a Future representing the future state of the task. If you submitted a Runnable, the Future object return null once the task finished.

Thread Pool example in java

Get a Callable
private final class StringTask extends Callable<String>{
public String call(){
//Long operations

return "Run";
}
}

If you want to execute that task 10 times using 4 threads, you can use that code :

ExecutorService pool = Executors.newFixedThreadPool(4);

for(int i = 0; i < 10; i++){
pool.submit(new StringTask());
}

But you must shutdown the thread pool in order to terminate all the threads of the pool :
pool.shutdown();

If you don’t do that, the JVM risk to not shutdown because there is still threads not terminated. You can also force the shutdown of the pool using shutdownNow, with that the currently running tasks will be interrupted and the tasks not started will not be started at all.

But with that example, you cannot get the result of the task. So let’s get the Future objects of the tasks :

ExecutorService pool = Executors.newFixedThreadPool(4);

List<Future<String>> futures = new ArrayList<Future<String>>(10);

for(int i = 0; i < 10; i++){
futures.add(pool.submit(new StringTask()));
}

for(Future<String> future : futures){
String result = future.get();

//Compute the result
}

pool.shutdown();

But this code is a bit complicated. And there is a disadvantage. If the first task takes a long time to compute and all the other tasks ends before the first, the current thread cannot compute the result before the first task ends. Once again, Java has the solution for you, CompletionService.

A CompletionService is a service that make easier to wait for result of submitted task to an executor. The implementation is ExecutorCompletionService who’s based on an ExecutorService to work. So let’s try :
ExecutorService threadPool = Executors.newFixedThreadPool(4);
CompletionService<String> pool = new ExecutorCompletionService<String>(threadPool);

for(int i = 0; i < 10; i++){
pool.submit(new StringTask());
}

for(int i = 0; i < 10; i++){
String result = pool.take().get();

//Compute the result
}

threadPool.shutdown();

With that, you have the result in the order they are completed and you don’t have to keep a collection of Future.

Here we are, you have the tools in hand to launch tasks in parralel using performing thread pools. Using Executors, ExecutorService and CompletionService you can create complex algorithm using several taks. With that tools, it’s really easy to change the number of threads performing in parallel or adding more tasks without changing a lot of code.

Thursday, 9 June 2011

ThreadPools

What Is Thread Pooling?
Thread pooling refers to a technique where a pool of worker threads is created and managed by the application. When a new job arrives, instead of creating a new thread to service it, it's queued by the thread-pool manager and dispatched later to one of the available worker threads. The thread-pool manager manages the number of active worker threads based on available resources as well as load considerations, adding new threads to the pool or freeing some worker threads in response to the number of outstanding requests. The primary goals of thread pooling are managing the number of active threads in the system and reducing the overhead of creating new threads by reusing threads from a pool.
Pool Management
Various management methods exist for the pools. You can shutdown() the pool, which will reject any future submissions but complete processing of in-process executions and even those that had not yet started but were submitted before the shutdown was initiated. You can also more aggressively perform a shutdownNow(). This will also prevent any future submissions, but it has a few different, notable behaviours. It will not start execution of submitted but unstarted tasks. They will be in the returned list. It will also attempt to stop, or more precisely, Thread.interrupt() currently executing tasks. This is a best effort with no guarantee that these tasks will be successfully interrupted.



Thread Pools Implementation
You can either provide your own implementation of thread pools or use java's implementation. As discussed below, you can have following implementations in Java:
  • A thread pool implementation is provided as instance of ExecutorService class,
  • you can put in different implementations of BlockingQueue to specify different queue behavior such as queue bounds or priority ordering

Implementation of Thread Pools via ExecutorService
Implementation of Thread Pool via Executor Service is shown here.

Implementation of ThreadPools via BlockingQueue
Implementation of Thread Pools via BlockingQueue is shown here.


Benefits of Thread Pooling

  • It saves the machine work of creating a new thread.
  • Single thread can recycle again and again.
  • The size of thread pool is given in starting .We can expand on the designs presented in this chapter to include a method to support growing the size of the pool at runtime if you need this kind of dynamic tuning.
  • Response time can be quick.

Risks of using thread pools

  • High Rejection rate - Suppose if an task is rejected because the thread pool is empty. There will be high rejection rate.Or if task on wait state then waiting time would be too long. Sometime there is problem of deadlock also. 
  • Contention - If you have a very large memory, and threads equivalent to that, you are contending the threads, though you have resources. So there threadlocal will be a good idea to use.