Катя обнови решението на 28.12.2015 18:05 (преди над 2 години)
+package main
+
+import (
+ "sync"
+)
+
+type Request interface {
+ // return the id if the Request. If two Requests are the same, they have the same id
+ ID() string
+
+ // Blocks while it executes the Request
+ // Returns the result from the execution or an error
+ // The result and the error shoukd not be passed to SetResult
+ // of the current request - They are stored internally before they are returned
+ Run() (result interface{}, err error)
+
+ // Returns a flag if the Request is cacheble
+ // Has an unidentified behaviour id the called before `Run`
+ Cacheable() bool
+
+ // Sets the result of the Request
+ // Should not be called for Request for which `Run` has been called.
+ SetResult(result interface{}, err error)
+}
+
+type Requester interface {
+ // Adds request and executes it if this us necessary at the first possible time
+ AddRequest(request Request)
+
+ // Stops the Requester. Waits all started Requests and call `SetResult` for all requests
+ // that for which a same type Request has been executed
+ // No new Request should be added at this time. No Requsts should be queued for calling
+ // of `SetResult`
+ Stop()
+}
+
+// Returns a new Requester, which cashes the responses of cacheSize Requests
+// and executed no more than throttleSize Requests at the same time
+func NewRequester(cacheSize int, throttleSize int) Requester {
+ requester := &MyRequester{cacheSize: cacheSize, throttleSize: throttleSize}
+ requester.init()
+ go requester.start()
+
+ return requester
+}
+
+// Implemenation of Requester interface
+type MyRequester struct {
+ cacheSize int
+ throttleSize int
+ running bool
+ mutex sync.Mutex
+ queue []Request
+ cache map[string]ExecutionResult
+ cachedIds []string
+ executionPool map[string]*Request
+ cond *sync.Cond
+}
+
+// Initialises all fields of MyRequester
+func (requester *MyRequester) init() {
+ requester.running = true
+ requester.mutex = sync.Mutex{}
+ requester.queue = make([]Request, 0)
+ requester.cache = make(map[string]ExecutionResult, 0)
+ requester.cachedIds = make([]string, 0)
+ requester.executionPool = make(map[string]*Request)
+ condMutex := sync.Mutex{}
+ condMutex.Lock()
+ requester.cond = sync.NewCond(&condMutex)
+}
+
+// Adds a Request for execution. It will be executed if necessary at the first possible time
+func (requester *MyRequester) AddRequest(request Request) {
+ if requester.running {
+ requester.mutex.Lock()
+ defer func() {
+ requester.mutex.Unlock()
+ }()
+ requester.queue = append(requester.queue, request)
+ requester.cond.Signal()
+ }
+}
+
+// Stops MyRequester. All pending requests will be executed
+func (requester *MyRequester) Stop() {
+ requester.mutex.Lock()
+ defer func() {
+ requester.mutex.Unlock()
+ }()
+ if requester.running {
+ requester.running = false
+ }
+ requester.cond.Signal()
+}
+
+// Waits for Requests and executes them or takes the result from the cache
+func (requester *MyRequester) start() {
+ for {
+ requester.mutex.Lock()
+ if !requester.running && len(requester.queue) == 0 {
+ requester.mutex.Unlock()
+ break
+ } else if len(requester.queue) == 0 {
+ requester.mutex.Unlock()
+ requester.cond.Wait()
+ } else {
+ requester.mutex.Unlock()
+ }
+
+ requester.executeRequest()
+ }
+}
+
+// finds the first available request and executes it
+func (requester *MyRequester) executeRequest() {
+ requester.mutex.Lock()
+ defer func() {
+ requester.mutex.Unlock()
+ }()
+ for i := 0; i < len(requester.queue); i++ {
+ request := requester.queue[i]
+ id := request.ID()
+ // check if it is cached
+ executionResult, ok := requester.cache[id]
+ if ok {
+ request.SetResult(executionResult.result, executionResult.err)
+ requester.queue = append(requester.queue[:i], requester.queue[i+1:]...)
+ break
+ }
+
+ // check if request of the same type is executed right now
+ _, executedNow := requester.executionPool[id]
+ if executedNow {
+ continue
+ }
+
+ // add the request to the execution pool if possible
+ if len(requester.executionPool) < requester.throttleSize {
+ requester.executionPool[id] = &request
+ requester.queue = append(requester.queue[:i], requester.queue[i+1:]...)
+ go requester.doExecute(request)
+ break
+ }
+ }
+}
+
+func (requester *MyRequester) doExecute(request Request) {
+ result, err := request.Run()
+ if request.Cacheable() {
+ requester.addToCache(request.ID(), result, err)
+ }
+ requester.mutex.Lock()
+ // remove the request from the execution pool
+ delete(requester.executionPool, request.ID())
+ requester.mutex.Unlock()
+}
+
+// Adds the result of Request's execution to the cache
+func (requester *MyRequester) addToCache(id string, result interface{}, err error) {
+ executionResult := ExecutionResult{result: result, err: err}
+ requester.mutex.Lock()
+ defer func() {
+ requester.mutex.Unlock()
+ }()
+ if len(requester.cachedIds) == requester.cacheSize {
+ removeId := requester.cachedIds[0]
+ requester.cachedIds = requester.cachedIds[1:]
+ delete(requester.cache, removeId)
+ }
+ requester.cachedIds = append(requester.cachedIds, id)
+ requester.cache[id] = executionResult
+}
+
+type ExecutionResult struct {
+ result interface{}
+ err error
+}
Добър вечер,
на бързо и на градус, мога да кажа че:
- defer приема извикване на функция така че може и само
defer requester.mutex.Unlock()
. - Принципно ако ще се локва целия обект както изглежда е при теб е прието просто да ембеднеш
sync.Mutex
-a и да еrequester.Lock()
. Не че е грешно иначе, просто е по go-шки (според мен) с ембеднат mutex. - Решението е интересно (и навярно не лошo) използване на sync.Cond, което изглежда да работи. Нямам спомен да съм го ползвал (sync.Cond) но винаги съм си мислел че в някакви такива ситуации би бил полезен. Браво.
- Явно не съм обяснил, като хората, че Stop изчаква пълното и тотално спиране на Requester-а. Тоест изчаква все пак да завършат всички започнати заявки.
- Прочети отново условието за Stop, главно в частта кои заявки биват завършвани, започвани и довършвани и кои не :D
Лека вечер, приятно кодене и Приятна Нова Година.