Contents
  1. 1. 适用场景
  2. 2. 生产和消费线程协同的实现方式:
    1. 2.1. synchronized(wait/notifyAll) 实现
    2. 2.2. ReentrantLock+producerCondition+consumerCondition+await()+signal()
    3. 2.3. BlockingQueue的实现

适用场景

两个或者多个线程之间存在依赖关系的时候,需要实现线程之间的协同管理。比如:

  1. 多个线程引用公用变量的时候,需要引入synchronized,实现公有变量的线程安全问题。
  2. 生产类和消费类线程的协同。生产和消费模型里面,只有库存>0才可以消费,只有库存<capacity才可以生产,不仅要解决线程安全问题,还需要协调生产线程和消费线程,生产一个产品后,要通知消费线程可以消费了;消费线程消费完一个消费,要通知生产线程可以生产了。
  3. 主线程和多个子线程之间的协同。多线程协同做一件事情的时候,比如下载,一个主线程启动5个子线程开始并发下载,需要使用CountdownLatch,5个线程均下载完毕,就通知主线程下载完成,相应用户。
  4. 多个子线程之间的协同。海量运算间的相互依赖,CycleBaire的使用场景,多个独立的线程进行运算第一步骤,大家都完成第一步骤后,才同时开始第二步骤的运算。

生产和消费线程协同的实现方式:

  • synchronized+lock.wait+lock.notifyAll()
  • ReentrantLock+producerCondition+consumerCondition+await()+signal()
  • BlockingQueue(put 、 take)
  1. 因为都是对一个Depot仓库的库存进行生产和消费操作,所以不管用什么方法,必须用同一个锁对象,否则线程就不安全。
  2. notify 和notifyAll有很大区别,一个只随机的唤醒一个线程开始争夺锁,notifyAll唤醒所有的线程开始争夺锁
  3. while循环配合notifyAll可以实现synchronized的生产、消费类线程的精准控制
  4. 整体思路:生产者线程启动,无限循环的开始生产产品,
  • 第一步获取对同一个锁对象进行加锁
  • 第二步嵌套while循环进行判断是否满足生产条件,不满足就阻塞等待唤醒,满足就生成
  • 第三步生成完之后通知消费者线程开始消费
  • 第四步释放锁
    while(true){
    lock.lock();
    while(depot.isFull()){
    produceCondition.await();
    }
    depot.incr();
    consumerCondition.signalAll();
    lock.unlock();
    }

synchronized(wait/notifyAll) 实现

  1. 要解决协调不同类型的线程,比如多个生产线程和多个消费线程的时候,就需要引入Object的wait和notifyAll,生产一个产品,调用lock.notifyAll使所有await的线程都收到通知,这个时候外面必须包一层while,因为如果当时有生产线程在阻塞状态,那么也一定会被唤醒。
  2. 如果只有一个生产者和只有1个消费者的话,只需要使用notify即可。也不需要while,因为生产线程通知的只可能是消费线程。
    生产者线程
    public void run(){
    while(true){
    synchronized (lock) {
    if(depot.isFull()){
    System.out.println(Thread.currentThread().getName()+" is full, i am going to wait");
    try {
    while(depot.isFull()){
    lock.wait();
    }
    } catch (InterruptedException e) {}
    System.out.println(Thread.currentThread().getName()+" i am active and got the lock ");
    }
    System.out.println(Thread.currentThread().getName()+" i am produce, now size is going to be "+(depot.getCount()+1));
    depot.incr();
    try {
    Thread.sleep(100);
    } catch (InterruptedException e) {}
    lock.notifyAll();
    }
    }
    }
    消费者线程
    public void run(){
    while(true){
    synchronized (lock) {
    if(depot.isEmpty()){
    System.out.println(Thread.currentThread().getName()+" is empty, i am going to wait");
    while(depot.isEmpty()){
    try {
    lock.wait();
    } catch (InterruptedException e) {}
    }
    System.out.println(Thread.currentThread().getName()+" i am consumer,i am active and got the lock");
    }
    System.out.println(Thread.currentThread().getName()+" i am consumer, now size is going to be "+(depot.getCount()-1));
    depot.decr();
    try {
    Thread.sleep(100);
    } catch (InterruptedException e) {}
    lock.notifyAll();
    }
    }
    }
    测试类
    public static void main(String[] args) {
    Depot depot=new Depot(10);
    final Object lock=new Object();
    Producer p1=new Producer(depot,lock);
    Producer p2=new Producer(depot,lock);
    Producer p3=new Producer(depot,lock);
    Consumer c1=new Consumer(depot,lock);
    Consumer c2=new Consumer(depot,lock);
    p1.start();
    p2.start();
    p3.start();
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {}
    c1.start();
    c2.start();
    }

ReentrantLock+producerCondition+consumerCondition+await()+signal()

Condition的作用是精准化的通知某一类线程,比如生产线程群生产了一个产品,需要通知消费类的线程。
但是Object的notify就不能实现这个功能。一旦生产的线程notify触发的还是生产的线程, 那么消费者线程集群中,就可能不会知道可以消费了。最糟糕的结果就是,生产者每次生产后notify的都是生成者,一直到仓库满了,消费者也不知道可以进行消费,从而进入一种生产者因为满了不生产,而消费者集群集体阻塞,不知道有库存

  • 在这个ReentrantLock中,因为生产者和消费者使用了不同的condition,所以既可以使用signal,也可以使用signalAll。
  • 如果使用signalAll的话,必须嵌套while循环判断,防止多个消费者线程都被唤醒,直接生产
  1. 仓库代码:库存、容量、自增、自减

    public class DepotLock {
    private int capacity=0;
    private int count=0;
    DepotLock(int capacity){
    this.capacity=capacity;
    }
    public boolean isFull(){
    if(this.count>=this.capacity){
    return true;
    }
    return false;
    }
    public boolean isEmpty(){
    if(this.count<=0){
    return true;
    }
    return false;
    }
    public void incr(){
    this.count++;
    }
    public void decr(){
    this.count--;
    }
    }
  2. 生产线程:所有生产和消费线程公用一把锁和两个condition对象

    public class Producer extends Thread{
    ReentrantLock lock;
    DepotLock depot;
    Condition produceCondition;
    Condition consumerCondition;
    Producer(DepotLock depot,ReentrantLock lock,Condition produceCondition,Condition consumerCondition){
    this.depot=depot;
    this.lock=lock;
    this.produceCondition=produceCondition;
    this.consumerCondition=consumerCondition;
    }
    @Override
    public void run(){
    while(true){
    lock.lock();
    try{
    //如果满了就阻塞,唤醒之后,继续生产
    while(depot.isFull()){
    System.out.println(Thread.currentThread().getName()+" is full, i am going to wait");
    produceCondition.await();
    System.out.println(Thread.currentThread().getName()+" producer, i am active and i got the lock");
    }
    System.out.println(Thread.currentThread().getName()+" i am produce, now size is going to be "+(depot.getCount()+1));
    depot.incr();
    Thread.currentThread().sleep(200);
    consumerCondition.signalAll();
    } catch (Exception e) {
    e.printStackTrace();
    } finally{
    lock.unlock();
    }
    }
    }
    }
  3. 消费线程

    public class Consumer extends Thread{
    ReentrantLock lock;
    DepotLock depot;
    Condition produceCondition;
    Condition consumerCondition;
    Consumer(DepotLock depot,ReentrantLock lock,Condition produceCondition,Condition consumerCondition){
    this.depot=depot;
    this.lock=lock;
    this.produceCondition=produceCondition;
    this.consumerCondition=consumerCondition;
    }
    @Override
    public void run(){
    while(true){
    lock.lock();
    try{
    while(depot.isEmpty()){
    System.out.println(Thread.currentThread().getName()+" is empty, i am going to wait");
    consumerCondition.await();
    System.out.println(Thread.currentThread().getName()+" consumer , i am active and i got the lock");
    }
    System.out.println(Thread.currentThread().getName()+" i am consumer, now size is going to be "+(depot.getCount()-1));
    depot.decr();
    Thread.currentThread().sleep(200);
    produceCondition.signalAll();
    }catch (InterruptedException e) {
    e.printStackTrace();
    }finally{
    lock.unlock();
    }
    }
    }
    }
  4. 测试

    public static void main(String[] args) {
    DepotLock depot=new DepotLock(10);
    final ReentrantLock lock=new ReentrantLock();
    Condition produceCondition=lock.newCondition();
    Condition consumerCondition=lock.newCondition();
    Producer p1=new Producer(depot,lock,produceCondition,consumerCondition);
    Producer p2=new Producer(depot,lock,produceCondition,consumerCondition);
    p1.start();
    p2.start();
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    Consumer c1=new Consumer(depot, lock, produceCondition, consumerCondition);
    Consumer c2=new Consumer(depot, lock, produceCondition, consumerCondition);
    c1.start();
    c2.start();
    }

BlockingQueue的实现

  1. 生产者集群和消费者集群公用一个阻塞队列,这个BlockingQueue既扮演仓库的角色保存插入进来的元素,又扮演加锁和协调者的身份。
  2. 示例代码

    生产者线程
    public void run(){
    while(true){
    try {
    Thread.sleep(500);
    queue.put("aaa");
    System.out.println("now the count is: "+ queue.size());
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    消费者线程
    public void run(){
    while(true){
    String result;
    try {
    Thread.sleep(500);
    result = (String) queue.take();
    System.out.println("i am takeing a :"+ result);
    System.out.println("now the count is: "+ queue.size());
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    测试类
    public static void main(String[] args) {
    LinkedBlockingQueue queue=new LinkedBlockingQueue(10);
    Producer p1=new Producer(queue);
    Producer p2=new Producer(queue);
    Consumer c1=new Consumer(queue);
    Consumer c2=new Consumer(queue);
    p1.start();
    p2.start();
    c1.start();
    c2.start();
    }
  3. 实现原理:就是一个封装了的一个Lock,2个condition,套路一样:

  • 第一步加锁
  • 第二步while循环判断是否满足生产条件,不满足就阻塞,满足就生产
  • 第三步调用signal()消费线程
  • 第四步释放锁
    public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
    while (count == items.length)
    notFull.await();
    insert(e);
    } finally {
    lock.unlock();
    }
    }
    private void insert(E x) {
    items[putIndex] = x;
    putIndex = inc(putIndex);
    ++count;
    notEmpty.signal();
    }
Contents
  1. 1. 适用场景
  2. 2. 生产和消费线程协同的实现方式:
    1. 2.1. synchronized(wait/notifyAll) 实现
    2. 2.2. ReentrantLock+producerCondition+consumerCondition+await()+signal()
    3. 2.3. BlockingQueue的实现