Go Channel (底层实现逻辑)
Go Channel (底层实现逻辑)
通道(channel)是Go语言中提供协程间通信的独特方式,传统的多线程编程比较困难,常常需要开发者了解一些底层的细节(例如互斥锁、条件变量及内存屏障等)。而通过通道交流的方式,Go语言屏蔽了底层实现的诸多细节,使得并发编程更加简单快捷。将通道作为Go语言中的一等公民,是Go语言遵循CSP并发编程模式的结果,这种模型最重要的思想是通过通道来传递消息。同时,通道借助Go语言调度器的设计,可以高效实现通道的堵塞/唤醒,进一步实现通道多路复用的机制。
不要通过共享内存来通信,通过通信来共享内存。
——罗勃·派克
Channel读取、接收数据流程

Channel源代码分析
channel运行时数据结构
其中sendx指向了当前buf(通道的缓冲区)中的发送序号,若sendx等于dataqsiz(通道循环队列)的大小,则表示当前buf中已经被置满,此时sendx置0,开启下一轮向buf的循环写入。其中recvx指向了当前buf中的接收序号,若recvx等于dataqsiz(通道循环队列)的大小,则表示当前buf中最后一个数据已经被取出,此时recvx置0,开启下一轮从buf的循环读取。
// 通道在运行时是一个特殊的hchan结构体
type hchan struct {// 当前通道队列中的数据个数qcount uint // total data in the queue// 循环队列的大小dataqsiz uint // size of the circular queue// 数据缓冲区// 写入时:如果读取等待队列中没有正在等待的协程,但是该通道是带缓冲区的,并且当前缓冲区没有满,则向当前缓冲区中写入当前元素。// 读取时:如果队列中没有正在等待写入的协程,但是该通道是带缓冲区的,并且当前缓冲区中有数据,则读取该缓冲区中的数据,并将数据写入当前的读取协程中。buf unsafe.Pointer // points to an array of dataqsiz elements// 通道中元素的大小elemsize uint16// 通道是否关闭closed uint32// 通道元素的类型elemtype *_type // element type// 记录buf中的发送序号sendx uint // 记录buf中的接收序号recvx uint // 读取的堵塞协程队列,每个协程对应一个sudog结构,它是对协程的封装,包含了准备获取的写成中的元素指针等// 当通道无缓冲区或者当前缓冲区没有数据则代表当前协程的sudog结构需要放入recvq链表末尾,并且当前协程陷入休眠状态,等待被唤醒重新执行// 写入时:当有读取的协程正在等待时,直接从该协程链表中,获取第一个协程,并将元素直接复制到对应的协程中,同时唤醒被堵塞的协程recvq waitq // 写入的阻塞协程队列,// 若当前通道无缓冲区或者当前缓冲区已满,则代表当前协程的sudog结构需要放入sendq链表末尾中,并且当前协程陷入休眠状态,等待被唤醒重新执行// 读取时:当有等待写入的协程时,直接从等待的写入的协程链表中获取第一个协程,并将写入的元素直接复制到当前协程中,同时唤醒被堵塞的写入协程sendq waitq // 锁,并发保护lock mutex
}
channel运行时数据结构hchan的创建
// chan 初始化函数 t 表示通道类型,size 代表通道中元素的大小。
// 当分配的大小为0时,只用在内存中分配hchan结构体大小即可
// 当通道的元素中不包含指针时,连续分配hchan结构体大小+size元素大小,当通道的元素中包含指针时,需要单独分配内存空间,
// 因为当元素中包含指针时,需要单独分配空间才能正常进行垃圾回收
func makechan(t *chantype, size int) *hchan {elem := t.elem// compiler checks this but be safe.if elem.size >= 1<<16 {throw("makechan: invalid channel element type")}if hchanSize%maxAlign != 0 || elem.align > maxAlign {throw("makechan: bad alignment")}// 计算需要为通道中元素分配的大小mem, overflow := math.MulUintptr(elem.size, uintptr(size))if overflow || mem > maxAlloc-hchanSize || size < 0 {panic(plainError("makechan: size out of range"))}var c *hchanswitch {case mem == 0:// 无缓冲通道// Queue or element size is zero.c = (*hchan)(mallocgc(hchanSize, nil, true))// Race detector uses this location for synchronization.c.buf = c.raceaddr()case elem.ptrdata == 0:// 有缓冲,元素为非指针类型// Elements do not contain pointers.// Allocate hchan and buf in one call.c = (*hchan)(mallocgc(hchanSize+mem, nil, true))c.buf = add(unsafe.Pointer(c), hchanSize)default:// 有缓冲,元素为指针类型// Elements contain pointers.c = new(hchan)c.buf = mallocgc(mem, elem, true)}// 通道元素的大小c.elemsize = uint16(elem.size)// 通道元素的类型c.elemtype = elem// 通道中循环队列的大小c.dataqsiz = uint(size)lockInit(&c.lock, lockRankHchan)if debugChan {print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")}return c
}
向Channel发送数据
- 向一个nil channel发送数据会导致当前g的永久阻塞。
- 向已关闭的channel发送数据会触发panic。
- 若当前channel的接收者等待队列(recvq)中有等待者,向该等待者g发送数据,并唤醒该g。
- 若当前缓冲区未满且接收者等待队列(recvq)中无等待者,将数据放入buf中。
- 向缓冲已满或者为无缓冲channel发送数据,当前g会阻塞在,channel上,并等待接收者唤醒该g。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {// 向一个空channel发送数据if c == nil {// 如果block为false,则协程不进入休眠状态if !block {return false}//当前g进入休眠 g->g0gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)throw("unreachable")}............// 部分race代码省略............// 若当前缓冲区已满,但是设置block=false,则直接返回if !block && c.closed == 0 && full(c) {return false}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}lock(&c.lock)// 向一个已关闭的通道发送数据,触发错误if c.closed != 0 {unlock(&c.lock)panic(plainError("send on closed channel"))}// 有正在等待的读取协程,即接收等待队列存在sudog,则直接将值复制到该协程,并唤醒该协程if sg := c.recvq.dequeue(); sg != nil {send(c, sg, ep, func() { unlock(&c.lock) }, 3)return true}// 若当前buf中的元素数量,小于缓冲区总大小,将要发送的元素排入队列if c.qcount < c.dataqsiz {// Space is available in the channel buffer. Enqueue the element to send.// 从buf中获取当前sendx指向的地址qp := chanbuf(c, c.sendx)if raceenabled {racenotify(c, c.sendx, nil)}// 将数据ep指针指向的内存,复制值到qp(sendx当前指向的内存)typedmemmove(c.elemtype, qp, ep)// sendx后移c.sendx++// 若队列满了,将sendx置0,以防止下一次写入0号位置,开始循环利用空间,再次写入数据将陷入等待,直到0号位置被取出后,才能继续写入if c.sendx == c.dataqsiz {c.sendx = 0}c.qcount++unlock(&c.lock)return true}// 若当前chan缓冲区已满,且不需要阻塞当前g(发送方)if !block {unlock(&c.lock)return false}// 缓冲区已满,且block(是否阻塞)=true, 则阻塞当前g// Block on the channel. Some receiver will complete our operation for us.gp := getg()// 从当前p中获取一个sudog结构体mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// 设置sudog状态,同时将sudog结构体绑定至当前gmysg.elem = epmysg.waitlink = nilmysg.g = gpmysg.isSelect = falsemysg.c = cgp.waiting = mysggp.param = nil// 将sudog结构体,放入当前channel的发送等待队列中c.sendq.enqueue(mysg)// 标记当前g阻塞在channel上atomic.Store8(&gp.parkingOnChan, 1)// chanparkcommit g唤醒时调用的函数,用来恢复g的状态,当前g进入休眠状态,执行到此处,当前g已经被切换至g0了,后续的代码是在,// 等待着接收者g将其唤醒gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)// 当前g被唤醒时从此处开始执行后续的回收操作// 设置数据ep为可达数据,即能够被gc回收KeepAlive(ep)// someone woke us up.// 当前g在休眠过程中有其他g修改了当前g的waiting(sudog)if mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseclosed := !mysg.successgp.param = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}mysg.c = nil// 将sudog结构回收到当前p的sudogCache中releaseSudog(mysg)if closed {if c.closed == 0 {throw("chansend: spurious wakeup")}panic(plainError("send on closed channel"))}return true
}
从Channel中接收数据
- 从nil channel中接收数据,当前g将被永久阻塞。
- 从关闭的channel中接收数据,将会读取零值。
- 若当前channel的发送者等待队列(sendq)中有等待者,从该等待者g接收数据,并唤醒该g。
- 若当前等待发送队列(sendq)队列中无发送等待者,且buf中有数据,从buf中读取数据。
- 若当前channel的buf中无数据或当前channel为无缓冲channel,阻塞当前接收者g,并等待发送者g唤醒该g。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// 若当前channel为nilif c == nil {if !block { // 调用函数时,设置了不阻塞g,直接返回 false,falsereturn}// 阻塞ggopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)throw("unreachable")}// Fast path: check for failed non-blocking operation without acquiring the lock.if !block && empty(c) {// 若通道已经关闭,返回false,falseif atomic.Load(&c.closed) == 0 {return}if empty(c) {// The channel is irreversibly closed and empty.if raceenabled {raceacquire(c.raceaddr())}if ep != nil {typedmemclr(c.elemtype, ep)}return true, false}}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}lock(&c.lock)// 通道已关闭if c.closed != 0 && c.qcount == 0 {if raceenabled {raceacquire(c.raceaddr())}unlock(&c.lock)if ep != nil {// 清除接收的内存,也就是从一个closed的channel中读取数据为零值typedmemclr(c.elemtype, ep)}return true, false}// 若有等待写入的协程,从等待队列中取出第一个sudog,if sg := c.sendq.dequeue(); sg != nil {// 将数据复制啊到该协程,并唤醒该协程recv(c, sg, ep, func() { unlock(&c.lock) }, 3)return true, true}// 若当前channel中buf中有数据if c.qcount > 0 {// Receive directly from queue// 从buf中读取recvx处的数据qp := chanbuf(c, c.recvx)if raceenabled {racenotify(c, c.recvx, nil)}// 若读取地址不为空if ep != nil {// 将pq指针指向的队列中的内存数据,复制到接收地址eptypedmemmove(c.elemtype, ep, qp)}// 清空qp指向的内存数据typedmemclr(c.elemtype, qp)c.recvx++// 若接收序号已经接收了一轮,则将序号置0if c.recvx == c.dataqsiz {c.recvx = 0}// 一个数据被取出,计数-1c.qcount--unlock(&c.lock)return true, true}// 若当前buf中没有数据if !block {// 若block等于false,即使没有数据,当前g立即返回,不进行阻塞unlock(&c.lock)return false, false}// 将当前g阻塞到channel上// no sender available: block on this channel.gp := getg()// 从当前p的sudogCache中获取一个sudog数据结构mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// 设置sudog状态,并将sudog与当前g互相绑定,与当前channel绑定mysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.isSelect = falsemysg.c = cgp.param = nil// 将sudog放入接收队列中c.recvq.enqueue(mysg)// 标记当前g的休眠状态atomic.Store8(&gp.parkingOnChan, 1)// 挂起当前G,等待着发送g将其唤醒gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)// someone woke us upif mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}success := mysg.successgp.param = nilmysg.c = nilreleaseSudog(mysg)return true, success
}
**原创不易^ _ ^,本文内容可转载,请附本文链接谢谢。**
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
