Design a blocking queue with Condition and Monitor

Another classic multithreading interview question: design a blocking queue.
Blocking Queue: a queue in which a dequeuing thread blocks while the queue is empty, until some other thread enqueues an element, and an enqueuing thread blocks while the queue is full, until some other thread dequeues an element.
In other words, when the queue is empty, a thread calling dequeue has to wait; when the queue is full, a thread calling enqueue has to wait.
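The JDK already ships this behaviour in `java.util.concurrent.BlockingQueue`, so the semantics can be observed before writing anything by hand. Below is a minimal sketch that uses `ArrayBlockingQueue` as a stand-in (it is not the implementation this problem asks for). The class name `BlockingSemanticsDemo` and the 200 ms wait are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingSemanticsDemo {

    // Returns true if put() on a full queue was still blocked after waitMillis
    // and completed once a consumer removed an element.
    // waitMillis must be > 0, because Thread.join(0) waits forever.
    static boolean putBlocksUntilTake(long waitMillis) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        queue.offer(1); // the queue is now full (capacity 1)

        Thread producer = new Thread(() -> {
            try {
                queue.put(2); // blocks until a slot is free
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            producer.join(waitMillis);              // give the producer time to reach put()
            boolean stillBlocked = producer.isAlive();
            int first = queue.take();               // frees a slot, which unblocks the producer
            producer.join();
            int second = queue.take();
            return stillBlocked && first == 1 && second == 2;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("put blocked while full: " + putBlocksUntilTake(200));
    }
}
```

The empty case is symmetric: `take()` on an empty queue blocks until another thread calls `put()`.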
The problem asks for roughly the following methods: push, pop, and pushList:

public interface FixedSizeBlockingQueue<E> {

   // Initialize the queue exactly once; throws Exception if the user tries to initialize it more than once.
   public void init(int capacity) throws Exception;

   // throws Exception if the queue is not initialized
   public void push(E obj) throws Exception;

   // throws Exception if the queue is not initialized
   public E pop() throws Exception;

   // Atomically put a list of objects into the queue. "Atomically" means the objects in the list must end up next to each other in the queue. The size of the list may be larger than the queue capacity.
   // throws Exception if the queue is not initialized
   public void pushList(List<E> objs) throws Exception;
}

Here is a solution using Condition and Lock:

import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBlockingQueue<E> {

 private int capacity;
 private Queue<E> queue;
 private Lock lock = new ReentrantLock();
 private Lock pushLock = new ReentrantLock();
 private Condition notFull = this.lock.newCondition();
 private Condition notEmpty = this.lock.newCondition();

 // Initialize the queue exactly once; throws Exception if the user
 // tries to initialize it more than once.
 public void init(int capacity) throws Exception {
     this.lock.lock();
     try{
         if(this.queue == null){
             this.queue = new LinkedList<>();
             this.capacity = capacity;
         } else {
             throw new Exception();
         }
     }finally{
         this.lock.unlock();
     }
 }

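 // throws Exception if the queue is not initialized
 public void push(E obj) throws Exception {
     // pushLock serializes producers, so a pushList in progress is never interleaved
     this.pushLock.lock();
     this.lock.lock();
     try{
         if(this.queue == null)
             throw new IllegalStateException("queue is not initialized");
         // Condition uses await()/signalAll(), not Object.wait()/notifyAll()
         while(this.capacity == this.queue.size())
             this.notFull.await();
         this.queue.add(obj);
         this.notEmpty.signalAll();
     }finally{
         this.lock.unlock();
         this.pushLock.unlock();
     }
 }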

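 // throws Exception if the queue is not initialized
 public E pop() throws Exception {
     this.lock.lock();
     try{
         if(this.queue == null)
             throw new IllegalStateException("queue is not initialized");
         // wait while the queue is empty (the capacity itself never changes)
         while(this.queue.isEmpty())
             this.notEmpty.await();
         E result = this.queue.poll();
         this.notFull.signalAll();
         return result;
     }finally{
         this.lock.unlock();
     }
 }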

 // Atomically put a list of objects into the queue. "Atomically" means the
 // objects in the list must end up next to each other in the queue. The size
 // of the list may be larger than the queue capacity.
 // throws Exception if the queue is not initialized
 public void pushList(List<E> objs) throws Exception {
     this.pushLock.lock();
     this.lock.lock();
     try{
         if(this.queue == null)
             throw new IllegalStateException("queue is not initialized");
         for(E obj : objs){
             // await() releases lock so consumers can pop, but pushLock stays
             // held, so no other producer can slip an element in between
             while(this.queue.size() == this.capacity)
                 this.notFull.await();
             this.queue.add(obj);
             this.notEmpty.signalAll();
         }
     }finally{
         this.lock.unlock();
         this.pushLock.unlock();
     }
 }
}
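One detail of `Condition` that is easy to get wrong: its blocking and wake-up methods are `await()`, `signal()` and `signalAll()`. A `Condition` is still a Java object, so `wait()` and `notifyAll()` compile, but they belong to `Object` and require the intrinsic monitor (`synchronized`), which holding a `ReentrantLock` does not provide. Below is a minimal sketch of the difference. The class name `ConditionWaitDemo`, its method names and the 10 ms timeout are hypothetical, chosen only for illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ConditionWaitDemo {

    // Object.wait() needs the object's intrinsic monitor (synchronized),
    // so calling it while only holding the ReentrantLock throws.
    static boolean waitThrowsUnderLock() {
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        lock.lock();
        try {
            condition.wait(10); // Object.wait(long), not a Condition method
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    // Condition.await() is the counterpart that cooperates with the owning Lock.
    static boolean awaitWorksUnderLock() {
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        lock.lock();
        try {
            // releases the lock while waiting and reacquires it before returning
            condition.await(10, TimeUnit.MILLISECONDS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("wait() throws:  " + waitThrowsUnderLock());
        System.out.println("await() works:  " + awaitWorksUnderLock());
    }
}
```

The same applies on the wake-up side: `signalAll()` is the `Condition` method, while `notifyAll()` would throw `IllegalMonitorStateException` for the same reason.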

Reference:
http://baptiste-wicht.com/posts/2010/09/java-concurrency-part-5-monitors-locks-and-conditions.html

Implementing a Cyclic Buffer with Condition and Monitor

Here is the code for a cyclic buffer, another topic that comes up often in multithreading interviews.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


public class BoundedBuffer {
    private final String[] buffer;
    private final int capacity;

    private int front;
    private int rear;
    private int count;

    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public BoundedBuffer(int capacity) {
        super();
        this.capacity = capacity;
        buffer = new String[capacity];
    }

    public void deposit(String data) throws InterruptedException {
        lock.lock();

        try {
            while (count == capacity) {
                notFull.await();
            }
            buffer[rear] = data;
            rear = (rear + 1) % capacity; // wrap around to index 0 at the end of the array
            count++;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public String fetch() throws InterruptedException {
        lock.lock();
        try {
            while(count == 0) {
                notEmpty.await();
            }
            String result = buffer[front];
            front = (front + 1) % capacity; // advance front (not count) and wrap around
            count--;
            notFull.signal();
            return result;
        } finally {
            lock.unlock();
        }
    }
}
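The wrap-around index arithmetic can be checked in isolation. Below is a minimal single-threaded sketch (no locking, so it is not a replacement for the class above) that traces `front`, `rear` and `count` on a buffer of capacity 3. The class name `RingIndexDemo` and the sample values are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class RingIndexDemo {

    // Runs a fixed deposit/fetch sequence and returns the fetched values in order.
    static List<String> trace() {
        int capacity = 3;
        String[] buffer = new String[capacity];
        int front = 0, rear = 0, count = 0;
        List<String> fetched = new ArrayList<>();

        // deposit a, b, c: rear goes 0 -> 1 -> 2 -> 0 (wraps around)
        for (String s : new String[] {"a", "b", "c"}) {
            buffer[rear] = s;
            rear = (rear + 1) % capacity;
            count++;
        }

        // fetch twice: front goes 0 -> 1 -> 2, freeing slots 0 and 1
        for (int i = 0; i < 2; i++) {
            fetched.add(buffer[front]);
            front = (front + 1) % capacity;
            count--;
        }

        // deposit d, e: they reuse the freed slots 0 and 1
        for (String s : new String[] {"d", "e"}) {
            buffer[rear] = s;
            rear = (rear + 1) % capacity;
            count++;
        }

        // drain the rest: front goes 2 -> 0 -> 1, crossing the end of the array
        while (count > 0) {
            fetched.add(buffer[front]);
            front = (front + 1) % capacity;
            count--;
        }
        return fetched;
    }

    public static void main(String[] args) {
        System.out.println(trace()); // FIFO order is preserved across the wrap
    }
}
```

Both indices advance with the same `(index + 1) % capacity` step, and `count` is the only value used to decide whether the buffer is full or empty.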

Reference:
http://baptiste-wicht.com/posts/2010/09/java-concurrency-part-5-monitors-locks-and-conditions.html