0.前言
channel(通道)用于goroutine(协程)之间的通信。它提供了一种在不同协程之间传递数据的机制。channel是一种类型安全的、阻塞的、先进先出(FIFO)的数据结构,确保发送的数据按照发送的顺序接收。Go语言提供通过通信来共享内存,而不是通过共享内存来通信
1. 基本数据结构
channel的底层源码和相关实现在src/runtime/chan.go中(本文代码全为go version go1.20.5 darwin/amd64)
hchan是Channel底层数据结构对应的结构体。如代码【Code1.1 Go Channel底层数据结构hchan】
type hchan struct {
qcount uint // 循环数组中的元素数量,长度
dataqsiz uint // 循环数组的大小,容量
// channel分为无缓冲和有缓冲channel两种
// 有缓冲的channel使用ring buffer(环形缓冲区)来缓存写入的数据,本质是循环数组
// 为什么是循环数组?普通数组容量固定、更适合指定的空间,且弹出元素时,元素需要全部前移
buf unsafe.Pointer // 指向底层循环数组的指针(环形缓冲区)
elemsize uint16 // 元素的大小
closed uint32 // 是否关闭的标志,0:未关闭,1:已关闭
elemtype *_type // channel中的元素类型
// 当下标超过数组容量后会回到第一个位置,所以需要有两个字段记录当前读和当前写的下标位置
sendx uint // 下一次写的位置
recvx uint // 下一次读的位置
// 尝试读/写channel时被阻塞的goroutine
recvq waitq // 读等待队列
sendq waitq // 写等待队列
// 互斥锁,保证读写channel时的并发安全问题
lock mutex
}
1.1 sendx&recvx
sendx:下一次需要写的位置
recvx:下一次需要读的位置
循环队列需要利用空闲单元法来解决队空或队满时都存在的front ==rear带来的二义性问题
对上述Pictrue1.2所示的循环队列有如下性质
- 当初始化队列为空时,front==rear==0
- 入队,rear+1,指向队尾的下一个存储单元,为了实现循环利用取模运算rear =(rear + 1)% max
- 出队,front+1,指向下一个队首,实现循环front =(front + 1)% max
- 判断队满:(Q.rear+1)% Q.max == Q.front
hchan中的字段qcount记录了循环队列中数据的个数,因此可以在不浪费一个存储单元的情况下,解决队空或队满时存在的front==rear产生的二义性问题
- 队空:front==rear && qcount == 0
- 队满:front==rear && qcount ≠ 0
1.2 sendq&recvq
sendq和recvq分别表示阻塞在当前channel上的发送者goroutine和接收者goroutine。其实现为双链表,双链表对应的结构体为waitq,双链表中的节点为sudog。双链表结构体如代码【Code1.2 被阻塞在channel上的双链表】
type waitq struct{
first *sudog
last *sudog
}
双链表的节点是sudog,sudog是goroutine抽象出来的结构体,一个sudog代表一个在等待队列中的g。sudog主要记录了哪个协程正在等待;等待发送/接收的数据在哪里;等待的是哪个channel;因为g与同步对象关系是多对多的。一个g可以出现在许多等待队列上,即一个g访问多个同步对象,因此一个g可能有很多sudog。多个g可能正在等待同一个同步对象,因此一个对象的sendq和recvq可能有许多sudog。双链表中的节点sudog如代码【Code1.3 goroutine抽象出来的结构体】
//src/runtime/chan.go
type sudog struct{
g *g //记录哪个协程在等待
next *sudog
prev *sudog
elem unsafe.Pointer // 等待发送/接收的数据在哪里
...
c *chan //等待的是哪个channel
}
2.channel的创建
创建channel实际上就是在内存中实例化出一个hchan的结构体,并返回一个指向该结构体的指针,所以channel是引用类型。我们在使用channel时,在函数之间的传递的即为此指针。
2.1流程图
创建channel时主要分为两大块:边界检查和分配内存,分配内存的流程如下
- 如果是无缓冲channel,直接给hchan结构体分配内存并返回指针。
- 如果是有缓冲channel,但元素不包含指针类型,则一次性为hchan结构体和底层循环数组分配连续内存并返回指针。(需要连续内存空间)
- 如果是有缓冲channel,且元素包含指针类型,则分别分配hchan结构体内存和底层循环数组的内存并返回指针。(可以利用内存碎片)
2.2 makechan()源码
创建channel的主要实现是在makechan()函数中:
如代码【Code2.1 makechan()函数源码】
func makechan(t *chantype, size int) *hchan {
// 获取元素类型
elem := t.elem
// compiler checks this but be safe.
// 元素的大小必须小于64K
// 编译器已经检查了这一点,但是为了安全起见再次进行检查
if elem.size >= 1 maxAlign {
throw("makechan: bad alignment")
}
// 计算所需要的内存大小
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
// 检查是否溢出,所需内存是否超过maxAlloc - hchanSize 或者 size是否小于0
if overflow || mem > maxAlloc-hchanSize || size
2.3 小结
- 元素大小不能超过 65536 字节,也即 64K;
- 元素的对齐大小不能超过
maxAlign
也即 8 字节; - 计算出来的所需内存不能超过限制;
创建channel时的策略
- 如果是无缓冲channel,直接为hchan结构体分配内存并返回指针。
- 如果是有缓冲channel,但元素不包含指针类型,则一次性为hchan结构体和底层循环数组分配连续内存并返回指针。(需要连续内存空间)
- 如果是有缓冲channel,且元素包含指针类型,则分别分配hchan结构体内存和底层循环数组的内存并返回指针。(可以利用内存碎片)
3. 发送数据
3.1 总流程图
向channel中发送数据主要分为两大块:边界检查和数据发送,数据发送流程主要如下所示:
- 如果channel的读等待队列中存在接收者goroutine,则为同步发送:
- 无缓冲channel,不用经过channel直接将数据发送给第一个等待接收的goroutine,并将其唤醒等待调度。
- 有缓冲channel,但是元素个数为0,不用经过channel(假装经过channel)直接将数据发送给第一个等待接收的goroutine,并将其唤醒等待调度。
- 如果channel的读等待队列中不存在接收者goroutine:
- 如果底层循环数组未满,那么把发送者携带的数据入队队尾,此为异步发送
- 如果底层循环数组已满或者是无缓冲channel,那么将当前goroutine加入写等待队列,并将其挂起,等待被唤醒,此为阻塞发送
3.2 同步发送
3.2.1 流程图
3.2.2 源码
如代码【Code3.1 同步发送源码1】
// 加锁
lock(&c.lock)
......
// 从接收者队列recvq中取出一个接收者,接收者不为空的情况下,直接将数据传递给该接收者
if sg := c.recvq.dequeue(); sg != nil {
// c: channel
// sg:从recvq中取出来的接收者
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
如代码【Code3.2 同步发送源码2】
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 无缓存通道和有缓存通道的处理逻辑
if raceenabled {
// 无缓冲通道的处理逻辑
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
// 假装经过channel
// 相当于循环列表的rear指针向前进1
c.recvx++
// 队列数组中最后一个元素已经读取,则再次从头开始读取数据
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
// 将ep直接复制到接收者sg中
if sg.elem != nil {
// 复制数据到sg中
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
// 接收者对应的goroutine
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 使接收者goroutine变成runnable状态,唤醒goroutine
goready(gp, skip+1)
}
// 将src值复制到dst中
func typedmemmove(typ *_type, dst, src unsafe.Pointer) {
if dst == src {
return
}
...
memmove(dst, src, typ.size)
...
}
- 调用sendDirect()函数将数据拷贝到接受变量的内存地址上
- 调用goready()将等待接收的阻塞goroutine的状态改变成Grunnable。下一轮调度时会唤醒这个接收的goroutine
3.3 异步发送
3.3.1 流程图
3.3.2 源码
如代码【Code3.3 异步发送源码】
// 缓冲队列中的元素个数小于队列的大小
// 说明缓冲队列中还有空间
if c.qcount
如果qcount还没有满,则调用chanbuf()获取sendx索引的元素指针值。调用typedmemmove()方法将发送的值拷贝到缓冲区buf中。拷贝完成,需要维护sendx索引下标值和qcount个数。这里将buf缓冲区设计成环形的,索引值如果到了队尾,下一个位置重新回到队头。
3.4 阻塞发送
3.4.1 流程图
3.4.2 源码
如代码【Code3.4 阻塞发送源码】
// 获取当前的goroutine,用于绑定给一个sudog
gp := getg()
// 获取一个sudog
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 设置sudog发送的数据
mysg.elem = ep
mysg.waitlink = nil
// 设置sudog绑定的goroutine
mysg.g = gp
mysg.isSelect = false
// 设置sudog绑定的channel
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 将发送者入队sendq
c.sendq.enqueue(mysg)
gp.parkingOnChan.Store(true)
// 调用gopark方法挂起当前goroutine,状态为waitReasonChanSend,阻塞等待channel。
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 最后,KeepAlive()确保发送的值保持活跃状态,直到接收者将其复制出来
KeepAlive(ep)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
3.5 chansend()源码
发送数据的源码如【Code3.5 发送数据源码】
发送操作在编译时转换为 chansend
函数:
chansend
接收 4 个参数:
-
c
是一个指向hchan
类型的指针,表示要接收数据的通道; -
ep
是一个unsafe.Pointer
类型的指针,用于接收接收到的数据; -
block
表示接收操作的模式。如果block
为true
,为阻塞模式,即发送操作将会阻塞,直到有接收者接收元素;如果block
为false
,为非阻塞模式,即发送操作不会阻塞,如果通道已满,发送操作会立即返回; -
callerpc
:发送操作的调用者的程序计数器值。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second full()).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation. However, nothing here
// guarantees forward progress. We rely on the side effects of lock release in
// chanrecv() and closechan() to update this thread's view of c.closed and full().
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
if c.qcount 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
3.6 小结
向channel中发送数据主要分为两大块:边界检查和数据发送,数据发送流程主要如下所示:
- 如果channel的读等待队列中存在接收者goroutine,则为同步发送:
- 无缓冲channel,不用经过channel直接将数据发送给第一个等待接收的goroutine,并将其唤醒等待调度。
- 有缓冲channel,但是元素个数为0,不用经过channel(假装经过channel)直接将数据发送给第一个等待接收的goroutine,并将其唤醒等待调度。
- 如果channel的读等待队列中不存在接收者goroutine:
- 如果底层循环数组未满,那么把发送者携带的数据入队队尾,此为异步发送
- 如果底层循环数组已满或者是无缓冲channel,那么将当前goroutine加入写等待队列,并将其挂起,等待被唤醒,此为阻塞发送
4. 接收数据
4.1 总流程图
接收数据的流程主要由两大块组成:边界检查和接收数据,其中接收数据的处理逻辑如下。
- 如果 channel 的写等待队列存在发送者 goroutine,此为同步接收
- 如果是无缓冲 channel,直接从第一个发送者 goroutine 那里把数据拷贝给接收变量,唤醒发送的 goroutine;
- 如果是有缓冲 channel(已满),将循环数组 buf 的队首元素拷贝给接收变量,将第一个发送者 goroutine 的数据拷贝到 buf 循环数组队尾,唤醒发送的 goroutine;
- 如果 channel 的写等待队列不存在发送者 goroutine:
- 如果循环数组 buf 非空,将循环数组 buf 的队首元素拷贝给接收变量,此为异步接收
- 如果循环数组 buf 为空,将当前 goroutine 加入读等待队列,并挂起等待唤醒,此为阻塞接收
4.2 同步接收
4.2.1 流程图
4.2.2 源码
如代码【Code4.1 同步接收源码1】
// 如果发送队列中存在发送者,返回(true,true)
lock(&c.lock)
if sg := c.sendq.dequeue(); sg != nil {
// 找到一个等待的发送者,如果缓冲区大小为0,则直接接收发送者的值
// 否则,从队列头部接收,并将发送者的值添加到队列的尾部
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
如代码【Code4.2 同步接收源码2】
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 创建的channel是无缓存buf
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
// 直接将数据从发送者复制过去,即直接接收发送者的数据
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// 返回buf中待被接收的数据的指针
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
}
// 复制队列中的数据给接收者
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 将发送者的数据加入到buf中
typedmemmove(c.elemtype, qp, sg.elem)
// 更新recvx
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 更新sendx
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
// 释放锁
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒发送者,下一轮参与调度
goready(gp, skip+1)
}
如代码【Code4.3 同步接收源码3】
//返回buffer中下标为i的指针
func chanbuf(c *hchan, i uint) unsafe.Pointer {
return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}
4.3 异步接受
4.3.1 流程图
4.3.2 源码
如代码【Code4.4 异步接收源码】
if c.qcount > 0 {
// 待接收数据的指针
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
// 直接从队列中接收数据
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
// 更新recvx
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 更新qcount
c.qcount--
// 释放锁
unlock(&c.lock)
// 返回(true,true)
return true, true
}
4.4 阻塞接收
4.4.1 流程图
4.4.2 源码
如代码【Code4.5 阻塞接收源码】
// 获取当前的goroutine,用于绑定给一个sudog
gp := getg()
// 返回一个sudog
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// sudog绑定接收的数据
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
// sudog绑定当前的goroutine
mysg.g = gp
mysg.isSelect = false
// sudog绑定当前的channel
mysg.c = c
gp.param = nil
// 将接收者入队recvq中
c.recvq.enqueue(mysg)
gp.parkingOnChan.Store(true)
// 挂起当前的goroutine
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
// 进行接收
return true, success
4.5 chanrecv()源码
接收操作在编译时转换为 chanrecv
函数。如代码【Code4.6 chanrecv()函数】
chanrecv
的参数跟 chansend
几乎一致,返回值有 2 个,分别是 selected
,received
。selected
表示是否执行了接收操作,received
表示是否成功收到了数据。
- 如果
selected = false
:表示没有进行接收操作; - 如果
selected = true
:表示进行了接收操作: - 如果
received = false
:表示虽然接收操作成功,但没有接收到实际的数据; - 如果
received = true
:表示接收操作成功,并且接收到实际的数据。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "n")
}
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
if !block && empty(c) {
// After observing that the channel is not ready for receiving, we observe whether the
// channel is closed.
//
// Reordering of these checks could lead to incorrect behavior when racing with a close.
// For example, if the channel was open and not empty, was closed, and then drained,
// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
// we use atomic loads for both checks, and rely on emptying and closing to happen in
// separate critical sections under the same lock. This assumption fails when closing
// an unbuffered channel with a blocked send, but that is an error condition anyway.
if atomic.Load(&c.closed) == 0 {
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
return
}
// The channel is irreversibly closed. Re-check whether the channel has any pending data
// to receive, which could have arrived between the empty and closed checks above.
// Sequential consistency is also required here, when racing with such a send.
if empty(c) {
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
if c.closed != 0 {
if c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// The channel has been closed, but the channel's buffer have data.
} else {
// Just found waiting sender with not closed.
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}
4.6 小结
接收数据的流程主要由两大块组成:边界检查和接收数据,其中接收数据的处理逻辑如下。
- 如果 channel 的写等待队列存在发送者 goroutine,此为同步接收
- 如果是无缓冲 channel,直接从第一个发送者 goroutine 那里把数据拷贝给接收变量,唤醒发送的 goroutine;
- 如果是有缓冲 channel(已满),将循环数组 buf 的队首元素拷贝给接收变量,将第一个发送者 goroutine 的数据拷贝到 buf 循环数组队尾,唤醒发送的 goroutine;
- 如果 channel 的写等待队列不存在发送者 goroutine:
- 如果循环数组 buf 非空,将循环数组 buf 的队首元素拷贝给接收变量,此为异步接收
- 如果循环数组 buf 为空,将当前 goroutine 加入读等待队列,并挂起等待唤醒,此为阻塞接收
5. 关闭channel
关闭操作在编译时转换为closechan()函数
5.1 流程图
关闭channel的总体流程如下
- 边界检查
- 从recvq释放所有的readers
- 从sendq释放所有的writers(会产生panic)
- 唤醒所有的readers和writers
5.2 边界检查
5.2.1 源码
如代码【Code5.1 关闭Channel-异常检查源码】
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
racerelease(c.raceaddr())
}
c.closed = 1
关闭一个channel有2点需要注意,当Channel是一个nil空指针或者关闭一个已经关闭的channel时,Go语言运行时都会直接panic。上述2种情况都不存在时,即可标记channel状态为close。
5.3 释放readers和writers
5.3.1 释放readers源码
如代码【Code5.2 关闭Channel-释放readers源码】
// 创建待唤醒列表
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
回收接收者的 sudog。将所有的接收者 readers 的 sudog从等待队列(recvq)加入到待清除队列 glist 中。注意这里是先回收接收者。就算从一个 close 的 channel 中读取值,不会发生 panic,顶多读到一个默认零值。
5.3.2 释放senders源码
如代码【Code5.3 关闭Channel-释放senders源码】
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
再回收发送者 writers。回收步骤和回收接收者是完全一致的,将发送者的等待队列 sendq 中的 sudog 放入待清除队列 glist 中。注意这里可能会产生 panic。在第3章发送数据中分析过,往一个 close 的channel 中发送数据,会产生 panic,这里不再赘述。
5.4 协程调度
最后一步是更改goroutine的状态:唤醒所有readers和writers
如代码【Code5.4 关闭Channel-唤醒所有readers和writers源码】
最后会为所有被阻塞的 goroutine 调用 goready 触发调度。将所有 glist 中的 goroutine 状态从_Gwaiting 设置为 _Grunnable 状态,等待调度器的调度。
5.5 closechan()源码
如代码【Code5.5 关闭Channel源码】
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
// 获取锁
lock(&c.lock)
// 如果通道已关闭,则panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
racerelease(c.raceaddr())
}
// 关闭channel
c.closed = 1
// 创建待唤醒列表
var glist gList
// 释放所有的接收者
for {
// 从接收者队列中出队一个等待的接收者
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
// 清空接收者携带的元素值
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
// 设置释放时间为当前的CPU时间
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
// 将接受者的goroutine添加到待唤醒列表中
glist.push(gp)
}
// 释放所有的发送者(这些发送者将会panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// 在释放通道锁之后,唤醒所有的goroutine
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
5.6 小结
关闭channel的流程如下:
- 异常检查
- 从recvq释放所有readers
- 从sendq释放所有writers(会产生panic)
- 唤醒所有readers和writers
6. QA
讨论是学习最有效的方式。接下来留几个QA,是我在学习Go channel底层实现过程中的一些疑问。大家可以在评论区一起展开头脑风暴。还望各路大神不吝赐教。
6.1 QA
Q:Go Channel不再被使用的时候,是否会被GC回收
A:Golang 的垃圾回收机制对于 Channel 也适用。如果一个 Channel 不再被任何 Goroutine 使用,那么它所占用的内存空间就可以被回收。Golang 的垃圾回收是自动进行的,不需要程序员手动操作。
func main() {
runtime.GC()
stats := runtime.MemStats{}
runtime.ReadMemStats(&stats)
println(stats.HeapInuse)
run()
runtime.GC()
runtime.ReadMemStats(&stats)
println(stats.HeapInuse)
}
func run() {
c := make(chan int, 10)
for i := 0; i
6.2 QA
Q:elemtype *_type表示channel中的元素类型。这个*_type在go语言里有哪些主要用处。
6.3 QA
Q:发送数据中的KeepAlive源码用途是什么?代码注释说是让数据保持活跃,这个活跃怎么理解?
6.4 QA
Q:无缓冲channel有哪些有缓冲channel无法替代的场景,用代码解答。
6.5 QA
Q:gopark goready是否涉及内核态切换?(这个偷懒不想看源码了)
文章来源于互联网:Golang分享(一):channel底层原理