Катя updated the solution on 28.12.2015 18:05 (over 2 years ago)
+package main
+
+import (
+        "sync"
+)
+
+type Request interface {
+        // Returns the ID of the Request. If two Requests are the same, they have the same ID
+        ID() string
+
+        // Blocks while it executes the Request
+        // Returns the result from the execution or an error
+        // The result and the error should not be passed to SetResult
+        // of the current request - they are stored internally before they are returned
+        Run() (result interface{}, err error)
+
+        // Returns a flag showing whether the Request is cacheable
+        // Has undefined behaviour if called before `Run`
+        Cacheable() bool
+
+        // Sets the result of the Request
+        // Should not be called for a Request for which `Run` has been called.
+        SetResult(result interface{}, err error)
+}
+
+type Requester interface {
+        // Adds a request and, if necessary, executes it at the first possible time
+        AddRequest(request Request)
+
+        // Stops the Requester. Waits for all started Requests and calls `SetResult` for all requests
+        // for which a Request of the same type has been executed.
+        // No new Requests should be added at this point. No Requests should be queued for calling
+        // of `SetResult`
+        Stop()
+}
+
+// Returns a new Requester, which caches the responses of up to cacheSize Requests
+// and executes no more than throttleSize Requests at the same time
+func NewRequester(cacheSize int, throttleSize int) Requester {
+        requester := &MyRequester{cacheSize: cacheSize, throttleSize: throttleSize}
+        requester.init()
+        go requester.start()
+
+        return requester
+}
+
+// Implementation of the Requester interface
+type MyRequester struct {
+        cacheSize     int
+        throttleSize  int
+        running       bool
+        mutex         sync.Mutex
+        queue         []Request
+        cache         map[string]ExecutionResult
+        cachedIds     []string
+        executionPool map[string]*Request
+        cond          *sync.Cond
+}
+
+// Initialises all fields of MyRequester
+func (requester *MyRequester) init() {
+        requester.running = true
+        requester.mutex = sync.Mutex{}
+        requester.queue = make([]Request, 0)
+        requester.cache = make(map[string]ExecutionResult, 0)
+        requester.cachedIds = make([]string, 0)
+        requester.executionPool = make(map[string]*Request)
+        condMutex := sync.Mutex{}
+        condMutex.Lock()
+        requester.cond = sync.NewCond(&condMutex)
+}
+
+// Adds a Request for execution. It will be executed, if necessary, at the first possible time
+func (requester *MyRequester) AddRequest(request Request) {
+        if requester.running {
+                requester.mutex.Lock()
+                defer func() {
+                        requester.mutex.Unlock()
+                }()
+                requester.queue = append(requester.queue, request)
+                requester.cond.Signal()
+        }
+}
+
+// Stops MyRequester. All pending requests will be executed
+func (requester *MyRequester) Stop() {
+        requester.mutex.Lock()
+        defer func() {
+                requester.mutex.Unlock()
+        }()
+        if requester.running {
+                requester.running = false
+        }
+        requester.cond.Signal()
+}
+
+// Waits for Requests and executes them or takes the result from the cache
+func (requester *MyRequester) start() {
+        for {
+                requester.mutex.Lock()
+                if !requester.running && len(requester.queue) == 0 {
+                        requester.mutex.Unlock()
+                        break
+                } else if len(requester.queue) == 0 {
+                        requester.mutex.Unlock()
+                        requester.cond.Wait()
+                } else {
+                        requester.mutex.Unlock()
+                }
+
+                requester.executeRequest()
+        }
+}
+
+// Finds the first available request and executes it
+func (requester *MyRequester) executeRequest() {
+        requester.mutex.Lock()
+        defer func() {
+                requester.mutex.Unlock()
+        }()
+        for i := 0; i < len(requester.queue); i++ {
+                request := requester.queue[i]
+                id := request.ID()
+                // check if it is cached
+                executionResult, ok := requester.cache[id]
+                if ok {
+                        request.SetResult(executionResult.result, executionResult.err)
+                        requester.queue = append(requester.queue[:i], requester.queue[i+1:]...)
+                        break
+                }
+
+                // check if request of the same type is executed right now
+                _, executedNow := requester.executionPool[id]
+                if executedNow {
+                        continue
+                }
+
+                // add the request to the execution pool if possible
+                if len(requester.executionPool) < requester.throttleSize {
+                        requester.executionPool[id] = &request
+                        requester.queue = append(requester.queue[:i], requester.queue[i+1:]...)
+                        go requester.doExecute(request)
+                        break
+                }
+        }
+}
+
+func (requester *MyRequester) doExecute(request Request) {
+        result, err := request.Run()
+        if request.Cacheable() {
+                requester.addToCache(request.ID(), result, err)
+        }
+        requester.mutex.Lock()
+        // remove the request from the execution pool
+        delete(requester.executionPool, request.ID())
+        requester.mutex.Unlock()
+}
+
+// Adds the result of Request's execution to the cache
+func (requester *MyRequester) addToCache(id string, result interface{}, err error) {
+        executionResult := ExecutionResult{result: result, err: err}
+        requester.mutex.Lock()
+        defer func() {
+                requester.mutex.Unlock()
+        }()
+        if len(requester.cachedIds) == requester.cacheSize {
+                removeId := requester.cachedIds[0]
+                requester.cachedIds = requester.cachedIds[1:]
+                delete(requester.cache, removeId)
+        }
+        requester.cachedIds = append(requester.cachedIds, id)
+        requester.cache[id] = executionResult
+}
+
+type ExecutionResult struct {
+        result interface{}
+        err    error
+}
Good evening,
quickly, and after a drink or two, I can say that:
- defer takes a function call directly, so plain defer requester.mutex.Unlock() is enough (see the first sketch after this list).
- In principle, if the whole object is going to be locked, as seems to be the case here, the accepted style is to simply embed the sync.Mutex and write requester.Lock(). Not that yours is wrong otherwise, it's just more Go-like (in my opinion) with an embedded mutex - the first sketch below shows this too.
- The solution is an interesting (and probably not bad) use of sync.Cond, which seems to work. I don't remember ever using sync.Cond myself, but I've always thought it would be useful in situations like these. Well done. For comparison, the second sketch below shows the textbook pattern.
- Apparently I didn't explain properly that Stop waits for the Requester to shut down completely and totally. That is, it does wait for all started requests to finish - the third sketch below shows one way to get that behaviour.
- Re-read the requirements for Stop, mainly the part about which requests get completed, started, and finished and which do not :D
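
A minimal sketch of the first two points together, reusing the Request interface from the solution above; the stripped-down struct is just for illustration, not your full type:

package main

import "sync"

type MyRequester struct {
        sync.Mutex // embedded: MyRequester itself now has Lock/Unlock
        queue      []Request
}

func (requester *MyRequester) AddRequest(request Request) {
        requester.Lock()
        // defer takes the call directly - no wrapping closure needed
        defer requester.Unlock()
        requester.queue = append(requester.queue, request)
}

Note that embedding exposes Lock and Unlock as part of the type's API; if that bothers you, a named field like mu sync.Mutex with defer requester.mu.Unlock() is the other common style.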
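
For comparison, a sketch of the textbook sync.Cond pattern from the sync package docs: the condition is re-checked in a loop while holding the Cond's own locker, which closes the window between checking the queue and calling Wait where a Signal can otherwise be lost. This is a standalone toy, not a drop-in replacement for your code:

package main

import "sync"

type worker struct {
        cond    *sync.Cond
        queue   []Request
        running bool
}

func newWorker() *worker {
        return &worker{cond: sync.NewCond(&sync.Mutex{}), running: true}
}

func (w *worker) loop() {
        w.cond.L.Lock()
        for w.running || len(w.queue) > 0 {
                // Wait is always called with the locker held and inside a
                // loop, because a wakeup may arrive when the queue is empty
                for w.running && len(w.queue) == 0 {
                        w.cond.Wait()
                }
                if len(w.queue) == 0 {
                        break // stopped and fully drained
                }
                request := w.queue[0]
                w.queue = w.queue[1:]
                w.cond.L.Unlock()
                request.Run() // run outside the lock
                w.cond.L.Lock()
        }
        w.cond.L.Unlock()
}

func (w *worker) add(r Request) {
        w.cond.L.Lock()
        w.queue = append(w.queue, r)
        w.cond.L.Unlock()
        w.cond.Signal()
}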
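
And one possible way to make Stop actually block until every started request has finished, using sync.WaitGroup. This is my sketch with hypothetical names (stoppableRequester, inFlight), not the required design; in your version Stop only flips the flag and signals, so nothing guarantees the doExecute goroutines have finished when Stop returns:

package main

import "sync"

type stoppableRequester struct {
        mutex    sync.Mutex
        running  bool
        inFlight sync.WaitGroup
}

func (r *stoppableRequester) AddRequest(request Request) {
        r.mutex.Lock()
        if !r.running {
                r.mutex.Unlock()
                return // no new requests are accepted after Stop
        }
        r.inFlight.Add(1) // register before the goroutine starts
        r.mutex.Unlock()

        go func() {
                defer r.inFlight.Done()
                request.Run()
        }()
}

func (r *stoppableRequester) Stop() {
        r.mutex.Lock()
        r.running = false
        r.mutex.Unlock()
        r.inFlight.Wait() // blocks until every started request finishes
}

Calling Add(1) while still holding the mutex is what prevents Stop's Wait from returning between the acceptance check and the goroutine being registered.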
Good night, happy coding, and Happy New Year.
