Решение на HTTP сваляч от Стоян Иванов

Обратно към всички решения

Към профила на Стоян Иванов

Резултати

  • 8 точки от тестове
  • 0 бонус точки
  • 8 точки общо
  • 10 успешни тест(а)
  • 3 неуспешни тест(а)

Код

package main
import (
"fmt"
"sync"
)
type Request interface {
// Връща идентификатор за заявката. Ако две заявки имат еднакви идентификатори
// то те са "равни".
ID() string
// Блокира докато изпълнява заявката.
// Връща резултата или грешка ако изпълнението е неуспешно.
// Резултата и грешката не трябва да бъдат подавани на SetResult
// за текущата заявка - те се запазват вътрешно преди да бъдат върнати.
Run() (result interface{}, err error)
// Връща дали заявката е кешируерма.
// Метода има неопределено поведение, ако бъде извикан преди `Run`.
Cacheable() bool
// Задава резултата на заявката.
// Не трябва да се извиква за заявки, за които е бил извикан `Run`.
SetResult(result interface{}, err error)
}
type Requester interface {
// Добавя заявка за изпълнение и я изпълнява, ако това е необходимо, при първа възможност.
AddRequest(request Request)
// Спира 'Заявчика'. Това означава, че изчаква всички вече започнали заявки да завършат
// и извиква `SetResult` на тези заявки, които вече са били добавени, но "равни" на тях вече са били изпълнявание.
// Нови заявки не трябва да бъдат започвани през това време, нито вече започнати, равни на тях, да бъдат добавяни за извикване на `SetResult`.
Stop()
}
type CyclicBuffer struct {
buffer []interface{}
start, end int
}
func (cb *CyclicBuffer) Length() int {
if cb.start < cb.end {
return cb.end - cb.start + 1
} else {
return cb.start - cb.end + 1
}
}
type InvalidIndexError struct {
index int
}
func (e *InvalidIndexError) Error() string {
return fmt.Sprintf("%d is not a valid index", e.index)
}
func (cb *CyclicBuffer) Get(index int) (interface{}, error) {
if index < 0 || index >= len(cb.buffer) || cb.start == 0 && index > cb.end {
return nil, &InvalidIndexError{index}
}
pos := cb.end - index
if pos < 0 {
pos += len(cb.buffer)
}
return cb.buffer[pos], nil
}
func (cb *CyclicBuffer) Add(value interface{}) {
if cb.start == -1 || cb.end == -1 {
cb.start = 0
cb.end = 0
cb.buffer[0] = value
return
}
if cb.end == len(cb.buffer)-1 {
cb.end = 0
} else {
cb.end += 1
}
if cb.start == len(cb.buffer)-1 {
cb.start = 0
} else if cb.start == cb.end {
cb.start += 1
}
cb.buffer[cb.end] = value
}
func NewCyclicBuffer(size int) *CyclicBuffer {
return &CyclicBuffer{
buffer: make([]interface{}, size),
start: -1,
end: -1,
}
}
type BufferRequester struct {
queue chan Request
cache *CyclicBuffer
throttle chan struct{}
lock sync.RWMutex
waiter sync.WaitGroup
isStopped bool
classes map[string]chan Request
}
type Result struct {
ID string
Result interface{}
Err error
}
type RequestNotFoundError struct {
ID string
}
func (e *RequestNotFoundError) Error() string {
return fmt.Sprintf("Could not find result of request which ID is %v", e.ID)
}
func (br *BufferRequester) FindResult(id string) (*Result, error) {
cacheLen := br.cache.Length()
for i := 0; i < cacheLen; i++ {
cacheItem, _ := br.cache.Get(i)
if result, ok := cacheItem.(Result); ok && result.ID == id {
return &result, nil
}
}
return nil, &RequestNotFoundError{id}
}
func (br *BufferRequester) AddRequest(request Request) {
if !br.isStopped {
br.queue <- request
}
}
func (br *BufferRequester) Stop() {
br.isStopped = true
close(br.queue)
for range br.queue {
}
br.waiter.Wait()
close(br.throttle)
for range br.throttle {
}
}
// Връща нов заявчик, който кешира отговорите на до cacheSize заявки,
// изпълнявайки не повече от throttleSize заявки едновременно.
func NewRequester(cacheSize int, throttleSize int) Requester {
br := &BufferRequester{
cache: NewCyclicBuffer(cacheSize),
queue: make(chan Request, 100),
throttle: make(chan struct{}, throttleSize),
classes: make(map[string]chan Request),
}
for i := 0; i < throttleSize; i++ {
br.throttle <- struct{}{}
}
br.waiter.Add(1)
go func() {
for request := range br.queue {
br.waiter.Add(1)
go func(request Request) {
defer br.waiter.Done()
id := request.ID()
br.lock.Lock()
class, ok := br.classes[id]
br.lock.Unlock()
if ok {
class <- request
return
}
cachedResult, _ := br.FindResult(id)
if cachedResult != nil {
request.SetResult(cachedResult.Result, cachedResult.Err)
return
}
<-br.throttle
class = make(chan Request, 100)
br.lock.Lock()
br.classes[id] = class
br.lock.Unlock()
result, err := request.Run()
close(class)
if request.Cacheable() {
br.cache.Add(&Result{
ID: id,
Result: result,
Err: err,
})
for identicalRequest := range class {
identicalRequest.SetResult(result, err)
}
} else if !br.isStopped {
for identicalRequest := range class {
br.queue <- identicalRequest
}
}
br.lock.Lock()
delete(br.classes, id)
br.lock.Unlock()
br.throttle <- struct{}{}
}(request)
}
br.waiter.Done()
}()
return br
}

Лог от изпълнението

PASS
ok  	_/tmp/d20160101-5892-ve92np	0.003s
PASS
ok  	_/tmp/d20160101-5892-ve92np	0.003s
PASS
ok  	_/tmp/d20160101-5892-ve92np	0.104s
panic: test timed out after 1s

goroutine 3 [running]:
testing.startAlarm.func1()
	/usr/local/go/src/testing/testing.go:703 +0x132
created by time.goFunc
	/usr/local/go/src/time/sleep.go:129 +0x3a

goroutine 1 [chan receive]:
testing.RunTests(0x5e2788, 0x66fe20, 0xd, 0xd, 0x1)
	/usr/local/go/src/testing/testing.go:562 +0x8ad
testing.(*M).Run(0xc82003fef8, 0xc820062510)
	/usr/local/go/src/testing/testing.go:494 +0x70
main.main()
	_/tmp/d20160101-5892-ve92np/_test/_testmain.go:78 +0x116

goroutine 20 [chan receive]:
_/tmp/d20160101-5892-ve92np.TestAddRequestRunsOnlyFirstRequest(0xc820098120)
	/tmp/d20160101-5892-ve92np/solution_test.go:152 +0x3f2
testing.tRunner(0xc820098120, 0x66fe68)
	/usr/local/go/src/testing/testing.go:456 +0x98
created by testing.RunTests
	/usr/local/go/src/testing/testing.go:561 +0x86d

goroutine 21 [chan receive]:
_/tmp/d20160101-5892-ve92np.NewRequester.func1(0xc8200840f0)
	/tmp/d20160101-5892-ve92np/solution.go:177 +0x54
created by _/tmp/d20160101-5892-ve92np.NewRequester
	/tmp/d20160101-5892-ve92np/solution.go:231 +0x299
exit status 2
FAIL	_/tmp/d20160101-5892-ve92np	1.005s
PASS
ok  	_/tmp/d20160101-5892-ve92np	0.003s
PASS
ok  	_/tmp/d20160101-5892-ve92np	0.043s
panic: test timed out after 1s

goroutine 18 [running]:
testing.startAlarm.func1()
	/usr/local/go/src/testing/testing.go:703 +0x132
created by time.goFunc
	/usr/local/go/src/time/sleep.go:129 +0x3a

goroutine 1 [chan receive]:
testing.RunTests(0x5e2788, 0x66fe20, 0xd, 0xd, 0x1)
	/usr/local/go/src/testing/testing.go:562 +0x8ad
testing.(*M).Run(0xc82003fef8, 0xc820010650)
	/usr/local/go/src/testing/testing.go:494 +0x70
main.main()
	_/tmp/d20160101-5892-ve92np/_test/_testmain.go:78 +0x116

goroutine 6 [chan receive]:
_/tmp/d20160101-5892-ve92np.TestCacheSize(0xc82008e000)
	/tmp/d20160101-5892-ve92np/solution_test.go:299 +0x982
testing.tRunner(0xc82008e000, 0x66feb0)
	/usr/local/go/src/testing/testing.go:456 +0x98
created by testing.RunTests
	/usr/local/go/src/testing/testing.go:561 +0x86d

goroutine 7 [chan receive]:
_/tmp/d20160101-5892-ve92np.NewRequester.func1(0xc82001a140)
	/tmp/d20160101-5892-ve92np/solution.go:177 +0x54
created by _/tmp/d20160101-5892-ve92np.NewRequester
	/tmp/d20160101-5892-ve92np/solution.go:231 +0x299
exit status 2
FAIL	_/tmp/d20160101-5892-ve92np	1.007s
PASS
ok  	_/tmp/d20160101-5892-ve92np	0.043s
PASS
ok  	_/tmp/d20160101-5892-ve92np	0.053s
--- FAIL: TestStopWithQueue (0.10s)
	solution_test.go:57: Should not be Ran after Stop
FAIL
exit status 1
FAIL	_/tmp/d20160101-5892-ve92np	0.217s
PASS
ok  	_/tmp/d20160101-5892-ve92np	0.033s
PASS
ok  	_/tmp/d20160101-5892-ve92np	0.053s
PASS
ok  	_/tmp/d20160101-5892-ve92np	0.114s

История (1 версия и 0 коментара)

Стоян обнови решението на 31.12.2015 18:40 (преди над 2 години)

+package main
+
+import (
+ "fmt"
+ "sync"
+)
+
+type Request interface {
+ // Връща идентификатор за заявката. Ако две заявки имат еднакви идентификатори
+ // то те са "равни".
+ ID() string
+
+ // Блокира докато изпълнява заявката.
+ // Връща резултата или грешка ако изпълнението е неуспешно.
+ // Резултата и грешката не трябва да бъдат подавани на SetResult
+ // за текущата заявка - те се запазват вътрешно преди да бъдат върнати.
+ Run() (result interface{}, err error)
+
+ // Връща дали заявката е кешируерма.
+ // Метода има неопределено поведение, ако бъде извикан преди `Run`.
+ Cacheable() bool
+
+ // Задава резултата на заявката.
+ // Не трябва да се извиква за заявки, за които е бил извикан `Run`.
+ SetResult(result interface{}, err error)
+}
+
+type Requester interface {
+ // Добавя заявка за изпълнение и я изпълнява, ако това е необходимо, при първа възможност.
+ AddRequest(request Request)
+
+ // Спира 'Заявчика'. Това означава, че изчаква всички вече започнали заявки да завършат
+ // и извиква `SetResult` на тези заявки, които вече са били добавени, но "равни" на тях вече са били изпълнявание.
+ // Нови заявки не трябва да бъдат започвани през това време, нито вече започнати, равни на тях, да бъдат добавяни за извикване на `SetResult`.
+ Stop()
+}
+
+type CyclicBuffer struct {
+ buffer []interface{}
+ start, end int
+}
+
+func (cb *CyclicBuffer) Length() int {
+ if cb.start < cb.end {
+ return cb.end - cb.start + 1
+ } else {
+ return cb.start - cb.end + 1
+ }
+}
+
+type InvalidIndexError struct {
+ index int
+}
+
+func (e *InvalidIndexError) Error() string {
+ return fmt.Sprintf("%d is not a valid index", e.index)
+}
+
+func (cb *CyclicBuffer) Get(index int) (interface{}, error) {
+ if index < 0 || index >= len(cb.buffer) || cb.start == 0 && index > cb.end {
+ return nil, &InvalidIndexError{index}
+ }
+
+ pos := cb.end - index
+
+ if pos < 0 {
+ pos += len(cb.buffer)
+ }
+
+ return cb.buffer[pos], nil
+
+}
+
+func (cb *CyclicBuffer) Add(value interface{}) {
+ if cb.start == -1 || cb.end == -1 {
+ cb.start = 0
+ cb.end = 0
+ cb.buffer[0] = value
+ return
+ }
+
+ if cb.end == len(cb.buffer)-1 {
+ cb.end = 0
+ } else {
+ cb.end += 1
+ }
+
+ if cb.start == len(cb.buffer)-1 {
+ cb.start = 0
+ } else if cb.start == cb.end {
+ cb.start += 1
+ }
+
+ cb.buffer[cb.end] = value
+}
+
+func NewCyclicBuffer(size int) *CyclicBuffer {
+ return &CyclicBuffer{
+ buffer: make([]interface{}, size),
+ start: -1,
+ end: -1,
+ }
+}
+
+type BufferRequester struct {
+ queue chan Request
+ cache *CyclicBuffer
+ throttle chan struct{}
+ lock sync.RWMutex
+ waiter sync.WaitGroup
+ isStopped bool
+ classes map[string]chan Request
+}
+
+type Result struct {
+ ID string
+ Result interface{}
+ Err error
+}
+
+type RequestNotFoundError struct {
+ ID string
+}
+
+func (e *RequestNotFoundError) Error() string {
+ return fmt.Sprintf("Could not find result of request which ID is %v", e.ID)
+}
+
+func (br *BufferRequester) FindResult(id string) (*Result, error) {
+ cacheLen := br.cache.Length()
+
+ for i := 0; i < cacheLen; i++ {
+ cacheItem, _ := br.cache.Get(i)
+
+ if result, ok := cacheItem.(Result); ok && result.ID == id {
+ return &result, nil
+ }
+ }
+
+ return nil, &RequestNotFoundError{id}
+}
+
+func (br *BufferRequester) AddRequest(request Request) {
+ if !br.isStopped {
+ br.queue <- request
+ }
+}
+
+func (br *BufferRequester) Stop() {
+ br.isStopped = true
+ close(br.queue)
+ for range br.queue {
+ }
+ br.waiter.Wait()
+ close(br.throttle)
+ for range br.throttle {
+ }
+}
+
+// Връща нов заявчик, който кешира отговорите на до cacheSize заявки,
+// изпълнявайки не повече от throttleSize заявки едновременно.
+func NewRequester(cacheSize int, throttleSize int) Requester {
+ br := &BufferRequester{
+ cache: NewCyclicBuffer(cacheSize),
+ queue: make(chan Request, 100),
+ throttle: make(chan struct{}, throttleSize),
+ classes: make(map[string]chan Request),
+ }
+
+ for i := 0; i < throttleSize; i++ {
+ br.throttle <- struct{}{}
+ }
+
+ br.waiter.Add(1)
+
+ go func() {
+ for request := range br.queue {
+ br.waiter.Add(1)
+
+ go func(request Request) {
+ defer br.waiter.Done()
+ id := request.ID()
+ br.lock.Lock()
+ class, ok := br.classes[id]
+ br.lock.Unlock()
+
+ if ok {
+ class <- request
+ return
+ }
+
+ cachedResult, _ := br.FindResult(id)
+
+ if cachedResult != nil {
+ request.SetResult(cachedResult.Result, cachedResult.Err)
+ return
+ }
+
+ <-br.throttle
+ class = make(chan Request, 100)
+ br.lock.Lock()
+ br.classes[id] = class
+ br.lock.Unlock()
+ result, err := request.Run()
+ close(class)
+
+ if request.Cacheable() {
+ br.cache.Add(&Result{
+ ID: id,
+ Result: result,
+ Err: err,
+ })
+
+ for identicalRequest := range class {
+ identicalRequest.SetResult(result, err)
+ }
+ } else if !br.isStopped {
+ for identicalRequest := range class {
+ br.queue <- identicalRequest
+ }
+ }
+
+ br.lock.Lock()
+ delete(br.classes, id)
+ br.lock.Unlock()
+ br.throttle <- struct{}{}
+ }(request)
+ }
+
+ br.waiter.Done()
+ }()
+
+ return br
+}