阻塞队列的低版本实现

在复习阻塞队列时,我用原生的 wait、notify 自己实现了一个阻塞队列,运行后从输出上看,队列中的元素个数竟然超过了设定的最大长度。有问题的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/**
 * First (problematic) version of a bounded blocking queue built on
 * {@code synchronized} / {@code wait} / {@code notify}.
 *
 * <p>Known pitfalls visible in this listing:
 * <ul>
 *   <li>The constructor stores {@code size - 1}, and {@code put} blocks when the
 *       list holds that many elements, so the effective capacity is {@code size - 1}.</li>
 *   <li>{@code main} prints after {@code put}/{@code get} have returned, i.e. outside
 *       the lock, so the order of the printed lines is not guaranteed to match the
 *       order in which the queue was actually modified.</li>
 *   <li>{@code getSize} reads the list without holding the lock.</li>
 * </ul>
 */
public class SimulateBlockingQueue {

// Underlying storage for the queued elements
private LinkedList list = new LinkedList();

// Lower and upper bounds on the number of stored elements
private final int minSize = 0;
private final int maxSize;

// NOTE(review): stores size - 1, so a queue created with 5 blocks producers at 4 elements.
public SimulateBlockingQueue(int size) {
this.maxSize = size - 1;
}

// Monitor guarding the list inside put and get; also used for wait/notify
private Object lock = new Object();

/**
 * Appends an element, waiting on the lock while the list size equals maxSize.
 * @param obj the element to append
 */
public void put(Object obj) {
synchronized (lock) {
// Re-check the condition in a loop after every wake-up.
while (list.size() == this.maxSize) {
try {
lock.wait();
} catch (InterruptedException e) {
// The interrupt is only logged; the loop then re-checks the condition and may wait again.
e.printStackTrace();
}
}
list.add(obj);
// Wakes a single thread waiting on the lock.
lock.notify();
}
}

/**
 * Removes and returns the first element, waiting on the lock while the list is empty.
 * @return the element that was at the head of the list
 */
public Object get() {
Object result = null;
synchronized (lock) {
// Re-check the condition in a loop after every wake-up.
while (list.size() == this.minSize) {
try {
lock.wait();
} catch (InterruptedException e) {
// The interrupt is only logged; the loop then re-checks the condition and may wait again.
e.printStackTrace();
}
}
result = list.removeFirst();
// Wakes a single thread waiting on the lock.
lock.notify();
}
return result;
}

// Returns the current list size; note that this read is not performed under the lock.
public int getSize() {
return list.size();
}

// Demo: one thread puts 0..999 while another thread takes 1000 elements.
public static void main(String[] args) throws InterruptedException {
final SimulateBlockingQueue mq = new SimulateBlockingQueue(5);

Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
mq.put(i);
// Printed after put() has released the lock, so it can interleave with the consumer's output.
System.out.println("++增加:" + i);
}
}
}, "t1");
t1.start();

Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
// The print happens after get() has released the lock.
System.out.println("移除:" + mq.get());
}
}
}, "t2");
t2.start();

TimeUnit.SECONDS.sleep(2);
}

}

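最后发现并不是 list.size() 的判断有问题,而是打印语句的位置有问题:原来的打印写在 put/get 调用之后、同步块之外,生产者和消费者的输出可能交错,打印顺序并不等于队列实际的操作顺序,所以看起来像是超过了最大长度。把打印移到 synchronized 块内部(同时把构造函数中的 size - 1 改为 size)之后,代码如下: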

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
/**
 * A minimal bounded blocking queue built on an intrinsic lock with
 * {@code wait}/{@code notifyAll}.
 *
 * <p>Producers calling {@link #put(Object)} block while the queue is full and
 * consumers calling {@link #get()} block while it is empty. All access to the
 * underlying list happens while holding a single private lock, so the class is
 * safe to share between any number of producer and consumer threads.
 *
 * <p>The trace output is printed inside the critical section on purpose: this
 * keeps the printed order identical to the order in which the queue is
 * actually modified.
 */
public class SimulateBlockingQueue {

    /** Underlying FIFO storage; only touched while holding {@link #lock}. */
    private final LinkedList<Object> list = new LinkedList<Object>();

    /** Size at which consumers must wait (empty queue). */
    private final int minSize = 0;

    /** Size at which producers must wait (full queue). */
    private final int maxSize;

    /** Monitor guarding {@link #list} and used for wait/notifyAll. */
    private final Object lock = new Object();

    /**
     * Creates a queue that holds at most {@code size} elements.
     *
     * @param size maximum number of elements, must be positive
     * @throws IllegalArgumentException if {@code size} is zero or negative
     */
    public SimulateBlockingQueue(int size) {
        if (size <= 0) {
            // A capacity of 0 would block every producer forever and a negative
            // one would never match list.size(), silently making the queue unbounded.
            throw new IllegalArgumentException("size must be positive: " + size);
        }
        this.maxSize = size;
    }

    /**
     * Appends an element, blocking while the queue is full.
     *
     * <p>If the calling thread is interrupted while waiting, the method keeps
     * waiting until space is available and restores the thread's interrupt
     * status before returning.
     *
     * @param obj the element to append (may be {@code null})
     */
    public void put(Object obj) {
        boolean interrupted = false;
        synchronized (lock) {
            // Loop guards against spurious wake-ups and against another producer
            // having filled the queue before this thread re-acquired the lock.
            while (list.size() >= this.maxSize) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    // Remember the interrupt; it is re-asserted after the operation completes.
                    interrupted = true;
                }
            }
            System.out.println("++++增加:" + obj);
            list.add(obj);
            // Producers and consumers wait on the same monitor, so wake everyone:
            // a single notify() could wake another producer and leave consumers asleep.
            lock.notifyAll();
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Removes and returns the head of the queue, blocking while it is empty.
     *
     * <p>If the calling thread is interrupted while waiting, the method keeps
     * waiting until an element is available and restores the thread's
     * interrupt status before returning.
     *
     * @return the element that was at the head of the queue
     */
    public Object get() {
        boolean interrupted = false;
        Object result;
        synchronized (lock) {
            // Loop guards against spurious wake-ups and against another consumer
            // having drained the queue before this thread re-acquired the lock.
            while (list.size() <= this.minSize) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    // Remember the interrupt; it is re-asserted after the operation completes.
                    interrupted = true;
                }
            }
            result = list.removeFirst();
            System.out.println("--移除:" + result);
            // Wake every waiter so that a blocked producer is guaranteed to be among them.
            lock.notifyAll();
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    /**
     * Returns the number of elements currently stored.
     *
     * @return the current queue size
     */
    public int getSize() {
        // LinkedList is not thread-safe; read its size under the same lock as the writers.
        synchronized (lock) {
            return list.size();
        }
    }

    /**
     * Demo: one producer puts 0..999 while one consumer takes 1000 elements
     * from a queue with capacity 5.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while
     *         waiting for the workers to finish
     */
    public static void main(String[] args) throws InterruptedException {
        final SimulateBlockingQueue mq = new SimulateBlockingQueue(5);

        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 1000; i++) {
                    mq.put(i);
                }
            }
        }, "t1");
        t1.start();

        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 1000; i++) {
                    mq.get();
                }
            }
        }, "t2");
        t2.start();

        // Wait for both workers instead of sleeping for an arbitrary amount of time.
        t1.join();
        t2.join();
    }

}

思路来源:https://www.jianshu.com/p/99b7ef411988