1 import java.util.concurrent.locks.*;
2
3 /**
4
5 */
6 public class BoundedQueue<E>
7 {
8 /**
9
10 @param capacity
11 */
12 public BoundedQueue(int capacity)
13 {
14 elements = new Object[capacity];
15 head = 0;
16 tail = 0;
17 size = 0;
18 }
19
20 /**
21
22 @return
23 */
24 public E remove() throws InterruptedException
25 {
26 queueLock.lock();
27 try
28 {
29 while (size == 0)
30 valueAvailableCondition.await();
31 @SuppressWarnings("unchecked") E r = (E) elements[head];
32 head++;
33 size--;
34 if (head == elements.length)
35 head = 0;
36 spaceAvailableCondition.signalAll();
37 return r;
38 }
39 finally
40 {
41 queueLock.unlock();
42 }
43 }
44
45 /**
46
47 @param newValue
48 */
49 public void add(E newValue) throws InterruptedException
50 {
51 queueLock.lock();
52 try
53 {
54 while (size == elements.length)
55 spaceAvailableCondition.await();
56 elements[tail] = newValue;
57 tail++;
58 size++;
59 if (tail == elements.length)
60 tail = 0;
61 valueAvailableCondition.signalAll();
62 }
63 finally
64 {
65 queueLock.unlock();
66 }
67 }
68
69 private Object[] elements;
70 private int head;
71 private int tail;
72 private int size;
73
74 private Lock queueLock = new ReentrantLock();
75 private Condition spaceAvailableCondition
76 = queueLock.newCondition();
77 private Condition valueAvailableCondition
78 = queueLock.newCondition();
79 }