Станислав обнови решението на 07.11.2015 22:49 (преди над 2 години)
+package main
+
+import "strconv"
+
// Transform reads every item from input, hands it to transformer together
// with its zero-based position, and delivers whatever transformer writes to
// its result channel on output, preserving the order of the input items.
//
// One goroutine is started per input item, so transformer calls for
// different items may run concurrently. The goroutines are chained: the
// results of item k+1 are forwarded through the goroutine of item k, which
// only starts forwarding after its own transformer call has returned. As a
// consequence a transformer writing to an unbuffered result channel blocks
// until all results of the preceding items have been consumed.
//
// Transform itself returns as soon as input has been closed and drained;
// output is closed once the results of all items have been delivered (or
// immediately if input yields no items). transformer must not close the
// result channel it is given, since Transform closes it afterwards.
func Transform(
	input <-chan interface{},
	output chan<- interface{},
	transformer func(result chan<- interface{}, dataIndex int, data interface{})) {

	// out is the channel the next worker writes to: output for the first
	// item, and the "in" channel of the previous worker for every later one.
	out := output

	index := 0
	for data := range input {
		next := make(chan interface{})

		// data is passed as an argument rather than captured: with per-loop
		// (pre-Go 1.22) loop variables the closure could otherwise observe a
		// later item than the one it was started for.
		go func(in <-chan interface{}, out chan<- interface{}, index int, data interface{}) {
			transformer(out, index, data)
			// Relay the results of all following items, then signal the end.
			for forwarded := range in {
				out <- forwarded
			}
			close(out)
		}(next, out, index, data)

		out = next
		index++
	}

	// Closing the tail of the chain lets the last worker finish, which in
	// turn closes its predecessor's channel, and so on up to output.
	close(out)
}
+
// ChanChanStringToChanInterface adapts a channel of string channels to a
// channel of interface{} values.
//
// A goroutine forwards every channel received from input, unchanged, to the
// returned unbuffered channel and closes the returned channel once input has
// been closed and drained.
func ChanChanStringToChanInterface(input chan (chan string)) chan interface{} {
	boxed := make(chan interface{})
	go func() {
		for {
			inner, open := <-input
			if !open {
				close(boxed)
				return
			}
			boxed <- inner
		}
	}()
	return boxed
}
+
// ChanStringToChanInterface adapts a channel of strings to a channel of
// interface{} values.
//
// A goroutine forwards every string received from input to the returned
// unbuffered channel, in order, and closes the returned channel once input
// has been closed and drained.
func ChanStringToChanInterface(input chan string) chan interface{} {
	boxed := make(chan interface{})
	go func() {
		for {
			text, open := <-input
			if !open {
				close(boxed)
				return
			}
			boxed <- text
		}
	}()
	return boxed
}
+
// ChanInterfaceToChanString adapts a channel of interface{} values to a
// channel of strings.
//
// A goroutine asserts every value received from input to string and sends it
// on the returned unbuffered channel, in order; the returned channel is
// closed once input has been closed and drained. A value whose dynamic type
// is not string makes the type assertion panic inside that goroutine.
func ChanInterfaceToChanString(input chan interface{}) chan string {
	texts := make(chan string)
	forward := func() {
		for {
			value, open := <-input
			if !open {
				break
			}
			texts <- value.(string)
		}
		close(texts)
	}
	go forward()
	return texts
}
+
+// OrderedLogDrainer drains all log channels coming over a channel
+// and redirects the messages to an output channel prefixing them
+// with the log index of the message.
+func OrderedLogDrainer(logs chan (chan string)) chan string {
+ // Redirect all logs coming from logs to logsBuffer.
+ logsBuffer := make(chan interface{})
+ go Transform(
+ ChanChanStringToChanInterface(logs),
+ logsBuffer,
+ func(logs chan<- interface{}, logIndex int, log interface{}) {
+ logBuffer := make(chan interface{})
+
+ // Redirect all messages coming from log to logBuffer.
+ go Transform(
+ ChanStringToChanInterface(log.(chan string)),
+ logBuffer,
+ func(log chan<- interface{}, messageIndex int, message interface{}) {
+ log <- message
+ })
+
+ logs <- logBuffer
+ })
+
+ // Redirect all log messages from logsBuffer to orderedLogs.
+ orderedLogs := make(chan interface{})
+ go Transform(
+ logsBuffer,
+ orderedLogs,
+ func(orderedLogs chan<- interface{}, logIndex int, log interface{}) {
+ for message := range log.(chan interface{}) {
+ orderedLogs <- strconv.Itoa(logIndex+1) + "\t" + message.(string)
+ }
+ })
+
+ return ChanInterfaceToChanString(orderedLogs)
+}