Андон обнови решението на 31.12.2015 02:54 (преди над 2 години)
+package main
+
import (
	"sync"
	"time"
)
+
+
+type Request interface {
+ ID() string
+ Run() (result interface{}, err error)
+
+ Cacheable() bool
+
+ SetResult(result interface{}, err error)
+}
+
+type Requester interface {
+ AddRequest(request Request)
+ Stop()
+}
+
+type Result struct {
+ id string
+ resData interface{}
+ err error
+}
+
+func (r *Result) ID() string {
+ return r.id
+}
+
+type Cache struct {
+ cachedData []Result
+
+ startIndex uint32
+ endIndex uint32
+
+ elementsSize uint32
+ maxSize uint32
+}
+
+func (q *Cache) Push(res Result) {
+ q.endIndex++
+ if q.endIndex >= q.maxSize {
+ q.endIndex = 0
+ }
+
+ if q.elementsSize == q.maxSize && q.endIndex == q.startIndex {
+ q.startIndex++
+ if q.startIndex >= q.maxSize {
+ q.startIndex = 0
+ }
+ } else {
+ q.elementsSize++
+ }
+
+ q.cachedData[q.endIndex] = res
+}
+
+func (q *Cache) GetResult(id string) *Result {
+ for _, res := range q.cachedData {
+ if id == res.ID() {
+ return &res
+ }
+ }
+
+ return nil
+}
+
+type RequestQueue []Request
+
+func (q *RequestQueue) Push(r Request) {
+ *q = append(*q, r)
+}
+
+func (q *RequestQueue) Pop() (r Request) {
+ if len(*q) == 0 {
+ return nil
+ }
+
+ r = (*q)[0]
+
+ *q = (*q)[1:]
+
+ return
+}
+
+type Worker struct {
+ isStopped bool
+
+ unresolvedRequests RequestQueue
+
+ workingRequestsNum int
+ cacheLock sync.Mutex
+ unresolvedRequestsLocker sync.Mutex
+
+ workersWaitGr sync.WaitGroup
+ //slice с канали
+ maxWorkers int
+
+ cachedRequests Cache
+}
+
+//WIP
+
+//this method handles all the requests of the Requester
+func (w *Worker) handleRequests() {
+ for !w.isStopped {
+ if len(w.unresolvedRequests) == 0 {
+ continue
+ }
+
+ for w.workingRequestsNum >= w.maxWorkers {
+ }
+
+ w.unresolvedRequestsLocker.Lock()
+ r := w.unresolvedRequests.Pop()
+ w.unresolvedRequestsLocker.Unlock()
+ if r != nil {
+ if c := w.cachedRequests.GetResult(r.ID()); c != nil {
+
+ r.SetResult(c.resData, c.err)
+ } else {
+
+ w.workingRequestsNum++
+
+ go func() {
+ w.workersWaitGr.Add(1)
+ res, e := r.Run()
+
+ if r.Cacheable() {
+ cacheEntry := Result{id: r.ID(), resData: res, err: e}
+ w.cacheLock.Lock()
+ w.cachedRequests.Push(cacheEntry)
+ w.cacheLock.Unlock()
+ }
+ w.workingRequestsNum--
+ w.workersWaitGr.Done()
+ }()
+
+ }
+ }
+ }
+}
+
+func (w *Worker) AddRequest(request Request) {
+ if w.isStopped {
+ return
+ }
+
+ w.unresolvedRequestsLocker.Lock()
+ w.unresolvedRequests.Push(request)
+ w.unresolvedRequestsLocker.Unlock()
+}
+
+func (w *Worker) Stop() {
+ w.isStopped = true
+
+ w.workersWaitGr.Wait()
+}
+
+func NewRequester(cacheSize int, throttleSize int) Requester {
+ var c Cache = Cache{cachedData: make([]Result, cacheSize), maxSize: uint32(cacheSize)}
+
+ var newRequester *Worker = &Worker{isStopped: false, unresolvedRequests: make(RequestQueue, 10),
+ cachedRequests: c, maxWorkers: throttleSize}
+
+ go newRequester.handleRequests()
+ return newRequester
+}
Здрасти,
Основно:
- Няма да е лошо handleRequests да не цикли като няма какво да прави
- Рънни си последните тестове
- Рънни си ги с -race
п.п. Весело изкарване на Новата Година