
Wednesday, 22 June 2011

Concurrent hashmap in java

ConcurrentHashMap extends the abstract class AbstractMap and implements the ConcurrentMap interface. It obeys the same functional specification as Hashtable: a hash table supporting full concurrency of retrievals and adjustable expected concurrency for updates, with method versions corresponding to each method of Hashtable. Like Hashtable, it does not permit null as a key or a value. All operations of this class are thread-safe. Using ConcurrentHashMap can improve performance because it permits multiple threads to modify the map concurrently without blocking each other on the whole table; the benefit is naturally smaller when only a single thread accesses the map at a time.

Syntax

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>

Constructors of ConcurrentHashMap

This class provides several constructors through which we can create a map according to our requirements:
ConcurrentHashMap() : Creates a new, empty map with the default initial capacity (16), default load factor (0.75) and default concurrencyLevel (16).
ConcurrentHashMap(int initialCapacity) : Creates a new, empty map with the specified initial capacity, the default load factor (0.75) and the default concurrencyLevel (16).
ConcurrentHashMap(int initialCapacity, float loadFactor) : Creates a new, empty map with the specified initial capacity and load factor, and the default concurrencyLevel (16).
ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) : Creates a new, empty map with the specified initial capacity, load factor and concurrencyLevel.
ConcurrentHashMap(Map<? extends K,? extends V> m) : Creates a new map with the same mappings as the given map.
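For illustration, the tuning constructors can be invoked like this (the capacity, load factor and concurrency values below are arbitrary example numbers, not recommendations):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ConstructorDemo {
    public static void main(String[] args) {
        // Defaults: capacity 16, load factor 0.75, concurrencyLevel 16
        ConcurrentMap<String, Integer> m1 = new ConcurrentHashMap<String, Integer>();

        // Sized up front for ~100 entries, expecting ~8 concurrent writer threads
        ConcurrentMap<String, Integer> m2 =
                new ConcurrentHashMap<String, Integer>(100, 0.75f, 8);

        m2.put("one", 1);
        m2.put("two", 2);
        System.out.println(m2.get("one")); // prints 1
    }
}
```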

Methods of ConcurrentHashMap

This class provides many methods; some of the commonly used ones are:
  1. put() : public V put(K key, V value)
  2. clear() : public void clear()
  3. elements() : public Enumeration<V> elements()
  4. remove(Object key, Object value) : public boolean remove(Object key, Object value)
  5. replace(K key, V value) : public V replace(K key, V value)
  6. values() : public Collection<V> values()
  7. size() : public int size()
Example :

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.Set;

class A implements Runnable {
    String name;
    ConcurrentMap<Integer, String> cm;

    public A(ConcurrentMap<Integer, String> cm, String name) {
        this.name = name;
        this.cm = cm;
    }

    public void run() {
        try {
            cm.put(1, "A");
            cm.put(2, "B");
            cm.put(3, "C");
            cm.put(4, "D");
            cm.put(5, "E");
            System.out.println(name + " maps the element : " + cm);
            System.out.println(name + " represents the set of keys: " + cm.keySet());
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class B implements Runnable {
    String name;
    ConcurrentMap<Integer, String> cm;

    public B(ConcurrentMap<Integer, String> cm, String name) {
        this.name = name;
        this.cm = cm;
    }

    public void run() {
        try {
            boolean j = cm.remove(3, "C");
            System.out.println(name + " removes the element : " + j);
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class C implements Runnable {
    String name;
    ConcurrentMap<Integer, String> cm;

    public C(ConcurrentMap<Integer, String> cm, String name) {
        this.name = name;
        this.cm = cm;
    }

    public void run() {
        try {
            Set<Integer> s = cm.keySet();
            System.out.println(name + " represents the set of keys : " + s);
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class ConcurrentMapDemo {
    public static void main(String[] args) {
        ConcurrentMap<Integer, String> cm = new ConcurrentHashMap<Integer, String>();
        Runnable a = new A(cm, "A");
        Runnable b = new B(cm, "B");
        Runnable c = new C(cm, "C");

        new Thread(a).start();
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(b).start();
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(c).start();
    }
}

Output
A maps the element : {5=E, 4=D, 3=C, 2=B, 1=A}

A represents the set of keys: [5, 4, 3, 2, 1]

B removes the element : true

C represents the set of keys : [5, 4, 2, 1]



ConcurrentHashSet in Java from ConcurrentHashMap

While you do have a ConcurrentHashMap class in Java, there is no ConcurrentHashSet.
Solution
You can easily get a ConcurrentHashSet with the following code -

Collections.newSetFromMap(new ConcurrentHashMap<Object,Boolean>())

Notes
  • A Set lends itself naturally to implementation via a Map, so you could actually just use a Map directly. But that may not fit well with the context of your use.
  • The HashSet class internally uses a HashMap.
  • The Set obtained via this method inherits pretty much all the concurrency features of the underlying ConcurrentHashMap.
  • See api-docs for newSetFromMap
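A minimal usage sketch (the String element type and the values here are just examples):

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentSetDemo {
    public static void main(String[] args) {
        // A thread-safe Set view backed by a ConcurrentHashMap
        Set<String> set = Collections.newSetFromMap(
                new ConcurrentHashMap<String, Boolean>());
        set.add("alpha");
        set.add("beta");
        System.out.println(set.contains("alpha")); // prints true
        System.out.println(set.size());            // prints 2
    }
}
```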

Blocking queues in java

The key facilities that BlockingQueue provides to producer-consumer systems are, as its name implies, enqueuing and dequeuing methods that do not return until they have executed successfully. So, for example, a print server does not need to constantly poll the queue to discover whether any print jobs are waiting; it need only call the poll method, supplying a timeout, and the system will suspend it until either a queue element becomes available or the timeout expires. BlockingQueue defines seven new methods, in three groups:


Group1 : Adding an Element

boolean offer(E e, long timeout, TimeUnit unit)
// insert e, waiting up to the timeout
void put(E e) // add e, waiting as long as necessary

The nonblocking overload of offer defined in Queue will return false if it cannot immediately insert the element. This new overload waits for a time specified using java.util.concurrent.TimeUnit, an Enum which allows timeouts to be defined in units such as milliseconds or seconds.
Taking these methods together with those inherited from Queue, there are four ways in which the methods for adding elements to a BlockingQueue can behave: offer returns false if it does not succeed immediately, blocking offer returns false if it does not succeed within its timeout, add throws an exception if it does not succeed immediately, and put blocks until it succeeds.
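These four behaviours can be demonstrated with a bounded queue that is already full; ArrayBlockingQueue is used below purely as a convenient bounded implementation, and the timeout value is arbitrary:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class AddBehaviours {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> q = new ArrayBlockingQueue<String>(1);
        q.put("first"); // succeeds immediately: the queue has room

        System.out.println(q.offer("second")); // false: queue full, returns at once
        System.out.println(q.offer("second", 100, TimeUnit.MILLISECONDS)); // false after the timeout
        try {
            q.add("second"); // throws IllegalStateException when full
        } catch (IllegalStateException e) {
            System.out.println("add threw: " + e);
        }
        // q.put("second") would block here until another thread removed an element
    }
}
```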

Group2  : Removing an Element
E poll(long timeout, TimeUnit unit)
// retrieve and remove the head, waiting up to the timeout
E take() // retrieve and remove the head of this queue, waiting
// as long as necessary

Again taking these methods together with those inherited from Queue, there are four ways in which the methods for removing elements from a BlockingQueue can behave: poll returns null if it does not succeed immediately, blocking poll returns null if it does not succeed within its timeout, remove throws an exception if it does not succeed immediately, and take blocks until it succeeds.
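Symmetrically, the four removal behaviours can be seen with an initially empty queue (again, ArrayBlockingQueue and the timeout value are just convenient examples):

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class RemoveBehaviours {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> q = new ArrayBlockingQueue<String>(1);

        System.out.println(q.poll()); // null: queue empty, returns at once
        System.out.println(q.poll(100, TimeUnit.MILLISECONDS)); // null after the timeout
        try {
            q.remove(); // throws NoSuchElementException when empty
        } catch (NoSuchElementException e) {
            System.out.println("remove threw: " + e);
        }
        q.put("x");
        System.out.println(q.take()); // "x": an element is available, so take returns at once
        // on an empty queue, take() would block until an element arrived
    }
}
```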

Group 3 : Retrieving or Querying the Contents of the Queue
int drainTo(Collection<? super E> c)
// clear the queue into c
int drainTo(Collection<? super E> c, int maxElements)
// clear at most the specified number of elements into c
int remainingCapacity()
// return the number of elements that would be accepted
// without blocking, or Integer.MAX_VALUE if unbounded


The drainTo methods perform atomically and efficiently, so the second overload is useful in situations where you know you have processing capacity immediately available for a certain number of elements, and the first is useful, for example, when all producer threads have stopped working. Their return value is the number of elements transferred. remainingCapacity reports the spare capacity of the queue, although, as with any such value in multi-threaded contexts, the result of a call should not be used as part of a test-then-act sequence: between the test (the call of remainingCapacity) and the action (adding an element to the queue) of one thread, another thread might have intervened to add or remove elements.
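A short sketch of the group 3 methods (the capacity and element values below are arbitrary):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class DrainDemo {
    public static void main(String[] args) {
        BlockingQueue<Integer> q = new ArrayBlockingQueue<Integer>(10);
        q.offer(1);
        q.offer(2);
        q.offer(3);

        List<Integer> batch = new ArrayList<Integer>();
        int n = q.drainTo(batch, 2); // atomically move at most 2 elements into batch
        System.out.println(n + " " + batch);       // prints "2 [1, 2]"
        System.out.println(q.remainingCapacity()); // prints 9: one element left of 10 slots
    }
}
```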
BlockingQueue guarantees that the queue operations of its implementations will be thread-safe and atomic.
But this guarantee doesn't extend to the bulk operations inherited from Collection (addAll, containsAll, retainAll and removeAll) unless the individual implementation provides it. So it is possible, for example, for addAll to fail, throwing an exception, after adding only some of the elements in a collection.

Blocking queue has the following characteristics:
  • methods to add an item to the queue, waiting for space to become available in the queue if necessary;
  • corresponding methods that take an item from the queue, waiting for an item to appear in the queue if it is empty;
  • optional time limits and interruptibility on the latter calls;
  • efficient thread-safety: blocking queues are specifically designed to have their put() method called from one thread and the take() method from another; in particular, items posted to the queue will be published correctly to any other thread taking the item from the queue. Significantly, the implementations generally achieve this without locking the entire queue, making them highly concurrent components;
  • integration with Java thread pools: a flavour of blocking queue can be passed into the constructor of ThreadPoolExecutor to customise the behaviour of the thread pool.
Implementations of blocking queue
ArrayBlockingQueue : A simple bounded BlockingQueue implementation backed by an array.

DelayQueue : An unbounded blocking queue of Delayed elements, in which an element can only be taken when its delay has expired. It uses elements that implement the java.util.concurrent.Delayed interface.

PriorityBlockingQueue : An unbounded queue that bases ordering on a specified Comparator (or the elements' natural ordering), and the element returned by any take() call is the smallest element in that ordering.

LinkedBlockingQueue : An optionally bounded BlockingQueue implementation backed by linked nodes; if no capacity is given, it is effectively unbounded (Integer.MAX_VALUE).

SynchronousQueue : This queue has a size of zero (yes, you read that correctly). It blocks put( ) calls until another thread calls take( ), and blocks take( ) calls until another thread calls put( ). Essentially, elements can only go directly from a producer to a consumer, and nothing is stored in the queue itself (other than for transition purposes).
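Of these implementations, DelayQueue is perhaps the least obvious to use, because its elements must implement Delayed themselves. A minimal sketch is shown below; the Task class, names and delay values are all illustrative:

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayDemo {
    // A task that becomes available 'delayMs' milliseconds after creation
    static class Task implements Delayed {
        final String name;
        private final long expiry;

        Task(String name, long delayMs) {
            this.name = name;
            this.expiry = System.currentTimeMillis() + delayMs;
        }

        public long getDelay(TimeUnit unit) {
            return unit.convert(expiry - System.currentTimeMillis(),
                    TimeUnit.MILLISECONDS);
        }

        public int compareTo(Delayed other) {
            long diff = getDelay(TimeUnit.MILLISECONDS)
                    - other.getDelay(TimeUnit.MILLISECONDS);
            return diff < 0 ? -1 : (diff > 0 ? 1 : 0);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<Task> q = new DelayQueue<Task>();
        q.add(new Task("later", 200));
        q.add(new Task("sooner", 50));
        // take() blocks until the head's delay has expired
        System.out.println(q.take().name); // prints "sooner": shortest delay first
        System.out.println(q.take().name); // prints "later", after its delay expires
    }
}
```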


Example - Producer consumer problem with Blocking queue
The queue takes care of all the details of synchronizing access to its contents and notifying other threads of the availability of data.

Producer.java
import java.util.concurrent.BlockingQueue;

public class Producer extends Thread {
    private BlockingQueue<Integer> cubbyhole;
    private int number;

    public Producer(BlockingQueue<Integer> c, int num) {
        cubbyhole = c;
        number = num;
    }

    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                cubbyhole.put(i);
                System.out.format("Producer #%d put: %d%n", number, i);
                sleep((int) (Math.random() * 100));
            } catch (InterruptedException e) { }
        }
    }
}

Consumer.java

import java.util.concurrent.*;

public class Consumer extends Thread {
    private BlockingQueue<Integer> cubbyhole;
    private int number;

    public Consumer(BlockingQueue<Integer> c, int num) {
        cubbyhole = c;
        number = num;
    }

    public void run() {
        int value = 0;
        for (int i = 0; i < 10; i++) {
            try {
                value = cubbyhole.take();
                System.out.format("Consumer #%d got: %d%n", number, value);
            } catch (InterruptedException e) { }
        }
    }
}


ProducerConsumerTest.java

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerTest {
    public static void main(String[] args) {
        BlockingQueue<Integer> c = new ArrayBlockingQueue<Integer>(1);
        Producer p1 = new Producer(c, 1);
        Consumer c1 = new Consumer(c, 1);

        p1.start();
        c1.start();
    }
}

Possible Use cases for BlockingQueue

These features make BlockingQueues useful for cases such as the following:
  • a server, where incoming connections are placed on a queue, and a pool of threads picks them up as those threads become free;
  • in a variety of parallel processes, where we want to manage or limit resource usage at different stages of the process.

RSS Parser (SAX)

RSS (Really Simple Syndication)
RSS is a way to publish frequently changing content such as blog posts, news updates, stock quotes and the like. An RSS document, which is called a "feed," "web feed," or "channel," contains either a summary of content from an associated web site or the full text. RSS formats are specified using XML, a generic specification for the creation of data formats.
I have attached a simple SAX parser for RSS. Please let me know if there is any flaw in the attached code. This code is provided for learning purposes, with less focus on coding standards and efficiency. You are free to use and modify it.

Code
RssParser.java

package com.vaani.rss.parser;

import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Properties;

import javax.xml.parsers.SAXParser;
import javax.xml.parsers.SAXParserFactory;

import org.xml.sax.Attributes;
import org.xml.sax.helpers.DefaultHandler;

public class RssParser extends DefaultHandler
{
private String urlString;
private RssFeed rssFeed;
private StringBuilder text;
private Item item;
private boolean imgStatus;

public RssParser(String url)
{
this.urlString = url;
this.text = new StringBuilder();
}

public void parse()
{
InputStream urlInputStream = null;
SAXParserFactory spf = null;
SAXParser sp = null;

try
{
URL url = new URL(this.urlString);
_setProxy(); // Set the proxy if needed
urlInputStream = url.openConnection().getInputStream();
spf = SAXParserFactory.newInstance();
if (spf != null)
{
sp = spf.newSAXParser();
sp.parse(urlInputStream, this);
}
}

/*
         * Exceptions need to be handled
         * MalformedURLException
         * ParserConfigurationException
         * IOException
         * SAXException
         */

catch (Exception e)
{
System.out.println("Exception: " + e);
e.printStackTrace();
}
finally
{
try
{
if (urlInputStream != null) urlInputStream.close();
}
catch (Exception e) {}
}
}

public RssFeed getFeed()
{
return (this.rssFeed);
}

public void startElement(String uri, String localName, String qName,
Attributes attributes)
{
if (qName.equalsIgnoreCase("channel"))
this.rssFeed = new RssFeed();
else if (qName.equalsIgnoreCase("item") && (this.rssFeed != null))
{
this.item = new Item();
this.rssFeed.addItem(this.item);
}
else if (qName.equalsIgnoreCase("image") && (this.rssFeed != null))
this.imgStatus = true;
}

public void endElement(String uri, String localName, String qName)
{
if (this.rssFeed == null)
return;

if (qName.equalsIgnoreCase("item"))
this.item = null;

else if (qName.equalsIgnoreCase("image"))
this.imgStatus = false;

else if (qName.equalsIgnoreCase("title"))
{
if (this.item != null) this.item.title = this.text.toString().trim();
else if (this.imgStatus) this.rssFeed.imageTitle = this.text.toString().trim();
else this.rssFeed.title = this.text.toString().trim();
}

else if (qName.equalsIgnoreCase("link"))
{
if (this.item != null) this.item.link = this.text.toString().trim();
else if (this.imgStatus) this.rssFeed.imageLink = this.text.toString().trim();
else this.rssFeed.link = this.text.toString().trim();
}

else if (qName.equalsIgnoreCase("description"))
{
if (this.item != null) this.item.description = this.text.toString().trim();
else this.rssFeed.description = this.text.toString().trim();
}

else if (qName.equalsIgnoreCase("url") && this.imgStatus)
this.rssFeed.imageUrl = this.text.toString().trim();

else if (qName.equalsIgnoreCase("language"))
this.rssFeed.language = this.text.toString().trim();

else if (qName.equalsIgnoreCase("generator"))
this.rssFeed.generator = this.text.toString().trim();

else if (qName.equalsIgnoreCase("copyright"))
this.rssFeed.copyright = this.text.toString().trim();

else if (qName.equalsIgnoreCase("pubDate") && (this.item != null))
this.item.pubDate = this.text.toString().trim();

else if (qName.equalsIgnoreCase("category") && (this.item != null))
this.rssFeed.addItem(this.text.toString().trim(), this.item);

this.text.setLength(0);
}

public void characters(char[] ch, int start, int length)
{
this.text.append(ch, start, length);
}

public static void _setProxy()
throws IOException
{
Properties sysProperties = System.getProperties();
// The standard JVM keys are "http.proxyHost" / "http.proxyPort";
// the placeholder values must be replaced with your proxy details
sysProperties.put("http.proxyHost", "<Proxy IP Address>");
sysProperties.put("http.proxyPort", "<Proxy Port Number>");
System.setProperties(sysProperties);
}

public static class RssFeed
{
public String title;
public String description;
public String link;
public String language;
public String generator;
public String copyright;
public String imageUrl;
public String imageTitle;
public String imageLink;

public ArrayList <Item> items;
public HashMap <String, ArrayList <Item>> category;

public void addItem(Item item)
{
if (this.items == null)
this.items = new ArrayList<Item>();
this.items.add(item);
}

public void addItem(String category, Item item)
{
if (this.category == null)
this.category = new HashMap<String, ArrayList<Item>>();
if (!this.category.containsKey(category))
this.category.put(category, new ArrayList<Item>());
this.category.get(category).add(item);
}
}



}

Item.java

package com.vaani.rss.parser;

public class Item
{
public String title;
public String description;
public String link;
public String pubDate;

public String toString()
{
return (this.title + ": " +
this.pubDate + "\n" + this.description);
}
}

RssParserDemo.java - Ready with demo

package com.vaani.rss.main;

import java.util.ArrayList;

import com.vaani.rss.parser.RssParser;
import com.vaani.rss.parser.RssParser.RssFeed;
import com.vaani.rss.parser.Item;



public class RssParserDemo {

public static void main(String[] args){
RssParser rp = new RssParser("<some rss feed>");
rp.parse();
RssFeed feed = rp.getFeed();

// Listing all categories & the no. of elements in each category
if (feed.category != null)
{
System.out.println("Category List: ");
for (String category : feed.category.keySet())
{
System.out.println(category
+ ": "
+ ((ArrayList<Item>)feed.category.get(category)).size());
}
}

// Listing all items in the feed
for (int i = 0; i < feed.items.size(); i++)
System.out.println(feed.items.get(i).title);
}
}

Reading data from a .doc file using the Apache POI api

This program simply explains how to read data from an MS Word file (.doc) line by line using Apache POI.
I already explained what Apache POI is and why it is needed in a previous post; you can find that post here.
To run this program, download the Apache POI api and add its jar files to the classpath.

Example
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.poi.hwpf.HWPFDocument;
import org.apache.poi.hwpf.extractor.WordExtractor;

public class NewDocReader {
    public static void main(String[] args) throws FileNotFoundException, IOException {

        File docFile = new File("c:\\multi\\multi.doc"); // file object for the .doc file
        // file input stream wrapped around docFile
        FileInputStream finStream = new FileInputStream(docFile.getAbsolutePath());
        // HWPFDocument represents the .doc file; needs org.apache.poi.hwpf.HWPFDocument
        HWPFDocument doc = new HWPFDocument(finStream);
        // WordExtractor pulls the text out; needs org.apache.poi.hwpf.extractor.WordExtractor
        WordExtractor wordExtract = new WordExtractor(doc);
        String[] dataArray = wordExtract.getParagraphText();
        // dataArray stores each paragraph from the document
        for (int i = 0; i < dataArray.length; i++) {
            System.out.println("\n--" + dataArray[i]); // printing lines from the array
        }
        finStream.close(); // closing the file input stream
    }
}