LongAdder的实现原理是什么,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
AtomicLong的实现原理图:
LongAdder是JDK8新增的原子操作类,它提供了一种新的思路,既然AtomicLong的性能瓶颈是由于大量线程同时更新一个变量造成的,那么能不能把这个变量拆分出来,变成多个变量,然后让线程去竞争这些变量,最后合并即可?LongAdder的设计精髓就在这里,通过将变量拆分成多个元素,降低该变量的并发度,最后进行合并元素,变相的减少了CAS的失败次数。
LongAdder的实现原理图:
常用方法
public class LongAdder extends Striped64 implements Serializable { //构造方法 public LongAdder() { } //加1操作 public void increment(); //减1操作 public void decrement(); //获取原子变量的值 public long longValue(); }
下面给出一个简单的例子,模拟50线程同时进行更新
package com.xue.testLongAdder; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.LongAdder; public class Main { public static void main(String[] args) { LongAdder adder = new LongAdder(); ExecutorService threadPool = Executors.newFixedThreadPool(20); for (int i = 0; i < 50; i++) { Runnable r = () -> { adder.add(1); }; threadPool.execute(r); } threadPool.shutdown(); //若关闭线程池后,所有任务执行完毕,则isTerminated()返回true while (!threadPool.isTerminated()) { System.out.println(adder.longValue()); break; } } }
输出结果是50
其中,如果对线程池不熟悉的同学,可以先参考我的另外一篇文章说说线程池
原理解析
类图
LongAdder内部维护了一个Cell类型的数组,其中Cell是Striped64中的一个静态内部类。
Cell类
abstract class Striped64 extends Number { @sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } } }
Cell用来封装被拆分出来的元素,内部用一个value字段保存当前元素的值,等到需要合并时,则累加所有Cell数组中的value。Cell内部使用CAS操作来更新value值,对CAS操作不熟悉的同学,可以参考我的另外一篇文章浅探CAS实现原理
可以注意到,Cell类被 @sun.misc.Contended注解修饰了,这个注解是为了解决伪共享问题的,什么是伪共享?
一个缓存行可以存储多个变量(存满当前缓存行的字节数);而CPU对缓存的修改又是以缓存行为最小单位的,在多线程情况下,如果需要修改“共享同一个缓存行的变量”,就会无意中影响彼此的性能,这就是伪共享(False Sharing)。
对伪共享还不理解的同学,可以参考这位大佬的文章伪共享(False Sharing)底层原理及其解决方式
而LongAdder采用的是Cell数组,而数组元素是连续的,因此多个Cell对象共享一个缓存行的情况非常普遍,因此这里@sun.misc.Contended注解对单个Cell元素进行字节填充,确保一个Cell对象占据一个缓存行,即填充至64字节。
关于如何确定一个对象的大小,可以参考我的另外一篇文章对象的内存布局,怎样确定对象的大小,这样可以算出来,还需要填充多少字节。
longValue()
longValue()返回累加后的值
public long longValue() { return sum(); } public long sum() { Cell[] as = cells; Cell a; long sum = base; //当Cell数组不为null时,进行累加后返回,否则直接返回基准数base if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
这可能是LongAdder中最简单的方法了,就不进行赘述了。什么,你要看复杂的?好的,这就来了。
increment()
public void increment() { add(1L); } public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { //uncontended判断cells数组中,当前线程要做cas累加操作的某个元素是否#不#存在争用, //如果cas失败则存在争用;uncontended=false代表存在争用,uncontended=true代表不存在争用。 boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } }
其中longAccumulate()的解析如下:
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { //获取当前线程的threadLocalRandomProbe值作为hash值,如果当前线程的threadLocalRandomProbe为0, // 说明当前线程是第一次进入该方法,则强制设置线程的threadLocalRandomProbe为ThreadLocalRandom类的成员 // 静态私有变量probeGenerator的值,后面会详细将hash值的生成; //另外需要注意,如果threadLocalRandomProbe=0,代表新的线程开始参与cell争用的情况 //1.当前线程之前还没有参与过cells争用(也许cells数组还没初始化,进到当前方法来就是为了初始化cells数组 //后争用的), // 是第一次执行base的cas累加操作失败; //2.或者是在执行add方法时,对cells某个位置的Cell的cas操作第一次失败,则将wasUncontended设置为false, // 那么这里会将其重新置为true;第一次执行操作失败; //凡是参与了cell争用操作的线程threadLocalRandomProbe都不为0; int h; if ((h = getProbe()) == 0) { //初始化ThreadLocalRandom; ThreadLocalRandom.current(); // force initialization //将h设置为0x9e3779b9 h = getProbe(); //设置未竞争标记为true wasUncontended = true; } //cas冲突标志,表示当前线程hash到的Cells数组的位置,做cas累加操作时与其它线程发生了冲突,cas失败; // collide=true代表有冲突,collide=false代表无冲突 boolean collide = false; for (;;) { Cell[] as; Cell a; int n; long v; //这个主干if有三个分支 //1.主分支一:处理cells数组已经正常初始化了的情况(这个if分支处理add方法的四个条件中的3和4) //2.主分支二:处理cells数组没有初始化或者长度为0的情况;(这个分支处理add方法的四个条件中的1和2) //3.主分支三:处理如果cell数组没有初始化,并且其它线程正在执行对cells数组初始化的操作, // 及cellbusy=1; // 则尝试将累加值通过cas累加到base上 //先看主分支一 if ((as = cells) != null && (n = as.length) > 0) { if ((a = as[(n - 1) & h]) == null) { //cellsBusy == 0 代表当前没有线程cells数组做修改 if (cellsBusy == 0) { //将要累加的x值作为初始值创建一个新的Cell对象, Cell r = new Cell(x); //如果cellsBusy=0无锁,则通过cas将cellsBusy设置为1加锁 if (cellsBusy == 0 && casCellsBusy()) { //标记Cell是否创建成功并放入到cells数组被hash的位置上 boolean created = false; try { Cell[] rs; int m, j; //再次检查cells数组不为null,且长度不为空,且hash到的位置的Cell为null if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { //将新的cell设置到该位置 rs[j] = r; created = true; } } finally { //去掉锁 cellsBusy = 0; } //生成成功,跳出循环 if (created) break; //如果created为false,说明上面指定的cells数组的位置cells[m%cells.length] // 已经有其它线程设置了cell了, // 继续执行循环。 continue; } } //如果执行的当前行,代表cellsBusy=1,有线程正在更改cells数组,代表产生了冲突,将collide设置为false collide = false; } else if (!wasUncontended) //设置未竞争标志位true,继续执行,后面会算一个新的probe值,然后重新执行循环。 wasUncontended = true; else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; else if (n >= NCPU || cells != as) collide = false; else if (!collide) //设置冲突标志,表示发生了冲突,需要再次生成hash,重试。 // 如果下次重试任然走到了改分支此时collide=true,!collide条件不成立,则走后一个分支 collide = true; else if (cellsBusy == 0 && casCellsBusy()) { try { //检查cells是否已经被扩容 if (cells == as) { // Expand table unless stale Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } //为当前线程重新计算hash值 h = advanceProbe(h); //这个大的分支处理add方法中的条件1与条件2成立的情况,如果cell表还未初始化或者长度为0, // 先尝试获取cellsBusy锁。 }else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { // Initialize table //初始化cells数组,初始容量为2,并将x值通过hash&1,放到0个或第1个位置上 if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { //解锁 cellsBusy = 0; } //如果init为true说明初始化成功,跳出循环 if (init) break; } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // Fall back on using base break; } }
看完上述内容,你们掌握LongAdder的实现原理是什么的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注编程网行业资讯频道,感谢各位的阅读!