Решение на Concurrent Retry Executor от Даниел Тасков

Обратно към всички решения

Към профила на Даниел Тасков

Резултати

  • 10 точки от тестове
  • 0 бонус точки
  • 10 точки общо
  • 8 успешни тест(а)
  • 0 неуспешни тест(а)

Код

package main
import (
"fmt"
"math"
"sync"
)
type empty struct{}
type semaphore chan empty
var buffer_mutex sync.Mutex
func OrderedLogDrainer(logs chan chan string) chan string {
result := make(chan string)
orderLogs(logs, result)
return result
}
func orderLogs(logs <-chan chan string, result chan<- string) {
var buffer []chan string
rw := make(semaphore, math.MaxInt64)
go writeBuffer(logs, &buffer, rw)
go readBuffer(result, &buffer, rw)
}
func writeBuffer(chs <-chan chan string, buffer *[]chan string, write semaphore) {
i := 0
for ch := range chs {
expandBuffer(buffer)
go bufferChannel(ch, *buffer, i)
i++
write <- empty{}
}
close(write)
}
func expandBuffer(buffer *[]chan string) {
buffer_mutex.Lock()
*buffer = append(*buffer, make(chan string, 100))
buffer_mutex.Unlock()
}
func bufferChannel(ch <-chan string, buffer []chan string, i int) {
for msg := range ch {
buffer[i] <- fmt.Sprintf("%d\t%s", i+1, msg)
}
close(buffer[i])
}
func readBuffer(ch chan<- string, buffer *[]chan string, read semaphore) {
i := 0
for _ = range read {
for msg := range getBuffer(buffer, i) {
ch <- msg
}
i++
}
close(ch)
}
func getBuffer(buffer *[]chan string, i int) chan string {
buffer_mutex.Lock()
buff := (*buffer)[i]
buffer_mutex.Unlock()
return buff
}

Лог от изпълнението

PASS
ok  	_/tmp/d20151110-19113-9dciuo	0.003s
PASS
ok  	_/tmp/d20151110-19113-9dciuo	0.003s
PASS
ok  	_/tmp/d20151110-19113-9dciuo	0.004s
PASS
ok  	_/tmp/d20151110-19113-9dciuo	0.003s
PASS
ok  	_/tmp/d20151110-19113-9dciuo	0.003s
PASS
ok  	_/tmp/d20151110-19113-9dciuo	0.703s
PASS
ok  	_/tmp/d20151110-19113-9dciuo	0.004s
PASS
ok  	_/tmp/d20151110-19113-9dciuo	0.007s

История (4 версии и 4 коментара)

Даниел обнови решението на 04.11.2015 14:20 (преди над 2 години)

+package main
+
+import "fmt"
+
+type empty struct{}
+type semaphore chan empty
+
+const maxuint = ^uint(0) >> 1
+
+func OrderedLogDrainer(logs chan chan string) chan string {
+ result := make(chan string)
+ go orderLogs(logs, result)
+ return result
+}
+
+func orderLogs(logs chan chan string, result chan string) {
+ var buffer []chan string
+
+ rw := make(semaphore, maxuint)
+
+ go writeBuffer(logs, &buffer, rw)
+ sem := make(semaphore)
+ go readBuffer(result, &buffer, rw, sem)
+ go closeChannel(result, sem)
+}
+
+func writeBuffer(chs chan chan string, buffer *[]chan string, rw semaphore) {
+ i := uint(0)
+ for ch := range chs {
+ *buffer = append(*buffer, make(chan string, 100))
+ go bufferChannel(ch, *buffer, i)
+ i++
+ rw <- empty{}
+ }
+ close(rw)
+}
+
+func bufferChannel(ch chan string, buffer []chan string, i uint) {
+ for msg := range ch {
+ buffer[i] <- fmt.Sprintf("%d\t%s", i+1, msg)
+ }
+ close(buffer[i])
+}
+
+func readBuffer(ch chan string, buffer *[]chan string, rw semaphore, sem semaphore) {
+ j := 0
+ for _ = range rw {
+ for msg := range (*buffer)[j] {
+ ch <- msg
+ }
+ j++
+ }
+ sem <- empty{}
+}
+
+func closeChannel(ch chan string, sem semaphore) {
+ <-sem
+ close(ch)
+}

Даниел обнови решението на 04.11.2015 17:18 (преди над 2 години)

package main
import "fmt"
type empty struct{}
type semaphore chan empty
const maxuint = ^uint(0) >> 1
func OrderedLogDrainer(logs chan chan string) chan string {
result := make(chan string)
- go orderLogs(logs, result)
+ orderLogs(logs, result)
return result
}
func orderLogs(logs chan chan string, result chan string) {
var buffer []chan string
rw := make(semaphore, maxuint)
go writeBuffer(logs, &buffer, rw)
- sem := make(semaphore)
- go readBuffer(result, &buffer, rw, sem)
- go closeChannel(result, sem)
+ go readBuffer(result, &buffer, rw)
}
-func writeBuffer(chs chan chan string, buffer *[]chan string, rw semaphore) {
- i := uint(0)
+func writeBuffer(chs chan chan string, buffer *[]chan string, write semaphore) {
+ i := 0
for ch := range chs {
*buffer = append(*buffer, make(chan string, 100))
go bufferChannel(ch, *buffer, i)
i++
- rw <- empty{}
+ write <- empty{}
}
- close(rw)
+ close(write)
}
-func bufferChannel(ch chan string, buffer []chan string, i uint) {
+func bufferChannel(ch chan string, buffer []chan string, i int) {
for msg := range ch {
buffer[i] <- fmt.Sprintf("%d\t%s", i+1, msg)
}
close(buffer[i])
}
-func readBuffer(ch chan string, buffer *[]chan string, rw semaphore, sem semaphore) {
+func readBuffer(ch chan string, buffer *[]chan string, read semaphore) {
j := 0
- for _ = range rw {
+ for _ = range read {
for msg := range (*buffer)[j] {
ch <- msg
}
j++
}
- sem <- empty{}
-}
-
-func closeChannel(ch chan string, sem semaphore) {
- <-sem
close(ch)
}

Извинявай за закъснялото ревю. Като цяло решението е ок, но Go race condition detection-а (go test -race ./...) се оплаквa, че на редове 28 и 46 намира race condition. Не съм сигурен дали това на практика може да е проблем, на първо четене не се сещам как.

Написах уж тестове с много на брой log-ове при които да се счупи заради този race condition, ноо не се чупи :/

Състезание, предполагам, има, защото единият goroutine чете от *buffer, пък вторият може да го премести на друго място чрез append-a?

Дам, предполагам е от това. Не мисля, че това би създало проблем на практика, но не съм 100% сигурен. Ако пуснеш твойте тестове на твойто решение с включен -race, показва същата грешка както и с наще.

Даниел обнови решението на 10.11.2015 14:09 (преди над 2 години)

package main
-import "fmt"
+import (
+ "fmt"
+ "math"
+ "sync"
+)
type empty struct{}
type semaphore chan empty
-const maxuint = ^uint(0) >> 1
+var buffer_mutex sync.Mutex
func OrderedLogDrainer(logs chan chan string) chan string {
result := make(chan string)
orderLogs(logs, result)
return result
}
-func orderLogs(logs chan chan string, result chan string) {
+func orderLogs(logs <-chan chan string, result chan<- string) {
var buffer []chan string
- rw := make(semaphore, maxuint)
+ rw := make(semaphore, math.MaxInt64)
go writeBuffer(logs, &buffer, rw)
go readBuffer(result, &buffer, rw)
}
-func writeBuffer(chs chan chan string, buffer *[]chan string, write semaphore) {
+func writeBuffer(chs <-chan chan string, buffer *[]chan string, write semaphore) {
i := 0
for ch := range chs {
- *buffer = append(*buffer, make(chan string, 100))
+ expandBuffer(buffer)
go bufferChannel(ch, *buffer, i)
i++
write <- empty{}
}
close(write)
}
-func bufferChannel(ch chan string, buffer []chan string, i int) {
+func expandBuffer(buffer *[]chan string) {
+ buffer_mutex.Lock()
+ *buffer = append(*buffer, make(chan string, 100))
+ buffer_mutex.Unlock()
+}
+
+func bufferChannel(ch <-chan string, buffer []chan string, i int) {
for msg := range ch {
buffer[i] <- fmt.Sprintf("%d\t%s", i+1, msg)
}
close(buffer[i])
}
-func readBuffer(ch chan string, buffer *[]chan string, read semaphore) {
- j := 0
+func readBuffer(ch chan<- string, buffer *[]chan string, read semaphore) {
+ i := 0
for _ = range read {
- for msg := range (*buffer)[j] {
+ for msg := range getBuffer(buffer, i) {
ch <- msg
}
- j++
+ i++
}
close(ch)
+}
+
+func getBuffer(buffer *[]chan string, i int) chan string {
+ buffer_mutex.Lock()
+ buff := (*buffer)[i]
+ buffer_mutex.Unlock()
+ return buff
}

Даниел обнови решението на 10.11.2015 14:14 (преди над 2 години)

package main
import (
"fmt"
"math"
"sync"
)
type empty struct{}
type semaphore chan empty
var buffer_mutex sync.Mutex
func OrderedLogDrainer(logs chan chan string) chan string {
result := make(chan string)
orderLogs(logs, result)
return result
}
func orderLogs(logs <-chan chan string, result chan<- string) {
var buffer []chan string
-
rw := make(semaphore, math.MaxInt64)
-
go writeBuffer(logs, &buffer, rw)
go readBuffer(result, &buffer, rw)
}
func writeBuffer(chs <-chan chan string, buffer *[]chan string, write semaphore) {
i := 0
for ch := range chs {
expandBuffer(buffer)
go bufferChannel(ch, *buffer, i)
i++
write <- empty{}
}
close(write)
}
func expandBuffer(buffer *[]chan string) {
buffer_mutex.Lock()
*buffer = append(*buffer, make(chan string, 100))
buffer_mutex.Unlock()
}
func bufferChannel(ch <-chan string, buffer []chan string, i int) {
for msg := range ch {
buffer[i] <- fmt.Sprintf("%d\t%s", i+1, msg)
}
close(buffer[i])
}
func readBuffer(ch chan<- string, buffer *[]chan string, read semaphore) {
i := 0
for _ = range read {
for msg := range getBuffer(buffer, i) {
ch <- msg
}
i++
}
close(ch)
}
func getBuffer(buffer *[]chan string, i int) chan string {
buffer_mutex.Lock()
buff := (*buffer)[i]
buffer_mutex.Unlock()
return buff
}