未分类 · 2024年1月10日 0

Go 并发任务编排利器之 WaitGroup

Go 并发系列是根据我对晁岳攀老师的《Go 并发编程实战课》的吸收和理解整理而成,如有偏差,欢迎指正~

WaitGroup 是什么?

WaitGroup 是 Go 内置的 sync 包解决任务编排的并发原语。WaitGroup 直译是“等待组”,翻译成大白话就是等待一组协程完成任务。如果没有完成,就阻塞。

举个例子,我们要计算100万个数的和,并对这个和求根号。常规的思路肯定是先一个 for 循环计算总和,再开根号,但是这样效率很低。我们可以起1000个协程,每个协程计算1000个数的和,然后再对这些和求和,最后再开个根号。

这里有一个问题,计算根号的时候,需要等所有并发的协程都计算完才行,WaitGroup 就是解决等所有并发协程完成计算的问题的。

WaitGroup 的基本用法

WaitGroup 的用法很简单。标准库中的 WaitGroup 只有三个方法,分别是:

func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
  • Add:用来设置 WaitGroup 的计数值,delta 可正可负
  • Done:用来将 WaitGroup 的计数值减一,其实就是调用了 Add(-1)。
  • Wait:阻塞等待,直到 WaitGroup 的计数值变成0,进入下一步。

WaitGroup 的使用套路

使用 WaitGroup 的常规套路如下:

  1. 声明 WaitGroup 变量
  2. 执行 Add 方法。协程组的个数有 n 个,执行 Add(n)
  3. 协程组中,每个协程最后,执行方法 Done
  4. 在协程组后面,执行 Wait 方法

拿上面那个计算100万个数之和再开根号的场景做示例:

package main

import (
    "fmt"
    "math"
    "sync"
)

// 计算1000个数的和
func compute(m *sync.Mutex, wg *sync.WaitGroup, s, e int, count *int) {
    sum := 0
    for i := s; i  e; i++ {
        sum += i
    }
    m.Lock()
    *count += sum
    m.Unlock()
    wg.Done()
}

func main() {

    var m sync.Mutex
    var wg sync.WaitGroup

    var count int
    wg.Add(1000)
    for i := 0; i  1000; i++ {
        go compute(&m, &wg, i*1000+1, (i+1)*1000+1, &count)
    }
    wg.Wait()
    fmt.Println(math.Sqrt(float64(count)))
    return
}

为了图省事,我这里直接计算的使1-1000000的和。我们一起来看下这段代码。

第24行,先声明一个零值的 WaitGroup 变量。

第27行,因为这里开了1000个协程,所以执行 Add(1000)。

第28-30行,就是具体求和的部分。这里开了1000个协程求和,每个协程执行结束最后都会执行一次 Done() 函数,表示当前协程完成。

第31行执行 Wait() 函数,等待这1000个协程组完成任务。

看,WaitGroup 的使用确实很简单吧~~

WaitGroup 的实现原理

其实看完 WaitGroup 的使用示例后,我们就能大概猜到 WaitGroup 内部的实现原理:内部维护一个计数器,协程组中每完成一个任务,计数器减一,Wait() 函数中判断计数器的值是不是0,不是0的话,就阻塞,是的话,就进入下一步。

但是真的这么简单吗?

WaitGroup 的定义

先看下源码中 WaitGroup 的定义:

type WaitGroup struct {
    noCopy noCopy

    // 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
    // 64-bit atomic operations require 64-bit alignment, but 32-bit
    // compilers do not ensure it. So we allocate 12 bytes and then use
    // the aligned 8 bytes in them as state, and the other 4 as storage
    // for the sema.
    state1 [3]uint32
}

// state returns pointers to the state and sema fields stored within wg.state1.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
    if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
        return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
    } else {
        return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
    }
}

先解释一下 noCopy 这个变量。其实这个变量和 WaitGroup 的实际功能没有一毛钱关系。它的实际作用是用来检测是否存在复制 WaitGroup 的情况。

还记得之前提过的 go vet 工具吗?这个工具可以检测并发原语有没有被复制,原理是对 Locker 接口进行静态检查。

noCopy 实现了 Locker 接口:

type noCopy struct{}

// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

所以通过 noCopy 这个变量,WaitGroup 也能被检查出来有没有被复制。

重点来了!!下面开始 state1 字段的解释!!

state1 不太好解释,所以我先把原始的英文注释放出来了,如果看不懂我解释的,就看注释吧。。。

state1 是一个长度为3的 int32 类型的数组,在64位系统下定义如下:


在32位系统下,如果 state1 不是64位对齐,定义如下:


注意,32位系统中如果 state1 是64位对齐,还是使用上面的那种分配方式。

先不管是32位系统还是64位系统,总之我们能看到,不考虑顺序,state1 的3个元素分别表示【waiter数】,【计数值】和【信号量】。这三个元素不解释你也能大概猜出来是什么意思,计数值就是协程组中运行着的协程的个数,waiter 是等待者的个数,因为一般只执行一次 Wait 函数,所以 waiter 数一般是1。这一步没有任何问题。

那为什么要区分64位系统还是32位系统呢?

这里有一个知识点:64位的原子操作需要64位对齐【手动加粗】。

在后面 Add 或者 Wait 的操作中,类似这样的操作: atomic.AddUint64(statep, uint64(delta) 都是64位的原子操作,意思是说这个加法是对64位的内存块直接操作的。所以像这种64位的原子操作,都要求64位对齐。

64位对齐是要求数据的起始地址是64的倍数。在64位的系统中没有问题,但是在32位的系统中,内存块最小是4个字节,只能保证32位对齐,有可能会出现 state1 的起始地址不是64的倍数,而是32的倍数,这种情况下,将 state1[0] 当成信号量,就能保证 state1[1-2] 是64位对齐。

Add 和 Done 实现逻辑

Add 实现源码(删减了部分异常校验的逻辑):

func (wg *WaitGroup) Add(delta int) {
    statep, semap := wg.state()
    // 高32bit是计数值v,所以把delta左移32,增加到计数上
    state := atomic.AddUint64(statep, uint64(delta)32)
    v := int32(state >> 32) // 当前计数值
    w := uint32(state) // waiter count

    if v > 0 || w == 0 {
        return
    }

    // 如果计数值v为0并且waiter的数量w不为0,那么state的值就是waiter的数量
    // 将waiter的数量设置为0,因为计数值v也是0,所以它们俩的组合*statep直接设置为0即可。此时需要并唤醒所有的waiter
    *statep = 0
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false, 0)
    }
}


// Done方法实际就是计数器减1
func (wg *WaitGroup) Done() {
    wg.Add(-1)
}

一直到第8行之前,Add 部分的代码都很容易理解。获取 state 字段,拆成计数器和 waiter。delta 是多少,计数器就加多少。如果计数器的值大于0(其它协程已经执行了Add操作)或者 waiter 等于0(第一个 Add 操作),就直接进行下一步。

接下来的部分其实是执行 Done 之后的逻辑。看21-24行,Done 的实现就是 Add(-1)。

如果计数器等于0且 waiter 不等于0,说明这个时候需要唤醒 waiter 了。waiter 就是执行 Wait 然后阻塞的协程。

Wait 实现逻辑

Wait 的实现源码:

func (wg *WaitGroup) Wait() {
    statep, semap := wg.state()
    
    for {
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32) // 当前计数值
        w := uint32(state) // waiter的数量
        if v == 0 {
            // 如果计数值为0, 调用这个方法的goroutine不必再等待,继续执行它后面的逻辑即可
            return
        }
        // 否则把waiter数量加1。期间可能有并发调用Wait的情况,所以最外层使用了一个for循环
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            // 阻塞休眠等待
            runtime_Semacquire(semap)
            // 被唤醒,不再阻塞,返回
            return
        }
    }
}

Wait 的逻辑也比较简单,如果计数器的值等于0,说明没有协程组的任务已经完成了,这个时候可以直接退出,继续执行后面的逻辑。如果计数器不等于0,就将 waiter 的数量加1,然后进入阻塞状态,等待被唤醒。

常规的用法中,我们只会调用一次 Wait,所以 waiter 的值就是1。其实如果有多个协程等待协程组的结果,也可以多次调用 Wait 函数。

WaitGroup 的易错场景

WaitGroup 的使用很简单,但是如果对 WaitGroup 的检查机制不清楚,误用了 WaitGroup,就会出现很多严重的问题。

下面让我们一起来看一下几种常见的错误场景。

问题一:计数器为负值

WaitGroup 的计数器的值必须大于等于0,不然就会出现 panic 错误。

一般情况下有两种场景会导致 WaitGroup 的计数器是负值。

一种是通过 Add() 函数直接将计数器设置为负值。比如上来就执行 Add(-1),程序会直接 panic,退出。这种情况比较简单,出错的可能性比较低。

另一种是 Done() 函数执行的太多,导致计数器变成负值。使用 WaitGroup 的正确方法是,首先确定 WaitGroup 的计数器的值(协程组的大小),然后调用相同次数的 Done()函数。如果 Done() 的调用次数过多,就会导致计数器的值为负,出现 panic。

当然,也不能少调用,否则会出现死锁。

问题二:不期望的 Add 时机

这个问题说的是所有 Add() 的执行需要确保在 Wait() 执行之前,否则,Add() 还没有执行,Wait() 先执行,判断出计数器值等于0,就直接进行下一步。

举个例子:

package main

import (
    "fmt"
    "sync"
    "time"
)

func do(wg *sync.WaitGroup) {
    time.Sleep(1 * time.Second)
    fmt.Println("--------test---------")
    wg.Add(1)
    wg.Done()
}

func main() {
    var wg sync.WaitGroup

    for i:=0; i10; i++ {
        go do(&wg)
    }
    
    wg.Wait()
}

这段代码中,Add() 的执行就在每个最后,但是如果你复制一下这段代码,执行一下就会发现,终端不会打印任何字符。这是因为 Add() 还没来的及执行,计数器的值还是0,导致 Wait() 判断失误,程序提前结束。

修改的方法很简单,提前计算出有有多少个协程,然后在 for 循环前面一次性执行,或者在每个协程的一开始,也就是第10行前面,提前执行 Add(1)。

问题三:前一个 Wait() 还没有,就复用 WaitGroup

这个问题不太好理解。

首先 WaitGroup 是可以重用的。也就是说,只要 WaitGroup 的计数器的值变成0,那么它就可以看做是新创建的 WaitGroup,被重复使用。

但是重复使用前,一定要等之前的 Wait() 执行完毕,才能执行 Add() 或者
Done() 等函数,否则会报“不能重用的错误”。

比如下面这个例子:

func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        time.Sleep(time.Millisecond)
        wg.Done() // 计数器减1
        wg.Add(1) // 计数值加1
    }()
    wg.Wait() // 主goroutine等待,有可能和第7行并发执行
}

这里虽然第6行将计数器置成了0,但是因为第9行的 Wait() 一直在循环检测,它就会和第7行的的 Add() 形成并发冲突。

所以,WaitGroup 虽然可以重用,但是必须等上一轮的 Wait() 执行完成之后,才能执行 Add()/Done() 等操作。否则,可能会出先 panic,提示 sync: WaitGroup is reused before previous Wait has returned

原创不易,欢迎大家关注我的公众号【码农的自由之路】,左手代码,右手理财,996的码农也想自由~

都看到这里了,不如顺手点个 赞/喜欢?


有个奇怪的现象:技术类的文章收藏数远远高于评论数,但是有多少收藏之后会阅读呢?所以有问题大家尽管评论哈,我尽量解答~~

文章来源于互联网:Go 并发任务编排利器之 WaitGroup

打赏 赞(0) 分享'
分享到...
微信
支付宝
微信二维码图片

微信扫描二维码打赏

支付宝二维码图片

支付宝扫描二维码打赏

文章目录