线程协同
Contents
适用场景
两个或者多个线程之间存在依赖关系的时候,需要实现线程之间的协同管理。比如:
- 多个线程引用公用变量的时候,需要引入synchronized,实现公有变量的线程安全问题。
- 生产类和消费类线程的协同。生产和消费模型里面,只有库存>0才可以消费,只有库存<capacity才可以生产,不仅要解决线程安全问题,还需要协调生产线程和消费线程,生产一个产品后,要通知消费线程可以消费了;消费线程消费完一个消费,要通知生产线程可以生产了。
- 主线程和多个子线程之间的协同。多线程协同做一件事情的时候,比如下载,一个主线程启动5个子线程开始并发下载,需要使用CountdownLatch,5个线程均下载完毕,就通知主线程下载完成,相应用户。
- 多个子线程之间的协同。海量运算间的相互依赖,CycleBaire的使用场景,多个独立的线程进行运算第一步骤,大家都完成第一步骤后,才同时开始第二步骤的运算。
生产和消费线程协同的实现方式:
- synchronized+lock.wait+lock.notifyAll()
- ReentrantLock+producerCondition+consumerCondition+await()+signal()
- BlockingQueue(put 、 take)
- 因为都是对一个Depot仓库的库存进行生产和消费操作,所以不管用什么方法,必须用同一个锁对象,否则线程就不安全。
- notify 和notifyAll有很大区别,一个只随机的唤醒一个线程开始争夺锁,notifyAll唤醒所有的线程开始争夺锁
- while循环配合notifyAll可以实现synchronized的生产、消费类线程的精准控制
- 整体思路:生产者线程启动,无限循环的开始生产产品,
- 第一步获取对同一个锁对象进行加锁
- 第二步嵌套while循环进行判断是否满足生产条件,不满足就阻塞等待唤醒,满足就生成
- 第三步生成完之后通知消费者线程开始消费
- 第四步释放锁while(true){lock.lock();while(depot.isFull()){produceCondition.await();}depot.incr();consumerCondition.signalAll();lock.unlock();}
synchronized(wait/notifyAll) 实现
- 要解决协调不同类型的线程,比如多个生产线程和多个消费线程的时候,就需要引入Object的wait和notifyAll,生产一个产品,调用lock.notifyAll使所有await的线程都收到通知,这个时候外面必须包一层while,因为如果当时有生产线程在阻塞状态,那么也一定会被唤醒。
- 如果只有一个生产者和只有1个消费者的话,只需要使用notify即可。也不需要while,因为生产线程通知的只可能是消费线程。生产者线程public void run(){while(true){synchronized (lock) {if(depot.isFull()){System.out.println(Thread.currentThread().getName()+" is full, i am going to wait");try {while(depot.isFull()){lock.wait();}} catch (InterruptedException e) {}System.out.println(Thread.currentThread().getName()+" i am active and got the lock ");}System.out.println(Thread.currentThread().getName()+" i am produce, now size is going to be "+(depot.getCount()+1));depot.incr();try {Thread.sleep(100);} catch (InterruptedException e) {}lock.notifyAll();}}}消费者线程public void run(){while(true){synchronized (lock) {if(depot.isEmpty()){System.out.println(Thread.currentThread().getName()+" is empty, i am going to wait");while(depot.isEmpty()){try {lock.wait();} catch (InterruptedException e) {}}System.out.println(Thread.currentThread().getName()+" i am consumer,i am active and got the lock");}System.out.println(Thread.currentThread().getName()+" i am consumer, now size is going to be "+(depot.getCount()-1));depot.decr();try {Thread.sleep(100);} catch (InterruptedException e) {}lock.notifyAll();}}}测试类public static void main(String[] args) {Depot depot=new Depot(10);final Object lock=new Object();Producer p1=new Producer(depot,lock);Producer p2=new Producer(depot,lock);Producer p3=new Producer(depot,lock);Consumer c1=new Consumer(depot,lock);Consumer c2=new Consumer(depot,lock);p1.start();p2.start();p3.start();try {Thread.sleep(2000);} catch (InterruptedException e) {}c1.start();c2.start();}
ReentrantLock+producerCondition+consumerCondition+await()+signal()
Condition的作用是精准化的通知某一类线程,比如生产线程群生产了一个产品,需要通知消费类的线程。
但是Object的notify就不能实现这个功能。一旦生产的线程notify触发的还是生产的线程, 那么消费者线程集群中,就可能不会知道可以消费了。最糟糕的结果就是,生产者每次生产后notify的都是生成者,一直到仓库满了,消费者也不知道可以进行消费,从而进入一种生产者因为满了不生产,而消费者集群集体阻塞,不知道有库存
- 在这个ReentrantLock中,因为生产者和消费者使用了不同的condition,所以既可以使用signal,也可以使用signalAll。
- 如果使用signalAll的话,必须嵌套while循环判断,防止多个消费者线程都被唤醒,直接生产
仓库代码:库存、容量、自增、自减
public class DepotLock {private int capacity=0;private int count=0;DepotLock(int capacity){this.capacity=capacity;}public boolean isFull(){if(this.count>=this.capacity){return true;}return false;}public boolean isEmpty(){if(this.count<=0){return true;}return false;}public void incr(){this.count++;}public void decr(){this.count--;}}生产线程:所有生产和消费线程公用一把锁和两个condition对象
public class Producer extends Thread{ReentrantLock lock;DepotLock depot;Condition produceCondition;Condition consumerCondition;Producer(DepotLock depot,ReentrantLock lock,Condition produceCondition,Condition consumerCondition){this.depot=depot;this.lock=lock;this.produceCondition=produceCondition;this.consumerCondition=consumerCondition;}@Overridepublic void run(){while(true){lock.lock();try{//如果满了就阻塞,唤醒之后,继续生产while(depot.isFull()){System.out.println(Thread.currentThread().getName()+" is full, i am going to wait");produceCondition.await();System.out.println(Thread.currentThread().getName()+" producer, i am active and i got the lock");}System.out.println(Thread.currentThread().getName()+" i am produce, now size is going to be "+(depot.getCount()+1));depot.incr();Thread.currentThread().sleep(200);consumerCondition.signalAll();} catch (Exception e) {e.printStackTrace();} finally{lock.unlock();}}}}消费线程
public class Consumer extends Thread{ReentrantLock lock;DepotLock depot;Condition produceCondition;Condition consumerCondition;Consumer(DepotLock depot,ReentrantLock lock,Condition produceCondition,Condition consumerCondition){this.depot=depot;this.lock=lock;this.produceCondition=produceCondition;this.consumerCondition=consumerCondition;}@Overridepublic void run(){while(true){lock.lock();try{while(depot.isEmpty()){System.out.println(Thread.currentThread().getName()+" is empty, i am going to wait");consumerCondition.await();System.out.println(Thread.currentThread().getName()+" consumer , i am active and i got the lock");}System.out.println(Thread.currentThread().getName()+" i am consumer, now size is going to be "+(depot.getCount()-1));depot.decr();Thread.currentThread().sleep(200);produceCondition.signalAll();}catch (InterruptedException e) {e.printStackTrace();}finally{lock.unlock();}}}}测试
public static void main(String[] args) {DepotLock depot=new DepotLock(10);final ReentrantLock lock=new ReentrantLock();Condition produceCondition=lock.newCondition();Condition consumerCondition=lock.newCondition();Producer p1=new Producer(depot,lock,produceCondition,consumerCondition);Producer p2=new Producer(depot,lock,produceCondition,consumerCondition);p1.start();p2.start();try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}Consumer c1=new Consumer(depot, lock, produceCondition, consumerCondition);Consumer c2=new Consumer(depot, lock, produceCondition, consumerCondition);c1.start();c2.start();}
BlockingQueue的实现
- 生产者集群和消费者集群公用一个阻塞队列,这个BlockingQueue既扮演仓库的角色保存插入进来的元素,又扮演加锁和协调者的身份。
示例代码
生产者线程public void run(){while(true){try {Thread.sleep(500);queue.put("aaa");System.out.println("now the count is: "+ queue.size());} catch (InterruptedException e) {e.printStackTrace();}}}消费者线程public void run(){while(true){String result;try {Thread.sleep(500);result = (String) queue.take();System.out.println("i am takeing a :"+ result);System.out.println("now the count is: "+ queue.size());} catch (InterruptedException e) {e.printStackTrace();}}}测试类public static void main(String[] args) {LinkedBlockingQueue queue=new LinkedBlockingQueue(10);Producer p1=new Producer(queue);Producer p2=new Producer(queue);Consumer c1=new Consumer(queue);Consumer c2=new Consumer(queue);p1.start();p2.start();c1.start();c2.start();}实现原理:就是一个封装了的一个Lock,2个condition,套路一样:
- 第一步加锁
- 第二步while循环判断是否满足生产条件,不满足就阻塞,满足就生产
- 第三步调用signal()消费线程
- 第四步释放锁public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length)notFull.await();insert(e);} finally {lock.unlock();}}private void insert(E x) {items[putIndex] = x;putIndex = inc(putIndex);++count;notEmpty.signal();}