
Thursday, 7 July 2011

java.util.concurrent.TimeUnit (concurrency)

A TimeUnit represents time durations at a given unit of granularity and provides utility methods to convert across units, and to perform timing and delay operations in these units. A TimeUnit does not maintain time information, but only helps organize and use time representations that may be maintained separately across various contexts.
A TimeUnit is mainly used to inform time-based methods how a given timing parameter should be interpreted. For example, the following code will timeout in 50 milliseconds if the lock is not available:
Lock lock = ...;
if ( lock.tryLock(50L, TimeUnit.MILLISECONDS) ) ...
while this code will timeout in 50 seconds:
Lock lock = ...;
if ( lock.tryLock(50L, TimeUnit.SECONDS) ) ...
Note however, that there is no guarantee that a particular timeout implementation will be able to notice the passage of time at the same granularity as the given TimeUnit.

Beyond locks, TimeUnit is also used with ExecutorService methods such as awaitTermination(long, TimeUnit) to specify how long to wait before timing out.

Although it is part of the java.util.concurrent package, the TimeUnit enum is useful in many contexts outside of concurrency. In this post, I look at how the TimeUnit enum can be used even in code that does not directly deal with concurrent functionality before examining how this enum is an example of many broader concepts in Java development.

Most of us have probably seen (or implemented, but we'll blame the other developer for it now) code like that shown in the next listing. In this listing, a provided number of milliseconds is converted to a whole number of days by dividing by a single hard-coded number.

/**
 * Convert provided number of milliseconds into number of days.
 */
private static long convertMilliSecondsToDaysViaSingleMagicNumber(final long numberMilliseconds)
{
    // 86400000 = 86400 seconds in a day multiplied by 1000 ms per second
    return numberMilliseconds / 86400000;
}

There are some problems with the approach taken by the above code listing. The most obvious issue may be the use of the magic number 86400000. Although most of us recognize 86400 as the number of seconds in a day, this may not be obvious to everyone and then there's the issue of it being 1000 times greater than that number. The comment in the code listing helps by explaining the underlying meaning of the numbers, but wouldn't it be nice if the code spoke more clearly for itself?

The next code listing shows an arguable slight improvement. Rather than using a single hard-coded number, individual hard-coded numbers are used that are more readable because they are separate. A reader of the code has a better chance of seeing how the number was constructed.

private static long convertMilliSecondsToDaysViaMoreExplanatoryMagicNumbers(final long numberMilliseconds)
{
    // 60 seconds in a minute, 60 minutes in an hour, 24 hours in a day,
    // and one thousand milliseconds in a second
    return numberMilliseconds / (60 * 60 * 24 * 1000);
}


Even though the individual numbers might make it easier to see what's happening in the conversion, the comment still might be useful in ensuring that the proper function is understood well. Magic numbers are also still involved and most code analysis tools will report issues with their use. The next code example attempts to deal with the issue of magic numbers.

private final static int NUMBER_MILLISECONDS_IN_SECOND = 1000;
private final static int NUMBER_SECONDS_IN_MINUTE = 60;
private final static int NUMBER_MINUTES_IN_HOUR = 60;
private final static int NUMBER_SECONDS_IN_HOUR =
        NUMBER_SECONDS_IN_MINUTE * NUMBER_MINUTES_IN_HOUR;
private final static int NUMBER_HOURS_IN_DAY = 24;
private final static int NUMBER_MINUTES_IN_DAY =
        NUMBER_HOURS_IN_DAY * NUMBER_MINUTES_IN_HOUR;
private final static int NUMBER_SECONDS_IN_DAY =
        NUMBER_HOURS_IN_DAY * NUMBER_SECONDS_IN_HOUR;
private final static int NUMBER_MILLISECONDS_IN_DAY =
        NUMBER_SECONDS_IN_DAY * NUMBER_MILLISECONDS_IN_SECOND;

/**
 * Convert provided number of milliseconds into number of days.
 */
private static long convertMilliSecondsToDaysViaDefinedConstant(final long numberMilliseconds)
{
    return numberMilliseconds / NUMBER_MILLISECONDS_IN_DAY;
}

The approach in the code above is commonly seen in Java code. The "magic" numbers are now defined as constants that can be reused in more than just one place. Although this is arguably an improvement, TimeUnit allows us to make a further improvement to this code.

private static long convertMillisecondsToDaysViaTimeUnit(final long numberMilliseconds)
{
    return TimeUnit.MILLISECONDS.toDays(numberMilliseconds);
}

This code takes advantage of TimeUnit's MILLISECONDS enum constant and toDays(long) method to easily perform this conversion in a standardized and highly readable way. There isn't a magic number in sight!

The above example demonstrates how TimeUnit can be used even when concurrency is not involved. Besides MILLISECONDS, other time unit representations provided by TimeUnit include DAYS, HOURS, MICROSECONDS, MINUTES, NANOSECONDS, and SECONDS. These cover the most commonly used time units one would need.

The methods on the TimeUnit enum allow easy conversion from the unit represented by the enum constant to a different unit of time. There is a general conversion method TimeUnit.convert(long, TimeUnit) that can be used for this purpose. More specific methods are also available for converting to specific types of time units so that the second parameter need not be applied. These methods include the already demonstrated toDays(long) as well as toHours(long), toMicros(long), toMillis(long), toMinutes(long), toNanos(long), and toSeconds(long). Although most of this enum was introduced with J2SE 5, the methods toMinutes(long), toHours(long), and toDays(long) were introduced with Java SE 6.
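A quick sketch of these conversion methods in action (the durations are arbitrary example values):

```java
import java.util.concurrent.TimeUnit;

public class TimeUnitConversions {
    public static void main(String[] args) {
        // Specific conversion methods: the enum constant names the source unit.
        long days = TimeUnit.MILLISECONDS.toDays(172800000L); // 2 days in ms -> 2
        long seconds = TimeUnit.HOURS.toSeconds(2);           // -> 7200

        // The general form converts the given duration *into* this unit.
        long minutes = TimeUnit.MINUTES.convert(2, TimeUnit.HOURS); // -> 120

        System.out.println(days + " " + seconds + " " + minutes); // prints "2 7200 120"
    }
}
```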

The enum constants and methods on TimeUnit discussed so far are not specifically associated with concurrency and are generally useful. The TimeUnit enum offers three additional methods of interest. TimeUnit.sleep(long) provides a more readable alternative to Thread.sleep(long, int). The enum constant of the TimeUnit implies the applicable unit of time, so only a base number needs to be provided. The implication, of course, is that more natural numbers can be used for sleeping rather than needing to express a large number in milliseconds, or even to remember that the method requires the time in milliseconds.

Two other related useful methods available in TimeUnit are TimeUnit.timedJoin(Thread,long) [convenience method for Thread.join] and TimeUnit.timedWait(Thread,long) [convenience method for Object.wait].
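A small sketch using sleep and timedJoin (the durations here are arbitrary choices):

```java
import java.util.concurrent.TimeUnit;

public class TimeUnitSleepDemo {
    public static void main(String[] args) throws InterruptedException {
        // Reads naturally: no need to remember that Thread.sleep expects milliseconds.
        TimeUnit.MILLISECONDS.sleep(100);

        Thread worker = new Thread(new Runnable() {
            public void run() {
                // simulate some work
            }
        });
        worker.start();
        // Equivalent to worker.join(2000), but the unit is explicit.
        TimeUnit.SECONDS.timedJoin(worker, 2);
    }
}
```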

I have used this post to demonstrate how TimeUnit is most obviously useful: it helps developers to write clear code without use of magic numbers for converting between different time measurement units. This is handy in its own right because different APIs often expect different time units. However, TimeUnit has benefits beyond its obvious intended functionality benefits. The TimeUnit enum shows off the power of Java enums and how this power can be leveraged. I look at this next.

Most of us who transitioned from C++ to Java missed having an enum in versions of Java prior to J2SE 5. Fortunately, the wait was worth it as the Java enum is far superior to the C++ enum. There are numerous ways in which the Java enum is better than the C++ enum, but one of the main advantages is the ability to implement methods on the enum. This was shown in the above example where a toDays(long) method allowed for easy conversion of milliseconds via the MILLISECONDS.toDays(long) call. A Java enum is much more than simply an encapsulation of a finite set of integral values. The ability to add behaviors to these enum constants is very powerful.

There are two main approaches for defining methods on an enum. One approach is to define a method at the overall enum level and override it individually at each enum constant's level. The other approach is to implement the method once for the entire enum and all of its enum constants with no need to override the single definition. In other words, one approach is to write an implementation of a method for each enum constant and the other approach writes a method that all the enum constants share. The TimeUnit enum demonstrates both approaches. Its general convert method and all of the convenient toXXXXX methods (where XXXXX are things like Hours or Days) are written specifically for each enum constant and the parent method at the overall enum level throws an AbstractMethodError if not properly overridden by each enum constant (fortunately it always is!). The remaining public methods (timedWait, timedJoin, and sleep) are written with the second approach: a single method implementation exists for each of these that is used by any enum constant defined for TimeUnit.

Besides its usefulness in providing highly readable time unit conversions and besides its usefulness in demonstrating the significant advantages of the Java enum, TimeUnit provides an example of one other "often true" principle in Java: highly and generally useful classes (or enum in this case) can often be found in the SDK where you might least expect it. Although the usefulness of TimeUnit is obvious in concurrent applications, its usefulness goes beyond concurrent functionality. This is not the only case where a more generally useful construct is available in the JDK in a more particular package. I have often seen this in projects I've worked on as well. Often a team will put together a nice class or enum for their own use that is more generally applicable, but which ends up remaining in their rather particular package instead of being in a more generally accessible package.

When we build our own time conversion routines, we typically see hard-coded numbers (or constants) with values such as 1000, 60, and 24. So it is not surprising that the source code for TimeUnit defines these as constants that it uses in its own conversions. Eventually the rubber must hit the road and these conversions must take place with these hard numbers. The difference is that using TimeUnit lets those numbers be defined and used outside of our own code, in a well-tested and standardly available enum. It is also interesting to note that hard-coded integers were used in early versions of TimeUnit but were eventually replaced with internally defined constants:

// Handy constants for conversion methods
static final long C0 = 1L;
static final long C1 = C0 * 1000L;
static final long C2 = C1 * 1000L;
static final long C3 = C2 * 1000L;
static final long C4 = C3 * 60L;
static final long C5 = C4 * 60L;
static final long C6 = C5 * 24L;

This post has been lengthy already, so thanks for bearing with me :)

Wednesday, 29 June 2011

LinkedBlockingQueue vs SynchronousQueue


In the producer-consumer problem, code using a SynchronousQueue is almost identical to the LinkedBlockingQueue version, but the application gains an added benefit: SynchronousQueue allows an insert into the queue only if there is a thread waiting to consume it.

As discussed here, a SynchronousQueue has a capacity of zero. So it implements a rendezvous approach (the producer waits until a consumer is ready, the consumer waits until a producer is ready) behind the interface of a Queue.
Also, the implementation of SynchronousQueue is heavily optimized, so if you don't need anything more than a rendezvous point (as in the case of Executors.newCachedThreadPool(), where consumers are created on demand so that queue items don't accumulate), you can get a performance gain by using SynchronousQueue.
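A small sketch of the rendezvous behaviour (the item name and timeout are arbitrary): a hand-off only completes when a consumer is actually waiting.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class RendezvousDemo {
    public static void main(String[] args) throws InterruptedException {
        final SynchronousQueue<String> queue = new SynchronousQueue<String>();

        // With no consumer waiting, an immediate offer fails outright.
        System.out.println(queue.offer("job-0")); // prints "false"

        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    // The producer's hand-off cannot complete until this take() runs.
                    System.out.println("Consumed: " + queue.take());
                } catch (InterruptedException ignored) { }
            }
        });
        consumer.start();

        // offer with a timeout waits up to one second for a consumer to arrive.
        boolean handedOff = queue.offer("job-1", 1, TimeUnit.SECONDS);
        System.out.println("Handed off: " + handedOff);
        consumer.join();
    }
}
```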

How to make your own Thread Pool

Following are the steps to create your own thread pool:
  • create your own ThreadPoolExecutor that uses a specific BlockingQueue and Comparator
  • override beforeExecute and afterExecute
  • create your own ThreadFactory whose thread group implements uncaughtException, so that failing threads do not die silently
Creating the ThreadPoolExecutor

class MyThreadPoolExecutor extends ThreadPoolExecutor {

    protected MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                   long keepAliveTime, TimeUnit unit, int maxCapacity) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
                new PriorityBlockingQueue<Runnable>(maxCapacity, new MyComparator()),
                new MyThreadFactory());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        /* do your stuff */
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        /* do your stuff */
        super.afterExecute(r, t);
    }
}
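The MyComparator referenced above is not shown in this post; a minimal sketch might order tasks by a priority they expose. Here the PriorityTask interface and its getPriority method are assumptions invented for illustration, not part of any library:

```java
import java.util.Comparator;

// Hypothetical: tasks submitted to the pool implement this to expose a priority.
interface PriorityTask extends Runnable {
    int getPriority();
}

class MyComparator implements Comparator<Runnable> {
    public int compare(Runnable a, Runnable b) {
        // Higher priority sorts first; plain Runnables are treated as priority 0.
        int pa = (a instanceof PriorityTask) ? ((PriorityTask) a).getPriority() : 0;
        int pb = (b instanceof PriorityTask) ? ((PriorityTask) b).getPriority() : 0;
        return (pb < pa) ? -1 : ((pb == pa) ? 0 : 1);
    }
}
```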

Creating our ThreadFactory:

class MyThreadFactory implements ThreadFactory {
    private final MyThreadGroup tg = new MyThreadGroup();

    public Thread newThread(Runnable r) {
        return new Thread(tg, r);
    }

    private static class MyThreadGroup extends ThreadGroup {
        private MyThreadGroup() {
            super("MyThreadGroup");
        }

        @Override
        public void uncaughtException(Thread t, Throwable e) {
            log.debug(t);
            /* do something */
        }
    }
}

Monday, 27 June 2011

A simple LRU cache in 5 lines

Applications usually need to cache information in memory. The classes most often used for this in Java are HashMap and Hashtable. If you need to do any sophisticated caching, you can use JBoss Cache, OSCache or EHCache. Even if you use an external caching system, you may still want to cache some information locally within an object just to have fast access. The problem with this approach is that if you are not careful and do not control the size of this in-memory cache, it may grow too big and affect the performance of your application.
A very simple solution to this problem is to set a maximum size for your in-memory cache and most preferably make it LRU (Least Recently Used). This way you will have a predictable memory utilization and only the items used recently will be kept in the cache.
Starting with JDK 1.4, a new (and rarely used) collection class called LinkedHashMap was added. There are a couple of benefits of using a LinkedHashMap:
  • It preserves the order of items in the map: by default, the iteration order is the same as the insertion order, and a special constructor switches this to access order (least recently accessed first). This is very useful when you already have a sorted collection of data, want to do some processing on it, and return it as a Map. Using a TreeMap (the only other map that allows iteration in a given order) is too expensive for this scenario.
  • It exposes a method removeEldestEntry(Map.Entry) that may be overridden to impose a policy for removing stale mappings automatically when new mappings are added to the map. This is what we are going to use to create an LRU cache.

Check out the following snippet for an example of simple LRU cache.
import java.util.*;

public class SimpleLRU {

    private static final int MAX_ENTRIES = 50;

    private Map<String, String> mCache = new LinkedHashMap<String, String>(MAX_ENTRIES, .75F, true) {
        protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
            return size() > MAX_ENTRIES;
        }
    };

    public SimpleLRU() {
        for (int i = 0; i < 100; i++) {
            String numberStr = String.valueOf(i);
            mCache.put(numberStr, numberStr);

            System.out.print("\rSize = " + mCache.size()
                    + "\tCurrent value = " + i + "\tLast value in cache = "
                    + mCache.get(numberStr));
            try {
                Thread.sleep(10);
            } catch (InterruptedException ex) {
            }
        }

        System.out.println();
    }

    public static void main(String[] args) {
        new SimpleLRU();
    }
}
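The access-order flag (the true third constructor argument above) is what makes the map LRU rather than insertion-ordered. A minimal demonstration of the eviction behaviour (the keys and the limit of three are arbitrary):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LruDemo {
    public static void main(String[] args) {
        final int max = 3;
        // true -> iteration order is least recently accessed first
        Map<String, String> cache = new LinkedHashMap<String, String>(max, 0.75f, true) {
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > max;
            }
        };
        cache.put("a", "1");
        cache.put("b", "2");
        cache.put("c", "3");
        cache.get("a");      // touch "a" so it becomes most recently used
        cache.put("d", "4"); // evicts "b", the least recently used entry
        System.out.println(cache.keySet()); // prints [c, a, d]
    }
}
```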

Monitors in java

Monitors are another mechanism of concurrent programming. They are a higher-level mechanism than semaphores, and also more powerful. A monitor is an instance of a class that can be used safely by several threads. All the methods of a monitor are executed with mutual exclusion, so at most one thread can execute a method of the monitor at a time. This mutual exclusion policy makes it easier to work with monitors and to develop the monitor's method contents.

Monitors have another feature: the possibility of making a thread wait for a condition. During the wait, the thread temporarily gives up its exclusive access and must reacquire it after the condition has been met. You can also signal one or more threads that a condition has been met.
There are several advantages to using monitors instead of lower-level mechanisms:
  • All the synchronization code is centralized in one location, and the users of this code don't need to know how it's implemented.
  • The code doesn't depend on the number of processes; it works for as many processes as you want.
  • You don't need to release anything like a mutex, so you cannot forget to do it.
To describe a monitor, we simply use the monitor keyword and declare the methods as ordinary methods:
monitor SimpleMonitor {
    public method void testA() {
        //Some code
    }

    public method int testB() {
        return 1;
    }
}


To describe a condition variable, we use the cond keyword. A condition variable is a kind of queue of processes that are waiting on the same condition. Several operations are available on a condition; the most important are signaling a waiting process so that it wakes up, and waiting on the condition. There are similarities between the signal/wait operations and the P and V operations of semaphores, but they are a little different: the signal operation does nothing if the queue is empty, while the wait operation always puts the thread into the waiting queue. The process queue is served in first-come, first-served order. When a thread wakes up after waiting on a condition, it must reacquire the lock before continuing.
Before going further, we need a little more information about the signal operations. When writing monitors, you normally have a choice between several philosophies for the signaling operation:
  1. Signal & Continue (SC) : the signaling process keeps the mutual exclusion, and the signaled thread is awakened but must acquire the mutual exclusion before proceeding.
  2. Signal & Wait (SW) : the signaler is blocked and must wait for mutual exclusion to continue, while the signaled thread is awakened directly and can continue its operations.
  3. Signal & Urgent Wait (SU) : like SW, but the signaler has the guarantee that it will run immediately after the signaled thread.
  4. Signal & Exit (SX) : the signaler exits from the method directly after the signal, and the signaled thread can start directly. This philosophy is not often used.
The available policies depend on the programming language; in Java, only the SC policy is available.
In Java there is no keyword to directly create a monitor. To implement one, you create a new class and use the Lock and Condition types. Lock is the interface and ReentrantLock is its most commonly used implementation; it is the one we'll use in this post. ReentrantLock has two constructors: a default constructor and one taking a boolean argument indicating whether the lock is fair. A fair lock means that threads acquire the lock in the order they requested it. Fairness is a little heavier than the default locking strategy, so use it only if you need it. To acquire the lock, you call the lock method; to release it, unlock.
Explicit locks have the same memory semantics as synchronized blocks, so the visibility of changes is guaranteed when you use lock()/unlock() blocks.
So, to implement the monitor example we've seen before, we just need to create a class and use a lock to enforce mutual exclusion:

public class SimpleMonitor {
private final Lock lock = new ReentrantLock();

public void testA() {
lock.lock();

try {
//Some code
} finally {
lock.unlock();
}
}

public int testB() {
lock.lock();

try {
return 1;
} finally {
lock.unlock();
}
}
}

Those who have already read the other parts of this series may say it would be easier to use the synchronized keyword on the two methods. But with synchronized, we would not have condition variables. If you need only locking and not condition variables, it is indeed easier to use synchronized blocks instead of Locks.

You can create conditions using the newCondition method on the lock. A condition is a variable of type Condition. You can make the current thread wait on the condition using the await method (and its variants with timeouts), and you can signal threads using the signal and signalAll methods. The signalAll method wakes up all the threads waiting on the condition variable.
Let's try a common example: a bounded buffer. It's a cyclic buffer with a certain capacity, with a front and a rear.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBuffer {
private final String[] buffer;
private final int capacity;

private int front;
private int rear;
private int count;

private final Lock lock = new ReentrantLock();

private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();

public BoundedBuffer(int capacity) {
super();

this.capacity = capacity;

buffer = new String[capacity];
}

public void deposit(String data) throws InterruptedException {
lock.lock();

try {
while (count == capacity) {
notFull.await();
}

buffer[rear] = data;
rear = (rear + 1) % capacity;
count++;

notEmpty.signal();
} finally {
lock.unlock();
}
}

public String fetch() throws InterruptedException {
lock.lock();

try {
while (count == 0) {
notEmpty.await();
}

String result = buffer[front];
front = (front + 1) % capacity;
count--;

notFull.signal();

return result;
} finally {
lock.unlock();
}
}
}


Some explanations:
  1. The two methods are protected with the lock to ensure mutual exclusion.
  2. We use two condition variables: one to wait for the buffer to be not full, and another to wait for it to be not empty.
  3. Each await operation is wrapped in a while loop. This avoids the stolen-signal problem that can occur with Signal & Continue.
This BoundedBuffer can be used by several threads with no problems.
As you can see, monitors can solve a lot of concurrent programming problems; the mechanism is both powerful and efficient.
I hope you find this post useful.


Wednesday, 22 June 2011

Concurrent hashmap in java

ConcurrentHashMap extends the abstract class AbstractMap and implements the ConcurrentMap interface. It supports full concurrency of retrievals and adjustable expected concurrency for updates, while obeying the same functional specification as Hashtable. The class provides methods corresponding to each method of Hashtable and, like Hashtable, does not permit null as a key or a value. All operations of this class are thread-safe. Using a ConcurrentHashMap improves performance because it permits multiple threads to modify the map concurrently without blocking each other; there is, however, little benefit when only a single thread accesses the map at a time.

Syntax

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>

Constructor's of ConcurrentHashMap

This class provides constructors through which we can create a map according to our requirements:
ConcurrentHashMap() : creates a new, empty map with the default initial capacity (16), default load factor (0.75) and concurrencyLevel (16).
ConcurrentHashMap(int initialCapacity) : creates a new, empty map with the given initial capacity, the default load factor (0.75) and concurrencyLevel (16).
ConcurrentHashMap(int initialCapacity, float loadFactor) : creates a new, empty map with the given initial capacity and load factor, and the default concurrencyLevel (16).
ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) : creates a new, empty map with the given initial capacity, load factor, and concurrencyLevel.
ConcurrentHashMap(Map<? extends K,? extends V> m) : creates a new map with the same mappings as the given map.

Methods of ConcurrentHashMap

This class provides many methods; some of the commonly used ones are:
  1. put() : public V put(K key, V value)
  2. clear() : public void clear()
  3. elements() : public Enumeration<V> elements()
  4. remove(Object key, Object value) : public boolean remove(Object key, Object value)
  5. replace(K key, V value) : public V replace(K key, V value)
  6. values() : public Collection<V> values()
  7. size() : public int size()
Example :

import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Set;
class A implements Runnable {
    String name;
    ConcurrentMap<Integer, String> cm;

    public A(ConcurrentMap<Integer, String> cm, String name) {
        this.name = name;
        this.cm = cm;
    }

    public void run() {
        try {
            cm.put(1, "A");
            cm.put(2, "B");
            cm.put(3, "C");
            cm.put(4, "D");
            cm.put(5, "E");
            System.out.println(name + " maps the element : " + cm);
            System.out.println(name + " represents the set of keys: " + cm.keySet());
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class B implements Runnable {
    String name;
    ConcurrentMap<Integer, String> cm;

    public B(ConcurrentMap<Integer, String> cm, String name) {
        this.name = name;
        this.cm = cm;
    }

    public void run() {
        try {
            boolean j = cm.remove(3, "C");
            System.out.println(name + " removes the element : " + j);
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class C implements Runnable {
    String name;
    ConcurrentMap<Integer, String> cm;

    public C(ConcurrentMap<Integer, String> cm, String name) {
        this.name = name;
        this.cm = cm;
    }

    public void run() {
        try {
            Set<Integer> s = cm.keySet();
            System.out.println(name + " represents the set of keys : " + s);
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class ConcurrentMapDemo {
    public static void main(String[] args) {
        ConcurrentMap<Integer, String> cm = new ConcurrentHashMap<Integer, String>();
        Runnable a = new A(cm, "A");
        Runnable b = new B(cm, "B");
        Runnable c = new C(cm, "C");
        new Thread(a).start();
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(b).start();
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(c).start();
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Output
A maps the element : {5=E, 4=D, 3=C, 2=B, 1=A}

A represents the set of keys: [5, 4, 3, 2, 1]

B removes the element : true

C represents the set of keys : [5, 4, 2, 1]



ConcurrentHashSet in Java from ConcurrentHashMap

While you do have a ConcurrentHashMap class in Java, there is no ConcurrentHashSet.
Solution
You can easily get a ConcurrentHashSet with the following code -

Collections.newSetFromMap(new ConcurrentHashMap<Object,Boolean>())
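A short usage sketch (the element values are arbitrary):

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentSetDemo {
    public static void main(String[] args) {
        // The map supplies the concurrency; the set view just adapts it.
        Set<String> set = Collections.newSetFromMap(
                new ConcurrentHashMap<String, Boolean>());

        set.add("alpha");
        set.add("alpha"); // duplicate, ignored
        System.out.println(set.size());            // prints "1"
        System.out.println(set.contains("alpha")); // prints "true"
    }
}
```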

Notes
  • A Set lends itself to implementation via a Map if you think about it, so you could actually just use a Map directly. But that may not fit well with the context of your use.
  • The HashSet class internally uses a HashMap.
  • The set obtained via this method inherits pretty much all the concurrency properties of the underlying map.
  • See api-docs for newSetFromMap

Blocking queues in java

The key facilities that BlockingQueue provides to such systems are, as its name implies, enqueuing and dequeueing methods that do not return until they have executed successfully. So, for example, a print server does not need to constantly poll the queue to discover whether any print jobs are waiting; it need only call the poll method, supplying a timeout, and the system will suspend it until either a queue element becomes available or the timeout expires. BlockingQueue defines seven new methods, in three groups:


Group1 : Adding an Element

boolean offer(E e, long timeout, TimeUnit unit)
// insert e, waiting up to the timeout
void put(E e) // add e, waiting as long as necessary

The nonblocking overload of offer defined in Queue will return false if it cannot immediately insert the element. This new overload waits for a time specified using java.util.concurrent.TimeUnit, an enum that allows timeouts to be defined in units such as milliseconds or seconds.
Taking these methods together with those inherited from Queue, there are four ways in which the methods for adding elements to a BlockingQueue can behave: offer returns false if it does not succeed immediately, blocking offer returns false if it does not succeed within its timeout, add throws an exception if it does not succeed immediately, and put blocks until it succeeds.
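The four behaviours can be observed on a full ArrayBlockingQueue of capacity one (a minimal sketch; the element values and timeout are arbitrary):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class AddBehaviours {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
        queue.put("first"); // succeeds immediately; the queue is now full

        System.out.println(queue.offer("second"));                             // prints "false"
        System.out.println(queue.offer("second", 100, TimeUnit.MILLISECONDS)); // "false" after the timeout

        try {
            queue.add("second");
        } catch (IllegalStateException e) {
            System.out.println("add threw IllegalStateException");
        }
        // queue.put("second") would block here until another thread takes an element
    }
}
```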

Group2  : Removing an Element
E poll(long timeout, TimeUnit unit)
// retrieve and remove the head, waiting up to the timeout
E take() // retrieve and remove the head of this queue, waiting
// as long as necessary

Again taking these methods together with those inherited from Queue, there are four ways in which the methods for removing elements from a BlockingQueue can behave: poll returns null if it does not succeed immediately, blocking poll returns null if it does not succeed within its timeout, remove throws an exception if it does not succeed immediately, and take blocks until it succeeds.

Group 3 : Retrieving or Querying the Contents of the Queue
int drainTo(Collection<? super E> c)
// clear the queue into c
int drainTo(Collection<? super E> c, int maxElements)
// clear at most the specified number of elements into c
int remainingCapacity()
// return the number of elements that would be accepted
// without blocking, or Integer.MAX_VALUE if unbounded


The drainTo methods perform atomically and efficiently, so the second overload is useful when you know you have processing capacity immediately available for a certain number of elements, and the first is useful, for example, when all producer threads have stopped working. The return value is the number of elements transferred. remainingCapacity reports the spare capacity of the queue, although, as with any such value in multi-threaded contexts, the result of a call should not be used as part of a test-then-act sequence: between the test (the call of remainingCapacity) and the action (adding an element to the queue) of one thread, another thread might have intervened to add or remove elements.
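For example (a sketch; the capacity and element count are arbitrary):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class DrainToDemo {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);
        for (int i = 0; i < 5; i++) {
            queue.offer(i);
        }

        List<Integer> batch = new ArrayList<Integer>();
        int moved = queue.drainTo(batch, 3); // transfer at most 3 elements
        System.out.println(moved + " " + batch + " remaining capacity " + queue.remainingCapacity());
        // prints "3 [0, 1, 2] remaining capacity 8"
    }
}
```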
BlockingQueue guarantees that the queue operations of its implementations will be thread-safe and atomic.
But this guarantee doesn't extend to the bulk operations inherited from Collection (addAll, containsAll, retainAll and removeAll) unless the individual implementation provides it. So it is possible, for example, for addAll to fail, throwing an exception, after adding only some of the elements in a collection.

Blocking queue has the following characteristics:
  • methods to add an item to the queue, waiting for space to become available in the queue if necessary;
  • corresponding methods that take an item from the queue, waiting for an item to put in the queue if it is empty;
  • optional time limits and interruptibility on the latter calls;
  • efficient thread-safety: blocking queues are specifically designed to have their put() method called from one thread and the take() method from another— in particular, items posted to the queue will be published correctly to any other thread taking the item from the queue again; significantly, the implementations generally achieve this without locking the entire queue, making them highly concurrent components;
  • integration with Java thread pools: a flavour of blocking queue can be passed into the constructor of ThreadPoolExecutor to customise the behaviour of the thread pool.
Implementations of blocking queue
ArrayBlockingQueue : A simple bounded BlockingQueue implementation backed by an array.

DelayQueue : An unbounded blocking queue of Delayed elements, in which an element can only be taken when its delay has expired. Its elements implement the java.util.concurrent.Delayed interface.

PriorityBlockingQueue : This queue orders its elements by their natural ordering or by a specified Comparator, and the element returned by any take() call is the smallest element under this ordering.
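For instance, with natural ordering, take() always returns the smallest element currently in the queue (the class name here is illustrative):

```java
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // Natural ordering of Integer: take() always returns the smallest element
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<Integer>();
        queue.put(3);
        queue.put(1);
        queue.put(2);
        System.out.println(queue.take());  // 1
        System.out.println(queue.take());  // 2
        System.out.println(queue.take());  // 3
    }
}
```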

LinkedBlockingQueue : An optionally bounded BlockingQueue implementation backed by linked nodes; if no capacity is given, it is effectively unbounded (Integer.MAX_VALUE).

SynchronousQueue : This queue has a size of zero (yes, you read that correctly). It blocks put() calls until another thread calls take(), and blocks take() calls until another thread calls put(). Essentially, elements pass directly from a producer to a consumer; nothing is stored in the queue itself (other than transiently during the handoff).
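A small sketch of the direct handoff (the class name is illustrative):

```java
import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        final SynchronousQueue<String> queue = new SynchronousQueue<String>();

        Thread producer = new Thread() {
            public void run() {
                try {
                    // put() blocks here until the main thread calls take()
                    queue.put("handoff");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        producer.start();

        // The element passes directly from producer to consumer;
        // nothing is ever stored in the queue itself
        System.out.println(queue.take());  // handoff
        producer.join();
    }
}
```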


Example - Producer consumer problem with Blocking queue
The queue takes care of all the details of synchronizing access to its contents and notifying other threads of the availability of data.

Producer.java
import java.util.concurrent.BlockingQueue;

public class Producer extends Thread {
    private final BlockingQueue<Integer> cubbyhole;
    private final int number;

    public Producer(BlockingQueue<Integer> c, int num) {
        cubbyhole = c;
        number = num;
    }

    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                cubbyhole.put(i);  // blocks if the queue is full
                System.out.format("Producer #%d put: %d%n", number, i);
                sleep((int) (Math.random() * 100));
            } catch (InterruptedException e) {
                interrupt();  // restore the interrupt status and stop
                return;
            }
        }
    }
}

Consumer.java

import java.util.concurrent.BlockingQueue;

public class Consumer extends Thread {
    private final BlockingQueue<Integer> cubbyhole;
    private final int number;

    public Consumer(BlockingQueue<Integer> c, int num) {
        cubbyhole = c;
        number = num;
    }

    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                int value = cubbyhole.take();  // blocks if the queue is empty
                System.out.format("Consumer #%d got: %d%n", number, value);
            } catch (InterruptedException e) {
                interrupt();  // restore the interrupt status and stop
                return;
            }
        }
    }
}


ProducerConsumerTest.java

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerTest {
    public static void main(String[] args) {
        // A capacity of 1 forces strict alternation between producer and consumer
        BlockingQueue<Integer> c = new ArrayBlockingQueue<Integer>(1);
        Producer p1 = new Producer(c, 1);
        Consumer c1 = new Consumer(c, 1);

        p1.start();
        c1.start();
    }
}

Possible Use cases for BlockingQueue

These features make BlockingQueues useful for cases such as the following:
  • a server, where incoming connections are placed on a queue, and a pool of threads picks them up as those threads become free;
  • in a variety of parallel processes, where we want to manage or limit resource usage at different stages of the process.
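For the first case, a bounded blocking queue passed to a ThreadPoolExecutor caps how much pending work can accumulate; a sketch, with illustrative class name and sizes:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        // A pool of 2 worker threads; at most 10 tasks may wait in the queue.
        // Beyond that, the default policy rejects new submissions, which caps
        // resource usage instead of letting work pile up without bound.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2,                              // core and maximum pool size
                0L, TimeUnit.MILLISECONDS,         // keep-alive for idle threads
                new ArrayBlockingQueue<Runnable>(10));

        for (int i = 0; i < 5; i++) {
            final int id = i;
            pool.execute(new Runnable() {
                public void run() {
                    System.out.println("handling connection " + id);
                }
            });
        }

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```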

invokeAll via ExecutorService

The signature of this method is:
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
throws InterruptedException

In traditional Java, to launch multiple threads we had to create Thread objects and call their start() methods one by one.
Since Java 5.0, we can instead put all the Callable objects into a collection and pass that collection to an ExecutorService to run them.

The invokeAll() method executes all of the Callable objects in the collection passed to it. It returns a list of Future objects through which you can obtain the result of each Callable. invokeAll() is a blocking method: the calling thread does not proceed to the next line until all the tasks have completed.


Keep in mind that a task might finish by throwing an exception, in which case it has not "succeeded"; Future.isDone() alone cannot tell the difference.
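Calling get(), however, rethrows the task's exception wrapped in an ExecutionException, so the failure can be detected there (a small sketch; the class name is illustrative):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FailureDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(new Callable<String>() {
            public String call() throws Exception {
                throw new IllegalStateException("task failed");
            }
        });

        // isDone() becomes true either way; get() rethrows the task's
        // exception wrapped in an ExecutionException
        try {
            future.get();
            System.out.println("succeeded");
        } catch (ExecutionException e) {
            System.out.println("failed: " + e.getCause().getMessage());
            // prints: failed: task failed
        }
        executor.shutdown();
    }
}
```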

Example:
// newFixedThreadPool() requires a pool size
ExecutorService executorService = Executors.newFixedThreadPool(3);

List<Callable<String>> callables = new ArrayList<Callable<String>>();

callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 1";
    }
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 2";
    }
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 3";
    }
});

List<Future<String>> futures = executorService.invokeAll(callables);

for (Future<String> future : futures) {
    System.out.println("future.get = " + future.get());
}

executorService.shutdown();