Java多线程 生产者消费者的N种姿势

学校里有作业,需要实现生产者消费者模型,最简单的就是用synchronized同步锁一包,什么事情都没有了,不过由于是从最外层包起来的,所以总体而言就Low很多了。

一共用了四种方法,心好累:

synchronized

synchronized同步锁同样踩到了坑,主要是在 extends Thread 和 implements Runnable 的区别上,这是第一个坑。

package com.producer;

public class Main {

    /**
     * Entry point of the synchronized demo: builds one shared
     * {@code ThreadSynchronized} instance and starts three producers
     * followed by two consumers against it.
     *
     * <p>Each worker object is wrapped in a named {@link Thread}; the worker
     * code prints {@code Thread.currentThread().getName()}, so the name given
     * to the wrapping thread is what shows up in the output.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        final ThreadSynchronized shared = new ThreadSynchronized();

        final String[] producerNames = {"Pro1", "Pro2", "Pro3"};
        final String[] consumerNames = {"Con1", "Con2"};

        // Producers are started first, in declaration order.
        for (String label : producerNames) {
            Thread worker = new Thread(shared.new Producer(label), label);
            worker.start();
        }

        // Then the consumers.
        for (String label : consumerNames) {
            Thread worker = new Thread(shared.new Consumer(label), label);
            worker.start();
        }
    }
}

总之代码相当的不干净,这是Main部分,之后我们除了声明基本上都不需要修改了。

package com.producer;

import java.util.LinkedList;

/**
 * Bounded-buffer producer/consumer demo built on intrinsic locks
 * ({@code synchronized} / {@code wait} / {@code notifyAll}).
 *
 * <p>All producers and consumers created from one {@code ThreadSynchronized}
 * instance share its {@link #items} buffer. Every access to the buffer, and
 * every wait/notify, goes through a single shared monitor so that threads
 * actually exclude and wake each other. (Synchronizing on the individual
 * producer/consumer objects gives each thread its own private lock and
 * protects nothing.)
 *
 * Created by SkyAo on 15/10/18.
 */
public class ThreadSynchronized {
    /** Maximum number of items the buffer may hold before producers block. */
    private static final int CAPACITY = 5;

    private static int pid;
    private static int cid;

    /** The shared buffer; only touch it while holding {@code bufferLock}. */
    public LinkedList<Item> items = new LinkedList<>();

    /** Single monitor guarding {@link #items}; also the wait/notify target. */
    private final Object bufferLock = new Object();

    /** Hands out producer ids; synchronized because several threads call it. */
    private static synchronized int nextProducerId() {
        return ++pid;
    }

    /** Hands out consumer ids; synchronized because several threads call it. */
    private static synchronized int nextConsumerId() {
        return ++cid;
    }

    /** Prints the buffer contents. Caller must hold {@code bufferLock}. */
    private void printRemaining() {
        System.out.println("Left: ");

        for (Item item : items) {
            System.out.println(item.sourceName + ":" + item.semi);
        }

        System.out.println("---Left: " + items.size() + "---");
    }

    /**
     * Worker that repeatedly appends a new {@code Item} to the shared buffer,
     * blocking while the buffer is full.
     */
    class Producer extends Thread {
        private String name;
        private int id;

        /**
         * @param name label for this producer
         */
        public Producer(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            this.id = nextProducerId();
            System.out.println("Start Thread(" + this.id + "): " + Thread.currentThread().getName());
            this.produce();
        }

        /**
         * Produces items forever; returns only when the running thread is
         * interrupted (the interrupt flag is restored before returning).
         */
        private void produce() {
            try {
                while (true) {
                    // Simulated work happens outside the lock so other threads can proceed.
                    Thread.sleep((int) (Math.random() * 5000) + 3000);

                    synchronized (bufferLock) {
                        // Loop, not if: re-check after every wake-up.
                        while (items.size() >= CAPACITY) {
                            bufferLock.wait();
                        }

                        Item produced = new Item(this.id, Thread.currentThread().getName());
                        items.add(produced);
                        System.out.println(produced.sourceName + " Produce: " + produced.semi);
                        printRemaining();

                        // Wake consumers that were waiting for an item.
                        bufferLock.notifyAll();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Worker that repeatedly removes the oldest {@code Item} from the shared
     * buffer, blocking while the buffer is empty.
     */
    class Consumer extends Thread {
        private String name;
        private int id;

        /**
         * @param name label for this consumer
         */
        public Consumer(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            this.id = nextConsumerId();
            System.out.println("Start Thread(" + this.id + "): " + Thread.currentThread().getName());
            this.consume();
        }

        /**
         * Consumes items forever; returns only when the running thread is
         * interrupted (the interrupt flag is restored before returning).
         */
        private void consume() {
            try {
                while (true) {
                    // Simulated work happens outside the lock so other threads can proceed.
                    Thread.sleep((int) (Math.random() * 5000) + 3000);

                    synchronized (bufferLock) {
                        // Loop, not if: re-check after every wake-up.
                        while (items.isEmpty()) {
                            bufferLock.wait();
                        }

                        Item consumed = items.removeFirst();
                        System.out.println(Thread.currentThread().getName() + " Consume: " + consumed.semi);
                        printRemaining();

                        // Wake producers that were waiting for a free slot.
                        bufferLock.notifyAll();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

调了半天,如果是implements Runnable会导致同一个线程重复工作。

wait()和notifyAll()就是拿来阻塞和通知的。notify()和notifyAll()的区别,根据网上所说,前者是随机唤醒一个等待中的线程,后者是唤醒全部等待中的线程再让它们去竞争锁;从这个例子的运行现象来看,其实是看不出差别的吧。

Semaphore

信号量的方法实现起来就跟书上的代码一样,非常爽,没有之一。

package com.producer;

import java.util.LinkedList;
import java.util.concurrent.Semaphore;

/**
 * Bounded-buffer producer/consumer demo built on counting semaphores.
 *
 * <p>{@link #notFull} counts free slots (starts at 5), {@link #notEmpty}
 * counts stored items (starts at 0) and {@link #mutex} serializes access to
 * the {@link #items} list. A permit is only ever released by code that
 * actually holds it; releasing in a {@code finally} that also runs after an
 * interrupted {@code acquire()} would inflate the permit counts and break
 * both mutual exclusion and the capacity bound.
 *
 * <p>NOTE(review): the semaphores are {@code static} while {@link #items} is
 * per instance, so the accounting is only meaningful when a single
 * {@code ThreadSemaphore} instance is in use. Kept as-is because the fields
 * are public.
 *
 * Created by SkyAo on 15/10/18.
 */
public class ThreadSemaphore {
    public static Semaphore mutex = new Semaphore(1);
    public static Semaphore notFull = new Semaphore(5);
    public static Semaphore notEmpty = new Semaphore(0);
    private static int pid;
    private static int cid;
    public LinkedList<Item> items = new LinkedList<>();

    /** Prints the buffer contents. Caller must hold {@link #mutex}. */
    private void printRemaining() {
        System.out.println("Left: ");

        for (Item item : items) {
            System.out.println(item.sourceName + ":" + item.semi);
        }

        System.out.println("---Left: " + items.size() + "---");
    }

    /** Worker that appends a new {@code Item} to the buffer every three seconds. */
    class Producer extends Thread {
        private String name;
        private int id;

        /**
         * @param name label used in the output and stored in produced items
         */
        public Producer(String name) {
            this.name = name;
            this.id = ++pid;
        }

        @Override
        public void run() {
            System.out.println("Start Thread(" + this.id + "): " + this.name);
            this.produce();
        }

        /**
         * Produces items forever; returns only when the running thread is
         * interrupted while sleeping or waiting for a free slot.
         */
        private void produce() {
            while (true) {
                try {
                    Thread.sleep(3000);
                    notFull.acquire();          // reserve a free slot
                } catch (InterruptedException e) {
                    // Nothing was acquired, so nothing must be released.
                    Thread.currentThread().interrupt();
                    return;
                }

                boolean added = false;
                // Uninterruptible so the reserved slot cannot be lost between the two acquires.
                mutex.acquireUninterruptibly();
                try {
                    Item produced = new Item(this.id, this.name);
                    items.add(produced);
                    added = true;
                    System.out.println(produced.sourceName + " Produce: " + produced.semi);
                    printRemaining();
                } finally {
                    mutex.release();
                    if (added) {
                        notEmpty.release();     // publish the new item
                    } else {
                        notFull.release();      // hand the unused slot back
                    }
                }
            }
        }
    }

    /** Worker that removes the oldest {@code Item} from the buffer every three seconds. */
    class Consumer extends Thread {
        private String name;
        private int id;

        /**
         * @param name label used in the output
         */
        public Consumer(String name) {
            this.name = name;
            this.id = ++cid;
        }

        @Override
        public void run() {
            System.out.println("Start Thread(" + this.id + "): " + this.name);
            this.consume();
        }

        /**
         * Consumes items forever; returns only when the running thread is
         * interrupted while sleeping or waiting for an item.
         */
        private void consume() {
            while (true) {
                try {
                    Thread.sleep(3000);
                    notEmpty.acquire();         // claim one stored item
                } catch (InterruptedException e) {
                    // Nothing was acquired, so nothing must be released.
                    Thread.currentThread().interrupt();
                    return;
                }

                boolean removed = false;
                // Uninterruptible so the claimed item cannot be lost between the two acquires.
                mutex.acquireUninterruptibly();
                try {
                    Item consumed = items.removeFirst();
                    removed = true;
                    System.out.println(this.name + " Consume: " + consumed.semi);
                    printRemaining();
                } finally {
                    mutex.release();
                    if (removed) {
                        notFull.release();      // the slot is free again
                    } else {
                        notEmpty.release();     // hand the unclaimed item back
                    }
                }
            }
        }
    }
}

信号量就是通过初始化时设置令牌,然后去争抢令牌,用完记得释放。

如果是:

    public static Semaphore notEmpty = new Semaphore(0);

这个就得先等释放令牌了。

Condition ReentrantLock

Java更新之后有了一个更好的锁:ReentrantLock

跟synchronized差不多。

import java.util.concurrent.locks.Condition;  
import java.util.concurrent.locks.ReentrantLock;
import java.io.*;
import java.net.*;

/**
 * Bounded producer/consumer demo built on a {@link ReentrantLock} with two
 * {@link Condition}s. Only a counter is kept (no real items); every
 * produce/consume event is also reported to a local HTTP endpoint.
 */
public class ThreadTest {
    /** Maximum value of {@link #items} before producers wait. */
    private static final int CAPACITY = 5;

    /** Current number of stored items; guarded by {@link #lock}. */
    private int items;

    private final ReentrantLock lock = new ReentrantLock();        // guards items
    private final Condition notEmpty = lock.newCondition();        // signalled after a produce
    private final Condition notFull = lock.newCondition();         // signalled after a consume

    /**
     * Reports an event with a plain HTTP GET.
     */
    static class URLAction {
        /**
         * Sends {@code name}, {@code delete} and {@code num} as query
         * parameters to {@code http://localhost:3000/}. I/O failures are
         * printed and otherwise ignored (best-effort reporting).
         *
         * @param name   worker name
         * @param delete 0 is passed by producers, 1 by consumers
         * @param num    worker number
         */
        public static void url(String name, int delete, int num) {
            try {
                URL url = new URL("http://localhost:3000/?name=" + name + "&delete=" + delete + "&num=" + num);
                URLConnection conn = url.openConnection();

                // Opening the input stream is what sends the request; the body is not
                // needed, but the stream must be closed so the connection is released.
                try (InputStream response = conn.getInputStream()) {
                    // intentionally empty
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Producer worker: increments the shared counter at random intervals,
     * waiting while the counter is at capacity.
     */
    class Producer extends Thread {

        String name;
        int num;

        /**
         * @param name label used in the output and in the HTTP report
         * @param num  worker number sent in the HTTP report
         */
        public Producer(String name, int num) {
            this.name = name;
            this.num = num;
        }

        @Override
        public void run() {
            System.out.println("Start Producer");

            produce();
        }

        /**
         * Produces forever; returns only when the running thread is
         * interrupted (the interrupt flag is restored before returning).
         */
        private void produce() {
            while (true) {
                try {
                    Thread.sleep((int) (Math.random() * 5000) + 3000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }

                lock.lock();
                try {
                    // Loop, not if: another producer may have refilled the buffer
                    // between the signal and this thread re-acquiring the lock.
                    while (items >= CAPACITY) {
                        System.out.println("位置用完了");
                        notFull.await();
                    }

                    System.out.println(name + "生产" + ++items);

                    // Reported while holding the lock so reports stay in event order.
                    URLAction.url(name, 0, num);
                    notEmpty.signal();
                } catch (InterruptedException e) {
                    // Interrupted while waiting: do not touch the counter.
                    Thread.currentThread().interrupt();
                    return;
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    /**
     * Consumer worker: decrements the shared counter at random intervals,
     * waiting while the counter is zero.
     */
    class Consumer extends Thread {

        String name;
        int num;

        /**
         * @param name label used in the output and in the HTTP report
         * @param num  worker number sent in the HTTP report
         */
        public Consumer(String name, int num) {
            this.name = name;
            this.num = num;
        }

        @Override
        public void run() {
            System.out.println("Start Consumer");
            consume();
        }

        /**
         * Consumes forever; returns only when the running thread is
         * interrupted (the interrupt flag is restored before returning).
         */
        private void consume() {
            while (true) {
                try {
                    Thread.sleep((int) (Math.random() * 5000) + 2000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }

                // Acquire outside the try so finally only unlocks a lock that is held.
                lock.lock();
                try {
                    // Loop, not if: another consumer may have taken the item
                    // between the signal and this thread re-acquiring the lock.
                    while (items <= 0) {
                        System.out.println("位置空,等待生产");
                        notEmpty.await();
                    }

                    notFull.signal();
                    System.out.println(name + "消费" + --items);

                    // Reported while holding the lock so reports stay in event order.
                    URLAction.url(name, 1, num);
                } catch (InterruptedException e) {
                    // Interrupted while waiting: do not touch the counter.
                    Thread.currentThread().interrupt();
                    return;
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    /**
     * Starts three producers and two consumers sharing one {@code ThreadTest}.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String args[]) {
        ThreadTest test = new ThreadTest();
        System.out.println("Start Thread, items:" + test.items);
        new Thread(test.new Producer("Pro1", 0)).start();
        new Thread(test.new Producer("Pro2", 1)).start();
        new Thread(test.new Producer("Pro3", 2)).start();

        new Thread(test.new Consumer("Con1", 0)).start();
        new Thread(test.new Consumer("Con2", 1)).start();

    }
}

Mutex

Mutex也是个锁,跟信号量一样要上两把锁,然后要及时释放,不然就会卡了。

import java.io.*;
import java.net.*;
import java.util.*;
import com.sun.corba.se.impl.orbutil.concurrent.*;

/**
 * Producer/consumer demo that keeps a shared counter and coordinates
 * producers and consumers with several {@code Mutex} objects.
 *
 * <p>NOTE(review): {@code Mutex} comes from the wildcard import of
 * {@code com.sun.corba.se.impl.orbutil.concurrent}, a JDK-internal package;
 * presumably it is a non-reentrant binary lock with acquire()/release() —
 * confirm against the class actually on the classpath, and whether that
 * package exists on the target JDK at all.
 */
public class ThreadTestMutex {
    
    private Mutex mutex = new Mutex();    // lock held around every read/update of items
    private Mutex notFull = new Mutex();    // "not full" gate, acquired by producers
    private Mutex notEmpty = new Mutex();    // "not empty" gate, acquired by consumers
    private Mutex access = new Mutex();      // not referenced anywhere else in this class


    private int items = 0;    // shared counter; only modified between mutex.acquire() and mutex.release()
    private int lock = 1;     // not referenced anywhere else in this class

    /**
     * Producer worker: every 400 ms tries to increment the shared counter,
     * as long as it is below 5.
     */
    class Producer extends Thread {
        private String name;
        private int pid;    // stored by the constructor but never read in this class

        /**
         * @param name label printed with every counter value
         * @param pid  producer number (stored only)
         */
        public Producer(String name, int pid) {
            this.name = name;
            this.pid = pid;
        }

        public void run() {
            System.out.println("Start Producer: ");
            this.produce();
        }

        /**
         * Loops forever: sleep, acquire notFull then mutex, increment items
         * when it is below 5 (releasing notEmpty in that case), print
         * "name:items", and finally release mutex and notFull.
         *
         * NOTE(review): notFull is released on every iteration, so it does not
         * look like producers ever block on a full buffer — the {@code items < 5}
         * check simply skips the increment. Confirm this polling behavior is intended.
         * NOTE(review): the finally block releases mutex/notFull even when the
         * corresponding acquire() threw — verify that release() without a prior
         * acquire() is harmless for this Mutex implementation.
         */
        private void produce() {
            do {
                try {
                    try {
                        Thread.sleep(400);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    notFull.acquire();
                    mutex.acquire();

                    if (items < 5) {
                        items++;

                        //System.out.println("位置用完了" + items);
                        // An item now exists: open the gate consumers acquire.
                        notEmpty.release();
                    }

                    System.out.println(name + ":" + items);

                } catch (Exception e) {
                    e.printStackTrace();
                } finally {                
                    mutex.release();
                    notFull.release();
                }

            } while(true);
        }
    }

    /**
     * Consumer worker: every 500 ms tries to decrement the shared counter,
     * as long as it is above 0.
     */
    class Consumer extends Thread {
        private String name;
        private int pid;    // stored by the constructor but never read in this class

        /**
         * @param name label printed with every counter value
         * @param pid  consumer number (stored only)
         */
        public Consumer(String name, int pid) {
            this.name = name;
            this.pid = pid;
        }

        public void run() {
            System.out.println("Start Consumer: ");
            this.consume();
        }

        /**
         * Loops forever: sleep, acquire notEmpty then mutex, decrement items
         * when it is above 0 (releasing notFull in that case), print
         * "name:items", and finally release mutex and notEmpty.
         *
         * NOTE(review): notEmpty is released on every iteration, so once the
         * first producer has released it consumers do not appear to block on
         * an empty buffer — the {@code items > 0} check simply skips the
         * decrement. Confirm this polling behavior is intended.
         */
        private void consume() {
            do{
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    notEmpty.acquire();
                    mutex.acquire();

                    if (items > 0) {
                        items--;

                        //System.out.println("没有产品" + items);
                        // A slot was freed: open the gate producers acquire.
                        notFull.release();
                    }

                    System.out.println(name + ":" + items);

                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    mutex.release();
                    notEmpty.release();
                }
            } while(true);


        }
    }

    /**
     * Starts five producers and two consumers sharing one ThreadTestMutex.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String args[]) {
        ThreadTestMutex test = new ThreadTestMutex();

        System.out.println("Start");
        try {
            // Take notEmpty before any thread starts; presumably so that consumers
            // wait until a producer has released it at least once — confirm.
            test.notEmpty.acquire();
        } catch (Exception e) {
            e.printStackTrace();
        }
        new Thread(test.new Producer("pro1", 0)).start();
        new Thread(test.new Producer("pro2", 1)).start();
        new Thread(test.new Producer("pro3", 2)).start();
        new Thread(test.new Producer("pro4", 3)).start();
        new Thread(test.new Producer("pro5", 4)).start();

        new Thread(test.new Consumer("con1", 0)).start();
        new Thread(test.new Consumer("con2", 1)).start();

    }
}

植入部分

如果您觉得文章不错,可以通过赞助支持我。

如果您不希望打赏,也可以通过关闭广告屏蔽插件的形式帮助网站运作。

标签: 成品, 源码, 代码段, 语法

添加新评论