Юлия updated the solution on 29.12.2015 19:47 (more than 2 years ago)
+package main
+
+import "sync"
+
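+// Request is a single unit of work: it exposes an identifier, can be executed with Run,
+// reports whether its result may be cached, and accepts a precomputed result via SetResult.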
+type Request interface {
+ ID() string
+ Run() (result interface{}, err error)
+ Cacheable() bool
+ SetResult(result interface{}, err error)
+}
+
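+// Requester accepts requests for execution and can be stopped.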
+type Requester interface {
+ AddRequest(request Request)
+ Stop()
+}
+
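+// CachedRequest stores a request together with the result and error produced by running it.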
+type CachedRequest struct {
+ request Request
+ result interface{}
+ err error
+}
+
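+// Added holds the channel of queued requests and a closed flag guarded by the mutex.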
+type Added struct {
+ toRun chan *Request
+ closed bool
+ mutex *sync.Mutex
+}
+
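+// Running tracks the requests that are currently executing; the counter channel limits how many may run at once.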
+type Running struct {
+ counter chan struct{}
+ runningRequests []*SyncRequest
+ mutex *sync.Mutex
+}
+
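+// addToRunning records a request as currently running.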
+func (r *Running) addToRunning(request *SyncRequest) {
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
+ r.runningRequests = append(r.runningRequests, request)
+}
+
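+// removeFromRunning removes a finished request from the running list.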
+func (r *Running) removeFromRunning(request *SyncRequest) {
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
+ for i := 0; i < len(r.runningRequests); i++ {
+ if r.runningRequests[i] == request {
+ r.runningRequests = append(r.runningRequests[:i], r.runningRequests[i+1:]...)
+ return
+ }
+ }
+}
+
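+// doubleIsRunning returns the currently running request with the same ID as the given one, or nil if there is no such request.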
+func (r *Running) doubleIsRunning(request Request) *SyncRequest {
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
+ for i := 0; i < len(r.runningRequests); i++ {
+ if (*r.runningRequests[i].request).ID() == request.ID() {
+ return r.runningRequests[i]
+ }
+ }
+ return nil
+}
+
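+// Cached is a fixed-size ring buffer holding the most recently cached results.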
+type Cached struct {
+ buffer []CachedRequest
+ positionToInsert int
+ cacheSize int
+ mutex *sync.Mutex
+}
+
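+// AddToCache inserts a result into the ring buffer, overwriting the oldest entry once the buffer is full.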
+func (c *Cached) AddToCache(toCache *CachedRequest) {
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
+ if len(c.buffer) < c.cacheSize {
+ c.buffer = append(c.buffer, *toCache)
+ } else {
+ c.buffer[c.positionToInsert] = *toCache
+ }
+ c.positionToInsert = (c.positionToInsert + 1) % c.cacheSize
+}
+
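+// isInCache returns the cached entry whose request has the same ID as the given one, or nil if it is not cached.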
+func (c *Cached) isInCache(request Request) *CachedRequest {
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
+ for i := 0; i < len(c.buffer); i++ {
+ if c.buffer[i].request.ID() == request.ID() {
+ return &(c.buffer[i])
+ }
+ }
+ return nil
+}
+
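+// Cache implements Requester: it queues incoming requests, answers repeated ones from a bounded cache, and throttles execution.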
+type Cache struct {
+ addedRequests *Added
+ runningRequests *Running
+ cachedRequests *Cached
+ wg, finished sync.WaitGroup
+ mutex *sync.Mutex
+}
+
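+// SyncRequest pairs a queued request with a channel that is signalled once the request has finished running.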
+type SyncRequest struct {
+ request *Request
+ ready chan struct{}
+}
+
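+// NewRequester builds a Requester with the given cache and throttle sizes and starts the goroutine that serves queued requests.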
+func NewRequester(cacheSize, throttleSize int) Requester {
+ toRun := make(chan *Request, 100)
+ closed := new(sync.Mutex)
+ addedRequests := &Added{toRun, false, closed}
+
+ counter := make(chan struct{}, throttleSize)
+ var runningBuffer []*SyncRequest
+ runningMutex := new(sync.Mutex)
+ for i := 0; i < throttleSize; i++ {
+ counter <- struct{}{}
+ }
+ runningRequests := &Running{counter, runningBuffer, runningMutex}
+
+ var cacheBuffer []CachedRequest
+ cacheMutex := new(sync.Mutex)
+ cachedRequests := &Cached{cacheBuffer, 0, cacheSize, cacheMutex}
+
+ mutex := new(sync.Mutex)
+ requester := &Cache{addedRequests: addedRequests, runningRequests: runningRequests, cachedRequests: cachedRequests, mutex: mutex}
+
+ go requester.serveRequests()
+ return requester
+}
+
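+// AddRequest queues a request and blocks until the pending requests have been served; requests added after Stop are silently dropped.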
+func (c *Cache) AddRequest(request Request) {
+ c.addedRequests.mutex.Lock()
+ closed := c.addedRequests.closed
+ c.addedRequests.mutex.Unlock()
+ if !closed {
+ c.wg.Add(1)
+ c.finished.Add(1)
+ c.addedRequests.toRun <- &request
+ c.wg.Wait()
+ }
+}
+
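+// serveRequests processes queued requests one by one: it answers from the cache when possible,
+// waits for an identical running request to finish, or runs the request itself. A nil request signals shutdown.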
+func (c *Cache) serveRequests() {
+ for {
+ request := <-c.addedRequests.toRun
+ if request == nil {
+ c.finishRequests()
+ return
+ }
+ if cacheResult := c.cachedRequests.isInCache(*request); cacheResult != nil {
+ (*request).SetResult(cacheResult.result, cacheResult.err)
+ } else if double := c.runningRequests.doubleIsRunning(*request); double != nil {
+ <-double.ready
+ if cached := c.cachedRequests.isInCache(*request); cached != nil {
+ (*request).SetResult(cached.result, cached.err)
+ } else {
+ c.runRequest(request)
+ }
+ } else {
+ c.runRequest(request)
+ }
+ c.wg.Done()
+ c.finished.Done()
+ }
+}
+
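+// runRequest takes a throttle slot, runs the request, caches the result if the request is cacheable, and then releases the slot.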
+func (c *Cache) runRequest(request *Request) (interface{}, error) {
+ ready := make(chan struct{}, 1)
+ req := &SyncRequest{request, ready}
+
+ <-c.runningRequests.counter
+ c.runningRequests.addToRunning(req)
+ result, err := (*request).Run()
+ if (*request).Cacheable() {
+ c.cachedRequests.AddToCache(&CachedRequest{*request, result, err})
+ }
+ // The request is done: drop it from the running list before signalling waiters and releasing the throttle slot.
+ c.runningRequests.removeFromRunning(req)
+ ready <- struct{}{}
+ c.runningRequests.counter <- struct{}{}
+ return result, err
+}
+
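+// finishRequests drains requests queued after Stop, setting results only for those already present in the cache.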
+func (c *Cache) finishRequests() {
+ for request := range c.addedRequests.toRun {
+ if cached := c.cachedRequests.isInCache(*request); cached != nil {
+ (*request).SetResult(cached.result, cached.err)
+ }
+ c.wg.Done()
+ c.finished.Done()
+ }
+}
+
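+// Stop marks the requester as closed, lets the already queued requests finish, and waits for them.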
+func (c *Cache) Stop() {
+ c.addedRequests.toRun <- nil
+ c.addedRequests.mutex.Lock()
+ c.addedRequests.closed = true
+ c.addedRequests.mutex.Unlock()
+ c.finished.Wait()
+}
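
Below is a minimal usage sketch of the interface above. The echoRequest type and the sizes passed to NewRequester are invented for illustration and are not part of the submitted solution; the sketch would live as a separate file in the same package.

package main

import "fmt"

// echoRequest is a made-up Request implementation used only in this sketch.
type echoRequest struct {
    id     string
    result interface{}
    err    error
}

func (e *echoRequest) ID() string                { return e.id }
func (e *echoRequest) Run() (interface{}, error) { return "done:" + e.id, nil }
func (e *echoRequest) Cacheable() bool           { return true }
func (e *echoRequest) SetResult(result interface{}, err error) {
    e.result, e.err = result, err
}

func main() {
    requester := NewRequester(10, 2)            // cache size 10, throttle limit 2
    requester.AddRequest(&echoRequest{id: "a"}) // executed via Run
    requester.AddRequest(&echoRequest{id: "a"}) // same ID: answered from the cache via SetResult
    requester.Stop()
    fmt.Println("all requests served")
}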