LongAdder介绍
1.Atomic原子类
Atomic的原子类内部使用的是CAS原则,CAS是一个乐观锁,但是如果是在高并发的情况下的话,多个线程不断地竞争
CAS的不断的自旋,非常耗CPU.
高并发环境下,value变量其实是一个热点,也就是多个线程竞争一个热点。
这时候就需要使用的 LongAdder 来替代 Atomic类
2.LongAdder原理
LongAdder的原理就是分散热点,将value分散到一个数组中,不同的线程去找自己的对应的Cell进行修改值,
各个线程对Cell进行CAS操作,这样热点就被分散了,冲突的概率小了,性能就提高了.
如果要返回实际的值,返回所有的数组中的值和base值就行.
2.查看LongAdder的add方法
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
一看到这种代码就头皮发麻
简单说明一下这几行代码作用,说多了,你也懒得看
第一个if作用:
如果还没有初始化cells数组,就去修改base值
如果修改Base值失败,说明多个线程对base值修改发生了竞争
第二个if作用:
判断有没有cells有没有值,有的话说明已经初始化过cells数组了
知道对应的桶位添加值或者修改值
我感觉你已经不知道我在说什么了!
反正你就知道,如果有多个线程发生了竞争,就去cells数组中找对应的桶位的cell添加或者修改值即可
3.longAccumulate方法
这里的代码特别的恶心,是能助眠的好代码,建议收藏!
简单说明一下几个核心的代码
3.1.创建Cell数组
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try {
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
Cell数组还没有进行初始化,
创建长度为2的Cell数组
在索引1的位置存储第一个Cell对象
3.2.创建桶位Cell对象
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) {
Cell r = new Cell(x);
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try {
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue;
}
}
m - 1) & h
通过路由寻址公式找到对应的索引位置的桶位为空,然后把创建的Cell对象存储进去
更直白就是说,你要蹲坑了,你要去找坑,如果坑位没有人,你就进去
3.3.扩容Cell数组
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) {
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue;
}
数组长度不够时,2倍扩容
4.总结
你现在肯定是云里雾里,讲的都是什么东西.
你现在就只需要知道 LongAdder
通过 base 和 Cell数组 换取更高的性能
5.附上源码解析
当然这里你可以跳过了,不用看了
5.1.add方法
public void add(long x) {
// as 表示cells引用
// b 表示获取的base值
// v 表示期望值
// m 表示cells数组的长度
// a 表示当前线程命中cell单元格
Cell[] as; long b, v; int m; Cell a;
//条件一: --> true 表示cells已经初始化过了,当前线程应该把数据写到对应的cell中
// --> false 表示cells还没有初始化过,当前线程所有的数据都被写到base中
//条件二: --> false 表示cas替换值成功
// --> true 表示发生竞争了,可能需要重试 或者 扩容
if ((as = cells) != null || !casBase(b = base, b + x)) {
//什么时候会进来?
//1.true 表示cells已经初始化过了,当前线程应该把数据写到对应的cell中
//2.true cells还没有初始化,但是发生竞争了
// true -> 未竞争 false ->发生竞争
boolean uncontended = true;
//as == null || (m = as.length - 1) < 0 看做是一个条件
//条件一:true --> 说明cells还没有被初始化,也就是多线程写base发生竞争了进来的,也就是通过第二个条件进来的
// false --> 说明cells已经初始化了,可以去寻找自己的cell进行赋值
//条件二: getProbe() 可以理解为获取当前线程的hash值, m表示cells长度-1 注意:cells的次方数一定是2的次方
// true --> 说明当前线程对应的下标的cell为空,需要创建 longAccumulate 支持
// false --> 说明当前线程对应的下标cell不为空,下一步需要将 x,添加到cell中
//条件三: true --> 代表cas失败,代表当前线程对应的cell,有竞争
// false --> 表示cas成功
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
//都哪些情况会调用这里的方法?
//1.true --> 说明cells还没有被初始化,也就是多线程写base发生竞争了进来的
//2.true --> 说明当前线程对应的下标的cell为空,需要创建 longAccumulate 支持
//3.true --> 代表cas失败,代表当前线程对应的cell,有竞争
longAccumulate(x, null, uncontended);
}
}
5.2.longAccumulate方法
//wasUncontended,只有cells初始化之后,并且当前线程 竞争修改失败,才会是false,其他情况下都是true
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
//表示线程的hash值
int h;
//条件成立,说明当前线程还未分配hash值
if ((h = getProbe()) == 0) {
//给当前线程分配hash值
ThreadLocalRandom.current(); // force initialization
//取出当前线程的hash值赋值给h
h = getProbe();
//为什么?默认情况下,可定写入到了cell[0]的位置,不把他当做一次真正的竞争??
wasUncontended = true;
}
//表示扩容意向, false表示不会扩容 , true有可能会扩容
boolean collide = false; // True if last slot nonempty
//自旋
for (;;) {
//as 表示cells引用
//a 表示当前线程命中的cell
//n 表示cell的数组长度
//v 表示期望值
Cell[] as; Cell a; int n; long v;
//情况1:as不为空表示cells已经初始化了,当前线程应该将数据写入到对应的cell中
if ((as = cells) != null && (n = as.length) > 0) {
//2.true --> 说明当前线程对应的下标的cell为空,需要创建 longAccumulate 支持
//3.true --> 代表cas失败,代表当前线程对应的cell,有竞争[重试或者扩容]
//情况1.1: true 表示当前线程对应的下标位置的cell为null,需要创建cell
if ((a = as[(n - 1) & h]) == null) {
//true 表示当前锁未被占用,false-->表示锁被占用
if (cellsBusy == 0) { // Try to attach new Cell
//拿x创建cell
Cell r = new Cell(x); // Optimistically create
//条件一:true 表示当前锁未被占用,false-->表示锁被占用
//条件二:true表示当前线程获取锁成功,false表示当前线程获取锁失败,
if (cellsBusy == 0 && casCellsBusy()) {
//是否创建成功的标记
boolean created = false;
try { // Recheck under lock
//rs表示cells引用
//m 表示cells长度
//j 表示当前线程命中的下标
Cell[] rs; int m, j;
//条件一,条件二,恒成立的
//rs[j = (m - 1) & h] == null 为了防止其他线程已经初始化话过了,防止当前线程再次修改,覆盖掉原来的cell
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
//扩容意向改为false
collide = false;
}
//情况1.2
//wasUncontended,只有cells初始化之后,并且当前线程 竞争修改失败,才会是false,其他情况下都是true
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//情况1.3 :当前线程rehash过后,然后新命中的cell不为空
//true --> 写成功 退出
//false -- > rehash过后命中的cell,也有竞争 这里为重试一次,再重试一次
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
//情况1.4.
// 条件一: n >= NCPU true --> 扩容意向改为False ,表示不扩容了 false --> 表示cells还可以扩容
//条件二: true --> 表示其他线程已经扩容过了,当前线程rehash之后重试即可
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
//默认开始为false 的, 设置为true,需要扩容,但是不一定真的发生扩容
else if (!collide)
collide = true;
//情况6.真正的扩容逻辑
//条件一: cellsBusy == 0 true --> 表示当前无锁状态,当前线程可以去竞争这把锁
//条件二:casCellsBusy() true --> 表示当前线程获取锁成功,可以执行扩容逻辑 ,false 表示有其他线程在做相关的操作
// 尝试两次之后
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
//n<<1 翻倍
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
//释放锁
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
//重置当前线程hash值
h = advanceProbe(h);
}
//情况2:前置条件,cells为进行初始化,as为null
//条件一: cellsBusy为true当前未加锁
//条件二: cells == as 为什么? 因为其他线程在你给as赋值之后修改了cells
//条件三: true 表示获取锁成功 ,会把cellBusy 设置为 1 , false 表示其他线程在持有这个锁
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try {
// 为什么还有一次判断呢?
//防止其他线程已经初始化了,当前线程再次初始化,导致丢失数据
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
//释放锁
cellsBusy = 0;
}
if (init)
break;
}
//情况三:
//1.当前cellBusy加锁状态,表示其他线程正在初始化cells,所以当前线程累加到base
//2.cells被其他线程初始化之后,当前线程需要将数据累加到base
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
以上就是LongAdder原理及创建使用示例详解的详细内容,更多关于LongAdder创建使用的资料请关注编程网其它相关文章!