Решение на HTTP сваляч от Андон Мицов

Обратно към всички решения

Към профила на Андон Мицов

Резултати

  • 6 точки от тестове
  • 0 бонус точки
  • 6 точки общо
  • 8 успешни тест(а)
  • 5 неуспешни тест(а)

Код

package main
import "sync"
import "time"
type Request interface {
ID() string
Run() (result interface{}, err error)
Cacheable() bool
SetResult(result interface{}, err error)
}
type Requester interface {
AddRequest(request Request)
Stop()
}
type Result struct {
id string
resData interface{}
err error
}
func (r *Result) ID() string {
return r.id
}
type Cache struct {
cachedData []Result
startIndex uint32
endIndex uint32
elementsSize uint32
maxSize uint32
}
func (q *Cache) Push(res Result) {
q.endIndex++
if q.endIndex >= q.maxSize {
q.endIndex = 0
}
if q.elementsSize == q.maxSize && q.endIndex == q.startIndex {
q.startIndex++
if q.startIndex >= q.maxSize {
q.startIndex = 0
}
} else {
q.elementsSize++
}
q.cachedData[q.endIndex] = res
}
func (q *Cache) GetResult(id string) *Result {
for _, res := range q.cachedData {
if id == res.ID() {
return &res
}
}
return nil
}
type RequestQueue []Request
func (q *RequestQueue) Push(r Request) {
*q = append(*q, r)
}
func (q *RequestQueue) Pop() (r Request) {
if len(*q) == 0 {
return nil
}
r = (*q)[0]
*q = (*q)[1:]
return
}
type Worker struct {
isStopped bool
unresolvedRequests RequestQueue
workingRequestsNum int
cacheLock sync.RWMutex
unresolvedRequestsLocker sync.Mutex
stopLocker sync.RWMutex
workersNumLocker sync.RWMutex
workersWaitGr sync.WaitGroup
//slice с канали
maxWorkers int
cachedRequests Cache
}
//WIP
//thread safe methods for increasing the workerQueue number
func (w *Worker) workersNumIncr() {
w.workersNumLocker.Lock()
w.workingRequestsNum++
w.workersNumLocker.Unlock()
}
func (w *Worker) workersNumDecr() {
w.workersNumLocker.Lock()
w.workingRequestsNum--
w.workersNumLocker.Unlock()
}
func (w *Worker) workersNum() int {
w.workersNumLocker.RLock()
defer w.workersNumLocker.RUnlock()
return w.workingRequestsNum
}
//this method handles all the requests of the Requester
func (w *Worker) handleRequests() {
for {
w.stopLocker.RLock()
if w.isStopped {
w.stopLocker.RUnlock()
return
}
w.stopLocker.RUnlock()
w.unresolvedRequestsLocker.Lock()
r := w.unresolvedRequests.Pop()
w.unresolvedRequestsLocker.Unlock()
if r != nil {
for w.workersNum() >= w.maxWorkers {
time.Sleep(10*time.Millisecond)
}
w.cacheLock.RLock()
c := w.cachedRequests.GetResult(r.ID())
w.cacheLock.RUnlock()
if c != nil {
r.SetResult(c.resData, c.err)
} else {
w.workersNumIncr()
go func() {
w.workersWaitGr.Add(1)
res, e := r.Run()
if r.Cacheable() {
cacheEntry := Result{id: r.ID(), resData: res, err: e}
w.cacheLock.Lock()
w.cachedRequests.Push(cacheEntry)
w.cacheLock.Unlock()
}
w.workersNumDecr()
w.workersWaitGr.Done()
}()
}
}
}
}
func (w *Worker) AddRequest(request Request) {
w.stopLocker.RLock()
if w.isStopped {
w.stopLocker.RUnlock()
return
}
w.stopLocker.RUnlock()
w.unresolvedRequestsLocker.Lock()
w.unresolvedRequests.Push(request)
w.unresolvedRequestsLocker.Unlock()
}
func (w *Worker) Stop() {
w.stopLocker.Lock()
w.isStopped = true
w.stopLocker.Unlock()
w.workersWaitGr.Wait()
}
func NewRequester(cacheSize int, throttleSize int) Requester {
var c Cache = Cache{cachedData: make([]Result, cacheSize), maxSize: uint32(cacheSize)}
var newRequester *Worker = &Worker{isStopped: false, unresolvedRequests: make(RequestQueue, 10),
cachedRequests: c, maxWorkers: throttleSize}
go newRequester.handleRequests()
return newRequester
}

Лог от изпълнението

▸ Покажи лога

История (2 версии и 1 коментар)

Андон обнови решението на 31.12.2015 02:54 (преди над 2 години)

▸ Покажи разликите
+package main
+
+import "sync"
+
+
+type Request interface {
+ ID() string
+ Run() (result interface{}, err error)
+
+ Cacheable() bool
+
+ SetResult(result interface{}, err error)
+}
+
+type Requester interface {
+ AddRequest(request Request)
+ Stop()
+}
+
+type Result struct {
+ id string
+ resData interface{}
+ err error
+}
+
+func (r *Result) ID() string {
+ return r.id
+}
+
+type Cache struct {
+ cachedData []Result
+
+ startIndex uint32
+ endIndex uint32
+
+ elementsSize uint32
+ maxSize uint32
+}
+
+func (q *Cache) Push(res Result) {
+ q.endIndex++
+ if q.endIndex >= q.maxSize {
+ q.endIndex = 0
+ }
+
+ if q.elementsSize == q.maxSize && q.endIndex == q.startIndex {
+ q.startIndex++
+ if q.startIndex >= q.maxSize {
+ q.startIndex = 0
+ }
+ } else {
+ q.elementsSize++
+ }
+
+ q.cachedData[q.endIndex] = res
+}
+
+func (q *Cache) GetResult(id string) *Result {
+ for _, res := range q.cachedData {
+ if id == res.ID() {
+ return &res
+ }
+ }
+
+ return nil
+}
+
+type RequestQueue []Request
+
+func (q *RequestQueue) Push(r Request) {
+ *q = append(*q, r)
+}
+
+func (q *RequestQueue) Pop() (r Request) {
+ if len(*q) == 0 {
+ return nil
+ }
+
+ r = (*q)[0]
+
+ *q = (*q)[1:]
+
+ return
+}
+
+type Worker struct {
+ isStopped bool
+
+ unresolvedRequests RequestQueue
+
+ workingRequestsNum int
+ cacheLock sync.Mutex
+ unresolvedRequestsLocker sync.Mutex
+
+ workersWaitGr sync.WaitGroup
+ //slice с канали
+ maxWorkers int
+
+ cachedRequests Cache
+}
+
+//WIP
+
+//this method handles all the requests of the Requester
+func (w *Worker) handleRequests() {
+ for !w.isStopped {
+ if len(w.unresolvedRequests) == 0 {
+ continue
+ }
+
+ for w.workingRequestsNum >= w.maxWorkers {
+ }
+
+ w.unresolvedRequestsLocker.Lock()
+ r := w.unresolvedRequests.Pop()
+ w.unresolvedRequestsLocker.Unlock()
+ if r != nil {
+ if c := w.cachedRequests.GetResult(r.ID()); c != nil {
+
+ r.SetResult(c.resData, c.err)
+ } else {
+
+ w.workingRequestsNum++
+
+ go func() {
+ w.workersWaitGr.Add(1)
+ res, e := r.Run()
+
+ if r.Cacheable() {
+ cacheEntry := Result{id: r.ID(), resData: res, err: e}
+ w.cacheLock.Lock()
+ w.cachedRequests.Push(cacheEntry)
+ w.cacheLock.Unlock()
+ }
+ w.workingRequestsNum--
+ w.workersWaitGr.Done()
+ }()
+
+ }
+ }
+ }
+}
+
+func (w *Worker) AddRequest(request Request) {
+ if w.isStopped {
+ return
+ }
+
+ w.unresolvedRequestsLocker.Lock()
+ w.unresolvedRequests.Push(request)
+ w.unresolvedRequestsLocker.Unlock()
+}
+
+func (w *Worker) Stop() {
+ w.isStopped = true
+
+ w.workersWaitGr.Wait()
+}
+
+func NewRequester(cacheSize int, throttleSize int) Requester {
+ var c Cache = Cache{cachedData: make([]Result, cacheSize), maxSize: uint32(cacheSize)}
+
+ var newRequester *Worker = &Worker{isStopped: false, unresolvedRequests: make(RequestQueue, 10),
+ cachedRequests: c, maxWorkers: throttleSize}
+
+ go newRequester.handleRequests()
+ return newRequester
+}

Андон обнови решението на 31.12.2015 17:08 (преди над 2 години)

▸ Покажи разликите
package main
import "sync"
+import "time"
-
type Request interface {
ID() string
Run() (result interface{}, err error)
Cacheable() bool
SetResult(result interface{}, err error)
}
type Requester interface {
AddRequest(request Request)
Stop()
}
type Result struct {
id string
resData interface{}
err error
}
func (r *Result) ID() string {
return r.id
}
type Cache struct {
cachedData []Result
startIndex uint32
endIndex uint32
elementsSize uint32
maxSize uint32
}
func (q *Cache) Push(res Result) {
q.endIndex++
if q.endIndex >= q.maxSize {
q.endIndex = 0
}
if q.elementsSize == q.maxSize && q.endIndex == q.startIndex {
q.startIndex++
if q.startIndex >= q.maxSize {
q.startIndex = 0
}
} else {
q.elementsSize++
}
q.cachedData[q.endIndex] = res
}
func (q *Cache) GetResult(id string) *Result {
for _, res := range q.cachedData {
if id == res.ID() {
return &res
}
}
return nil
}
type RequestQueue []Request
func (q *RequestQueue) Push(r Request) {
*q = append(*q, r)
}
func (q *RequestQueue) Pop() (r Request) {
if len(*q) == 0 {
return nil
}
r = (*q)[0]
*q = (*q)[1:]
return
}
type Worker struct {
isStopped bool
unresolvedRequests RequestQueue
workingRequestsNum int
- cacheLock sync.Mutex
+ cacheLock sync.RWMutex
unresolvedRequestsLocker sync.Mutex
- workersWaitGr sync.WaitGroup
+ stopLocker sync.RWMutex
+
+ workersNumLocker sync.RWMutex
+ workersWaitGr sync.WaitGroup
//slice с канали
maxWorkers int
cachedRequests Cache
}
//WIP
+//thread safe methods for increasing the workerQueue number
+func (w *Worker) workersNumIncr() {
+ w.workersNumLocker.Lock()
+ w.workingRequestsNum++
+ w.workersNumLocker.Unlock()
+}
+
+func (w *Worker) workersNumDecr() {
+ w.workersNumLocker.Lock()
+ w.workingRequestsNum--
+ w.workersNumLocker.Unlock()
+}
+
+func (w *Worker) workersNum() int {
+ w.workersNumLocker.RLock()
+ defer w.workersNumLocker.RUnlock()
+
+ return w.workingRequestsNum
+}
+
//this method handles all the requests of the Requester
func (w *Worker) handleRequests() {
- for !w.isStopped {
- if len(w.unresolvedRequests) == 0 {
- continue
- }
+ for {
+ w.stopLocker.RLock()
- for w.workingRequestsNum >= w.maxWorkers {
+ if w.isStopped {
+ w.stopLocker.RUnlock()
+ return
}
+ w.stopLocker.RUnlock()
w.unresolvedRequestsLocker.Lock()
r := w.unresolvedRequests.Pop()
w.unresolvedRequestsLocker.Unlock()
+
if r != nil {
- if c := w.cachedRequests.GetResult(r.ID()); c != nil {
+ for w.workersNum() >= w.maxWorkers {
+ time.Sleep(10*time.Millisecond)
+ }
+ w.cacheLock.RLock()
+ c := w.cachedRequests.GetResult(r.ID())
+ w.cacheLock.RUnlock()
+ if c != nil {
r.SetResult(c.resData, c.err)
} else {
-
- w.workingRequestsNum++
-
+ w.workersNumIncr()
go func() {
+
w.workersWaitGr.Add(1)
res, e := r.Run()
if r.Cacheable() {
cacheEntry := Result{id: r.ID(), resData: res, err: e}
w.cacheLock.Lock()
w.cachedRequests.Push(cacheEntry)
w.cacheLock.Unlock()
}
- w.workingRequestsNum--
+
+ w.workersNumDecr()
w.workersWaitGr.Done()
}()
}
}
}
}
func (w *Worker) AddRequest(request Request) {
+ w.stopLocker.RLock()
if w.isStopped {
+ w.stopLocker.RUnlock()
return
}
+ w.stopLocker.RUnlock()
w.unresolvedRequestsLocker.Lock()
w.unresolvedRequests.Push(request)
w.unresolvedRequestsLocker.Unlock()
}
func (w *Worker) Stop() {
+ w.stopLocker.Lock()
w.isStopped = true
+ w.stopLocker.Unlock()
w.workersWaitGr.Wait()
}
func NewRequester(cacheSize int, throttleSize int) Requester {
var c Cache = Cache{cachedData: make([]Result, cacheSize), maxSize: uint32(cacheSize)}
var newRequester *Worker = &Worker{isStopped: false, unresolvedRequests: make(RequestQueue, 10),
cachedRequests: c, maxWorkers: throttleSize}
go newRequester.handleRequests()
return newRequester
}