Concurrent:
BlockingQueue(阻塞队列)
ArrayBlockingQueue(指定容量,不可变),LinkeBlocingQueue(指定容量不可变,也可以不指定容量,默认Integer.Max_value)
PriorityBlockingQueue(根据实现的接口自定义排序,只有在逐个拿取的时候才有序)
SynchronousQueue(长度以1只能为1)
ConcurrentMap
ConcurrentHashMap
1.5 分桶加锁,1.8 CAS+红黑树来保证线程安全
ConcurrentNavigableMap(针对有序列表,有map.headMap map.subMap map.tailMap , 返回有序的在取出范围的map)
CountDownLatch 闭锁
CountDownLatch 以一个给定的数量初始化。 countDown() 每被调用一次,这一数 量就减一。通过调用 await() 方法之一,线程可以阻塞等待这一数量到达零。
等待一定数量的线程完成。来执行其后的程序
CyclicBarrier 栅栏
它能够对处理一些算法的线程实现同
步。换句话讲,它就是一个所有线程必须等待的一个栅栏,直到所有线程都到达这 里,然后所有线程才可以继续做其他事情
Exchanger 交换机
类表示一种两个线程可以进行互相交换对象的会和点 ,只能两个线程之间交换数据
Semaphore 信号量
l acquire()
l release()
计数信号量由一个指定数量的 "许可" 初始化。每调用一次 acquire(),一个许可会 被调用线程取走。每调用一次 release(),一个许可会被返还给信号量。因此,在没 有任何 release() 调用时,最多有 N 个线程能够通过 acquire() 方法, N 是该信 号量初始化时的许可的指定数量。
线程池
ExecutorService
(executors.newFixedThreadPool,长任务场景,只有核心线程,没有临时线程,容纳无限多
,newCachedThreadPool(高并发短任务场景,没有核心线程,全部都是临时线程,处理任意多的线程)
,newSingleThreadPool
,newSchedulerThreadPool(有核心线程,有临时线程)
)
提交线程的方法
execute() 提交线程 没有返回值submit(Runnable) 提交线程,返Future 可以通过Future.get()得到该线程的状态但是如果该线程未执行完成,那么该方法阻塞
submit(Callable) 同上面类似,但是线程可以带有返回值
invokeAny(.....),随机选择线程执行一个
invokeAll(),自动执行所有的线程
关闭线程池:ExecutorService.shutdown();该方法不再接受线程池,等待所有线程执行完毕后,线程池结束。
ExecutorService.shutdown();立即关闭,退出任务,正在执行的线程可能会出错。
##Callable只能用线程池提交
Callable runnable :
返回值
异常,runnabel没有容错机制,callable有容错机制,可以将 异常抛给上层处理
Callable只能通过submit方法提交,runnable可以new 也可 以通过线程池提交
ReadWriteLock 读写锁,可以是公平,也可以是非公平的。该锁可以跨方法
读锁可以共享,写锁互斥
读锁 readLock().lock();
写锁 writeLock().lock();
package hgs.test;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.ConcurrentNavigableMap;import java.util.concurrent.ConcurrentSkipListMap;import java.util.concurrent.CountDownLatch;import java.util.concurrent.CyclicBarrier;import java.util.concurrent.Exchanger;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.PriorityBlockingQueue;import java.util.concurrent.RejectedExecutionHandler;import java.util.concurrent.Semaphore;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.ReentrantReadWriteLock;public class Test {public static void main(String[] args) throws InterruptedException, BrokenBarrierException {//ConcurrentMap//BlockingQueue<String> bq = new ArrayBlockingQueue<String>(4);//add(超过长度会报错) remove(没有元素会报错)//offer(如果可以插入返回true 否则 false) poll(如果没有元素返回null)//bq.offer("1");//bq.offer("1");//bq.offer("1");//bq.offer("1");//bq.offer("1");//String flag = bq.poll();//System.out.println(flag);//put take 阻塞式//bq.put("1");//bq.put("1");//bq.put("1");//bq.put("1");//bq.put("1");//bq.put("1");//bq.take();//offer(o,timeout,timeunit),poll(timeout,tameunit) 等待timeout时间,然后跳过//bq.poll(10, TimeUnit.SECONDS);//bq.element();//检查是否为空,是的话跑出异常//bq.peek();//阻塞//闭锁//栅栏//exchanger交换器//6.Semaphore 信号量//原始创建线程池,executors.newCacheThreadPool 的底层调用该方法//可重入锁 ReentrantLock 可重入读写锁ReentrantReadWriteLockReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();for (int i = 0 ;i<3;i++) {new WriteLockTest( rwLock ).start();} }}class WriteLockTest extends Thread{ReentrantReadWriteLock rwLock ;public WriteLockTest(ReentrantReadWriteLock rwLock ) {this.rwLock = rwLock;}@Overridepublic void run() {rwLock.writeLock().lock();System.out.println("reading......");try {Thread.sleep((long)(Math.random()*2000));} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}System.out.println("done......");rwLock.writeLock().unlock();}}class ReadLockTest extends Thread{ReentrantReadWriteLock rwLock ;public ReadLockTest(ReentrantReadWriteLock rwLock ) {this.rwLock = rwLock;}@Overridepublic void run() {rwLock.readLock().lock();System.out.println("reading......");try {Thread.sleep((long)(Math.random()*2000));} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}System.out.println("done......");rwLock.readLock().unlock();}}class ThreadPoolTest implements Runnable{ThreadPoolTest (){}@Overridepublic void run() { System.out.println(Thread.currentThread().getName()); try {Thread.sleep((long)(Math.random()*2000));} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}class SemaphoreTest extends Thread{Semaphore s = null;public SemaphoreTest(Semaphore s) {this.s = s;}@Overridepublic void run() {try {s.acquire();System.out.println("aquire......");Thread.sleep((long)(Math.random()*3000));} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}System.out.println("release......");s.release();}}class ExchangerTest implements Runnable{Exchanger<String> ex = null;public ExchangerTest(Exchanger<String> ex) {this.ex = ex;}@Overridepublic void run() {String my = Thread.currentThread().getName();String exstr = null;try {exstr = ex.exchange(my);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}System.out.println(my+" "+exstr);}}class BoyRun implements Runnable{CountDownLatch cdl ;public BoyRun(CountDownLatch cdl ) {this.cdl = cdl;}@Overridepublic void run() {System.out.println("cdl"+" +1");cdl.countDown();System.out.println("cdl"+" +1--");}}class GirlRan implements Runnable {CyclicBarrier cb ;public GirlRan(CyclicBarrier cb ) {this.cb = cb;}@Overridepublic void run() {System.out.println("cdl"+" +1");try {Thread.sleep((long)(Math.random()*5000));cb.await();} catch (InterruptedException | BrokenBarrierException e) {// TODO Auto-generated catch blocke.printStackTrace();}System.out.println("cdl"+" +1--");}}class Boy implements Comparable<Boy>{String name;int age;public Boy(String name ,int age) {this.name = name;this.age = age;}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}public int compareTo(Boy o) {return this.age - o.age;}@Overridepublic String toString() {return "Boy [name=" + name + ", age=" + age + "]";}}