V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
tim0991
V2EX  ›  Go 编程语言

golang 中多个协程池如何优雅退出

  •  1
     
  •   tim0991 · 2021-08-07 17:50:47 +08:00 · 2719 次点击
    这是一个创建于 990 天前的主题,其中的信息可能已经有所发展或是发生改变。

    假设有三个协程池(A,B,C), 三个分别处理不一样的业务且每个协程池中的 worker 数量不一致

    当前数据流向为 A => B => C,任务在任一协程池中都有出现 err 导致该任务跳过的可能

    在如下两种情况下:

    • 进入 A 协程 100 个任务,任务不会继续向下拆分,最后从 C 出来最多也就是 100 个
    • 进入 A 协程 100 个任务,后续每个协程池中都会继续拆分任务,比如转到 B 就是 500 个,继续转到 C 就是 1W 个

    最后在主程序中,针对上述两种情况,有没有优雅的办法知道任务已经全部完成且让主程序退出

    第 1 条附言  ·  2021-08-07 22:29:11 +08:00
    针对情况 2 可以举例:

    从热门微博抓到前 100 个用户 id
    => A (抓每个用户前 500 条微博)
    => B (抓取每条微博前 1W 条评论)
    => C (获取评论用户的地址、性别、头像、年龄等)

    其中可能的情况是,每个用户可能并不一定有 500 条微博,每条微博也并不一定有 1W 条评论
    23 条回复    2021-08-09 11:36:14 +08:00
    kwanzaa
        1
    kwanzaa  
       2021-08-07 17:59:13 +08:00
    怎么听着像是 WaitGroup 该干的事儿
    tim0991
        2
    tim0991  
    OP
       2021-08-07 17:59:39 +08:00
    如果用 sync.WaitGroup 只适合情况 1,且需要在 err 出现的时候考虑到如何 done 好像挺恶心的
    Trim21
        3
    Trim21  
       2021-08-07 18:09:28 +08:00
    Sasasu
        4
    Sasasu  
       2021-08-07 18:10:40 +08:00
    激进:在每个任务的事件循环,每一轮循环都检测一下某个标识退出的全局变量是不是该退出了。如果没有循环那就每个阻塞操作都设超时,超时或完成时检测这个标识。
    保守:关掉 A 的入口,然后当协程池工作线程 = 0 时计数 +1,计数等于协程池数量时退。

    如果你要 Go 风格的话,起手一个全局 hash 表(带锁),里面 K 是每个 context 的指针。

    激进:锁表,每个 context 都发 cancel,然后退出。
    保守:关掉 A 的入口,轮训这个 hash 表,内容物为 0 时退出。

    另外两个奇葩方案。

    不想思考:关掉 A 的入口,sleep 一个所有任务超时的事件,退出。
    做 CDN 的:用 dlopen 打开备用的 .so ,切工作函数。按照激进策略等旧的 .so 不再运行,释放旧 .so 然后移动新的 .so 。
    tim0991
        5
    tim0991  
    OP
       2021-08-07 22:20:45 +08:00
    @Trim21 谢谢回复 这个包可以比较好的处理情况 1 中 err 的部分,但是无法处理情况 2
    tim0991
        6
    tim0991  
    OP
       2021-08-07 22:33:55 +08:00
    @Sasasu 谢谢你提供的思路,但是对你说的内容有些疑问
    > 然后当协程池工作线程 = 0 时计数 +1

    如何知道某个协程中任务已经空了?
    Trim21
        7
    Trim21  
       2021-08-07 23:44:14 +08:00
    @tim0991 #5 2 这种有拆分的应该是用 context 吧。。。
    CEBBCAT
        8
    CEBBCAT  
       2021-08-08 01:25:47 +08:00
    看不太懂,为什么不让 MQ 做消息转接呢?
    Mitt
        9
    Mitt  
       2021-08-08 01:45:23 +08:00   ❤️ 1
    Context + WaitGroup 不是可以实现么,向下传递 Context 并附加 WaitGroup,每级往 WaitGroup 里添加真实的需要采集的数量并在出错的时候 defer wg.Done 并添加到 Context 中的 ErrArray,或者 errgroup,外层 wait 就好了,如果想让程序提前终止,就可以用带 cancel 的 context 从最外层一关,里面的所有任务就都会收到通知了
    tim0991
        10
    tim0991  
    OP
       2021-08-08 09:50:24 +08:00
    @Mitt @Trim21 context 的只是起到超时或者从上到下关闭作用,并不能感知到任务是否完成吧

    倒是全局使用一个 waitgroup 可以解决,只是全局变量到处飞略微有些蛋疼。。。
    lesismal
        11
    lesismal  
       2021-08-08 10:47:24 +08:00
    难道不是

    wg.Add(1)
    pool.Go(func(){
    defer wg.Done()
    ....
    })

    吗?
    tim0991
        12
    tim0991  
    OP
       2021-08-08 11:26:16 +08:00
    @lesismal 是倒是是 就是 pool 会有多个
    lesismal
        13
    lesismal  
       2021-08-08 12:11:30 +08:00
    @tim0991 你只要是整体的别落下 add done,每个 Go 是一个,不管 Go 的 func 里有没有失败跳过,只要 defer done 了就能确保 wg.wait 正常结束,所以,好像不应该有这个困扰
    securityCoding
        14
    securityCoding  
       2021-08-08 13:02:58 +08:00 via Android
    应该是想要 completefuture 那种?
    tim0991
        15
    tim0991  
    OP
       2021-08-08 14:43:40 +08:00
    @lesismal 但是这样其实也有一个问题,就是你上面的代码示例中,协程数量是不固定的,那要是固定的协程数量怎么写比较优雅一点?
    Mitt
        16
    Mitt  
       2021-08-08 14:46:19 +08:00
    @tim0991 #10 context 是上下文啊,用来把 waitgroup 传递下去的,让不管多下层都能用同一个 waitgroup 并且能通过 context 感知上层要主动关闭 比如超时,而上层也可以通过 waitgroup 来感知下层任务是否已经完成,不管你下层开多少个协程,你每一层都是知道数量的,添加进去就好了
    lesismal
        17
    lesismal  
       2021-08-08 16:42:49 +08:00
    @tim0991 协程池本身就应该自带控制协程数量的属性,否则协程池还不如直接 go 。你看我上面写的也是 pool.Go
    lesismal
        18
    lesismal  
       2021-08-08 16:52:46 +08:00
    @lesismal @tim0991 #17 只要你生产速度大于协程池消费速度,一样能充分利用这些数量的协程并发。最简单的实现,一个带容量的 chan,生产者往 chan 里写,多个协程去读,当前协程都忙、就被 chan 缓冲了、发送数量大于协程数量和 chan size 生产者就阻塞,这些细节看你怎么设计,姿势太多了,我这个库里就有好几种定制的,以前有些特殊的 hash 和时序需要所以没用其他三方的:
    https://github.com/lesismal/nbio/tree/master/taskpool

    其他第三方的也很多
    nuk
        19
    nuk  
       2021-08-08 17:14:54 +08:00
    给协程加 id,有任务就塞 map,ABC 三个 map 都空了不就表示任务干完了。。
    tim0991
        20
    tim0991  
    OP
       2021-08-08 17:33:50 +08:00
    @Mitt @lesismal 谢谢你们的回复和思路
    tim0991
        21
    tim0991  
    OP
       2021-08-08 17:34:14 +08:00
    @nuk 和四楼的一个意思 单还是谢谢回复:)
    zhengxiaowai
        22
    zhengxiaowai  
       2021-08-09 10:06:25 +08:00
    先说结论:要在 main 中感知任务的完成状态可以通过 chan,控制数据流转也是用 chan,如果有超时机制或者每个任务属性之类数据需要用到 context 。

    1 )步骤大约是
    main 函数中创建三个用于接受结果的 A B C chan,大小为 A 任务个数,和一个 notify chan 用于完成通知,再来一个通知主程序关闭的 doneChan

    起一个 goruntine 去 for notify chan
    go func() {
    for a := range nChan {
    if (ACount == BCount == CCount == 100) {
    doneChan <- struct{}
    }

    }
    }


    起三个 goruntine 去 for 分别处理 A B C chan,里面写处理逻辑,大约是这样子
    go func(nChan chan Notify) {
    for a := range AChan {
    // dosomething

    if ok {
    bChan <- weiboEntryID
    } else {
    nChan <- weiboEntryID
    }
    }
    close(bChan)
    }(nChan)

    go func(chan Notify) {
    for b := range BChan {
    // dosomething

    if ok {
    cChan <- commentID
    } else {
    nChan <- commentID
    }
    }
    close(cChan)
    }(nChan)

    go func(chan Notify) {
    for c := range CChan {
    // dosomething

    if ok {
    cChan <- commentUserID
    } else {
    nChan <- commentUserID
    }
    }
    }(nChan)

    这时候就 main 中可以往 AChan 里写数据了,写完直接 close AChan,然后直接用 doneChan 阻塞

    for userID := range userIDs {
    AChan <- userID
    }
    close(aChan)
    <-doneChan


    大致流程就是这样,需要注意的是需要正确关闭 ABC chan,就是在发送完成后关闭,nChan 用于任务信息回收还可以用于任务回放,另外为了保证每个任务都会返回,需要弄一个 timeout context 超时当作失败处理

    2 )第二种情况,由于任务个数是未知的,上面的 100 就不能用了,有两种方式可以解决,一是可以预先知道每次数量,用另外的一个 chan 再任务开始时候传给发送 doneChan 那个 goruntine,把 100 改成最后总数即可。二是未知任务数量,这时候只能再来一个 chan,在做完 ABC 后发送一个 aDone 、bDone 、cDone,给发送 doneChan 那个 goruntine,当确认三个都完成后,就可以发送 done 了。

    ------

    code 硬敲的可能很多不对,变量和具体结构可酌情改变 :-)
    tim0991
        23
    tim0991  
    OP
       2021-08-09 11:36:14 +08:00
    @zhengxiaowai 谢谢回复 我还是觉得上面说的由协程池中的任务数量来判断是否已经完成简单一点
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   2835 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 30ms · UTC 15:34 · PVG 23:34 · LAX 08:34 · JFK 11:34
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.