登录
首页 >  Golang >  Go教程

Mango Cache缓存管理库TinyLFU源码解析

来源:脚本之家

时间:2022-12-27 17:42:14 420浏览 收藏

来到golang学习网的大家,相信都是编程学习爱好者,希望在这里学习Golang相关编程知识。下面本篇文章就来带大家聊聊《Mango Cache缓存管理库TinyLFU源码解析》,介绍一下TinyLFU、MangoCache、缓存管理库,希望对大家的知识积累有所帮助,助力实战开发!

介绍

据官方所述,mango Cache是对Guava Cache基于go的部分实现,同时mangoCache参考了Caffeine以及go-tinylfu.

支持以下缓存管理策略:

  • LRU
  • Segmented LRU(默认)
  • TinyLFU(实验性)

本文将从源码对其进行解析,重点将放在loadingCache(一种可以自定义如何载入新内存的cache)上面.

整体架构

mango Cache的主体功能由localCache结构体(local.go)实现,

type localCache struct {
    cache cache // 真正存放数据的地方
    expireAfterAccess time.Duration //用户自定义的过期以及刷新时间
    expireAfterWrite  time.Duration
    refreshAfterWrite time.Duration
    policyName        string    //缓存管理策略
    onInsertion Func //钩子函数
    onRemoval   Func
    loader   LoaderFunc //自定义加载和重载函数
    reloader Reloader
    stats    StatsCounter //状态管理器
    cap int //缓存容量
    // 访问时的缓存管理策略
    accessQueue policy
    // 写入时的缓存管理策略,只有在expireAfterWrite或refreshAfterWrite被设置以后才启用
    writeQueue policy
    // 用于processEntries的事件队列
    events chan entryEvent
    // 记录距离上一次写期间发生的读次数,用于判断是否进行清扫
    readCount int32
    // 用于关闭由localCache创建的协程
    closing int32
    closeWG sync.WaitGroup
}

真正存储数据的结构体是cache(policy.go),所有缓存数据都存储在其中

type cache struct {
    size int64                  // 缓存大小
    segs [segmentCount]sync.Map // 分片sync.map, map[key]*entry
}

entry是存放一个缓存数据的单元

type entry struct {
   hash uint64  //key的hash值
   accessTime int64 
   writeTime int64 
   invalidated int32 //是否有效
   loading     int32 //是否处于载入中状态
   key   Key
   value atomic.Value // 存储的值
   // 下面的属性只被缓存管理策略操作,所以不需要原子性的访问
   accessList *list.Element //此entry目前所在的访问链表中的位置
   writeList *list.Element  //此entry目前所在的写入链表中的位置
   listID uint8             //此entry目前所在的缓存段(SLRU)
}

localCache使用事件队列来管理并发读写事件,事件队列是一个chan,其中流转着entryEvent(即数据以及对应的缓存事件),localCache通过processEntries协程来处理各种读写事件.之后将详细讲解mango Cache是如何进行读写操作的.

localCache的write、access等操作底层是通过操作cache、accessQueue以及writeQueue从而实现的,而localCache还会负责清扫过期数据的工作.

前面提到了mango Cache支持诸如LRU、SLRU以及TinyLFU等缓存管理策略,其在localCache中即为accessQueue以及writeQueue,负责对缓存的淘汰准入等操作.

初始化流程

mango Cache提供了两种cache---普通Cache以及LoadingCache,这两者都是接口,而localCache实现了这两个接口,由于LoadingCache继承了普通Cache,因此本文只讲解LoadingCache.

func NewLoadingCache(loader LoaderFunc, options ...Option) LoadingCache {
    c := newLocalCache()
    c.loader = loader
    for _, opt := range options {
        opt(c)
    }
    c.init()
    return c
}

NewLoadingCache函数先初始化一个LoadingCache,然后根据用户传入的自定义载入函数和一些配置来初始化LoadingCache.配置包括注册插入或者删除时触发的钩子函数以及过期时间等等.然后调用localCache.init.

func (c *localCache) init() {
    c.accessQueue = newPolicy(c.policyName)
    c.accessQueue.init(&c.cache, c.cap)
    if c.expireAfterWrite > 0 || c.refreshAfterWrite > 0 {
        c.writeQueue = &recencyQueue{}
    } else {
        c.writeQueue = discardingQueue{}
    }
    c.writeQueue.init(&c.cache, c.cap)
    c.events = make(chan entryEvent, chanBufSize)
    c.closeWG.Add(1)
    go c.processEntries()
}

localCache.init会根据用户传入的缓存管理策略名字来初始化accessQueue然后根据是否有写过期和写刷新配置来决定是否初始化写入队列.接着创建事件队列并开启事件处理协程.到此为止,cache启动完成.

读流程

LoadingCache的Get操作可以通过key获取缓存值,其流程为:

先从主缓存中查询entry

若未查询到entry,则记录miss并且调用用户自定义的load方法加载缓存值并返回

若查询到entry,先检查是否过期

  • 若过期且没有设置loader则直接向事件处理协程发送eventDelete
  • 若过期但设置了loader,则异步更新entry值

若没有过期则更新访问时间并向事件处理协程发送eventAccess然后记录命中

最后返回entry

func (c *localCache) Get(k Key) (Value, error) {
    en := c.cache.get(k, sum(k)) //计算key的hash并查询该key
    if en == nil {
        c.stats.RecordMisses(1)
        return c.load(k)
    }
    // 检查entry是否需要更新
    now := currentTime()
    if c.isExpired(en, now) {
        if c.loader == nil {    //如果没有设置加载器则直接删除该entry
            c.sendEvent(eventDelete, en)
        } else {
            //对于loadingCache,我们不删除这个entry
            //而是把它暂时留在缓存中,所以用户依旧可以读取到旧的缓存值
            c.setEntryAccessTime(en, now)
            c.refreshAsync(en)
        }
        c.stats.RecordMisses(1)
    } else {
        c.setEntryAccessTime(en, now)
        c.sendEvent(eventAccess, en)
        c.stats.RecordHits(1)
    }
    return en.getValue(), nil
}

需要注意一下这里的refreshAsync函数:

func (c *localCache) refreshAsync(en *entry) bool {
    if en.setLoading(true) {
        // 如果此entry没有在加载
        if c.reloader == nil {
            go c.refresh(en)
        } else {
            c.reload(en)
        }
        return true
    }
    return false
}

如果没有用户设置的重载器,就异步执行refresh,refresh函数实际上就是对entry进行加载.

而如果有重载器那么就同步执行用户自定义的reload函数.

写流程

loadingCache的Put操作与Get操作类似,流程如下:

先去主缓存查询key是否存在,若查询到对应的entry,那么直接更新entry

若没有查询到对应的entry,说明其不存在,因此根据key,value初始化一个新entry

如果缓存容量足够,则让主缓存存储该entry,此时会再次检查主存中是否有该entry(解决并发问题)

  • 若cen不为空,说明主缓存中已经存在该entry,直接修改该entry即可
  • 若cen为空,说明主缓存中还不存在该entry,那么就会在主缓存中存储该entry

最后向事件处理协程发送eventWrite事件

func (c *localCache) Put(k Key, v Value) {
   h := sum(k)
   en := c.cache.get(k, h)
   now := currentTime()
   if en == nil {
      en = newEntry(k, v, h)
      c.setEntryWriteTime(en, now)
      c.setEntryAccessTime(en, now)
       // 直接将新value添加进缓存(在缓存容量足够的时候)
      if c.cap == 0 || c.cache.len() 

事件处理机制

主流程

mango Cache通过entryEvent chan以及processEntries协程来处理并发读写事务

缓存事件一共四个,分别为写入、访问、删除以及关闭.每个业务协程通过向localCache的events chan发送entryEvent通知事件处理协程,进而实现并发读写.

而processEntries协程内,会不断从events chan内取出entryEvent并执行对应的操作,在write、access以及delete操作后会执行清理工作(具体清扫工作由expireEntries函数执行)

type event uint8
const (
    eventWrite event = iota
    eventAccess
    eventDelete
    eventClose
)
type entryEvent struct {
    entry *entry
    event event
}
func (c *localCache) processEntries() {
    defer c.closeWG.Done()
    for e := range c.events {
        switch e.event {
        case eventWrite:          //写入事务
            c.write(e.entry)
            c.postWriteCleanup()  //清理操作
        case eventAccess:         //访问事务
            c.access(e.entry)
            c.postReadCleanup()   //清理操作
        case eventDelete:
            if e.entry == nil {   //InvalidateAll函数中使用
                c.removeAll()
            } else {
                c.remove(e.entry) //移除单个entry
            }
            c.postReadCleanup()   //清理操作
        case eventClose:
            if c.reloader != nil {
                // 停止所有refresh工作
                c.reloader.Close()
            }
            c.removeAll()
            return
        }
    }
}

write

由于事件处理机制对于access、delete和write的操作类似,因此这里只讲解较为复杂的write操作:

首先通过调用底层访问队列以及写入队列的write方法

触发用户自定义的钩子函数

如果write方法返回值不为空,说明有entry被驱逐,

因此需要从写入队列将其删除,同时记录驱逐并触发用户自定义的钩子函数

func (c *localCache) write(en *entry) {
    ren := c.accessQueue.write(en)
    c.writeQueue.write(en)
    if c.onInsertion != nil {
        c.onInsertion(en.key, en.getValue())
    }
    if ren != nil { //有entry被驱逐出了缓存
        c.writeQueue.remove(ren)
        c.stats.RecordEviction()
        if c.onRemoval != nil {
            c.onRemoval(ren.key, ren.getValue())
        }
    }
}

后面将详细讲述底层访问队列以及缓存管理是如何实现的.

清理工作

前面讲到过每次进行完write、access以及delete操作后会执行清理工作.具体地,write操作会触发postWriteCleanup而access和delete操作会触发postReadCleanup.

postReadCleanup会根据当前距离上一次写的read操作次数是否达到清理工作阈值来决定是否清理,这个阈值是64,也就是说每隔64次read操作就会触发一次清理工作

而postWriteCleanup将在每一次write操作之后触发

真正的清理工作由expireEntries函数完成,它一次清理工作最多只会清理16个entry避免了对事件处理的长时间阻塞.

func (c *localCache) postReadCleanup() {
    if atomic.AddInt32(&c.readCount, 1) > drainThreshold {
        atomic.StoreInt32(&c.readCount, 0)
        c.expireEntries()
    }
}
// 在添加完entry以后再执行
func (c *localCache) postWriteCleanup() {
    atomic.StoreInt32(&c.readCount, 0)
    c.expireEntries()
}
//清理工作函数
func (c *localCache) expireEntries() {
    remain := drainMax
    now := currentTime()
    if c.expireAfterAccess > 0 {
        expiry := now.Add(-c.expireAfterAccess).UnixNano()
        c.accessQueue.iterate(func(en *entry) bool {
            if remain == 0 || en.getAccessTime() >= expiry {
                // 找到了第一个没有过期的entry或者清理entry数足够了
                return false
            }
            c.remove(en)
            c.stats.RecordEviction()
            remain--
            return remain > 0
        })
    }
    if remain > 0 && c.expireAfterWrite > 0 {
        ...
    }
    if remain > 0 && c.loader != nil && c.refreshAfterWrite > 0 {
        ...
    }
}

缓存管理

localCache在初始化过程中也初始化了缓存管理策略,由于localCache的writeQueue默认使用LRU缓存淘汰策略,而accessQueue支持LRU、SLRU以及TinyLFU三种缓存淘汰策略,本节将着重讲解accessQueue.

什么是LRU?

LRU是Least Recently Used的缩写,即最近最少使用.在mango Cache中LRU依靠go SDK自带的List(双向链表)实现,新缓存条目会被插入List头部,如果List内元素达到容量上限则删除List尾部的元素,此元素也正是最近最少使用的元素.LRU可以使得主缓存内的缓存条目永远是最近被访问的,不是最近访问的元素将被淘汰.

如果一个不是经常使用的数据,偶尔或者周期性的被使用,那么该数据会被加到LRU链表头部,而这种不经常使用的数据,放在链表头部,占用了空间;一直等到LRU淘汰完,才会被剔除链表;如果这种数据一次性过多,那么链表数据都是这种无用的数据,从而会导致缓存命中率低下,影响系统性能.

什么是SLRU?

Segmented LRU(SLRU)是一种LRU的改进,主要把在一个时间窗口内命中至少2次的记录和命中1次的单独存放,这样就可以把短期内较频繁的缓存元素区分开来.具体做法上,SLRU包含2个固定尺寸的LRU,一个叫Probation段(A1),一个叫Protection段(A2).新记录总是插入到A1中,当A1的记录被再次访问,就把它移到A2,当A2满了需要驱逐记录时,会把驱逐记录插入到A1中.

在mango Cache的实现中,Protection段与Probation段大小比为4:1.

SLRU是mango Cache的默认缓存管理策略

什么是TinyLFU?

TinyLFU是一种空间利用率很高的新的数据结构,可以在较大访问量的场景下近似的替代LFU的数据统计部分(meta-data),其采用类似布隆过滤器的位计数器来实现对每个key的访问次数的粗略统计.(由于哈希冲突的存在,位计数器的结果将偏大)

对于较为静态的访问分布,各种的缓存管理策略的差异是微不足道的,而对于偏倚较大的负载场景,TinyLFU体现出了较大的优势.即便是使用了TinyLFU准入策略进行优化的LRU也获得了较大的提升.

如果想要详细了解TinyLFU请阅读论文TinyLFU: A Highly Efficient Cache Admission Policy 以及一种简略实现讲解LRU、LFU、TinyLFU缓存算法

mango Cache中的TinyLFU

mango Cache的实现中,实际上是实现了改进版的TinyLFU也就是Window-TinyLFU,这个缓存数据结构也在论文中有讲解,简而言之就是缓存由window Cache、doorKeeper(布隆过滤器)、counter以及SLRU组成.主缓存使用SLRU驱逐策略和TinyLFU准入策略,window Cache仅使用LRU驱逐策略无准入策略.

主缓存中的SLRU策略的A1和A2区域被静态划分开来,80%空间被分配给热点元素(A2),被驱逐者则从20%的非热项(A1)中选取.任何到来的元素总是允许进入window Cache, window Cache的淘汰元素有机会进入主缓存,如果在经过TinyLFU准入策略检验后被主缓存接受,那么该元素将进入主缓存,此时Window-TinyLFU的淘汰者就是主缓存的淘汰者(从A1段选出),否则该元素将被window Cache淘汰.

type tinyLFU struct {
    filter  bloomFilter    // doorkeeper
    counter countMinSketch // 4bit计数器
    additions int   //目前采样数量
    samples   int   //采样窗口大小
    lru  lruCache   //window Cache
    slru slruCache  //主缓存
}

接下来本文将详细讲述mango Cache中tinyLFU的具体实现

counter

counter是实现TinyLFU的重点,对于访问次数的粗略统计就是通过此数据结构实现的.

const sketchDepth = 4   //hash次数
type countMinSketch struct {
    counters []uint64
    mask     uint32
}

可以看到, counter由一个uint64类型的数组和一个mask组成, 因为我们使用的是4次hash的4bit计数器,因此数组的每个元素实际上是4个计数器,可以通过位操作来实现访问.

counter的初始化

func (c *countMinSketch) init(width int) {
    size := nextPowerOfTwo(uint32(width)) >> 2 //size = (width * 4 * 4) bits / 64
    if size 

我们通过传入的width确定需要的计数器数量(size大小)

counter的使用

counter最关键的操作就是按hash增加计数器值以及根据hash估算该hash的计数器值:

func (c *countMinSketch) add(h uint64) {
    h1, h2 := uint32(h), uint32(h>>32)
    for i := uint32(0); i >32)
    var min uint8 = 0xFF
    for i := uint32(0); i 

里面的position、val以及inc函数都是对应的位操作,有兴趣的话可以进一步研究下.

需要注意的是estimate函数用于求给定hash对应计数器中最小计数器的值,这是因为有哈希碰撞的情况,因此计数器的值始终会大于等于真实的访问次数,因此这里采用多次hash取其中最小值的方法来减少误差.

同时,为了保证计数器的时效性,counter实现了新鲜度机制,将在一定条件下触发reset:

//将每个计数器的值减半
func (c *countMinSketch) reset() {
    for i, v := range c.counters {
        if v != 0 {
            c.counters[i] = (v >> 1) & 0x7777777777777777
        }
    }
}

lruCache

lruCache是LRU的实现,在mango Cache中是一个链表.

type lruCache struct {
    cache *cache
    cap   int
    ls    list.List
}
func (l *lruCache) init(c *cache, cap int) {
    l.cache = c
    l.cap = cap
    l.ls.Init()
}

slruCache

slruCache是SLRU的实现,在mango Cache中由两个链表组成.

const (
    protectedRatio = 0.8
)
type slruCache struct {
    cache *cache
    probationCap int
    probationLs  list.List
    protectedCap int
    protectedLs  list.List
}
func (l *slruCache) init(c *cache, cap int) {
    l.cache = c
    l.protectedCap = int(float64(cap) * protectedRatio)
    l.probationCap = cap - l.protectedCap
    l.probationLs.Init()
    l.protectedLs.Init()
}

slruCache中,值得注意的点是它的写入策略:

func (l *slruCache) write(en *entry) *entry {
    if en.accessList != nil {
        // entry已存在,直接修改该entry
        l.markAccess(en)
        return nil
    }
    // 尝试将新entry加入probation段
    cen := l.cache.getOrSet(en)
    if cen == nil {
        en.listID = probationSegment    //将其加入probation段
        en.accessList = l.probationLs.PushFront(en)
    } else {
        // 该entry已存在,直接修改它的值
        cen.setValue(en.getValue())
        cen.setWriteTime(en.getWriteTime())
        if cen.accessList == nil {
            //该entry已载入缓存但是还没有注册到SLRU管理的链表中
            cen.listID = probationSegment
            cen.accessList = l.probationLs.PushFront(cen)
        } else {
            l.markAccess(cen)
        }
    }
    // probation段超出容量但list并没有超出容量
    if l.probationCap > 0 && l.probationLs.Len() > l.probationCap &&
        l.length() > (l.probationCap+l.protectedCap) {
        // 移除并返回probation段尾部的entry
        en = getEntry(l.probationLs.Back())
        return l.remove(en)
    }
    return nil
}
//记录访问情况
func (l *slruCache) markAccess(en *entry) {
    if en.listID == protectedSegment {
        // entry位于在protected段
        l.protectedLs.MoveToFront(en.accessList)
        return
    }
    // 若entry位于probation段,则将其提升至protected段
    en.listID = protectedSegment
    l.probationLs.Remove(en.accessList)
    en.accessList = l.protectedLs.PushFront(en)
    if l.protectedCap > 0 && l.protectedLs.Len() > l.protectedCap {
        // protected段超出容量限制,则淘汰其尾部的entry将其加入probation段
        en = getEntry(l.protectedLs.Back())
        en.listID = probationSegment
        l.protectedLs.Remove(en.accessList)
        en.accessList = l.probationLs.PushFront(en)
    }
}

这样一来,就实现了SLRU的缓存管理策略.

filter

filter是一个布隆过滤器,这里不展开讲解其实现细节,只需要了解布隆过滤器可以通过key的hash来确定这个key是否在filter中,并且其只占用非常小的内存.如果想要详细了解布隆过滤器,可以参考这篇文章:布隆过滤器

type bloomFilter struct {
    numHashes uint32   // 每个元素使用的hash数量
    bitsMask  uint32   // 位向量大小
    bits      []uint64 // 过滤器位向量
}
//根据给定的插入元素数量以及假阳性率初始化布隆过滤器
func (f *bloomFilter) init(ins int, fpp float64) {
    ln2 := math.Log(2.0)
    factor := -math.Log(fpp) / (ln2 * ln2)
    numBits := nextPowerOfTwo(uint32(float64(ins) * factor))
    if numBits == 0 {
        numBits = 1
    }
    f.bitsMask = numBits - 1
    if ins == 0 {
        f.numHashes = 1
    } else {
        f.numHashes = uint32(ln2 * float64(numBits) / float64(ins))
    }
    size := int(numBits+63) / 64
    if len(f.bits) != size {
        f.bits = make([]uint64, size)
    } else {
        f.reset()
    }
}

TinyLFU的初始化

前面介绍了TinyLFU的各个组件,接下来将详细讲解TinyLFU是如何进行缓存管理的.

const (                                 //entry所处的缓存段
    admissionWindow uint8 = iota        
    probationSegment
    protectedSegment
)
const (
    samplesMultiplier        = 8        //采样窗口大小乘数
    insertionsMultiplier     = 2        //doorkeeper大小乘数
    countersMultiplier       = 1        //计数器大小乘数
    falsePositiveProbability = 0.1      //假阳性率
    admissionRatio           = 0.01     //window Cache占比
)
func (l *tinyLFU) init(c *cache, cap int) {
    if cap > 0 {
        // 只在容量有上限时开启doorkeeper
        l.samples = samplesMultiplier * cap     //采样窗口大小
        l.filter.init(insertionsMultiplier*cap, falsePositiveProbability)   //doorkeeper初始化
        l.counter.init(countersMultiplier * cap)    //计数器初始化
    }
    lruCap := int(float64(cap) * admissionRatio) 
    l.lru.init(c, lruCap)               //window Cache初始化
    l.slru.init(c, cap-lruCap)          //SLRU主存初始化
}

TinyLFU写入

TinyLFU的写入流程如下

首先写入window Cache

如果window Cache出现被淘汰的candidate,则将从SLRU中选取一个victim(如果SLRU未满,则不会产生victim,此时直接将candidate插入SLRU)

在计数器中查询candidate和victim的访问次数

  • 若candidate的访问次数较大,则将其插入SLRU,淘汰victim
  • 否则将candidate淘汰

根据此流程,我们可以发现被插入SLRU的entry的访问次数一定是较大的,而我们通过计数器实现了对entry访问次数的保存,这样就结合了LRU以及LFU的优点并且没有占用过多的内存,这正是TinyLFU最大的优势所在.

func (l *tinyLFU) write(en *entry) *entry {
    if l.lru.cap  victimFreq {
        return l.slru.write(candidate)
    }
    return candidate
}

TinyLFU访问

TinyLFU的访问很简单,只是根据entry所在的缓存段分别调用对应的access函数实现访问

func (l *tinyLFU) access(en *entry) {
    l.increase(en.hash)
    if en.listID == admissionWindow {
        l.lru.access(en)
    } else {
        l.slru.access(en)
    }
}

增加entry的访问次数

write函数中通过increase可以对entry的访问次数+1,下面分析一下increase函数:

func (l *tinyLFU) increase(h uint64) {
    if l.samples = l.samples {
        l.filter.reset()
        l.counter.reset()
        l.additions = 0
    }
    if l.filter.put(h) {
        l.counter.add(h)
    }
}

这里会判断已采样数量是否超出采样窗口,若超出了则会重置doorkeeper和计数器.如果entry在doorkeeper中存在,那么filter.put(h)将返回true,此时会调用counter.add(h)来增加entry的访问次数.

估计entry访问次数

write函数中通过estimate可以查询entry的访问次数,下面分析一下estimate函数:

func (l *tinyLFU) estimate(h uint64) uint8 {
    freq := l.counter.estimate(h)
    if l.filter.contains(h) {
        freq++
    }
    return freq
}   

已经在前面的章节中介绍过counter.estimate(),其根据hash值返回对应元素的计数器中最小的值以减少哈希碰撞的影响.这里需要注意的是l.filter.contains(h),它将在doorkeeper中查找该hash值是否存在,若存在需要将估计值+1(因为doorkeeper中的值也算访问次数)

总结

Mango Cache中还有统计模块来记录缓存的各方面运行状态(命中率,缓存驱逐等等),由于不是核心内容,因此就不在本文进行赘述了.

Mango Cache最值得学习的点就是事件处理机制和TinyLFU缓存管理策略,希望读者可以好好体会在并发状态下,如何实现安全且高效的缓存操作并借助TinyLFU实现较高的缓存命中率.

好了,本文到此结束,带大家了解了《Mango Cache缓存管理库TinyLFU源码解析》,希望本文对你有所帮助!关注golang学习网公众号,给大家分享更多Golang知识!

声明:本文转载于:脚本之家 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>
评论列表