##概念##
生产者必须将数据安全地交给消费者。虽然只是这样的问题,但当生产者与消费者在不同线程上运行时,两者的处理速度差将是最大的问题。
当消费者要取数据时生产者还没建立出数据,或是生产者建立出数据时消费者的状态还没办法接受数据等等。
Producer-Consumer Pattern 是在生产者与消费者之间加入一个“桥梁参与者”。以这个桥梁参与者缓冲线程之间的处理速度差。
##范例##
- Main.java
操作测试的类
public class Main {
public static void main(String[] args) {
Table table = new Table(3); // 建立可以放置3个蛋糕的桌子
new MakerThread("MakerThread-1", table, 31415).start();
new MakerThread("MakerThread-2", table, 92653).start();
new MakerThread("MakerThread-3", table, 58979).start();
new EaterThread("EaterThread-1", table, 32384).start();
new EaterThread("EaterThread-2", table, 62643).start();
new EaterThread("EaterThread-3", table, 38327).start();
}
}
- MakerThread.java
厨师
import java.util.Random;
public class MakerThread extends Thread {
private final Random random;
private final Table table;
private static int id = 0; // 蛋糕的流水号(所有厨师相同,同步的)
public MakerThread(String name, Table table, long seed) {
super(name);
this.table = table;
this.random = new Random(seed);
}
public void run() {
try {
while (true) {
//制作蛋糕-->放在桌上
Thread.sleep(random.nextInt(1000));
String cake = "[ Cake No." + nextId() + " by " + getName() + " ]";
table.put(cake);
}
} catch (InterruptedException e) {
}
}
private static synchronized int nextId() {
return id++;
}
}
- EaterThread.java
客人
import java.util.Random;
public class EaterThread extends Thread {
private final Random random;
private final Table table;
public EaterThread(String name, Table table, long seed) {
super(name);
this.table = table;
this.random = new Random(seed);
}
public void run() {
try {
while (true) {
//从桌上拿蛋糕-->吃掉
String cake = table.take();
Thread.sleep(random.nextInt(1000));
}
} catch (InterruptedException e) {
}
}
}
- Table.java
桌子
public class Table {
//正式环境中推荐使用BlcokingQueue<String> buffer;
private final String[] buffer;
private int tail; // 下一个put的地方
private int head; // 下一个take的地方
private int count; // buffer内的蛋糕数
public Table(int count) {
this.buffer = new String[count];
this.head = 0;
this.tail = 0;
this.count = 0;
}
// 放置蛋糕
public synchronized void put(String cake) throws InterruptedException {
System.out.println(Thread.currentThread().getName() + " puts " + cake);
//以桌上可以摆的数量上限,减掉现在已经摆的个数,剩下的就是可以摆放蛋糕的空间,得到的值必须大于0
while (count >= buffer.length) {
System.out.println(Thread.currentThread().getName() + " wait BEGIN");
wait();
System.out.println(Thread.currentThread().getName() + " wait END");
}
buffer[tail] = cake;
//等价于下面三行,如果超过buffer的长度,就回到0
//tail++;
//if(tail >= buffer.length){
// tail =0;
//}
tail = (tail + 1) % buffer.length;
count++;
//因为桌子的状态变化了,所以要执行notifyAll,把wait中的线程全部唤醒
notifyAll();
}
// 获取蛋糕
public synchronized String take() throws InterruptedException {
//蛋糕至少有1个
while (count <= 0) {
System.out.println(Thread.currentThread().getName() + " wait BEGIN");
wait();
System.out.println(Thread.currentThread().getName() + " wait END");
}
String cake = buffer[head];
head = (head + 1) % buffer.length;
count--;
notifyAll();
System.out.println(Thread.currentThread().getName() + " takes " + cake);
return cake;
}
}
//清楚Table的clear方法
public synchronized void clear(){
//这段while语句只是为了表示被取走的蛋糕,不写也没关系
while(count>0){
String cake = buffer[head];
System.out.println(Thread.currentThread().getName() + " clears "+cake);
head = (head + 1 ) % buffer.length;
count--;
}
head = 0;
tail =0 ;
count = 0;
notifyAll();
}
##Producer-Consumer Pattern 的所有参与者##
- Data参与者-->String类蛋糕
- Producer(生产者)参与者-->MakerThread
- Consumer(消费者)参与者-->EaterThread
- Channel(通道)参与者-->Table
##知识点##
- 不可以直接传递吗?
这样一来,处理所花的时间,也得有Producer线程来负担,因此制作下一个数据的操作也必须延迟。这样会使程序的响应性变差。 直接调用该对方的方法,就好像厨师制作的蛋糕直接交给客人,等客人吃完才开始做下一个(或是比喻厨师在自己吃自己做的蛋糕)
- Channel参与者负荷派生的问题
桌上可以摆放的蛋糕最多可以有3个。厨师最多可以在桌上摆放3个蛋糕,如果要摆放4个以上,就必须等待到客人拿走蛋糕才行。如果客人慢吞吞的吃,那厨师就要等待很久。 如果增加桌上可以摆放的蛋糕数量呢?这么一来,就算客人吃的慢一点,厨师也可以不用等待,可以不断做出新蛋糕摆到桌上。桌上的蛋糕数(buffer字段的元素数),会直接影响到缓冲MakerThread与EaterThread间处理速度差的程度。 当然,如果客人吃蛋糕的平均速度比厨师做出的蛋糕的平均速度低,那桌上留下的蛋糕会越来越多,一旦时间后还是会达到buffer字段的上限。
- 线程共享互斥问题,观察的切入点放在“保护什么”上面,会比较容易找到问题的症结。
- 线程的合作要想“放在中间的东西”
- 线程的互斥要想“应该保护的东西”
仔细的思考不难发现,合作与互斥其实是表里一体的。线程为了协调合作,所以必须进行共享互斥,使共享的东西不会损坏。 而线程的共享互斥,也是为了让线程合作才进行的。 也就是说说,合作与互斥本来就具有深远的关系,因此上述口诀,可说是多线程程序中,解决问题的重要线索。
- 只有一个Consumer参与者时会如何
Consumer参与者为唯一时,处理channel参与者里存放的Data参与者的线程只有一条。如果consumer参与者为复数时,就必须小心consumer参与者的线程之间互相不能发生干涉。但如果consumer参与者只有一条,就不需要烦恼consumer参与者的线程之间的干涉了。(当然,consumer参与者和producer参与者之间的干涉还是需要注意的) 具体来说,也就是说只有consumer参与者的线程会访问的返回,就不需要考虑共享互斥了。这样可以提升程序的性能。 Produer参与者为复数时,而consumer参与者为单数的状况,就是所谓多线程的处理简化为单一线程。 Swing(JFC)Framework中,事件处理的部分,就是使用这个方法(多Producer参与者和单一Consumer参与者)。进行Swing事件处理的线程,称为 event-dispatching thread(事件分配处理线程)。这个线程相当于从channel参与者里的事件队列取出来事件来处理的Consumer参与者。 event-dispatching thread的数量只有一个,这种设计方式,可以是时间处理的程序好写的多,而且处理的速度也很快。
##练习问题##
- 加上可以清楚蛋糕的方法
- ClearThread.java
public class ClearThread extends Thread {
private final Table table;
public ClearThread(String name, Table table) {
super(name);
this.table = table;
}
public void run() {
try {
while (true) {
Thread.sleep(1000);
System.out.println("===== " + getName() + " clears =====");
table.clear();
}
} catch (InterruptedException e) {
}
}
}
- Main.java
public class Main {
public static void main(String[] args) {
Table table = new Table(3); // 建立一个能放三个蛋糕的桌子
new MakerThread("MakerThread-1", table, 31415).start();
new MakerThread("MakerThread-2", table, 92653).start();
new MakerThread("MakerThread-3", table, 58979).start();
new EaterThread("EaterThread-1", table, 32384).start();
new EaterThread("EaterThread-2", table, 62643).start();
new EaterThread("EaterThread-3", table, 38327).start();
new ClearThread("ClearThread-0", table).start();
}
}
- 修改上面的Main方法,可以10后结束线程
- Main.java
public class Main {
public static void main(String[] args) {
Table table = new Table(3); // 建立一个只能放三个蛋糕的桌子
Thread[] threads = {
new MakerThread("MakerThread-1", table, 31415),
new MakerThread("MakerThread-2", table, 92653),
new MakerThread("MakerThread-3", table, 58979),
new EaterThread("EaterThread-1", table, 32384),
new EaterThread("EaterThread-2", table, 62643),
new EaterThread("EaterThread-3", table, 38327),
};
// 启动线程
for (int i = 0; i < threads.length; i++) {
threads[i].start();
}
// 约休息10秒
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
}
System.out.println("***** interrupt *****");
//interrupt
for (int i = 0; i < threads.length; i++) {
threads[i].interrupt();
}
}
}
- 使繁重的工作可以取消
- Main.java
public class Main {
public static void main(String[] args) {
// 处理Host的大型处理的线程
Thread executor = new Thread() {
public void run() {
System.out.println("Host.execute BEGIN");
try {
Host.execute(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Host.execute END");
}
};
// 启动
executor.start();
// 休息约15秒
try {
Thread.sleep(15000);
} catch (InterruptedException e) {
}
// 执行取消
System.out.println("***** interrupt *****");
executor.interrupt();
}
}
- Host.java
public class Host {
public static void execute(int count) throws InterruptedException {
for (int i = 0; i < count; i++) {
//依靠这里抛出异常,上层捕捉到异常后,程序停止
if (Thread.interrupted()) {
throw new InterruptedException();
}
doHeavyJob();
}
}
private static void doHeavyJob() {
// 以下为
// “不可取消的大型处理”的代用
// (进行约10秒的循环)
System.out.println("doHeavyJob BEGIN");
long start = System.currentTimeMillis();
while (start + 10000 > System.currentTimeMillis()) {
// busy loop
}
System.out.println("doHeavyJob END");
}
}
- notify替代notifyAll的情况会是怎么样?
notify方法只会从在waitset中等待的线程中调用一个。因此,无关的线程进入waitset时若正好碰上notify引发该线程的话,notify所要进行的“通知”就会不见了。
- LazyThread.java
public class LazyThread extends Thread {
private final Table table;
public LazyThread(String name, Table table) {
super(name);
this.table = table;
}
public void run() {
while (true) {
try {
synchronized (table) {
table.wait();
}
System.out.println(getName() + " is notified!");
} catch (InterruptedException e) {
}
}
}
}
- Table.java
- Main.java
public class Main {
public static void main(String[] args) {
Table table = new Table(3); // 建议一个只能放3个蛋糕的桌子
new MakerThread("MakerThread-1", table, 31415).start();
new MakerThread("MakerThread-2", table, 92653).start();
new MakerThread("MakerThread-3", table, 58979).start();
new EaterThread("EaterThread-1", table, 32384).start();
new EaterThread("EaterThread-2", table, 62643).start();
new EaterThread("EaterThread-3", table, 38327).start();
new LazyThread("LazyThread-1", table).start();
new LazyThread("LazyThread-2", table).start();
new LazyThread("LazyThread-3", table).start();
new LazyThread("LazyThread-4", table).start();
new LazyThread("LazyThread-5", table).start();
new LazyThread("LazyThread-6", table).start();
new LazyThread("LazyThread-7", table).start();
}
}