本篇文章给大家分享的是有关java并发编程的入门过程,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
1、入门介绍
1.1、实现线程的2种方式
package chapter2;public class MyThreadDemo1 { public static void main(String[] args) { new Thread1().start(); new Thread(new Thread2()).start(); }}class Thread1 extends Thread{ @Override public void run() { System.out.println(Thread.currentThread().getName()); }}class Thread2 implements Runnable{ @Override public void run() { System.out.println(Thread.currentThread().getName()); }}
1.2、线程的状态(生命周期)
1.2.1、线程的5种状态
新建(NEW):新创建了一个线程对象。
可运行(RUNNABLE):线程对象创建后,其他线程(比如main线程)调用了该对象的start()方法。该状态的线程位于可运行线程池中,等待被线程调度选中,获取cpu 的使用权 。
运行(RUNNING):可运行状态(runnable)的线程获得了cpu 时间片(timeslice) ,执行程序代码。
阻塞(BLOCKED):阻塞状态是指线程因为某种原因放弃了cpu 使用权,也即让出了cpu timeslice,暂时停止运行。直到线程进入可运行(runnable)状态,才有机会再次获得cpu timeslice 转到运行(running)状态。阻塞的情况分三种:
(一). 等待阻塞:运行(running)的线程执行o.wait()方法,JVM会把该线程放入等待队列(waitting queue)中。(二). 同步阻塞:运行(running)的线程在获取对象的同步锁时,若该同步锁被别的线程占用,则JVM会把该线程放入锁池(lock pool)中。(三). 其他阻塞:运行(running)的线程执行Thread.sleep(long ms)或t.join()方法,或者发出了I/O请求时,JVM会把该线程置为阻塞状态。当sleep()状态超时、join()等待线程终止或者超时、或者I/O处理完毕时,线程重新转入可运行(runnable)状态。
死亡(DEAD):线程run()、main() 方法执行结束,或者因异常退出了run()方法,则该线程结束生命周期。死亡的线程不可再次复生。
1.2.2、线程运行状态图
1). 初始状态
实现Runnable接口和继承Thread可以得到一个线程类,new一个实例出来,线程就进入了初始状态
2). 可运行状态
可运行状态只是说你资格运行,调度程序没有挑选到你,你就永远是可运行状态。
调用线程的start()方法,此线程进入可运行状态。
当前线程sleep()方法结束,其他线程join()结束,等待用户输入完毕,某个线程拿到对象锁,这些线程也将进入可运行状态。
当前线程时间片用完了,调用当前线程的yield()方法,当前线程进入可运行状态。
锁池里的线程拿到对象锁后,进入可运行状态。
3). 运行状态
线程调度程序从可运行池中选择一个线程作为当前线程时线程所处的状态。这也是线程进入运行状态的唯一一种方式。
4). 死亡状态
当线程的run()方法完成时,或者主线程的main()方法完成时,我们就认为它死去。这个线程对象也许是活的,但是,它已经不是一个单独执行的线程。线程一旦死亡,就不能复生。
在一个死去的线程上调用start()方法,会抛出java.lang.IllegalThreadStateException异常。
5). 阻塞状态
当前线程T调用Thread.sleep()方法,当前线程进入阻塞状态。
运行在当前线程里的其它线程t2调用join()方法,当前线程进入阻塞状态。
等待用户输入的时候,当前线程进入阻塞状态。
6). 等待队列(是Object里的方法,但影响了线程)
调用obj的wait(), notify()方法前,必须获得obj锁,也就是必须写在synchronized(obj) 代码段内。
与等待队列相关的步骤和图
线程1获取对象A的锁,正在使用对象A。
线程1调用对象A的wait()方法。
线程1释放对象A的锁,并马上进入等待队列。
锁池里面的对象争抢对象A的锁。
线程5获得对象A的锁,进入synchronized块,使用对象A。
线程5调用对象A的notifyAll()方法,唤醒所有线程,所有线程进入锁池。||||| 线程5调用对象A的notify()方法,唤醒一个线程,不知道会唤醒谁,被唤醒的那个线程进入锁池。
notifyAll()方法所在synchronized结束,线程5释放对象A的锁。
锁池里面的线程争抢对象锁,但线程1什么时候能抢到就不知道了。||||| 原本锁池+第6步被唤醒的线程一起争抢对象锁。
7). 锁池状态
当前线程想调用对象A的同步方法时,发现对象A的锁被别的线程占有,此时当前线程进入锁池状态。简言之,锁池里面放的都是想争夺对象锁的线程。
当一个线程1被另外一个线程2唤醒时,1线程进入锁池状态,去争夺对象锁。
锁池是在同步的环境下才有的概念,一个对象对应一个锁池。
1.3. sleep、yield、join、wait的比较
Thread.sleep(long millis),一定是当前线程调用此方法,当前线程进入阻塞,但不释放对象锁,millis后线程自动苏醒进入可运行状态。作用:给其它线程执行机会的最佳方式。
Thread.yield(),一定是当前线程调用此方法,当前线程放弃获取的cpu时间片,由运行状态变会可运行状态,让OS再次选择线程。作用:让相同优先级的线程轮流执行,但并不保证一定会轮流执行。实际中无法保证yield()达到让步目的,因为让步的线程还有可能被线程调度程序再次选中。Thread.yield()不会导致阻塞。
t.join()/t.join(long millis),当前线程里调用其它线程1的join方法,当前线程阻塞,但不释放对象锁,直到线程1执行完毕或者millis时间到,当前线程进入可运行状态。
obj.wait(),当前线程调用对象的wait()方法,当前线程释放对象锁,进入等待队列。依靠notify()/notifyAll()唤醒或者wait(long timeout)timeout时间到自动唤醒。
obj.notify()唤醒在此对象监视器上等待的单个线程,选择是任意性的。notifyAll()唤醒在此对象监视器上等待的所有线程。
1.4、Thread.sleep(long time)
Thread.sleep(long time)方法的作用是让当前正在执行的线程休眠(让出CPU时间片)指定的毫秒数。
需要注意的是:
调用sleep()方法时,如果当前线程持有锁不会导致当前线程释放锁。
sleep不释放锁 线程是进入阻塞状态还是就绪状态?
答案是进入阻塞状态,确切的说Thread在Java的TIMED_WAITING状态(但这个状态其实并没那么重要,可以认为是java的内部细节,用户不用太操心)。往下一层,在不同OS上底层的sleep的实现细节不太一样。但是大体上就是挂起当前的线程,然后设置一个信号或者时钟中断到时候唤醒。sleep后的的Thread在被唤醒前是不会消耗任何CPU的(确切的说,大部分OS都会这么实现,除非某个OS的实现偷懒了)。这点上,wait对当前线程的效果差不多是一样的,也会暂停调度,等着notify或者一个超时的时间。期间CPU也不会被消耗。
2、多线程实现银行叫号排队
2.1 版本1
package chapter2.version1;public class Bank { public static void main(String[] args) { TicketWindow ticketWindow = new TicketWindow("1号窗口"); TicketWindow ticketWindow2 = new TicketWindow("2号窗口"); TicketWindow ticketWindow3 = new TicketWindow("3号窗口"); ticketWindow.start(); ticketWindow2.start(); ticketWindow3.start(); }}
package chapter2.version1;public class TicketWindow extends Thread { private static int MAX = 50; private static int current = 1; private String name; public TicketWindow(String name){ this.name = name; } @Override public void run() { while (current < MAX){ System.out.println("窗口:" + name +" 当前叫号:" + current); current++; try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } }}
此版本中注意TicketWindow 类的成员变量current 为static, 表示无论实例化这个类多少次,都共享同一份变量,因为这个变量是在类加载的时候就已经创建好的。为了让多个线程消耗同一个current 所以才定义为static的,不然每个线程的current都是各自的,与其他线程不相关。
以上代码把业务与线程紧密掺杂在一起,为了让多个线程访问同一份current把他定义为static明显是不合适的。
2.2 版本2
package chapter2.version2;public class Bank { public static void main(String[] args) { // 有线程安全问题, 多个线程同时访问到current变量 TicketWindow ticketWindow = new TicketWindow(); Thread windowThread1 = new Thread(ticketWindow, "1号窗口"); Thread windowThread2 = new Thread(ticketWindow, "2号窗口"); Thread windowThread3 = new Thread(ticketWindow, "3号窗口"); windowThread1.start(); windowThread2.start(); windowThread3.start(); }
}
package chapter2.version2;public class TicketWindow implements Runnable { private int MAX = 50; private int current = 1; @Override public void run() { while (current < MAX){ System.out.println("窗口:" + Thread.currentThread().getName() +" 当前叫号:" + current); current++; try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } }
}
这里TicketWindow 并不是继承Thread, 这样启动线程多次线程可以共享同一份TicketWindow 实例, 把业务逻辑与线程启动拆分清晰明了。 注:以上2个版本均存在线程安全性问题,这是由于current变量在run()方法可能被多个线程同时访问, 可能多个线程同时执行到
System.out.println("窗口:" + Thread.currentThread().getName() +" 当前叫号:" + current);
这行代码,导致不同的线程打印出了一样的叫号值,比如运行以上程序会输出如下结果:
3、守护线程
要点:守护线程会随着父线程的退出而退出, 守护线程适宜一些辅助性的工作,而不能把核心工作的线程设置为守护线程。
package chapter2;public class DeemonThreadDemo { public static void main(String[] args) { Thread thread = new Thread(){ @Override public void run() { while (true){ System.out.println("hello"); } } }; // 设置为守护线程 thread.setDaemon(true); thread.start(); System.out.println("主线程退出"); }}
从代码中可以看到thread 中的代码有while(true){ }语句, 在我们的映像中while(true)是死循环应该永远不退出,永远打印hello, 但是从输出结果可以看到运行一段时间后就没有再打印了,说明父线程结束执行了,子线程也随机退出了。
4、Thread.join()
Thread.join() 作用:调用join()方法的线程会等到他自己执行结束才会继续往后执行
Thread.join(time) 作用:调用join(time)方法的线程会等到他自己执行指定时间后就会继续往后执行,
4.1 无限等待
package chapter2;public class ThreadJoinDemo { public static void main(String[] args) throws InterruptedException { Thread thread1 = new Thread(() -> { for (int i =0; i < 500; i++){ System.out.println(Thread.currentThread().getName() + ":" + i); } }); Thread thread2 = new Thread(() -> { for (int i =0; i < 500; i++){ System.out.println(Thread.currentThread().getName() + ":" + i); } }); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("所有线程已结束执行"); }}
thread1 和thread2交互执行, 而main线程的打印结果一定是在2个线程全部执行完后才打印结果。
4.2 等待指定时间
package chapter2;public class ThreadJoinDemo2 { public static void main(String[] args) throws InterruptedException { Thread thread1 = new Thread(() -> { System.out.println(Thread.currentThread().getName() +"开始执行"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() +"结束执行"); }); thread1.start(); // 虽然join了, 但是只会等待指定的时间,而不会无限等待 thread1.join(1000); System.out.println("主线程等到1000毫秒就执行到了,而无需等到thread1执行完毕"); }}
5、中断线程的几种方式
5.1、介绍
package chapter2;import org.junit.Test;public class ThreadInterrupt { public static void main(String[] args) { Thread thread = new Thread(() ->{ while (true){ try { Thread.sleep(1000); } catch (InterruptedException e) { System.out.println("线程被打断"); e.printStackTrace(); // 中断后让其退出while循环, 这样来控制线程的退出 break; } } }); thread.start(); System.out.println(thread.isInterrupted()); // 打断thread线程的sleep, 让thread1不再继续sleep thread.interrupt(); System.out.println(thread.isInterrupted()); System.out.println("主线程退出"); } @Test public void test2(){ Object MONITOR = new Object(); Thread thread = new Thread(() ->{ while (true){ synchronized (MONITOR){ try { MONITOR.wait(100); } catch (InterruptedException e) { System.out.println("线程被打断" + Thread.interrupted()); e.printStackTrace(); } } } }); thread.start(); System.out.println(thread.isInterrupted()); thread.interrupt(); System.out.println(thread.isInterrupted()); System.out.println("主线程退出"); } @Test public void test3(){ Thread thread1 = new Thread(() -> { System.out.println(Thread.currentThread().getName() +"开始执行"); while (true){ } }); Thread main = Thread.currentThread(); Thread thread2 = new Thread(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } // 目前是main线程在等待thread1线程,所以需要打断main线程,才能让main线程不再继续等待而是继续往后执行 main.interrupt(); System.out.println("interrupt"); }); thread1.start(); thread2.start(); try { // main线程会等待thread1线程执行完毕才能继续往后执行 thread1.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("主线程执行到了"); }}
5.2、通过whilte(running) { }变量控制
package chapter2;import org.junit.Test;public class ThreadCloseDemo { class Worker implements Runnable{ private volatile boolean running = true; @Override public void run() { // 这里有问题, 假如doSomething()非常耗时, 下一次执行while (running)没有机会得到执行,没法立即中断 while (running){ // 假如执行doSomething()方法 } System.out.println("退出死循环"); } public void shutdown(){ this.running = false; } } @Test public void close1(){ Worker worker = new Worker(); Thread thread = new Thread(worker); thread.start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } worker.shutdown(); System.out.println("主线程执行完毕"); }}
test1()的执行结果:
5.3、通过Thread.interrupt()控制
package chapter2;import org.junit.Test;public class ThreadCloseDemo2 { class Worker2 implements Runnable{ private volatile boolean running = true; @Override public void run() { while (true){ if (Thread.currentThread().isInterrupted()){ break; } // 这里有问题, 假如doSomething()非常耗时, 上面的代码根本没有机会得到执行,没法立即中断 // doSomething() } System.out.println("退出死循环"); } } @Test public void close2(){ Worker2 worker = new Worker2(); Thread thread = new Thread(worker); thread.start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } thread.interrupt(); System.out.println("主线程执行完毕"); }}
5.4、暴力终止
package chapter2;import org.junit.Test;public class ThreadCloseDemo2 { class ThreadService{ private Thread executeThread; private boolean running = true; public void execute(Runnable task){ executeThread = new Thread(() -> { Thread workerThread = new Thread(task); workerThread.setDaemon(true); workerThread.start(); // try { // 让executeThread等待workerThread结束执行才往后执行 // 假如workerThread比较耗时,由于守护线程是依附于父线程的,假如这里不join, // 那么可能会产生workerThread还没有真正执行完,executeThread线程就执行完了,导致workerThread也跟着退出了 workerThread.join(); } catch (InterruptedException e) { // e.printStackTrace(); } //executeThread线程被打断等待 或者 workerThread正常执行结束 this.running = false; }); executeThread.start(); } public void shutdown(long timeout){ long beginTime = System.currentTimeMillis(); // workerThread还在执行 while (running){ // 判断是否等待超时 if (System.currentTimeMillis() - beginTime >= timeout){ // 等待超时了, 中断executeThread的等待会使this.running = false;执行到, // executeThread执行完毕导致workerThread守护线程跟着退出 executeThread.interrupt(); //退出while循环 break; } try { Thread.sleep(1); } catch (InterruptedException e) { // 说明main线程被打断了 e.printStackTrace(); break; } } } public boolean isRunning(){ return this.running; } } @Test public void test1() { ThreadService threadService = new ThreadService(); threadService.execute(() -> { // 这是一个非常非常耗时的操作 while (true){ } }); //让线程2秒后立即中断执行 long begin = System.currentTimeMillis(); threadService.shutdown(3000); long end = System.currentTimeMillis(); System.out.println("耗时: " + (end - begin)); System.out.println("主线程执行结果"); } @Test public void test2() { ThreadService threadService = new ThreadService(); threadService.execute(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); //让线程2秒后立即中断执行 long begin = System.currentTimeMillis(); threadService.shutdown(3000); long end = System.currentTimeMillis(); System.out.println("耗时: " + (end - begin)); System.out.println("主线程执行结果"); }}
5.5、interrupt()、interrupted()、isInterrupted()区别与原理
5.5.1、结论
interrupt()方法:用于中断线程的,调用该方法的线程的状态将被置为"中断"状态。注意:线程中断仅仅是设置线程的中断状态位,不会停止线程。需要用户自己去监视线程的状态并做处理。支持线程中断的方法(也就是线程中断后会抛出InterruptedException的方法,比如Thread.sleep,以及Object.wait等方法)就是在监视线程的中断状态,一旦线程的中断状态被置为“中断状态”,就会抛出中断异常,并将线程的中断状态为设置为false。
interrupted():返回线程是否处于已中断状态并清除中断状态
isInterrupted():返回线程是否处于已中断状态
5.5.2、原理
以下内容来源于 https://my.oschina.net/itblog/blog/787024
public class Interrupt {public static void main(String[] args) throws Exception {Thread t = new Thread(new Worker());t.start();Thread.sleep(200);t.interrupt();System.out.println("Main thread stopped.");}public static class Worker implements Runnable {public void run() {System.out.println("Worker started.");try {Thread.sleep(500);} catch (InterruptedException e) {System.out.println("Worker IsInterrupted: " + Thread.currentThread().isInterrupted());}System.out.println("Worker stopped.");}}}
内容很简单:主线程main启动了一个子线程Worker,然后让worker睡500ms,而main睡200ms,之后main调用worker线程的interrupt方法去中断worker,worker被中断后打印中断的状态。 下面是执行结果:
Worker started.Main thread stopped.Worker IsInterrupted: falseWorker stopped.
Worker明明已经被中断,而isInterrupted()方法竟然返回了false,为什么呢?
在stackoverflow上搜索了一圈之后,发现有网友提到:可以查看抛出InterruptedException方法的JavaDoc(或源代码),于是我查看了Thread.sleep方法的文档,doc中是这样描述这个InterruptedException异常的:
InterruptedException - if any thread has interrupted the current thread. The interrupted status of the current thread is cleared when this exception is thrown.
注意到后面这句“当抛出这个异常的时候,中断状态已被清除”。所以isInterrupted()方法应该返回false。可是有的时候,我们需要isInterrupted这个方法返回true,怎么办呢?这里就要先说说interrupt, interrupted和isInterrupted的区别了:
interrupt方法是用于中断线程的,调用该方法的线程的状态将被置为"中断"状态。注意:线程中断仅仅是设置线程的中断状态位,不会停止线程。需要用户自己去监视线程的状态为并做处理。支持线程中断的方法(也就是线程中断后会抛出InterruptedException的方法,比如这里的sleep,以及Object.wait等方法)就是在监视线程的中断状态,一旦线程的中断状态被置为“中断状态”,就会抛出中断异常。这个观点可以通过这篇文章证实:
再来看看interrupted方法的实现:
public static boolean interrupted() { return currentThread().isInterrupted(true);}
和isInterrupted的实现:
public boolean isInterrupted() { return isInterrupted(false);}
这两个方法一个是static的,一个不是,但实际上都是在调用同一个方法,只是interrupted方法传入的参数为true,而iInterrupted传入的参数为false。那么这个参数到底是什么意思呢?来看下这个isInterrupted(boolean)方法的实现:
private native boolean isInterrupted(boolean ClearInterrupted);
这是一个native方法,看不到源码没有关系,参数名字ClearInterrupted已经清楚的表达了该参数的作用----是否清除中断状态。方法的注释也清晰的表达了“中断状态将会根据传入的ClearInterrupted参数值确定是否重置”。所以,静态方法interrupted将会清除中断状态(传入的参数ClearInterrupted为true),而实例方法isInterrupted则不会(传入的参数ClearInterrupted为false)。 回到刚刚的问题:很明显,如果要isInterrupted这个方法返回true,通过在调用isInterrupted方法之前再次调用interrupt()方法来恢复这个中断的状态即可:
public class Interrupt {public static void main(String[] args) throws Exception {Thread t = new Thread(new Worker());t.start();Thread.sleep(200);t.interrupt();System.out.println("Main thread stopped.");}public static class Worker implements Runnable {public void run() {System.out.println("Worker started.");try {Thread.sleep(500);} catch (InterruptedException e) {Thread curr = Thread.currentThread();//再次调用interrupt方法中断自己,将中断状态设置为“中断”curr.interrupt();System.out.println("Worker IsInterrupted: " + curr.isInterrupted());System.out.println("Worker IsInterrupted: " + curr.isInterrupted());System.out.println("Static Call: " + Thread.interrupted());//clear statusSystem.out.println("---------After Interrupt Status Cleared----------");System.out.println("Static Call: " + Thread.interrupted());System.out.println("Worker IsInterrupted: " + curr.isInterrupted());System.out.println("Worker IsInterrupted: " + curr.isInterrupted());}System.out.println("Worker stopped.");}}}
执行结果:
Worker started.Main thread stopped.Worker IsInterrupted: trueWorker IsInterrupted: trueStatic Call: true---------After Interrupt Status Cleared----------Static Call: falseWorker IsInterrupted: falseWorker IsInterrupted: falseWorker stopped.
从执行结果也可以看到,前两次调用isInterrupted方法都返回true,说明isInterrupted方法不会改变线程的中断状态,而接下来调用静态的interrupted()方法,第一次返回了true,表示线程被中断,第二次则返回了false,因为第一次调用的时候已经清除了中断状态。最后两次调用isInterrupted()方法就肯定返回false了。
那么,在什么场景下,我们需要在catch块里面中断线程(重置中断状态)呢?
答案是:如果不能抛出InterruptedException(就像这里的Thread.sleep语句放在了Runnable的run方法中,这个方法不允许抛出任何受检查的异常),但又想告诉上层调用者这里发生了中断的时候,就只能在catch里面重置中断状态了。
public class TaskRunner implements Runnable { private BlockingQueue<task> queue; public TaskRunner(BlockingQueue<task> queue) { this.queue = queue; } public void run() { try { while (true) { Task task = queue.take(10, TimeUnit.SECONDS); task.execute(); } } catch (InterruptedException e) { // Restore the interrupted status Thread.currentThread().interrupt(); } }}
那么问题来了:为什么要在抛出InterruptedException的时候清除掉中断状态呢?
这个问题没有找到官方的解释,估计只有Java设计者们才能回答了。但这里的解释似乎比较合理:一个中断应该只被处理一次(你catch了这个InterruptedException,说明你能处理这个异常,你不希望上层调用者看到这个中断)。
参考地址:
https://my.oschina.net/itblog/blog/787024
http://stackoverflow.com/questions/7142665/why-does-thread-isinterrupted-always-return-false
http://stackoverflow.com/questions/2523721/why-do-interruptedexceptions-clear-a-threads-interrupted-status
http://www.ibm.com/developerworks/library/j-jtp05236/
http://blog.csdn.net/z69183787/article/details/25076033
6、synchronized
6.1、示例代码
package chapter2;public class SynchronizedDemo { private static final Object LOCK = new Object(); public static void main(String[] args) { Runnable runnable = () -> { synchronized (LOCK){ System.out.println(Thread.currentThread().getName() +"正在执行"); try { Thread.sleep(60_000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() +"执行结束"); } }; Thread thread1 = new Thread(runnable); Thread thread2 = new Thread(runnable); Thread thread3 = new Thread(runnable); thread1.start(); thread2.start(); thread3.start(); System.out.println("主线程执行结束"); }}
6.2、通过jconsole查看
通过在cmd输入jconsole命令显示如下界面:
选中本地进程中刚刚运行的java程序, 进入主界面
可以看到 Thread-0 、Thread-1、Thread-2运行的三个线程,点击对应线程右边可以看到“拥有者 Thread-0”
6.3、通过jps、jstack查看
启动程序后:
输入jps命令, 查看当前正在运行的java进程
输入jstack [pid] 可以看到有三个线程 Thread-1、Thread-2均处于BLOCKED(on object nonitor)状态、而Thread-0处于TIMED_WATING(sleepint)状态(因为Thread.sleep()的作用)
6.4、通过javap -c [xxxx.class]命令查看汇编指令
6.5 this锁
含义:指在方法上面加上synchronized 关键字
package chapter2;public class ThisLockDemo { public static void main(String[] args) { ThisLock thisLock = new ThisLock(); Thread thread1 = new Thread(() -> { thisLock.m1(); }); Thread thread2 = new Thread(() -> { thisLock.m2(); }); thread1.start(); thread2.start(); System.out.println("主线程结束执行"); }}class ThisLock{ public synchronized void m1(){ System.out.println(Thread.currentThread().getName() +" m1开始执行"); try { Thread.sleep(10_000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() +" m1结束执行"); } public synchronized void m2(){ System.out.println(Thread.currentThread().getName() +" m2开始执行"); try { Thread.sleep(10_000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() +" m2结束执行"); }}
运行结果:
可以看到Thread-1 m2的执行必须要等待Thread-0 m1执行完毕后才能执行, 说明同一个类在方法上面加上synchronized关键字所使用的的锁是this锁
6.6 class锁
介绍:在static方法上加synchronized关键所使用的的锁是class锁
package chapter2;public class ClassLockDemo { public static void main(String[] args) { ClassLock classLock1 = new ClassLock(); ClassLock classLock2 = new ClassLock(); Thread thread1 = new Thread(() -> { classLock1.m1(); }); Thread thread2 = new Thread(() -> { classLock2.m2(); }); thread1.start(); thread2.start(); System.out.println("主线程结束执行"); }}class ClassLock{ public static synchronized void m1(){ System.out.println(Thread.currentThread().getName() +" m1开始执行"); try { Thread.sleep(10_000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() +" m1结束执行"); } public static synchronized void m2(){ System.out.println(Thread.currentThread().getName() +" m2开始执行"); try { Thread.sleep(10_000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() +" m2结束执行"); }}
可以看到Thread-1 m2的执行必须要等待Thread-0 m1执行完毕后才能执行, 说明同一个类的多个实例在static方法上面加上synchronized关键字所使用的的锁是class锁
6.7 死锁
package chapter2;public class DieLockDemo { public static void main(String[] args) { DieLock dieLock = new DieLock(); Thread thread1 = new Thread(() ->{ dieLock.doSomething1(); }); Thread thread2 = new Thread(() ->{ dieLock.doSomething2(); }); thread1.start(); thread2.start(); System.out.println("主线程执行完毕"); }}class DieLock{ private final Object LOCK1 = new Object(); private final Object LOCK2 = new Object(); public void doSomething1(){ synchronized (LOCK1){ try { // 代表一个耗时操作 Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "获取到LOCK1, 等待LOCK2"); synchronized (LOCK2){ System.out.println("doSomething1"); } } } public void doSomething2(){ synchronized (LOCK2){ try { // 代表一个耗时操作 Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "获取到LOCK2, 等待LOCK1"); synchronized (LOCK1){ System.out.println("doSomething2"); } } }}
6.8、synchronzied锁重入
关键字synchronized拥有锁重入的功能,也就是在使用synchronized时,当一个线程获得一个对象锁,再次请求该对象锁时是可以获得该对象的锁的。表现在一个synchronized方法 / 块的内部调用本类的其他synchronized方法 / 块时,是永远可以得到锁的。
package chapter2;public class SynchronizedReentrantDemo { public synchronized void methodA(){ System.out.println("methodA invoked "); this.methodB(); } public synchronized void methodB(){ System.out.println("methodB invoked "); } public static void main(String[] args) { SynchronizedReentrantDemo demo = new SynchronizedReentrantDemo(); demo.methodA(); }}
可以看到在synchronized方法methodsA中调用被synchronized修饰的methodB()也是可以的,即使methodA()还没有释放this锁,这证明了synchronzied锁是可以重入的
6.9、synchronized的原理
Java中synchronized的锁实现包括:偏向锁、轻量级锁、重量级锁(等待时间长)
1个对象的实例包括3部分:对象头、实例变量、填充数据
对象头:实现synchronized的锁对象的基础
实例变量: 存放类的属性数据信息,包括父类的属性信息,如果是数组的实例部分还包括数组的长度,这部分内存按4字节对齐。
填充数据:由于虚拟机要求对象起始地址必须是8字节的整数倍。填充数据不是必须存在的,仅仅是为了字节对齐,这点了解即可。
synchronized使用的锁对象是存储在Java对象头里的,jvm中采用2个字来存储对象头(如果对象是数组则会分配3个字,多出来的1个字记录的是数组长度),其主要结构是由Mark Word 和 Class Metadata Address 组成,其结构说明如下表:
其中Mark Word在默认情况下存储着对象的HashCode、分代年龄、锁标记位等以下是32位JVM的Mark Word默认存储结构
由于对象头的信息是与对象自身定义的数据没有关系的额外存储成本,因此考虑到JVM的空间效率,Mark Word 被设计成为一个非固定的数据结构,以便存储更多有效的数据,它会根据对象本身的状态复用自己的存储空间,如32位JVM下,除了上述列出的Mark Word默认存储结构外,还有如下可能变化的结构:
其中轻量级锁和偏向锁是Java 6 对 synchronized 锁进行优化后新增加的,稍后我们会简要分析。这里我们主要分析一下重量级锁也就是通常说synchronized的对象锁,锁标识位为10,其中指针指向的是monitor对象(也称为管程或监视器锁)的起始地址。每个对象都存在着一个 monitor 与之关联,对象与其 monitor 之间的关系有存在多种实现方式,如monitor可以与对象一起创建销毁或当线程试图获取对象锁时自动生成,但当一个 monitor 被某个线程持有后,它便处于锁定状态。在Java虚拟机(HotSpot)中,monitor是由ObjectMonitor实现的,其主要数据结构如下(位于HotSpot虚拟机源码ObjectMonitor.hpp文件,C++实现的)
ObjectMonitor() { _header = NULL; _count = 0; //记录个数 _waiters = 0, _recursions = 0; _object = NULL; _owner = NULL; _WaitSet = NULL; //处于wait状态的线程,会被加入到_WaitSet _WaitSetLock = 0 ; _Responsible = NULL ; _succ = NULL ; _cxq = NULL ; FreeNext = NULL ; _EntryList = NULL ; //处于等待锁block状态的线程,会被加入到该列表 _SpinFreq = 0 ; _SpinClock = 0 ; OwnerIsThread = 0 ; }
ObjectMonitor中有两个队列,_WaitSet 和 _EntryList,用来保存ObjectWaiter对象列表( 每个等待锁的线程都会被封装成ObjectWaiter对象),_owner指向持有ObjectMonitor对象的线程,当多个线程同时访问一段同步代码时,首先会进入 _EntryList 集合,当线程获取到对象的monitor 后进入 _Owner 区域并把monitor中的owner变量设置为当前线程同时monitor中的计数器count加1,若线程调用 wait() 方法,将释放当前持有的monitor,owner变量恢复为null,count自减1,同时该线程进入 WaitSe t集合中等待被唤醒。若当前线程执行完毕也将释放monitor(锁)并复位变量的值,以便其他线程进入获取monitor(锁)。如下图所示
由此看来,monitor对象存在于每个Java对象的对象头中(存储的指针的指向),synchronized锁便是通过这种方式获取锁的,也是为什么Java中任意对象可以作为锁的原因,同时也是notify/notifyAll/wait等方法存在于顶级对象Object中的原因(关于这点稍后还会进行分析),ok~,有了上述知识基础后,下面我们将进一步分析synchronized在字节码层面的具体语义实现。
6.10、synchronized底层实现原理
为了分析底层原理,我们编写1个最简单的类,代码如下:
package chapter2;public class SynchronizedDemo2 { public synchronized static void test1(){ System.out.println("test1"); } public synchronized void test2(){ System.out.println("test2"); } public void test3(){ synchronized (this){ System.out.println("test4"); } }}
6.10.1、synchronized方法底层原理:
方法级的同步是隐式,即无需通过字节码指令来控制的,它实现在方法调用和返回操作之中。JVM可以从方法常量池中的方法表结构(method_info Structure) 中的 ACC_SYNCHRONIZED 访问标志区分一个方法是否同步方法。当方法调用时,调用指令将会 检查方法的 ACC_SYNCHRONIZED 访问标志是否被设置,如果设置了,执行线程将先持有monitor(虚拟机规范中用的是管程一词), 然后再执行方法,最后再方法完成(无论是正常完成还是非正常完成)时释放monitor。在方法执行期间,执行线程持有了monitor,其他任何线程都无法再获得同一个monitor。如果一个同步方法执行期间抛 出了异常,并且在方法内部无法处理此异常,那这个同步方法所持有的monitor将在异常抛到同步方法之外时自动释放。下面我们看看字节码层面如何实现:
通过执行javap -v SynchroizedDemo2.class会输出如下字节码指令:
test1()方法对应的字节码指令:
test2()方法对应的字节码指令:
从字节码中可以看出,synchronized修饰的方法并没有monitorenter指令和monitorexit指令,取得代之的确实是ACC_SYNCHRONIZED标识,该标识指明了该方法是一个同步方法,JVM通过该ACC_SYNCHRONIZED访问标志来辨别一个方法是否声明为同步方法,从而执行相应的同步调用。这便是synchronized锁在同步代码块和同步方法上实现的基本原理。同时我们还必须注意到的是在Java早期版本中,synchronized属于重量级锁,效率低下,因为监视器锁(monitor)是依赖于底层的操作系统的Mutex Lock来实现的,而操作系统实现线程之间的切换时需要从用户态转换到核心态,这个状态之间的转换需要相对比较长的时间,时间成本相对较高,这也是为什么早期的synchronized效率低的原因。庆幸的是在Java 6之后Java官方对从JVM层面对synchronized较大优化,所以现在的synchronized锁效率也优化得很不错了,Java 6之后,为了减少获得锁和释放锁所带来的性能消耗,引入了轻量级锁和偏向锁,接下来我们将简单了解一下Java官方在JVM层面对synchronized锁的优化。
6.10.2、synchonized代码块底层实现原理:
test3()方法对应的字节码指令:
JSR133的解释:
synchronized 语句需要一个对象的引用;随后会尝试在该对象的管程上执行 lock 动 作,如果 lock 动作未能成功完成,将一直等待。当 lock 动作执行成功,就会运行synchronized 语句块中的代码。一旦语句块中的代码执行结束,不管是正常还是异 常结束,都会在之前执行 lock 动作的那个管程上自动执行一个 unlock 动作。
synchronized 方法在调用时会自动执行一个 lock 动作。在 lock 动作成功完成之前, 都不会执行方法体。如果是实例方法,锁的是调用该方法的实例(即,方法体执行 期间的 this)相关联的管程。如果是静态方法,锁的是定义该方法的类所对应的 Class 对象。一旦方法体执行结束,不管是正常还是异常结束,都会在之前执行 lock 动作的那个管程上自动执行一个 unlock 动作。
从字节码中可知同步语句块的实现使用的是monitorenter 和 monitorexit 指令,其中monitorenter指令指向同步代码块的开始位置,monitorexit指令则指明同步代码块的结束位置,当执行monitorenter指令时,当前线程将试图获取 objectref(即对象锁) 所对应的 monitor 的持有权,当 objectref 的 monitor 的进入计数器为 0,那线程可以成功取得 monitor,并将计数器值设置为 1,取锁成功。如果当前线程已经拥有 objectref 的 monitor 的持有权,那它可以重入这个 monitor (关于重入性稍后会分析),重入时计数器的值也会加 1。倘若其他线程已经拥有 objectref 的 monitor 的所有权,那当前线程将被阻塞,直到正在执行线程执行完毕,即monitorexit指令被执行,执行线程将释放 monitor(锁)并设置计数器值为0 ,其他线程将有机会持有 monitor 。值得注意的是编译器将会确保无论方法通过何种方式完成,方法中调用过的每条 monitorenter 指令都有执行其对应 monitorexit 指令,而无论这个方法是正常结束还是异常结束。为了保证在方法异常完成时 monitorenter 和 monitorexit 指令依然可以正确配对执行,编译器会自动产生一个异常处理器,这个异常处理器声明可处理所有的异常,它的目的就是用来执行 monitorexit 指令。从字节码中也可以看出多了一个monitorexit指令,它就是异常结束时被执行的释放monitor 的指令。
6.11、Java虚拟机对synchronized的优化
锁的状态总共有四种,无锁状态、偏向锁、轻量级锁和重量级锁。随着锁的竞争,锁可以从偏向锁升级到轻量级锁,再升级的重量级锁,但是锁的升级是单向的,也就是说只能从低到高升级,不会出现锁的降级,关于重量级锁,前面我们已详细分析过,下面我们将介绍偏向锁和轻量级锁以及JVM的其他优化手段,这里并不打算深入到每个锁的实现和转换过程更多地是阐述Java虚拟机所提供的每个锁的核心优化思想,毕竟涉及到具体过程比较繁琐,如需了解详细过程可以查阅《深入理解Java虚拟机原理》。
6.11.1、偏向锁
偏向锁是Java 6之后加入的新锁,它是一种针对加锁操作的优化手段,经过研究发现,在大多数情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得,因此为了减少同一线程获取锁(会涉及到一些CAS操作,耗时)的代价而引入偏向锁。偏向锁的核心思想是,如果一个线程获得了锁,那么锁就进入偏向模式,此时Mark Word 的结构也变为偏向锁结构,当这个线程再次请求锁时,无需再做任何同步操作,即获取锁的过程,这样就省去了大量有关锁申请的操作,从而也就提供程序的性能。所以,对于没有锁竞争的场合,偏向锁有很好的优化效果,毕竟极有可能连续多次是同一个线程申请相同的锁。但是对于锁竞争比较激烈的场合,偏向锁就失效了,因为这样场合极有可能每次申请锁的线程都是不相同的,因此这种场合下不应该使用偏向锁,否则会得不偿失,需要注意的是,偏向锁失败后,并不会立即膨胀为重量级锁,而是先升级为轻量级锁。下面我们接着了解轻量级锁。
6.11.2、轻量级锁
倘若偏向锁失败,虚拟机并不会立即升级为重量级锁,它还会尝试使用一种称为轻量级锁的优化手段(1.6之后加入的),此时Mark Word 的结构也变为轻量级锁的结构。轻量级锁能够提升程序性能的依据是“对绝大部分的锁,在整个同步周期内都不存在竞争”,注意这是经验数据。需要了解的是,轻量级锁所适应的场景是线程交替执行同步块的场合,如果存在同一时间访问同一锁的场合,就会导致轻量级锁膨胀为重量级锁。
6.11.3、自旋锁
轻量级锁失败后,虚拟机为了避免线程真实地在操作系统层面挂起,还会进行一项称为自旋锁的优化手段。这是基于在大多数情况下,线程持有锁的时间都不会太长,如果直接挂起操作系统层面的线程可能会得不偿失,毕竟操作系统实现线程之间的切换时需要从用户态转换到核心态,这个状态之间的转换需要相对比较长的时间,时间成本相对较高,因此自旋锁会假设在不久将来,当前的线程可以获得锁,因此虚拟机会让当前想要获取锁的线程做几个空循环(这也是称为自旋的原因),一般不会太久,可能是50个循环或100循环,在经过若干次循环后,如果得到锁,就顺利进入临界区。如果还不能获得锁,那就会将线程在操作系统层面挂起,这就是自旋锁的优化方式,这种方式确实也是可以提升效率的。最后没办法也就只能升级为重量级锁了。
6.11.4、锁消除
消除锁是虚拟机另外一种锁的优化,这种优化更彻底,Java虚拟机在JIT编译时(可以简单理解为当某段代码即将第一次被执行时进行编译,又称即时编译),通过对运行上下文的扫描,去除不可能存在共享资源竞争的锁,通过这种方式消除没有必要的锁,可以节省毫无意义的请求锁时间,如下StringBuffer的append是一个同步方法,但是在add方法中的StringBuffer属于一个局部变量,并且不会被其他线程所使用,因此StringBuffer不可能存在共享资源竞争的情景,JVM会自动将其锁消除。
public class StringBufferRemoveSync {public void add(String str1, String str2) {//StringBuffer是线程安全,由于sb只会在append方法中使用,不可能被其他线程引用//因此sb属于不可能共享的资源,JVM会自动消除内部的锁StringBuffer sb = new StringBuffer();sb.append(str1).append(str2);}public static void main(String[] args) {StringBufferRemoveSync rmsync = new StringBufferRemoveSync();for (int i = 0; i < 10000000; i++) {rmsync.add("abc", "123");}}}
7、wait、notify方法
7.1、含义(需仔细研读)
wait()、notify()是Object类的方法, 调用这2个方法前,执行线程必须已经获得改对象的对象锁,即只能在同步方法或同步代码块中调用wait() 或者notify()方法,如果调用这2个方法时没有获得对象锁将会抛出IllegalMonitorStateException异常
调用wait()方法后,当前线程立即释放已经获得的锁,并且将当前线程置入“预执行队列(WaitSet)中”, 并且在wait()所在的代码处停止执行,必须直到收到notify()方法的通知或者被中断执行当前线程才能被唤醒继续往wait()方法后面的代码执行
notitify()方法用来通知那些等待获取该对象锁的线程, 如果有多个线程等待,则由线程规划器随机挑选出一个处于wait状态的线程B,对其发出Notify通知,使B退出等待队列,处于就绪状态,被重新唤醒的线程B会尝试获取临界区的对象锁,被唤醒线程B在真正获取到锁后就会继续执行wait()后面的代码。需要说明的是,在执行notify()方法后,并不会使当前线程A马上释放对象锁,处于wait状态的线程B也不能马上获取对象锁,要等到执行notify()方法的线程A将程序执行完,也就是退出synchronized代码块后,当前线程A才会释放对象锁,但是释放之后并不代表线程B就一定会获取到对象锁,只是说此时A、B都有机会竞争获取到对象锁
如果notify()方法执行时,此时并没有任何线程处于wait状态,那么执行该方法相当于无效操作
notify()与notifyAll()的区别是:notify()方法每次调用时都只是从所有处于wait状态的线程中随机选择一个线程进入就绪状态,而notifyAll()则是使所有处于wait状态的线程全部退出等待队列,全部进入就绪状态,此处唤醒不等于所有线程都获得该对象的monitor,此时优先级最高的那个线程优先执行(获得对象锁),但也有可能是随机执行(获得对象锁),这要取决于jvm实现
7.2、生产者消费者模式
7.2.1、 版本1产生假死(全部进入等待wait状态)
package chapter2;import java.util.stream.Stream;public class ProducerAndConsumerVersion1 { private final Object LOCK = new Object(); private boolean isProduced; private int num = 0; public static void main(String[] args) { ProducerAndConsumerVersion1 version1 = new ProducerAndConsumerVersion1(); Stream.of("P1", "P2", "P3", "P4").forEach((item) ->{ new Thread(() ->{ while (true) { version1.produce(); } }, item).start(); }); Stream.of("C1").forEach(item ->{ new Thread(() ->{ while (true) { version1.consumer(); } }, item).start(); }); System.out.println("主线程执行结束"); } public void produce(){ synchronized (LOCK){ if (isProduced){ try { System.out.println("[" + Thread.currentThread().getName() +"] produce wait"); LOCK.wait(); System.out.println("[" + Thread.currentThread().getName() +"] produce wait after"); } catch (InterruptedException e) { e.printStackTrace(); } } else { num++; System.out.println("[" + Thread.currentThread().getName() + "] P ==>" + num); isProduced=true; LOCK.notify(); System.out.println("[" + Thread.currentThread().getName() + "] notify after"); } } } public void consumer(){ synchronized (LOCK){ if (isProduced){ System.out.println("[" + Thread.currentThread().getName() + "] C ==>" + num); isProduced = false; LOCK.notify(); System.out.println("[" + Thread.currentThread().getName() + "] notify after"); } else { try { System.out.println("[" + Thread.currentThread().getName() +"] consumer wait"); LOCK.wait(); System.out.println("[" + Thread.currentThread().getName() +"] consumer wait after"); } catch (InterruptedException e) { e.printStackTrace(); } } } }}
以上代码执行永远不会结束,所有线程最终都变为wait状态(没有发生死锁)
7.2.2、多消费者、多生产者正确版本
package chapter2;import java.util.stream.Stream;public class ProducerAndConsumerVersion2 { private final Object LOCK = new Object(); private boolean isProduced; private int num = 0; public static void main(String[] args) { ProducerAndConsumerVersion2 version1 = new ProducerAndConsumerVersion2(); Stream.of("P1", "P2", "P3", "P4").forEach((item) ->{ new Thread(() ->{ while (true) { version1.produce(); } }, item).start(); }); Stream.of("C1", "C2", "C3", "C4").forEach(item ->{ new Thread(() ->{ while (true) { version1.consumer(); } }, item).start(); }); System.out.println("主线程执行结束"); } public void produce(){ synchronized (LOCK){ while (isProduced){ try { System.out.println("[" + Thread.currentThread().getName() +"] produce wait"); LOCK.wait(); System.out.println("[" + Thread.currentThread().getName() +"] produce wait after"); } catch (InterruptedException e) { e.printStackTrace(); } } num++; System.out.println("[" + Thread.currentThread().getName() + "] P ==>" + num); isProduced=true; LOCK.notifyAll(); System.out.println("[" + Thread.currentThread().getName() + "] notify after"); } } public void consumer(){ synchronized (LOCK){ while (!isProduced){ try { System.out.println("[" + Thread.currentThread().getName() +"] consumer wait"); LOCK.wait(); System.out.println("[" + Thread.currentThread().getName() +"] consumer wait after"); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("[" + Thread.currentThread().getName() + "] C ==>" + num); isProduced = false; LOCK.notifyAll(); System.out.println("[" + Thread.currentThread().getName() + "] notify after"); } }}
7.2.3、多生产者、多消费者, 阻塞, 错误版本,浪费执行机会
package chapter2;import java.util.LinkedList;import java.util.stream.Stream;public class ProducerAndConsumerVersion3 { private final Object LOCK = new Object(); private boolean isProduced; private int num = 0; public static void main(String[] args) { Container container = new Container(10); Producer producer = new Producer(container); Consumer consumer = new Consumer(container); Stream.of("P1", "P2", "P3", "P4").forEach((item) ->{ new Thread(() ->{ while (true) { producer.produce(); } }, item).start(); }); Stream.of("C1", "C2", "C3", "C4").forEach(item ->{ new Thread(() ->{ while (true) { consumer.consume(); } }, item).start(); }); System.out.println("主线程执行结束"); }}class Producer{ private Container container; private int i = 0; public Producer(Container container){ this.container = container; } public void produce(){ while (true){ synchronized (container){ if (container.isOverflow()){ try { container.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } else{ container.push(++i); System.out.println("[" + Thread.currentThread().getName() +"] produce " + i); container.notifyAll(); } } try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } }}class Consumer{ private Container container; public Consumer(Container container){ this.container = container; } public void consume(){ while (true){ synchronized (container){ if (container.isEmpty()){ try { container.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // 这里其实是有问题的, 假设A、B线程都执行到container.wait();处于wait状态, 此时生产者执行了container.notifyAll(); // A线程获得了对象锁, 从container.wait();处往后执行,但是由于这里用的是if..else结构, // 所以会导致A线程刚被唤醒获得了对象锁,又什么都不做,马上又释放了对象锁,假设A线程的CPU时间片刚好用完又让B获得了对象锁, // 可能出现后续都一直是B获得CPU实现片获得对象锁,而A明明之前获得过一次对象锁却啥事也不干,白白浪费了一次执行机会 else { Object value = container.pop(); System.out.println("[" + Thread.currentThread().getName() +"] consume " + value); container.notifyAll(); } } } }}class Container{ private LinkedList<object> storage; private int capticy; public Container(int capticy){ this.storage = new LinkedList<>(); this.capticy = capticy; } public void push(Object obj){ this.storage.addLast(obj); } public Object pop(){ return this.storage.removeFirst(); } public int size(){ return this.storage.size(); } public boolean isOverflow(){ return size() >= capticy; } public boolean isEmpty(){ return this.storage.isEmpty(); }}
7.2.4、完全正确版本
package 生产者消费者模式;import java.util.LinkedList;import java.util.stream.Stream;@SuppressWarnings("ALL")public class ProducerAndConsumerVersion4 { private final Object LOCK = new Object(); private boolean isProduced; private int num = 0; public static void main(String[] args) { Container2 container = new Container2(10); Producer2 producer = new Producer2(container); Consumer2 consumer = new Consumer2(container); Stream.of("P1", "P2", "P3", "P4").forEach((item) -> { new Thread(() ->{ while (true) { producer.produce(); } }, item).start(); }); Stream.of("C1", "C2", "C3", "C4").forEach(item ->{ new Thread(() -> { while (true) { consumer.consume(); } }, item).start(); }); System.out.println("主线程执行结束"); }}@SuppressWarnings("ALL")class Producer2 { private Container2 container; private int i = 0; public Producer2(Container2 container){ this.container = container; } public void produce(){ synchronized (container){ // 这里的while不能换成if // 假设container现在是满的,C线程消费了一个个,然后调用container.notifyAll();通知A、B线程唤醒 // A、B线程从container.wait();这一行代码处唤醒后处于就绪状态, A获得锁, 往container.wait();后面执行, // A执行 container.push(++i); 执行完后container满了,A执行完synchronized代码块释放锁, 紧接着B获取到锁一样从 // container.wait();往后执行,假如这里while换成if, 那么B就会又执行container.push(++i); // 导致容器满了仍然向container push数据,这样就出现错误数据了 while (container.isOverflow()) { try { container.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } container.push(++i); System.out.println("[" + Thread.currentThread().getName() +"] produce " + i); container.notifyAll(); } try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } }}@SuppressWarnings("ALL")class Consumer2{ private Container2 container; public Consumer2(Container2 container){ this.container = container; } public void consume(){ synchronized (container){ // 这里的while不能换成if // 假设A、B线程都执行到container.wait();这行代码处,A、B处于wait状态, 此时生产者执行了container.notifyAll(); // 然后A线程获得了对象锁, 从container.wait();处往后执行,A调用container.pop()后container变为空的 // 假设这里while换成if, 那么会出现紧接着B获得了对象锁,一样地从从container.wait();处往后执行,但是container已经是空的了 // 任然调用container.pop()就会报出ArrayIndeOutOfBoundExeption了 while (container.isEmpty()){ try { container.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Object value = container.pop(); System.out.println("[" + Thread.currentThread().getName() +"] consume " + value); container.notifyAll(); } }}class Container2{ private LinkedList<object> storage; private int capticy; public Container2(int capticy){ this.storage = new LinkedList<>(); this.capticy = capticy; } public void push(Object obj){ this.storage.addLast(obj); } public Object pop(){ return this.storage.removeFirst(); } public int size(){ return this.storage.size(); } public boolean isOverflow(){ return size() >= capticy; } public boolean isEmpty(){ return this.storage.isEmpty(); }}
8 、捕获线程运行期间的异常
package chapter2;public class ExceptionCaught { public static void main(String[] args) { Thread mythread = new Thread(() ->{ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }, "mythread"); // 捕获异常 mythread.setUncaughtExceptionHandler((thread, throwable) ->{ System.out.println(thread.getName()); throwable.printStackTrace(); }); mythread.start(); mythread.interrupt(); }}
9、ThreadGroup
package chapter2;import java.util.stream.Stream;public class ThreadGroupDemo { public static void main(String[] args) { // main方法也是一个线程, 其名称为main System.out.println(Thread.currentThread().getName()); // main线程的线程组的名称是main System.out.println(Thread.currentThread().getThreadGroup().getName()); // 创建线程组tg1 ThreadGroup tg1 = new ThreadGroup("tg1"); new Thread(tg1, "t1"){ @Override public void run() { try { Thread.sleep(1000); //获取当前线程组的名称 System.out.println(this.getThreadGroup().getName()); //获取当前线程组的父线程组 System.out.println(this.getThreadGroup().getParent()); //获取当前线程组的父线程组的名称 System.out.println(this.getThreadGroup().getParent().getName()); //评估当前线程组的父线程组及子级的线程数量 System.out.println(this.getThreadGroup().getParent().activeCount()); System.out.println("--------------"); } catch (InterruptedException e) { e.printStackTrace(); } } }.start(); // 创建线程组tg2 ThreadGroup tg2 = new ThreadGroup("tg2"); tg2.setDaemon(true); Stream.of("T1", "T2", "T3").forEach(name ->{ new Thread(tg2, name){ @Override public void run() { System.out.println(this.getThreadGroup().getName()); System.out.println(tg1.getParent()); System.out.println(this.getThreadGroup().getParent().getName()); // 评估父线程组下的线程数量 System.out.println(this.getThreadGroup().getParent().activeCount()); Thread[] threads = new Thread[tg1.activeCount()]; this.getThreadGroup().enumerate(threads); Stream.of(threads).forEach(System.out::println); System.out.println("***************"); // 测试某个线程是parentOf(group)参数中group的父线程(直接或间接) System.out.println("parentOf: " + this.getThreadGroup().getParent().parentOf(this.getThreadGroup())); System.out.println(Thread.currentThread().getName() +" isDaemon:" + this.isDaemon()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } }.start(); }); // 打断整个线程组下的所有线程 tg2.interrupt(); }}
10、单例设计模式
version1 (不好)
package chapter3.singleton;public class SingletonVersion1 { private static final SingletonVersion1 instance = new SingletonVersion1(); public static SingletonVersion1 getInstance(){ return instance; }}
version2 (错误)
package chapter3.singleton;public class SingletonVersion2 { private static SingletonVersion2 instance ; private SingletonVersion2(){ } public static SingletonVersion2 getInstance(){ if (instance == null){ instance = new SingletonVersion2(); } return instance; } }
version3 (不好)
package chapter3.singleton;public class SingletonVersion3 { private static SingletonVersion3 instance ; private SingletonVersion3(){ } public static synchronized SingletonVersion3 getInstance(){ if (instance == null){ instance = new SingletonVersion3(); } return instance; }}
version4 (不好)
jvm指令重排序可能NullPointerException
package chapter3.singleton;public class SingletonVersion4 { private static SingletonVersion4 instance ; private static Object LOCK = new Object(); private boolean init; private SomeObject someObject = null; private SingletonVersion4(){ init = true;// try {// Thread.sleep(3000);// } catch (InterruptedException e) {// e.printStackTrace();// } someObject = new SomeObject(); } public static SingletonVersion4 getInstance(){ if (instance == null){ synchronized (LOCK){ if (instance == null){ instance = new SingletonVersion4(); } } } return instance; } public void print(){ this.someObject.doSomething(); } public static void main(String[] args) { Thread t1 = new Thread(() ->{ SingletonVersion4.getInstance(); }, "t1"); Thread t2 = new Thread(() ->{ SingletonVersion4.getInstance().print(); }, "t2"); t1.start(); t2.start(); }}class SomeObject{ public void doSomething(){ System.out.println(Thread.currentThread().getName() + " doSomething..."); }}
version5 (正确、推荐)
package chapter3.singleton;public class SingletonVersion5 { private SingletonVersion5(){ } private static class Singleton{ private static final SingletonVersion5 INSTANCE = new SingletonVersion5(); } public static SingletonVersion5 getInstance(){ return Singleton.INSTANCE; }}
version 6 (正确)
package chapter3.singleton;public class SingletonVersion6 { private SingletonVersion6(){ } enum Singleton{ INSTANCE; private SingletonVersion6 singletonVersion6 = new SingletonVersion6(); public SingletonVersion6 getInstance(){ return singletonVersion6; } } public static SingletonVersion6 getInstance(){ return Singleton.INSTANCE.getInstance(); }}
11、volatile关键字
11.1 高并发的三个特性
原子性
例如 i = 9; 在16位计算机中 可能是16 16位分2次赋值的,比如低16位赋值成功、高16位赋值失败。
在一个或多个操作中,要么全部成功,要么全部失败,不能有中间状态。
a = 1; 保证原子性
a++; 非原子性,执行步骤:1:读取a; 2: 对a加1; 3:将结果赋值给a
a = 1+2 保证原子性, 编译期会确定值
a = a+1 ** 非原子性**,执行步骤:1:读取a; 2: 对a加1; 3:将结果赋值给a
证明 ++value 操作不是原子性的
package volatileDemo;public class AtomDemo { private static int initValue = 1; private static int maxValue = 500; public static void main(String[] args) { new Thread(() ->{ while (initValue < maxValue){ System.out.printf("t1 执行后结果 [%d] \n", ++initValue); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(() ->{ while (initValue < maxValue){ System.out.printf("t2 执行后结果 [%d] \n", ++initValue); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); }}
可见性
volatile关键字会保证多线程下的内存可见性及指令执行的有序性
https://www.cnblogs.com/yanlong300/p/8986041.html
例子:
package volatileDemo;public class VolatileDemo { private static int initValue = 1; private static int MAX_VALUE = 50; public static void main(String[] args) { new Thread(() ->{ int localValue = initValue; while (localValue < MAX_VALUE){// System.out.printf("t1线程读取到initValue的值为 [%d] localValue:[%d]\n", initValue, localValue); if (localValue != initValue){ localValue = initValue; System.out.printf("initValue的值已被更新为 [%d]\n", initValue); }// else{// System.out.printf("t1线程读取到initValue的值为 [%d] localValue:[%d]\n", initValue, localValue);// } } System.out.println("t1线程结束执行"); }, "t1").start(); new Thread(() ->{ int localValue = initValue; while (initValue < MAX_VALUE){ localValue++; initValue = localValue; System.out.printf("更新initValue的值为 [%d]\n", initValue); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("t2线程结束执行"); }, "t2").start(); }}
当private static volatile int *initValue *= 1; 这句代码加上volatile 关键字就是正确的结果了
有序性
value = 3;void exeToCPUA(){ value = 10; # 可能发生重排序 value的赋值发生在isFinsh之后 isFinsh = true;}void exeToCPUB(){ if(isFinsh){ //value一定等于10?! assert value == 10; }}
试想一下开始执行时,CPU A保存着finished在E(独享)状态,而value并没有保存在它的缓存中。(例如,Invalid)。在这种情况下,value会比finished更迟地抛弃存储缓存。完全有可能CPU B读取finished的值为true,而value的值不等于10。 ** 即isFinsh的赋值在value赋值之前。**
这种在可识别的行为中发生的变化称为重排序(reordings)。注意,这不意味着你的指令的位置被恶意(或者好意)地更改。
它只是意味着其他的CPU会读到跟程序中写入的顺序不一样的结果。
为什么会有指令的重排序?
答案:因为为了使缓存能够得到更加合理地利用。
int a=1int b=2
省略一万行代码...
int c=a+b
最后一句放第三行就能让缓存更合理, 原因:cpu将a=1读入CPU高速缓存,然后将b=2读入高速缓存,由于CPU高速缓存的容量很小,所以当执行后面的一万行代码时CPU高速缓存满了,那么就会把a=1、b=2这2个缓存行覆盖掉,当真正执行int c= a+ b时由于CPU高速缓存里面没有数据那么CPU就要重新从主存读取数据然后计算,这样就出现了不必须的重复读取主存的操作,浪费CPU,通过重指令排序让int c=a+b放到第三行则可以缓存能够立即得到利用,将c=a+b的结果计算后可以立即回写主内存,避免后续a=1、b=2的缓存行被其他指令的缓存行覆盖
2、volatile的内存语义
12、比较并交换(CAS)
12.1、使用CAS与使用锁相比的好处
与锁相比,使用比较并交换(CAS)会使程序看起来更复杂,但由于其非阻塞性,它对死锁问题天生免疫,并且线程间的影响也远远比基于锁的方式小的多。更为重要的是,使用无锁的方式完全没有锁竞争带来的开销,也没有线程间频繁调度调来的开销,因此它比基于锁的方式拥有更优越的性能。
12.2、CAS原理
CAS算法的过程是:它包含3个参数CAS(realValue, expectValue, newValue), 其中realValue表示要更新的变量,expectValue表示预期值,newValue表示新值。仅当realValue等与expectValue值时,才将realValue的值变更为newValue,如果realValue与expectValue不相等,说明有其他线程已经更新做了更新,则当前线程什么也不做,最后CAS返回当前realValue的真实值。CAS是抱着乐观的态度去进行的,它总是认为自己可以完成操作,当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其余均会失败。失败的线程不会被挂起,仅是被告知失败,并且允许再次尝试,当然允许失败的线程放弃操作。
简单的说,CAS需要你额外给出一个预期值,也就是你认为现在这个变量应该是什么样子的。如果变量不是你想象的那样,则说明已经被别人修改过了。你就重新读取,再次尝试修改就好了。
在硬件层面大部分的处理器都已支持原子化的CAS指令。在JDK5后,虚拟机便可以使用这个指令来进行原子化操作。
13.3、CAS实现计数器
package 自旋锁计数;import java.util.ArrayList;import java.util.List;import java.util.concurrent.atomic.AtomicInteger;public class Counter<psvm> {private AtomicInteger atomic = new AtomicInteger(0);private int i = 0;public void count(){i++;}public void safeCount(){while (true){// 1、多核处理器可能会同时运行到这行代码,单核处理器由于时间片分配算法T1执行到这行代码后CPU执行权被T2获取了, 线程T1、T2均通过get()方法返回0// 2、假如T1先执行atomic.compareAndSet(currentValue, ++currentValue)这行代码,// 由于currentValue和atomic的值一致,cas操作成功,atomic变成1,退出循环,// 3、然后T2继续执行atomic.compareAndSet(currentValue, ++currentValue);// 这行代码会发现atomic内部维护的value值1已经与currentValue的值0不相等,不会进行设置值操作// T2继续下次循环, 又执行atomic.get();获取到的currentValue为1, 再次执行compareAndSet时,// atomic为1和currentValue为1相等,成功进行cas操作,然后退出循环int currentValue = atomic.get();boolean success = atomic.compareAndSet(currentValue, ++currentValue);if (success){break;}}}public static void main(String[] args) {Counter counter = new Counter();List<thread> threadList = new ArrayList<>(500);for (int j = 0; j < 500; j++){Thread thread = new Thread(() ->{try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}counter.count();counter.safeCount();});threadList.add(thread);}threadList.stream().forEach(thread -> thread.start());threadList.forEach(thread -> {try {thread.join();} catch (InterruptedException e) {e.printStackTrace();}});System.out.println("count: " + counter.i);System.out.println("safeCount:" + counter.atomic);}}
13.3、使用CAS实现无锁同步
package atomic;import java.util.concurrent.atomic.AtomicInteger;public class CasLock { private AtomicInteger lock = new AtomicInteger(0); //记录当前获取到锁的线程ID private Long getLockThreadId; public void tryLock(){ while (true){ boolean success = lock.compareAndSet(0, 1); if (success){ getLockThreadId = Thread.currentThread().getId(); break; } try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } public void unLock(){ long currentThreadId = Thread.currentThread().getId(); if (currentThreadId != getLockThreadId){ throw new IllegalStateException("未获取到锁,无需解锁"); } int value = lock.get(); if (value == 1){ lock.set(0); } }}
测试示例:
package atomic;import java.util.ArrayList;import java.util.BitSet;import java.util.List;import java.util.Random;public class CasLockDemo { private static int i = 0; public static void main(String[] args) { BitSet bitSet = new BitSet(); CasLock lock = new CasLock(); List<thread> threadList = new ArrayList<>(500); Random random = new Random(); for (int j =0 ; j < 50000; j++){ Thread t = new Thread(() -> { try { Thread.sleep(random.nextInt(20) + 200); } catch (InterruptedException e) { e.printStackTrace(); } lock.tryLock(); i++; if(bitSet.get(i)){ throw new RuntimeException("lock有问题"); } bitSet.set(i); System.out.println(Thread.currentThread().getName() + " get lock, i=" + i); lock.unLock(); }, "thread-" + j); threadList.add(t); } threadList.forEach(thread -> thread.start()); }}
13、atomic包原子操作类(无锁CAS)
13.1、AtomicInteger
主要方法:
示例:
package atomic;import java.util.ArrayList;import java.util.List;import java.util.concurrent.atomic.AtomicInteger;public class AtomicIntegerDemo { private AtomicInteger atomic = new AtomicInteger(0); private int i = 0; public void count(){ i++; } public void safeCount(){ atomic.incrementAndGet(); } public static void main(String[] args) { AtomicIntegerDemo counter = new AtomicIntegerDemo(); List<thread> threadList = new ArrayList<>(500); for (int j = 0; j < 500; j++){ Thread thread = new Thread(() ->{ try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } counter.count(); counter.safeCount(); }); threadList.add(thread); } threadList.stream().forEach(thread -> thread.start()); threadList.forEach(thread -> { try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println("count: " + counter.i); System.out.println("safeCount:" + counter.atomic); }}
13.2、AtomicReference
AtomicReference,顾名思义,就是以原子方式更新对象引用
可以看到,AtomicReference持有一个对象的引用——value,并通过Unsafe类来操作该引用:
为什么需要AtomicReference?难道多个线程同时对一个引用变量赋值也会出现并发问题? 引用变量的赋值本身没有并发问题,也就是说对于引用变量var ,类似下面的赋值操作本身就是原子操作: Foo var = ... ; AtomicReference的引入是为了可以用一种类似乐观锁的方式操作共享资源,在某些情景下以提升性能。
我们知道,当多个线程同时访问共享资源时,一般需要以加锁的方式控制并发:
volatile Foo sharedValue = value;Lock lock = new ReentrantLock();lock.lock();try{ // 操作共享资源sharedValue}finally{ lock.unlock();}
上述访问方式其实是一种对共享资源加悲观锁的访问方式。 而AtomicReference提供了以无锁方式访问共享资源的能力,看看如何通过AtomicReference保证线程安全,来看个具体的例子:
package atomic;import java.util.ArrayList;import java.util.List;import java.util.concurrent.atomic.AtomicReference;public class AtomicReferenceCounter { public static void main(String[] args) throws InterruptedException { AtomicReference<integer> ref = new AtomicReference<>(new Integer(0)); List<thread> list = new ArrayList<>(); for (int i = 0; i < 1000; i++) { Thread t = new Thread(new Task(ref), "Thread-" + i); list.add(t); } for (Thread t : list) { t.start(); t.join(); } // 打印2000 System.out.println(ref.get()); }}class Task implements Runnable{ private AtomicReference<integer> reference; public Task(AtomicReference<integer> reference){ this.reference = reference; } @Override public void run() { while (true){ Integer oldValue = reference.get(); boolean success = reference.compareAndSet(oldValue, oldValue + 1); if (success){ break; } } }}
该示例并没有使用锁,而是使用自旋+CAS的无锁操作保证共享变量的线程安全。1000个线程,每个线程对金额增加1,最终结果为2000,如果线程不安全,最终结果应该会小于2000。
通过示例,可以总结出AtomicReference的一般使用模式如下
AtomicReference<object> ref = new AtomicReference<>(new Object());Object oldCache = ref.get();// 对缓存oldCache做一些操作Object newCache = someFunctionOfOld(oldCache); // 如果期间没有其它线程改变了缓存值,则更新boolean success = ref.compareAndSet(oldCache , newCache);
上面的代码模板就是AtomicReference的常见使用方式,看下compareAndSet方法:
该方法会将入参的expect变量所指向的对象和AtomicReference中的引用对象进行比较,如果两者指向同一个对象,则将AtomicReference中的引用对象重新置为update,修改成功返回true,失败则返回false。也就是说,AtomicReference其实是比较对象的引用。
13.2、CAS操作可能存在的ABA问题
13.2.1、介绍
CAS操作可能存在ABA的问题,就是说: 假如一个值原来是A,变成了B,又变成了A,那么CAS检查时会发现它的值没有发生变化,但是实际上却变化了。
一般来讲这并不是什么问题,比如数值运算,线程其实根本不关心变量中途如何变化,只要最终的状态和预期值一样即可。
但是,有些操作会依赖于对象的变化过程,此时的解决思路一般就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A 就会变成1A - 2B - 3A。
13.2.3、贵宾充值卡问题(ABA示例)
举例:有一家蛋糕店为了挽留客户,决定为贵宾卡里小于20元的客户一次性充值20元,刺激客户充值和消费,但条件是:每位客户只能被赠送一次。
package atomic;import java.util.concurrent.atomic.AtomicReference;public class CasABAPromblem { private static AtomicReference<integer> money = new AtomicReference<>(19); public static void main(String[] args) { ChargeMoneyWorker chargeMoneyWorker = new ChargeMoneyWorker(money); for (int i = 0; i < 3; i++){ new Thread(chargeMoneyWorker).start(); } ConsumeMoneyWorker consumeMoneyWorker = new ConsumeMoneyWorker(money); new Thread(consumeMoneyWorker).start(); }}class ChargeMoneyWorker implements Runnable{ private AtomicReference<integer> money; public ChargeMoneyWorker(AtomicReference<integer> money){ this.money = money; } @Override public void run() { while (true){ while (true){ Integer m = money.get(); if (m < 20){ boolean success = money.compareAndSet(m, m +20); if (success){ System.out.println("余额小于20元,充值成功, 充值后余额:" + money.get()); break; } } else {// System.out.println("余额大于20元,无需充值"); break; } } } }}class ConsumeMoneyWorker implements Runnable{ private AtomicReference<integer> money; public ConsumeMoneyWorker(AtomicReference<integer> money){ this.money = money; } @Override public void run() { while (true){ while (true){ Integer m = money.get(); if (m > 10){ boolean success = money.compareAndSet(m, m - 10); if (success){ System.out.println("成功消费10元, 余额:" + money.get()); break; } } else { System.out.println("余额不足10元"); } } try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } }
从上面的输出可以到用户的账户被先后反复充值,其原因是用户的账户余额被反复修改,导致修改后又满足的原充值条件,使得充值线程无法正确判断该用户是否已经充值过。
虽然这种情况出现的概率不大,但是依然也是由可能出现的,因此当业务中确实出现这种问题,我们需要注意是否是我们的业务本身就不合理。JDK为我们考虑到了这种情况,使用AtomicStampedReference可以很好的解决这个问题。
13.3、AtomicStampedReference
AtomicStampedReference就是上面所说的加了版本号的AtomicReference。
13.3.1、AtomicStampedReference原理
先来看下如何构造一个AtomicStampedReference对象,AtomicStampedReference只有一个构造器:
可以看到,除了传入一个初始的引用变量initialRef外,还有一个initialStamp变量,initialStamp其实就是版本号(或者说时间戳),用来唯一标识引用变量。
在构造器内部,实例化了一个Pair对象,Pair对象记录了对象引用和时间戳信息,采用int作为时间戳,实际使用的时候,要保证时间戳唯一(一般做成自增的),如果时间戳如果重复,还会出现ABA的问题。
AtomicStampedReference的所有方法,其实就是Unsafe类针对这个Pair对象的操作。 和AtomicReference相比,AtomicStampedReference中的每个引用变量都带上了pair.stamp这个版本号,这样就可以解决CAS中的ABA问题了。
13.3.2、AtomicStampedReference使用示例
// 创建AtomicStampedReference对象,持有Foo对象的引用,初始为null,版本为0AtomicStampedReference<foo> asr = new AtomicStampedReference<>(null,0); int[] stamp=new int[1];Foo oldRef = asr.get(stamp); // 调用get方法获取引用对象和对应的版本号
int oldStamp=stamp[0]; // stamp[0]保存版本号
asr.compareAndSet(oldRef, null, oldStamp, oldStamp + 1) //尝试以CAS方式更新引用对象,并将版本号+1
上述模板就是AtomicStampedReference的一般使用方式,注意下compareAndSet方法:
我们知道,AtomicStampedReference内部保存了一个pair对象,该方法的逻辑如下:
如果AtomicStampedReference内部pair的引用变量、时间戳 与 入参expectedReference、expectedStamp都一样,说明期间没有其它线程修改过AtomicStampedReference,可以进行修改。此时,会创建一个新的Pair对象(casPair方法,因为Pair是Immutable类)。
但这里有段优化逻辑,就是如果 newReference == current.reference && newStamp == current.stamp,说明用户修改的新值和AtomicStampedReference中目前持有的值完全一致,那么其实不需要修改,直接返回true即可。
13.3.3、AtomicStampedReference解决贵宾卡多次充值问题
package atomic;import java.util.concurrent.atomic.AtomicStampedReference;public class ResolveCasABAProblem { private static AtomicStampedReference<integer> money = new AtomicStampedReference<>(19, 0); public static void main(String[] args) { ResolveChargeMoneyWorker chargeMoneyWorker = new ResolveChargeMoneyWorker(money); for (int i = 0; i < 3; i++){ new Thread(chargeMoneyWorker).start(); } ResolveConsumeMoneyWorker consumeMoneyWorker = new ResolveConsumeMoneyWorker(money); new Thread(consumeMoneyWorker).start(); }}class ResolveChargeMoneyWorker implements Runnable{ private AtomicStampedReference<integer> money; public ResolveChargeMoneyWorker(AtomicStampedReference<integer> money){ this.money = money; } @Override public void run() { while (true){ try { Thread.sleep(400); } catch (InterruptedException e) { e.printStackTrace(); } while (true){ Integer m = money.getReference(); if (m < 20){ boolean success = money.compareAndSet(m, m +20, 0 , 1); if (success){ System.out.println("余额小于20元,充值成功, 充值后余额:" + money.getReference()); break; } } else {// System.out.println("余额大于20元,无需充值"); break; } } } }}class ResolveConsumeMoneyWorker implements Runnable{ private AtomicStampedReference<integer> money; public ResolveConsumeMoneyWorker(AtomicStampedReference<integer> money){ this.money = money; } @Override public void run() { while (true){ while (true){ Integer m = money.getReference(); int stamp = money.getStamp(); if (m > 10){ // 这里为什么不是给版本号加1呢? // 假如这里变成 boolean success = money.compareAndSet(m, m - 10, stamp, stamp + 1); // 考虑一种情况,用户账户本身有19元钱, 初始充值状态为0表示为充值, 用户先消费了10元,stamp变为1 // 这时用户还没有充值,账户金额也确实少于20元,这会导致充值线程扫描是发现stamp已变为1,就不会充值了 // 根本原因是用户是否消费与是否充值过无关,充值的状态不能由于其他因素改变 // 这个例子是《实战高并发程序设计》一书中的例子,书中例子没有考虑到这点, // 书中假想的情况是先充值后消费,但如果是先消费再充值就有问题了 boolean success = money.compareAndSet(m, m - 10, stamp, stamp); if (success){ System.out.println("成功消费10元, 余额:" + money.getReference()); break; } } else { System.out.println("余额不足10元"); } } try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } }}
情况1:先充值后消费
情况2:先消费,后充值
13.4、AtomicMarkableReference
AtomicMarkableReference是AtomicStampedReference的特殊化形式AtomicMarkableReference用于无需知道数据目前具体是哪个版本,只需要知道数据是否被更改过。
前面的客户账户充值例子ABA问题使用AtomicMarkableReference解决的代码如下:
package atomic;import java.util.concurrent.atomic.AtomicMarkableReference;public class AtomicMarkableReferenceDemo { private static AtomicMarkableReference<integer> money = new AtomicMarkableReference<>(19, false); public static void main(String[] args) { MarkableChargeMoneyWorker chargeMoneyWorker = new MarkableChargeMoneyWorker(money); for (int i = 0; i < 3; i++){ new Thread(chargeMoneyWorker).start(); } MarkableConsumeMoneyWorker consumeMoneyWorker = new MarkableConsumeMoneyWorker(money); new Thread(consumeMoneyWorker).start(); }}class MarkableChargeMoneyWorker implements Runnable{ private AtomicMarkableReference<integer> money; public MarkableChargeMoneyWorker(AtomicMarkableReference<integer> money){ this.money = money; } @Override public void run() { while (true){ try { Thread.sleep(400); } catch (InterruptedException e) { e.printStackTrace(); } while (true){ Integer m = money.getReference(); if (m < 20){ boolean success = money.compareAndSet(m, m +20, false , true); if (success){ System.out.println("余额小于20元,充值成功, 充值后余额:" + money.getReference()); break; } } else {// System.out.println("余额大于20元,无需充值"); break; } } } }}class MarkableConsumeMoneyWorker implements Runnable{ private AtomicMarkableReference<integer> money; public MarkableConsumeMoneyWorker(AtomicMarkableReference<integer> money){ this.money = money; } @Override public void run() { while (true){ while (true){ Integer m = money.getReference(); boolean isMarked = money.isMarked(); if (m > 10){ // 消费不更改充值标识 boolean success = money.compareAndSet(m, m - 10, isMarked, isMarked); if (success){ System.out.println("成功消费10元, 余额:" + money.getReference()); break; } } else { System.out.println("余额不足10元"); } } try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } }}
13.4、AtomicIntegerArray
AtomicIntegerArray本质上是对int[]类型的封装,使用Unsafe类通过CAS的方式控制int[]在多线程下的安全性,它提供了以下几个核心API:
13.4.1、如果没有AtomicIntegerArray的错误示例
package atomic;public class BadAtomiceIntegerArrayDemo { public static void main(String[] args) throws InterruptedException { int[] vector = new int[1000]; for (int i = 0; i < vector.length; i++){ vector[i] = 0; } BadIncrement increment = new BadIncrement(vector); BadDecrement decrement = new BadDecrement(vector); Thread[] badThreadIncrements = new Thread[1000]; Thread[] badThreadDecrements = new Thread[1000]; for (int i =0 ; i < badThreadIncrements.length; i++){ badThreadIncrements[i] = new Thread(increment); badThreadDecrements[i] = new Thread(decrement); } for (int i =0 ; i < badThreadIncrements.length; i++){ badThreadIncrements[i].start(); badThreadDecrements[i].start(); } for (int i =0 ; i < badThreadIncrements.length; i++){ badThreadIncrements[i].join(); badThreadDecrements[i].join(); } for (int i =0 ; i < vector.length; i++){ if (vector[i] != 0){ System.out.println("Vector["+i+"] : " + vector[i]); } } System.out.println("main end"); }}class BadIncrement implements Runnable{ private int[] vector; public BadIncrement(int[] vector){ this.vector = vector; } @Override public void run() { for (int i = 0; i <vector.length; i++){ vector[i]++; } }}class BadDecrement implements Runnable{ private int[] vector; public BadDecrement(int[] vector){ this.vector = vector; } @Override public void run() { for (int i = 0; i <vector.length; i++){ vector[i]--; } }}
13.4.2、atomicatintegerarray的正确使用
package atomic;import java.util.concurrent.atomic.AtomicIntegerArray;public class AtomiceIntegerArrayDemo { public static void main(String[] args) throws InterruptedException { AtomicIntegerArray vector = new AtomicIntegerArray(1000); Increment increment = new Increment(vector); Decrement decrement = new Decrement(vector); Thread[] threadIncrements = new Thread[1000]; Thread[] threadDecrements = new Thread[1000]; for (int i =0 ; i < threadIncrements.length; i++){ threadIncrements[i] = new Thread(increment); threadDecrements[i] = new Thread(decrement); } for (int i =0 ; i < threadIncrements.length; i++){ threadIncrements[i].start(); threadDecrements[i].start(); } for (int i =0 ; i < threadIncrements.length; i++){ threadIncrements[i].join(); threadDecrements[i].join(); } for (int i =0 ; i < vector.length(); i++){ if (vector.get(i) != 0){ System.out.println("Vector["+i+"] : " + vector.get(i)); } } System.out.println("main end"); }}class Increment implements Runnable{ private AtomicIntegerArray vector; public Increment(AtomicIntegerArray vector){ this.vector = vector; } @Override public void run() { for (int i = 0; i <vector.length(); i++){ vector.getAndIncrement(i); } }}class Decrement implements Runnable{ private AtomicIntegerArray vector; public Decrement(AtomicIntegerArray vector){ this.vector = vector; } @Override public void run() { for (int i = 0; i <vector.length(); i++){ vector.getAndDecrement(i); } }}
工作原理: Increment任务:这个类使用getAndIncrement方法增加数组中所有元素的值 Decrement任务:这个类使用getAndDecrement方法减少数组中所有元素的值
在main方法中创建了1000个元素的AtomicIntegerArray数组, 执行了1000个Increment任务和1000个Decrement任务,在任务的结尾如果没有不一致的错误, 数组中所有元素的值都应该是0,执行程序后会看到程序只将最后的main end消息打印到控制台,因为所有元素值为0
13.5、atomicreferencearray
atomicreferencearray是针对普通自定义对象的数组的原子更新类的通用形式 下面的类实现了atomicintegerarray的功能
package atomic;import java.util.concurrent.atomic.AtomicReferenceArray;public class AtomicReferenceArrayDemo { public static void main(String[] args) throws InterruptedException { AtomicReferenceArray<Integer> vector = new AtomicReferenceArray(1000); for (int i =0; i < vector.length(); i++){ vector.set(i, 0); } ReferenceIncrement increment = new ReferenceIncrement(vector); ReferenceDecrement decrement = new ReferenceDecrement(vector); Thread[] threadIncrements = new Thread[1000]; Thread[] threadDecrements = new Thread[1000]; for (int i =0 ; i < threadIncrements.length; i++){ threadIncrements[i] = new Thread(increment); threadDecrements[i] = new Thread(decrement); } for (int i =0 ; i < threadIncrements.length; i++){ threadIncrements[i].start(); threadDecrements[i].start(); } for (int i =0 ; i < threadIncrements.length; i++){ threadIncrements[i].join(); threadDecrements[i].join(); } for (int i =0 ; i < vector.length(); i++){ if (vector.get(i) != 0){ System.out.println("Vector["+i+"] : " + vector.get(i)); } } System.out.println("main end"); }}class ReferenceIncrement implements Runnable{ private AtomicReferenceArray<Integer> vector; public ReferenceIncrement(AtomicReferenceArray<Integer> vector){ this.vector = vector; } @Override public void run() { for (int i = 0; i <vector.length(); i++){ // 不能用set, 用了set后就是不管内存现有真实值是多少,直接设置为新值, 在cpu时间片切换时会有问题 // 正确的应该是在原有值基础上加1// vector.set(i, current + 1); while (true){ int current = vector.get(i); if(vector.compareAndSet(i, current, current + 1)){ break; } } } }}class ReferenceDecrement implements Runnable{ private AtomicReferenceArray<Integer> vector; public ReferenceDecrement(AtomicReferenceArray<Integer> vector){ this.vector = vector; } @Override public void run() { for (int i = 0; i <vector.length(); i++){ while (true){ int current = vector.get(i); if(vector.compareAndSet(i, current, current - 1)){ break; } } } }}
13.6、atomicintegerfieldupdater
根据数据类型不同, updater有3种, 分别是atomicintegerfieldupdater、atomiclongfieldupdater、atomicreferencefieldupdater
示例场景: 假设某地要进行一次选举。现在模拟这个投票场景,如果选民投了候选人1票,就记为1,否则记为0,最终就是要统计某个选择被投票的次数。
package atomic; import java.util.arrays; java.util.concurrent.atomic.atomicinteger;java.util.concurrent.atomic.atomicintegerfieldupdater; public class AtomicIntegerFieldUpdaterDemo { private static AtomicIntegerFieldUpdater<candidate> scoreUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score"); //作用是为了来验证AtomicIntegerFieldUpdater计算的正确性 private static AtomicInteger allScore = new AtomicInteger(0); public static void main(String[] args) throws InterruptedException { Candidate candidate = new Candidate(); Thread[] threads = new Thread[1000]; for (int i = 0; i < 1000; i++){ threads[i] = new Thread(() -> { // 模拟投票过程随机 if (Math.random() > 0.5){ scoreUpdater.incrementAndGet(candidate); allScore.incrementAndGet(); } }); } Arrays.stream(threads).forEach(thread -> thread.start()); for (int i =0 ; i < threads.length; i++){ threads[i].join(); } System.out.println("scoreUpdater:" + scoreUpdater.get(candidate)); System.out.println("allScore:" +allScore.get()); }}class Candidate{ int id; volatile int score;}
上述代码模拟了这个场景,候选人的得票数量记录在Candidate.score中,注意它是一个普通的volatile 变量,而volatile 变量并不会保证线程安全性,只会保证内存可见性和禁止重排序,代码中通过Math.random()来模拟随机投票过程, AtomicInteger allScore = new AtomicInteger(0);这一行代码用来验证 AtomicIntegerFieldUpdater计算的正确性, 运行这段程序会发现 allScore的值总是和scoreUpdater的值相等。
AtomicIntegerFieldUpdater使用注意事项:
Updater只能修改它可见范围内的变量,因为Updater使用反射获得这个变量,如果变量不可见就会报错,比如score声明为private的,就不行。
为了保证变量在多线程环境被正确的读取,它必须是volatile修饰的,如果不修饰也会报错。
由于CAS操作通过对象实例中的偏移量直接进行赋值,因此它不支持static字段(U.objectFieldOffset(field)不支持静态变量),如果被static修饰了也会报错
14、Unsafe
14.1、获取unsafe
package 自旋锁计数;import sun.misc.Unsafe;import java.lang.reflect.Field;public class UnsafeUtil{ public static Unsafe getUnsafe(){ try { Field field = Unsafe.class.getDeclaredField("theUnsafe"); field.setAccessible(true); // 这里是null是因为theUnsafe属性是static静态属性 return (Unsafe) field.get(null); } catch (NoSuchFieldException | IllegalAccessException e) { throw new RuntimeException(); } }}
几种Couter计数的性能比较
package 自旋锁计数;import sun.misc.Unsafe;import java.util.ArrayList;import java.util.List;import java.util.concurrent.atomic.AtomicInteger;public class CounterDemo { private final int THREAD_COUNT = 1000; public static void main(String[] args) { CounterDemo counterDemo = new CounterDemo(); counterDemo.atomicCouterTest(); counterDemo.casCouterTest(); counterDemo.synchronizedCouterTest(); counterDemo.badCouterTest(); } public void atomicCouterTest(){ try{ AtomicIntegerCouter atomicIntegerCouter = new AtomicIntegerCouter(); CouterThread couterThread = new CouterThread(atomicIntegerCouter); List<thread> threadList = new ArrayList<>(); for (int j = 0; j < THREAD_COUNT; j++){ Thread thread = new Thread(couterThread); threadList.add(thread); } long start = System.currentTimeMillis(); threadList.stream().forEach(thread -> thread.start()); threadList.forEach(thread -> { try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long time = System.currentTimeMillis() - start; System.out.println("atomicIntegerCouter: " + atomicIntegerCouter.getValue() + " 耗时: " + time); } catch (Exception e){ e.printStackTrace(); } } public void casCouterTest(){ try{ CasCounter casCounter = new CasCounter(); CouterThread couterThread = new CouterThread(casCounter); List<thread> threadList = new ArrayList<>(); for (int j = 0; j < THREAD_COUNT; j++){ Thread thread = new Thread(couterThread); threadList.add(thread); } long start = System.currentTimeMillis(); threadList.stream().forEach(thread -> thread.start()); threadList.forEach(thread -> { try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long time = System.currentTimeMillis() - start; System.out.println("casCounter: " + casCounter.getValue() + " 耗时: " + time); } catch (Error e){ e.printStackTrace(); } } public void synchronizedCouterTest(){ try{ SynchronizedCouter synchronizedCouter = new SynchronizedCouter(); CouterThread couterThread = new CouterThread(synchronizedCouter); List<thread> threadList = new ArrayList<>(); for (int j = 0; j < THREAD_COUNT; j++){ Thread thread = new Thread(couterThread); threadList.add(thread); } long start = System.currentTimeMillis(); threadList.stream().forEach(thread -> thread.start()); threadList.forEach(thread -> { try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long time = System.currentTimeMillis() - start; System.out.println("synchronizedCounter: " + synchronizedCouter.getValue() + " 耗时: " + time); } catch (Exception e){ e.printStackTrace(); } } public void badCouterTest(){ try{ BadCouter badCouter = new BadCouter(); CouterThread couterThread = new CouterThread(badCouter); List<thread> threadList = new ArrayList<>(); for (int j = 0; j < THREAD_COUNT; j++){ Thread thread = new Thread(couterThread); threadList.add(thread); } long start = System.currentTimeMillis(); threadList.stream().forEach(thread -> thread.start()); threadList.forEach(thread -> { try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long time = System.currentTimeMillis() - start; System.out.println("badCouter: " + badCouter.getValue() + " 耗时: " + time); } catch (Exception e){ e.printStackTrace(); } }}class CouterThread implements Runnable{ private Counter counter; public CouterThread(Counter counter){ this.counter = counter; } @Override public void run() { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } counter.incrementAndGet(); }}interface Counter{ int incrementAndGet(); int getAndIncrement(); int getValue();}class AtomicIntegerCouter implements Counter{ private AtomicInteger atomic = new AtomicInteger(0); @Override public int incrementAndGet(){ while (true){ // 1、多核处理器可能会同时运行到这行代码,单核处理器由于时间片分配算法T1执行到这行代码后CPU执行权被T2获取了, 线程T1、T2均通过get()方法返回0 // 2、假如T1先执行atomic.compareAndSet(currentValue, ++currentValue)这行代码, // 由于currentValue和atomic的值一致,cas操作成功,atomic变成1,退出循环, // 3、然后T2继续执行atomic.compareAndSet(currentValue, ++currentValue); // 这行代码会发现atomic内部维护的value值1已经与currentValue的值0不相等,不会进行设置值操作 // T2继续下次循环, 又执行atomic.get();获取到的currentValue为1, 再次执行compareAndSet时, // atomic为1和currentValue为1相等,成功进行cas操作,然后退出循环 int currentValue = atomic.get(); boolean success = atomic.compareAndSet(currentValue, ++currentValue); if (success){ return atomic.get(); } } } @Override public int getAndIncrement() { while (true){ int currentValue = atomic.get(); boolean success = atomic.compareAndSet(currentValue, ++currentValue); if (success){ return currentValue; } } } @Override public int getValue() { return atomic.get(); }}class CasCounter implements Counter{ private volatile int count = 0; private static final Unsafe unsafe = UnsafeUtil.getUnsafe(); private static long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset(CasCounter.class.getDeclaredField("count")); } catch (NoSuchFieldException e) { throw new Error(); } } @Override public int incrementAndGet() { while (true){ boolean success = unsafe.compareAndSwapInt(this, valueOffset, count, count + 1); if (success){ return count; } } } @Override public int getAndIncrement() { while (true){ boolean success = unsafe.compareAndSwapInt(this, valueOffset, count, count + 1); if (success){ return count; } } } @Override public int getValue() { return count; }}class BadCouter implements Counter{ private volatile int count = 0; @Override public int incrementAndGet() { return ++count; } @Override public int getAndIncrement() { return count++; } @Override public int getValue() { return count; }}class SynchronizedCouter implements Counter{ private volatile int count = 0; @Override public synchronized int incrementAndGet() { return ++count; } @Override public synchronized int getAndIncrement() { return count++; } @Override public int getValue() { return count; }}
多次运行示例代码,会发现基本上每次synchronizedCouter的耗时最短,这也说明了synchronized使用锁的方式性能并不一定低。
以上就是java并发编程的入门过程,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注编程网行业资讯频道。