Concurrent Tasks
- Краен срок:
- 06.12.2015 18:00
- Точки:
- 10
Срокът за предаване на решения е отминал
Имате следния интерфейс:
type Task interface {
Execute(int) (int, error)
}
(Трябва да го включите в предаденото от вас домашно. И да, знаем, че не е много идиоматичен)
Имплементирайте следните функции (конструктори), които да приемат и връщат задачи от горния тип:
-
Функция
Pipeline(tasks ...Task) Task
със следните свойства:- Приема произволен брой
Task
обекти и връща написан от вас тип, който също имплементираTask
интерфейса. - При извикването на метода
Execute()
на върнатия отPipeline()
обект, методитеExecute()
на всички задачи от подаденитеtasks
трябва да се изпълнят последователно. - За аргумент на първата задача от
tasks
се използва аргумента наExecute()
, а за аргумент на всяка следваща се използва резултата от предишната. - Като краен резултат на
Execute()
метода наpipeline
обекта се връща резултата отExecute()
на последната задача отtasks
. - Ако няма подадени задачи на
Pipeline()
,Execute()
трябва да върне грешка. Ако някоя от задачите върне грешка, целиятExecute()
на pipeline-а приключва (не се изпълняват повече задачи) и връща грешка. -
Ето как изглежда това в код. Нека си направим следния прост тип
adder
, който не хапе, а събира цели числа до 127 и имплементира интерфейсаTask
:type adder struct { augend int } func (a adder) Execute(addend int) (int, error) { result := a.augend + addend if result > 127 { return 0, fmt.Errorf("Result %d exceeds the adder threshold", a) } return result, nil }
Ето и как очакваме да се държи върнатия от
Pipeline()
тип:if res, err := Pipeline(adder{50}, adder{60}).Execute(10); err != nil { fmt.Printf("The pipeline returned an error\n") } else { fmt.Printf("The pipeline returned %d\n", res) }
Това би трябвало да изведе "The pipeline returned 120". Но ако имахме
Pipeline(adder{20}, adder{10}, adder{-50}).Execute(100)
, би трябвало да получим на екрана "The pipeline returned an error".
- Приема произволен брой
-
Функция
Fastest(tasks ...Task) Task
със следните свойства:- Отново приема произволен брой
Task
обекти и връща написан от вас тип, който също имплементираTask
интерфейса. - При извикването на метода
Execute()
на върнатия отFastest()
обект, методитеExecute()
на всички задачи от подаденитеtasks
трябва да се изпълнят конкурентно и да се върне резултата (или грешката) на тази задача, която завърши първа. - Като аргумент на всички задачи от
tasks
се подава едно и също число - аргументът, с който е извиканExecute()
на върнатия отFastest()
обект. - Ако няма подадени задачи на
Fastest()
,Execute()
трябва да върне грешка. - Постарайте се да не оставяте "висящи" горутини, ще смъкваме точки.
-
Ето прост пример, преизползвайки
adder
от горния пример:type lazyAdder struct { adder delay time.Duration } func (la lazyAdder) Execute(addend int) (int, error) { time.Sleep(la.delay * time.Millisecond) return la.adder.Execute(addend) }
би трябвало да получим 42 от следния код:
f := Fastest( lazyAdder{adder{20}, 500}, lazyAdder{adder{50}, 300}, adder{41}, ) f.Execute(1)
- Отново приема произволен брой
-
Функция
Timed(task Task, timeout time.Duration) Task
със следните свойства:- Приема една задача от тип
Task
иtimeout
време и връща написан от вас тип, който също имплементира Task интерфейса. - При извикването на метода
Execute()
на връщания обект, изпълняваtask.Execute()
със същата стойност и връща получения резултат или грешка ако задачата приключи в зададеното отtimeout
време. Ако не успее да приключи за това време, връща грешка. - Постарайте се да не оставяте "висящи" горутини, ще смъкваме точки.
-
Ето пример, преизползвайки
lazyAdder
:_, e1 := Timed(lazyAdder{adder{20}, 50}, 2*time.Millisecond).Execute(2) r2, e2 := Timed(lazyAdder{adder{20}, 50}, 300*time.Millisecond).Execute(2)
Очакваме първия ред да върне грешка (т.е.
e1 != nil
), a вторият ред да е ок иr2
да съдържа резултата 22.
- Приема една задача от тип
-
Функция
ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task
със следните свойства:- Приема
reduce
функция и произволен бройTask
обекти и връща написан от вас тип, който също имплементираTask
интерфейса. - При извикването на метода
Execute()
на върнатия отConcurrentMapReduce()
обект, методитеExecute()
на всички задачи от подаденитеtasks
трябва да се изпълнят конкурентно. Като аргумент на всички задачи се подава аргументът, с който е извиканExecute()
. - Ако няма подадени задачи на
ConcurrentMapReduce()
,Execute()
трябва да върне грешка. Ако някоя от функциите fail-не,Execute()
трябва веднага да върне грешка. Ако всички задачи приключат успешно, трябва да се извикаreduce
с техните резултати (в произволен ред) и резултатът отreduce
да бъде върнат като резултат на функцията. - Постарайте се да не оставяте "висящи" горутини, ще смъкваме точки.
-
Следният код:
reduce := func(results []int) int { smallest := 128 for _, v := range results { if v < smallest { smallest = v } } return smallest } mr := ConcurrentMapReduce(reduce, adder{30}, adder{50}, adder{20}) if res, err := mr.Execute(5); err != nil { fmt.Printf("We got an error!\n") } else { fmt.Printf("The ConcurrentMapReduce returned %d\n", res) }
би трябвало да изведе
The ConcurrentMapReduce returned 25
.
- Приема
-
Функция
GreatestSearcher(errorLimit int, tasks <-chan Task) Task
със следните свойства:- Приема максимален допустим брой на грешките
errorLimit
и небуфериран канал за четенеtasks
, по който асинхронно могат да ѝ се подават задачи за изпълнение. Отново връща написан от вас тип, който също имплементираTask
интерфейса. - При извикването на
Execute()
от върнатияTask
трябва всичките задачи от каналаtasks
да започнат greedily да се изпълняват конкурентно. Искаме да няма блокиране, щом ние пуснем задача по този канал, вашия тип трябва да я прочете от канала и да извика нейнияExecute()
метод -
Execute()
метода на задачата трябва да приключи след като ние затворимtasks
канала и всички вече подадени задачи от него са приключили. - Като резултат
Execute()
метода на вашия тип, след приключването на всики задачи, трябва да върне най-голямото число, което някоя задача е върнала. Но ако повече отerrorLimit
задачи са върнали грешка или поtasks
не бъдат подадени никакви задачи,Execute()
трябва да върне грешка. -
Пример:
tasks := make(chan Task) gs := GreatestSearcher(2, tasks) // Приемаме 2 грешки go func() { tasks <- adder{4} tasks <- lazyAdder{adder{22}, 20} tasks <- adder{125} // Това е първата "допустима" грешка (защото 125+10 > 127) time.Sleep(50 * time.Millisecond) tasks <- adder{32} // Това би трябвало да "спечели" // Това би трябвало да timeout-не и да е втората "допустима" грешка tasks <- Timed(lazyAdder{adder{100}, 2000}, 20*time.Millisecond) // Ако разкоментираме това, gs.Execute() трябва да върне грешка // tasks <- adder{127} // трета (и недопустима) грешка close(tasks) }() result, err := gs.Execute(10)
Очакваме да получим
42
катоresult
. Но ако разкоментираме реда сtasks <- adder{127}
, тогаваgs.Execute()
трябва да върне грешка (т.е.err != nil
).
- Приема максимален допустим брой на грешките