Решение на HTTP сваляч от Юлия Недялкова

Обратно към всички решения

Към профила на Юлия Недялкова

Резултати

  • 7 точки от тестове
  • 0 бонус точки
  • 7 точки общо
  • 9 успешни тест(а)
  • 4 неуспешни тест(а)

Код

package main
import "sync"
type Request interface {
ID() string
Run() (result interface{}, err error)
Cacheable() bool
SetResult(result interface{}, err error)
}
type Requester interface {
AddRequest(request Request)
Stop()
}
type CachedRequest struct {
request Request
result interface{}
err error
}
type Added struct {
toRun chan *Request
closed bool
mutex *sync.Mutex
}
func (a *Added) close() {
a.mutex.Lock()
defer a.mutex.Unlock()
a.closed = true
}
func (a *Added) getStatusClosed() bool {
a.mutex.Lock()
defer a.mutex.Unlock()
return a.closed
}
type Running struct {
counter chan struct{}
runningRequests []*SyncRequest
mutex *sync.Mutex
}
func (r *Running) addToRuning(request *SyncRequest) {
r.mutex.Lock()
defer r.mutex.Unlock()
r.runningRequests = append(r.runningRequests, request)
}
func (r *Running) removeFromRuning(request *SyncRequest) {
r.mutex.Lock()
defer r.mutex.Unlock()
for i := 0; i < len(r.runningRequests); i++ {
if r.runningRequests[i] == request {
r.runningRequests = append(r.runningRequests[:i], append([]*SyncRequest{request}, r.runningRequests[i+1:]...)...)
return
}
}
}
func (r *Running) doubleIsRunning(request Request) *SyncRequest {
r.mutex.Lock()
defer r.mutex.Unlock()
for i := 0; i < len(r.runningRequests); i++ {
if (*r.runningRequests[i].request).ID() == request.ID() {
return r.runningRequests[i]
}
}
return nil
}
type Cached struct {
buffer []CachedRequest
positionToInsert int
cacheSize int
mutex *sync.Mutex
}
func (c *Cached) AddToCache(toCache *CachedRequest) {
c.mutex.Lock()
defer c.mutex.Unlock()
var left []CachedRequest
if c.positionToInsert != c.cacheSize-1 && c.positionToInsert != len(c.buffer) {
left = append([]CachedRequest{*toCache}, c.buffer[c.positionToInsert+1:]...)
} else {
left = []CachedRequest{*toCache}
}
c.buffer = append(c.buffer[:c.positionToInsert], left...)
c.positionToInsert = (c.positionToInsert + 1) % c.cacheSize
}
func (c *Cached) isInCache(request Request) *CachedRequest {
c.mutex.Lock()
defer c.mutex.Unlock()
for i := 0; i < len(c.buffer); i++ {
if c.buffer[i].request.ID() == request.ID() {
return &(c.buffer[i])
}
}
return nil
}
type Cache struct {
addedRequests *Added
runningRequests *Running
cachedRequests *Cached
wg, finished sync.WaitGroup
mutex *sync.Mutex
}
type SyncRequest struct {
request *Request
ready chan struct{}
}
func NewRequester(cacheSize, throttleSize int) Requester {
toRun := make(chan *Request, 100)
closed := new(sync.Mutex)
addedRequests := &Added{toRun, false, closed}
counter := make(chan struct{}, throttleSize)
var runningBuffer []*SyncRequest
runningMutex := new(sync.Mutex)
for i := 0; i < throttleSize; i++ {
counter <- struct{}{}
}
runningRequests := &Running{counter, runningBuffer, runningMutex}
var cacheBuffer []CachedRequest
cacheMutex := new(sync.Mutex)
cachedRequests := &Cached{cacheBuffer, 0, cacheSize, cacheMutex}
var wg, finished sync.WaitGroup
mutex := new(sync.Mutex)
requester := &Cache{addedRequests, runningRequests, cachedRequests, wg, finished, mutex}
go requester.serveRequests()
return requester
}
func (c *Cache) AddRequest(request Request) {
if c.addedRequests.getStatusClosed() == false {
c.wg.Add(1)
c.finished.Add(1)
c.addedRequests.toRun <- &request
c.wg.Wait()
}
}
func (c *Cache) serveRequests() {
for {
request := <-c.addedRequests.toRun
if c.addedRequests.getStatusClosed() == true {
c.finishRequests()
}
if cacheResult := c.cachedRequests.isInCache(*request); cacheResult != nil {
(*request).SetResult(cacheResult.result, cacheResult.err)
} else if double := c.runningRequests.doubleIsRunning(*request); double != nil {
<-double.ready
if cached := c.cachedRequests.isInCache(*request); cached != nil {
(*request).SetResult(cached.result, cached.err)
} else {
c.runRequest(request)
}
} else {
c.runRequest(request)
}
c.wg.Done()
c.finished.Done()
}
}
func (c *Cache) runRequest(request *Request) (interface{}, error) {
ready := make(chan struct{}, 1)
req := &SyncRequest{request, ready}
<-c.runningRequests.counter
c.runningRequests.addToRuning(req)
result, err := (*request).Run()
if (*request).Cacheable() {
c.cachedRequests.AddToCache(&CachedRequest{*request, result, err})
}
ready <- struct{}{}
c.runningRequests.counter <- struct{}{}
return result, err
}
func (c *Cache) finishRequests() {
for request := range c.addedRequests.toRun {
if cached := c.cachedRequests.isInCache(*request); cached != nil {
(*request).SetResult(cached.result, cached.err)
}
c.wg.Done()
c.finished.Done()
}
}
func (c *Cache) Stop() {
c.addedRequests.close()
c.finished.Wait()
}

Лог от изпълнението

PASS
ok  	_/tmp/d20160101-5892-19lo7it	0.003s
PASS
ok  	_/tmp/d20160101-5892-19lo7it	0.003s
PASS
ok  	_/tmp/d20160101-5892-19lo7it	0.103s
PASS
ok  	_/tmp/d20160101-5892-19lo7it	0.003s
panic: test timed out after 1s

goroutine 10 [running]:
testing.startAlarm.func1()
	/usr/local/go/src/testing/testing.go:703 +0x132
created by time.goFunc
	/usr/local/go/src/time/sleep.go:129 +0x3a

goroutine 1 [chan receive]:
testing.RunTests(0x5e2be0, 0x670e20, 0xd, 0xd, 0x1)
	/usr/local/go/src/testing/testing.go:562 +0x8ad
testing.(*M).Run(0xc82003fef8, 0xc820010650)
	/usr/local/go/src/testing/testing.go:494 +0x70
main.main()
	_/tmp/d20160101-5892-19lo7it/_test/_testmain.go:78 +0x116

goroutine 6 [chan receive]:
_/tmp/d20160101-5892-19lo7it.TestRequestsAreRunAsyncly(0xc82008e000)
	/tmp/d20160101-5892-19lo7it/solution_test.go:190 +0x586
testing.tRunner(0xc82008e000, 0x670e80)
	/usr/local/go/src/testing/testing.go:456 +0x98
created by testing.RunTests
	/usr/local/go/src/testing/testing.go:561 +0x86d

goroutine 7 [chan receive]:
_/tmp/d20160101-5892-19lo7it.TestRequestsAreRunAsyncly.func1(0x0, 0x0, 0x0, 0x0)
	/tmp/d20160101-5892-19lo7it/solution_test.go:171 +0xb1
_/tmp/d20160101-5892-19lo7it.(*request).Run(0xc82000a4e0, 0x0, 0x0, 0x0, 0x0)
	/tmp/d20160101-5892-19lo7it/solution_test.go:37 +0xa5
_/tmp/d20160101-5892-19lo7it.(*Cache).runRequest(0xc8200166c0, 0xc820010720, 0x0, 0x0, 0x0, 0x0)
	/tmp/d20160101-5892-19lo7it/solution.go:185 +0x14c
_/tmp/d20160101-5892-19lo7it.(*Cache).serveRequests(0xc8200166c0)
	/tmp/d20160101-5892-19lo7it/solution.go:172 +0x2a4
created by _/tmp/d20160101-5892-19lo7it.NewRequester
	/tmp/d20160101-5892-19lo7it/solution.go:141 +0x3fc

goroutine 8 [semacquire]:
sync.runtime_Semacquire(0xc8200166e4)
	/usr/local/go/src/runtime/sema.go:43 +0x26
sync.(*WaitGroup).Wait(0xc8200166d8)
	/usr/local/go/src/sync/waitgroup.go:126 +0xb4
_/tmp/d20160101-5892-19lo7it.(*Cache).AddRequest(0xc8200166c0, 0x7f58a5220578, 0xc82000a4e0)
	/tmp/d20160101-5892-19lo7it/solution.go:150 +0x109
created by _/tmp/d20160101-5892-19lo7it.TestRequestsAreRunAsyncly
	/tmp/d20160101-5892-19lo7it/solution_test.go:187 +0x4d1

goroutine 9 [semacquire]:
sync.runtime_Semacquire(0xc8200166e4)
	/usr/local/go/src/runtime/sema.go:43 +0x26
sync.(*WaitGroup).Wait(0xc8200166d8)
	/usr/local/go/src/sync/waitgroup.go:126 +0xb4
_/tmp/d20160101-5892-19lo7it.(*Cache).AddRequest(0xc8200166c0, 0x7f58a5220578, 0xc82000a540)
	/tmp/d20160101-5892-19lo7it/solution.go:150 +0x109
created by _/tmp/d20160101-5892-19lo7it.TestRequestsAreRunAsyncly
	/tmp/d20160101-5892-19lo7it/solution_test.go:189 +0x563
exit status 2
FAIL	_/tmp/d20160101-5892-19lo7it	1.005s
PASS
ok  	_/tmp/d20160101-5892-19lo7it	0.043s
panic: sync: WaitGroup is reused before previous Wait has returned

goroutine 26 [running]:
sync.(*WaitGroup).Wait(0xc82005e698)
	/usr/local/go/src/sync/waitgroup.go:128 +0x114
_/tmp/d20160101-5892-19lo7it.(*Cache).AddRequest(0xc82005e680, 0x7fa123423550, 0xc820062620)
	/tmp/d20160101-5892-19lo7it/solution.go:150 +0x109
created by _/tmp/d20160101-5892-19lo7it.TestCacheSize
	/tmp/d20160101-5892-19lo7it/solution_test.go:298 +0x95f

goroutine 1 [chan receive]:
testing.RunTests(0x5e2be0, 0x670e20, 0xd, 0xd, 0x1)
	/usr/local/go/src/testing/testing.go:562 +0x8ad
testing.(*M).Run(0xc82003fef8, 0xc820062510)
	/usr/local/go/src/testing/testing.go:494 +0x70
main.main()
	_/tmp/d20160101-5892-19lo7it/_test/_testmain.go:78 +0x116

goroutine 20 [runnable]:
_/tmp/d20160101-5892-19lo7it.TestCacheSize(0xc8200a0000)
	/tmp/d20160101-5892-19lo7it/solution_test.go:299 +0x982
testing.tRunner(0xc8200a0000, 0x670eb0)
	/usr/local/go/src/testing/testing.go:456 +0x98
created by testing.RunTests
	/usr/local/go/src/testing/testing.go:561 +0x86d

goroutine 21 [chan receive]:
_/tmp/d20160101-5892-19lo7it.(*Cache).serveRequests(0xc82005e680)
	/tmp/d20160101-5892-19lo7it/solution.go:156 +0x4a
created by _/tmp/d20160101-5892-19lo7it.NewRequester
	/tmp/d20160101-5892-19lo7it/solution.go:141 +0x3fc

goroutine 22 [runnable]:
sync.runtime_Semacquire(0xc82005e6a4)
	/usr/local/go/src/runtime/sema.go:43 +0x26
sync.(*WaitGroup).Wait(0xc82005e698)
	/usr/local/go/src/sync/waitgroup.go:126 +0xb4
_/tmp/d20160101-5892-19lo7it.(*Cache).AddRequest(0xc82005e680, 0x7fa123423550, 0xc820062610)
	/tmp/d20160101-5892-19lo7it/solution.go:150 +0x109
created by _/tmp/d20160101-5892-19lo7it.TestCacheSize
	/tmp/d20160101-5892-19lo7it/solution_test.go:277 +0x585

goroutine 23 [runnable]:
sync.runtime_Semacquire(0xc82005e6a4)
	/usr/local/go/src/runtime/sema.go:43 +0x26
sync.(*WaitGroup).Wait(0xc82005e698)
	/usr/local/go/src/sync/waitgroup.go:126 +0xb4
_/tmp/d20160101-5892-19lo7it.(*Cache).AddRequest(0xc82005e680, 0x7fa123423550, 0xc820062620)
	/tmp/d20160101-5892-19lo7it/solution.go:150 +0x109
created by _/tmp/d20160101-5892-19lo7it.TestCacheSize
	/tmp/d20160101-5892-19lo7it/solution_test.go:281 +0x643

goroutine 24 [runnable]:
sync.runtime_Semacquire(0xc82005e6a4)
	/usr/local/go/src/runtime/sema.go:43 +0x26
sync.(*WaitGroup).Wait(0xc82005e698)
	/usr/local/go/src/sync/waitgroup.go:126 +0xb4
_/tmp/d20160101-5892-19lo7it.(*Cache).AddRequest(0xc82005e680, 0x7fa123423550, 0xc820062630)
	/tmp/d20160101-5892-19lo7it/solution.go:150 +0x109
created by _/tmp/d20160101-5892-19lo7it.TestCacheSize
	/tmp/d20160101-5892-19lo7it/solution_test.go:285 +0x701
exit status 2
FAIL	_/tmp/d20160101-5892-19lo7it	0.006s
PASS
ok  	_/tmp/d20160101-5892-19lo7it	0.063s
PASS
ok  	_/tmp/d20160101-5892-19lo7it	0.053s
panic: test timed out after 1s

goroutine 4 [running]:
testing.startAlarm.func1()
	/usr/local/go/src/testing/testing.go:703 +0x132
created by time.goFunc
	/usr/local/go/src/time/sleep.go:129 +0x3a

goroutine 1 [chan receive]:
testing.RunTests(0x5e2be0, 0x670e20, 0xd, 0xd, 0x1)
	/usr/local/go/src/testing/testing.go:562 +0x8ad
testing.(*M).Run(0xc82003fef8, 0xc820062510)
	/usr/local/go/src/testing/testing.go:494 +0x70
main.main()
	_/tmp/d20160101-5892-19lo7it/_test/_testmain.go:78 +0x116

goroutine 20 [chan receive]:
_/tmp/d20160101-5892-19lo7it.TestStopWithQueue(0xc8200a2000)
	/tmp/d20160101-5892-19lo7it/solution_test.go:454 +0x7f1
testing.tRunner(0xc8200a2000, 0x670ef8)
	/usr/local/go/src/testing/testing.go:456 +0x98
created by testing.RunTests
	/usr/local/go/src/testing/testing.go:561 +0x86d

goroutine 21 [chan receive]:
_/tmp/d20160101-5892-19lo7it.(*Cache).finishRequests(0xc82005e600)
	/tmp/d20160101-5892-19lo7it/solution.go:195 +0x54
_/tmp/d20160101-5892-19lo7it.(*Cache).serveRequests(0xc82005e600)
	/tmp/d20160101-5892-19lo7it/solution.go:159 +0x88
created by _/tmp/d20160101-5892-19lo7it.NewRequester
	/tmp/d20160101-5892-19lo7it/solution.go:141 +0x3fc

goroutine 22 [semacquire]:
sync.runtime_Semacquire(0xc82005e624)
	/usr/local/go/src/runtime/sema.go:43 +0x26
sync.(*WaitGroup).Wait(0xc82005e618)
	/usr/local/go/src/sync/waitgroup.go:126 +0xb4
_/tmp/d20160101-5892-19lo7it.(*Cache).AddRequest(0xc82005e600, 0x7f5dc5559550, 0xc8200663f0)
	/tmp/d20160101-5892-19lo7it/solution.go:150 +0x109
created by _/tmp/d20160101-5892-19lo7it.TestStopWithQueue
	/tmp/d20160101-5892-19lo7it/solution_test.go:439 +0x5ae

goroutine 23 [semacquire]:
sync.runtime_Semacquire(0xc82005e624)
	/usr/local/go/src/runtime/sema.go:43 +0x26
sync.(*WaitGroup).Wait(0xc82005e618)
	/usr/local/go/src/sync/waitgroup.go:126 +0xb4
_/tmp/d20160101-5892-19lo7it.(*Cache).AddRequest(0xc82005e600, 0x7f5dc5559550, 0xc820066420)
	/tmp/d20160101-5892-19lo7it/solution.go:150 +0x109
created by _/tmp/d20160101-5892-19lo7it.TestStopWithQueue
	/tmp/d20160101-5892-19lo7it/solution_test.go:441 +0x646

goroutine 24 [semacquire]:
sync.runtime_Semacquire(0xc82005e624)
	/usr/local/go/src/runtime/sema.go:43 +0x26
sync.(*WaitGroup).Wait(0xc82005e618)
	/usr/local/go/src/sync/waitgroup.go:126 +0xb4
_/tmp/d20160101-5892-19lo7it.(*Cache).AddRequest(0xc82005e600, 0x7f5dc5559550, 0xc820066450)
	/tmp/d20160101-5892-19lo7it/solution.go:150 +0x109
created by _/tmp/d20160101-5892-19lo7it.TestStopWithQueue
	/tmp/d20160101-5892-19lo7it/solution_test.go:442 +0x6bb

goroutine 25 [semacquire]:
sync.runtime_Semacquire(0xc82005e634)
	/usr/local/go/src/runtime/sema.go:43 +0x26
sync.(*WaitGroup).Wait(0xc82005e628)
	/usr/local/go/src/sync/waitgroup.go:126 +0xb4
_/tmp/d20160101-5892-19lo7it.(*Cache).Stop(0xc82005e600)
	/tmp/d20160101-5892-19lo7it/solution.go:206 +0x3e
_/tmp/d20160101-5892-19lo7it.TestStopWithQueue.func3(0xc8200603c0, 0x7f5dc5559520, 0xc82005e600, 0xc820060420)
	/tmp/d20160101-5892-19lo7it/solution_test.go:446 +0x35
created by _/tmp/d20160101-5892-19lo7it.TestStopWithQueue
	/tmp/d20160101-5892-19lo7it/solution_test.go:448 +0x70e
exit status 2
FAIL	_/tmp/d20160101-5892-19lo7it	1.006s
PASS
ok  	_/tmp/d20160101-5892-19lo7it	0.033s
PASS
ok  	_/tmp/d20160101-5892-19lo7it	0.053s
panic: test timed out after 1s

goroutine 17 [running]:
testing.startAlarm.func1()
	/usr/local/go/src/testing/testing.go:703 +0x132
created by time.goFunc
	/usr/local/go/src/time/sleep.go:129 +0x3a

goroutine 1 [chan receive]:
testing.RunTests(0x5e2be0, 0x670e20, 0xd, 0xd, 0x1)
	/usr/local/go/src/testing/testing.go:562 +0x8ad
testing.(*M).Run(0xc82003fef8, 0xc820010650)
	/usr/local/go/src/testing/testing.go:494 +0x70
main.main()
	_/tmp/d20160101-5892-19lo7it/_test/_testmain.go:78 +0x116

goroutine 6 [chan receive]:
_/tmp/d20160101-5892-19lo7it.TestStopWithQueueFromForum(0xc82008e000)
	/tmp/d20160101-5892-19lo7it/solution_test.go:614 +0xa5c
testing.tRunner(0xc82008e000, 0x670f40)
	/usr/local/go/src/testing/testing.go:456 +0x98
created by testing.RunTests
	/usr/local/go/src/testing/testing.go:561 +0x86d

goroutine 7 [chan receive]:
_/tmp/d20160101-5892-19lo7it.TestStopWithQueueFromForum.func3(0x0, 0x0, 0x0, 0x0)
	/tmp/d20160101-5892-19lo7it/solution_test.go:584 +0x71
_/tmp/d20160101-5892-19lo7it.(*request).Run(0xc82000a570, 0x0, 0x0, 0x0, 0x0)
	/tmp/d20160101-5892-19lo7it/solution_test.go:37 +0xa5
_/tmp/d20160101-5892-19lo7it.(*Cache).runRequest(0xc8200166c0, 0xc820010730, 0x0, 0x0, 0x0, 0x0)
	/tmp/d20160101-5892-19lo7it/solution.go:185 +0x14c
_/tmp/d20160101-5892-19lo7it.(*Cache).serveRequests(0xc8200166c0)
	/tmp/d20160101-5892-19lo7it/solution.go:172 +0x2a4
created by _/tmp/d20160101-5892-19lo7it.NewRequester
	/tmp/d20160101-5892-19lo7it/solution.go:141 +0x3fc

goroutine 8 [semacquire]:
sync.runtime_Semacquire(0xc8200166e4)
	/usr/local/go/src/runtime/sema.go:43 +0x26
sync.(*WaitGroup).Wait(0xc8200166d8)
	/usr/local/go/src/sync/waitgroup.go:126 +0xb4
_/tmp/d20160101-5892-19lo7it.(*Cache).AddRequest(0xc8200166c0, 0x7f14fe74a578, 0xc82000a4e0)
	/tmp/d20160101-5892-19lo7it/solution.go:150 +0x109
created by _/tmp/d20160101-5892-19lo7it.TestStopWithQueueFromForum
	/tmp/d20160101-5892-19lo7it/solution_test.go:612 +0x9c1

goroutine 9 [semacquire]:
sync.runtime_Semacquire(0xc8200166e4)
	/usr/local/go/src/runtime/sema.go:43 +0x26
sync.(*WaitGroup).Wait(0xc8200166d8)
	/usr/local/go/src/sync/waitgroup.go:126 +0xb4
_/tmp/d20160101-5892-19lo7it.(*Cache).AddRequest(0xc8200166c0, 0x7f14fe74a578, 0xc82000a570)
	/tmp/d20160101-5892-19lo7it/solution.go:150 +0x109
created by _/tmp/d20160101-5892-19lo7it.TestStopWithQueueFromForum
	/tmp/d20160101-5892-19lo7it/solution_test.go:613 +0xa36
exit status 2
FAIL	_/tmp/d20160101-5892-19lo7it	1.006s

История (2 версии и 1 коментар)

Юлия обнови решението на 29.12.2015 19:47 (преди над 2 години)

+package main
+
+import "sync"
+
+type Request interface {
+ ID() string
+ Run() (result interface{}, err error)
+ Cacheable() bool
+ SetResult(result interface{}, err error)
+}
+
+type Requester interface {
+ AddRequest(request Request)
+ Stop()
+}
+
+type CachedRequest struct {
+ request Request
+ result interface{}
+ err error
+}
+
+type Added struct {
+ toRun chan *Request
+ closed bool
+ mutex *sync.Mutex
+}
+
+type Running struct {
+ counter chan struct{}
+ runningRequests []*SyncRequest
+ mutex *sync.Mutex
+}
+
+func (r *Running) addToRuning(request *SyncRequest) {
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
+ r.runningRequests = append(r.runningRequests, request)
+}
+
+func (r *Running) removeFromRuning(request *SyncRequest) {
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
+ for i := 0; i < len(r.runningRequests); i++ {
+ if r.runningRequests[i] == request {
+ r.runningRequests = append(r.runningRequests[:i], append([]*SyncRequest{request}, r.runningRequests[i+1:]...)...)
+ return
+ }
+ }
+}
+
+func (r *Running) doubleIsRunning(request Request) *SyncRequest {
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
+ for i := 0; i < len(r.runningRequests); i++ {
+ if (*r.runningRequests[i].request).ID() == request.ID() {
+ return r.runningRequests[i]
+ }
+ }
+ return nil
+}
+
+type Cached struct {
+ buffer []CachedRequest
+ positionToInsert int
+ cacheSize int
+ mutex *sync.Mutex
+}
+
+func (c *Cached) AddToCache(toCache *CachedRequest) {
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
+ var left []CachedRequest
+
+ if c.positionToInsert != c.cacheSize-1 && c.positionToInsert != len(c.buffer) {
+ left = append([]CachedRequest{*toCache}, c.buffer[c.positionToInsert+1:]...)
+ } else {
+ left = []CachedRequest{*toCache}
+ }
+ c.buffer = append(c.buffer[:c.positionToInsert], left...)
+ c.positionToInsert = (c.positionToInsert + 1) % c.cacheSize
+}
+
+func (c *Cached) isInCache(request Request) *CachedRequest {
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
+ for i := 0; i < len(c.buffer); i++ {
+ if c.buffer[i].request.ID() == request.ID() {
+ return &(c.buffer[i])
+ }
+ }
+ return nil
+}
+
+type Cache struct {
+ addedRequests *Added
+ runningRequests *Running
+ cachedRequests *Cached
+ wg, finished sync.WaitGroup
+ mutex *sync.Mutex
+}
+
+type SyncRequest struct {
+ request *Request
+ ready chan struct{}
+}
+
+func NewRequester(cacheSize, throttleSize int) Requester {
+ toRun := make(chan *Request, 100)
+ closed := new(sync.Mutex)
+ addedRequests := &Added{toRun, false, closed}
+
+ counter := make(chan struct{}, throttleSize)
+ var runningBuffer []*SyncRequest
+ runningMutex := new(sync.Mutex)
+ for i := 0; i < throttleSize; i++ {
+ counter <- struct{}{}
+ }
+ runningRequests := &Running{counter, runningBuffer, runningMutex}
+
+ var cacheBuffer []CachedRequest
+ cacheMutex := new(sync.Mutex)
+ cachedRequests := &Cached{cacheBuffer, 0, cacheSize, cacheMutex}
+
+ var wg, finished sync.WaitGroup
+ mutex := new(sync.Mutex)
+ requester := &Cache{addedRequests, runningRequests, cachedRequests, wg, finished, mutex}
+
+ go requester.serveRequests()
+ return requester
+}
+
+func (c *Cache) AddRequest(request Request) {
+ c.addedRequests.mutex.Lock()
+ closed := c.addedRequests.closed
+ c.addedRequests.mutex.Unlock()
+ if closed == false {
+ c.wg.Add(1)
+ c.finished.Add(1)
+ c.addedRequests.toRun <- &request
+ c.wg.Wait()
+ }
+}
+
+func (c *Cache) serveRequests() {
+ for {
+ request := <-c.addedRequests.toRun
+ if request == nil {
+ c.finishRequests()
+ return
+ }
+ if cacheResult := c.cachedRequests.isInCache(*request); cacheResult != nil {
+ (*request).SetResult(cacheResult.result, cacheResult.err)
+ } else if double := c.runningRequests.doubleIsRunning(*request); double != nil {
+ <-double.ready
+ if cached := c.cachedRequests.isInCache(*request); cached != nil {
+ (*request).SetResult(cached.result, cached.err)
+ } else {
+ c.runRequest(request)
+ }
+ } else {
+ c.runRequest(request)
+ }
+ c.wg.Done()
+ c.finished.Done()
+ }
+}
+
+func (c *Cache) runRequest(request *Request) (interface{}, error) {
+ ready := make(chan struct{}, 1)
+ req := &SyncRequest{request, ready}
+
+ <-c.runningRequests.counter
+ c.runningRequests.addToRuning(req)
+ result, err := (*request).Run()
+ if (*request).Cacheable() {
+ c.cachedRequests.AddToCache(&CachedRequest{*request, result, err})
+ }
+ ready <- struct{}{}
+ c.runningRequests.counter <- struct{}{}
+ return result, err
+}
+
+func (c *Cache) finishRequests() {
+ for request := range c.addedRequests.toRun {
+ if cached := c.cachedRequests.isInCache(*request); cached != nil {
+ (*request).SetResult(cached.result, cached.err)
+ }
+ c.wg.Done()
+ c.finished.Done()
+ }
+}
+
+func (c *Cache) Stop() {
+ c.addedRequests.toRun <- nil
+ c.addedRequests.mutex.Lock()
+ c.addedRequests.closed = true
+ c.addedRequests.mutex.Unlock()
+ c.finished.Wait()
+}

Юлия обнови решението на 29.12.2015 20:14 (преди над 2 години)

package main
import "sync"
type Request interface {
ID() string
Run() (result interface{}, err error)
Cacheable() bool
SetResult(result interface{}, err error)
}
type Requester interface {
AddRequest(request Request)
Stop()
}
type CachedRequest struct {
request Request
result interface{}
err error
}
type Added struct {
toRun chan *Request
closed bool
mutex *sync.Mutex
}
+func (a *Added) close() {
+ a.mutex.Lock()
+ defer a.mutex.Unlock()
+ a.closed = true
+}
+
+func (a *Added) getStatusClosed() bool {
+ a.mutex.Lock()
+ defer a.mutex.Unlock()
+ return a.closed
+}
+
type Running struct {
counter chan struct{}
runningRequests []*SyncRequest
mutex *sync.Mutex
}
func (r *Running) addToRuning(request *SyncRequest) {
r.mutex.Lock()
defer r.mutex.Unlock()
r.runningRequests = append(r.runningRequests, request)
}
func (r *Running) removeFromRuning(request *SyncRequest) {
r.mutex.Lock()
defer r.mutex.Unlock()
for i := 0; i < len(r.runningRequests); i++ {
if r.runningRequests[i] == request {
r.runningRequests = append(r.runningRequests[:i], append([]*SyncRequest{request}, r.runningRequests[i+1:]...)...)
return
}
}
}
func (r *Running) doubleIsRunning(request Request) *SyncRequest {
r.mutex.Lock()
defer r.mutex.Unlock()
for i := 0; i < len(r.runningRequests); i++ {
if (*r.runningRequests[i].request).ID() == request.ID() {
return r.runningRequests[i]
}
}
return nil
}
type Cached struct {
buffer []CachedRequest
positionToInsert int
cacheSize int
mutex *sync.Mutex
}
func (c *Cached) AddToCache(toCache *CachedRequest) {
c.mutex.Lock()
defer c.mutex.Unlock()
var left []CachedRequest
if c.positionToInsert != c.cacheSize-1 && c.positionToInsert != len(c.buffer) {
left = append([]CachedRequest{*toCache}, c.buffer[c.positionToInsert+1:]...)
} else {
left = []CachedRequest{*toCache}
}
c.buffer = append(c.buffer[:c.positionToInsert], left...)
c.positionToInsert = (c.positionToInsert + 1) % c.cacheSize
}
func (c *Cached) isInCache(request Request) *CachedRequest {
c.mutex.Lock()
defer c.mutex.Unlock()
for i := 0; i < len(c.buffer); i++ {
if c.buffer[i].request.ID() == request.ID() {
return &(c.buffer[i])
}
}
return nil
}
type Cache struct {
addedRequests *Added
runningRequests *Running
cachedRequests *Cached
wg, finished sync.WaitGroup
mutex *sync.Mutex
}
type SyncRequest struct {
request *Request
ready chan struct{}
}
func NewRequester(cacheSize, throttleSize int) Requester {
toRun := make(chan *Request, 100)
closed := new(sync.Mutex)
addedRequests := &Added{toRun, false, closed}
counter := make(chan struct{}, throttleSize)
var runningBuffer []*SyncRequest
runningMutex := new(sync.Mutex)
for i := 0; i < throttleSize; i++ {
counter <- struct{}{}
}
runningRequests := &Running{counter, runningBuffer, runningMutex}
var cacheBuffer []CachedRequest
cacheMutex := new(sync.Mutex)
cachedRequests := &Cached{cacheBuffer, 0, cacheSize, cacheMutex}
var wg, finished sync.WaitGroup
mutex := new(sync.Mutex)
requester := &Cache{addedRequests, runningRequests, cachedRequests, wg, finished, mutex}
go requester.serveRequests()
return requester
}
func (c *Cache) AddRequest(request Request) {
- c.addedRequests.mutex.Lock()
- closed := c.addedRequests.closed
- c.addedRequests.mutex.Unlock()
- if closed == false {
+ if c.addedRequests.getStatusClosed() == false {
c.wg.Add(1)
c.finished.Add(1)
c.addedRequests.toRun <- &request
c.wg.Wait()
}
}
func (c *Cache) serveRequests() {
for {
request := <-c.addedRequests.toRun
- if request == nil {
+
+ if c.addedRequests.getStatusClosed() == true {
c.finishRequests()
- return
}
+
if cacheResult := c.cachedRequests.isInCache(*request); cacheResult != nil {
(*request).SetResult(cacheResult.result, cacheResult.err)
} else if double := c.runningRequests.doubleIsRunning(*request); double != nil {
<-double.ready
if cached := c.cachedRequests.isInCache(*request); cached != nil {
(*request).SetResult(cached.result, cached.err)
} else {
c.runRequest(request)
}
} else {
c.runRequest(request)
}
c.wg.Done()
c.finished.Done()
}
}
func (c *Cache) runRequest(request *Request) (interface{}, error) {
ready := make(chan struct{}, 1)
req := &SyncRequest{request, ready}
<-c.runningRequests.counter
c.runningRequests.addToRuning(req)
result, err := (*request).Run()
if (*request).Cacheable() {
c.cachedRequests.AddToCache(&CachedRequest{*request, result, err})
}
ready <- struct{}{}
c.runningRequests.counter <- struct{}{}
return result, err
}
func (c *Cache) finishRequests() {
for request := range c.addedRequests.toRun {
if cached := c.cachedRequests.isInCache(*request); cached != nil {
(*request).SetResult(cached.result, cached.err)
}
c.wg.Done()
c.finished.Done()
}
}
func (c *Cache) Stop() {
- c.addedRequests.toRun <- nil
- c.addedRequests.mutex.Lock()
- c.addedRequests.closed = true
- c.addedRequests.mutex.Unlock()
+ c.addedRequests.close()
c.finished.Wait()
}

Здрасти,

на бързо:

  • на 150 ред ти се панира един тест със

    panic: sync: WaitGroup is reused before previous Wait has returned
    
  • AddRequest-а е мислен асинхронно - тоест не чака задачата да завърши, така че тази WaitGroup-а може да я пропуснеш

  • Stop-а ти нещо не работи в примера от форума. Но мисля че е просто от WaitGroup-ата, тоест нея чака.

Довечера може и още, ако ли не Приятна Нова Година