Go语言实现的可读性更高的并发神库详解
来源:脚本之家
时间:2023-02-25 09:25:31 458浏览 收藏
从现在开始,我们要努力学习啦!今天我给大家带来《Go语言实现的可读性更高的并发神库详解》,感兴趣的朋友请继续看下去吧!下文中的内容我们主要会涉及到并发、库、可读性等等知识点,如果在阅读本文过程中有遇到不清楚的地方,欢迎留言呀!我们一起讨论,一起学习!
- 更难出现goroutine泄漏
- 处理panic更友好
- 并发代码可读性高
从简介上看主要封装功能如下:
- 对
waitGroup
进行封装,避免了产生大量重复代码,并且也封装recover,安全性更高 - 提供
panics.Catcher
封装recover
逻辑,统一捕获panic
,打印调用栈一些信息 - 提供一个并发执行任务的
worker
池,可以控制并发度、goroutine可以进行复用,支持函数签名,同时提供了stream
方法来保证结果有序 - 提供
ForEach
、map
方法优雅的处理切片
接下来就区分模块来介绍一下这个库;
WaitGroup的封装
Go语言标准库有提供sync.waitGroup
控制等待goroutine,我们一般会写出如下代码:
func main(){ var wg sync.WaitGroup for i:=0; i
上述代码我们需要些一堆重复代码,并且需要单独在每一个func中处理recover逻辑,所以conc
库对其进行了封装,代码简化如下:
func main() { wg := conc.NewWaitGroup() for i := 0; i < 10; i++ { wg.Go(doSomething) } wg.Wait() } func doSomething() { fmt.Println("test") }
conc
库封装也比较简单,结构如下:
type WaitGroup struct { wg sync.WaitGroup pc panics.Catcher }
其自己实现了Catcher
类型对recover逻辑进行了封装,封装思路如下:
type Catcher struct { recovered atomic.Pointer[RecoveredPanic] }
recovered是原子指针类型,RecoveredPanic是捕获的recover封装,封装了堆栈等信息:
type RecoveredPanic struct { // The original value of the panic. Value any // The caller list as returned by runtime.Callers when the panic was // recovered. Can be used to produce a more detailed stack information with // runtime.CallersFrames. Callers []uintptr // The formatted stacktrace from the goroutine where the panic was recovered. // Easier to use than Callers. Stack []byte }
提供了Try方法执行方法,只会记录第一个panic的goroutine信息:
func (p *Catcher) Try(f func()) { defer p.tryRecover() f() } func (p *Catcher) tryRecover() { if val := recover(); val != nil { rp := NewRecoveredPanic(1, val) // 只会记录第一个panic的goroutine信息 p.recovered.CompareAndSwap(nil, &rp) } }
提供了Repanic()
方法用来重放捕获的panic:
func (p *Catcher) Repanic() { if val := p.Recovered(); val != nil { panic(val) } } func (p *Catcher) Recovered() *RecoveredPanic { return p.recovered.Load() }
waitGroup
对此也分别提供了Wait()
、WaitAndRecover()
方法:
func (h *WaitGroup) Wait() { h.wg.Wait() // Propagate a panic if we caught one from a child goroutine. h.pc.Repanic() } func (h *WaitGroup) WaitAndRecover() *panics.RecoveredPanic { h.wg.Wait() // Return a recovered panic if we caught one from a child goroutine. return h.pc.Recovered() }
wait
方法只要有一个goroutine发生panic就会向上抛出panic,比较简单粗暴;
waitAndRecover
方法只有有一个goroutine发生panic就会返回第一个recover的goroutine信息;
总结:conc库对waitGrouop
的封装总体是比较不错的,可以减少重复的代码;
worker池
conc
提供了几种类型的worker池:
- ContextPool:可以传递context的pool,若有goroutine发生错误可以cancel其他goroutine
- ErrorPool:通过参数可以控制只收集第一个error还是所有error
- ResultContextPool:若有goroutine发生错误会cancel其他goroutine并且收集错误
- RestultPool:收集work池中每个任务的执行结果,并不能保证顺序,保证顺序需要使用stream或者iter.map;
我们来看一个简单的例子:
import "github.com/sourcegraph/conc/pool" func ExampleContextPool_WithCancelOnError() { p := pool.New(). WithMaxGoroutines(4). WithContext(context.Background()). WithCancelOnError() for i := 0; i
在创建pool时有如下方法可以调用:
p.WithMaxGoroutines()
配置pool中goroutine的最大数量p.WithErrors
:配置pool中的task是否返回errorp.WithContext(ctx)
:配置pool中运行的task当遇到第一个error要取消p.WithFirstError
:配置pool中的task只返回第一个errorp.WithCollectErrored
:配置pool的task收集所有error
pool的基础结构如下:
type Pool struct { handle conc.WaitGroup limiter limiter tasks chan func() initOnce sync.Once }
limiter是控制器,用chan来控制goroutine的数量:
type limiter chan struct{} func (l limiter) limit() int { return cap(l) } func (l limiter) release() { if l != nil { <-l } }
pool的核心逻辑也比较简单,如果没有设置limiter,那么就看有没有空闲的worker,否则就创建一个新的worker,然后投递任务进去;
如果设置了limiter,达到了limiter worker数量上限,就把任务投递给空闲的worker,没有空闲就阻塞等着;
func (p *Pool) Go(f func()) { p.init() if p.limiter == nil { // 没有限制 select { case p.tasks
这里work使用的是一个无缓冲的channel,这种复用方式很巧妙,如果goroutine执行很快避免创建过多的goroutine;
使用pool处理任务不能保证有序性,conc库又提供了Stream
方法,返回结果可以保持顺序;
Stream
Steam的实现也是依赖于pool
,在此基础上做了封装保证结果的顺序性,先看一个例子:
func ExampleStream() { times := []int{20, 52, 16, 45, 4, 80} stream := stream2.New() for _, millis := range times { dur := time.Duration(millis) * time.Millisecond stream.Go(func() stream2.Callback { time.Sleep(dur) // This will print in the order the tasks were submitted return func() { fmt.Println(dur) } }) } stream.Wait() // Output: // 20ms // 52ms // 16ms // 45ms // 4ms // 80ms }
stream
的结构如下:
type Stream struct { pool pool.Pool callbackerHandle conc.WaitGroup queue chan callbackCh initOnce sync.Once }
queue
是一个channel类型,callbackCh也是channel类型 - chan func():
type callbackCh chan func()
在提交goroutine
时按照顺序生成callbackCh传递结果:
func (s *Stream) Go(f Task) { s.init() // Get a channel from the cache. ch := getCh() // Queue the channel for the callbacker. s.queue
ForEach和map
ForEach
conc库提供了ForEach方法可以优雅的并发处理切片,看一下官方的例子:
conc库使用泛型进行了封装,我们只需要关注handle代码即可,避免冗余代码,我们自己动手写一个例子:
func main() { input := []int{1, 2, 3, 4} iterator := iter.Iterator[int]{ MaxGoroutines: len(input) / 2, } iterator.ForEach(input, func(v *int) { if *v%2 != 0 { *v = -1 } }) fmt.Println(input) }
ForEach内部实现为Iterator结构及核心逻辑如下:
type Iterator[T any] struct { MaxGoroutines int } func (iter Iterator[T]) ForEachIdx(input []T, f func(int, *T)) { if iter.MaxGoroutines == 0 { // iter is a value receiver and is hence safe to mutate iter.MaxGoroutines = defaultMaxGoroutines() } numInput := len(input) if iter.MaxGoroutines > numInput { // No more concurrent tasks than the number of input items. iter.MaxGoroutines = numInput } var idx atomic.Int64 // 通过atomic控制仅创建一个闭包 task := func() { i := int(idx.Add(1) - 1) for ; i
可以设置并发的goroutine数量,默认取的是GOMAXPROCS ,也可以自定义传参;
并发执行这块设计的很巧妙,仅创建了一个闭包,通过atomic控制idx,避免频繁触发GC;
map
conc库提供的map方法可以得到对切片中元素结果,官方例子:
使用map可以提高代码的可读性,并且减少了冗余代码,自己写个例子:
func main() { input := []int{1, 2, 3, 4} mapper := iter.Mapper[int, bool]{ MaxGoroutines: len(input) / 2, } results := mapper.Map(input, func(v *int) bool { return *v%2 == 0 }) fmt.Println(results) // Output: // [false true false true] }
map的实现也依赖于Iterator,也是调用的ForEachIdx方法,区别于ForEach是记录处理结果;
总结
花了小半天时间看了一下这个库,很多设计点值得我们学习,总结一下我学习到的知识点:
- conc.WatiGroup对Sync.WaitGroup进行了封装,对Add、Done、Recover进行了封装,提高了可读性,避免了冗余代码
- ForEach、Map方法可以更优雅的并发处理切片,代码简洁易读,在实现上Iterator中的并发处理使用atomic来控制只创建一个闭包,避免了GC性能问题
- pool是一个并发的协程队列,可以控制协程的数量,实现上也很巧妙,使用一个无缓冲的channel作为worker,如果goroutine执行速度快,避免了创建多个goroutine
- stream是一个保证顺序的并发协程队列,实现上也很巧妙,使用sync.Pool在提交goroutine时控制顺序,值得我们学习;
到这里,我们也就讲完了《Go语言实现的可读性更高的并发神库详解》的内容了。个人认为,基础知识的学习和巩固,是为了更好的将其运用到项目中,欢迎关注golang学习网公众号,带你了解更多关于golang的知识点!
-
173 收藏
-
417 收藏
-
328 收藏
-
367 收藏
-
240 收藏
-
367 收藏
-
419 收藏
-
234 收藏
-
155 收藏
-
457 收藏
-
309 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 立即学习 542次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 立即学习 507次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 立即学习 497次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 立即学习 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 立即学习 484次学习
-
- 愉快的奇迹
- 感谢大佬分享,一直没懂这个问题,但其实工作中常常有遇到...不过今天到这,帮助很大,总算是懂了,感谢老哥分享文章内容!
- 2023-03-28 22:35:46
-
- 虚幻的小伙
- 这篇技术贴太及时了,太全面了,真优秀,mark,关注up主了!希望up主能多写Golang相关的文章。
- 2023-03-07 17:23:23
-
- 直率的夕阳
- 很棒,一直没懂这个问题,但其实工作中常常有遇到...不过今天到这,帮助很大,总算是懂了,感谢作者大大分享博文!
- 2023-03-02 21:46:58
-
- 畅快的豆芽
- 这篇博文真及时,太全面了,很好,收藏了,关注up主了!希望up主能多写Golang相关的文章。
- 2023-03-02 14:08:21
-
- 还单身的柜子
- 太细致了,收藏了,感谢师傅的这篇技术贴,我会继续支持!
- 2023-02-27 17:11:14
-
- 危机的猫咪
- 这篇技术文章出现的刚刚好,大佬加油!
- 2023-02-26 23:10:56