Java多线程 生产者消费者的N种姿势
学校里有作业,需要实现生产者消费者模型,最简单的就是用synchronized同步锁一包,什么事情都没有了,不过由于是从最外层包起来的,所以总体而言就Low很多了。
一共用了四种方法,心好累:
synchronized
synchronized同步锁同样踩到了坑,主要是关于extends Thread
和implements Runnable
的区别上,这是第一个坑。
package com.producer;
public class Main {
public static void main(String[] args) {
ThreadSynchronized semaphore = new ThreadSynchronized();
new Thread(semaphore.new Producer("Pro1"), "Pro1").start();
new Thread(semaphore.new Producer("Pro2"), "Pro2").start();
new Thread(semaphore.new Producer("Pro3"), "Pro3").start();
new Thread(semaphore.new Consumer("Con1"), "Con1").start();
new Thread(semaphore.new Consumer("Con2"), "Con2").start();
//System.out.println("你好");
}
}
总之代码相当的不干净,这是Main部分,之后我们除了声明基本上都不需要修改了。
package com.producer;
import java.util.LinkedList;
/**
* Created by SkyAo on 15/10/18.
*/
public class ThreadSynchronized {
private static int pid;
private static int cid;
public LinkedList<Item> items = new LinkedList<>();
private Item temp;
private int item;
private boolean flag = false;
class Producer extends Thread {
private String name;
private int id;
public Producer(String name) {
this.name = name;
}
@Override
public void run() {
this.id = ++pid;
System.out.println("Start Thread(" + this.id + "): " + Thread.currentThread().getName());
//while (true)
this.produce();
}
private synchronized void produce() {
while (true)
try {
Thread.sleep((int)(Math.random()*5000)+3000);
while (items.size() == 5) {
super.wait();
}
items.add(new Item(this.id, Thread.currentThread().getName()));
temp = items.getLast();
System.out.println(temp.sourceName + " Produce: " + temp.semi);
System.out.println("Left: ");
for (Item item : items) {
System.out.println(item.sourceName + ":" + item.semi);
}
System.out.println("---Left: " + items.size() + "---");
} catch (Exception e) {
e.printStackTrace();
} finally {
super.notifyAll();
}
}
}
class Consumer extends Thread {
private String name;
private int id;
public Consumer(String name) {
this.name = name;
//this.id = ++cid;
}
@Override
public void run() {
this.id = ++cid;
System.out.println("Start Thread(" + this.id + "): " + Thread.currentThread().getName());
this.consume();
}
private synchronized void consume() {
while (true)
try {
Thread.sleep((int) (Math.random() * 5000) + 3000);
while (items.size() == 0) {
this.wait();
}
temp = items.removeFirst();
System.out.println(Thread.currentThread().getName() + " Consume: " + temp.semi);
System.out.println("Left: ");
for (Item item : items) {
System.out.println(item.sourceName + ":" + item.semi);
}
System.out.println("---Left: " + items.size() + "---");
} catch (Exception e) {
e.printStackTrace();
} finally {
super.notifyAll();
}
}
}
}
调了半天,如果是implements Runnable
会导致同一个线程重复工作。
wait()和notifyAll()就是拿来阻塞和通知的,notifyAll()和notify()的区别根据网上所说是随机通知和唤醒全部再去竞争,从现象而言其实是看不出效果的吧。
Semaphore
信号量的方法实现起来就跟书上的代码一样,非常爽,没有之一。
package com.producer;
import java.util.LinkedList;
import java.util.concurrent.Semaphore;
/**
* Created by SkyAo on 15/10/18.
*/
public class ThreadSemaphore {
public static Semaphore mutex = new Semaphore(1);
public static Semaphore notFull = new Semaphore(5);
public static Semaphore notEmpty = new Semaphore(0);
private static int pid;
private static int cid;
public LinkedList<Item> items = new LinkedList<>();
private Item temp;
class Producer extends Thread {
private String name;
private int id;
public Producer(String name) {
this.name = name;
this.id = ++pid;
}
@Override
public void run() {
System.out.println("Start Thread(" + this.id + "): " + this.name);
this.produce();
}
private void produce() {
do {
try {
Thread.sleep(3000);
notFull.acquire();
mutex.acquire();
items.add(new Item(this.id, this.name));
temp = items.getLast();
System.out.println(temp.sourceName + " Produce: " + temp.semi);
System.out.println("Left: ");
for (Item item : items) {
System.out.println(item.sourceName + ":" + item.semi);
}
System.out.println("---Left: " + items.size() + "---");
} catch (Exception e) {
e.printStackTrace();
} finally {
mutex.release();
notEmpty.release();
}
} while (true);
}
}
class Consumer extends Thread {
private String name;
private int id;
public Consumer(String name) {
this.name = name;
this.id = ++cid;
}
@Override
public void run() {
System.out.println("Start Thread(" + this.id + "): " + this.name);
this.consume();
}
private void consume() {
do {
try {
Thread.sleep(3000);
notEmpty.acquire();
mutex.acquire();
temp = items.removeFirst();
System.out.println(this.name + " Consume: " + temp.semi);
System.out.println("Left: ");
for (Item item : items) {
System.out.println(item.sourceName + ":" + item.semi);
}
System.out.println("---Left: " + items.size() + "---");
} catch (Exception e) {
e.printStackTrace();
} finally {
mutex.release();
notFull.release();
}
} while (true);
}
}
}
信号量就是通过初始化时设置令牌,然后去争抢令牌,用完记得释放。
如果是:
public static Semaphore notEmpty = new Semaphore(0);
这个就得先等释放令牌了。
Condition ReentrantLock
Java更新之后有了一个更好的锁:ReentrantLock
跟synchronized差不多。
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.io.*;
import java.net.*;
public class ThreadTest {
private int items;
private final ReentrantLock lock = new ReentrantLock(); // 设置锁
private Condition notEmpty = lock.newCondition(); // 设置条件
private Condition notFull = lock.newCondition(); // 设置条件
/*
get方式发送数据
*/
static class URLAction {
public static void url(String name, int delete, int num){
try {
URL url = new URL("http://localhost:3000/?name="+name+"&delete=" + delete + "&num=" + num);
URLConnection conn = url.openConnection();
BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
} catch (IOException e) {
e.printStackTrace();
}
}
}
/*
生产者模型
*/
class Producer extends Thread {
String name;
int num;
public Producer(String name, int num) {
this.name = name;
this.num = num;
}
public void run() {
System.out.println("Start Producer");
produce();
}
private void produce() {
do {
try {
Thread.sleep((int)(Math.random()*5000)+3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.lock();
try {
if (items >= 5) {
try {
System.out.println("位置用完了");
notFull.await();
} catch (Exception e) {
e.printStackTrace();
}
}
System.out.println(name + "生产" + ++items);
URLAction.url(name, 0, num);
notEmpty.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
} while (true);
}
}
class Consumer extends Thread {
String name;
int num;
public Consumer(String name, int num) {
this.name = name;
this.num = num;
}
public void run() {
System.out.println("Start Consumer");
consume();
}
private void consume() {
do {
try{
try {
Thread.sleep((int)(Math.random()*5000)+2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.lock();
if (items <= 0) {
try {
System.out.println("位置空,等待生产");
notEmpty.await();
} catch (Exception e) {
e.printStackTrace();
}
}
notFull.signal();
System.out.println(name + "消费" + --items);
URLAction.url(name, 1, num);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
} while (true);
}
}
public static void main(String args[]) {
ThreadTest test = new ThreadTest();
System.out.println("Start Thread, items:" + test.items);
new Thread(test.new Producer("Pro1", 0)).start();
new Thread(test.new Producer("Pro2", 1)).start();
new Thread(test.new Producer("Pro3", 2)).start();
new Thread(test.new Consumer("Con1", 0)).start();
new Thread(test.new Consumer("Con2", 1)).start();
}
}
Mutex
Mutex也是个锁,跟信号量一样要上两把锁,然后要及时释放,不然就会卡了。
import java.io.*;
import java.net.*;
import java.util.*;
import com.sun.corba.se.impl.orbutil.concurrent.*;
public class ThreadTestMutex {
private Mutex mutex = new Mutex(); // 同步信号量
private Mutex notFull = new Mutex(); // 是否满的信号量
private Mutex notEmpty = new Mutex(); // 是否为空的信号量
private Mutex access = new Mutex();
private int items = 0;
private int lock = 1;
class Producer extends Thread {
private String name;
private int pid;
public Producer(String name, int pid) {
this.name = name;
this.pid = pid;
}
public void run() {
System.out.println("Start Producer: ");
this.produce();
}
private void produce() {
do {
try {
try {
Thread.sleep(400);
} catch (InterruptedException e) {
e.printStackTrace();
}
notFull.acquire();
mutex.acquire();
if (items < 5) {
items++;
//System.out.println("位置用完了" + items);
notEmpty.release();
}
System.out.println(name + ":" + items);
} catch (Exception e) {
e.printStackTrace();
} finally {
mutex.release();
notFull.release();
}
} while(true);
}
}
class Consumer extends Thread {
private String name;
private int pid;
public Consumer(String name, int pid) {
this.name = name;
this.pid = pid;
}
public void run() {
System.out.println("Start Consumer: ");
this.consume();
}
private void consume() {
do{
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
notEmpty.acquire();
mutex.acquire();
if (items > 0) {
items--;
//System.out.println("没有产品" + items);
notFull.release();
}
System.out.println(name + ":" + items);
} catch (Exception e) {
e.printStackTrace();
} finally {
mutex.release();
notEmpty.release();
}
} while(true);
}
}
public static void main(String args[]) {
ThreadTestMutex test = new ThreadTestMutex();
System.out.println("Start");
try {
test.notEmpty.acquire();
} catch (Exception e) {
e.printStackTrace();
}
new Thread(test.new Producer("pro1", 0)).start();
new Thread(test.new Producer("pro2", 1)).start();
new Thread(test.new Producer("pro3", 2)).start();
new Thread(test.new Producer("pro4", 3)).start();
new Thread(test.new Producer("pro5", 4)).start();
new Thread(test.new Consumer("con1", 0)).start();
new Thread(test.new Consumer("con2", 1)).start();
}
}
植入部分
如果您觉得文章不错,可以通过赞助支持我。
如果您不希望打赏,也可以通过关闭广告屏蔽插件的形式帮助网站运作。