Solution to "HTTP Downloader" by Михаил Здравков

Results

  • 7 points from tests
  • 0 bonus points
  • 7 points total
  • 9 passed test(s)
  • 4 failed test(s)

Code

package main

import "sync"

type Request interface {
	ID() string
	Run() (result interface{}, err error)
	Cacheable() bool
	SetResult(result interface{}, err error)
}

type Requester interface {
	AddRequest(request Request)
	Stop()
}

type Result struct {
	Result interface{}
	Error  error
}

type SomeRequester struct {
	cache           map[string]Result
	size            int
	maxProc         int
	queues          map[string][]Request
	mutex           sync.Mutex
	workerScheduler chan struct{}
	order           []string
	stopped         *bool
}

func (sr SomeRequester) AddRequest(request Request) {
	if *sr.stopped {
		return
	}

	// Cached results are served immediately, without scheduling a run.
	if result, ok := sr.cache[request.ID()]; ok {
		request.SetResult(result.Result, result.Error)
		return
	}

	go sr.runRequest(request)
}

func (sr *SomeRequester) runRequest(request Request) {
	// Only the first request with a given ID actually runs; later
	// duplicates wait in the queue for its result.
	index := sr.appendToQueues(request.ID(), request)
	if index > 0 {
		return
	}

	// The buffered channel works as a counting semaphore that bounds
	// the number of concurrently running requests.
	sr.workerScheduler <- struct{}{}
	defer func() { <-sr.workerScheduler }()

	if *sr.stopped {
		return
	}

	result, err := request.Run()

	sr.mutex.Lock()
	defer func() { sr.mutex.Unlock() }()
	defer func() { sr.queues[request.ID()] = make([]Request, 0) }()

	if request.Cacheable() {
		sr.order = append(sr.order, request.ID())

		// Evict the oldest entry when the cache is full.
		if len(sr.cache) == sr.size {
			delete(sr.cache, sr.order[0])
			sr.order = sr.order[1:]
		}

		sr.cache[request.ID()] = Result{Result: result, Error: err}
		sr.processQueueAfterCaching(request.ID())
		return
	}

	// Non-cacheable: the queued duplicates have to run themselves.
	for _, queuedRequest := range sr.queues[request.ID()][1:] {
		go sr.runRequest(queuedRequest)
	}
}

func (sr *SomeRequester) appendToQueues(id string, request Request) int {
	sr.mutex.Lock()
	index := len(sr.queues[request.ID()])
	sr.queues[request.ID()] = append(sr.queues[request.ID()], request)
	sr.mutex.Unlock()

	return index
}

func (sr *SomeRequester) processQueueAfterCaching(id string) {
	for _, request := range sr.queues[id][1:] {
		request.SetResult(sr.cache[id].Result, sr.cache[id].Error)
	}
}

func (sr SomeRequester) Stop() {
	*sr.stopped = true
}

func NewRequester(cacheSize int, throttleSize int) Requester {
	requester := SomeRequester{
		cache:           make(map[string]Result),
		size:            cacheSize,
		workerScheduler: make(chan struct{}, throttleSize),
		queues:          make(map[string][]Request),
		stopped:         new(bool),
	}

	return requester
}
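
For context, a minimal sketch of how this Requester is driven. demoRequest is a hypothetical Request implementation (the course tests supply their own), and the snippet assumes it sits in a second file in the same package as the solution above:

package main

import (
	"fmt"
	"time"
)

// demoRequest is a hypothetical Request implementation, used here only
// to show how the pieces fit together.
type demoRequest struct {
	id string
}

func (r *demoRequest) ID() string      { return r.id }
func (r *demoRequest) Cacheable() bool { return true }

func (r *demoRequest) Run() (interface{}, error) {
	return "fetched " + r.id, nil
}

// SetResult is called for duplicates served from the cache or the queue.
func (r *demoRequest) SetResult(result interface{}, err error) {
	fmt.Println("got result:", result, err)
}

func main() {
	requester := NewRequester(5, 2) // cache up to 5 results, run 2 at a time
	requester.AddRequest(&demoRequest{id: "example.com"})
	requester.AddRequest(&demoRequest{id: "example.com"}) // typically served from the queue or the cache

	// Crude wait: the submitted Stop does not block on in-flight
	// requests, which is what the failing tests below point at.
	time.Sleep(100 * time.Millisecond)
	requester.Stop()
}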

Execution log

PASS
ok  	_/tmp/d20160101-5892-1r35g06	0.003s
PASS
ok  	_/tmp/d20160101-5892-1r35g06	0.003s
--- FAIL: TestNonCacheableRequestsBeingWaitedByStop (0.05s)
	solution_test.go:531: Stop shouldn't have finished before non-cacheable
FAIL
exit status 1
FAIL	_/tmp/d20160101-5892-1r35g06	0.103s
PASS
ok  	_/tmp/d20160101-5892-1r35g06	0.003s
PASS
ok  	_/tmp/d20160101-5892-1r35g06	0.003s
PASS
ok  	_/tmp/d20160101-5892-1r35g06	0.043s
panic: test timed out after 1s

goroutine 17 [running]:
testing.startAlarm.func1()
	/usr/local/go/src/testing/testing.go:703 +0x132
created by time.goFunc
	/usr/local/go/src/time/sleep.go:129 +0x3a

goroutine 1 [chan receive]:
testing.RunTests(0x5e3b18, 0x671e20, 0xd, 0xd, 0x1)
	/usr/local/go/src/testing/testing.go:562 +0x8ad
testing.(*M).Run(0xc82003fef8, 0xc820010650)
	/usr/local/go/src/testing/testing.go:494 +0x70
main.main()
	_/tmp/d20160101-5892-1r35g06/_test/_testmain.go:78 +0x116

goroutine 6 [chan send]:
_/tmp/d20160101-5892-1r35g06.TestCacheSize(0xc82008c000)
	/tmp/d20160101-5892-1r35g06/solution_test.go:303 +0xbe5
testing.tRunner(0xc82008c000, 0x671eb0)
	/usr/local/go/src/testing/testing.go:456 +0x98
created by testing.RunTests
	/usr/local/go/src/testing/testing.go:561 +0x86d
exit status 2
FAIL	_/tmp/d20160101-5892-1r35g06	1.005s
PASS
ok  	_/tmp/d20160101-5892-1r35g06	0.043s
PASS
ok  	_/tmp/d20160101-5892-1r35g06	0.053s
PASS
ok  	_/tmp/d20160101-5892-1r35g06	0.214s
--- FAIL: TestStopWaits (0.00s)
	solution_test.go:482: Stop finished before Ran was
FAIL
exit status 1
FAIL	_/tmp/d20160101-5892-1r35g06	0.003s
--- FAIL: TestNonCacheableRequestsBeingWaitedByStop (0.05s)
	solution_test.go:531: Stop shouldn't have finished before non-cacheable
FAIL
exit status 1
FAIL	_/tmp/d20160101-5892-1r35g06	0.053s
PASS
ok  	_/tmp/d20160101-5892-1r35g06	0.113s

History (3 versions and 3 comments)

Михаил updated the solution on 28.12.2015 13:35 (over 2 years ago)

+package main
+
+import (
+	"sync"
+)
+
+type Request interface {
+	ID() string
+
+	Run() (result interface{}, err error)
+
+	Cacheable() bool
+
+	SetResult(result interface{}, err error)
+}
+
+type Requester interface {
+	AddRequest(request Request)
+
+	Stop()
+}
+
+type Result struct {
+	Result interface{}
+	Error  error
+}
+
+type SomeRequester struct {
+	cache           map[string]Result
+	size            int
+	maxProc         int
+	queues          map[string][]Request
+	queuesMutex     sync.Mutex
+	workerScheduler chan chan struct{}
+	order           []string
+	stopped         bool
+}
+
+func (sr SomeRequester) AddRequest(request Request) {
+	if sr.stopped {
+		return
+	}
+
+	if result, ok := sr.cache[request.ID()]; ok {
+		request.SetResult(result.Result, result.Error)
+		return
+	}
+
+	go sr.runRequest(request)
+}
+
+func (sr *SomeRequester) runRequest(request Request) {
+	ch := make(chan struct{})
+	sr.workerScheduler <- ch
+	defer func() { ch <- struct{}{} }()
+	index := sr.appendToQueues(request.ID(), request)
+	if index > 0 {
+		return
+	}
+
+	result, err := request.Run()
+
+	if request.Cacheable() {
+		sr.order = append(sr.order, request.ID())
+
+		if len(sr.cache) == sr.size {
+			delete(sr.cache, sr.order[0])
+			sr.order = sr.order[1:]
+		}
+
+		sr.cache[request.ID()] = Result{Result: result, Error: err}
+		sr.processQueueAfterCaching(request.ID())
+		return
+	}
+
+	sr.queues[request.ID()] = sr.queues[request.ID()][1:]
+	if len(sr.queues[request.ID()]) > 0 {
+		sr.runRequest(sr.queues[request.ID()][0])
+	}
+
+}
+
+func (sr *SomeRequester) appendToQueues(id string, request Request) int {
+	sr.queuesMutex.Lock()
+	index := len(sr.queues[request.ID()])
+	sr.queues[request.ID()] = append(sr.queues[request.ID()], request)
+	sr.queuesMutex.Unlock()
+
+	return index
+}
+
+func (sr *SomeRequester) processQueueAfterCaching(id string) {
+	for _, request := range sr.queues[id][1:] {
+		request.SetResult(sr.cache[id].Result, sr.cache[id].Error)
+	}
+}
+
+func (sr SomeRequester) Stop() {
+	sr.stopped = true
+}
+
+func NewRequester(cacheSize int, throttleSize int) Requester {
+	requester := SomeRequester{
+		cache:           make(map[string]Result),
+		size:            cacheSize,
+		workerScheduler: make(chan chan struct{}),
+		queues:          make(map[string][]Request),
+	}
+
+	for i := 0; i < throttleSize; i++ {
+		go func() {
+			for {
+				ch := <-requester.workerScheduler
+				<-ch
+			}
+		}()
+	}
+
+	return requester
+}

Good evening,

A quick review over a drink tells me that:

  1. You haven't read the requirement for Stop to the end, or I haven't explained it properly.
  2. You've realized that AddRequest probably shouldn't block, given that it doesn't return a result.
  3. You have race conditions left and right; for example, concurrent reading and writing of a map is not a good idea (see the sketch below).
  4. On line 74 you've dropped an else.
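
A minimal sketch of the kind of guard meant in point 3, assuming the existing sr.mutex is also used to protect sr.cache; cachedResult is a hypothetical helper, not part of the submitted code:

// Hypothetical helper: a cache lookup guarded by the same mutex that
// guards cache writes, so lookups no longer race with runRequest.
func (sr *SomeRequester) cachedResult(id string) (Result, bool) {
	sr.mutex.Lock()
	defer sr.mutex.Unlock()

	result, ok := sr.cache[id]
	return result, ok
}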

Good night, happy coding, and Happy New Year

Hi,

  1. I think my Stop is correct. It effectively makes AddRequest stop adding new requests; as for the other part of the requirement for Stop, requests that were added while an identical one was running get SetResult called on them (if the identical request that ran is cacheable), or are made to run themselves (if it wasn't cacheable). My question here: if we have stopped the Requester and there are queued copies of a request, then when that request finishes and is not cacheable, the copies identical to it should still be run, right? (See the Stop sketch after this exchange.)
  2. :)
  3. I know about the race conditions, I'm just too lazy to fix them and keep putting it off :D
  4. I believe the else isn't needed, because I have a return at the end of the if block.

  1. See this comment of mine.
  2. :smile:
  3. Eh.
  4. I'll agree with that claim, but let me clarify that Cacheable always returns the same thing for equal requests, so there is no need to run them one after another like that. There was a clarification to that effect in the task statement during some revision, but it has apparently been dropped.
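
For reference, one way to make Stop actually block until requests that have already started finish, which is what TestStopWaits and TestNonCacheableRequestsBeingWaitedByStop check, is the sync.WaitGroup pattern. A standalone sketch of the idea, not the submitted code (the stopper type and its fields are illustrative):

package main

import (
	"fmt"
	"sync"
	"time"
)

// stopper demonstrates the pattern: Stop blocks until every piece of
// work started before the call has finished. Pointer fields keep the
// state shared even though the methods use value receivers.
type stopper struct {
	wg      *sync.WaitGroup
	stopped *bool
}

func (s stopper) Add(work func()) {
	if *s.stopped {
		return
	}
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		work()
	}()
}

func (s stopper) Stop() {
	// In real concurrent code the stopped flag would also need a
	// mutex or an atomic; kept bare here for brevity.
	*s.stopped = true
	s.wg.Wait() // does not return before in-flight work completes
}

func main() {
	s := stopper{wg: new(sync.WaitGroup), stopped: new(bool)}
	s.Add(func() { time.Sleep(50 * time.Millisecond); fmt.Println("ran") })
	s.Stop() // "ran" is printed before this returns
	fmt.Println("stopped")
}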

Михаил updated the solution on 31.12.2015 12:40 (over 2 years ago)

package main
-import (
-	"sync"
-)
+import "sync"
type Request interface {
	ID() string
-
	Run() (result interface{}, err error)
-
	Cacheable() bool
-
	SetResult(result interface{}, err error)
}
type Requester interface {
	AddRequest(request Request)
-
	Stop()
}
type Result struct {
	Result interface{}
	Error  error
}
type SomeRequester struct {
	cache           map[string]Result
	size            int
	maxProc         int
	queues          map[string][]Request
-	queuesMutex     sync.Mutex
-	workerScheduler chan chan struct{}
+	mutex           sync.Mutex
+	workerScheduler chan struct{}
	order           []string
	stopped         bool
}
func (sr SomeRequester) AddRequest(request Request) {
	if sr.stopped {
		return
	}
	if result, ok := sr.cache[request.ID()]; ok {
		request.SetResult(result.Result, result.Error)
		return
	}
	go sr.runRequest(request)
}
func (sr *SomeRequester) runRequest(request Request) {
-	ch := make(chan struct{})
-	sr.workerScheduler <- ch
-	defer func() { ch <- struct{}{} }()
+	sr.workerScheduler <- struct{}{}
+	defer func() { <-sr.workerScheduler }()
+
	index := sr.appendToQueues(request.ID(), request)
	if index > 0 {
		return
	}
	result, err := request.Run()
+	if sr.stopped {
+		return
+	}
+
+	sr.mutex.Lock()
+	defer func() { sr.mutex.Unlock() }()
+
	if request.Cacheable() {
		sr.order = append(sr.order, request.ID())
		if len(sr.cache) == sr.size {
			delete(sr.cache, sr.order[0])
			sr.order = sr.order[1:]
		}
		sr.cache[request.ID()] = Result{Result: result, Error: err}
		sr.processQueueAfterCaching(request.ID())
+
		return
	}
	sr.queues[request.ID()] = sr.queues[request.ID()][1:]
-	if len(sr.queues[request.ID()]) > 0 {
-		sr.runRequest(sr.queues[request.ID()][0])
+	for _, queuedRequest := range sr.queues[request.ID()] {
+		go sr.runRequest(queuedRequest)
	}
-
+	sr.queues[request.ID()] = make([]Request, 0)
}
func (sr *SomeRequester) appendToQueues(id string, request Request) int {
-	sr.queuesMutex.Lock()
+	sr.mutex.Lock()
	index := len(sr.queues[request.ID()])
	sr.queues[request.ID()] = append(sr.queues[request.ID()], request)
-	sr.queuesMutex.Unlock()
+	sr.mutex.Unlock()
	return index
}
func (sr *SomeRequester) processQueueAfterCaching(id string) {
	for _, request := range sr.queues[id][1:] {
		request.SetResult(sr.cache[id].Result, sr.cache[id].Error)
	}
}
func (sr SomeRequester) Stop() {
	sr.stopped = true
}
func NewRequester(cacheSize int, throttleSize int) Requester {
	requester := SomeRequester{
		cache:           make(map[string]Result),
		size:            cacheSize,
-		workerScheduler: make(chan chan struct{}),
+		workerScheduler: make(chan struct{}, throttleSize),
		queues:          make(map[string][]Request),
-	}
-
-	for i := 0; i < throttleSize; i++ {
-		go func() {
-			for {
-				ch := <-requester.workerScheduler
-				<-ch
-			}
-		}()
	}
	return requester
}
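
The change above replaces the pool of scheduler goroutines with the standard buffered-channel counting semaphore: a send occupies one of throttleSize slots and the deferred receive frees it, so at most throttleSize requests run at once. A standalone sketch of the idiom:

package main

import (
	"fmt"
	"sync"
)

func main() {
	const throttleSize = 2

	// Buffered channel as a counting semaphore: at most throttleSize
	// goroutines hold a slot at the same time; the rest block on send.
	sem := make(chan struct{}, throttleSize)

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()

			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release it

			fmt.Println("running", i)
		}(i)
	}
	wg.Wait()
}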

Михаил updated the solution on 31.12.2015 16:00 (over 2 years ago)

package main
import "sync"
type Request interface {
	ID() string
	Run() (result interface{}, err error)
	Cacheable() bool
	SetResult(result interface{}, err error)
}
type Requester interface {
	AddRequest(request Request)
	Stop()
}
type Result struct {
	Result interface{}
	Error  error
}
type SomeRequester struct {
	cache           map[string]Result
	size            int
	maxProc         int
	queues          map[string][]Request
	mutex           sync.Mutex
	workerScheduler chan struct{}
	order           []string
-	stopped         bool
+	stopped         *bool
}
func (sr SomeRequester) AddRequest(request Request) {
-	if sr.stopped {
+	if *sr.stopped {
		return
	}
	if result, ok := sr.cache[request.ID()]; ok {
		request.SetResult(result.Result, result.Error)
		return
	}
	go sr.runRequest(request)
}
func (sr *SomeRequester) runRequest(request Request) {
-	sr.workerScheduler <- struct{}{}
-	defer func() { <-sr.workerScheduler }()
-
	index := sr.appendToQueues(request.ID(), request)
	if index > 0 {
		return
	}
-	result, err := request.Run()
+	sr.workerScheduler <- struct{}{}
+	defer func() { <-sr.workerScheduler }()
-	if sr.stopped {
+	if *sr.stopped {
		return
	}
+	result, err := request.Run()
+
	sr.mutex.Lock()
	defer func() { sr.mutex.Unlock() }()
+	defer func() { sr.queues[request.ID()] = make([]Request, 0) }()
	if request.Cacheable() {
		sr.order = append(sr.order, request.ID())
		if len(sr.cache) == sr.size {
			delete(sr.cache, sr.order[0])
			sr.order = sr.order[1:]
		}
		sr.cache[request.ID()] = Result{Result: result, Error: err}
		sr.processQueueAfterCaching(request.ID())
-
		return
	}
-	sr.queues[request.ID()] = sr.queues[request.ID()][1:]
-	for _, queuedRequest := range sr.queues[request.ID()] {
+	for _, queuedRequest := range sr.queues[request.ID()][1:] {
		go sr.runRequest(queuedRequest)
	}
-	sr.queues[request.ID()] = make([]Request, 0)
}
func (sr *SomeRequester) appendToQueues(id string, request Request) int {
	sr.mutex.Lock()
	index := len(sr.queues[request.ID()])
	sr.queues[request.ID()] = append(sr.queues[request.ID()], request)
	sr.mutex.Unlock()
	return index
}
func (sr *SomeRequester) processQueueAfterCaching(id string) {
	for _, request := range sr.queues[id][1:] {
		request.SetResult(sr.cache[id].Result, sr.cache[id].Error)
	}
}
func (sr SomeRequester) Stop() {
-	sr.stopped = true
+	*sr.stopped = true
}
func NewRequester(cacheSize int, throttleSize int) Requester {
	requester := SomeRequester{
		cache:           make(map[string]Result),
		size:            cacheSize,
		workerScheduler: make(chan struct{}, throttleSize),
		queues:          make(map[string][]Request),
+		stopped:         new(bool),
	}
	return requester
}
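
The switch to stopped *bool works around the value receivers on AddRequest and Stop: each call operates on a copy of SomeRequester, so assigning to a plain bool field would only flip the copy's flag, while all copies share the bool a pointer field points to. A standalone illustration:

package main

import "fmt"

type flagByValue struct{ stopped bool }

// Stop sets the field on a copy of the receiver; the original is untouched.
func (f flagByValue) Stop() { f.stopped = true }

type flagByPointer struct{ stopped *bool }

// Stop writes through the shared pointer; every copy observes the change.
func (f flagByPointer) Stop() { *f.stopped = true }

func main() {
	v := flagByValue{}
	v.Stop()
	fmt.Println(v.stopped) // false

	p := flagByPointer{stopped: new(bool)}
	p.Stop()
	fmt.Println(*p.stopped) // true
}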