Skip to content

Latest commit

 

History

History
420 lines (334 loc) · 13.3 KB

File metadata and controls

420 lines (334 loc) · 13.3 KB

##概念##

生产者必须将数据安全地交给消费者。虽然只是这样的问题,但当生产者与消费者在不同线程上运行时,两者的处理速度差将是最大的问题。

当消费者要取数据时生产者还没建立出数据,或是生产者建立出数据时消费者的状态还没办法接受数据等等。

Producer-Consumer Pattern 是在生产者与消费者之间加入一个“桥梁参与者”。以这个桥梁参与者缓冲线程之间的处理速度差。

##范例##

  • Main.java

操作测试的类

public class Main {
    public static void main(String[] args) {
        Table table = new Table(3);     // 建立可以放置3个蛋糕的桌子
        new MakerThread("MakerThread-1", table, 31415).start();
        new MakerThread("MakerThread-2", table, 92653).start();
        new MakerThread("MakerThread-3", table, 58979).start();
        new EaterThread("EaterThread-1", table, 32384).start();
        new EaterThread("EaterThread-2", table, 62643).start();
        new EaterThread("EaterThread-3", table, 38327).start();
    }
}



  • MakerThread.java

厨师

import java.util.Random;

public class MakerThread extends Thread {
    private final Random random;
    private final Table table;
    private static int id = 0; // 蛋糕的流水号(所有厨师相同,同步的)
    public MakerThread(String name, Table table, long seed) {
        super(name);
        this.table = table;
        this.random = new Random(seed);
    }
    public void run() {
        try {
            while (true) {
            	//制作蛋糕-->放在桌上
                Thread.sleep(random.nextInt(1000));
                String cake = "[ Cake No." + nextId() + " by " + getName() + " ]";
                table.put(cake);
            }
        } catch (InterruptedException e) {
        }
    }
    private static synchronized int nextId() {
        return id++;
    }
}

  • EaterThread.java

客人

import java.util.Random;

public class EaterThread extends Thread {
    private final Random random;
    private final Table table;
    public EaterThread(String name, Table table, long seed) {
        super(name);
        this.table = table;
        this.random = new Random(seed);
    }
    public void run() {
        try {
            while (true) {
            //从桌上拿蛋糕-->吃掉
                String cake = table.take();
                Thread.sleep(random.nextInt(1000));
            }
        } catch (InterruptedException e) {
        }
    }
}

  • Table.java

桌子

public class Table {
	//正式环境中推荐使用BlcokingQueue<String> buffer;
    private final String[] buffer;
    private int tail;  // 下一个put的地方
    private int head;  //  下一个take的地方
    private int count; // buffer内的蛋糕数
    public Table(int count) {
        this.buffer = new String[count];
        this.head = 0;
        this.tail = 0;
        this.count = 0;
    }
    // 放置蛋糕
    public synchronized void put(String cake) throws InterruptedException {
        System.out.println(Thread.currentThread().getName() + " puts " + cake);
        //以桌上可以摆的数量上限,减掉现在已经摆的个数,剩下的就是可以摆放蛋糕的空间,得到的值必须大于0
        while (count >= buffer.length) {
        System.out.println(Thread.currentThread().getName() + " wait BEGIN");
            wait();
        System.out.println(Thread.currentThread().getName() + " wait END");
        }
        buffer[tail] = cake;
        //等价于下面三行,如果超过buffer的长度,就回到0
        //tail++;
        //if(tail >= buffer.length){
        //	tail =0;
        //}

        tail = (tail + 1) % buffer.length;
        count++;
        //因为桌子的状态变化了,所以要执行notifyAll,把wait中的线程全部唤醒
        notifyAll();
    }
    // 获取蛋糕
    public synchronized String take() throws InterruptedException {
    	//蛋糕至少有1个
        while (count <= 0) {
        System.out.println(Thread.currentThread().getName() + " wait BEGIN");
            wait();
            System.out.println(Thread.currentThread().getName() + " wait END");
        }
        String cake = buffer[head];
        head = (head + 1) % buffer.length;
        count--;
        notifyAll();
        System.out.println(Thread.currentThread().getName() + " takes " + cake);
        return cake;
    }
}

//清楚Table的clear方法
public synchronized void clear(){
	//这段while语句只是为了表示被取走的蛋糕,不写也没关系
	while(count>0){
		String cake = buffer[head];
		System.out.println(Thread.currentThread().getName() + " clears "+cake);
		head = (head + 1 ) % buffer.length;
		count--;
		
	}

	head = 0;
	tail =0 ;
	count = 0;
	notifyAll();
}

##Producer-Consumer Pattern 的所有参与者##

  • Data参与者-->String类蛋糕
  • Producer(生产者)参与者-->MakerThread
  • Consumer(消费者)参与者-->EaterThread
  • Channel(通道)参与者-->Table

##知识点##

  • 不可以直接传递吗?

这样一来,处理所花的时间,也得有Producer线程来负担,因此制作下一个数据的操作也必须延迟。这样会使程序的响应性变差。 直接调用该对方的方法,就好像厨师制作的蛋糕直接交给客人,等客人吃完才开始做下一个(或是比喻厨师在自己吃自己做的蛋糕)

  • Channel参与者负荷派生的问题

桌上可以摆放的蛋糕最多可以有3个。厨师最多可以在桌上摆放3个蛋糕,如果要摆放4个以上,就必须等待到客人拿走蛋糕才行。如果客人慢吞吞的吃,那厨师就要等待很久。 如果增加桌上可以摆放的蛋糕数量呢?这么一来,就算客人吃的慢一点,厨师也可以不用等待,可以不断做出新蛋糕摆到桌上。桌上的蛋糕数(buffer字段的元素数),会直接影响到缓冲MakerThread与EaterThread间处理速度差的程度。 当然,如果客人吃蛋糕的平均速度比厨师做出的蛋糕的平均速度低,那桌上留下的蛋糕会越来越多,一旦时间后还是会达到buffer字段的上限。

  • 线程共享互斥问题,观察的切入点放在“保护什么”上面,会比较容易找到问题的症结。
  • 线程的合作要想“放在中间的东西”
  • 线程的互斥要想“应该保护的东西”

仔细的思考不难发现,合作与互斥其实是表里一体的。线程为了协调合作,所以必须进行共享互斥,使共享的东西不会损坏。 而线程的共享互斥,也是为了让线程合作才进行的。 也就是说说,合作与互斥本来就具有深远的关系,因此上述口诀,可说是多线程程序中,解决问题的重要线索。

  • 只有一个Consumer参与者时会如何

Consumer参与者为唯一时,处理channel参与者里存放的Data参与者的线程只有一条。如果consumer参与者为复数时,就必须小心consumer参与者的线程之间互相不能发生干涉。但如果consumer参与者只有一条,就不需要烦恼consumer参与者的线程之间的干涉了。(当然,consumer参与者和producer参与者之间的干涉还是需要注意的) 具体来说,也就是说只有consumer参与者的线程会访问的返回,就不需要考虑共享互斥了。这样可以提升程序的性能。 Produer参与者为复数时,而consumer参与者为单数的状况,就是所谓多线程的处理简化为单一线程。 Swing(JFC)Framework中,事件处理的部分,就是使用这个方法(多Producer参与者和单一Consumer参与者)。进行Swing事件处理的线程,称为 event-dispatching thread(事件分配处理线程)。这个线程相当于从channel参与者里的事件队列取出来事件来处理的Consumer参与者。 event-dispatching thread的数量只有一个,这种设计方式,可以是时间处理的程序好写的多,而且处理的速度也很快。

##练习问题##

  1. 加上可以清楚蛋糕的方法
  • ClearThread.java

public class ClearThread extends Thread {
    private final Table table;
    public ClearThread(String name, Table table) {
        super(name);
        this.table = table;
    }
    public void run() {
        try {
            while (true) {
                Thread.sleep(1000);
                System.out.println("===== " + getName() + " clears =====");
                table.clear();
            }
        } catch (InterruptedException e) {
        }
    }
}

  • Main.java
public class Main {
    public static void main(String[] args) {
        Table table = new Table(3);     // 建立一个能放三个蛋糕的桌子
        new MakerThread("MakerThread-1", table, 31415).start();
        new MakerThread("MakerThread-2", table, 92653).start();
        new MakerThread("MakerThread-3", table, 58979).start();
        
        new EaterThread("EaterThread-1", table, 32384).start();
        new EaterThread("EaterThread-2", table, 62643).start();
        new EaterThread("EaterThread-3", table, 38327).start();
        
        new ClearThread("ClearThread-0", table).start();
    }
}

  1. 修改上面的Main方法,可以10后结束线程
  • Main.java
public class Main {
    public static void main(String[] args) {
        Table table = new Table(3);     // 建立一个只能放三个蛋糕的桌子
        Thread[] threads = {
            new MakerThread("MakerThread-1", table, 31415),
            new MakerThread("MakerThread-2", table, 92653),
            new MakerThread("MakerThread-3", table, 58979),
            new EaterThread("EaterThread-1", table, 32384),
            new EaterThread("EaterThread-2", table, 62643),
            new EaterThread("EaterThread-3", table, 38327),
        };

        // 启动线程
        for (int i = 0; i < threads.length; i++) {
            threads[i].start();
        }

        // 约休息10秒
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
        }

        System.out.println("***** interrupt *****");

        //interrupt
        for (int i = 0; i < threads.length; i++) {
            threads[i].interrupt();
        }
    }
}

  1. 使繁重的工作可以取消
  • Main.java
public class Main {
    public static void main(String[] args) {
        // 处理Host的大型处理的线程
        Thread executor = new Thread() {
            public void run() {
                System.out.println("Host.execute BEGIN");
                try {
                    Host.execute(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Host.execute END");
            }
        };

        // 启动
        executor.start();

        // 休息约15秒
        try {
            Thread.sleep(15000);
        } catch (InterruptedException e) {
        }

        // 执行取消
        System.out.println("***** interrupt *****");
        executor.interrupt();
    }
}

  • Host.java
public class Host {
    public static void execute(int count) throws InterruptedException {
        for (int i = 0; i < count; i++) {
        //依靠这里抛出异常,上层捕捉到异常后,程序停止
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            doHeavyJob();
        }
    }
    private static void doHeavyJob() {
        // 以下为
        // “不可取消的大型处理”的代用
        // (进行约10秒的循环)
        System.out.println("doHeavyJob BEGIN");
        long start = System.currentTimeMillis();
        while (start + 10000 > System.currentTimeMillis()) {
            // busy loop
        }
        System.out.println("doHeavyJob END");
    }
}

  1. notify替代notifyAll的情况会是怎么样?

notify方法只会从在waitset中等待的线程中调用一个。因此,无关的线程进入waitset时若正好碰上notify引发该线程的话,notify所要进行的“通知”就会不见了。

  • LazyThread.java
public class LazyThread extends Thread {
   private final Table table;
   public LazyThread(String name, Table table) {
       super(name);
       this.table = table;
   }
   public void run() {
       while (true) {
           try {
               synchronized (table) {
                   table.wait();
               }
               System.out.println(getName() + " is notified!");
           } catch (InterruptedException e) {
           }
       }
   }
}

  • Table.java

  • Main.java
public class Main {
    public static void main(String[] args) {
        Table table = new Table(3);     // 建议一个只能放3个蛋糕的桌子
        new MakerThread("MakerThread-1", table, 31415).start();
        new MakerThread("MakerThread-2", table, 92653).start();
        new MakerThread("MakerThread-3", table, 58979).start();
        new EaterThread("EaterThread-1", table, 32384).start();
        new EaterThread("EaterThread-2", table, 62643).start();
        new EaterThread("EaterThread-3", table, 38327).start();
        new LazyThread("LazyThread-1", table).start();
        new LazyThread("LazyThread-2", table).start();
        new LazyThread("LazyThread-3", table).start();
        new LazyThread("LazyThread-4", table).start();
        new LazyThread("LazyThread-5", table).start();
        new LazyThread("LazyThread-6", table).start();
        new LazyThread("LazyThread-7", table).start();
    }
}