
Saturday, 18 June 2011

Latches

http://ydtech.blogspot.com/2010/06/countdownlatch-by-example.html

A latch is a boolean condition that is set at most once, ever. Once a single release is issued, all acquires will pass.

A latch is a switch or gate
In concurrent programming, a latch is a type of "switch" or "gate". The latch is set up with a particular count value. The count is then counted down, and at strategic moments, a thread or threads waits for the countdown to reach zero before continuing to perform some process. Note that this is a one-off process: once the latch reaches zero, there is no way to reset the count.

So threads can then either count down on the latch or wait for it to reach 0. When the latch reaches 0, all waiting threads are released.

The CountDownLatch class allows one to prevent a set of threads from running until you are ready for them to. For example, you might want to create the threads and then do some initialization tasks before starting them all simultaneously.

CountDownLatch’s constructor takes an integer count that determines its behavior. Calling await() blocks the calling thread until the count reaches zero, and each call to countDown() decrements the count by one.

Example

package com.vaani.countdown;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchDemo {

    public static void main(String[] args) throws Exception {
        int nThreads = 3;
        final CountDownLatch startLatch = new CountDownLatch(nThreads);
        final CountDownLatch endLatch = new CountDownLatch(nThreads);

        ExecutorService svc = Executors.newFixedThreadPool(nThreads);
        for (int i = 0; i < nThreads; i++) {
            svc.execute(new Runnable() {
                public void run() {
                    try {
                        log("At run()");
                        startLatch.countDown();
                        startLatch.await();

                        log("Do work");
                        Thread.sleep((int) (Math.random() * 1000));

                        log("Wait for end");
                        endLatch.countDown();
                        endLatch.await();

                        log("Done");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            Thread.sleep(100);
        }
        svc.shutdown(); // let the JVM exit once all tasks complete
    }

    private static void log(String msg) {
        System.out.println(System.currentTimeMillis() + ": "
                + Thread.currentThread().getId() + " " + msg);
    }
}

In this code, you’ll see two latches get initialized. Each thread that starts up counts down on the start latch and then waits for it to reach 0, which happens once all threads have started. Similarly, the end latch lets every thread wait until all threads have completed their work.

Sample output of this program is:

1308458964186: 8 At run()
1308458964286: 9 At run()
1308458964386: 10 At run()
1308458964386: 8 Do work
1308458964386: 10 Do work
1308458964387: 9 Do work
1308458964952: 8 Wait for end
1308458965037: 9 Wait for end
1308458965277: 10 Wait for end
1308458965277: 9 Done
1308458965277: 8 Done
1308458965277: 10 Done


You can see that each thread hits run() at different times, but proceeds past the first latch at the same time. Each thread then does some random amount of work, waits at the second latch, and they all proceed past it together.

In the example above, each thread waits forever for the latch to trigger. You can also choose to wait for a specified time period before giving up, and you can check the latch's count to see how many countDown() calls are still outstanding. Each CountDownLatch instance can only be used once and is then dead.
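Those two extras, the timed await() and the count check, can be sketched like this (the class name is illustrative; the methods are CountDownLatch's standard timed await(long, TimeUnit) and getCount()):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TimedLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);

        // getCount() reports how many countDown() calls are still outstanding.
        System.out.println("count = " + latch.getCount()); // count = 2

        latch.countDown();

        // The timed await() returns false if the latch did not reach zero in time.
        boolean reachedZero = latch.await(100, TimeUnit.MILLISECONDS);
        System.out.println("reached zero: " + reachedZero); // reached zero: false
    }
}
```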

Download the source


You can download the source code of the above program from here.

Useful Tricks with Semaphores

Release doesn't have to be called by same thread as acquire

One interesting property of Semaphores in Java is that release doesn’t have to be called by the same thread as acquire. This means you could have a thread limiter that pools or creates threads based on a semaphore by calling acquire(). Then, the running thread could release its own semaphore permit when it completes. This is a useful property that we don’t have with normal mutexes in Java.
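A minimal sketch of such a limiter, where the main thread acquires and the worker thread releases (the class name and permit count are illustrative, not from the original post):

```java
import java.util.concurrent.Semaphore;

// Thread limiter sketch: the main thread acquires a permit before starting
// each worker, and the worker itself releases the permit when it finishes.
// Note that the acquiring thread and the releasing thread are different.
public class ThreadLimiter {
    public static void main(String[] args) throws InterruptedException {
        final Semaphore permits = new Semaphore(2); // at most 2 live workers

        for (int i = 0; i < 5; i++) {
            permits.acquire(); // blocks until a worker slot is free
            final int id = i;
            new Thread(new Runnable() {
                public void run() {
                    try {
                        System.out.println("worker " + id + " running");
                    } finally {
                        permits.release(); // released by the worker, not the acquirer
                    }
                }
            }).start();
        }
    }
}
```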

Dynamically increase or decrease the permits

Another trick is to increase or decrease the number of permits at runtime.
To increase the number of permits
Contrary to what you might guess, the number of permits in a semaphore isn’t fixed, and a call to release() will always increment the number of permits, even if no corresponding acquire() call was made. Note that this can also result in bugs if you are incorrectly calling release() when no acquire() was made.
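A tiny demonstration of release() growing the permit count past its initial value (the class name is illustrative):

```java
import java.util.concurrent.Semaphore;

public class GrowingPermits {
    public static void main(String[] args) {
        Semaphore sem = new Semaphore(1);
        System.out.println(sem.availablePermits()); // 1

        // release() without a matching acquire() simply adds a permit.
        sem.release();
        System.out.println(sem.availablePermits()); // 2
    }
}
```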

To decrease the number of permits

To be specific, if 10 threads currently have permits and 7 is the new limit, we won’t try to communicate to any threads that they are now in violation of the new limit. What we can do is make sure that the next call to acquire() will block until enough threads (4, to be precise) have called release() to bring the number of outstanding permits to below the new limit. It’s not infeasible to interrupt threads that are holding permits beyond the new upper limit, but that’s beyond the scope of a simple semaphore.

If you wanted to reconfigure it to only allow 7 permits, you have several options:

  • You call acquire() three times. This might work, or it might block forever — if more than 7 permits are currently in use, one (or more) of your acquire() calls could block for an arbitrarily long amount of time. Checking availablePermits() isn’t going to help, either, since using that to gauge whether or not an acquire() call will block is a classic check-then-act race condition. Even if the race condition could be avoided, it doesn’t help: instead of blocking, you’ll just keep getting 0 back from availablePermits().
  • You could call drainPermits() repeatedly until at least 3 permits had been drained (and then release() any extra that were drained beyond the 3 you need). This could take an arbitrarily long time.
  • You could call tryAcquire() repeatedly until you’ve gotten 3 permits. This also could take an arbitrarily long time.

These approaches all share two flaws:

  1. Most importantly, they block until as many permits as you are trying to remove have become available
  2. They don’t really achieve the goal directly

To be precise, the goal is that the semaphore should release fewer permits to the threads that are using the semaphore to control access to the shared resource. Acquiring (and presumably not releasing) permits is one way to achieve the goal, but it is not necessarily the only way. Put another way, the thread that is executing the reconfiguration does not technically need to acquire permits (whether it be by acquire() or drainPermits() or tryAcquire(), etc) to fulfill this requirement, since it doesn’t need to access the shared resource that the semaphore is guarding. So we come to the last solution - Extending Semaphore to use reducePermits().
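A sketch of that last solution: reducePermits() is a real protected method of java.util.concurrent.Semaphore, and a small subclass (the class and method names here are my own) is enough to expose it:

```java
import java.util.concurrent.Semaphore;

// Semaphore's protected reducePermits() shrinks the permit count immediately,
// without blocking, even if that temporarily leaves the count negative.
public class ResizableSemaphore extends Semaphore {
    public ResizableSemaphore(int permits) {
        super(permits);
    }

    public void reducePermitsBy(int reduction) {
        reducePermits(reduction); // may drive availablePermits() negative
    }
}
```

Reconfiguring from 10 permits to 7 then becomes a single non-blocking call to reducePermitsBy(3); any threads already holding permits keep them, but new acquire() calls block until enough releases bring the count back above zero.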

Some more useful methods

Finally, there are a few useful methods to be familiar with in Java’s Semaphore. The method acquireUninterruptibly() will acquire a permit, retrying internally if the thread is interrupted, which means no outside handling of InterruptedException. The method tryAcquire() lets us limit how long we will wait for a permit: we can either return immediately if no permit is available, or wait for a specified timeout. If you have known deadlocks that you can’t easily fix or track down, using tryAcquire() with suitable timeouts can keep a process from locking up.
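A quick illustration of both forms of tryAcquire() (the class name is illustrative):

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class TryAcquireDemo {
    public static void main(String[] args) throws InterruptedException {
        Semaphore sem = new Semaphore(1);

        // Immediate, non-blocking attempt.
        System.out.println(sem.tryAcquire()); // true -- the permit was free

        // Bounded wait: give up after 100 ms instead of blocking forever.
        boolean got = sem.tryAcquire(100, TimeUnit.MILLISECONDS);
        System.out.println(got); // false -- the permit is still held
    }
}
```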

Handling Semaphores with care

Exception is raised
As with most methods of locking or synchronization, there are some potential issues.

The number one thing to remember is, always release what you acquire. This is done by using try..finally constructs. Example:

// Bad -- semaphore not released if the exception is thrown
try {
    sem.acquire();
    sharedResource.doStuff();
    sem.release();
} catch (IOException e) {
    logger.warn("The resource is broken", e);
}

So instead use try…finally:

// Good -- acquire before the try, release in finally
sem.acquire();
try {
    sharedResource.doStuff();
} catch (IOException e) {
    logger.warn("The resource is broken", e);
} finally {
    sem.release();
}


Issue of lock ordering
The following class shows a deadlock that only the luckiest of you will avoid. You’ll notice that the two threads which acquire the two semaphore permits do so in opposite order. (try..finally is left out for the sake of brevity).

public static void main(String[] args) throws Exception {
    Semaphore s1 = new Semaphore(1);
    Semaphore s2 = new Semaphore(1);

    Thread t = new Thread(new DoubleResourceGrabber(s1, s2));
    // now reverse them ... here comes trouble!
    Thread t2 = new Thread(new DoubleResourceGrabber(s2, s1));

    t.start();
    t2.start();

    t.join();
    t2.join();
    System.out.println("We got lucky!");
}

private static class DoubleResourceGrabber implements Runnable {
    private Semaphore first;
    private Semaphore second;

    public DoubleResourceGrabber(Semaphore s1, Semaphore s2) {
        first = s1;
        second = s2;
    }

    public void run() {
        try {
            Thread t = Thread.currentThread();

            first.acquire();
            System.out.println(t + " acquired " + first);

            Thread.sleep(200); // demonstrate deadlock

            second.acquire();
            System.out.println(t + " acquired " + second);

            second.release();
            System.out.println(t + " released " + second);

            first.release();
            System.out.println(t + " released " + first);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }
}


If you run this, you will more than likely have a hung process. Issues of lock ordering apply to semaphores as much as regular mutexes or synchronization in Java. In some cases, timeouts (see the note on tryAcquire() earlier in the article) can be used to prevent deadlocks from hanging a process, but typically a deadlock is a symptom of a logic error which can be avoided. If you’re unfamiliar with deadlocks, I recommend you read up on them. Wikipedia has a decent article on deadlocks which applies to all languages equally.

The main things that you should be careful of when using semaphores (including binary semaphores, i.e. mutexes) are:



  • Not releasing after acquire (either missing release call or an exception is thrown and there is no finally block)
  • Long held semaphores, causing thread starvation
  • Deadlocks (as seen above)
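One way to defuse the lock-ordering deadlock shown above is to combine a timed tryAcquire() with a back-off: if the second permit cannot be had in time, give the first one back and retry. This is a sketch of the idea, not code from the original post:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Back-off sketch: never hold the first permit while blocking forever
// on the second one, so the opposite-order thread can make progress.
public class PoliteGrabber {
    static boolean grabBoth(Semaphore first, Semaphore second)
            throws InterruptedException {
        while (true) {
            first.acquire();
            if (second.tryAcquire(100, TimeUnit.MILLISECONDS)) {
                return true; // got both permits
            }
            first.release(); // back off and try again
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Semaphore s1 = new Semaphore(1);
        Semaphore s2 = new Semaphore(1);
        if (grabBoth(s1, s2)) {
            System.out.println("got both permits");
            s2.release();
            s1.release();
        }
    }
}
```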

Uses of semaphores

What are some possible uses for counting semaphores? The following come to mind:

  • Limiting concurrent access to disk (this can kill performance due to competing disk seeks)
  • Thread creation limiting
  • JDBC connection pooling / limiting
  • Network connection throttling
  • Throttling CPU or memory intensive tasks

Java locks - Semaphores

Semaphores ensure that at most n threads can enter the critical section of the code at a given time.

The semaphore concept was invented by the famous Dutch computer scientist Edsger Dijkstra. Basically, a semaphore is a counter (an integer) that allows a thread into a critical region if the value of the counter is greater than zero. If it is, the counter is decremented by one; otherwise, the thread waits until it can proceed. When a thread leaves the critical region, the counter is incremented by one to allow one more thread in. A semaphore is created with an initial value for its counter, and you can execute two operations on it: P and V.

V is also known as signal, and it increments the semaphore. P is also known as wait, and it decrements the semaphore.

For example, if you have a critical section that cannot be executed concurrently, you can use a semaphore:

sem mutex = new sem(1)

P(mutex)
//Critical region
V(mutex)

So you must always call the P operation yourself before the critical region and V after it. A semaphore with a value of one is called a mutex (mutual exclusion), since only one thread at a time can enter the region it guards; this is the most common kind of semaphore. The other use of semaphores is to guard a set of resources, like database connections or a data pool. In Java, semaphores were introduced in JDK 5 via the java.util.concurrent.Semaphore class. Creating one is easy:
Semaphore mutex = new Semaphore(1);
Semaphore available = new Semaphore(100);

The P and V operations

As discussed above, the P and V operations are represented by the acquire and release methods. The acquire method can be interrupted if the thread is interrupted; there is an uninterruptible version, acquireUninterruptibly(). There is also a third variant, tryAcquire, which acquires a permit only if one is available and otherwise returns false immediately. All the waiting methods also have an overloaded version with a timeout, and you can acquire several permits at once by passing a permits argument to the various acquire methods.
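For instance, acquiring and releasing several permits in one call (the class name is illustrative):

```java
import java.util.concurrent.Semaphore;

public class MultiPermitDemo {
    public static void main(String[] args) throws InterruptedException {
        Semaphore pool = new Semaphore(10);

        pool.acquire(3); // take three permits in a single call
        System.out.println(pool.availablePermits()); // 7

        pool.release(3); // and give all three back at once
        System.out.println(pool.availablePermits()); // 10
    }
}
```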

Examples

A little example with a mutex, using the same Counter as the previous post on Java concurrency:
import java.util.concurrent.Semaphore;

public class Counter {
    private int value = 0;

    private final Semaphore mutex = new Semaphore(1);

    public int getNextValue() throws InterruptedException {
        mutex.acquire();
        try {
            return value++;
        } finally {
            mutex.release();
        }
    }
}
Consider the test class:
package com.vaani.semaphores.demo;

import java.util.concurrent.Semaphore;

import com.vaani.mutex.Counter;

public class MutexDemo {
    public static void main(String[] args) {
        // Five threads share one Counter guarded by a binary semaphore
        Counter counter = new Counter();
        for (int i = 0; i < 5; i++) {
            new MutexThread(String.valueOf(i), counter).start();
        }
        System.out.println("End of Semaphore Test");
    }
}

class MutexThread extends Thread {

    private String name = null;
    private Counter counter = null;

    public MutexThread(String name, Counter counter) {
        this.name = name;
        this.counter = counter;
    }

    public void run() {
        try {
            System.out.println("Counter is now " + counter.getNextValue());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

For more information about Semaphore in Java, the best resource is the Javadoc of the Semaphore class.

Simple Semaphore Example

Consider a simple use of a semaphore in which we just acquire a permit, do our work, and release it:
import java.util.concurrent.Semaphore;

class SimpleSemaphoreDemo {
    public static void main(String[] args) {
        // Limit the number of concurrently running threads to 2
        Semaphore semaphore = new Semaphore(2);
        for (int i = 0; i < 5; i++) {
            new MyThread(String.valueOf(i), semaphore).start();
        }
        System.out.println("End of Semaphore Test");
    }
}

class MyThread extends Thread {

    private String name = null;
    private Semaphore semaphore = null;

    public MyThread(String name, Semaphore semaphore) {
        this.name = name;
        this.semaphore = semaphore;
    }

    public void run() {
        try {
            semaphore.acquire();
            System.out.println("Thread " + name + " is start running");
            sleep(500);
            semaphore.release();
            System.out.println("Thread " + name + " Ends");
        } catch (Exception exp) {
            exp.printStackTrace();
        }
    }
}
In this example, we have created 5 threads, but the number of concurrently running threads is limited to 2 by the line:

Semaphore semaphore = new Semaphore(2);

Then we call the acquire and release methods. The output in this case will look like this:

Thread 0 is start running
End of Semaphore Test
Thread 1 is start running
Thread 0 Ends
Thread 2 is start running
Thread 1 Ends
Thread 3 is start running
Thread 3 Ends
Thread 2 Ends
Thread 4 is start running
Thread 4 Ends

If we create the semaphore with 3 permits instead:

Semaphore semaphore = new Semaphore(3);

the output will look like this:
Thread 0 is start running
Thread 1 is start running
Thread 2 is start running
End of Semaphore Test
Thread 4 is start running
Thread 2 Ends
Thread 3 is start running
Thread 0 Ends
Thread 1 Ends
Thread 3 Ends
Thread 4 Ends

Conclusion

To conclude, semaphores are a powerful way to solve concurrency problems, but they are not suited to every problem. If you need only mutual exclusion, synchronized blocks are a better solution. The main problem with semaphores is that you can forget to call the release method, which can cause deadlocks that are sometimes difficult to find.


Download the source


Source code of this example can be downloaded from here.

Synchronizers in java

A few of the recent posts described the use of the various constructs provided by Java 5 through the java.util.concurrent package. In this post I have tried to gather the usage scenarios for each of the synchronizers available in Java 5.
 

These are the synchronizers in Java: semaphores, barriers, latches, and exchangers.

Brief introduction to the above synchronizers

  • Semaphores: A semaphore is the classic method for restricting access to shared resources in a multi-processing environment. While a synchronized block allows only one thread to access a resource, a semaphore allows multiple threads to access a shared resource. Semaphores are often used to restrict the number of threads that can access some resource.
    • Maintaining multiple connections to a database: Define a semaphore, which has same number of permits as there are connections to the database. If all the permits are used, then a thread requesting a connection will be blocked until another thread releases a permit, when this thread may acquire a permit.
    • A binary semaphore can be used in place of any Lock implementation. Such an implementation has an advantage when recovering from deadlocks, since the lock can be unlocked by a thread other than the one that locked it.
    • When the throughput advantages of non-fair ordering outweigh fairness considerations: semaphores allow barging behaviour. The tryAcquire() method can be used to barge ahead of other threads, irrespective of the fairness setting, while tryAcquire(0, TimeUnit.SECONDS) respects it.
  • Barriers: A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. Scenarios:
    • Joins: When you join a set of threads and start a new set, there may be a need for each thread to save state at the join point. A cyclic barrier may be used for such a scenario.
  • Latches: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes. Usage Scenarios:
    • Divide a problem into N parts, describe each part with a Runnable that executes that portion and counts down on the latch, and queue all the Runnables to an Executor. When all sub-parts are complete, the coordinating thread will be able to pass through await.
    • Latching works well with initialization tasks, where you want no process to run until everything needed is initialized.
  • Exchangers: A synchronization point at which two threads can exchange objects, the condition being that the exchanging threads have to be paired up, and a specific data type must be exchanged. Usage Scenarios:
    • An Exchanger is often used when you have two threads, one consuming a resource, and the other producing it. Similar to the producer/consumer problem, but where the buffer can be in one of only two states - empty or full.
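The Exchanger scenario above can be sketched like this (the class name and buffer type are illustrative): a producer hands off a full buffer and gets an empty one back at the exchange point.

```java
import java.util.concurrent.Exchanger;

public class ExchangerDemo {
    public static void main(String[] args) throws InterruptedException {
        final Exchanger<StringBuilder> exchanger = new Exchanger<StringBuilder>();

        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    StringBuilder full = new StringBuilder("data");
                    // Blocks until the consumer also calls exchange().
                    StringBuilder empty = exchanger.exchange(full);
                    System.out.println("producer got buffer of length "
                            + empty.length());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();

        // The consumer (here, the main thread) hands over an empty buffer.
        StringBuilder received = exchanger.exchange(new StringBuilder());
        System.out.println("consumer got: " + received);
        producer.join();
    }
}
```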

Friday, 17 June 2011


CyclicBarrier

As written in the Java docs, a cyclic barrier is:
A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.

So if a given thread should do its calculation and then wait until all the other threads are finished before starting to calculate the next step, you might have to use a barrier synchronization technique. This term implies that a thread is stopped at the barrier until all threads reach it. Once all have reached it, the barrier is lowered and they all start a new round of step calculations and all again wait at the barrier after they finish their work.

It is a synchronization mechanism that can synchronize threads progressing through some algorithm. In other words, it is a barrier that all threads must wait at, until all threads reach it, before any of the threads can continue.

Understanding the CyclicBarrier

The Java concurrency utilities offer the java.util.concurrent.CyclicBarrier class to provide us with a barrier. (It's called cyclic because it can be reused each time the waiting threads are released.)

Firstly, the barrier is constructed with the following:

  • the number of threads that will be participating in the parallel operation;
  • optionally, an amalgamation routine to run at the end of each stage/iteration.

Then, at each stage (or on each iteration) of the operation:

  • each thread carries out its portion of the work;
  • after doing its portion of the work, each thread calls the barrier's await() method;
  • the await() method returns only when:
    • all threads have called await();
    • the amalgamation method has run (the barrier calls this on the last thread to call await() before releasing the awaiting threads).
  • if any of the threads is interrupted or times out while waiting for the barrier, then the barrier is "broken" and all other waiting threads receive a BrokenBarrierException.

In other words, this last point means there is a mechanism for an error/timeout in one of the worker threads to "ripple out" to all threads and for the operation to halt, or for the operation to be interrupted externally by interrupting just one of the threads. 

Examples

A common use for this is in multi-threaded testing, where it is typical to start a bunch of threads, meet, do some stuff, meet, validate some assertions, repeatedly. Let's see an example:

package com.vaani.cyclicbarrier;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierDemo {

    public static void main(String[] args) throws Exception {
        int nThreads = 3;
        final CyclicBarrier barrier = new CyclicBarrier(nThreads);

        ExecutorService esvc = Executors.newFixedThreadPool(nThreads);
        for (int i = 0; i < nThreads; i++) {
            esvc.execute(new Runnable() {
                public void run() {
                    try {
                        log("At run()");
                        barrier.await();

                        log("Do work");
                        Thread.sleep((int) (Math.random() * 1000));

                        log("Wait for end");
                        barrier.await();

                        log("Done");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            Thread.sleep(100);
        }
        esvc.shutdown(); // let the JVM exit once all tasks complete
    }

    private static void log(String msg) {
        System.out.println(System.currentTimeMillis() + ": "
                + Thread.currentThread().getId() + " " + msg);
    }
}


Another nice trick with CyclicBarrier is that a Runnable action can be associated with the barrier to be run by the last thread reaching the barrier. You can very simply build a start/end timer for testing with this functionality:

package com.vaani.cyclicbarrier;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TimerBarrierDemo {

    public static void main(String[] args) throws Exception {
        int threads = 3;
        final CyclicBarrier barrier = new CyclicBarrier(threads, new BarrierTimer());

        ExecutorService svc = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            svc.execute(new Runnable() {
                public void run() {
                    try {
                        barrier.await();
                        long sleepTime = (int) (Math.random() * 1000);
                        System.out.println(Thread.currentThread().getId()
                                + " working for " + sleepTime);
                        Thread.sleep(sleepTime);
                        barrier.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        svc.shutdown(); // let the JVM exit once all tasks complete
    }

    private static class BarrierTimer implements Runnable {
        private long start;

        public void run() {
            if (start == 0) {
                start = System.currentTimeMillis();
            } else {
                long end = System.currentTimeMillis();
                long elapsed = end - start;
                System.out.println("Completed in " + elapsed + " ms");
            }
        }
    }
}


Here we rely on knowing that the barrier will be reached exactly twice – once at start and once at end. The first time it’s reached, we record a timestamp and the second time it’s reached we print out the timing. When we construct our barrier, we give it an instance of this timer class. Each thread then waits to start on the barrier, works for a random amount of time, and waits for the end barrier.

A run looks like this:

10 working for 136
9 working for 391
8 working for 720
Completed in 721 ms


Generally, you should expect the recorded elapsed time to be the maximum of the working time of any of the threads.

CyclicBarrier has a few additional tricks as well – threads can wait for a time period instead of forever, check whether a barrier has been broken (by interruption or forcibly with a reset() method), and determine the number of parties and the number currently waiting.
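Those introspection methods look like this (the class name is illustrative):

```java
import java.util.concurrent.CyclicBarrier;

public class BarrierIntrospection {
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(3);

        System.out.println(barrier.getParties());       // 3
        System.out.println(barrier.getNumberWaiting()); // 0
        System.out.println(barrier.isBroken());         // false

        // reset() breaks the current generation (waiters would get a
        // BrokenBarrierException) and starts a fresh, unbroken one.
        barrier.reset();
        System.out.println(barrier.isBroken());         // false
    }
}
```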


Download the Source


Source code can be downloaded from here.