How BlockingQueue works – Its internal implementation

BlockingQueue should be used when you have to share resources between multiple threads where one of the threads acts as a producer and other threads act as consumers. This is a producer-consumer example in which we have one producer thread producing some random strings and 2 consumer threads trying to consume those random strings produced by the producer.

public class BlockingQueueExample {
 
 
	public static void main(String[] args) {
		BlockingQueue<String> bq = new ArrayBlockingQueue<>(2);
 
 
		Thread producer = new Thread(new Runnable() {
 
			@Override
			public void run() {
 
				try {
                                        bq.put("one");
					Thread.sleep(2000);
                                        bq.put("two");
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
 
 
			}
		});
 
		Thread consumer1 = new Thread(new Runnable() {
 
			@Override
			public void run() {
				try {
					System.out.println(bq.take());
					Thread.sleep(2000);
					System.out.println(bq.take());
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
 
			}
		});
 
		Thread consumer2 = new Thread(new Runnable() {
 
			@Override
			public void run() {
				try {
					System.out.println(bq.take());
					Thread.sleep(2000);
					System.out.println(bq.take());
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
 
			}
		});
 
		producer.start();
		consumer1.start();
		consumer2.start();
 
	}
}

Lets assume first thread 2 executes its run method. It calls take method of blockingqueue

 public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

Here ‘count‘ is global variable which is equal to number of elements currently in the queue. Whenever queue is empty take() method needs to wait till someone fills the queue. So this thread will get hung at line notEmpty.await(). notEmpty is a condition object got by lock.newCondition(). Similarly we have notFull condition object also

Similarly lets assume consumer2 also reaches this point, so it will also wait. Inside notEmpty.await there is a call to addConditionWaiter() method

   public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
  private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

This method basically maintains the list(in the form of linkedlist) of threads which are waiting for queue to get non empty.

So currently 2 threads are waiting, consumer1 and consumer2

Now lets assume Producer gets active

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

offer() method also gets into waiting mode if queue is full. Anyways currently it wont wait as queue is not full. Flow will go into enqueue method

 private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

Here it adds item to an global array and in the end calls notEmpty.signal() to signal that queue is no more empty. First thread in the queue which was waiting for this queue to be non-empty will get executed now.

Uday Ogra

Connect with me at http://facebook.com/tendulkarogra and lets have some healthy discussion :)

You may also like...

Leave a Reply

Your email address will not be published. Required fields are marked *