Стоян обнови решението на 31.12.2015 18:40 (преди над 2 години)
+package main
+
+import (
+ "fmt"
+ "sync"
+)
+
+type Request interface {
+ // Връща идентификатор за заявката. Ако две заявки имат еднакви идентификатори
+ // то те са "равни".
+ ID() string
+
+ // Блокира докато изпълнява заявката.
+ // Връща резултата или грешка ако изпълнението е неуспешно.
+ // Резултата и грешката не трябва да бъдат подавани на SetResult
+ // за текущата заявка - те се запазват вътрешно преди да бъдат върнати.
+ Run() (result interface{}, err error)
+
+ // Връща дали заявката е кешируерма.
+ // Метода има неопределено поведение, ако бъде извикан преди `Run`.
+ Cacheable() bool
+
+ // Задава резултата на заявката.
+ // Не трябва да се извиква за заявки, за които е бил извикан `Run`.
+ SetResult(result interface{}, err error)
+}
+
+type Requester interface {
+ // Добавя заявка за изпълнение и я изпълнява, ако това е необходимо, при първа възможност.
+ AddRequest(request Request)
+
+ // Спира 'Заявчика'. Това означава, че изчаква всички вече започнали заявки да завършат
+ // и извиква `SetResult` на тези заявки, които вече са били добавени, но "равни" на тях вече са били изпълнявание.
+ // Нови заявки не трябва да бъдат започвани през това време, нито вече започнати, равни на тях, да бъдат добавяни за извикване на `SetResult`.
+ Stop()
+}
+
+type CyclicBuffer struct {
+ buffer []interface{}
+ start, end int
+}
+
+func (cb *CyclicBuffer) Length() int {
+ if cb.start < cb.end {
+ return cb.end - cb.start + 1
+ } else {
+ return cb.start - cb.end + 1
+ }
+}
+
+type InvalidIndexError struct {
+ index int
+}
+
+func (e *InvalidIndexError) Error() string {
+ return fmt.Sprintf("%d is not a valid index", e.index)
+}
+
+func (cb *CyclicBuffer) Get(index int) (interface{}, error) {
+ if index < 0 || index >= len(cb.buffer) || cb.start == 0 && index > cb.end {
+ return nil, &InvalidIndexError{index}
+ }
+
+ pos := cb.end - index
+
+ if pos < 0 {
+ pos += len(cb.buffer)
+ }
+
+ return cb.buffer[pos], nil
+
+}
+
+func (cb *CyclicBuffer) Add(value interface{}) {
+ if cb.start == -1 || cb.end == -1 {
+ cb.start = 0
+ cb.end = 0
+ cb.buffer[0] = value
+ return
+ }
+
+ if cb.end == len(cb.buffer)-1 {
+ cb.end = 0
+ } else {
+ cb.end += 1
+ }
+
+ if cb.start == len(cb.buffer)-1 {
+ cb.start = 0
+ } else if cb.start == cb.end {
+ cb.start += 1
+ }
+
+ cb.buffer[cb.end] = value
+}
+
+func NewCyclicBuffer(size int) *CyclicBuffer {
+ return &CyclicBuffer{
+ buffer: make([]interface{}, size),
+ start: -1,
+ end: -1,
+ }
+}
+
+type BufferRequester struct {
+ queue chan Request
+ cache *CyclicBuffer
+ throttle chan struct{}
+ lock sync.RWMutex
+ waiter sync.WaitGroup
+ isStopped bool
+ classes map[string]chan Request
+}
+
+type Result struct {
+ ID string
+ Result interface{}
+ Err error
+}
+
+type RequestNotFoundError struct {
+ ID string
+}
+
+func (e *RequestNotFoundError) Error() string {
+ return fmt.Sprintf("Could not find result of request which ID is %v", e.ID)
+}
+
+func (br *BufferRequester) FindResult(id string) (*Result, error) {
+ cacheLen := br.cache.Length()
+
+ for i := 0; i < cacheLen; i++ {
+ cacheItem, _ := br.cache.Get(i)
+
+ if result, ok := cacheItem.(Result); ok && result.ID == id {
+ return &result, nil
+ }
+ }
+
+ return nil, &RequestNotFoundError{id}
+}
+
+func (br *BufferRequester) AddRequest(request Request) {
+ if !br.isStopped {
+ br.queue <- request
+ }
+}
+
+func (br *BufferRequester) Stop() {
+ br.isStopped = true
+ close(br.queue)
+ for range br.queue {
+ }
+ br.waiter.Wait()
+ close(br.throttle)
+ for range br.throttle {
+ }
+}
+
+// Връща нов заявчик, който кешира отговорите на до cacheSize заявки,
+// изпълнявайки не повече от throttleSize заявки едновременно.
+func NewRequester(cacheSize int, throttleSize int) Requester {
+ br := &BufferRequester{
+ cache: NewCyclicBuffer(cacheSize),
+ queue: make(chan Request, 100),
+ throttle: make(chan struct{}, throttleSize),
+ classes: make(map[string]chan Request),
+ }
+
+ for i := 0; i < throttleSize; i++ {
+ br.throttle <- struct{}{}
+ }
+
+ br.waiter.Add(1)
+
+ go func() {
+ for request := range br.queue {
+ br.waiter.Add(1)
+
+ go func(request Request) {
+ defer br.waiter.Done()
+ id := request.ID()
+ br.lock.Lock()
+ class, ok := br.classes[id]
+ br.lock.Unlock()
+
+ if ok {
+ class <- request
+ return
+ }
+
+ cachedResult, _ := br.FindResult(id)
+
+ if cachedResult != nil {
+ request.SetResult(cachedResult.Result, cachedResult.Err)
+ return
+ }
+
+ <-br.throttle
+ class = make(chan Request, 100)
+ br.lock.Lock()
+ br.classes[id] = class
+ br.lock.Unlock()
+ result, err := request.Run()
+ close(class)
+
+ if request.Cacheable() {
+ br.cache.Add(&Result{
+ ID: id,
+ Result: result,
+ Err: err,
+ })
+
+ for identicalRequest := range class {
+ identicalRequest.SetResult(result, err)
+ }
+ } else if !br.isStopped {
+ for identicalRequest := range class {
+ br.queue <- identicalRequest
+ }
+ }
+
+ br.lock.Lock()
+ delete(br.classes, id)
+ br.lock.Unlock()
+ br.throttle <- struct{}{}
+ }(request)
+ }
+
+ br.waiter.Done()
+ }()
+
+ return br
+}