Java多线程 生产者消费者的N种姿势
学校里有作业,需要实现生产者消费者模型,最简单的就是用synchronized同步锁一包,什么事情都没有了,不过由于是从最外层包起来的,所以总体而言就Low很多了。
一共用了四种方法,心好累:
synchronized
synchronized同步锁同样踩到了坑,主要是在extends Thread和implements Runnable的区别上,这是第一个坑。
1package com.producer;
2
3public class Main {
4
5 public static void main(String[] args) {
6 ThreadSynchronized semaphore = new ThreadSynchronized();
7
8 new Thread(semaphore.new Producer("Pro1"), "Pro1").start();
9 new Thread(semaphore.new Producer("Pro2"), "Pro2").start();
10 new Thread(semaphore.new Producer("Pro3"), "Pro3").start();
11
12 new Thread(semaphore.new Consumer("Con1"), "Con1").start();
13 new Thread(semaphore.new Consumer("Con2"), "Con2").start();
14
15 //System.out.println("你好");
16 }
17}
18
总之代码相当的不干净,这是Main部分,之后我们除了声明基本上都不需要修改了。
1package com.producer;
2
3import java.util.LinkedList;
4
5/**
6 * Created by SkyAo on 15/10/18.
7 */
8public class ThreadSynchronized {
9 private static int pid;
10 private static int cid;
11 public LinkedList<Item> items = new LinkedList<>();
12 private Item temp;
13 private int item;
14 private boolean flag = false;
15
16 class Producer extends Thread {
17 private String name;
18 private int id;
19
20 public Producer(String name) {
21 this.name = name;
22 }
23
24 @Override
25 public void run() {
26 this.id = ++pid;
27 System.out.println("Start Thread(" + this.id + "): " + Thread.currentThread().getName());
28 //while (true)
29 this.produce();
30 }
31
32 private synchronized void produce() {
33 while (true)
34 try {
35
36 Thread.sleep((int)(Math.random()*5000)+3000);
37 while (items.size() == 5) {
38 super.wait();
39 }
40 items.add(new Item(this.id, Thread.currentThread().getName()));
41 temp = items.getLast();
42 System.out.println(temp.sourceName + " Produce: " + temp.semi);
43
44 System.out.println("Left: ");
45
46 for (Item item : items) {
47 System.out.println(item.sourceName + ":" + item.semi);
48 }
49
50 System.out.println("---Left: " + items.size() + "---");
51 } catch (Exception e) {
52 e.printStackTrace();
53 } finally {
54 super.notifyAll();
55 }
56
57 }
58
59
60 }
61
62 class Consumer extends Thread {
63 private String name;
64 private int id;
65
66 public Consumer(String name) {
67 this.name = name;
68 //this.id = ++cid;
69 }
70
71 @Override
72 public void run() {
73 this.id = ++cid;
74 System.out.println("Start Thread(" + this.id + "): " + Thread.currentThread().getName());
75
76 this.consume();
77
78 }
79
80 private synchronized void consume() {
81 while (true)
82 try {
83 Thread.sleep((int) (Math.random() * 5000) + 3000);
84 while (items.size() == 0) {
85 this.wait();
86 }
87 temp = items.removeFirst();
88 System.out.println(Thread.currentThread().getName() + " Consume: " + temp.semi);
89 System.out.println("Left: ");
90
91 for (Item item : items) {
92 System.out.println(item.sourceName + ":" + item.semi);
93 }
94
95 System.out.println("---Left: " + items.size() + "---");
96
97 } catch (Exception e) {
98 e.printStackTrace();
99 } finally {
100 super.notifyAll();
101 }
102 }
103 }
104}
105
调了半天,如果是implements Runnable会导致同一个线程重复工作。
wait()和notifyAll()就是拿来阻塞和通知的。notify()和notifyAll()的区别,根据网上所说:notify()只唤醒一个(由JVM任意选择的)等待线程,notifyAll()则唤醒全部等待线程再去竞争锁;从现象而言其实是看不出效果的吧。
Semaphore
信号量的方法实现起来就跟书上的代码一样,非常爽,没有之一。
1package com.producer;
2
3import java.util.LinkedList;
4import java.util.concurrent.Semaphore;
5
6/**
7 * Created by SkyAo on 15/10/18.
8 */
9public class ThreadSemaphore {
10 public static Semaphore mutex = new Semaphore(1);
11 public static Semaphore notFull = new Semaphore(5);
12 public static Semaphore notEmpty = new Semaphore(0);
13 private static int pid;
14 private static int cid;
15 public LinkedList<Item> items = new LinkedList<>();
16 private Item temp;
17
18 class Producer extends Thread {
19 private String name;
20 private int id;
21
22 public Producer(String name) {
23 this.name = name;
24 this.id = ++pid;
25 }
26
27 @Override
28 public void run() {
29 System.out.println("Start Thread(" + this.id + "): " + this.name);
30 this.produce();
31 }
32
33 private void produce() {
34 do {
35 try {
36 Thread.sleep(3000);
37 notFull.acquire();
38 mutex.acquire();
39 items.add(new Item(this.id, this.name));
40 temp = items.getLast();
41 System.out.println(temp.sourceName + " Produce: " + temp.semi);
42
43 System.out.println("Left: ");
44
45 for (Item item : items) {
46 System.out.println(item.sourceName + ":" + item.semi);
47 }
48
49 System.out.println("---Left: " + items.size() + "---");
50
51 } catch (Exception e) {
52 e.printStackTrace();
53 } finally {
54 mutex.release();
55 notEmpty.release();
56 }
57
58
59 } while (true);
60 }
61 }
62
63 class Consumer extends Thread {
64 private String name;
65 private int id;
66
67 public Consumer(String name) {
68 this.name = name;
69 this.id = ++cid;
70 }
71
72 @Override
73 public void run() {
74 System.out.println("Start Thread(" + this.id + "): " + this.name);
75 this.consume();
76 }
77
78 private void consume() {
79 do {
80 try {
81 Thread.sleep(3000);
82 notEmpty.acquire();
83 mutex.acquire();
84 temp = items.removeFirst();
85 System.out.println(this.name + " Consume: " + temp.semi);
86 System.out.println("Left: ");
87
88 for (Item item : items) {
89 System.out.println(item.sourceName + ":" + item.semi);
90 }
91
92 System.out.println("---Left: " + items.size() + "---");
93
94 } catch (Exception e) {
95 e.printStackTrace();
96 } finally {
97 mutex.release();
98 notFull.release();
99 }
100 } while (true);
101 }
102 }
103}
104
信号量就是通过初始化时设置令牌,然后去争抢令牌,用完记得释放。
如果是:
1 public static Semaphore notEmpty = new Semaphore(0);
2
这个就得先等释放令牌了。
Condition ReentrantLock
Java 5之后有了一个更好的锁:ReentrantLock,用法跟synchronized差不多。
1import java.util.concurrent.locks.Condition;
2import java.util.concurrent.locks.ReentrantLock;
3import java.io.*;
4import java.net.*;
5
/**
 * Bounded-counter producer/consumer demo built on {@link ReentrantLock} with
 * two {@link Condition}s.
 *
 * <p>{@code items} is the number of stored products (0 to {@link #CAPACITY}).
 * Producers wait on {@code notFull}, consumers wait on {@code notEmpty}.
 * Both conditions are awaited in a {@code while} loop so the predicate is
 * re-checked after every wake-up. Workers stop when their thread is
 * interrupted.
 */
public class ThreadTest {
    /** Maximum number of stored products. */
    private static final int CAPACITY = 5;

    private int items;

    private final ReentrantLock lock = new ReentrantLock(); // guards items
    private final Condition notEmpty = lock.newCondition(); // signalled after a product is added
    private final Condition notFull = lock.newCondition(); // signalled after a product is removed

    /** Reports producer/consumer events to a local HTTP endpoint via GET. */
    static class URLAction {
        /**
         * Sends one event to {@code http://localhost:3000/}.
         *
         * @param name   worker name; URL-encoded before being put in the query
         * @param delete value of the {@code delete} query parameter (callers
         *               pass 0 when producing and 1 when consuming)
         * @param num    value of the {@code num} query parameter
         */
        public static void url(String name, int delete, int num) {
            try {
                String query = "name=" + URLEncoder.encode(String.valueOf(name), "UTF-8")
                        + "&delete=" + delete + "&num=" + num;
                URL url = new URL("http://localhost:3000/?" + query);
                URLConnection conn = url.openConnection();

                // Requesting the input stream issues the request; the body is
                // not needed, so the stream is closed right away instead of
                // being leaked.
                InputStream in = conn.getInputStream();
                in.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Worker that keeps incrementing the product counter. */
    class Producer extends Thread {

        String name;
        int num;

        /**
         * @param name worker name used in output and reports
         * @param num  worker number passed on to {@link URLAction#url}
         */
        public Producer(String name, int num) {
            this.name = name;
            this.num = num;
        }

        @Override
        public void run() {
            System.out.println("Start Producer");

            produce();
        }

        /** Produces until the running thread is interrupted. */
        private void produce() {
            while (true) {
                try {
                    Thread.sleep((int) (Math.random() * 5000) + 3000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }

                // Lock before the try block so that finally only ever unlocks
                // a lock this thread really holds.
                lock.lock();
                try {
                    while (items >= CAPACITY) {
                        System.out.println("位置用完了");
                        notFull.await();
                    }

                    System.out.println(name + "生产" + ++items);

                    // Reported while still holding the lock so events are
                    // sent in the same order the counter changes.
                    URLAction.url(name, 0, num);
                    notEmpty.signal();
                } catch (InterruptedException e) {
                    // Interrupted while waiting for space: stop without producing.
                    Thread.currentThread().interrupt();
                    return;
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    /** Worker that keeps decrementing the product counter. */
    class Consumer extends Thread {

        String name;
        int num;

        /**
         * @param name worker name used in output and reports
         * @param num  worker number passed on to {@link URLAction#url}
         */
        public Consumer(String name, int num) {
            this.name = name;
            this.num = num;
        }

        @Override
        public void run() {
            System.out.println("Start Consumer");
            consume();
        }

        /** Consumes until the running thread is interrupted. */
        private void consume() {
            while (true) {
                try {
                    Thread.sleep((int) (Math.random() * 5000) + 2000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }

                lock.lock();
                try {
                    while (items <= 0) {
                        System.out.println("位置空,等待生产");
                        notEmpty.await();
                    }

                    System.out.println(name + "消费" + --items);
                    notFull.signal();

                    // Reported while still holding the lock so events are
                    // sent in the same order the counter changes.
                    URLAction.url(name, 1, num);
                } catch (InterruptedException e) {
                    // Interrupted while waiting for a product: stop without consuming.
                    Thread.currentThread().interrupt();
                    return;
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    /**
     * Starts three producers and two consumers sharing one counter.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String args[]) {
        ThreadTest test = new ThreadTest();
        System.out.println("Start Thread, items:" + test.items);

        // The workers are Threads themselves, so they are started directly.
        test.new Producer("Pro1", 0).start();
        test.new Producer("Pro2", 1).start();
        test.new Producer("Pro3", 2).start();

        test.new Consumer("Con1", 0).start();
        test.new Consumer("Con2", 1).start();
    }
}
144
Mutex
Mutex也是个锁,跟信号量一样要上两把锁,然后要及时释放,不然就会卡了。注意:这里用的com.sun.corba.se.impl.orbutil.concurrent.Mutex是JDK内部类,CORBA模块在Java 11中已被移除,新版本JDK上无法编译。
1import java.io.*;
2import java.net.*;
3import java.util.*;
4import com.sun.corba.se.impl.orbutil.concurrent.*;
5
6public class ThreadTestMutex {
7
8 private Mutex mutex = new Mutex(); // 同步信号量
9 private Mutex notFull = new Mutex(); // 是否满的信号量
10 private Mutex notEmpty = new Mutex(); // 是否为空的信号量
11 private Mutex access = new Mutex();
12
13
14 private int items = 0;
15 private int lock = 1;
16
17 class Producer extends Thread {
18 private String name;
19 private int pid;
20
21 public Producer(String name, int pid) {
22 this.name = name;
23 this.pid = pid;
24 }
25
26 public void run() {
27 System.out.println("Start Producer: ");
28 this.produce();
29 }
30
31 private void produce() {
32 do {
33 try {
34 try {
35 Thread.sleep(400);
36 } catch (InterruptedException e) {
37 e.printStackTrace();
38 }
39
40 notFull.acquire();
41 mutex.acquire();
42
43 if (items < 5) {
44 items++;
45
46 //System.out.println("位置用完了" + items);
47 notEmpty.release();
48 }
49
50 System.out.println(name + ":" + items);
51
52 } catch (Exception e) {
53 e.printStackTrace();
54 } finally {
55 mutex.release();
56 notFull.release();
57 }
58
59 } while(true);
60 }
61 }
62
63 class Consumer extends Thread {
64 private String name;
65 private int pid;
66
67 public Consumer(String name, int pid) {
68 this.name = name;
69 this.pid = pid;
70 }
71
72 public void run() {
73 System.out.println("Start Consumer: ");
74 this.consume();
75 }
76
77 private void consume() {
78 do{
79 try {
80 Thread.sleep(500);
81 } catch (InterruptedException e) {
82 e.printStackTrace();
83 }
84 try {
85 notEmpty.acquire();
86 mutex.acquire();
87
88 if (items > 0) {
89 items--;
90
91 //System.out.println("没有产品" + items);
92 notFull.release();
93 }
94
95 System.out.println(name + ":" + items);
96
97 } catch (Exception e) {
98 e.printStackTrace();
99 } finally {
100 mutex.release();
101 notEmpty.release();
102 }
103 } while(true);
104
105
106 }
107 }
108
109 public static void main(String args[]) {
110 ThreadTestMutex test = new ThreadTestMutex();
111
112 System.out.println("Start");
113 try {
114 test.notEmpty.acquire();
115 } catch (Exception e) {
116 e.printStackTrace();
117 }
118 new Thread(test.new Producer("pro1", 0)).start();
119 new Thread(test.new Producer("pro2", 1)).start();
120 new Thread(test.new Producer("pro3", 2)).start();
121 new Thread(test.new Producer("pro4", 3)).start();
122 new Thread(test.new Producer("pro5", 4)).start();
123
124 new Thread(test.new Consumer("con1", 0)).start();
125 new Thread(test.new Consumer("con2", 1)).start();
126
127 }
128}
129
评论 (0)