Friday, 17 June 2011

CyclicBarrier

As written in java docs Cyclic barrier is -
A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.

So if a given thread should do its calculation and then wait until all the other threads are finished before starting to calculate the next step, you might have to use a barrier synchronization technique. This term implies that a thread is stopped at the barrier until all threads reach it. Once all have reached it, the barrier is lowered and they all start a new round of step calculations and all again wait at the barrier after they finish their work.

It is a synchronization mechanism that can synchronize threads progressing through some algorithm. In other words, it is a barrier that all threads must wait at, until all threads reach it, before any of the threads can continue.

Understanding the CyclicBarrier

Java concurrency utilities offers java.util.concurrent.CyclicBarrier class to provide us with barrier. (It's called cyclic because it can be reused after each time the waiting threads are released by it.)

Firstly, the barrier is constructed with the following:

  • the number of threads that will be participating in the parallel operation;
  • optionally, an amalgamation routine to run at the end of each stage/iteration.

Then, at each stage (or on each iteration) of the operation:

  • each thread carries out its portion of the work;
  • after doing its portion of the work, each thread calls the barrier's await() method;
  • the await() method returns only when:
    • all threads have called await();
    • the amalgamation method has run (the barrier calls this on the last thread to call await() before releasing the awaiting threads).
  • if any of the threads is interrupted or times out while waiting for the barrier, then the barrier is "broken" and all other waiting threads receive a BrokenBarrierException.

In other words, this last point means there is a mechanism for an error/timeout in one of the worker threads to "ripple out" to all threads and for the operation to halt, or for the operation to be interrupted externally by interrupting just one of the threads. 

Examples

A common use for this is in multi-threaded testing where it is typical to start a bunch of threads, meet, do some stuff, meet, validate some assertions, repeatedly. Lets see the example :

package com.vaani.cyclicbarrier;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierDemo {

public static void main(String[] args) throws Exception {
int nThreads = 3;
final CyclicBarrier barrier = new CyclicBarrier(nThreads);

ExecutorService esvc = Executors.newFixedThreadPool(nThreads);
for (int i = 0; i < nThreads; i++) {
esvc.execute(new Runnable() {
public void run() {
try {
log("At run()");
barrier.await();

log("Do work");
Thread.sleep((int) (Math.random() * 1000));

log("Wait for end");
barrier.await();

log("Done");
} catch (Exception e) {
e.printStackTrace();
}
}
});
Thread.sleep(100);
}
}

private static void log(String msg) {
System.out.println(System.currentTimeMillis() + ": "
+ Thread.currentThread().getId() + " " + msg);
}
}


Another nice trick with CyclicBarrier is that a Runnable action can be associated with the barrier to be run by the last thread reaching the barrier. You can very simply build a start/end timer for testing with this functionality:

package com.vaani.cyclicbarrier;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TimerBarrierDemo {

public static void main(String[] args) throws Exception {
int threads = 3;
final CyclicBarrier barrier = new CyclicBarrier(threads, new BarrierTimer());

ExecutorService svc = Executors.newFixedThreadPool(threads);
for (int i = 0; i < threads; i++) {
svc.execute(new Runnable() {
public void run() {
try {
barrier.await();
long sleepTime = (int) (Math.random() * 1000);
System.out.println(Thread.currentThread().getId()
+ " working for " + sleepTime);
Thread.sleep(sleepTime);
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}

private static class BarrierTimer implements Runnable {
private long start;

public void run() {
if (start == 0) {
start = System.currentTimeMillis();
} else {
long end = System.currentTimeMillis();
long elapsed = (end - start);
System.out.println("Completed in " + elapsed + " ms");
}
}
}
}


Here we rely on knowing that the barrier will be reached exactly twice – once at start and once at end. The first time it’s reached, we record a timestamp and the second time it’s reached we print out the timing. When we construct our barrier, we give it an instance of this timer class. Each thread then waits to start on the barrier, works for a random amount of time, and waits for the end barrier.

A run looks like this:

10 working for 136
9 working for 391
8 working for 720
Completed in 721 ms


Generally, you should expect the recorded elapsed time to be the maximum of the working time of any of the threads.

CyclicBarrier has a few additional tricks as well – threads can wait for a time period instead of forever, check whether a barrier has been broken (by interruption or forcibly with a reset() method), and determine the number of parties and the number currently waiting.


Download the Source


Source code can be downloaded from here.

No comments:

Post a Comment