Решение на Concurrent Retry Executor от Марио Даскалов

Обратно към всички решения

Към профила на Марио Даскалов

Резултати

  • 10 точки от тестове
  • 0 бонус точки
  • 10 точки общо
  • 8 успешни тест(а)
  • 0 неуспешни тест(а)

Код

package main
import "fmt"
func infiniteBufferedChannelStream(from, to chan (chan string)) {
pending := make([]chan string, 0)
loop:
for {
if len(pending) == 0 {
log, ok := <-from
if !ok {
break
}
pending = append(pending, log)
}
select {
case to <- pending[0]:
pending = pending[1:]
case log, ok := <-from:
if !ok {
break loop
}
pending = append(pending, log)
}
}
for _, stringCh := range pending {
to <- stringCh
}
close(to)
}
func OrderedLogDrainer(logs chan (chan string)) chan string {
ch := make(chan string)
channels := make(chan (chan string))
bufferedLogs := make(chan (chan string))
go func() {
for log := range logs {
log_buffered_channel := make(chan string, 100)
go func(log, log_buffered_channel chan string) {
for message := range log {
log_buffered_channel <- message
}
close(log_buffered_channel)
}(log, log_buffered_channel)
bufferedLogs <- log_buffered_channel
}
close(bufferedLogs)
}()
go infiniteBufferedChannelStream(bufferedLogs, channels)
go func() {
logNumber := 1
for channel := range channels {
for message := range channel {
ch <- fmt.Sprintf("%d\t%s", logNumber, message)
}
logNumber++
}
close(ch)
}()
return ch
}

Лог от изпълнението

PASS
ok  	_/tmp/d20151110-19113-1b8yj9q	0.003s
PASS
ok  	_/tmp/d20151110-19113-1b8yj9q	0.003s
PASS
ok  	_/tmp/d20151110-19113-1b8yj9q	0.003s
PASS
ok  	_/tmp/d20151110-19113-1b8yj9q	0.003s
PASS
ok  	_/tmp/d20151110-19113-1b8yj9q	0.003s
PASS
ok  	_/tmp/d20151110-19113-1b8yj9q	0.703s
PASS
ok  	_/tmp/d20151110-19113-1b8yj9q	0.004s
PASS
ok  	_/tmp/d20151110-19113-1b8yj9q	0.006s

История (1 версия и 2 коментара)

Марио обнови решението на 09.11.2015 23:37 (преди над 2 години)

+package main
+
+import "fmt"
+
+func infiniteBufferedChannelStream(from, to chan (chan string)) {
+ pending := make([]chan string, 0)
+loop:
+ for {
+ if len(pending) == 0 {
+ log, ok := <-from
+ if !ok {
+ break
+ }
+ pending = append(pending, log)
+ }
+
+ select {
+ case to <- pending[0]:
+ pending = pending[1:]
+ case log, ok := <-from:
+ if !ok {
+ break loop
+ }
+ pending = append(pending, log)
+
+ }
+ }
+
+ for _, stringCh := range pending {
+ to <- stringCh
+ }
+
+ close(to)
+}
+
+func OrderedLogDrainer(logs chan (chan string)) chan string {
+ ch := make(chan string)
+ channels := make(chan (chan string))
+ bufferedLogs := make(chan (chan string))
+
+ go func() {
+ for log := range logs {
+ log_buffered_channel := make(chan string, 100)
+
+ go func(log, log_buffered_channel chan string) {
+ for message := range log {
+ log_buffered_channel <- message
+ }
+ close(log_buffered_channel)
+ }(log, log_buffered_channel)
+
+ bufferedLogs <- log_buffered_channel
+ }
+ close(bufferedLogs)
+ }()
+
+ go infiniteBufferedChannelStream(bufferedLogs, channels)
+
+ go func() {
+ logNumber := 1
+ for channel := range channels {
+ for message := range channel {
+ ch <- fmt.Sprintf("%d\t%s", logNumber, message)
+ }
+ logNumber++
+ }
+
+ close(ch)
+ }()
+
+ return ch
+}