贴一个GO自用的任务池,控制并发任务,可嵌套暂停恢复

做一个项目。需要多携程并发处理,但是互相影响。携程3的 可以暂停 携程1、2 的并发池  ,同时 2 也要暂停携程1的并发池,所以自己 写了个小任务池。
目前还在测试项目,所以不排除有坑,自测。
支持并发任务限制、可嵌套暂停恢复(确保恢复执行,否则死锁),提交任务是阻塞的,为了避免提交N个任务携程,所以阻塞放在携程外面了。 没做暂停阻塞等待已提交任务执行完毕,因为我的项目用不上。
package concurrency
import (
    “sync”
)
type Pool struct {
    needPause bool // 是否支持暂停
    sem       chan struct{}
    mu        sync.Mutex
    cond      *sync.Cond
    pauseCnt  int
    wg        sync.WaitGroup
}
func NewPool(max int, needPause bool) *Pool {
    p := &Pool{
        sem:       make(chan struct{}, max),
        needPause: needPause,
    }
    p.cond = sync.NewCond(&p.mu)
    return p
}
// 暂停池:所有任务在执行前会阻塞
func (p *Pool) Pause() {
    p.mu.Lock()
    p.pauseCnt++
    p.mu.Unlock()
}
// 恢复池:唤醒所有阻塞的任务
func (p *Pool) Resume() {
    p.mu.Lock()
    if p.pauseCnt > 0 {
        p.pauseCnt–
    }
    if p.pauseCnt == 0 {
        p.cond.Broadcast()
    }
    p.mu.Unlock()
}
// 等待任务完成
func (p *Pool) Wait() {
    p.wg.Wait()
}
// 提交任务
func (p *Pool) Submit(task func()) {
    p.wg.Add(1) // 保证 Wait() 不会提前退出
    if p.needPause {
        p.mu.Lock()
        for p.pauseCnt > 0 {
            p.cond.Wait()
        }
        p.mu.Unlock()
    }
    p.sem <- struct{}{}
    // 3. 异步执行
    go func() {
        defer func() {
            <-p.sem
            p.wg.Done()
        }()
        task()
    }()
}

测试范例

// 初始化池,最大并发 2,开启暂停功能
        p := concurrency.NewPool(2, true)
        var wg sync.WaitGroup
        fmt.Println(“— 步骤 1: 触发三层嵌套暂停 —“)
        p.Pause() // 1层
        p.Pause() // 2层
        p.Pause() // 3层
        fmt.Println(“池子已进入 3 层暂停状态”)
        fmt.Println(“— 步骤 2: 异步提交任务 —“)
        wg.Add(1)
        go func() {
            defer wg.Done()
            start := time.Now()
            fmt.Println(“[Task] 尝试提交任务…”)
            p.Submit(func() {
                fmt.Printf(“[Task] 任务真正开始执行,等待了 %v\n”, time.Since(start).Round(time.Second))
            })
            fmt.Println(“[Task] Submit 函数已返回”)
        }()
        // 给一点时间确保 Submit 已经卡在 cond.Wait
        time.Sleep(1 * time.Second)
        fmt.Println(“— 步骤 3: 逐层恢复 —“)
        // 第 1 次恢复
        p.Resume()
        fmt.Println(“执行 Resume (1/3)… 任务应继续阻塞”)
        time.Sleep(1 * time.Second)
        // 第 2 次恢复
        p.Resume()
        fmt.Println(“执行 Resume (2/3)… 任务应继续阻塞”)
        time.Sleep(1 * time.Second)
        // 第 3 次恢复
        fmt.Println(“执行 Resume (3/3)… 任务应该被唤醒了!”)
        p.Resume()
        // 等待任务执行完毕
        p.Wait()
        wg.Wait()
        fmt.Println(“— 步骤 4: 测试并发阻塞 —“)
        // 验证 Submit 是否真的会因为 sem 满而阻塞
        p.Submit(func() { time.Sleep(2 * time.Second); fmt.Println(“并发任务 A 完成”) })
        p.Submit(func() { time.Sleep(2 * time.Second); fmt.Println(“并发任务 B 完成”) })
        done := make(chan struct{})
        go func() {
            fmt.Println(“尝试提交任务 C (池子已满,应在此阻塞)…”)
            p.Submit(func() { fmt.Println(“并发任务 C 完成”) })
            close(done)
        }()
        select {
        case <-done:
            fmt.Println(“错误:任务 C 没被阻塞就返回了!”)
        case <-time.After(1 * time.Second):
            fmt.Println(“预期:任务 C 正在阻塞中…”)
        }
        p.Wait()
        fmt.Println(“=== 所有测试完成 ===”)

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注