[Reinventing the Wheel] A High-Performance Goroutine Pool

The Go scheduler puts no cap on the number of goroutines. When goroutines burst in huge numbers within a short window, they cannot be reused in time, so a large number of goroutines get created and tie up a lot of system resources — hence this attempt at pooling. The scheduler itself should not impose such a limit: at the language level there is no way to decide what the right limit is, since programs run on machines of very different capacity, and in workloads with only modest concurrency a limit would even hurt performance, so native support for capping goroutines would not be worth the cost. For small and medium levels of concurrency a pool in fact offers no performance advantage. The current design still needs periodic pruning of the idle queue; once that is written I will re-run the benchmarks to see how much it helps. As it stands, for large-scale asynchronous concurrency (1M, 10M goroutines) the results are already very good: memory usage improves by 10x or more and throughput by 2-6x.

Requirements and goals

- Limit the number of concurrent goroutines.
- Reuse goroutines to relieve pressure on the runtime scheduler and improve program performance.
- Avoid excessive goroutine creation eating into system resources (CPU and memory).

Key techniques

- Lock synchronization: Go provides CAS primitives, so a spin-lock is used instead of a mutex (principle, discussion).
- LIFO/FIFO queue: a LIFO queue is naturally ordered by time, which makes it easy to handle operations tied to enqueue time.
- Pool capacity limit and elastic scaling.

Implementation

pool.go

```go
package go_pool

import (
    "errors"
    "sync"
    "sync/atomic"
    "time"
)

const (
    OPEN = iota
    CLOSED
)

var (
    ErrPoolClosed          = errors.New("this pool has been closed")
    ErrPoolOverload        = errors.New("too many goroutines blocked on submit or Nonblocking is set")
    ErrInvalidExpiryTime   = errors.New("invalid expiration time")
    ErrInvalidPoolCapacity = errors.New("invalid pool capacity")

    DefaultScanInterval = time.Second
)

type Pool struct {
    capacity         int32
    running          int32
    lock             sync.Locker
    scanDuration     time.Duration
    blockingTasksNum int
    maxBlockingTasks int
    state            int32
    cond             *sync.Cond
    workers          WorkerQueue // LIFO queue
    workerCache      sync.Pool
}

func (p *Pool) Submit(task func()) error {
    if atomic.LoadInt32(&p.state) == CLOSED {
        return ErrPoolClosed
    }
    // retrieve a worker to run the task;
    // return an error if no worker is available
    var w *Worker
    if w = p.retrieveWorker(); w == nil {
        return ErrPoolOverload
    }
    w.task <- task
    return nil
}

func (p *Pool) Shutdown() {
    atomic.StoreInt32(&p.state, CLOSED)
    p.lock.Lock()
    // reset the worker queue
    p.workers.reset()
    p.lock.Unlock()
}

func (p *Pool) isClosed() bool {
    return atomic.LoadInt32(&p.state) == CLOSED
}

// Resize changes the capacity of the pool.
func (p *Pool) Resize(size int) {
    if p.Cap() == size {
        return
    }
    atomic.StoreInt32(&p.capacity, int32(size))
    // stop some workers if #running_workers > #new_capacity
    diff := p.Running() - size
    if diff > 0 {
        for i := 0; i < diff; i++ {
            p.retrieveWorker().task <- nil
        }
    }
}

func (p *Pool) Reboot() {
    if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPEN) {
        // re-initialize the purging goroutine
        go p.scavengerRoutine()
    }
}

func (p *Pool) Running() int {
    return int(atomic.LoadInt32(&p.running))
}

func (p *Pool) Cap() int {
    return int(atomic.LoadInt32(&p.capacity))
}

func (p *Pool) Free() int {
    return p.Cap() - p.Running()
}

func (p *Pool) incRunning() {
    atomic.AddInt32(&p.running, 1)
}

func (p *Pool) decRunning() {
    atomic.AddInt32(&p.running, -1)
}

// recycleWorker puts the worker back into the pool for reuse.
func (p *Pool) recycleWorker(worker *Worker) bool {
    capacity := p.Cap()
    if p.isClosed() || (capacity >= 0 && p.Running() > capacity) {
        return false
    }
    worker.recycleTime = time.Now()

    p.lock.Lock()
    // double-check whether the state is CLOSED
    if p.isClosed() {
        p.lock.Unlock()
        return false
    }
    if err := p.workers.add(worker); err != nil {
        p.lock.Unlock()
        return false
    }
    // notify any request stuck in retrieveWorker that a worker is now available
    p.cond.Signal()
    p.lock.Unlock()
    return true
}

func (p *Pool) spawnWorker() *Worker {
    worker := p.workerCache.Get().(*Worker)
    worker.Run()
    return worker
}

func (p *Pool) retrieveWorker() (worker *Worker) {
    p.lock.Lock()
    worker = p.workers.detach()
    if worker != nil {
        // got a worker from the queue
        p.lock.Unlock()
    } else if capacity := p.Cap(); capacity == -1 {
        // infinite pool
        p.lock.Unlock()
        return p.spawnWorker()
    } else if p.Running() < capacity {
        // below capacity, spawn a new worker
        p.lock.Unlock()
        return p.spawnWorker()
    } else {
        // if the number of blocked tasks has reached the maxBlockingTasks threshold,
        // return nil so that Submit reports ErrPoolOverload
        if p.maxBlockingTasks != 0 && p.maxBlockingTasks <= p.blockingTasksNum {
            p.lock.Unlock()
            return
        }
        // the pool is full; wait until a worker becomes available
    Retry:
        // track the number of blocked requests and wait to be signalled
        p.blockingTasksNum++
        p.cond.Wait()
        p.blockingTasksNum--
        // the recycled worker may have been closed in the meantime,
        // so make sure a worker is actually available
        if p.Running() == 0 {
            p.lock.Unlock()
            return p.spawnWorker()
        }
        worker = p.workers.detach()
        if worker == nil {
            goto Retry
        }
        p.lock.Unlock()
    }
    return
}

func (p *Pool) scavengerRoutine() {
    heartbeat := time.NewTicker(p.scanDuration)
    defer heartbeat.Stop()
    for range heartbeat.C {
        if p.isClosed() {
            break
        }
        // if all workers have been cleaned up while some invokers are still
        // stuck on cond.Wait(), wake all of them up
        if p.Running() == 0 {
            p.cond.Broadcast()
        }
    }
}

func NewPool(capacity int) (*Pool, error) {
    if capacity <= 0 {
        capacity = -1
    }
    pool := &Pool{
        capacity: int32(capacity),
        lock:     NewSpinLock(),
    }
    pool.workerCache.New = func() interface{} {
        return &Worker{
            pool: pool,
            task: make(chan func(), 1),
        }
    }
    pool.scanDuration = DefaultScanInterval
    if capacity == -1 {
        return nil, ErrInvalidPoolCapacity
    }
    // initialize the worker queue
    pool.workers = NewWorkerQueue(0)
    pool.cond = sync.NewCond(pool.lock)
    // start the purging goroutine
    go pool.scavengerRoutine()
    return pool, nil
}
```

worker.go

```go
package go_pool

import (
    "time"
)

type Worker struct {
    pool        *Pool
    task        chan func()
    recycleTime time.Time
}

func (w *Worker) Run() {
    w.pool.incRunning()
    go func() {
        defer func() {
            w.pool.decRunning()
            w.pool.workerCache.Put(w)
            // todo: panic recovery strategy
        }()
        for f := range w.task {
            // receiving nil tells the worker to stop and exit the goroutine
            if f == nil {
                return
            }
            f()
            // recycle the worker back into the pool; if that fails, exit the goroutine
            if success := w.pool.recycleWorker(w); !success {
                return
            }
        }
    }()
}
```

worker_queue.go

```go
package go_pool

type WorkerQueue interface {
    len() int
    isEmpty() bool
    add(worker *Worker) error
    detach() *Worker
    reset()
}

func NewWorkerQueue(size int) WorkerQueue {
    return NewSimpleWorkerQueue(size)
}

func NewSimpleWorkerQueue(size int) *simpleWorkerQueue {
    return &simpleWorkerQueue{
        size:    size,
        workers: make([]*Worker, 0, size),
    }
}

type simpleWorkerQueue struct {
    workers []*Worker
    size    int
}

func (sq *simpleWorkerQueue) len() int {
    return len(sq.workers)
}

func (sq *simpleWorkerQueue) isEmpty() bool {
    return sq.len() == 0
}

func (sq *simpleWorkerQueue) add(worker *Worker) error {
    sq.workers = append(sq.workers, worker)
    return nil
}

func (sq *simpleWorkerQueue) detach() *Worker {
    length := sq.len()
    if length == 0 {
        return nil
    }
    worker := sq.workers[length-1]
    sq.workers[length-1] = nil // clear the slot to avoid a memory leak
    sq.workers = sq.workers[:length-1]
    return worker
}

func (sq *simpleWorkerQueue) reset() {
    for i := 0; i < sq.len(); i++ {
        sq.workers[i].task <- nil
        sq.workers[i] = nil
    }
    sq.workers = sq.workers[:0]
}
```

lock.go

```go
package go_pool

import (
    "runtime"
    "sync"
    "sync/atomic"
)

type spinLock uint32

func (sl *spinLock) Lock() {
    for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
        runtime.Gosched()
    }
}

func (sl *spinLock) Unlock() {
    atomic.StoreUint32((*uint32)(sl), 0)
}

// NewSpinLock instantiates a spin-lock.
func NewSpinLock() sync.Locker {
    return new(spinLock)
}
```

pool_test.go

```go
package go_pool

import (
    "math"
    "runtime"
    "sync"
    "testing"
    "time"
)

const (
    _   = 1 << (10 * iota)
    KiB // 1024
    MiB // 1048576
)

const (
    InfinitePoolSize = math.MaxInt32
    PoolSize         = 10000
    SleepTime        = 100
    OverSizeTaskNum  = 10 * PoolSize
    UnderSizeTaskNum = 0.2 * PoolSize
)

var currentMem uint64

func demoTaskFunc(args interface{}) {
    n := args.(int)
    time.Sleep(time.Duration(n) * time.Millisecond)
}

func TestPoolWaitToGetWorker(t *testing.T) {
    var wg sync.WaitGroup
    p, err := NewPool(PoolSize)
    defer p.Shutdown()
    if err != nil {
        t.Errorf("err: %s", err.Error())
    }
    for i := 0; i < OverSizeTaskNum; i++ {
        wg.Add(1)
        _ = p.Submit(func() {
            demoTaskFunc(SleepTime)
            wg.Done()
        })
    }
    wg.Wait()
    mem := runtime.MemStats{}
    runtime.ReadMemStats(&mem)
    currentMem = mem.TotalAlloc/KiB - currentMem
    t.Logf("memory usage: %d KB", currentMem)
}

func TestPoolGetWorkerFromCache(t *testing.T) {
    var currentMem uint64
    var wg sync.WaitGroup
    p, err := NewPool(PoolSize)
    defer p.Shutdown()
    if err != nil {
        t.Errorf("err: %s", err.Error())
    }
    for i := 0; i < UnderSizeTaskNum; i++ {
        wg.Add(1)
        _ = p.Submit(func() {
            demoTaskFunc(SleepTime)
            wg.Done()
        })
    }
    wg.Wait()
    mem := runtime.MemStats{}
    runtime.ReadMemStats(&mem)
    currentMem = mem.TotalAlloc/KiB - currentMem
    t.Logf("memory usage: %d KB", currentMem)
}

func TestNoPool(t *testing.T) {
    var wg sync.WaitGroup
    for i := 0; i < UnderSizeTaskNum; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            demoTaskFunc(SleepTime)
        }()
    }
    wg.Wait()
    mem := runtime.MemStats{}
    runtime.ReadMemStats(&mem)
    currentMem = mem.TotalAlloc/KiB - currentMem
    t.Logf("memory usage: %d KB", currentMem)
}

func TestWithInfinitePool(t *testing.T) {
    var wg sync.WaitGroup
    p, err := NewPool(InfinitePoolSize)
    defer p.Shutdown()
    if err != nil {
        t.Errorf("err: %s", err.Error())
    }
    for i := 0; i < UnderSizeTaskNum; i++ {
        wg.Add(1)
        _ = p.Submit(func() {
            demoTaskFunc(SleepTime)
            wg.Done()
        })
    }
    wg.Wait()
    mem := runtime.MemStats{}
    runtime.ReadMemStats(&mem)
    currentMem = mem.TotalAlloc/KiB - currentMem
    t.Logf("memory usage: %d KB", currentMem)
}
```

pool_benchmark_test.go

```go
package go_pool

import (
    "testing"
    "time"
)

const (
    RunTimes      = 5000000
    BenchParam    = 10
    BenchPoolSize = 200000
)

func demoFunc() {
    time.Sleep(time.Duration(BenchParam) * time.Millisecond)
}

func BenchmarkPoolThroughput(b *testing.B) {
    p, _ := NewPool(BenchPoolSize)
    defer p.Shutdown()
    b.StartTimer()
    for i := 0; i < b.N; i++ {
        for j := 0; j < RunTimes; j++ {
            _ = p.Submit(demoFunc)
        }
    }
    b.StopTimer()
}

func BenchmarkGoroutinesThroughput(b *testing.B) {
    for i := 0; i < b.N; i++ {
        for j := 0; j < RunTimes; j++ {
            go demoFunc()
        }
    }
}
```
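As noted above, periodic pruning of the idle queue is still missing. Below is a minimal sketch of how it could be added; staleWorkers and purgeStaleWorkers are hypothetical names that do not exist in the code above, and the sketch relies on the fact that recycled workers are appended in recycleTime order, so expired workers always form a prefix of the LIFO slice. scavengerRoutine would call purgeStaleWorkers on every tick, in addition to the Broadcast it already does.

```go
package go_pool

import "time"

// Hypothetical extension, not part of the code above: detach workers that have
// been idle longer than maxIdle. Recycled workers are appended in recycleTime
// order, so the expired ones always sit at the front of the slice.
func (sq *simpleWorkerQueue) staleWorkers(maxIdle time.Duration) []*Worker {
    deadline := time.Now().Add(-maxIdle)
    n := 0
    for n < sq.len() && sq.workers[n].recycleTime.Before(deadline) {
        n++
    }
    if n == 0 {
        return nil
    }
    stale := make([]*Worker, n)
    copy(stale, sq.workers[:n])
    m := copy(sq.workers, sq.workers[n:])
    for i := m; i < sq.len(); i++ {
        sq.workers[i] = nil // clear the vacated slots to avoid a memory leak
    }
    sq.workers = sq.workers[:m]
    return stale
}

// purgeStaleWorkers would be called from scavengerRoutine on every tick.
func (p *Pool) purgeStaleWorkers(maxIdle time.Duration) {
    p.lock.Lock()
    sq, ok := p.workers.(*simpleWorkerQueue)
    if !ok {
        p.lock.Unlock()
        return
    }
    stale := sq.staleWorkers(maxIdle)
    p.lock.Unlock()

    // Stop the detached workers outside the critical section; sending nil
    // makes the worker goroutine return (see Worker.Run).
    for _, w := range stale {
        w.task <- nil
    }
}
```

Because the queue is time-ordered, the scan stops at the first non-expired worker, so each tick only costs time proportional to the number of workers actually pruned.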

December 30, 2020 · 9 min · 1751 words · Me

Possible Memory Leak

In a garbage-collected language we usually do not need to worry much about memory leaks, because the runtime does a good job of reclaiming memory that is no longer in use. Still, it is worth knowing a few special scenarios that can cause temporary or even permanent memory leaks. To be written...
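A classic example of such a temporary leak — a sketch of my own, not from the post — is a small subslice pinning the large backing array it was cut from:

```go
package main

// A 3-byte header sliced out of a 1 MB payload shares the payload's backing
// array, so the whole array stays reachable for as long as the header is kept.
func header(read func() []byte) []byte {
    payload := read()  // e.g. a 1 MB buffer
    return payload[:3] // still pins the full backing array
}

// Copying only the bytes we actually need lets the large buffer be collected.
func headerCopied(read func() []byte) []byte {
    payload := read()
    h := make([]byte, 3)
    copy(h, payload[:3])
    return h
}

func main() {
    read := func() []byte { return make([]byte, 1<<20) }
    _ = header(read)
    _ = headerCopied(read)
}
```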

December 25, 2020 · 1 min · 2 words · Me

Close Channels Gracefully

Closing channels gracefully

Scenario 1: M receivers and one sender; the sender signals the end of transmission by closing the data channel

This is the simplest case. When the sender wants to stop sending, it simply closes the channel used to transfer the data.

```go
package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)

    // ...
    const Max = 100000
    const NumReceivers = 100

    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(NumReceivers)

    // ...
    dataCh := make(chan int)

    // the sender
    go func() {
        for {
            if value := rand.Intn(Max); value == 0 {
                // The only sender can safely close the data channel.
                close(dataCh)
                return
            } else {
                dataCh <- value
            }
        }
    }()

    // the receivers
    for i := 0; i < NumReceivers; i++ {
        go func() {
            defer wgReceivers.Done()

            // Receive until dataCh has been closed
            // and its buffer is drained.
            for value := range dataCh {
                log.Println(value)
            }
        }()
    }

    wgReceivers.Wait()
}
```

Scenario 2: one receiver and N senders; the single receiver closes an additional signal channel to tell the senders to stop sending

This case is a bit more involved than the previous one. We cannot let the receiver close the data channel to stop the transfer, because that would violate the channel closing principle. What we can do instead is have the receiver close an additional signal channel to notify the senders to stop sending data. ...
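The excerpt stops before the code for scenario 2; a sketch of how that pattern might look (the constants and structure mirror the first example) is shown here:

```go
package main

import (
    "log"
    "math/rand"
    "sync"
    "time"
)

// Scenario 2 sketch: N senders, one receiver. The receiver closes an extra
// signal channel, stopCh, and each sender checks it before and during a send.
func main() {
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)

    const Max = 100000
    const NumSenders = 100

    wgReceiver := sync.WaitGroup{}
    wgReceiver.Add(1)

    dataCh := make(chan int)
    stopCh := make(chan struct{}) // closed by the receiver only

    // the senders
    for i := 0; i < NumSenders; i++ {
        go func() {
            for {
                // Give the stop signal priority over another send attempt.
                select {
                case <-stopCh:
                    return
                default:
                }
                select {
                case <-stopCh:
                    return
                case dataCh <- rand.Intn(Max):
                }
            }
        }()
    }

    // the single receiver
    go func() {
        defer wgReceiver.Done()
        for value := range dataCh {
            if value == Max-1 {
                // The receiver is the sole closer of stopCh, so this is safe.
                close(stopCh)
                return
            }
            log.Println(value)
        }
    }()

    wgReceiver.Wait()
}
```

Only the receiver ever closes stopCh, so the channel closing principle is respected; dataCh itself is never closed and is simply reclaimed by the garbage collector once all goroutines drop it.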

December 24, 2020 · 7 min · 1377 words · Me

Channels Concurrency Work-Around

Notes on some common channel patterns, plus a few personal impressions: asynchronous and concurrent programming with channels is simple and pleasant, and channel-based synchronization covers more use cases and admits more variations than the schemes adopted by many other languages (such as the actor model or async/await). That said, channels are not the best synchronization tool in every situation, so this post also covers atomic operations and the other facilities in the sync package for reference.

Using channels as futures/promises

Many other popular languages support futures/promises for asynchronous (concurrent) programming; futures/promises are typically used in request/response scenarios.

Returning a receive-only channel as the function result

In the example below, the two arguments of the sumSquares call are requested concurrently. Each channel read blocks until its request has produced a result, so the two arguments take about three seconds in total to become ready (bounded by the slower of the two), rather than six.

```go
package main

import (
    "time"
    "math/rand"
    "fmt"
)

func longTimeRequest() <-chan int32 {
    r := make(chan int32)

    go func() {
        time.Sleep(time.Second * 3) // simulate a workload
        r <- rand.Int31n(100)
    }()

    return r
}

func sumSquares(a, b int32) int32 {
    return a*a + b*b
}

func main() {
    rand.Seed(time.Now().UnixNano())

    a, b := longTimeRequest(), longTimeRequest()
    fmt.Println(sumSquares(<-a, <-b))
}
```

Passing a send-only channel as a function argument

As in the previous example, the two arguments of the sumSquares call are requested concurrently. The difference is that longTimeRequest takes a send-only channel as a parameter instead of returning a receive-only channel. ...
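The excerpt is cut off before the code for the send-only-channel variant; a sketch under the same assumptions as the first example might look like this:

```go
package main

// Sketch of the send-only-channel variant described above: longTimeRequest
// takes a chan<- int32 parameter instead of returning a <-chan int32.
import (
    "fmt"
    "math/rand"
    "time"
)

func longTimeRequest(r chan<- int32) {
    time.Sleep(time.Second * 3) // simulate a workload
    r <- rand.Int31n(100)
}

func sumSquares(a, b int32) int32 {
    return a*a + b*b
}

func main() {
    rand.Seed(time.Now().UnixNano())

    ra, rb := make(chan int32), make(chan int32)
    go longTimeRequest(ra)
    go longTimeRequest(rb)

    fmt.Println(sumSquares(<-ra, <-rb))
}
```

Compared with returning a receive-only channel, this style leaves it to the caller to create the result channels, so several requests can even share a single channel.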

December 22, 2020 · 14 min · 2852 words · Me

Golang TDD

Preface

This post collects common unit-testing examples for Go, along with a brief TDD workflow.

Unit testing basics

Unit test files end in _test.go. Keep the following rules in mind:

- The file name must end in _test.go, otherwise go test will not run the code in it.
- You must import the testing package.
- Every test function must start with Test.
- Test cases run in the order they appear in the source file.
- A test function TestXxx() takes a *testing.T, which is used to record errors and test state.
- The signature is func TestXxx(t *testing.T); Xxx can be any alphanumeric string, but it must not start with a lowercase letter [a-z] — Testintdiv, for example, is not a valid test function name.
- Inside a test, calling testing.T's Error, Errorf, FailNow, Fatal, or Fatalf methods marks the test as failed; Log records test output.

Table-driven testing

Good tests are about case coverage, and adding cases by editing the test logic quickly becomes clumsy. Table-driven tests solve this, and many tests in the standard library are written this way.

```go
func TestFib(t *testing.T) {
    var fibTests = []struct {
        in       int // input
        expected int // expected result
    }{
        {1, 1},
        {2, 1},
        {3, 2},
        {4, 3},
        {5, 5},
        {6, 8},
        {7, 13},
    }

    for _, tt := range fibTests {
        actual := Fib(tt.in)
        if actual != tt.expected {
            t.Errorf("Fib(%d) = %d; expected %d", tt.in, actual, tt.expected)
        }
    }
}
```

Because we use t.Errorf, a failing case does not stop the rest of the test from running. ...
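Not part of the original excerpt, but a common refinement of the table above is to run each entry as a subtest with t.Run, so a single case can be selected with go test -run; a sketch, assuming the same Fib function plus the fmt and testing imports:

```go
// Sketch: the same table run as subtests via t.Run, so one case can be
// selected with `go test -run TestFibSubtests/in=5`. Fib is assumed to be
// the function under test from the example above.
func TestFibSubtests(t *testing.T) {
    fibTests := []struct {
        in       int
        expected int
    }{
        {1, 1}, {2, 1}, {3, 2}, {4, 3}, {5, 5}, {6, 8}, {7, 13},
    }

    for _, tt := range fibTests {
        tt := tt // capture the range variable (needed if subtests call t.Parallel)
        t.Run(fmt.Sprintf("in=%d", tt.in), func(t *testing.T) {
            if actual := Fib(tt.in); actual != tt.expected {
                t.Errorf("Fib(%d) = %d; expected %d", tt.in, actual, tt.expected)
            }
        })
    }
}
```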

December 19, 2020 · 17 min · 3480 words · Me