一、什么是ConcurrentHashMap
ConcurrentHashMap
和HashMap
一样,是一个存放键值对的容器。使用hash
算法来获取值的地址,因此时间复杂度是O(1)。查询非常快。
同时,ConcurrentHashMap
是线程安全的HashMap
。专门用于多线程环境。
二、ConcurrentHashMap和HashMap以及Hashtable的区别
2.1 HashMap
HashMap
是线程不安全的,因为HashMap
中操作都没有加锁,因此在多线程环境下会导致数据覆盖之类的问题,所以,在多线程中使用HashMap
是会抛出异常的。
2.2 HashTable
HashTable
是线程安全的,但是HashTable
只是单纯的在put()
方法上加上synchronized
。保证插入时阻塞其他线程的插入操作。虽然安全,但因为设计简单,所以性能低下。
2.3 ConcurrentHashMap
ConcurrentHashMap
是线程安全的,ConcurrentHashMap
并非锁住整个方法,而是通过原子操作和局部加锁的方法保证了多线程的线程安全,且尽可能减少了性能损耗。
由此可见,HashTable
可真是一无是处…
三、ConcurrentHashMap原理
这一节专门介绍ConcurrentHashMap
是如何保证线程安全的。如果想详细了解ConcurrentHashMap
的数据结构,请参考HashMap
。
3.1 volatile修饰的节点数组
请看源码
//ConcurrentHashMap使用volatile修饰节点数组,保证其可见性,禁止指令重排。transient volatile Node[] table;
再看看HashMap
是怎么做的
//HashMap没有用volatile修饰节点数组。transient Node[] table;
显然,HashMap
并不是为多线程环境设计的。
3.2 ConcurrentHashMap的put()方法
//put()方法直接调用putVal()方法public V put(K key, V value) { return putVal(key, value, false);}//所以直接看putVal()方法。final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node[] tab = table;;) { Node f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node(hash, key, value, null))) break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node e = f;; ++binCount) {K ek;if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break;}Node pred = e;if ((e = e.next) == null) { pred.next = new Node(hash, key, value, null); break;} } } else if (f instanceof TreeBin) { Node p; binCount = 2; if ((p = ((TreeBin)f).putTreeVal(hash, key, value)) != null) {oldVal = p.val;if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null;}
我来给大家讲解一下步骤把。
public V put(K key, V value) {
首先,put()
方法是没有用synchronized
修饰的。
for (Node[] tab = table;;)
新插入一个节点时,首先会进入一个死循环,
情商高的就会说,这是一个乐观锁
进入乐观锁后,
if (tab == null || (n = tab.length) == 0) tab = initTable();
如果tab
未被初始化,则先将tab
初始化。此时,这轮循环结束,因为被乐观锁锁住,开始下一轮循环。
第二轮循环,此时tab
已经被初始化了,所以跳过。
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node(hash, key, value, null))) break; // no lock when adding to empty bin}
接下来通过key
的hash
值来判断table中是否存在相同的key,如果不存在,执行casTabAt()
方法。
注意,这个操作时不加锁的,看到里面的那行注释了么// no lock when adding to empty bin
。位置为空时不加锁。
这里其实是利用了一个CAS操作。
CAS(Compare-And-Swap):比较并交换
这里就插播一个小知识,CAS就是通过一个原子操作,用预期值去和实际值做对比,如果实际值和预期相同,则做更新操作。
如果预期值和实际不同,我们就认为,其他线程更新了这个值,此时不做更新操作。
而且这整个流程是原子性的,所以只要实际值和预期值相同,就能保证这次更新不会被其他线程影响。
好了,我们继续。
既然这里用了CAS操作去更新值,那么就存在两者情况。
- 实际值和预期值相同
相同时,直接将值插入,因为此时是线程安全的。好了,这时插入操作完成。使用break;
跳出了乐观锁。循环结束。 - 实际值和预期值不同
不同时,不进行操作,因为此时这个值已经被其他线程修改过了,此时这轮操作就结束了,因为还被乐观锁锁住,进入第三轮循环。
第三轮循环中,前面的判断又会重新执行一次,我就跳过不说了,进入后面的判断。
else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f);
这里判断的是tab
的状态,MOVED
表示在扩容中,如果在扩容中,帮助其扩容。帮助完了后就会进行第四轮循环。
终于,来到了最后一轮循环。
else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent)e.val = value; break; } Node pred = e; if ((e = e.next) == null) { pred.next = new Node(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { Node p; binCount = 2; if ((p = ((TreeBin)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; }}
上面的判断都不满足时,就会进入最后的分支,这条分支表示,key
的hash
值位置不为null
(之前的判断是hash
值为null
时直接做插入操作),表示发生了hash
冲突,此时节点就要通过链表的形式存储这个插入的新值。Node
类是有next
字段的,用来指向链表的下一个位置,新节点就往这插。
synchronized (f) {
看,终于加排它锁了,只有在发生hash冲突的时候才加了排它锁。
if (tabAt(tab, i) == f) {if (fh >= 0) {
重新判断当前节点是不是第二轮判断过的节点,如果不是,表示节点被其他线程改过了,进入下一轮循环,
如果是,再次判断是否在扩容中,如果是,进入下一轮循环,
如果不是,其他线程没改过,继续走,
for (Node e = f;; ++binCount) {
for循环,循环遍历这个节点上的链表,
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break;}
找到一个hash值相同,且key也完全相同的节点,更新这个节点。
如果找不到
if ((e = e.next) == null) {pred.next = new Node(hash, key, value, null); break;}
往链表最后插入这个新节点。因为在排他锁中,这些操作都可以直接操作。终于到这插入就基本完成了。
总结
做插入操作时,首先进入乐观锁,
然后,在乐观锁中判断容器是否初始化,
如果没初始化则初始化容器,
如果已经初始化,则判断该hash
位置的节点是否为空,如果为空,则通过CAS操作进行插入。
如果该节点不为空,再判断容器是否在扩容中,如果在扩容,则帮助其扩容。
如果没有扩容,则进行最后一步,先加锁,然后找到hash
值相同的那个节点(hash冲突),
循环判断这个节点上的链表,决定做覆盖操作还是插入操作。
循环结束,插入完毕。
3.3 ConcurrentHashMap的get()方法
//ConcurrentHashMap的get()方法是不加锁的,方法内部也没加锁。public V get(Object key)
看上面这代码,ConcurrentHashMap
的get()
方法是不加锁的,为什么可以不加锁?因为table
有volatile
关键字修饰,保证每次获取值都是最新的。
//Hashtable的get()是加锁的,所以性能差。public synchronized V get(Object key)
再看看Hashtable
,差距啊。
四、使用场景
嗯,多线程环境下,更新少,查询多时使用的话,性能比较高。
乐观锁嘛,认为更新操作时不会被其他线程影响。所以时候再更新少的情况下性能高。
对你有帮助吗?点个赞吧~
来源地址:https://blog.csdn.net/qq_42068856/article/details/126091526