package resource_pool import ( "errors" "fmt" "sync" "sync/atomic" "time" ) type idleHandle struct { handle interface{} keepUntil *time.Time } type TooManyHandles struct { location string } func (t TooManyHandles) Error() string { return fmt.Sprintf("Too many handles to %s", t.location) } type OpenHandleError struct { location string err error } func (o OpenHandleError) Error() string { return fmt.Sprintf("Failed to open resource handle: %s (%v)", o.location, o.err) } // A resource pool implementation where all handles are associated to the // same resource location. type simpleResourcePool struct { options Options numActive *int32 // atomic counter activeHighWaterMark *int32 // atomic / monotonically increasing value openTokens Semaphore mutex sync.Mutex location string // guard by mutex idleHandles []*idleHandle // guarded by mutex isLameDuck bool // guarded by mutex } // This returns a SimpleResourcePool, where all handles are associated to a // single resource location. func NewSimpleResourcePool(options Options) ResourcePool { numActive := new(int32) atomic.StoreInt32(numActive, 0) activeHighWaterMark := new(int32) atomic.StoreInt32(activeHighWaterMark, 0) var tokens Semaphore if options.OpenMaxConcurrency > 0 { tokens = NewBoundedSemaphore(uint(options.OpenMaxConcurrency)) } return &simpleResourcePool{ location: "", options: options, numActive: numActive, activeHighWaterMark: activeHighWaterMark, openTokens: tokens, mutex: sync.Mutex{}, idleHandles: make([]*idleHandle, 0, 0), isLameDuck: false, } } // See ResourcePool for documentation. func (p *simpleResourcePool) NumActive() int32 { return atomic.LoadInt32(p.numActive) } // See ResourcePool for documentation. func (p *simpleResourcePool) ActiveHighWaterMark() int32 { return atomic.LoadInt32(p.activeHighWaterMark) } // See ResourcePool for documentation. func (p *simpleResourcePool) NumIdle() int { p.mutex.Lock() defer p.mutex.Unlock() return len(p.idleHandles) } // SimpleResourcePool can only register a single (network, address) entry. // Register should be call before any Get calls. func (p *simpleResourcePool) Register(resourceLocation string) error { if resourceLocation == "" { return errors.New("Invalid resource location") } p.mutex.Lock() defer p.mutex.Unlock() if p.isLameDuck { return fmt.Errorf( "cannot register %s to lame duck resource pool", resourceLocation) } if p.location == "" { p.location = resourceLocation return nil } return errors.New("SimpleResourcePool can only register one location") } // SimpleResourcePool will enter lame duck mode upon calling Unregister. func (p *simpleResourcePool) Unregister(resourceLocation string) error { p.EnterLameDuckMode() return nil } func (p *simpleResourcePool) ListRegistered() []string { p.mutex.Lock() defer p.mutex.Unlock() if p.location != "" { return []string{p.location} } return []string{} } func (p *simpleResourcePool) getLocation() (string, error) { p.mutex.Lock() defer p.mutex.Unlock() if p.location == "" { return "", fmt.Errorf( "resource location is not set for SimpleResourcePool") } if p.isLameDuck { return "", fmt.Errorf( "lame duck resource pool cannot return handles to %s", p.location) } return p.location, nil } // This gets an active resource from the resource pool. Note that the // resourceLocation argument is ignored (The handles are associated to the // resource location provided by the first Register call). func (p *simpleResourcePool) Get(unused string) (ManagedHandle, error) { activeCount := atomic.AddInt32(p.numActive, 1) if p.options.MaxActiveHandles > 0 && activeCount > p.options.MaxActiveHandles { atomic.AddInt32(p.numActive, -1) return nil, TooManyHandles{p.location} } highest := atomic.LoadInt32(p.activeHighWaterMark) for activeCount > highest && !atomic.CompareAndSwapInt32( p.activeHighWaterMark, highest, activeCount) { highest = atomic.LoadInt32(p.activeHighWaterMark) } if h := p.getIdleHandle(); h != nil { return h, nil } location, err := p.getLocation() if err != nil { atomic.AddInt32(p.numActive, -1) return nil, err } if p.openTokens != nil { // Current implementation does not wait for tokens to become available. // If that causes availability hits, we could increase the wait, // similar to simple_pool.go. if p.openTokens.TryAcquire(0) { defer p.openTokens.Release() } else { // We could not immediately acquire a token. // Instead of waiting atomic.AddInt32(p.numActive, -1) return nil, OpenHandleError{ p.location, errors.New("Open Error: reached OpenMaxConcurrency")} } } handle, err := p.options.Open(location) if err != nil { atomic.AddInt32(p.numActive, -1) return nil, OpenHandleError{p.location, err} } return NewManagedHandle(p.location, handle, p, p.options), nil } // See ResourcePool for documentation. func (p *simpleResourcePool) Release(handle ManagedHandle) error { if pool, ok := handle.Owner().(*simpleResourcePool); !ok || pool != p { return errors.New( "Resource pool cannot take control of a handle owned " + "by another resource pool") } h := handle.ReleaseUnderlyingHandle() if h != nil { // We can unref either before or after queuing the idle handle. // The advantage of unref-ing before queuing is that there is // a higher chance of successful Get when number of active handles // is close to the limit (but potentially more handle creation). // The advantage of queuing before unref-ing is that there's a // higher chance of reusing handle (but potentially more Get failures). atomic.AddInt32(p.numActive, -1) p.queueIdleHandles(h) } return nil } // See ResourcePool for documentation. func (p *simpleResourcePool) Discard(handle ManagedHandle) error { if pool, ok := handle.Owner().(*simpleResourcePool); !ok || pool != p { return errors.New( "Resource pool cannot take control of a handle owned " + "by another resource pool") } h := handle.ReleaseUnderlyingHandle() if h != nil { atomic.AddInt32(p.numActive, -1) if err := p.options.Close(h); err != nil { return fmt.Errorf("failed to close resource handle: %v", err) } } return nil } // See ResourcePool for documentation. func (p *simpleResourcePool) EnterLameDuckMode() { p.mutex.Lock() toClose := p.idleHandles p.isLameDuck = true p.idleHandles = []*idleHandle{} p.mutex.Unlock() p.closeHandles(toClose) } // This returns an idle resource, if there is one. func (p *simpleResourcePool) getIdleHandle() ManagedHandle { var toClose []*idleHandle defer func() { // NOTE: Must keep the closure around to late bind the toClose slice. p.closeHandles(toClose) }() now := p.options.getCurrentTime() p.mutex.Lock() defer p.mutex.Unlock() var i int for i = 0; i < len(p.idleHandles); i++ { idle := p.idleHandles[i] if idle.keepUntil == nil || now.Before(*idle.keepUntil) { break } } if i > 0 { toClose = p.idleHandles[0:i] } if i < len(p.idleHandles) { idle := p.idleHandles[i] p.idleHandles = p.idleHandles[i+1:] return NewManagedHandle(p.location, idle.handle, p, p.options) } if len(p.idleHandles) > 0 { p.idleHandles = []*idleHandle{} } return nil } // This adds an idle resource to the pool. func (p *simpleResourcePool) queueIdleHandles(handle interface{}) { var toClose []*idleHandle defer func() { // NOTE: Must keep the closure around to late bind the toClose slice. p.closeHandles(toClose) }() now := p.options.getCurrentTime() var keepUntil *time.Time if p.options.MaxIdleTime != nil { // NOTE: Assign to temp variable first to work around compiler bug x := now.Add(*p.options.MaxIdleTime) keepUntil = &x } p.mutex.Lock() defer p.mutex.Unlock() if p.isLameDuck { toClose = []*idleHandle{ {handle: handle}, } return } p.idleHandles = append( p.idleHandles, &idleHandle{ handle: handle, keepUntil: keepUntil, }) nIdleHandles := uint32(len(p.idleHandles)) if nIdleHandles > p.options.MaxIdleHandles { handlesToClose := nIdleHandles - p.options.MaxIdleHandles toClose = p.idleHandles[0:handlesToClose] p.idleHandles = p.idleHandles[handlesToClose:nIdleHandles] } } // Closes resources, at this point it is assumed that this resources // are no longer referenced from the main idleHandles slice. func (p *simpleResourcePool) closeHandles(handles []*idleHandle) { for _, handle := range handles { _ = p.options.Close(handle.handle) } }