CodeSky 代码之空

随手记录自己的学习过程

Java多线程 生产者消费者的N种姿势

2015-10-18 16:40分类: Java评论: 0

学校里有作业,需要实现生产者消费者模型,最简单的就是用synchronized同步锁一包,什么事情都没有了,不过由于是从最外层包起来的,所以总体而言就Low很多了。

一共用了四种方法,心好累:

synchronized

synchronized同步锁同样踩到了坑,主要是关于extends Threadimplements Runnable的区别上,这是第一个坑。

1package com.producer;
2
3public class Main {
4
5    public static void main(String[] args) {
6        ThreadSynchronized semaphore = new ThreadSynchronized();
7
8        new Thread(semaphore.new Producer("Pro1"), "Pro1").start();
9        new Thread(semaphore.new Producer("Pro2"), "Pro2").start();
10        new Thread(semaphore.new Producer("Pro3"), "Pro3").start();
11
12        new Thread(semaphore.new Consumer("Con1"), "Con1").start();
13        new Thread(semaphore.new Consumer("Con2"), "Con2").start();
14
15        //System.out.println("你好");
16    }
17}
18

总之代码相当的不干净,这是Main部分,之后我们除了声明基本上都不需要修改了。

1package com.producer;
2
3import java.util.LinkedList;
4
5/**
6 * Created by SkyAo on 15/10/18.
7 */
8public class ThreadSynchronized {
9    private static int pid;
10    private static int cid;
11    public LinkedList<Item> items = new LinkedList<>();
12    private Item temp;
13    private int item;
14    private boolean flag = false;
15
16    class Producer extends Thread {
17        private String name;
18        private int id;
19
20        public Producer(String name) {
21            this.name = name;
22        }
23
24        @Override
25        public void run() {
26            this.id = ++pid;
27            System.out.println("Start Thread(" + this.id + "): " + Thread.currentThread().getName());
28            //while (true)
29                this.produce();
30        }
31
32        private synchronized void produce() {
33            while (true)
34            try {
35
36                Thread.sleep((int)(Math.random()*5000)+3000);
37                while (items.size() == 5) {
38                    super.wait();
39                }
40                items.add(new Item(this.id, Thread.currentThread().getName()));
41                temp = items.getLast();
42                System.out.println(temp.sourceName + " Produce: " + temp.semi);
43
44                System.out.println("Left: ");
45
46                for (Item item : items) {
47                    System.out.println(item.sourceName + ":" + item.semi);
48                }
49
50                System.out.println("---Left: " + items.size() + "---");
51            } catch (Exception e) {
52                e.printStackTrace();
53            } finally {
54                super.notifyAll();
55            }
56
57        }
58
59
60    }
61
62    class Consumer extends Thread {
63        private String name;
64        private int id;
65
66        public Consumer(String name) {
67            this.name = name;
68            //this.id = ++cid;
69        }
70
71        @Override
72        public void run() {
73            this.id = ++cid;
74            System.out.println("Start Thread(" + this.id + "): " + Thread.currentThread().getName());
75
76            this.consume();
77
78        }
79
80        private synchronized void consume() {
81            while (true)
82            try {
83                Thread.sleep((int) (Math.random() * 5000) + 3000);
84                while (items.size() == 0) {
85                    this.wait();
86                }
87                temp = items.removeFirst();
88                System.out.println(Thread.currentThread().getName() + " Consume: " + temp.semi);
89                System.out.println("Left: ");
90
91                for (Item item : items) {
92                    System.out.println(item.sourceName + ":" + item.semi);
93                }
94
95                System.out.println("---Left: " + items.size() + "---");
96
97            } catch (Exception e) {
98                e.printStackTrace();
99            } finally {
100                super.notifyAll();
101            }
102        }
103    }
104}
105

调了半天,如果是implements Runnable会导致同一个线程重复工作。

wait()和notifyAll()就是拿来阻塞和通知的,notifyAll()和notify()的区别根据网上所说是随机通知和唤醒全部再去竞争,从现象而言其实是看不出效果的吧。

Semaphore

信号量的方法实现起来就跟书上的代码一样,非常爽,没有之一。

1package com.producer;
2
3import java.util.LinkedList;
4import java.util.concurrent.Semaphore;
5
6/**
7 * Created by SkyAo on 15/10/18.
8 */
9public class ThreadSemaphore {
10    public static Semaphore mutex = new Semaphore(1);
11    public static Semaphore notFull = new Semaphore(5);
12    public static Semaphore notEmpty = new Semaphore(0);
13    private static int pid;
14    private static int cid;
15    public LinkedList<Item> items = new LinkedList<>();
16    private Item temp;
17
18    class Producer extends Thread {
19        private String name;
20        private int id;
21
22        public Producer(String name) {
23            this.name = name;
24            this.id = ++pid;
25        }
26
27         @Override
28         public void run() {
29             System.out.println("Start Thread(" + this.id + "): " + this.name);
30             this.produce();
31         }
32
33         private void produce() {
34             do {
35                 try {
36                     Thread.sleep(3000);
37                     notFull.acquire();
38                     mutex.acquire();
39                     items.add(new Item(this.id, this.name));
40                     temp =  items.getLast();
41                     System.out.println(temp.sourceName + " Produce: " + temp.semi);
42
43                     System.out.println("Left: ");
44
45                     for (Item item : items) {
46                         System.out.println(item.sourceName + ":" + item.semi);
47                     }
48
49                     System.out.println("---Left: " + items.size() + "---");
50
51                 } catch (Exception e) {
52                     e.printStackTrace();
53                 } finally {
54                     mutex.release();
55                     notEmpty.release();
56                 }
57
58
59             } while (true);
60         }
61    }
62
63    class Consumer extends Thread {
64        private String name;
65        private int id;
66
67        public Consumer(String name) {
68            this.name = name;
69            this.id = ++cid;
70        }
71
72        @Override
73        public void run() {
74            System.out.println("Start Thread(" + this.id + "): " + this.name);
75            this.consume();
76        }
77
78        private void consume() {
79            do {
80                try {
81                    Thread.sleep(3000);
82                    notEmpty.acquire();
83                    mutex.acquire();
84                    temp = items.removeFirst();
85                    System.out.println(this.name + " Consume: " + temp.semi);
86                    System.out.println("Left: ");
87
88                    for (Item item : items) {
89                        System.out.println(item.sourceName + ":" + item.semi);
90                    }
91
92                    System.out.println("---Left: " + items.size() + "---");
93
94                } catch (Exception e) {
95                    e.printStackTrace();
96                } finally {
97                    mutex.release();
98                    notFull.release();
99                }
100            } while (true);
101        }
102    }
103}
104

信号量就是通过初始化时设置令牌,然后去争抢令牌,用完记得释放。

如果是:

1    public static Semaphore notEmpty = new Semaphore(0);
2

这个就得先等释放令牌了。

Condition ReentrantLock

Java更新之后有了一个更好的锁:ReentrantLock

跟synchronized差不多。

1import java.util.concurrent.locks.Condition;  
2import java.util.concurrent.locks.ReentrantLock;
3import java.io.*;
4import java.net.*;
5
6public class ThreadTest {
7	private int items;
8
9	private final ReentrantLock lock = new ReentrantLock();		// 设置锁
10	private Condition notEmpty = lock.newCondition();			// 设置条件
11	private Condition notFull = lock.newCondition();			// 设置条件
12
13	/*
14		get方式发送数据
15	*/
16	static class URLAction {
17		public static void url(String name, int delete, int num){
18			try {
19				URL url = new URL("http://localhost:3000/?name="+name+"&delete=" + delete + "&num=" + num);
20				URLConnection conn = url.openConnection();
21
22				BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
23			} catch (IOException e) {
24				e.printStackTrace();
25			}
26		}
27	}
28
29	/*
30		生产者模型
31	*/
32	class Producer extends Thread {
33
34		String name;
35		int num;
36
37		public Producer(String name, int num) {
38			this.name = name;
39			this.num = num;
40		}
41
42		public void run() {
43			System.out.println("Start Producer");
44
45			produce();
46		}
47
48		private void produce() {
49			do {
50				try {
51					Thread.sleep((int)(Math.random()*5000)+3000);
52				} catch (InterruptedException e) {
53					e.printStackTrace();
54				}
55
56				lock.lock();
57
58				try {
59					if (items >= 5) {
60						try {
61							System.out.println("位置用完了");
62							notFull.await();
63						} catch (Exception e) {
64							e.printStackTrace();
65						}
66					}
67
68					System.out.println(name + "生产" + ++items);
69					
70
71					URLAction.url(name, 0, num);
72					notEmpty.signal();
73				} catch (Exception e) {
74					e.printStackTrace();
75				} finally {
76					lock.unlock();
77
78				}
79			} while (true);
80		}
81	}
82
83	class Consumer extends Thread {
84
85		String name;
86		int num;
87
88		public Consumer(String name, int num) {
89			this.name = name;
90			this.num = num;
91		}
92
93		public void run() {
94			System.out.println("Start Consumer");
95			consume();
96		}
97
98		private void consume() {
99			do {
100
101				try{
102						try {
103							Thread.sleep((int)(Math.random()*5000)+2000);
104						} catch (InterruptedException e) {
105							e.printStackTrace();
106						}
107						lock.lock();
108
109						if (items <= 0) {
110							try {
111								System.out.println("位置空,等待生产");
112								notEmpty.await();
113							} catch (Exception e) {
114								e.printStackTrace();
115							}
116						}
117
118						notFull.signal();
119						System.out.println(name + "消费" + --items);
120						URLAction.url(name, 1, num);
121						
122				} catch (Exception e) {
123					e.printStackTrace();
124				} finally {
125					lock.unlock();
126
127				}
128			} while (true);
129		}
130	}
131
132	public static void main(String args[]) {
133		ThreadTest test = new ThreadTest();
134		System.out.println("Start Thread, items:" + test.items);
135		new Thread(test.new Producer("Pro1", 0)).start();
136		new Thread(test.new Producer("Pro2", 1)).start();
137		new Thread(test.new Producer("Pro3", 2)).start();
138
139		new Thread(test.new Consumer("Con1", 0)).start();
140		new Thread(test.new Consumer("Con2", 1)).start();
141
142	}
143}
144

Mutex

Mutex也是个锁,跟信号量一样要上两把锁,然后要及时释放,不然就会卡了。

1import java.io.*;
2import java.net.*;
3import java.util.*;
4import com.sun.corba.se.impl.orbutil.concurrent.*;
5
6public class ThreadTestMutex {
7	
8	private Mutex mutex = new Mutex();	// 同步信号量
9	private Mutex notFull = new Mutex();	// 是否满的信号量
10	private Mutex notEmpty = new Mutex();	// 是否为空的信号量
11	private Mutex access = new Mutex();
12
13
14	private int items = 0;
15	private int lock = 1;
16
17	class Producer extends Thread {
18		private String name;
19		private int pid;
20
21		public Producer(String name, int pid) {
22			this.name = name;
23			this.pid = pid;
24		}
25
26		public void run() {
27			System.out.println("Start Producer: ");
28			this.produce();
29		}
30
31		private void produce() {
32			do {
33				try {
34					try {
35						Thread.sleep(400);
36					} catch (InterruptedException e) {
37						e.printStackTrace();
38					}
39
40					notFull.acquire();
41					mutex.acquire();
42
43					if (items < 5) {
44						items++;
45
46						//System.out.println("位置用完了" + items);
47						notEmpty.release();
48					}
49
50					System.out.println(name + ":" + items);
51
52				} catch (Exception e) {
53					e.printStackTrace();
54				} finally {				
55					mutex.release();
56					notFull.release();
57				}
58
59			} while(true);
60		}
61	}
62
63	class Consumer extends Thread {
64		private String name;
65		private int pid;
66
67		public Consumer(String name, int pid) {
68			this.name = name;
69			this.pid = pid;
70		}
71
72		public void run() {
73			System.out.println("Start Consumer: ");
74			this.consume();
75		}
76
77		private void consume() {
78			do{
79				try {
80					Thread.sleep(500);
81				} catch (InterruptedException e) {
82					e.printStackTrace();
83				}
84				try {
85					notEmpty.acquire();
86					mutex.acquire();
87
88					if (items > 0) {
89						items--;
90
91						//System.out.println("没有产品" + items);
92						notFull.release();
93					}
94
95					System.out.println(name + ":" + items);
96
97				} catch (Exception e) {
98					e.printStackTrace();
99				} finally {
100					mutex.release();
101					notEmpty.release();
102				}
103			} while(true);
104
105
106		}
107	}
108
109	public static void main(String args[]) {
110		ThreadTestMutex test = new ThreadTestMutex();
111
112		System.out.println("Start");
113		try {
114			test.notEmpty.acquire();
115		} catch (Exception e) {
116			e.printStackTrace();
117		}
118		new Thread(test.new Producer("pro1", 0)).start();
119		new Thread(test.new Producer("pro2", 1)).start();
120		new Thread(test.new Producer("pro3", 2)).start();
121		new Thread(test.new Producer("pro4", 3)).start();
122		new Thread(test.new Producer("pro5", 4)).start();
123
124		new Thread(test.new Consumer("con1", 0)).start();
125		new Thread(test.new Consumer("con2", 1)).start();
126
127	}
128}
129

评论 (0)