Решение на Concurrent Retry Executor от Юлия Недялкова

Обратно към всички решения

Към профила на Юлия Недялкова

Резултати

  • 10 точки от тестове
  • 0 бонус точки
  • 10 точки общо
  • 8 успешни тест(а)
  • 0 неуспешни тест(а)

Код

package main
import (
"container/list"
"fmt"
)
const MaxInt = int(^uint(0) >> 1)
func OrderedLogDrainer(channels chan (chan string)) chan string {
result := make(chan string)
go divideChannels(channels, result)
return result
}
func divideChannels(channels chan (chan string), result chan string) {
threadSafeQueue := make(chan struct{}, 1)
channelsInQueue := make(chan struct{}, MaxInt)
index := 0
queue := list.New()
go writeAll(queue, result, channelsInQueue, threadSafeQueue)
for channel := range channels {
channelResult := make(chan string, 100)
threadSafeQueue <- struct{}{}
queue.PushBack(channelResult)
<-threadSafeQueue
channelsInQueue <- struct{}{}
index = index + 1
go drainChannel(channel, channelResult, index)
}
close(channelsInQueue)
}
func drainChannel(channel, column chan string, index int) {
for log := range channel {
column <- fmt.Sprintf("%d\t%s", index, log)
}
close(column)
}
func writeAll(queue *list.List, result chan string, channelsInQueue, threadSafeQueue chan struct{}) {
for _ = range channelsInQueue {
threadSafeQueue <- struct{}{}
channel := (queue.Remove(queue.Front())).(chan string)
<-threadSafeQueue
for element := range channel {
result <- element
}
}
close(result)
}

Лог от изпълнението

PASS
ok  	_/tmp/d20151110-19113-o4q8os	0.003s
PASS
ok  	_/tmp/d20151110-19113-o4q8os	0.003s
PASS
ok  	_/tmp/d20151110-19113-o4q8os	0.003s
PASS
ok  	_/tmp/d20151110-19113-o4q8os	0.003s
PASS
ok  	_/tmp/d20151110-19113-o4q8os	0.003s
PASS
ok  	_/tmp/d20151110-19113-o4q8os	0.703s
PASS
ok  	_/tmp/d20151110-19113-o4q8os	0.004s
PASS
ok  	_/tmp/d20151110-19113-o4q8os	0.007s

История (3 версии и 1 коментар)

Юлия обнови решението на 09.11.2015 22:09 (преди над 2 години)

+package main
+
+import (
+ "container/list"
+ "fmt"
+)
+
+func OrderedLogDrainer(channels chan (chan string)) chan string {
+ result := make(chan string, 1000)
+ go divideChannels(channels, result)
+ return result
+}
+
+func divideChannels(channels chan (chan string), result chan string) {
+ threadSafeQueue := make(chan struct{}, 1)
+ channelsInQueue := make(chan struct{}, 100)
+ index := 0
+ queue := list.New()
+
+ go writeAll(queue, result, channelsInQueue, threadSafeQueue)
+ for channel := range channels {
+ channelResult := make(chan string, 100)
+ threadSafeQueue <- struct{}{}
+ queue.PushBack(channelResult)
+ <-threadSafeQueue
+ channelsInQueue <- struct{}{}
+ index = index + 1
+ go drainChannel(channel, channelResult, index)
+
+ }
+ close(channelsInQueue)
+}
+
+func drainChannel(channel, column chan string, index int) {
+ for log := range channel {
+ column <- fmt.Sprintf("%d\t%s", index, log)
+ }
+ close(column)
+}
+
+func writeAll(queue *list.List, result chan string, channelsInQueue, threadSafeQueue chan struct{}) {
+ for _ = range channelsInQueue {
+ threadSafeQueue <- struct{}{}
+ channel := (queue.Remove(queue.Front())).(chan string)
+ <-threadSafeQueue
+ for element := range channel {
+ result <- element
+ }
+ }
+ close(result)
+}

Супер е, много ми харесва как ползваш threadSafeQueue като mutex! Единствено искам да ти насоча вниманието към последното правило от условието. Казали сме, че във всеки лог ще има по максимум 100 съобщения, но няма ограничение за броя логове.

Юлия обнови решението на 10.11.2015 10:36 (преди над 2 години)

package main
import (
"container/list"
"fmt"
)
func OrderedLogDrainer(channels chan (chan string)) chan string {
- result := make(chan string, 1000)
+ result := make(chan string)
go divideChannels(channels, result)
return result
}
func divideChannels(channels chan (chan string), result chan string) {
threadSafeQueue := make(chan struct{}, 1)
channelsInQueue := make(chan struct{}, 100)
index := 0
queue := list.New()
go writeAll(queue, result, channelsInQueue, threadSafeQueue)
for channel := range channels {
channelResult := make(chan string, 100)
threadSafeQueue <- struct{}{}
queue.PushBack(channelResult)
<-threadSafeQueue
channelsInQueue <- struct{}{}
index = index + 1
go drainChannel(channel, channelResult, index)
}
close(channelsInQueue)
}
func drainChannel(channel, column chan string, index int) {
for log := range channel {
column <- fmt.Sprintf("%d\t%s", index, log)
}
close(column)
}
func writeAll(queue *list.List, result chan string, channelsInQueue, threadSafeQueue chan struct{}) {
for _ = range channelsInQueue {
threadSafeQueue <- struct{}{}
channel := (queue.Remove(queue.Front())).(chan string)
<-threadSafeQueue
for element := range channel {
result <- element
}
}
close(result)
}

Юлия обнови решението на 10.11.2015 12:55 (преди над 2 години)

package main
import (
"container/list"
"fmt"
)
+const MaxInt = int(^uint(0) >> 1)
+
func OrderedLogDrainer(channels chan (chan string)) chan string {
result := make(chan string)
go divideChannels(channels, result)
return result
}
func divideChannels(channels chan (chan string), result chan string) {
threadSafeQueue := make(chan struct{}, 1)
- channelsInQueue := make(chan struct{}, 100)
+ channelsInQueue := make(chan struct{}, MaxInt)
index := 0
queue := list.New()
go writeAll(queue, result, channelsInQueue, threadSafeQueue)
for channel := range channels {
channelResult := make(chan string, 100)
threadSafeQueue <- struct{}{}
queue.PushBack(channelResult)
<-threadSafeQueue
channelsInQueue <- struct{}{}
index = index + 1
go drainChannel(channel, channelResult, index)
}
close(channelsInQueue)
}
func drainChannel(channel, column chan string, index int) {
for log := range channel {
column <- fmt.Sprintf("%d\t%s", index, log)
}
close(column)
}
func writeAll(queue *list.List, result chan string, channelsInQueue, threadSafeQueue chan struct{}) {
for _ = range channelsInQueue {
threadSafeQueue <- struct{}{}
channel := (queue.Remove(queue.Front())).(chan string)
<-threadSafeQueue
for element := range channel {
result <- element
}
}
close(result)
}