GoLang channel底层代码分析详解
来源:脚本之家
时间:2022-12-29 15:10:18 299浏览 收藏
本篇文章向大家介绍《GoLang channel底层代码分析详解》,主要包括Langchannel,具有一定的参考价值,需要的朋友可以参考一下。
以下源码都摘自 golang 1.16.15 版本。
1. channel 底层结构
Golang 中的 channel 对应的底层结构为 hchan 结构体(channel的源码位置在Golang包的 runtime/chan.go):
type hchan struct { qcount uint // buf当前元素的数量 dataqsiz uint // buf的容量 buf unsafe.Pointer // channel缓冲区,一个循环数组 elemsize uint16 // 元素大小 closed uint32 // channel关闭标记 elemtype *_type // element type sendx uint // 当下一次发送数据到channel时,数据存放到buf中的哪个index recvx uint // 当下一次从channel接收数据时,从buf的哪个index获取数据 recvq waitq // 等待接收数据的goroutine列表,双向链表 sendq waitq // 等待发送数据的goroutine列表,双向链表 lock mutex // 互斥锁,发送和接收操作前需要获取的锁,所以channel的发送和接收操作是互斥的 }
如果 dataqsiz == 0 时,则为无缓冲 channel,如果 dataqsiz > 0 时,则为有缓冲 channel。
其中 recvq 和 sendq 是一个双向链表结构,链表中的元素为 sudog 结构体,其中该结构体中保存了g,所以本质上recvq 和 sendq 是保存了等待接收/发送数据的goroutine列表。
channel 中的 recvq 和 sendq 的使用场景如下所示:
在从 channel 接收数据时 (data :=
在向 channel 发送数据时 (ch
type waitq struct { first *sudog last *sudog }
// sudog表示等待队列中的一个g,例如在一个channel中的发送/接收。 // sudog是必要的,因为g和同步对象的关系是多对多的,一个g可以在多个等待队列中,因此一个g会有很多个sudog, // 很多g可能在等待着同一个同步对象,因此一个对象可能有多个sudog。 // sudog是从一个特殊的池中分配的,使用acquireSudog和releaseSudog分配和释放它们。 type sudog struct { // 以下字段受此sudog阻塞的channel的hchan.lock保护 g *g next *sudog prev *sudog elem unsafe.Pointer // data element (may point to stack) // 以下字段永远不会被同时访问 // 对于channel,waitlink只能被g访问 // 对于信号量,所有字段(包括上述字段)只有在持有semaRoot锁时才能访问。 acquiretime int64 releasetime int64 ticket uint32 // isSelect表示g正在参与选择,因此g.selectDone必须经过CAS处理,才能被唤醒 isSelect bool // success表示通过channel c的通信是否成功。 // 如果goroutine因为通过channel c传递了一个值而被唤醒,则为true // 如果因为c被关闭而唤醒,则为false success bool parent *sudog // semaRoot binary tree waitlink *sudog // g.waiting list or semaRoot waittail *sudog // semaRoot c *hchan // channel }
channel 结构图:
2. channel 的创建
// 无缓冲channel ch := make(chan int) // 缓冲大小为5的channel ch2 := make(chan int, 5)
创建 channel 的源码为runtime/chan.go文件中的 makechan 函数:
func makechan(t *chantype, size int) *hchan { elem := t.elem // compiler checks this but be safe. if elem.size >= 1 maxAlign { throw("makechan: bad alignment") } mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size注意这里返回的是 hchan 的指针,因此我们在函数间可以直接传递 channel,而不用传递channel的指针了。
另外,因为channel 的内存分配都用到了 mallocgc 函数,而 mallocgc 是负责堆内存分配的关键函数,因此可见 channel 是分配在堆内存上的。
3. channel 的发送流程
channel 的发送:
ch
channel 发送的源码对应 runtime/chan.go 的 chansend 函数:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // 如果当前channel是nil if c == nil { // 如果不阻塞,则直接返回false if !block { return false } // 挂起当前goroutine gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") } if debugChan { print("chansend: chan=", c, "\n") } if raceenabled { racereadpc(c.raceaddr(), callerpc, funcPC(chansend)) } // 这里访问了hchan结构中的closed, full函数内部访问了dataqsiz,recvq,qcount字段,这里没有加锁,是为什么呢? // 先说说这里判断的含义:如果不阻塞,且channel没有被关闭,且buf已满,则快速返回false,表示数据发送失败。 // 因为没有加锁,假如在判断c.closed == 0之后结果为true,在判断full之前,这时channel被其他goroutine关闭了, // 然后full函数返回了true,那么它会直接return false,这样子会有什么影响呢? // 其实并没有什么影响,在这种情况下返回false也是合理的,因为都是表示在不阻塞的情况下发送数据失败。 // 所以这里访问hchan里面的数据就没有加锁了 if !block && c.closed == 0 && full(c) { return false } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } // 锁住channel,可见channel是并发安全的 lock(&c.lock) // 如果channel已关闭,则panic if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } // 如果recvq等待接收队列中有值,则直接把值传给等待接收的goroutine,这样可以减少一次内存拷贝 if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } // 如果recvq等待接收队列中没有值,且为有缓冲channel,则把数据copy到buf中 if c.qcount 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) if closed { if c.closed == 0 { throw("chansend: spurious wakeup") } // 如果唤醒后,发现 channel 被关闭,则关闭 panic(plainError("send on closed channel")) } return true }full 函数,用于判断当前channel是否还有坑位接收待发送的数据:
// 判断channel中是否还有位置存放数据 func full(c *hchan) bool { // 如果是非缓冲channel if c.dataqsiz == 0 { // 如果 recvq 中没有等待接收数据的 goroutine,则返回 true,表示已满,否则返回 false return c.recvq.first == nil } // 如果是有缓冲 channel,则判断buf是否已满 return c.qcount == c.dataqsiz }send 函数,在recvq中有等待接收数据的goroutine时会被调用:
// 在一个空的 channel c 中完成发送操作 // 把数据 ep 从发送者复制到接收者 sg 中 // 最后接收的 goroutine 会被唤醒 // channel c 一定是空的且被锁住的 // sg 一定是已经从 c 的 recvq 中出队了 // eq 一定是不等于 nil 的,且指向堆或者是调用者的栈 func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if raceenabled { if c.dataqsiz == 0 { racesync(c, sg) } else { // Pretend we go through the buffer, even though // we copy directly. Note that we need to increment // the head/tail locations only when raceenabled. racenotify(c, c.recvx, nil) racenotify(c, c.recvx, sg) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } } // sg.elem 指向接收者存放接收数据的存放的位置 if sg.elem != nil { // 直接内存拷贝,从发送者拷贝到接收者内存 sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g // 解锁 unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } // 唤醒接收数据的goroutine goready(gp, skip+1) }总结 channel 的发送流程:
判断 channel 是否是 nil,如果是,则会永久阻塞导致死锁报错
如果 channel 中 recvq 存在接收者 goroutine,则直接把需要发送的数据拷贝到接收 goroutine,这里其实是有sodog 的结构,里面保存了接受者goroutine的指针。
如果 recvq 中不存在接收者:
a. 如果 buf 没有满,则直接把数据拷贝到 buf 的 sendx 位置
b. 如果 channel 为无缓冲 channel 或 buf 已满,则把当前 goroutine 保存到 sendq 等待队列中,阻塞当前 goroutine
4. channel 的接收流程
channel 的接收:
data := data2, ok :=
channel 的接收分别有2个函数,其中一种是带”ok“返回值的,另外一种是不带"ok"返回值的。
- 带”ok"返回值的函数,该返回的布尔值为 true 时,并不表示当前通道还没有关闭,而是仅仅表示当前获取到的值是通道的正常生产出来的数据,而不是零值;当该布尔值为 false 时,表示当前的通道已经被关闭,并且获取到的值是零值。
- 不带"ok"返回值的函数,当 channel 被关闭时,就不能判断当前获取到的值是 channel 正常生产的值,还是零值了。
// 无返回值 func chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true) } // 返回 bool 类型,如果返回false,表示 channel 已经被关闭,否则返回false。 func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { _, received = chanrecv(c, elem, true) return }
不管是否返回 received,channel 的接收都调用了 chanrecv 函数:
// 从 channel c 中接收数据,并把数据复制到 ep 中。 // 在忽略接收数据的情况下,eq 可能是 nil,例如:empty, // 重新排序的读取可能会错误地表示成”open和empty“。 // 为了防止重排序,我们对这2个检查都使用原子加载,并依靠清空和关闭发生在同一个锁下的不同临界区。 // 当关闭带有阻塞发送的非缓冲channel,此假设失败,但这无论如何都是错误的条件。 if atomic.Load(&c.closed) == 0 { // 因为 channel 不能重新打开,所以在后面这里观察到 channel 没有被关闭,意味着它在第一次判断 empty 的时候也没有关闭。 // 这样就表现得像在第一次判断 empty 时,通道也没有关闭:if empty(c) && atomic.Load(&c.closed) == 0 {...} return } // 当执行到这里的时候,说明 channel 已经被关闭了。 // 这时重新检查通道是否还有其他待接收的数据,这些数据可能在第一次 empty 检查和通道关闭检查之间到达。 // 在这种情况下发送时,也需要按照连贯的顺序。 if empty(c) { // The channel is irreversibly closed and empty. if raceenabled { raceacquire(c.raceaddr()) } if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } // 获取锁 lock(&c.lock) // 如果 channel c 已经被关闭,且 buf 中无元素,将获取到零值 if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } // 如果 sendq 中有元素 if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). // 找到一个正在等待的发送者。 // 1.如果是无缓冲 channel,则直接把从发送者那里接收数据。 // 2.如果是有缓冲 channel,这时 sendq 中有元素,说明 buf 满了,发送者需要等待消费者消费 buf 数据后才能继续发送数据。 // 这时当前的 goroutine 会从 buf 的 recvx 位置接收数据,并且把刚刚获取到的发送者 sg 的发送数据拷贝到 buf 的 sendx 位置中。 recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } // sendq 中没有等待的发送者,且 buf 中有数据,则直接从 buf 中接收数据 if c.qcount > 0 { // Receive directly from queue qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- // 解锁 unlock(&c.lock) return true, true } // 如果代码运行到这里,说明 channel 中没有数据可以接收了,接下来就要准备阻塞当前 goroutine 了 // 如果不阻塞,则快速返回 if !block { // 解锁 unlock(&c.lock) return false, false } // no sender available: block on this channel. // 构造sudog // 获取当前 goroutine 指针 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil // 把构造好的 sudog 入队 recvq c.recvq.enqueue(mysg) // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set gp.activeStackChans is not safe for // stack shrinking. atomic.Store8(&gp.parkingOnChan, 1) // 挂起当前 goroutine gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // 如果 goroutine 被唤醒,会从这里开始继续执行 if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) return true, success }
empty 函数用于判断从 channel c 中读取数据是否会阻塞:
func empty(c *hchan) bool { // c.dataqsiz 是不会被改变的. if c.dataqsiz == 0 { return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil } return atomic.Loaduint(&c.qcount) == 0 }
recv 函数在 channel c 的 buf 是满的,且 sendq 中有等待发送的 goroutine 时会被调用:
// 这里分为 2 个部分: // 1.发送者 sg 待发送的值会被放入通道 buf 中,发送者被唤醒继续执行 // 2.接收方(当前 goroutine)接收的值写入 ep // 对于同步 channel(无缓冲),2 个值都是一样的 // 对于异步 channel(有缓冲),接收方从 channel buf 获取数据,发送方的数据放入 channel buf // channel c 一定是满的,且已被锁定,recv 用 unlockf 解锁 channel c。 // sg 一定已经从 sendq 出队 // 不等于 nil 的 ep 一定指向堆或调用者的栈 func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 { if raceenabled { racesync(c, sg) } if ep != nil { // 非缓冲 channel,直接从发送方接收数据 recvDirect(c.elemtype, sg, ep) } } else { // 缓冲 channel,buf 已满 // 先从 buf 队列头部接收数据,然后把获取出来的发送方数据入队 qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) racenotify(c, c.recvx, sg) } // 从 buf 中复制数据到接收方 if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 把发送方 sg 的数据复制到 buf 中 typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g // 解锁 unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } // 唤醒发送方 goroutine goready(gp, skip+1) }
总结 channel 的接收流程:
判断 channel 是否是 nil,如果是,则会永久阻塞导致死锁报错如果 channel 中 sendq 有等待发送数据的 goroutine:
a. 如果是无缓存 channel,则直接把要发送的数据拷贝到接收者的 goroutine 中,并唤醒发送方 goroutine;
b. 如果是有缓存的 channel(说明此时recvd满了),则把 buf 中的 recvx 位置的数据拷贝到当前接收的goroutine,然后把 sendq 中第一个等待发送goroutine的数据拷贝到buf 中的 sendx 位置,并唤醒发送的goroutine如果 channel 中 sendq 没有等待发送数据的 goroutine:
a. 如果 buf 有数据,则把 buf 中的 recvx 位置的数据拷贝到当前的接收goroutine
b. 如果 buf 没有数据,则把当前 goroutine 加入 recvd 等待队列中,并挂起
5. channel 使用注意事项
最后啰嗦一下 channel 使用的注意事项,这也是在我们平常开发中容易忽略的:
- 一个 channel 不能多次 close,否则会导致 panic。
- 关闭一个 nil 的 channel,会导致 panic。
- 向一个已经 close 的 channel 发送数据,会导致 panic。
- 不要从一个 receiver 测关闭 channel,也不要在有多个 sender 时关闭 channel。在go语言中,对于一个 channel,如果最终没有任何 goroutine 引用它,不管 channel 有没有被关闭,最终都会被 gc 回收。
- 如果监听的channel 已经关闭,还可以获取到 channel buf 中剩余的值,当接收完 buf 中的数据后,才会获取到零值。
今天关于《GoLang channel底层代码分析详解》的内容就介绍到这里了,是不是学起来一目了然!想要了解更多关于golang的内容请关注golang学习网公众号!
-
215 收藏
-
322 收藏
-
210 收藏
-
108 收藏
-
367 收藏
-
419 收藏
-
234 收藏
-
155 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 立即学习 542次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 立即学习 507次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 立即学习 497次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 立即学习 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 立即学习 484次学习