Решение на HTTP сваляч от Радослав Георгиев

Обратно към всички решения

Към профила на Радослав Георгиев

Резултати

  • 8 точки от тестове
  • 0 бонус точки
  • 8 точки общо
  • 10 успешни тест(а)
  • 3 неуспешни тест(а)

Код

package main
import (
"fmt"
"sync"
)
const (
bufferSize = 1000
)
type Request interface {
// Връща идентификатор за заявката. Ако две заявки имат еднакви идентификатори
// то те са "равни".
ID() string
// Блокира докато изпълнява заявката.
// Връща резултата или грешка ако изпълнението е неуспешно.
// Резултата и грешката не трябва да бъдат подавани на SetResult
// за текущата заявка - те се запазват вътрешно преди да бъдат върнати.
Run() (result interface{}, err error)
// Връща дали заявката е кешируерма.
// Метода има неопределено поведение, ако бъде извикан преди `Run`.
Cacheable() bool
// Задава резултата на заявката.
// Не трябва да се извиква за заявки, за които е бил извикан `Run`.
SetResult(result interface{}, err error)
}
type RingBuffer struct {
buffer []interface{}
beginPos, endPos int
}
type InvalidIndexError struct {
index int
}
func (iie *InvalidIndexError) Error() string {
return fmt.Sprintf("%d is not a valid index for an element in this buffer", iie.index)
}
func (rb *RingBuffer) Length() int {
if rb.beginPos < rb.endPos {
return rb.endPos - rb.beginPos + 1
} else {
return len(rb.buffer)
}
}
func (rb *RingBuffer) Item(index int) (interface{}, error) {
if index < 0 || index >= len(rb.buffer) || rb.beginPos == 0 && index > rb.endPos {
return nil, &InvalidIndexError{index}
}
pos := rb.endPos - index
if pos < 0 {
pos += len(rb.buffer)
}
return rb.buffer[pos], nil
}
func (rb *RingBuffer) Append(value interface{}) {
if rb.beginPos == -1 || rb.endPos == -1 {
rb.beginPos = 0
rb.endPos = 0
rb.buffer[0] = value
return
}
if rb.endPos == len(rb.buffer)-1 {
rb.endPos = 0
} else {
rb.endPos++
}
if rb.beginPos == len(rb.buffer)-1 {
rb.beginPos = 0
} else if rb.beginPos == rb.endPos {
rb.beginPos++
}
rb.buffer[rb.endPos] = value
}
func NewRingBuffer(size int) *RingBuffer {
return &RingBuffer{
buffer: make([]interface{}, size),
beginPos: -1,
endPos: -1,
}
}
type Requester interface {
// Добавя заявка за изпълнение и я изпълнява, ако това е необходимо, при първа възможност.
AddRequest(request Request)
// Спира 'Заявчика'. Това означава, че изчаква всички вече започнали заявки да завършат
// и извиква `SetResult` на тези заявки, които вече са били добавени, но "равни" на тях вече са били изпълнявание.
// Нови заявки не трябва да бъдат започвани през това време, нито вече започнати, равни на тях, да бъдат добавяни за извикване на `SetResult`.
Stop()
}
type SimpleRequester struct {
queue chan Request
cache *RingBuffer
throttle, receiveStopper, sendStopper, addStopper chan struct{}
mapLock sync.RWMutex
waiter sync.WaitGroup
classes map[string]*RequestClass
}
type RequestClass struct {
Queue chan Request
Stopper chan struct{}
}
type CachedResult struct {
ID string
Result interface{}
Err error
}
type RequestNotFoundError struct {
ID string
}
func (rnfe *RequestNotFoundError) Error() string {
return fmt.Sprintf("could not find result of request with ID %v in cache", rnfe.ID)
}
func (sr *SimpleRequester) FindResult(id string) (*CachedResult, error) {
cacheLen := sr.cache.Length()
for i := 0; i < cacheLen; i++ {
cacheItem, _ := sr.cache.Item(i)
if result, ok := cacheItem.(CachedResult); ok && result.ID == id {
return &result, nil
}
}
return nil, &RequestNotFoundError{id}
}
func (sr *SimpleRequester) AddRequest(request Request) {
select {
case <-sr.addStopper:
sr.addStopper <- struct{}{}
default:
sr.queue <- request
}
}
func (sr *SimpleRequester) Stop() {
sr.addStopper <- struct{}{}
sr.sendStopper <- struct{}{}
sr.receiveStopper <- struct{}{}
sr.waiter.Wait()
}
// Връща нов заявчик, който кешира отговорите на до cacheSize заявки,
// изпълнявайки не повече от throttleSize заявки едновременно.
func NewRequester(cacheSize int, throttleSize int) Requester {
sr := &SimpleRequester{
cache: NewRingBuffer(cacheSize),
queue: make(chan Request, bufferSize),
throttle: make(chan struct{}, throttleSize),
receiveStopper: make(chan struct{}, 1),
sendStopper: make(chan struct{}, 1),
addStopper: make(chan struct{}, 1),
classes: make(map[string]*RequestClass),
}
for i := 0; i < throttleSize; i++ {
sr.throttle <- struct{}{}
}
sr.waiter.Add(1)
go func() {
for {
select {
case request := <-sr.queue:
sr.waiter.Add(1)
go func(request Request) {
defer sr.waiter.Done()
id := request.ID()
sr.mapLock.RLock()
class, ok := sr.classes[id]
sr.mapLock.RUnlock()
if ok {
select {
case <-class.Stopper:
class.Stopper <- struct{}{}
default:
class.Queue <- request
}
return
}
cachedResult, _ := sr.FindResult(id)
if cachedResult != nil {
request.SetResult(cachedResult.Result, cachedResult.Err)
return
}
<-sr.throttle
newClass := &RequestClass{
Queue: make(chan Request, bufferSize),
Stopper: make(chan struct{}, 1),
}
sr.mapLock.Lock()
sr.classes[id] = newClass
sr.mapLock.Unlock()
result, err := request.Run()
newClass.Stopper <- struct{}{}
close(newClass.Queue)
if request.Cacheable() {
sr.cache.Append(&CachedResult{
ID: id,
Result: result,
Err: err,
})
for identicalRequest := range newClass.Queue {
identicalRequest.SetResult(result, err)
}
} else {
for identicalRequest := range newClass.Queue {
select {
case <-sr.sendStopper:
sr.sendStopper <- struct{}{}
break
default:
sr.queue <- identicalRequest
}
}
}
sr.mapLock.Lock()
delete(sr.classes, id)
sr.mapLock.Unlock()
sr.throttle <- struct{}{}
}(request)
case <-sr.receiveStopper:
close(sr.queue)
sr.waiter.Done()
return
}
}
}()
return sr
}

Лог от изпълнението

PASS
ok  	_/tmp/d20160101-5892-1nknvqc	0.003s
PASS
ok  	_/tmp/d20160101-5892-1nknvqc	0.003s
PASS
ok  	_/tmp/d20160101-5892-1nknvqc	0.104s
panic: test timed out after 1s

goroutine 26 [running]:
testing.startAlarm.func1()
	/usr/local/go/src/testing/testing.go:703 +0x132
created by time.goFunc
	/usr/local/go/src/time/sleep.go:129 +0x3a

goroutine 1 [chan receive]:
testing.RunTests(0x5e2b28, 0x66fe20, 0xd, 0xd, 0x1)
	/usr/local/go/src/testing/testing.go:562 +0x8ad
testing.(*M).Run(0xc82003fef8, 0xc820062510)
	/usr/local/go/src/testing/testing.go:494 +0x70
main.main()
	_/tmp/d20160101-5892-1nknvqc/_test/_testmain.go:78 +0x116

goroutine 20 [chan receive]:
_/tmp/d20160101-5892-1nknvqc.TestAddRequestRunsOnlyFirstRequest(0xc820098120)
	/tmp/d20160101-5892-1nknvqc/solution_test.go:152 +0x3f2
testing.tRunner(0xc820098120, 0x66fe68)
	/usr/local/go/src/testing/testing.go:456 +0x98
created by testing.RunTests
	/usr/local/go/src/testing/testing.go:561 +0x86d

goroutine 21 [select]:
_/tmp/d20160101-5892-1nknvqc.NewRequester.func1(0xc820060420)
	/tmp/d20160101-5892-1nknvqc/solution.go:187 +0x1a7
created by _/tmp/d20160101-5892-1nknvqc.NewRequester
	/tmp/d20160101-5892-1nknvqc/solution.go:261 +0x370
exit status 2
FAIL	_/tmp/d20160101-5892-1nknvqc	1.005s
PASS
ok  	_/tmp/d20160101-5892-1nknvqc	0.003s
PASS
ok  	_/tmp/d20160101-5892-1nknvqc	0.043s
panic: test timed out after 1s

goroutine 16 [running]:
testing.startAlarm.func1()
	/usr/local/go/src/testing/testing.go:703 +0x132
created by time.goFunc
	/usr/local/go/src/time/sleep.go:129 +0x3a

goroutine 1 [chan receive]:
testing.RunTests(0x5e2b28, 0x66fe20, 0xd, 0xd, 0x1)
	/usr/local/go/src/testing/testing.go:562 +0x8ad
testing.(*M).Run(0xc82003fef8, 0xc820010650)
	/usr/local/go/src/testing/testing.go:494 +0x70
main.main()
	_/tmp/d20160101-5892-1nknvqc/_test/_testmain.go:78 +0x116

goroutine 6 [chan receive]:
_/tmp/d20160101-5892-1nknvqc.TestCacheSize(0xc82008e000)
	/tmp/d20160101-5892-1nknvqc/solution_test.go:293 +0x866
testing.tRunner(0xc82008e000, 0x66feb0)
	/usr/local/go/src/testing/testing.go:456 +0x98
created by testing.RunTests
	/usr/local/go/src/testing/testing.go:561 +0x86d

goroutine 7 [select]:
_/tmp/d20160101-5892-1nknvqc.NewRequester.func1(0xc82001e480)
	/tmp/d20160101-5892-1nknvqc/solution.go:187 +0x1a7
created by _/tmp/d20160101-5892-1nknvqc.NewRequester
	/tmp/d20160101-5892-1nknvqc/solution.go:261 +0x370
exit status 2
FAIL	_/tmp/d20160101-5892-1nknvqc	1.005s
PASS
ok  	_/tmp/d20160101-5892-1nknvqc	0.043s
PASS
ok  	_/tmp/d20160101-5892-1nknvqc	0.053s
--- FAIL: TestStopWithQueue (0.10s)
	solution_test.go:57: Should not be Ran after Stop
FAIL
exit status 1
FAIL	_/tmp/d20160101-5892-1nknvqc	0.214s
PASS
ok  	_/tmp/d20160101-5892-1nknvqc	0.033s
PASS
ok  	_/tmp/d20160101-5892-1nknvqc	0.053s
PASS
ok  	_/tmp/d20160101-5892-1nknvqc	0.115s

История (2 версии и 1 коментар)

Радослав обнови решението на 31.12.2015 15:11 (преди над 2 години)

+package main
+
+import (
+ "fmt"
+ "sync"
+)
+
+const (
+ bufferSize = 100
+)
+
+type Request interface {
+ // Връща идентификатор за заявката. Ако две заявки имат еднакви идентификатори
+ // то те са "равни".
+ ID() string
+
+ // Блокира докато изпълнява заявката.
+ // Връща резултата или грешка ако изпълнението е неуспешно.
+ // Резултата и грешката не трябва да бъдат подавани на SetResult
+ // за текущата заявка - те се запазват вътрешно преди да бъдат върнати.
+ Run() (result interface{}, err error)
+
+ // Връща дали заявката е кешируерма.
+ // Метода има неопределено поведение, ако бъде извикан преди `Run`.
+ Cacheable() bool
+
+ // Задава резултата на заявката.
+ // Не трябва да се извиква за заявки, за които е бил извикан `Run`.
+ SetResult(result interface{}, err error)
+}
+
+type RingBuffer struct {
+ buffer []interface{}
+ beginPos, endPos int
+}
+
+type InvalidIndexError struct {
+ index int
+}
+
+func (iie *InvalidIndexError) Error() string {
+ return fmt.Sprintf("%d is not a valid index for an element in this buffer", iie.index)
+}
+
+func (rb *RingBuffer) Length() int {
+ if rb.beginPos < rb.endPos {
+ return rb.endPos - rb.beginPos + 1
+ } else {
+ return len(rb.buffer)
+ }
+}
+
+func (rb *RingBuffer) Item(index int) (interface{}, error) {
+ if index < 0 || index >= len(rb.buffer) || rb.beginPos == 0 && index > rb.endPos {
+ return nil, &InvalidIndexError{index}
+ }
+
+ pos := rb.endPos - index
+
+ if pos < 0 {
+ pos += len(rb.buffer)
+ }
+
+ return rb.buffer[pos], nil
+}
+
+func (rb *RingBuffer) Append(value interface{}) {
+ if rb.beginPos == -1 || rb.endPos == -1 {
+ rb.beginPos = 0
+ rb.endPos = 0
+ rb.buffer[0] = value
+ return
+ }
+
+ if rb.endPos == len(rb.buffer)-1 {
+ rb.endPos = 0
+ } else {
+ rb.endPos++
+ }
+
+ if rb.beginPos == len(rb.buffer)-1 {
+ rb.beginPos = 0
+ } else if rb.beginPos == rb.endPos {
+ rb.beginPos++
+ }
+
+ rb.buffer[rb.endPos] = value
+}
+
+func NewRingBuffer(size int) *RingBuffer {
+ return &RingBuffer{
+ buffer: make([]interface{}, size),
+ beginPos: -1,
+ endPos: -1,
+ }
+}
+
+type Requester interface {
+ // Добавя заявка за изпълнение и я изпълнява, ако това е необходимо, при първа възможност.
+ AddRequest(request Request)
+
+ // Спира 'Заявчика'. Това означава, че изчаква всички вече започнали заявки да завършат
+ // и извиква `SetResult` на тези заявки, които вече са били добавени, но "равни" на тях вече са били изпълнявание.
+ // Нови заявки не трябва да бъдат започвани през това време, нито вече започнати, равни на тях, да бъдат добавяни за извикване на `SetResult`.
+ Stop()
+}
+
+type SimpleRequester struct {
+ queue chan Request
+ cache *RingBuffer
+ throttle chan struct{}
+ lock sync.RWMutex
+ waiter sync.WaitGroup
+ isStopped bool
+ classes map[string]chan Request
+}
+
+type CachedResult struct {
+ ID string
+ Result interface{}
+ Err error
+}
+
+type RequestNotFoundError struct {
+ ID string
+}
+
+func (rnfe *RequestNotFoundError) Error() string {
+ return fmt.Sprintf("could not find result of request with ID %v in cache", rnfe.ID)
+}
+
+func (sr *SimpleRequester) FindResult(id string) (*CachedResult, error) {
+ cacheLen := sr.cache.Length()
+
+ for i := 0; i < cacheLen; i++ {
+ cacheItem, _ := sr.cache.Item(i)
+
+ if result, ok := cacheItem.(CachedResult); ok && result.ID == id {
+ return &result, nil
+ }
+ }
+
+ return nil, &RequestNotFoundError{id}
+}
+
+func (sr *SimpleRequester) AddRequest(request Request) {
+ if !sr.isStopped {
+ sr.queue <- request
+ }
+}
+
+func (sr *SimpleRequester) Stop() {
+ sr.isStopped = true
+ close(sr.queue)
+ for range sr.queue {
+ }
+ sr.waiter.Wait()
+ close(sr.throttle)
+ for range sr.throttle {
+ }
+}
+
+// Връща нов заявчик, който кешира отговорите на до cacheSize заявки,
+// изпълнявайки не повече от throttleSize заявки едновременно.
+func NewRequester(cacheSize int, throttleSize int) Requester {
+ sr := &SimpleRequester{
+ cache: NewRingBuffer(cacheSize),
+ queue: make(chan Request, bufferSize),
+ throttle: make(chan struct{}, throttleSize),
+ classes: make(map[string]chan Request),
+ }
+
+ for i := 0; i < throttleSize; i++ {
+ sr.throttle <- struct{}{}
+ }
+
+ sr.waiter.Add(1)
+
+ go func() {
+ for request := range sr.queue {
+ sr.waiter.Add(1)
+
+ go func(request Request) {
+ defer sr.waiter.Done()
+ id := request.ID()
+ sr.lock.Lock()
+ class, ok := sr.classes[id]
+ sr.lock.Unlock()
+
+ if ok {
+ class <- request
+ return
+ }
+
+ cachedResult, _ := sr.FindResult(id)
+
+ if cachedResult != nil {
+ request.SetResult(cachedResult.Result, cachedResult.Err)
+ return
+ }
+
+ <-sr.throttle
+ class = make(chan Request, bufferSize)
+ sr.lock.Lock()
+ sr.classes[id] = class
+ sr.lock.Unlock()
+ result, err := request.Run()
+ close(class)
+
+ if request.Cacheable() {
+ sr.cache.Append(&CachedResult{
+ ID: id,
+ Result: result,
+ Err: err,
+ })
+
+ for identicalRequest := range class {
+ identicalRequest.SetResult(result, err)
+ }
+ } else if !sr.isStopped {
+ for identicalRequest := range class {
+ sr.queue <- identicalRequest
+ }
+ }
+
+ sr.lock.Lock()
+ delete(sr.classes, id)
+ sr.lock.Unlock()
+ sr.throttle <- struct{}{}
+ }(request)
+ }
+
+ sr.waiter.Done()
+ }()
+
+ return sr
+}

Здрасти,

Нямам време за подробно разглеждане(навярно и ти нямаш :)), но мога да ти кажа че имаш race-condition-и които ще излязат ако си ръннеш примерния тест със -race.

п.п. Приятно изкарване на Новата Година

Радослав обнови решението на 31.12.2015 22:50 (преди над 2 години)

package main
import (
"fmt"
"sync"
)
const (
- bufferSize = 100
+ bufferSize = 1000
)
type Request interface {
// Връща идентификатор за заявката. Ако две заявки имат еднакви идентификатори
// то те са "равни".
ID() string
// Блокира докато изпълнява заявката.
// Връща резултата или грешка ако изпълнението е неуспешно.
// Резултата и грешката не трябва да бъдат подавани на SetResult
// за текущата заявка - те се запазват вътрешно преди да бъдат върнати.
Run() (result interface{}, err error)
// Връща дали заявката е кешируерма.
// Метода има неопределено поведение, ако бъде извикан преди `Run`.
Cacheable() bool
// Задава резултата на заявката.
// Не трябва да се извиква за заявки, за които е бил извикан `Run`.
SetResult(result interface{}, err error)
}
type RingBuffer struct {
buffer []interface{}
beginPos, endPos int
}
type InvalidIndexError struct {
index int
}
func (iie *InvalidIndexError) Error() string {
return fmt.Sprintf("%d is not a valid index for an element in this buffer", iie.index)
}
func (rb *RingBuffer) Length() int {
if rb.beginPos < rb.endPos {
return rb.endPos - rb.beginPos + 1
} else {
return len(rb.buffer)
}
}
func (rb *RingBuffer) Item(index int) (interface{}, error) {
if index < 0 || index >= len(rb.buffer) || rb.beginPos == 0 && index > rb.endPos {
return nil, &InvalidIndexError{index}
}
pos := rb.endPos - index
if pos < 0 {
pos += len(rb.buffer)
}
return rb.buffer[pos], nil
}
func (rb *RingBuffer) Append(value interface{}) {
if rb.beginPos == -1 || rb.endPos == -1 {
rb.beginPos = 0
rb.endPos = 0
rb.buffer[0] = value
return
}
if rb.endPos == len(rb.buffer)-1 {
rb.endPos = 0
} else {
rb.endPos++
}
if rb.beginPos == len(rb.buffer)-1 {
rb.beginPos = 0
} else if rb.beginPos == rb.endPos {
rb.beginPos++
}
rb.buffer[rb.endPos] = value
}
func NewRingBuffer(size int) *RingBuffer {
return &RingBuffer{
buffer: make([]interface{}, size),
beginPos: -1,
endPos: -1,
}
}
type Requester interface {
// Добавя заявка за изпълнение и я изпълнява, ако това е необходимо, при първа възможност.
AddRequest(request Request)
// Спира 'Заявчика'. Това означава, че изчаква всички вече започнали заявки да завършат
// и извиква `SetResult` на тези заявки, които вече са били добавени, но "равни" на тях вече са били изпълнявание.
// Нови заявки не трябва да бъдат започвани през това време, нито вече започнати, равни на тях, да бъдат добавяни за извикване на `SetResult`.
Stop()
}
type SimpleRequester struct {
queue chan Request
cache *RingBuffer
- throttle chan struct{}
- lock sync.RWMutex
+ throttle, receiveStopper, sendStopper, addStopper chan struct{}
+ mapLock sync.RWMutex
waiter sync.WaitGroup
- isStopped bool
- classes map[string]chan Request
+ classes map[string]*RequestClass
}
+type RequestClass struct {
+ Queue chan Request
+ Stopper chan struct{}
+}
+
type CachedResult struct {
ID string
Result interface{}
Err error
}
type RequestNotFoundError struct {
ID string
}
func (rnfe *RequestNotFoundError) Error() string {
return fmt.Sprintf("could not find result of request with ID %v in cache", rnfe.ID)
}
func (sr *SimpleRequester) FindResult(id string) (*CachedResult, error) {
cacheLen := sr.cache.Length()
for i := 0; i < cacheLen; i++ {
cacheItem, _ := sr.cache.Item(i)
if result, ok := cacheItem.(CachedResult); ok && result.ID == id {
return &result, nil
}
}
return nil, &RequestNotFoundError{id}
}
func (sr *SimpleRequester) AddRequest(request Request) {
- if !sr.isStopped {
- sr.queue <- request
- }
+ select {
+ case <-sr.addStopper:
+ sr.addStopper <- struct{}{}
+ default:
+ sr.queue <- request
+ }
}
func (sr *SimpleRequester) Stop() {
- sr.isStopped = true
- close(sr.queue)
- for range sr.queue {
- }
+ sr.addStopper <- struct{}{}
+ sr.sendStopper <- struct{}{}
+ sr.receiveStopper <- struct{}{}
sr.waiter.Wait()
- close(sr.throttle)
- for range sr.throttle {
- }
}
// Връща нов заявчик, който кешира отговорите на до cacheSize заявки,
// изпълнявайки не повече от throttleSize заявки едновременно.
func NewRequester(cacheSize int, throttleSize int) Requester {
sr := &SimpleRequester{
cache: NewRingBuffer(cacheSize),
queue: make(chan Request, bufferSize),
throttle: make(chan struct{}, throttleSize),
- classes: make(map[string]chan Request),
+ receiveStopper: make(chan struct{}, 1),
+ sendStopper: make(chan struct{}, 1),
+ addStopper: make(chan struct{}, 1),
+ classes: make(map[string]*RequestClass),
}
for i := 0; i < throttleSize; i++ {
sr.throttle <- struct{}{}
}
sr.waiter.Add(1)
go func() {
- for request := range sr.queue {
- sr.waiter.Add(1)
+ for {
+ select {
+ case request := <-sr.queue:
+ sr.waiter.Add(1)
- go func(request Request) {
- defer sr.waiter.Done()
- id := request.ID()
- sr.lock.Lock()
- class, ok := sr.classes[id]
- sr.lock.Unlock()
+ go func(request Request) {
+ defer sr.waiter.Done()
+ id := request.ID()
+ sr.mapLock.RLock()
+ class, ok := sr.classes[id]
+ sr.mapLock.RUnlock()
- if ok {
- class <- request
- return
- }
+ if ok {
+ select {
+ case <-class.Stopper:
+ class.Stopper <- struct{}{}
+ default:
+ class.Queue <- request
+ }
- cachedResult, _ := sr.FindResult(id)
+ return
+ }
- if cachedResult != nil {
- request.SetResult(cachedResult.Result, cachedResult.Err)
- return
- }
+ cachedResult, _ := sr.FindResult(id)
- <-sr.throttle
- class = make(chan Request, bufferSize)
- sr.lock.Lock()
- sr.classes[id] = class
- sr.lock.Unlock()
- result, err := request.Run()
- close(class)
+ if cachedResult != nil {
+ request.SetResult(cachedResult.Result, cachedResult.Err)
+ return
+ }
- if request.Cacheable() {
- sr.cache.Append(&CachedResult{
- ID: id,
- Result: result,
- Err: err,
- })
+ <-sr.throttle
+ newClass := &RequestClass{
+ Queue: make(chan Request, bufferSize),
+ Stopper: make(chan struct{}, 1),
+ }
+ sr.mapLock.Lock()
+ sr.classes[id] = newClass
+ sr.mapLock.Unlock()
+ result, err := request.Run()
+ newClass.Stopper <- struct{}{}
+ close(newClass.Queue)
- for identicalRequest := range class {
- identicalRequest.SetResult(result, err)
- }
- } else if !sr.isStopped {
- for identicalRequest := range class {
- sr.queue <- identicalRequest
- }
- }
+ if request.Cacheable() {
+ sr.cache.Append(&CachedResult{
+ ID: id,
+ Result: result,
+ Err: err,
+ })
- sr.lock.Lock()
- delete(sr.classes, id)
- sr.lock.Unlock()
- sr.throttle <- struct{}{}
- }(request)
- }
+ for identicalRequest := range newClass.Queue {
+ identicalRequest.SetResult(result, err)
+ }
+ } else {
+ for identicalRequest := range newClass.Queue {
+ select {
+ case <-sr.sendStopper:
+ sr.sendStopper <- struct{}{}
+ break
+ default:
+ sr.queue <- identicalRequest
+ }
+ }
+ }
- sr.waiter.Done()
+ sr.mapLock.Lock()
+ delete(sr.classes, id)
+ sr.mapLock.Unlock()
+ sr.throttle <- struct{}{}
+ }(request)
+ case <-sr.receiveStopper:
+ close(sr.queue)
+ sr.waiter.Done()
+ return
+ }
+ }
}()
return sr
}