介绍
据官方所述,mango Cache是对Guava Cache基于go的部分实现,同时mangoCache参考了Caffeine以及go-tinylfu.
支持以下缓存管理策略:
- LRU
- Segmented LRU(默认)
- TinyLFU(实验性)
本文将从源码对其进行解析,重点将放在loadingCache(一种可以自定义如何载入新内存的cache)上面.
整体架构
mango Cache的主体功能由localCache结构体(local.go)实现,
type localCache struct {
cache cache // 真正存放数据的地方
expireAfterAccess time.Duration //用户自定义的过期以及刷新时间
expireAfterWrite time.Duration
refreshAfterWrite time.Duration
policyName string //缓存管理策略
onInsertion Func //钩子函数
onRemoval Func
loader LoaderFunc //自定义加载和重载函数
reloader Reloader
stats StatsCounter //状态管理器
cap int //缓存容量
// 访问时的缓存管理策略
accessQueue policy
// 写入时的缓存管理策略,只有在expireAfterWrite或refreshAfterWrite被设置以后才启用
writeQueue policy
// 用于processEntries的事件队列
events chan entryEvent
// 记录距离上一次写期间发生的读次数,用于判断是否进行清扫
readCount int32
// 用于关闭由localCache创建的协程
closing int32
closeWG sync.WaitGroup
}
真正存储数据的结构体是cache(policy.go),所有缓存数据都存储在其中
type cache struct {
size int64 // 缓存大小
segs [segmentCount]sync.Map // 分片sync.map, map[key]*entry
}
entry是存放一个缓存数据的单元
type entry struct {
hash uint64 //key的hash值
accessTime int64
writeTime int64
invalidated int32 //是否有效
loading int32 //是否处于载入中状态
key Key
value atomic.Value // 存储的值
// 下面的属性只被缓存管理策略操作,所以不需要原子性的访问
accessList *list.Element //此entry目前所在的访问链表中的位置
writeList *list.Element //此entry目前所在的写入链表中的位置
listID uint8 //此entry目前所在的缓存段(SLRU)
}
localCache使用事件队列来管理并发读写事件,事件队列是一个chan,其中流转着entryEvent(即数据以及对应的缓存事件),localCache通过processEntries协程来处理各种读写事件.之后将详细讲解mango Cache是如何进行读写操作的.
localCache的write、access等操作底层是通过操作cache、accessQueue以及writeQueue从而实现的,而localCache还会负责清扫过期数据的工作.
前面提到了mango Cache支持诸如LRU、SLRU以及TinyLFU等缓存管理策略,其在localCache中即为accessQueue以及writeQueue,负责对缓存的淘汰准入等操作.
初始化流程
mango Cache提供了两种cache---普通Cache以及LoadingCache,这两者都是接口,而localCache实现了这两个接口,由于LoadingCache继承了普通Cache,因此本文只讲解LoadingCache.
func NewLoadingCache(loader LoaderFunc, options ...Option) LoadingCache {
c := newLocalCache()
c.loader = loader
for _, opt := range options {
opt(c)
}
c.init()
return c
}
NewLoadingCache函数先初始化一个LoadingCache,然后根据用户传入的自定义载入函数和一些配置来初始化LoadingCache.配置包括注册插入或者删除时触发的钩子函数以及过期时间等等.然后调用localCache.init.
func (c *localCache) init() {
c.accessQueue = newPolicy(c.policyName)
c.accessQueue.init(&c.cache, c.cap)
if c.expireAfterWrite > 0 || c.refreshAfterWrite > 0 {
c.writeQueue = &recencyQueue{}
} else {
c.writeQueue = discardingQueue{}
}
c.writeQueue.init(&c.cache, c.cap)
c.events = make(chan entryEvent, chanBufSize)
c.closeWG.Add(1)
go c.processEntries()
}
localCache.init会根据用户传入的缓存管理策略名字来初始化accessQueue然后根据是否有写过期和写刷新配置来决定是否初始化写入队列.接着创建事件队列并开启事件处理协程.到此为止,cache启动完成.
读流程
LoadingCache的Get操作可以通过key获取缓存值,其流程为:
先从主缓存中查询entry
若未查询到entry,则记录miss并且调用用户自定义的load方法加载缓存值并返回
若查询到entry,先检查是否过期
- 若过期且没有设置loader则直接向事件处理协程发送eventDelete
- 若过期但设置了loader,则异步更新entry值
若没有过期则更新访问时间并向事件处理协程发送eventAccess然后记录命中
最后返回entry
func (c *localCache) Get(k Key) (Value, error) {
en := c.cache.get(k, sum(k)) //计算key的hash并查询该key
if en == nil {
c.stats.RecordMisses(1)
return c.load(k)
}
// 检查entry是否需要更新
now := currentTime()
if c.isExpired(en, now) {
if c.loader == nil { //如果没有设置加载器则直接删除该entry
c.sendEvent(eventDelete, en)
} else {
//对于loadingCache,我们不删除这个entry
//而是把它暂时留在缓存中,所以用户依旧可以读取到旧的缓存值
c.setEntryAccessTime(en, now)
c.refreshAsync(en)
}
c.stats.RecordMisses(1)
} else {
c.setEntryAccessTime(en, now)
c.sendEvent(eventAccess, en)
c.stats.RecordHits(1)
}
return en.getValue(), nil
}
需要注意一下这里的refreshAsync函数:
func (c *localCache) refreshAsync(en *entry) bool {
if en.setLoading(true) {
// 如果此entry没有在加载
if c.reloader == nil {
go c.refresh(en)
} else {
c.reload(en)
}
return true
}
return false
}
如果没有用户设置的重载器,就异步执行refresh,refresh函数实际上就是对entry进行加载.
而如果有重载器那么就同步执行用户自定义的reload函数.
写流程
loadingCache的Put操作与Get操作类似,流程如下:
先去主缓存查询key是否存在,若查询到对应的entry,那么直接更新entry
若没有查询到对应的entry,说明其不存在,因此根据key,value初始化一个新entry
如果缓存容量足够,则让主缓存存储该entry,此时会再次检查主存中是否有该entry(解决并发问题)
- 若cen不为空,说明主缓存中已经存在该entry,直接修改该entry即可
- 若cen为空,说明主缓存中还不存在该entry,那么就会在主缓存中存储该entry
最后向事件处理协程发送eventWrite事件
func (c *localCache) Put(k Key, v Value) {
h := sum(k)
en := c.cache.get(k, h)
now := currentTime()
if en == nil {
en = newEntry(k, v, h)
c.setEntryWriteTime(en, now)
c.setEntryAccessTime(en, now)
// 直接将新value添加进缓存(在缓存容量足够的时候)
if c.cap == 0 || c.cache.len() < c.cap {
cen := c.cache.getOrSet(en)
if cen != nil {
cen.setValue(v)
c.setEntryWriteTime(cen, now)
en = cen
}
}
} else {
// 更新entry
en.setValue(v)
c.setEntryWriteTime(en, now)
}
c.sendEvent(eventWrite, en)
}
//当entry在缓存中存在,则返回该entry,否则存储该entry
func (c *cache) getOrSet(v *entry) *entry {
seg := c.segment(v.hash)
en, ok := seg.LoadOrStore(v.key, v)
if ok {
return en.(*entry)
}
atomic.AddInt64(&c.size, 1)
return nil
}
事件处理机制
主流程
mango Cache通过entryEvent chan以及processEntries协程来处理并发读写事务
缓存事件一共四个,分别为写入、访问、删除以及关闭.每个业务协程通过向localCache的events chan发送entryEvent通知事件处理协程,进而实现并发读写.
而processEntries协程内,会不断从events chan内取出entryEvent并执行对应的操作,在write、access以及delete操作后会执行清理工作(具体清扫工作由expireEntries函数执行)
type event uint8
const (
eventWrite event = iota
eventAccess
eventDelete
eventClose
)
type entryEvent struct {
entry *entry
event event
}
func (c *localCache) processEntries() {
defer c.closeWG.Done()
for e := range c.events {
switch e.event {
case eventWrite: //写入事务
c.write(e.entry)
c.postWriteCleanup() //清理操作
case eventAccess: //访问事务
c.access(e.entry)
c.postReadCleanup() //清理操作
case eventDelete:
if e.entry == nil { //InvalidateAll函数中使用
c.removeAll()
} else {
c.remove(e.entry) //移除单个entry
}
c.postReadCleanup() //清理操作
case eventClose:
if c.reloader != nil {
// 停止所有refresh工作
c.reloader.Close()
}
c.removeAll()
return
}
}
}
write
由于事件处理机制对于access、delete和write的操作类似,因此这里只讲解较为复杂的write操作:
首先通过调用底层访问队列以及写入队列的write方法
触发用户自定义的钩子函数
如果write方法返回值不为空,说明有entry被驱逐,
因此需要从写入队列将其删除,同时记录驱逐并触发用户自定义的钩子函数
func (c *localCache) write(en *entry) {
ren := c.accessQueue.write(en)
c.writeQueue.write(en)
if c.onInsertion != nil {
c.onInsertion(en.key, en.getValue())
}
if ren != nil { //有entry被驱逐出了缓存
c.writeQueue.remove(ren)
c.stats.RecordEviction()
if c.onRemoval != nil {
c.onRemoval(ren.key, ren.getValue())
}
}
}
后面将详细讲述底层访问队列以及缓存管理是如何实现的.
清理工作
前面讲到过每次进行完write、access以及delete操作后会执行清理工作.具体地,write操作会触发postWriteCleanup而access和delete操作会触发postReadCleanup.
postReadCleanup会根据当前距离上一次写的read操作次数是否达到清理工作阈值来决定是否清理,这个阈值是64,也就是说每隔64次read操作就会触发一次清理工作
而postWriteCleanup将在每一次write操作之后触发
真正的清理工作由expireEntries函数完成,它一次清理工作最多只会清理16个entry避免了对事件处理的长时间阻塞.
func (c *localCache) postReadCleanup() {
if atomic.AddInt32(&c.readCount, 1) > drainThreshold {
atomic.StoreInt32(&c.readCount, 0)
c.expireEntries()
}
}
// 在添加完entry以后再执行
func (c *localCache) postWriteCleanup() {
atomic.StoreInt32(&c.readCount, 0)
c.expireEntries()
}
//清理工作函数
func (c *localCache) expireEntries() {
remain := drainMax
now := currentTime()
if c.expireAfterAccess > 0 {
expiry := now.Add(-c.expireAfterAccess).UnixNano()
c.accessQueue.iterate(func(en *entry) bool {
if remain == 0 || en.getAccessTime() >= expiry {
// 找到了第一个没有过期的entry或者清理entry数足够了
return false
}
c.remove(en)
c.stats.RecordEviction()
remain--
return remain > 0
})
}
if remain > 0 && c.expireAfterWrite > 0 {
...
}
if remain > 0 && c.loader != nil && c.refreshAfterWrite > 0 {
...
}
}
缓存管理
localCache在初始化过程中也初始化了缓存管理策略,由于localCache的writeQueue默认使用LRU缓存淘汰策略,而accessQueue支持LRU、SLRU以及TinyLFU三种缓存淘汰策略,本节将着重讲解accessQueue.
什么是LRU?
LRU是Least Recently Used的缩写,即最近最少使用.在mango Cache中LRU依靠go SDK自带的List(双向链表)实现,新缓存条目会被插入List头部,如果List内元素达到容量上限则删除List尾部的元素,此元素也正是最近最少使用的元素.LRU可以使得主缓存内的缓存条目永远是最近被访问的,不是最近访问的元素将被淘汰.
如果一个不是经常使用的数据,偶尔或者周期性的被使用,那么该数据会被加到LRU链表头部,而这种不经常使用的数据,放在链表头部,占用了空间;一直等到LRU淘汰完,才会被剔除链表;如果这种数据一次性过多,那么链表数据都是这种无用的数据,从而会导致缓存命中率低下,影响系统性能.
什么是SLRU?
Segmented LRU(SLRU)是一种LRU的改进,主要把在一个时间窗口内命中至少2次的记录和命中1次的单独存放,这样就可以把短期内较频繁的缓存元素区分开来.具体做法上,SLRU包含2个固定尺寸的LRU,一个叫Probation段(A1),一个叫Protection段(A2).新记录总是插入到A1中,当A1的记录被再次访问,就把它移到A2,当A2满了需要驱逐记录时,会把驱逐记录插入到A1中.
在mango Cache的实现中,Protection段与Probation段大小比为4:1.
SLRU是mango Cache的默认缓存管理策略
什么是TinyLFU?
TinyLFU是一种空间利用率很高的新的数据结构,可以在较大访问量的场景下近似的替代LFU的数据统计部分(meta-data),其采用类似布隆过滤器的位计数器来实现对每个key的访问次数的粗略统计.(由于哈希冲突的存在,位计数器的结果将偏大)
对于较为静态的访问分布,各种的缓存管理策略的差异是微不足道的,而对于偏倚较大的负载场景,TinyLFU体现出了较大的优势.即便是使用了TinyLFU准入策略进行优化的LRU也获得了较大的提升.
如果想要详细了解TinyLFU请阅读论文TinyLFU: A Highly Efficient Cache Admission Policy 以及一种简略实现讲解LRU、LFU、TinyLFU缓存算法
mango Cache中的TinyLFU
mango Cache的实现中,实际上是实现了改进版的TinyLFU也就是Window-TinyLFU,这个缓存数据结构也在论文中有讲解,简而言之就是缓存由window Cache、doorKeeper(布隆过滤器)、counter以及SLRU组成.主缓存使用SLRU驱逐策略和TinyLFU准入策略,window Cache仅使用LRU驱逐策略无准入策略.
主缓存中的SLRU策略的A1和A2区域被静态划分开来,80%空间被分配给热点元素(A2),被驱逐者则从20%的非热项(A1)中选取.任何到来的元素总是允许进入window Cache, window Cache的淘汰元素有机会进入主缓存,如果在经过TinyLFU准入策略检验后被主缓存接受,那么该元素将进入主缓存,此时Window-TinyLFU的淘汰者就是主缓存的淘汰者(从A1段选出),否则该元素将被window Cache淘汰.
type tinyLFU struct {
filter bloomFilter // doorkeeper
counter countMinSketch // 4bit计数器
additions int //目前采样数量
samples int //采样窗口大小
lru lruCache //window Cache
slru slruCache //主缓存
}
接下来本文将详细讲述mango Cache中tinyLFU的具体实现
counter
counter是实现TinyLFU的重点,对于访问次数的粗略统计就是通过此数据结构实现的.
const sketchDepth = 4 //hash次数
type countMinSketch struct {
counters []uint64
mask uint32
}
可以看到, counter由一个uint64类型的数组和一个mask组成, 因为我们使用的是4次hash的4bit计数器,因此数组的每个元素实际上是4个计数器,可以通过位操作来实现访问.
counter的初始化
func (c *countMinSketch) init(width int) {
size := nextPowerOfTwo(uint32(width)) >> 2 //size = (width * 4 * 4) bits / 64
if size < 1 {
size = 1
}
c.mask = size - 1
if len(c.counters) == int(size) {
c.clear()
} else {
c.counters = make([]uint64, size)
}
}
我们通过传入的width确定需要的计数器数量(size大小)
counter的使用
counter最关键的操作就是按hash增加计数器值以及根据hash估算该hash的计数器值:
func (c *countMinSketch) add(h uint64) {
h1, h2 := uint32(h), uint32(h>>32)
for i := uint32(0); i < sketchDepth; i++ { //这里使用折叠法求得hash值
idx, off := c.position(h1 + i*h2)
c.inc(idx, (16*i)+off)
}
}
// 根据给定hash值返回对应计数器中最小的那个计数器值
func (c *countMinSketch) estimate(h uint64) uint8 {
h1, h2 := uint32(h), uint32(h>>32)
var min uint8 = 0xFF
for i := uint32(0); i < sketchDepth; i++ {
idx, off := c.position(h1 + i*h2)
count := c.val(idx, (16*i)+off)
if count < min {
min = count
}
}
return min
}
里面的position、val以及inc函数都是对应的位操作,有兴趣的话可以进一步研究下.
需要注意的是estimate函数用于求给定hash对应计数器中最小计数器的值,这是因为有哈希碰撞的情况,因此计数器的值始终会大于等于真实的访问次数,因此这里采用多次hash取其中最小值的方法来减少误差.
同时,为了保证计数器的时效性,counter实现了新鲜度机制,将在一定条件下触发reset:
//将每个计数器的值减半
func (c *countMinSketch) reset() {
for i, v := range c.counters {
if v != 0 {
c.counters[i] = (v >> 1) & 0x7777777777777777
}
}
}
lruCache
lruCache是LRU的实现,在mango Cache中是一个链表.
type lruCache struct {
cache *cache
cap int
ls list.List
}
func (l *lruCache) init(c *cache, cap int) {
l.cache = c
l.cap = cap
l.ls.Init()
}
slruCache
slruCache是SLRU的实现,在mango Cache中由两个链表组成.
const (
protectedRatio = 0.8
)
type slruCache struct {
cache *cache
probationCap int
probationLs list.List
protectedCap int
protectedLs list.List
}
func (l *slruCache) init(c *cache, cap int) {
l.cache = c
l.protectedCap = int(float64(cap) * protectedRatio)
l.probationCap = cap - l.protectedCap
l.probationLs.Init()
l.protectedLs.Init()
}
slruCache中,值得注意的点是它的写入策略:
func (l *slruCache) write(en *entry) *entry {
if en.accessList != nil {
// entry已存在,直接修改该entry
l.markAccess(en)
return nil
}
// 尝试将新entry加入probation段
cen := l.cache.getOrSet(en)
if cen == nil {
en.listID = probationSegment //将其加入probation段
en.accessList = l.probationLs.PushFront(en)
} else {
// 该entry已存在,直接修改它的值
cen.setValue(en.getValue())
cen.setWriteTime(en.getWriteTime())
if cen.accessList == nil {
//该entry已载入缓存但是还没有注册到SLRU管理的链表中
cen.listID = probationSegment
cen.accessList = l.probationLs.PushFront(cen)
} else {
l.markAccess(cen)
}
}
// probation段超出容量但list并没有超出容量
if l.probationCap > 0 && l.probationLs.Len() > l.probationCap &&
l.length() > (l.probationCap+l.protectedCap) {
// 移除并返回probation段尾部的entry
en = getEntry(l.probationLs.Back())
return l.remove(en)
}
return nil
}
//记录访问情况
func (l *slruCache) markAccess(en *entry) {
if en.listID == protectedSegment {
// entry位于在protected段
l.protectedLs.MoveToFront(en.accessList)
return
}
// 若entry位于probation段,则将其提升至protected段
en.listID = protectedSegment
l.probationLs.Remove(en.accessList)
en.accessList = l.protectedLs.PushFront(en)
if l.protectedCap > 0 && l.protectedLs.Len() > l.protectedCap {
// protected段超出容量限制,则淘汰其尾部的entry将其加入probation段
en = getEntry(l.protectedLs.Back())
en.listID = probationSegment
l.protectedLs.Remove(en.accessList)
en.accessList = l.probationLs.PushFront(en)
}
}
这样一来,就实现了SLRU的缓存管理策略.
filter
filter是一个布隆过滤器,这里不展开讲解其实现细节,只需要了解布隆过滤器可以通过key的hash来确定这个key是否在filter中,并且其只占用非常小的内存.如果想要详细了解布隆过滤器,可以参考这篇文章:布隆过滤器
type bloomFilter struct {
numHashes uint32 // 每个元素使用的hash数量
bitsMask uint32 // 位向量大小
bits []uint64 // 过滤器位向量
}
//根据给定的插入元素数量以及假阳性率初始化布隆过滤器
func (f *bloomFilter) init(ins int, fpp float64) {
ln2 := math.Log(2.0)
factor := -math.Log(fpp) / (ln2 * ln2)
numBits := nextPowerOfTwo(uint32(float64(ins) * factor))
if numBits == 0 {
numBits = 1
}
f.bitsMask = numBits - 1
if ins == 0 {
f.numHashes = 1
} else {
f.numHashes = uint32(ln2 * float64(numBits) / float64(ins))
}
size := int(numBits+63) / 64
if len(f.bits) != size {
f.bits = make([]uint64, size)
} else {
f.reset()
}
}
TinyLFU的初始化
前面介绍了TinyLFU的各个组件,接下来将详细讲解TinyLFU是如何进行缓存管理的.
const ( //entry所处的缓存段
admissionWindow uint8 = iota
probationSegment
protectedSegment
)
const (
samplesMultiplier = 8 //采样窗口大小乘数
insertionsMultiplier = 2 //doorkeeper大小乘数
countersMultiplier = 1 //计数器大小乘数
falsePositiveProbability = 0.1 //假阳性率
admissionRatio = 0.01 //window Cache占比
)
func (l *tinyLFU) init(c *cache, cap int) {
if cap > 0 {
// 只在容量有上限时开启doorkeeper
l.samples = samplesMultiplier * cap //采样窗口大小
l.filter.init(insertionsMultiplier*cap, falsePositiveProbability) //doorkeeper初始化
l.counter.init(countersMultiplier * cap) //计数器初始化
}
lruCap := int(float64(cap) * admissionRatio)
l.lru.init(c, lruCap) //window Cache初始化
l.slru.init(c, cap-lruCap) //SLRU主存初始化
}
TinyLFU写入
TinyLFU的写入流程如下
首先写入window Cache
如果window Cache出现被淘汰的candidate,则将从SLRU中选取一个victim(如果SLRU未满,则不会产生victim,此时直接将candidate插入SLRU)
在计数器中查询candidate和victim的访问次数
- 若candidate的访问次数较大,则将其插入SLRU,淘汰victim
- 否则将candidate淘汰
根据此流程,我们可以发现被插入SLRU的entry的访问次数一定是较大的,而我们通过计数器实现了对entry访问次数的保存,这样就结合了LRU以及LFU的优点并且没有占用过多的内存,这正是TinyLFU最大的优势所在.
func (l *tinyLFU) write(en *entry) *entry {
if l.lru.cap <= 0 { //若容量无限,则直接对SLRU写入
return l.slru.write(en)
}
l.increase(en.hash) //entry访问次数+1
candidate := l.lru.write(en) //window Cache中被淘汰的entry
if candidate == nil {
return nil
}
victim := l.slru.victim() //SLRU中下一个将被淘汰的entry
if victim == nil {
return l.slru.write(candidate)
}
candidateFreq := l.estimate(candidate.hash)
victimFreq := l.estimate(victim.hash)
if candidateFreq > victimFreq {
return l.slru.write(candidate)
}
return candidate
}
TinyLFU访问
TinyLFU的访问很简单,只是根据entry所在的缓存段分别调用对应的access函数实现访问
func (l *tinyLFU) access(en *entry) {
l.increase(en.hash)
if en.listID == admissionWindow {
l.lru.access(en)
} else {
l.slru.access(en)
}
}
增加entry的访问次数
write函数中通过increase可以对entry的访问次数+1,下面分析一下increase函数:
func (l *tinyLFU) increase(h uint64) {
if l.samples <= 0 {
return
}
l.additions++
if l.additions >= l.samples {
l.filter.reset()
l.counter.reset()
l.additions = 0
}
if l.filter.put(h) {
l.counter.add(h)
}
}
这里会判断已采样数量是否超出采样窗口,若超出了则会重置doorkeeper和计数器.如果entry在doorkeeper中存在,那么filter.put(h)将返回true,此时会调用counter.add(h)来增加entry的访问次数.
估计entry访问次数
write函数中通过estimate可以查询entry的访问次数,下面分析一下estimate函数:
func (l *tinyLFU) estimate(h uint64) uint8 {
freq := l.counter.estimate(h)
if l.filter.contains(h) {
freq++
}
return freq
}
已经在前面的章节中介绍过counter.estimate(),其根据hash值返回对应元素的计数器中最小的值以减少哈希碰撞的影响.这里需要注意的是l.filter.contains(h),它将在doorkeeper中查找该hash值是否存在,若存在需要将估计值+1(因为doorkeeper中的值也算访问次数)
总结
Mango Cache中还有统计模块来记录缓存的各方面运行状态(命中率,缓存驱逐等等),由于不是核心内容,因此就不在本文进行赘述了.
Mango Cache最值得学习的点就是事件处理机制和TinyLFU缓存管理策略,希望读者可以好好体会在并发状态下,如何实现安全且高效的缓存操作并借助TinyLFU实现较高的缓存命中率.
以上就是Mango Cache缓存管理库TinyLFU源码解析的详细内容,更多关于Mango Cache TinyLFU的资料请关注编程网其它相关文章!