Конкурентност 102

17.11.2015

Но преди това...

Въпрос за мъфин #1

Как създаваме нов тип?

type AwesomeInt int

Въпрос за мъфин #2

Как ще укажем, че типа Car имплементира интерфейса Vehicle?

Въпрос за мъфин #3

Как проверяваме дали някой Vehicle е Car?

Въпрос за мъфин #4

Как може да преизползваме вече имплементирани методи и атрибути на някой тип в нов?

type Student struct {
    Person
}

Конкурентност 102

Ще си говорим за

Да си спомним скуката

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
    boring := func(msg string) {
        for i := 0; ; i++ {
            fmt.Println(msg, i)
            time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
        }
    }

    go boring("boring!")
    fmt.Println("Listening.")
    time.Sleep(2 * time.Second)
    fmt.Println("You are way too boring. I am leaving.")
}

sync

Пакет, който ни дава синхронизационни примитиви от ниско ниво:

WaitGroup

Изчаква колекция от горутини да приключат и чак тогава продължава с изпълнението.
Така не правим простотии със time.Sleep, както одеве.

package sync

type WaitGroup struct {}

func (*WaitGroup) Add()
func (*WaitGroup) Done()
func (*WaitGroup) Wait()

Пример

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
    var wg sync.WaitGroup

    boring := func(msg string) {
        for i := 0; i < 8; i++ {
            fmt.Println(msg, i)
            time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
        }
        wg.Done()
    }

    wg.Add(1)
    go boring("boring!")
    fmt.Println("Waiting for the boring function to do its work")
    wg.Wait()
}

Стига вече с тая скука!

По - интересен пример

package main

import (
	"fmt"
	"net/http"
	"sync"
)

func main() {
    var wg sync.WaitGroup
    var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://www.somestupidname.com/",
    }
    for _, url := range urls {
        wg.Add(1) // Increment the WaitGroup counter.
        // Launch a goroutine to fetch the URL.
        go func(url string) {
            content, err := http.Get(url) // Fetch the URL.
            if err == nil {
                fmt.Println(url, content.Status)
            } else {
                fmt.Println(url, "has failed")
            }
            wg.Done() // Decrement the counter when the goroutine completes.
        }(url)
    }
    // Wait for all HTTP fetches to complete.
    wg.Wait()
}

Mutex

package sync

type Mutex struct {}

func (*Mutex) Lock()
func (*Mutex) Unlock()

В код

package main

import (
	"fmt"
	"sync"
)

func main() {
    var (
        count    int
        countMtx sync.Mutex
        countWg  sync.WaitGroup
    )

    worker := func() {
        countMtx.Lock()
        count += 1
        countMtx.Unlock()
        countWg.Done()
    }

    for i := 0; i < 8; i++ {
        countWg.Add(1)
        go worker()
    }

    countWg.Wait()
    fmt.Println("counter:", count)
}

На каналски

package main

import (
	"fmt"
	"sync"
)

func main() {
    var (
        count   int
        countWg sync.WaitGroup
    )
    ch := make(chan struct{}, 1)

    worker := func() {
        ch <- struct{}{}
        count += 1
        <-ch
        countWg.Done()
    }

    for i := 0; i < 8; i++ {
        countWg.Add(1)
        go worker()
    }

    countWg.Wait()
    fmt.Println("counter:", count)
}

RWMutex

package sync

type RWMutex struct {}

func (*RWMutex) Lock()
func (*RWMutex) Unlock()
func (*RWMutex) RLock()
func (*RWMutex) RUnlock()
func (*RWMutex) RLocker() sync.Locker

Once

Обект от този тип ще изпълни точно една функция.

package main

import (
	"fmt"
	"sync"
)

func main() {
    var once sync.Once
    var wg sync.WaitGroup

    onceBody := func() {
        fmt.Println("Only once")
    }
    anotherBody := func() {
        fmt.Println("Another")
    }

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            once.Do(onceBody)
            once.Do(anotherBody)
            wg.Done()
        }()
    }
    wg.Wait()
}

Cond

type Cond struct {
    // L is held while observing or changing the condition
    L Locker
    // contains filtered or unexported fields
}

select

select {
case v1 := <-c1:
    fmt.Printf("received %v from c1\n", v1)
case v2 := <-c2:
    fmt.Printf("received %v from c2\n", v2)
default:
    fmt.Printf("no one was ready to communicate\n")
}

Накратко: switch за канали.
Надълго: Изчаква първия канал, по който е изпратена стойност

Concurrency patterns

Timeout

select {
case result := <-google:
    fmt.Printf("received %v from google\n", result)
case result := <-bing:
    fmt.Printf("received %v from bing\n", result)
case <- time.After(1 * time.Second):
    fmt.Printf("timed out\n")
}

Игра на развален телефон

Игра на развален телефон

package main

import (
	"fmt"
)

func f(left, right chan int) {
    left <- 1 + <-right
}

func main() {
    const n = 100000
    leftmost := make(chan int)
    right := leftmost
    left := leftmost

    for i := 0; i < n; i++ {
        right = make(chan int)
        go f(left, right)
        left = right
    }

    go func(c chan int) { c <- 1 }(right)
    fmt.Println(<-leftmost)
}

Generators

package main

func fib() <-chan int {
    c := make(chan int)
    go func() {
        for a, b := 0, 1; ; a, b = b, a+b {
            c <- a
        }
    }()
    return c
}

func main() {
    fibonacci := fib()
    for i := 0; i < 10; i++ {
        println(<-fibonacci)
    }
}

Fan In

func talk(msg string) <-chan string {
    c := make(chan string)
    go func() {
        for i := 0; ; i++ {
            c <- fmt.Sprintf("%s: %d", msg, i)
            time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
        }
    }()
    return c
}
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
    doycho := talk("Doycho")
    ned := talk("Ned")
    for i := 0; i < 5; i++ {
        fmt.Println(<-doycho)
        fmt.Println(<-ned)
    }
}

// TALK START OMIT
func talk(msg string) <-chan string { // HL
	c := make(chan string)
	go func() {
		for i := 0; ; i++ {
			c <- fmt.Sprintf("%s: %d", msg, i)
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
		}
	}()
	return c
}

// TALK END OMIT

Fan In

Fan In <- Concurrency

func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() {
        for {
            select {
            case s := <-input1:
                c <- s
            case s := <-input2:
                c <- s
            }
        }
    }()
    return c
}
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// TALK START OMIT
func talk(msg string) <-chan string { // HL
	c := make(chan string)
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	go func() {
		for i := 0; ; i++ {
			c <- fmt.Sprintf("%s: %d", msg, i)
			time.Sleep(time.Duration(r.Intn(1e3)) * time.Millisecond)
		}
	}()
	return c
}

// TALK END OMIT

// FANIN START OMIT
func fanIn(input1, input2 <-chan string) <-chan string { // HL
	c := make(chan string)
	go func() { // HL
		for {
			select { // HL
			case s := <-input1:
				c <- s // HL
			case s := <-input2:
				c <- s // HL
			} // HL
		}
	}()
	return c
}

// FANIN END OMIT

func main() {
    c := fanIn(talk("Ned"), talk("Doycho"))
    for i := 0; i < 10; i++ {
        fmt.Println(<-c)
    }
}

Finish channel

func fanIn(input1, input2 <-chan string, finish chan struct{}) <-chan string {
    c := make(chan string)
    go func() {
        for {
            select {
            case s := <-input1:
                c <- s
            case s := <-input2:
                c <- s
            case <-finish:
                close(c)
                wg.Done()
                return
            }
        }
    }()
    return c
}

Finish channel

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

var wg sync.WaitGroup

// TALK START OMIT
func talk(msg string) <-chan string { // HL
	c := make(chan string)
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	go func() {
		for i := 0; ; i++ {
			c <- fmt.Sprintf("%s: %d", msg, i)
			time.Sleep(time.Duration(r.Intn(1e3)) * time.Millisecond)
		}
	}()
	return c
}

// TALK END OMIT

// FANIN START OMIT
func fanIn(input1, input2 <-chan string, finish chan struct{}) <-chan string { // HL
	c := make(chan string)
	go func() {
		for {
			select {
			case s := <-input1:
				c <- s
			case s := <-input2:
				c <- s
			case <-finish: // HL
				close(c) // HL
				wg.Done()
				return
			}
		}
	}()
	return c
}

// FANIN END OMIT

func main() {
    wg.Add(1)
    finish := make(chan struct{})
    c := fanIn(talk("Ned"), talk("Doycho"), finish)
    for value := range c {
        fmt.Println(value)
        if len(value) > 8 {
            close(finish)
        }
    }
    wg.Wait()
}

Words of wisdom

Въпроси?