Решение на HTTP сваляч от Андон Мицов

Обратно към всички решения

Към профила на Андон Мицов

Резултати

  • 6 точки от тестове
  • 0 бонус точки
  • 6 точки общо
  • 8 успешни тест(а)
  • 5 неуспешни тест(а)

Код

package main
import "sync"
import "time"
type Request interface {
ID() string
Run() (result interface{}, err error)
Cacheable() bool
SetResult(result interface{}, err error)
}
type Requester interface {
AddRequest(request Request)
Stop()
}
type Result struct {
id string
resData interface{}
err error
}
func (r *Result) ID() string {
return r.id
}
type Cache struct {
cachedData []Result
startIndex uint32
endIndex uint32
elementsSize uint32
maxSize uint32
}
func (q *Cache) Push(res Result) {
q.endIndex++
if q.endIndex >= q.maxSize {
q.endIndex = 0
}
if q.elementsSize == q.maxSize && q.endIndex == q.startIndex {
q.startIndex++
if q.startIndex >= q.maxSize {
q.startIndex = 0
}
} else {
q.elementsSize++
}
q.cachedData[q.endIndex] = res
}
func (q *Cache) GetResult(id string) *Result {
for _, res := range q.cachedData {
if id == res.ID() {
return &res
}
}
return nil
}
type RequestQueue []Request
func (q *RequestQueue) Push(r Request) {
*q = append(*q, r)
}
func (q *RequestQueue) Pop() (r Request) {
if len(*q) == 0 {
return nil
}
r = (*q)[0]
*q = (*q)[1:]
return
}
type Worker struct {
isStopped bool
unresolvedRequests RequestQueue
workingRequestsNum int
cacheLock sync.RWMutex
unresolvedRequestsLocker sync.Mutex
stopLocker sync.RWMutex
workersNumLocker sync.RWMutex
workersWaitGr sync.WaitGroup
//slice с канали
maxWorkers int
cachedRequests Cache
}
//WIP
//thread safe methods for increasing the workerQueue number
func (w *Worker) workersNumIncr() {
w.workersNumLocker.Lock()
w.workingRequestsNum++
w.workersNumLocker.Unlock()
}
func (w *Worker) workersNumDecr() {
w.workersNumLocker.Lock()
w.workingRequestsNum--
w.workersNumLocker.Unlock()
}
func (w *Worker) workersNum() int {
w.workersNumLocker.RLock()
defer w.workersNumLocker.RUnlock()
return w.workingRequestsNum
}
//this method handles all the requests of the Requester
func (w *Worker) handleRequests() {
for {
w.stopLocker.RLock()
if w.isStopped {
w.stopLocker.RUnlock()
return
}
w.stopLocker.RUnlock()
w.unresolvedRequestsLocker.Lock()
r := w.unresolvedRequests.Pop()
w.unresolvedRequestsLocker.Unlock()
if r != nil {
for w.workersNum() >= w.maxWorkers {
time.Sleep(10*time.Millisecond)
}
w.cacheLock.RLock()
c := w.cachedRequests.GetResult(r.ID())
w.cacheLock.RUnlock()
if c != nil {
r.SetResult(c.resData, c.err)
} else {
w.workersNumIncr()
go func() {
w.workersWaitGr.Add(1)
res, e := r.Run()
if r.Cacheable() {
cacheEntry := Result{id: r.ID(), resData: res, err: e}
w.cacheLock.Lock()
w.cachedRequests.Push(cacheEntry)
w.cacheLock.Unlock()
}
w.workersNumDecr()
w.workersWaitGr.Done()
}()
}
}
}
}
func (w *Worker) AddRequest(request Request) {
w.stopLocker.RLock()
if w.isStopped {
w.stopLocker.RUnlock()
return
}
w.stopLocker.RUnlock()
w.unresolvedRequestsLocker.Lock()
w.unresolvedRequests.Push(request)
w.unresolvedRequestsLocker.Unlock()
}
func (w *Worker) Stop() {
w.stopLocker.Lock()
w.isStopped = true
w.stopLocker.Unlock()
w.workersWaitGr.Wait()
}
func NewRequester(cacheSize int, throttleSize int) Requester {
var c Cache = Cache{cachedData: make([]Result, cacheSize), maxSize: uint32(cacheSize)}
var newRequester *Worker = &Worker{isStopped: false, unresolvedRequests: make(RequestQueue, 10),
cachedRequests: c, maxWorkers: throttleSize}
go newRequester.handleRequests()
return newRequester
}

Лог от изпълнението

PASS
ok  	_/tmp/d20160101-5892-s2e32d	0.004s
PASS
ok  	_/tmp/d20160101-5892-s2e32d	0.003s
panic: test timed out after 1s

goroutine 12 [running]:
testing.startAlarm.func1()
	/usr/local/go/src/testing/testing.go:703 +0x132
created by time.goFunc
	/usr/local/go/src/time/sleep.go:129 +0x3a

goroutine 1 [chan receive]:
testing.RunTests(0x5e1288, 0x66ee20, 0xd, 0xd, 0x1)
	/usr/local/go/src/testing/testing.go:562 +0x8ad
testing.(*M).Run(0xc82003fef8, 0xc820010650)
	/usr/local/go/src/testing/testing.go:494 +0x70
main.main()
	_/tmp/d20160101-5892-s2e32d/_test/_testmain.go:78 +0x116

goroutine 18 [semacquire]:
sync.runtime_Semacquire(0xc8200a0084)
	/usr/local/go/src/runtime/sema.go:43 +0x26
sync.(*WaitGroup).Wait(0xc8200a0078)
	/usr/local/go/src/sync/waitgroup.go:126 +0xb4
_/tmp/d20160101-5892-s2e32d.(*Worker).Stop(0xc8200a0000)
	/tmp/d20160101-5892-s2e32d/solution.go:193 +0x6b
runtime.Goexit()
	/usr/local/go/src/runtime/panic.go:320 +0xfb
testing.(*common).FailNow(0xc82009a000)
	/usr/local/go/src/testing/testing.go:336 +0x36
testing.(*common).Fatalf(0xc82009a000, 0x5c81c0, 0x21, 0x0, 0x0, 0x0)
	/usr/local/go/src/testing/testing.go:379 +0x83
_/tmp/d20160101-5892-s2e32d.TestNonCacheableRequestsFast(0xc82009a000)
	/tmp/d20160101-5892-s2e32d/solution_test.go:391 +0x53a
testing.tRunner(0xc82009a000, 0x66eee0)
	/usr/local/go/src/testing/testing.go:456 +0x98
created by testing.RunTests
	/usr/local/go/src/testing/testing.go:561 +0x86d

goroutine 21 [chan receive]:
_/tmp/d20160101-5892-s2e32d.TestNonCacheableRequestsFast.func1(0x0, 0x0, 0x0, 0x0)
	/tmp/d20160101-5892-s2e32d/solution_test.go:372 +0x7d
_/tmp/d20160101-5892-s2e32d.(*request).Run(0xc8200a4000, 0x0, 0x0, 0x0, 0x0)
	/tmp/d20160101-5892-s2e32d/solution_test.go:37 +0xa5
_/tmp/d20160101-5892-s2e32d.(*Worker).handleRequests.func1(0xc8200a0000, 0x7f232ced7578, 0xc8200a4000)
	/tmp/d20160101-5892-s2e32d/solution.go:157 +0x63
created by _/tmp/d20160101-5892-s2e32d.(*Worker).handleRequests
	/tmp/d20160101-5892-s2e32d/solution.go:168 +0x2b3
exit status 2
FAIL	_/tmp/d20160101-5892-s2e32d	1.005s
PASS
ok  	_/tmp/d20160101-5892-s2e32d	0.003s
PASS
ok  	_/tmp/d20160101-5892-s2e32d	0.003s
PASS
ok  	_/tmp/d20160101-5892-s2e32d	0.043s
panic: test timed out after 1s

goroutine 20 [running]:
testing.startAlarm.func1()
	/usr/local/go/src/testing/testing.go:703 +0x132
created by time.goFunc
	/usr/local/go/src/time/sleep.go:129 +0x3a

goroutine 1 [chan receive]:
testing.RunTests(0x5e1288, 0x66ee20, 0xd, 0xd, 0x1)
	/usr/local/go/src/testing/testing.go:562 +0x8ad
testing.(*M).Run(0xc82003fef8, 0xc820010650)
	/usr/local/go/src/testing/testing.go:494 +0x70
main.main()
	_/tmp/d20160101-5892-s2e32d/_test/_testmain.go:78 +0x116

goroutine 6 [chan receive]:
_/tmp/d20160101-5892-s2e32d.TestCacheSize(0xc82008c000)
	/tmp/d20160101-5892-s2e32d/solution_test.go:293 +0x866
testing.tRunner(0xc82008c000, 0x66eeb0)
	/usr/local/go/src/testing/testing.go:456 +0x98
created by testing.RunTests
	/usr/local/go/src/testing/testing.go:561 +0x86d

goroutine 7 [runnable]:
sync.(*Mutex).Lock(0xc82008e040)
	/usr/local/go/src/sync/mutex.go:41
_/tmp/d20160101-5892-s2e32d.(*Worker).handleRequests(0xc82008e000)
	/tmp/d20160101-5892-s2e32d/solution.go:138 +0xa5
created by _/tmp/d20160101-5892-s2e32d.NewRequester
	/tmp/d20160101-5892-s2e32d/solution.go:202 +0x1f4
exit status 2
FAIL	_/tmp/d20160101-5892-s2e32d	1.006s
PASS
ok  	_/tmp/d20160101-5892-s2e32d	0.055s
panic: test timed out after 1s

goroutine 11 [running]:
testing.startAlarm.func1()
	/usr/local/go/src/testing/testing.go:703 +0x132
created by time.goFunc
	/usr/local/go/src/time/sleep.go:129 +0x3a

goroutine 1 [chan receive]:
testing.RunTests(0x5e1288, 0x66ee20, 0xd, 0xd, 0x1)
	/usr/local/go/src/testing/testing.go:562 +0x8ad
testing.(*M).Run(0xc82003fef8, 0xc820010650)
	/usr/local/go/src/testing/testing.go:494 +0x70
main.main()
	_/tmp/d20160101-5892-s2e32d/_test/_testmain.go:78 +0x116

goroutine 6 [semacquire]:
sync.runtime_Semacquire(0xc820092084)
	/usr/local/go/src/runtime/sema.go:43 +0x26
sync.(*WaitGroup).Wait(0xc820092078)
	/usr/local/go/src/sync/waitgroup.go:126 +0xb4
_/tmp/d20160101-5892-s2e32d.(*Worker).Stop(0xc820092000)
	/tmp/d20160101-5892-s2e32d/solution.go:193 +0x6b
runtime.Goexit()
	/usr/local/go/src/runtime/panic.go:320 +0xfb
testing.(*common).FailNow(0xc820090000)
	/usr/local/go/src/testing/testing.go:336 +0x36
testing.(*common).Fatalf(0xc820090000, 0x5c81c0, 0x21, 0x0, 0x0, 0x0)
	/usr/local/go/src/testing/testing.go:379 +0x83
_/tmp/d20160101-5892-s2e32d.TestNonCacheableRequestsFast(0xc820090000)
	/tmp/d20160101-5892-s2e32d/solution_test.go:391 +0x53a
testing.tRunner(0xc820090000, 0x66eee0)
	/usr/local/go/src/testing/testing.go:456 +0x98
created by testing.RunTests
	/usr/local/go/src/testing/testing.go:561 +0x86d

goroutine 9 [chan receive]:
_/tmp/d20160101-5892-s2e32d.TestNonCacheableRequestsFast.func1(0x0, 0x0, 0x0, 0x0)
	/tmp/d20160101-5892-s2e32d/solution_test.go:372 +0x7d
_/tmp/d20160101-5892-s2e32d.(*request).Run(0xc82000a480, 0x0, 0x0, 0x0, 0x0)
	/tmp/d20160101-5892-s2e32d/solution_test.go:37 +0xa5
_/tmp/d20160101-5892-s2e32d.(*Worker).handleRequests.func1(0xc820092000, 0x7fe9d6a69578, 0xc82000a480)
	/tmp/d20160101-5892-s2e32d/solution.go:157 +0x63
created by _/tmp/d20160101-5892-s2e32d.(*Worker).handleRequests
	/tmp/d20160101-5892-s2e32d/solution.go:168 +0x2b3
exit status 2
FAIL	_/tmp/d20160101-5892-s2e32d	1.006s
panic: test timed out after 1s

goroutine 12 [running]:
testing.startAlarm.func1()
	/usr/local/go/src/testing/testing.go:703 +0x132
created by time.goFunc
	/usr/local/go/src/time/sleep.go:129 +0x3a

goroutine 1 [chan receive]:
testing.RunTests(0x5e1288, 0x66ee20, 0xd, 0xd, 0x1)
	/usr/local/go/src/testing/testing.go:562 +0x8ad
testing.(*M).Run(0xc82003fef8, 0xc820010650)
	/usr/local/go/src/testing/testing.go:494 +0x70
main.main()
	_/tmp/d20160101-5892-s2e32d/_test/_testmain.go:78 +0x116

goroutine 6 [chan receive]:
_/tmp/d20160101-5892-s2e32d.TestStopWithQueue(0xc82008e000)
	/tmp/d20160101-5892-s2e32d/solution_test.go:455 +0x814
testing.tRunner(0xc82008e000, 0x66eef8)
	/usr/local/go/src/testing/testing.go:456 +0x98
created by testing.RunTests
	/usr/local/go/src/testing/testing.go:561 +0x86d
exit status 2
FAIL	_/tmp/d20160101-5892-s2e32d	1.005s
PASS
ok  	_/tmp/d20160101-5892-s2e32d	0.033s
PASS
ok  	_/tmp/d20160101-5892-s2e32d	0.053s
panic: test timed out after 1s

goroutine 8 [running]:
testing.startAlarm.func1()
	/usr/local/go/src/testing/testing.go:703 +0x132
created by time.goFunc
	/usr/local/go/src/time/sleep.go:129 +0x3a

goroutine 1 [chan receive]:
testing.RunTests(0x5e1288, 0x66ee20, 0xd, 0xd, 0x1)
	/usr/local/go/src/testing/testing.go:562 +0x8ad
testing.(*M).Run(0xc82003fef8, 0xc820062510)
	/usr/local/go/src/testing/testing.go:494 +0x70
main.main()
	_/tmp/d20160101-5892-s2e32d/_test/_testmain.go:78 +0x116

goroutine 20 [chan receive]:
_/tmp/d20160101-5892-s2e32d.TestStopWithQueueFromForum(0xc8200a4000)
	/tmp/d20160101-5892-s2e32d/solution_test.go:633 +0xd74
testing.tRunner(0xc8200a4000, 0x66ef40)
	/usr/local/go/src/testing/testing.go:456 +0x98
created by testing.RunTests
	/usr/local/go/src/testing/testing.go:561 +0x86d
exit status 2
FAIL	_/tmp/d20160101-5892-s2e32d	1.005s

История (2 версии и 1 коментар)

Андон обнови решението на 31.12.2015 02:54 (преди над 2 години)

+package main
+
+import "sync"
+
+
+type Request interface {
+ ID() string
+ Run() (result interface{}, err error)
+
+ Cacheable() bool
+
+ SetResult(result interface{}, err error)
+}
+
+type Requester interface {
+ AddRequest(request Request)
+ Stop()
+}
+
+type Result struct {
+ id string
+ resData interface{}
+ err error
+}
+
+func (r *Result) ID() string {
+ return r.id
+}
+
+type Cache struct {
+ cachedData []Result
+
+ startIndex uint32
+ endIndex uint32
+
+ elementsSize uint32
+ maxSize uint32
+}
+
+func (q *Cache) Push(res Result) {
+ q.endIndex++
+ if q.endIndex >= q.maxSize {
+ q.endIndex = 0
+ }
+
+ if q.elementsSize == q.maxSize && q.endIndex == q.startIndex {
+ q.startIndex++
+ if q.startIndex >= q.maxSize {
+ q.startIndex = 0
+ }
+ } else {
+ q.elementsSize++
+ }
+
+ q.cachedData[q.endIndex] = res
+}
+
+func (q *Cache) GetResult(id string) *Result {
+ for _, res := range q.cachedData {
+ if id == res.ID() {
+ return &res
+ }
+ }
+
+ return nil
+}
+
+type RequestQueue []Request
+
+func (q *RequestQueue) Push(r Request) {
+ *q = append(*q, r)
+}
+
+func (q *RequestQueue) Pop() (r Request) {
+ if len(*q) == 0 {
+ return nil
+ }
+
+ r = (*q)[0]
+
+ *q = (*q)[1:]
+
+ return
+}
+
+type Worker struct {
+ isStopped bool
+
+ unresolvedRequests RequestQueue
+
+ workingRequestsNum int
+ cacheLock sync.Mutex
+ unresolvedRequestsLocker sync.Mutex
+
+ workersWaitGr sync.WaitGroup
+ //slice с канали
+ maxWorkers int
+
+ cachedRequests Cache
+}
+
+//WIP
+
+//this method handles all the requests of the Requester
+func (w *Worker) handleRequests() {
+ for !w.isStopped {
+ if len(w.unresolvedRequests) == 0 {
+ continue
+ }
+
+ for w.workingRequestsNum >= w.maxWorkers {
+ }
+
+ w.unresolvedRequestsLocker.Lock()
+ r := w.unresolvedRequests.Pop()
+ w.unresolvedRequestsLocker.Unlock()
+ if r != nil {
+ if c := w.cachedRequests.GetResult(r.ID()); c != nil {
+
+ r.SetResult(c.resData, c.err)
+ } else {
+
+ w.workingRequestsNum++
+
+ go func() {
+ w.workersWaitGr.Add(1)
+ res, e := r.Run()
+
+ if r.Cacheable() {
+ cacheEntry := Result{id: r.ID(), resData: res, err: e}
+ w.cacheLock.Lock()
+ w.cachedRequests.Push(cacheEntry)
+ w.cacheLock.Unlock()
+ }
+ w.workingRequestsNum--
+ w.workersWaitGr.Done()
+ }()
+
+ }
+ }
+ }
+}
+
+func (w *Worker) AddRequest(request Request) {
+ if w.isStopped {
+ return
+ }
+
+ w.unresolvedRequestsLocker.Lock()
+ w.unresolvedRequests.Push(request)
+ w.unresolvedRequestsLocker.Unlock()
+}
+
+func (w *Worker) Stop() {
+ w.isStopped = true
+
+ w.workersWaitGr.Wait()
+}
+
+func NewRequester(cacheSize int, throttleSize int) Requester {
+ var c Cache = Cache{cachedData: make([]Result, cacheSize), maxSize: uint32(cacheSize)}
+
+ var newRequester *Worker = &Worker{isStopped: false, unresolvedRequests: make(RequestQueue, 10),
+ cachedRequests: c, maxWorkers: throttleSize}
+
+ go newRequester.handleRequests()
+ return newRequester
+}

Андон обнови решението на 31.12.2015 17:08 (преди над 2 години)

package main
import "sync"
+import "time"
-
type Request interface {
ID() string
Run() (result interface{}, err error)
Cacheable() bool
SetResult(result interface{}, err error)
}
type Requester interface {
AddRequest(request Request)
Stop()
}
type Result struct {
id string
resData interface{}
err error
}
func (r *Result) ID() string {
return r.id
}
type Cache struct {
cachedData []Result
startIndex uint32
endIndex uint32
elementsSize uint32
maxSize uint32
}
func (q *Cache) Push(res Result) {
q.endIndex++
if q.endIndex >= q.maxSize {
q.endIndex = 0
}
if q.elementsSize == q.maxSize && q.endIndex == q.startIndex {
q.startIndex++
if q.startIndex >= q.maxSize {
q.startIndex = 0
}
} else {
q.elementsSize++
}
q.cachedData[q.endIndex] = res
}
func (q *Cache) GetResult(id string) *Result {
for _, res := range q.cachedData {
if id == res.ID() {
return &res
}
}
return nil
}
type RequestQueue []Request
func (q *RequestQueue) Push(r Request) {
*q = append(*q, r)
}
func (q *RequestQueue) Pop() (r Request) {
if len(*q) == 0 {
return nil
}
r = (*q)[0]
*q = (*q)[1:]
return
}
type Worker struct {
isStopped bool
unresolvedRequests RequestQueue
workingRequestsNum int
- cacheLock sync.Mutex
+ cacheLock sync.RWMutex
unresolvedRequestsLocker sync.Mutex
- workersWaitGr sync.WaitGroup
+ stopLocker sync.RWMutex
+
+ workersNumLocker sync.RWMutex
+ workersWaitGr sync.WaitGroup
//slice с канали
maxWorkers int
cachedRequests Cache
}
//WIP
+//thread safe methods for increasing the workerQueue number
+func (w *Worker) workersNumIncr() {
+ w.workersNumLocker.Lock()
+ w.workingRequestsNum++
+ w.workersNumLocker.Unlock()
+}
+
+func (w *Worker) workersNumDecr() {
+ w.workersNumLocker.Lock()
+ w.workingRequestsNum--
+ w.workersNumLocker.Unlock()
+}
+
+func (w *Worker) workersNum() int {
+ w.workersNumLocker.RLock()
+ defer w.workersNumLocker.RUnlock()
+
+ return w.workingRequestsNum
+}
+
//this method handles all the requests of the Requester
func (w *Worker) handleRequests() {
- for !w.isStopped {
- if len(w.unresolvedRequests) == 0 {
- continue
- }
+ for {
+ w.stopLocker.RLock()
- for w.workingRequestsNum >= w.maxWorkers {
+ if w.isStopped {
+ w.stopLocker.RUnlock()
+ return
}
+ w.stopLocker.RUnlock()
w.unresolvedRequestsLocker.Lock()
r := w.unresolvedRequests.Pop()
w.unresolvedRequestsLocker.Unlock()
+
if r != nil {
- if c := w.cachedRequests.GetResult(r.ID()); c != nil {
+ for w.workersNum() >= w.maxWorkers {
+ time.Sleep(10*time.Millisecond)
+ }
+ w.cacheLock.RLock()
+ c := w.cachedRequests.GetResult(r.ID())
+ w.cacheLock.RUnlock()
+ if c != nil {
r.SetResult(c.resData, c.err)
} else {
-
- w.workingRequestsNum++
-
+ w.workersNumIncr()
go func() {
+
w.workersWaitGr.Add(1)
res, e := r.Run()
if r.Cacheable() {
cacheEntry := Result{id: r.ID(), resData: res, err: e}
w.cacheLock.Lock()
w.cachedRequests.Push(cacheEntry)
w.cacheLock.Unlock()
}
- w.workingRequestsNum--
+
+ w.workersNumDecr()
w.workersWaitGr.Done()
}()
}
}
}
}
func (w *Worker) AddRequest(request Request) {
+ w.stopLocker.RLock()
if w.isStopped {
+ w.stopLocker.RUnlock()
return
}
+ w.stopLocker.RUnlock()
w.unresolvedRequestsLocker.Lock()
w.unresolvedRequests.Push(request)
w.unresolvedRequestsLocker.Unlock()
}
func (w *Worker) Stop() {
+ w.stopLocker.Lock()
w.isStopped = true
+ w.stopLocker.Unlock()
w.workersWaitGr.Wait()
}
func NewRequester(cacheSize int, throttleSize int) Requester {
var c Cache = Cache{cachedData: make([]Result, cacheSize), maxSize: uint32(cacheSize)}
var newRequester *Worker = &Worker{isStopped: false, unresolvedRequests: make(RequestQueue, 10),
cachedRequests: c, maxWorkers: throttleSize}
go newRequester.handleRequests()
return newRequester
}