Марио обнови решението на 09.11.2015 23:37 (преди над 2 години)
+package main
+
+import "fmt"
+
+func infiniteBufferedChannelStream(from, to chan (chan string)) {
+ pending := make([]chan string, 0)
+loop:
+ for {
+ if len(pending) == 0 {
+ log, ok := <-from
+ if !ok {
+ break
+ }
+ pending = append(pending, log)
+ }
+
+ select {
+ case to <- pending[0]:
+ pending = pending[1:]
+ case log, ok := <-from:
+ if !ok {
+ break loop
+ }
+ pending = append(pending, log)
+
+ }
+ }
+
+ for _, stringCh := range pending {
+ to <- stringCh
+ }
+
+ close(to)
+}
+
+func OrderedLogDrainer(logs chan (chan string)) chan string {
+ ch := make(chan string)
+ channels := make(chan (chan string))
+ bufferedLogs := make(chan (chan string))
+
+ go func() {
+ for log := range logs {
+ log_buffered_channel := make(chan string, 100)
+
+ go func(log, log_buffered_channel chan string) {
+ for message := range log {
+ log_buffered_channel <- message
+ }
+ close(log_buffered_channel)
+ }(log, log_buffered_channel)
+
+ bufferedLogs <- log_buffered_channel
+ }
+ close(bufferedLogs)
+ }()
+
+ go infiniteBufferedChannelStream(bufferedLogs, channels)
+
+ go func() {
+ logNumber := 1
+ for channel := range channels {
+ for message := range channel {
+ ch <- fmt.Sprintf("%d\t%s", logNumber, message)
+ }
+ logNumber++
+ }
+
+ close(ch)
+ }()
+
+ return ch
+}
Отне ми известно време да разбера какво правиш, но всичко изглежда ок
Уви, предполагам има по-елегантен начин, но за сега толкова