基于Golang实现延迟队列(DelayQueue)
来源:脚本之家
时间:2023-02-16 15:22:20 321浏览 收藏
在Golang实战开发的过程中,我们经常会遇到一些这样那样的问题,然后要卡好半天,等问题解决了才发现原来一些细节知识点还是没有掌握好。今天golang学习网就整理分享《基于Golang实现延迟队列(DelayQueue)》,聊聊延迟、队列,希望可以帮助到正在努力赚钱的你。
背景
延迟队列是一种特殊的队列,元素入队时需要指定到期时间(或延迟时间),从队头出队的元素必须是已经到期的,而且最先到期的元素最先出队,也就是队列里面的元素是按照到期时间排序的,添加元素和从队头出队的时间复杂度是O(log(n))。
由于以上性质,延迟队列一般可以用于以下场景(定时任务、延迟任务):
- 缓存:用户淘汰过期元素
- 通知:在指定时间通知用户,比如会议开始前30分钟
- 订单:30分钟未支付取消订单
- 超时:服务器自动断开太长时间没有心跳的连接
其实在Golang中是自带定时器的,也就是time.After()
、time.AfterFunc()
等函数,它们的性能也是非常好的,随着Golang版本升级还会优化。但是对于某些场景来说确实不够方便,比如缓存场景我们需要能够支持随机删除定时器,随机重置过期时间,更加灵活的删除一小批过期元素。而且像Kafka的时间轮算法(TimeWheel)里面也用到了延迟队列,因此还是有必要了解下如何实现延迟队列。
原理
堆
延迟队列每次出队的是最小到期时间的元素,而堆就是用来获取最值的数据结构。使用堆我们可以实现O(log(n))时间复杂度添加元素和移除最小到期时间元素。
随机删除
有时候延迟队列还需要具有随机删除元素的能力,可以通过以下方式实现:
- 元素添加删除标记字段:堆中每个元素都添加一个删除标记字段,并把这个元素的地址返回给用户,用户就可以标记元素的这个字段为true,这样元素到达堆顶时如果判断到这个字段为true就会被清除,而延迟队列里的元素逻辑上是一定会到达堆顶的(因为时间会流逝)。这是一种懒删除的方式。
- 元素添加堆中下标字段(或用map记录下标):堆中每个元素都添加一个堆中下标字段,并把这个元素的地址返回给用户,这样我们就可以通过这个元素里面记录的下标快速定位元素在堆中的位置,从而删除元素。详细可以看文章如何实现一个支持O(log(n))随机删除元素的堆。
重置元素到期时间
如果需要重置延迟队列里面元素的到期时间,则必须知道元素在堆中的下标,因为重置到期时间之后必须对堆进行调整,因此只能是元素添加堆中下标字段
。
Golang实现
这里我们实现一个最简单的延迟队列,也就是不支持随机删除元素和重置元素的到期时间,因为有些场景只需要添加元素和获取到期元素这两个功能,比如Kafka中的时间轮,而且这种简单实现性能会高一点。
数据结构
主要的结构可以看到就是一个heap,Entry是每个元素在堆中的表示,Value是具体的元素值,Expired是为了堆中元素根据到期时间排序。
mutex是一个互斥锁,主要是保证操作并发安全。
wakeup是一个缓冲区长度为1的通道,通过它实现添加元素的时候唤醒等待队列不为空或者有更小到期时间元素加入的协程。(重点)
type Entry[T any] struct { Value T Expired time.Time // 到期时间 } // 延迟队列 type DelayQueue[T any] struct { h *heap.Heap[*Entry[T]] mutex sync.Mutex // 保证并发安全 wakeup chan struct{} // 唤醒通道 } // 创建延迟队列 func New[T any]() *DelayQueue[T] { return &DelayQueue[T]{ h: heap.New(nil, func(e1, e2 *Entry[T]) bool { return e1.Expired.Before(e2.Expired) }), wakeup: make(chan struct{}, 1), } }
实现原理
阻塞获取元素的时候如果队列已经没有元素,或者没有元素到期,那么协程就需要挂起等待。而被唤醒的条件是元素到期、队列不为空或者有更小到期时间元素加入。
其中元素到期协程在阻塞获取元素时发现堆顶元素还没到期,因此这个条件可以自己构造并等待。但是条件队列不为空和有更小到期时间元素加入则需要另外一个协程在添加元素时才能满足,因此必须通过一个中间结构来进行协程间通信,一般Golang里面会使用Channel来实现。
添加元素
一开始加了一个互斥锁,避免并发冲突,然后把元素加到堆里。
因为我们Take()操作,既阻塞获取元素操作,在不满足条件时会去等待wakeup通道,但是等待通道前必须释放锁,否则Push()无法写入新元素去满足条件队列不为空和有更小到期时间元素加入。而从释放锁后到开始读取wakeup通道这段时间是没有锁保护的,如果Push()在这期间插入新元素,为了保证通道不阻塞同时又能通知到Take()协程,我们的通道的长度需要是1,同时使用select+default保证在通道里面已经有元素的时候不阻塞Push()协程。
// 添加延迟元素到队列 func (q *DelayQueue[T]) Push(value T, delay time.Duration) { q.mutex.Lock() defer q.mutex.Unlock() entry := &Entry[T]{ Value: value, Expired: time.Now().Add(delay), } q.h.Push(entry) // 唤醒等待的协程 // 这里表示新添加的元素到期时间是最早的,或者原来队列为空 // 因此必须唤醒等待的协程,因为可以拿到更早到期的元素 if q.h.Peek() == entry { select { case q.wakeup阻塞获取元素
这里先判断堆是否有元素,如果有获取堆顶元素,然后判断是否已经到期,如果到期则直接出堆并返回。否则等待直到超时或者元素到期或者有新的元素到达。
这里在解锁之前会清空wakeup通道,这样可以保证下面读取的wakeup通道里的元素肯定是新加入的。
// 等待直到有元素到期 // 或者ctx被关闭 func (q *DelayQueue[T]) Take(ctx context.Context) (T, bool) { for { var expired *time.Timer q.mutex.Lock() // 有元素 if !q.h.Empty() { // 获取元素 entry := q.h.Peek() if time.Now().After(entry.Expired) { q.h.Pop() q.mutex.Unlock() return entry.Value, true } // 到期时间,使用time.NewTimer()才能够调用Stop(),从而释放定时器 expired = time.NewTimer(time.Until(entry.Expired)) } // 避免被之前的元素假唤醒 select { caseChannel方式阻塞读取
Golang里面可以使用Channel进行流式消费,因此简单包装一个Channel形式的阻塞读取接口,给通道一点缓冲区大小可以带来更好的性能。
// 返回一个通道,输出到期元素 // size是通道缓存大小 func (q *DelayQueue[T]) Channel(ctx context.Context, size int)使用方式
for entry := range q.Channel(context.Background(), 10) { // do something }性能测试
这里进行一个简单的性能测试,也就是先添加元素,然后等待到期后全部拿出来。
func BenchmarkPushAndTake(b *testing.B) { q := New[int]() b.ResetTimer() // 添加元素 for i := 0; i测试结果:
Benchmark-8 2331534 476.8 ns/op 76 B/op 1 allocs/op
总结
堆实现的延迟队列是一种实现起来比较简单的定时器(当然阻塞读取Take()是比较复杂的),由于时间复杂度是O(log(n)),因此可以满足定时任务数量不是特别多的场景。堆实现的延迟队列也是可以随机删除元素的,可以根据具体任务选择是否实现。如果对定时器性能要求比较敏感的话可以选择使用时间轮实现定时器,它可以在O(1)的时间复杂度添加和删除一个定时器,不过实现起来比较复杂(挖个坑,下篇文章实现)。
终于介绍完啦!小伙伴们,这篇关于《基于Golang实现延迟队列(DelayQueue)》的介绍应该让你收获多多了吧!欢迎大家收藏或分享给更多需要学习的朋友吧~golang学习网公众号也会发布Golang相关知识,快来关注吧!
-
426 收藏
-
468 收藏
-
326 收藏
-
311 收藏
-
497 收藏
-
290 收藏
-
239 收藏
-
381 收藏
-
168 收藏
-
500 收藏
-
355 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 立即学习 542次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 立即学习 507次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 立即学习 497次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 立即学习 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 立即学习 484次学习