做一个项目。需要多携程并发处理,但是互相影响。携程3的 可以暂停 携程1、2 的并发池 ,同时 2 也要暂停携程1的并发池,所以自己 写了个小任务池。
目前还在测试项目,所以不排除有坑,自测。
支持并发任务限制、可嵌套暂停恢复(确保恢复执行,否则死锁),提交任务是阻塞的,为了避免提交N个任务携程,所以阻塞放在携程外面了。 没做暂停阻塞等待已提交任务执行完毕,因为我的项目用不上。
package concurrency
import (
“sync”
)
type Pool struct {
needPause bool // 是否支持暂停
sem chan struct{}
mu sync.Mutex
cond *sync.Cond
pauseCnt int
wg sync.WaitGroup
}
func NewPool(max int, needPause bool) *Pool {
p := &Pool{
sem: make(chan struct{}, max),
needPause: needPause,
}
p.cond = sync.NewCond(&p.mu)
return p
}
// 暂停池:所有任务在执行前会阻塞
func (p *Pool) Pause() {
p.mu.Lock()
p.pauseCnt++
p.mu.Unlock()
}
// 恢复池:唤醒所有阻塞的任务
func (p *Pool) Resume() {
p.mu.Lock()
if p.pauseCnt > 0 {
p.pauseCnt–
}
if p.pauseCnt == 0 {
p.cond.Broadcast()
}
p.mu.Unlock()
}
// 等待任务完成
func (p *Pool) Wait() {
p.wg.Wait()
}
// 提交任务
func (p *Pool) Submit(task func()) {
p.wg.Add(1) // 保证 Wait() 不会提前退出
if p.needPause {
p.mu.Lock()
for p.pauseCnt > 0 {
p.cond.Wait()
}
p.mu.Unlock()
}
p.sem <- struct{}{}
// 3. 异步执行
go func() {
defer func() {
<-p.sem
p.wg.Done()
}()
task()
}()
}
测试范例
// 初始化池,最大并发 2,开启暂停功能
p := concurrency.NewPool(2, true)
var wg sync.WaitGroup
fmt.Println(“— 步骤 1: 触发三层嵌套暂停 —“)
p.Pause() // 1层
p.Pause() // 2层
p.Pause() // 3层
fmt.Println(“池子已进入 3 层暂停状态”)
fmt.Println(“— 步骤 2: 异步提交任务 —“)
wg.Add(1)
go func() {
defer wg.Done()
start := time.Now()
fmt.Println(“[Task] 尝试提交任务…”)
p.Submit(func() {
fmt.Printf(“[Task] 任务真正开始执行,等待了 %v\n”, time.Since(start).Round(time.Second))
})
fmt.Println(“[Task] Submit 函数已返回”)
}()
// 给一点时间确保 Submit 已经卡在 cond.Wait
time.Sleep(1 * time.Second)
fmt.Println(“— 步骤 3: 逐层恢复 —“)
// 第 1 次恢复
p.Resume()
fmt.Println(“执行 Resume (1/3)… 任务应继续阻塞”)
time.Sleep(1 * time.Second)
// 第 2 次恢复
p.Resume()
fmt.Println(“执行 Resume (2/3)… 任务应继续阻塞”)
time.Sleep(1 * time.Second)
// 第 3 次恢复
fmt.Println(“执行 Resume (3/3)… 任务应该被唤醒了!”)
p.Resume()
// 等待任务执行完毕
p.Wait()
wg.Wait()
fmt.Println(“— 步骤 4: 测试并发阻塞 —“)
// 验证 Submit 是否真的会因为 sem 满而阻塞
p.Submit(func() { time.Sleep(2 * time.Second); fmt.Println(“并发任务 A 完成”) })
p.Submit(func() { time.Sleep(2 * time.Second); fmt.Println(“并发任务 B 完成”) })
done := make(chan struct{})
go func() {
fmt.Println(“尝试提交任务 C (池子已满,应在此阻塞)…”)
p.Submit(func() { fmt.Println(“并发任务 C 完成”) })
close(done)
}()
select {
case <-done:
fmt.Println(“错误:任务 C 没被阻塞就返回了!”)
case <-time.After(1 * time.Second):
fmt.Println(“预期:任务 C 正在阻塞中…”)
}
p.Wait()
fmt.Println(“=== 所有测试完成 ===”)