Михаил обнови решението на 28.12.2015 13:35 (преди над 2 години)
+package main
+
+import (
+ "sync"
+)
+
+type Request interface {
+ ID() string
+
+ Run() (result interface{}, err error)
+
+ Cacheable() bool
+
+ SetResult(result interface{}, err error)
+}
+
+type Requester interface {
+ AddRequest(request Request)
+
+ Stop()
+}
+
+type Result struct {
+ Result interface{}
+ Error error
+}
+
+type SomeRequester struct {
+ cache map[string]Result
+ size int
+ maxProc int
+ queues map[string][]Request
+ queuesMutex sync.Mutex
+ workerScheduler chan chan struct{}
+ order []string
+ stopped bool
+}
+
+func (sr SomeRequester) AddRequest(request Request) {
+ if sr.stopped {
+ return
+ }
+
+ if result, ok := sr.cache[request.ID()]; ok {
+ request.SetResult(result.Result, result.Error)
+ return
+ }
+
+ go sr.runRequest(request)
+}
+
+func (sr *SomeRequester) runRequest(request Request) {
+ ch := make(chan struct{})
+ sr.workerScheduler <- ch
+ defer func() { ch <- struct{}{} }()
+ index := sr.appendToQueues(request.ID(), request)
+ if index > 0 {
+ return
+ }
+
+ result, err := request.Run()
+
+ if request.Cacheable() {
+ sr.order = append(sr.order, request.ID())
+
+ if len(sr.cache) == sr.size {
+ delete(sr.cache, sr.order[0])
+ sr.order = sr.order[1:]
+ }
+
+ sr.cache[request.ID()] = Result{Result: result, Error: err}
+ sr.processQueueAfterCaching(request.ID())
+ return
+ }
+
+ sr.queues[request.ID()] = sr.queues[request.ID()][1:]
+ if len(sr.queues[request.ID()]) > 0 {
+ sr.runRequest(sr.queues[request.ID()][0])
+ }
+
+}
+
+func (sr *SomeRequester) appendToQueues(id string, request Request) int {
+ sr.queuesMutex.Lock()
+ index := len(sr.queues[request.ID()])
+ sr.queues[request.ID()] = append(sr.queues[request.ID()], request)
+ sr.queuesMutex.Unlock()
+
+ return index
+}
+
+func (sr *SomeRequester) processQueueAfterCaching(id string) {
+ for _, request := range sr.queues[id][1:] {
+ request.SetResult(sr.cache[id].Result, sr.cache[id].Error)
+ }
+}
+
+func (sr SomeRequester) Stop() {
+ sr.stopped = true
+}
+
+func NewRequester(cacheSize int, throttleSize int) Requester {
+ requester := SomeRequester{
+ cache: make(map[string]Result),
+ size: cacheSize,
+ workerScheduler: make(chan chan struct{}),
+ queues: make(map[string][]Request),
+ }
+
+ for i := 0; i < throttleSize; i++ {
+ go func() {
+ for {
+ ch := <-requester.workerScheduler
+ <-ch
+ }
+ }()
+ }
+
+ return requester
+}
Добър вечер,
Бърз преглед на градус ми казва че :
- не си прочел условието за Stop докрай или не съм го обяснил както трябва.
- Усетил си се че може би AddRequest не трябва да блокира предвид това че не връща резултат.
- имаш race condition-и на ляво и дясно, примерно конкурентното писане и четене в map не е добра идея.
- на ред 74-ти си изял един else.
Лека вечер, приятно кодене и Весела Нова Година
Привет,
- Мисля, че Stop-a ми е правилен. Реално прави AddRequest да не добавя нови рекуести, а другата част от условието за Stop - рекуестите, които са били добавени, но еднакви на тях се изпълняват, им бива извикан SetResult (ако еднаквия, който се е изпълнявал е кешируем, или биват накарани да се изпълнят, ако не е бил кешируем) Тук въпроса ми е, ако сме спрели Requester-a и имаме чакащи копия на рекуест, когато този рекуест се изпълни и не е кешируем, еднаквите с него трябва да се изпълнят нали?
- :)
- знам за race condition-ите, просто ме мързи да ги оправя и отлагам :D
- вярвам, че elsa не е нужен, защото имам return в края на if block-a
- виж този мой коментар
- ех
- ще се съглася с това ти твърдение, но ще поясня че Cacheable за равни request-и винаги връща едно и също нещо. Тоест няма нужда да ги викаш така последователно. Имаше такова пояснение в условието по-време на някоя ревизия но явно е изпаднало.