[Reinventing the Wheel] A High-Performance Goroutine Pool
The Go scheduler places no cap on the number of goroutines. When goroutines burst at a very large scale in a short window, they cannot be reused fast enough, so huge numbers of new goroutines are created and a great deal of system resources is consumed; hence this attempt to pool them. The scheduler itself should not impose such a limit, because the language layer has no way to decide how much to limit: programs run in environments with very different performance, and in scenarios where concurrency is not that large a limit would even hurt performance, so native support for capping goroutine counts would not be worth the cost. For small and medium scale concurrency, a pool actually offers no performance advantage.

The current design still needs a periodic prune of the idle queue; I will add it once the rest is finished and see how much the benchmarks improve. For now, in large-scale asynchronous concurrency scenarios (1M, 10M goroutines), the memory savings (10x and up) and the throughput gains (2-6x) are already very good.

Requirements and Goals

- Limit the number of concurrent goroutines
- Reuse goroutines to relieve pressure on the runtime scheduler and improve program performance
- Prevent excessive goroutine creation from hogging system resources (CPU & memory)

Key Techniques

- Lock synchronization: Go offers CAS operations, so a spin-lock is used in place of a mutex (principle, discussion)
- LIFO/FIFO queue: a LIFO queue is implicitly ordered by time, which makes operations that depend on enqueue time (such as pruning idle workers) easy to handle
- Pool capacity limit and elastic scaling

Implementation

pool.go

```go
package go_pool

import (
	"errors"
	"sync"
	"sync/atomic"
	"time"
)

const (
	OPEN = iota
	CLOSED
)

var (
	ErrPoolClosed          = errors.New("this pool has been closed")
	ErrPoolOverload        = errors.New("too many goroutines blocked on submit or Nonblocking is set")
	ErrInvalidExpiryTime   = errors.New("invalid expiration time")
	ErrInvalidPoolCapacity = errors.New("invalid pool capacity")

	DefaultScanInterval = time.Second
)

type Pool struct {
	capacity         int32
	running          int32
	lock             sync.Locker
	scanDuration     time.Duration
	blockingTasksNum int
	maxBlockingTasks int
	state            int32
	cond             *sync.Cond
	workers          WorkerQueue // LIFO queue
	workerCache      sync.Pool
}

func (p *Pool) Submit(task func()) error {
	if atomic.LoadInt32(&p.state) == CLOSED {
		return ErrPoolClosed
	}
	// retrieve a worker to run the task;
	// return an error if no worker is available
	var w *Worker
	if w = p.retrieveWorker(); w == nil {
		return ErrPoolOverload
	}
	w.task <- task
	return nil
}

func (p *Pool) Shutdown() {
	atomic.StoreInt32(&p.state, CLOSED)
	p.lock.Lock()
	// reset the worker queue
	p.workers.reset()
	p.lock.Unlock()
}

func (p *Pool) isClosed() bool {
	return atomic.LoadInt32(&p.state) == CLOSED
}

// change the capacity of the pool
func (p *Pool) Resize(size int) {
	if p.Cap() == size {
		return
	}
	atomic.StoreInt32(&p.capacity, int32(size))
	// need to stop some workers if #running_workers > #new_capacity
	diff := p.Running() - size
	if diff > 0 {
		for i := 0; i < diff; i++ {
			p.retrieveWorker().task <- nil
		}
	}
}

func (p *Pool) Reboot() {
	if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPEN) {
		// re-initialize the purging goroutine
		go p.scavengerRoutine()
	}
}

func (p *Pool) Running() int {
	return int(atomic.LoadInt32(&p.running))
}

func (p *Pool) Cap() int {
	return int(atomic.LoadInt32(&p.capacity))
}

func (p *Pool) Free() int {
	return p.Cap() - p.Running()
}

func (p *Pool) incRunning() {
	atomic.AddInt32(&p.running, 1)
}

func (p *Pool) decRunning() {
	atomic.AddInt32(&p.running, -1)
}

// put the worker back into the pool for recycling
func (p *Pool) recycleWorker(worker *Worker) bool {
	capacity := p.Cap()
	if p.isClosed() || (capacity >= 0 && p.Running() > capacity) {
		return false
	}
	worker.recycleTime = time.Now()
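	// (note: recycleTime is only recorded here for now; the periodic prune of the idle
	// queue mentioned above would use it to expire workers that have been idle too long)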
	p.lock.Lock()
	// double check the state in case the pool was CLOSED in the meantime
	if p.isClosed() {
		p.lock.Unlock()
		return false
	}
	err := p.workers.add(worker)
	if err != nil {
		p.lock.Unlock()
		return false
	}
	// notify any request stuck in retrieveWorker that there is an available worker in the pool
	p.cond.Signal()
	p.lock.Unlock()
	return true
}

func (p *Pool) spawnWorker() *Worker {
	worker := p.workerCache.Get().(*Worker)
	worker.Run()
	return worker
}

func (p *Pool) retrieveWorker() (worker *Worker) {
	p.lock.Lock()
	worker = p.workers.detach()
	if worker != nil {
		// got a worker from the queue successfully
		p.lock.Unlock()
	} else if capacity := p.Cap(); capacity == -1 {
		// infinite pool
		p.lock.Unlock()
		// spawn a worker
		return p.spawnWorker()
	} else if p.Running() < capacity {
		// the pool still has spare capacity
		p.lock.Unlock()
		// spawn a worker
		return p.spawnWorker()
	} else {
		// if the number of blocked submissions reaches the maximum blocking threshold,
		// return nil and let Submit surface the ErrPoolOverload error
		if p.maxBlockingTasks != 0 && p.maxBlockingTasks <= p.blockingTasksNum {
			p.lock.Unlock()
			return
		}
		// the pool is full; wait until a worker becomes available for task handling
	Retry:
		// track the number of blocked submissions and wait until the condition variable is signaled
		p.blockingTasksNum++
		p.cond.Wait()
		p.blockingTasksNum--
		// make sure a worker is actually available, because the recycled worker may have been closed meanwhile
		if p.Running() == 0 {
			p.lock.Unlock()
			// spawn a worker
			return p.spawnWorker()
		}
		worker = p.workers.detach()
		if worker == nil {
			goto Retry
		}
		p.lock.Unlock()
	}
	return
}

func (p *Pool) scavengerRoutine() {
	heartbeat := time.NewTicker(p.scanDuration)
	defer heartbeat.Stop()
	for range heartbeat.C {
		if p.isClosed() {
			break
		}
		// all workers got cleaned up while some invokers are still stuck on cond.Wait();
		// we need to wake up all invokers in that situation.
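		// (Broadcast rather than Signal: several submitters may be parked on the condition
		// variable at once, and each of them needs to wake up and spawn a fresh worker.)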
		if p.Running() == 0 {
			p.cond.Broadcast()
		}
	}
}

func NewPool(capacity int) (*Pool, error) {
	if capacity <= 0 {
		capacity = -1
	}
	pool := &Pool{
		capacity: int32(capacity),
		lock:     NewSpinLock(),
	}
	pool.workerCache.New = func() interface{} {
		return &Worker{
			pool: pool,
			task: make(chan func(), 1),
		}
	}
	pool.scanDuration = DefaultScanInterval
	if capacity == -1 {
		return nil, ErrInvalidPoolCapacity
	}
	// initialize the worker queue
	pool.workers = NewWorkerQueue(0)
	pool.cond = sync.NewCond(pool.lock)
	// initialize the purging goroutine
	go pool.scavengerRoutine()
	return pool, nil
}
```

worker.go

```go
package go_pool

import (
	"time"
)

type Worker struct {
	pool        *Pool
	task        chan func()
	recycleTime time.Time
}

func (w *Worker) Run() {
	w.pool.incRunning()
	go func() {
		defer func() {
			w.pool.decRunning()
			w.pool.workerCache.Put(w)
			// todo: panic recovery strategy
		}()
		for f := range w.task {
			// receiving nil indicates that the worker should stop and quit the goroutine
			if f == nil {
				return
			}
			f()
			// recycle the worker back into the pool; if that fails, quit the goroutine
			if success := w.pool.recycleWorker(w); !success {
				return
			}
		}
	}()
}
```

worker_queue.go

```go
package go_pool

type WorkerQueue interface {
	len() int
	isEmpty() bool
	add(worker *Worker) error
	detach() *Worker
	reset()
}

func NewWorkerQueue(size int) WorkerQueue {
	return NewSimpleWorkerQueue(size)
}

func NewSimpleWorkerQueue(size int) *simpleWorkerQueue {
	return &simpleWorkerQueue{
		size:    size,
		workers: make([]*Worker, 0, size),
	}
}

type simpleWorkerQueue struct {
	workers []*Worker
	size    int
}

func (sq *simpleWorkerQueue) len() int {
	return len(sq.workers)
}

func (sq *simpleWorkerQueue) isEmpty() bool {
	return sq.len() == 0
}

func (sq *simpleWorkerQueue) add(worker *Worker) error {
	sq.workers = append(sq.workers, worker)
	return nil
}

func (sq *simpleWorkerQueue) detach() *Worker {
	length := sq.len()
	if length == 0 {
		return nil
	}
	worker := sq.workers[length-1]
	sq.workers[length-1] = nil // clear the slot to avoid a memory leak
	sq.workers = sq.workers[:length-1]
	return worker
}

func (sq *simpleWorkerQueue) reset() {
	for i := 0; i < sq.len(); i++ {
		sq.workers[i].task <- nil
		sq.workers[i] = nil
	}
	sq.workers = sq.workers[:0]
}
```

lock.go

```go
package go_pool

import (
	"runtime"
	"sync"
	"sync/atomic"
)

type spinLock uint32

func (sl *spinLock) Lock() {
	for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
		runtime.Gosched()
	}
}

func (sl *spinLock) Unlock() {
	atomic.StoreUint32((*uint32)(sl), 0)
}

// NewSpinLock instantiates a spin-lock.
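// The lock is acquired with an atomic CAS and spins with runtime.Gosched() so that
// other goroutines can be scheduled while waiting.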
func NewSpinLock() sync.Locker {
	return new(spinLock)
}
```

pool_test.go

```go
package go_pool

import (
	"math"
	"runtime"
	"sync"
	"testing"
	"time"
)

const (
	_   = 1 << (10 * iota)
	KiB // 1024
	MiB // 1048576
)

const (
	InfinitePoolSize = math.MaxInt32
	PoolSize         = 10000
	SleepTime        = 100
	OverSizeTaskNum  = 10 * PoolSize
	UnderSizeTaskNum = 0.2 * PoolSize
)

var currentMem uint64

func demoTaskFunc(args interface{}) {
	n := args.(int)
	time.Sleep(time.Duration(n) * time.Millisecond)
}

func TestPoolWaitToGetWorker(t *testing.T) {
	var wg sync.WaitGroup
	p, err := NewPool(PoolSize)
	defer p.Shutdown()
	if err != nil {
		t.Errorf("err: %s", err.Error())
	}
	for i := 0; i < OverSizeTaskNum; i++ {
		wg.Add(1)
		_ = p.Submit(func() {
			demoTaskFunc(SleepTime)
			wg.Done()
		})
	}
	wg.Wait()
	mem := runtime.MemStats{}
	runtime.ReadMemStats(&mem)
	currentMem = mem.TotalAlloc/KiB - currentMem
	t.Logf("memory usage: %d KB", currentMem)
}

func TestPoolGetWorkerFromCache(t *testing.T) {
	var currentMem uint64
	var wg sync.WaitGroup
	p, err := NewPool(PoolSize)
	defer p.Shutdown()
	if err != nil {
		t.Errorf("err: %s", err.Error())
	}
	for i := 0; i < UnderSizeTaskNum; i++ {
		wg.Add(1)
		_ = p.Submit(func() {
			demoTaskFunc(SleepTime)
			wg.Done()
		})
	}
	wg.Wait()
	mem := runtime.MemStats{}
	runtime.ReadMemStats(&mem)
	currentMem = mem.TotalAlloc/KiB - currentMem
	t.Logf("memory usage: %d KB", currentMem)
}

func TestNoPool(t *testing.T) {
	var wg sync.WaitGroup
	for i := 0; i < UnderSizeTaskNum; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			demoTaskFunc(SleepTime)
		}()
	}
	wg.Wait()
	mem := runtime.MemStats{}
	runtime.ReadMemStats(&mem)
	currentMem = mem.TotalAlloc/KiB - currentMem
	t.Logf("memory usage: %d KB", currentMem)
}

func TestWithInfinitePool(t *testing.T) {
	var wg sync.WaitGroup
	p, err := NewPool(InfinitePoolSize)
	defer p.Shutdown()
	if err != nil {
		t.Errorf("err: %s", err.Error())
	}
	for i := 0; i < UnderSizeTaskNum; i++ {
		wg.Add(1)
		_ = p.Submit(func() {
			demoTaskFunc(SleepTime)
			wg.Done()
		})
	}
	wg.Wait()
	mem := runtime.MemStats{}
	runtime.ReadMemStats(&mem)
	currentMem = mem.TotalAlloc/KiB - currentMem
	t.Logf("memory usage: %d KB", currentMem)
}
```

pool_benchmark_test.go

```go
package go_pool

import (
	"testing"
	"time"
)

const (
	RunTimes      = 5000000
	BenchParam    = 10
	BenchPoolSize = 200000
)

func demoFunc() {
	time.Sleep(time.Duration(BenchParam) * time.Millisecond)
}

func BenchmarkPoolThroughput(b *testing.B) {
	p, _ := NewPool(BenchPoolSize)
	defer p.Shutdown()
	b.StartTimer()
	for i := 0; i < b.N; i++ {
		for j := 0; j < RunTimes; j++ {
			_ = p.Submit(demoFunc)
		}
	}
	b.StopTimer()
}

func BenchmarkGoroutinesThroughput(b *testing.B) {
	for i := 0; i < b.N; i++ {
		for j := 0; j < RunTimes; j++ {
			go demoFunc()
		}
	}
}
```
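To close, here is a minimal usage sketch (not part of the pool's source) showing how the pool above would typically be driven from application code; the import path is hypothetical and should point at wherever the go_pool package actually lives.

```go
package main

import (
	"fmt"
	"sync"
	"time"

	go_pool "example.com/go_pool" // hypothetical import path for the package above
)

func main() {
	// a fixed-capacity pool: at most 100 tasks run concurrently
	p, err := go_pool.NewPool(100)
	if err != nil {
		panic(err)
	}
	defer p.Shutdown()

	var wg sync.WaitGroup
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		// Submit blocks on the pool's condition variable once every worker is busy,
		// and returns an error (ErrPoolClosed / ErrPoolOverload) instead of running the task
		if err := p.Submit(func() {
			defer wg.Done()
			time.Sleep(10 * time.Millisecond) // stand-in for real work
		}); err != nil {
			wg.Done()
			fmt.Println("submit failed:", err)
		}
	}
	wg.Wait()
	fmt.Println("workers still running:", p.Running())
}
```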