并发编程-工具类
参考资料
BlockQueue详细介绍
https://blog.csdn.net/qq_38872310/article/details/80832703
BlockQueue接口
解决的问题:在一个容量有限的仓库里面,实现满了就挂起生产线程,空了就挂起消费线程的兼顾性能和安全的数据结构 – 阻塞队列
定义了插入抽象方法:
add()、 put()、 offer()、offer(time) 分别是:失败抛出异常、失败线程阻塞直到成功、快速返回无论是否成功、定时尝试插入
remove()、take()、pull()、pull(timeout)
ArrayBlockQueue
疑问:
- 插入和删除是否可以并行进行?
- size函数加锁吗?
基于定长数组的有界队列,初始化的时候必须制定队列的大小,1个锁,2个condition对象
final Object[] items;int takeIndex;int putIndex;int count;final ReentrantLock lock;private final Condition notEmpty;private final Condition notFull;严格的先进先出队列,基于takeIndex、putIndex、length、count等参数实现FIFO
实现原理:就是一个封装了的一个Lock,2个condition,套路一样:
- 第一步加锁
- 第二步while循环判断是否满足生产条件,不满足就阻塞,满足就生产
- 第三步调用signal()消费线程
- 第四步释放锁public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;- 第一步加锁lock.lockInterruptibly();try {- 第二步while循环判断是否满足生产条件,不满足就阻塞,满足就生产while (count == items.length)notFull.await();insert(e);} finally {- 第四步释放锁lock.unlock();}}private void insert(E x) {items[putIndex] = x;putIndex = inc(putIndex);++count;- 第三步调用signal()消费线程notEmpty.signal();}
LinkedBlockQueue
- LinkedBlockQueue内部是基于单向链表的队列,可以设置capacity来实现有界队列,也可以不设置,默认是无界队列。无界队列的时候,put、add、offer就是一样的啊,因为没有限制,所以插入肯定成功.
- 内部的实现机制是2把锁,各有一个condition,通过AtomicInteger来指定库存,CAS操作来自增或者自减库存,从而实现线程安全
- 生产线程和消费线程可以并行的进行操作,提升性能
生产线程和消费线程的交互和常规的方式不一样,生产线程只在当前库存为0的时候,才会触发 消费线程的signal(),进行了性能优化,因为当库存不为0的时候,不会有消费线程阻塞。
static class Node<E> {E item;Node<E> next;Node(E x) { item = x; }}private transient Node<E> head;private transient Node<E> last;private final int capacity;private final AtomicInteger count = new AtomicInteger(0);private final ReentrantLock takeLock = new ReentrantLock();private final Condition notEmpty = takeLock.newCondition();private final ReentrantLock putLock = new ReentrantLock();private final Condition notFull = putLock.newCondition();private void enqueue(Node<E> node) {last = last.next = node;}put的过程
- 对putLock加锁
- while循环判断是否库存满了,满了就阻塞,未满就入队,然后CAS的方式自增并返回老的库存值
- 如果新库存仍然未满,会触发其它的生产线程生产
- 释放putLock锁
- 如果原库存为0,会触发其它的消费线程开始消费
|
SynchronousQueue(同步队列)
- 没有容量的一个特殊队列
- 执行阻塞的方法 put 和 take的时候正常,所有的put方法有一个阻塞的公平队列或者非公平队列,所有的take操作也有一个类似的队列
- 执行非阻塞的方法 add()和remove()方法时,必须有对应的take和put方法阻塞着,不然就会报错
- peek(返回首位但是不删除元素)永远返回null,因为不存储元素public static void main(String[] args) throws Exception {final SynchronousQueue<String> queue = new SynchronousQueue<String>();//初始化不能带长度Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {try {String str = queue.take(); //线程1在获取,这是阻塞的,当线程2一添加,线程1就获取,因为SynchronousQueue是没有容量的System.out.println(str);} catch (InterruptedException e) {e.printStackTrace();}}});t1.start();Thread t2 = new Thread(new Runnable() {@Overridepublic void run() {queue.add("abcd"); //线程2往队列里添加元素}});t2.start();}
PriorityBlockingQueue
- 有优先级但是无界的阻塞队列,类似于List,支持自动扩容,可以指定初始化大小,也可以不指定。实际是一个Arr[]
- 内部有一个最小堆,插入和取出的时候,都要构建堆有序
- add()的时候不会报错,因为容量无限
- 支持元素实现Compare接口,或者PriorityBlockingQueue初始化的时候传入一个Compare接口实现类,两种方式进行比较优先级PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<Task>(); //因为是无界队列,初始化可以不定义长度Task t1 = new Task();t1.setId(1);t1.setName("任务 1");Task t2 = new Task();t2.setId(4);t2.setName("任务 2");Task t3 = new Task();t3.setId(3);t3.setName("任务 3");queue.add(t2);queue.add(t3);queue.add(t1);queue.take(); // 取出来最小的 t1 ,任务1
DelayQueue
- 内部有一个PriorityBlockingQueue,最先到期的元素放在堆顶。
- 里面的元素必须要实现Delayed接口入队等待线程DelayQueue< Student> students = new DelayQueue<Student>();students.put(new Student("student"+(i+1), 30+random.nextInt(120),countDownLatch));出队消费线程while(!Thread.interrupted()){students.take().run();}
ConcurrentHashMap
绝对的线程安全同时支持并发的读写
- 分段锁技术,支持不同段的并发写
- 写的时候,默认非公平锁,锁一个分段,提升效率,锁的过程是先tryLock() 失败一定次数后,再调用阻塞的lock()期间会遍历
- 读的时候是都volatle值,保证是最新的数据
CopyOnWriteArrayList
对比的特点
- ArrayList 是线程不安全的,比如两个线程同时修改一个数据,会导致修改被覆盖的问题
- Vector是追求绝对的线程安全和所见即所得,插入的时候枷锁,读的时候也加锁,确保当前没有其他线程在修改这个数组
- CopyOnWriteArrayList对插入加锁,但是读的时候要求没那么严格,只是确保最终一致性,也就是其他线程提交完之后的重新读取是最新值
自身的特点:
- 消耗内存,需要一个相同的拷贝数组
- 不能确保查询的时候是即时最新的数据
lock() lockInterruptibly()
区别:lockInterruptibly()会更好的响应中断操作而不是急于获取锁,程序对lockInterruptibly锁的控制性更强。
lock加锁过程是:先CAS尝试获取锁,获取失败之后,再忙等待阻塞式尝试获取锁,直到获取成功后,再check这个等待过程中是否有被中断的操作做出相应的响应
lockInterruptibly() 方法是尝试获取锁的每个过程当中都去检查该线程是否有被终端操作,有就抛出异常,没有就继续尝试获取锁
并发工具类(基于ReentrantLock实现的工具类)
http://www.importnew.com/21889.html
CountDownLatch(分页下载,2个方法配合)
await方法阻塞一个或者多个线程,countDown()计数到一定次数后,所有阻塞线程再一起执行。
- 比如主线程开启4个页面的sheet下载操作,直到4个独立的线程都执行完毕后再回到主线程打印,下载完毕
- 比如主线程调用 countdownLatch.await方法,阻塞主线程,一直到所有子线程完成某种初始化后,主线程再往下执行public static void main(String[] args) {//所有线程阻塞,然后统一开始final CountDownLatch begin = new CountDownLatch(1);//主线程阻塞,直到所有分线程执行完毕final CountDownLatch end = new CountDownLatch(5);for(int i = 0; i < 5; i++){Thread thread = new Thread(new Runnable() {@Overridepublic void run() {try {begin.await();System.out.println(Thread.currentThread().getName() + " 起跑");Thread.sleep(1000);System.out.println(Thread.currentThread().getName() + " 到达终点");end.countDown();} catch (InterruptedException e) {e.printStackTrace();}}});thread.start();}try {System.out.println("1秒后统一开始");Thread.sleep(1000);begin.countDown();end.await();System.out.println("停止比赛");} catch (InterruptedException e) {e.printStackTrace();}}
CyclicBarrier(操场集合到齐后开始比赛,一个方法await)
阻塞引用的所有的子线程,直到满足一定的条件,所有的子线程才回复工作。但是和CDL不同,更简洁,子线程调用 CyclicBarrier.await()的时候,不仅阻塞当前子线程,而且计数会减少1。不需要额外调用别的方法。
通过它可以实现让一组线程等待至某个状态之后再全部同时执行。这个阻塞的是所有的子线程,一直到都完成大家在一起往下执行。而不是阻塞主线程
支持回调方法
semaphore(信号量,10个人,5个茅坑,挨个排队上厕所,acquire() 阻塞等待 和release()释放资源)
限制最多N个线程同时运行,超出的线程争夺会阻塞,直到有线程release。这和线程池的maxCore类似但是不一样。
http://blog.csdn.net/sinat_36246371/article/details/53872412
AQS原理
- AQS是锁的实现者,通过用AQS简化了锁的实现屏蔽了同步状态管理,线程的排队,等待唤醒的底层操作。简而言之,锁是面向使用者,AQS是锁的具体实现者。
- AQS通过trylock方法差异化实现公平锁和非公平锁的争夺策略
- AQS通过参数EXCLUSIVE和SHARE来实现共享锁(CountLockDown)和独占锁(ReentraintLock)
- 有一个FIFO的双向链表队列,只要进入到该队列当中不管是不是公平锁,都只有header节点有机会争夺锁的权利
- 维护一个status的volitile的字段,标志当前锁的状态
公平锁(独占锁)
获取锁的过程:先trylock获取锁,当锁state=0且队列前没有人排队 或者该锁的拥有者是自己重入时直接获取锁。获取锁失败的话,则把自己加入到CLH队列当中,并且轮询式继续反复trylock过程。
相当于整个CLH队列的所有对象都在各自的线程中轮询尝试:有没有到我这个过程
非公平锁(独占锁)
在进入CHL之前有2次机会进行插队的CAS争夺锁,都失败进入CLH队列当中时,就必须等到自己成了头结点时才有机会争夺锁
ThreadLocal
如果多个线程之间为了传递运行时参数可以通过引用相同的Runnable实现类,在实现类里面定义公用的变量,从而实现运行时的参数传递
例如下面的,100个线程,只允许10个线程自增number,通过引用相同的runnable实现类 test,从而实现运行时的参数传递number,从而可以动态的协调线程的执行逻辑,同时公用的变量是线程不安全的。public static void main(String[] args) throws Exception {Test2 test=new Test2(0);for(int i=0;i<100;i++){new Thread(test).start();}}public class Test2 implements Runnable{int number=0;Test2(int number){this.number=number;}@Overridepublic void run() {if(number==10){System.out.println("i am full");}else{number++;System.out.println(Thread.currentThread().getName()+"number is"+number);}}}为了解决公用变量不安全的情况,可以通过ThreadLocal来定义某些变量为线程独有的,从而实现线程隔离
例如下面的,对线程私有变量 number进行threadlocal处理,对于公用变量 sharedNumber不做处理,实现线程共享传递参数public class SeqCount {private static ThreadLocal<Integer> number = new ThreadLocal<Integer>(){public Integer initialValue() {return 0;}};int sharedNumber=0;public int nextSeq(){number.set(number.get() + 1);return number.get();}public static void main(String[] args){SeqCount seqCount = new SeqCount();for(int i=0;i<4;i++){new SeqThread(seqCount).start();}}private static class SeqThread extends Thread{private SeqCount seqCount;SeqThread(SeqCount seqCount){this.seqCount = seqCount;}public void run() {for(int i = 0 ; i < 3 ; i++){System.out.println(Thread.currentThread().getName() + " seqCount :" + seqCount.nextSeq());seqCount.sharedNumber++;System.out.println(Thread.currentThread().getName() + " shareNumber :" + seqCount.sharedNumber);}}}}ThreadLocal实现线程隔离原理:每个Thread独立拥有一个ThreadLocalMap,实现线程之间变量的隔壁,然后set、get都是对这个ThreadLocalMap进行操作,其中的key就是申明的ThreadLocal,线性探测法实现的HashMap的数据结构
Thread :每个Thread只有一个独立绑定的ThreadLocalMap,线程独享Thread t = Thread.currentThread();ThreadLocalMap map = getMap(t);ThreadLocalMap:每个ThreadLocalMap拥有一个线性探测的HashMap结构,每个Entry的key就是一个ThreadLocalprivate Entry getEntry(ThreadLocal key) {int i = key.threadLocalHashCode & (table.length - 1);Entry e = table[i];if (e != null && e.get() == key)return e;elsereturn getEntryAfterMiss(key, i, e);}ThreadLocal : 每个Thread可以定义多个ThreadLocalprivate static ThreadLocal<Integer> number = new ThreadLocal<Integer>(){public Integer initialValue() {return 0;}};private static ThreadLocal<Integer> number2 = new ThreadLocal<Integer>(){public Integer initialValue() {return 1;}};