Goroutine & Channel - 1

Goroutine

  • A goroutine is implemented as a function or method (this can also be an anonymous or lambda function) and called (invoked) with the keyword go. This starts the function running in parallel with the current computation but in the same address space and with its own stack.

  • Go’s concurrency primitives provide the basis for a good concurrency program design: expressing program structure so as to represent independently executing actions; so Go’s emphasis is not in the 1 st place on parallelism: concurrent programs may or may not be parallel. Parallelism is the ability to make things run quickly by using multiple processors. But it turns out most often that a well designed concurrent program also has excellent performing parallel capabilities.

Basic Goroutine

  • Sample 1: The code below will not print out anything, because the main exits before the “hello” goroutine kicks off. Actually the main is another goroutine which always runs first.

    func main() {
    	go func() {
    		println("Hello")
    	}()
    }
    
  • Sample 2: Add timer to set main goroutine sleep a while, and let the “hello” goroutine to start.

    func main() {
    	go func() {
    		println("Hello") // Hello
    }()
    time.Sleep(3000)
    }
    
  • Sample 3: The code below will print “hello” 10 times and then print “world” 10 times.

    func main() {
    	go func() {
    		for i := 0; i < 10; i++ {
            println("Hello")
            time.Sleep(2000)
    		}
    	}()
    	go func() {
    		for i := 0; i < 10; i++ {
    			println("World")
    		}
    	}()
    	time.Sleep(9999)
    }
    
  • Sample 4: The code below will print “hello” and “world” in different order.

    func main() {
    	runtime.GOMAXPROCS(4)
    	go func() {
    		for i := 0; i < 10; i++ {
    			println("Hello")
    			time.Sleep(500)
    		}
    
    	}()
    	go func() {
    		for i := 0; i < 10; i++ {
    			println("World")
    			time.Sleep(500)
    		}
    	}()
    	time.Sleep(999999)
    }
    
  • Sample 5: File watcher

    
    const watchedPath = "./source"
    
    func main() {
    	for {
    		d, _ := os.Open(watchedPath)
    		files, _ := d.Readdir(-1)
    		for _, fi := range files {
    			filePath := watchedPath + "/" + fi.Name()
    			f, _ := os.Open(filePath)
    			data, _ := ioutil.ReadAll(f)
    			f.Close()
    			os.Remove(filePath)
    			go func(data string) {
    				reader := csv.NewReader(strings.NewReader(data))
    				records, _ := reader.ReadAll()
    				for _, r := range records {
    					invoice := new(Invoice)
    					invoice.Number = r[0]
    					invoice.Amount, _ = strconv.ParseFloat(r[1], 64)
    					invoice.PurchaseOrderNumber, _ = strconv.Atoi(r[2])
    					unixTime, _ := strconv.ParseInt(r[3], 10, 64)
    					invoice.InvoiceDate = time.Unix(unixTime, 0)
    					
    					fmt.Printf("Received Invoice '%v' for $%.2f and submitted for processing\n", invoice.Number, invoice.Amount)
    				}
    			}(string(data))
    		}
    		d.Close()
    		time.Sleep(100 * time.Millisecond)
    	}
    }
    
    type Invoice struct {
    	Number string
    	Amount float64
    	PurchaseOrderNumber int
    	InvoiceDate time.Time
    }
    

Using GOMAXPROCS

  • When GOMAXPROCS is greater than 1, they run on a thread pool with that many threads.With the gccgo compiler GOMAXPROCS is effectively equal to the number of running goroutines.

  • Observations from experiments: on a 1 CPU laptop performance improved when GOMAXPROCS was increased to 9. On a 32 core machine, the best performance was reached with GOMAXPROCS=8, a higher number didn’t increase performance in that benchmark.

Channels

Channels for communication between goroutines

  • Go has a special type, the channel, which is a like a conduit (pipe) through which you can send typed values and which takes care of communication between goroutines, avoiding all the pitfalls of shared memory; the very act of communication through a channel guarantees synchronization.

  • Data are passed around on channels: only one goroutine has access to a data item at any given time: so data races cannot occur, by design. The ownership of the data (that is the ability to read and write it) is passed around.

  • A channel is in fact a typed message queue: data can be transmitted through it. It is a First In First Out (FIFO) structure and so they preserve the order of the items that are sent into them (for those who are familiar with it, a channel can be compared to a two-way pipe in Unix shells).

  • A channel is also a reference type, so we have to use the make() function to allocate memory for it.

NOTE: Don’t use print statements to indicate the order of sending to and receiving from a channel: this could be out of order with what actually happens due to the time lag between the print statement and the actual channel sending and receiving.

Blocking of channels

  • A send operation on a channel (and the goroutine or function that contains it) blocks until a receiver is available for the same channel

  • A receive operation for a channel blocks (and the goroutine or function that contains it) until a sender is available for the same channel

Goroutine sync through one or more channels
  • Blocking - deadlock

  • Sample 1 : deadlock because of lack of sender

    func main() {
    	ch := make(chan string)
    	fmt.Println(<-ch)
    }
    //fatal error: all goroutines are asleep - deadlock!
    
  • Sample 2: deadlock because of lack of receiver

    func main() {
    	ch := make(chan string, 1)
    	ch <- "Hello"
    	
    }
    
  • Sample 3

    func main() {
    	ch := make(chan string, 1)
    	ch <- "Hello"
    	fmt.Println(<-ch) // Hello
    }
    
Async channels - channel with buffer
  • Channel with buffer

    buf := 100
    ch1 := make(chan string, buf)
    
  • Sample 4: deadlock again because the 2nd sender can not send message

    func main() {
    	ch := make(chan string, 1)
    	ch <- "Hello"
    	ch <- "Hello"
    	fmt.Println(<-ch)
    	fmt.Println(<-ch)
    }
    
  • Sample 5

    func main() {
    	ch := make(chan string, 2)
    	ch <- "Hello"
    	ch <- "Hello"
    	fmt.Println(<-ch) //Hello
    	fmt.Println(<-ch) // Hello
    }
    
Closing channel
  • Sample 6: Closing channel will not impact the receiver to get the message

    func main() {
    	ch := make(chan string, 1)
    	ch <- "Hello"
    	ch <- "Hello"
    	fmt.Println(<-ch)
    	fmt.Println(<-ch)
    }
    
  • Sample 7: Sender cannot send message to closing channel

    func main() {
    	ch := make(chan string, 2)
    	ch <- "Hello"
    	ch <- "Hello"
    	fmt.Println(<-ch) //Hello
    fmt.Println(<-ch) // Hello
    ch <- "Hello" // panic: send on closed channel
    }
    

Semaphore pattern

  • The goroutine compute signals its completion by putting a value on the channel ch, the main routine waits on <-ch until this value gets through.

    func compute(ch chan int) {
    	ch <- someComputation()
    	// when it completes, signal on the channel.
    }
    
    func main() {
    	ch := make(chan int) // allocate a channel.
    	go compute(ch)       // start something in a goroutine
    	doSomethingElseForAWhile()
    	result := <-ch
    }
    
Implement a semaphore with a buffered channel
  • There is no semaphore implementation in Go’s sync package, but they can be emulated easily using a buffered channel:

    • the buffered channel is the number of resources we wish to synchronize
    • the length (number of elements currently stored) of the channel is the number of resources currently being used.
    • the capacity minus the length of the channel is the number of free resources (the integer value of traditional semaphores)
  • Sample 8: semaphore pattern

    type Empty interface {}
    var empty Empty
    //  do sth ...
    data := make([]float64, N)
    res := make([]float64, N)
    sem := make(chan Empty, N) // semaphore
    // do sth  ...
    for i, xi := range data {
    	go func (i int, xi float64) {
    		res[i] = doSomething(i,xi)
    		sem <- empty
    	} (i, xi)
    }
    // wait for goroutines to finish
    for i := 0; i < N; i++ { <-sem }
    
    
  • Semaphore operations sample pattern

    // acquire n resources
    func (s semaphore) P(n int) {
    	e := new(Empty)
    	for i := 0; i < n; i++ {
    		s <- e
    	}
    }
    // release n resources
    func (s semaphore) V(n int) {
    	for i := 0; i < n; i++ {
    		<-s
    	}
    }
    
  • Semaphore for a mutex:

    /* mutexes */
    func (s semaphore) Lock() {
    	s.P(1)
    }
    func (s semaphore) Unlock() {
    	s.V(1)
    }
    /* signal-wait */
    func (s semaphore) Wait(n int) {
    	s.P(n)
    }
    func (s semaphore) Signal() {
    	s.V(1)
    }
    

Channel Factory pattern

  • Another pattern common in this style of programming goes as follows: instead of passing a channel as a parameter to a goroutine, let the function make the channel and return it (so it plays the role of a factory); inside the function a lambda function is called as a goroutine.

    func main() {
    	stream := pump()
    	go suck(stream)
    	// the above 2 lines can be shortened to: go suck( pump() )
    	time.Sleep(1e9)
    }
    func pump() chan int {
    	ch := make(chan int)
    	go func() {
    		for i := 0; ; i++ {
    			ch <- i
    		}
    	}()
    	return ch
    }
    func suck(ch chan int) {
    	for {
    		fmt.Println(<-ch)
    	}
    }
    
For—range applied to channels
  • The range clause on for loops accepts a channel ch as an operand, in which case the for loops over the values received from the channel.

  • Obviously another goroutine must be writing to ch (otherwise the execution blocks in the for-loop) and must close ch when it is done writing.

    
    func main() {
    	suck(pump())
    	time.Sleep(1e9)
    }
    func pump() chan int {
    	ch := make(chan int)
    	go func() {
    		for i := 0; ; i++ {
    			ch <- i
    		}
    	}()
    	return ch
    }
    func suck(ch chan int) {
    	go func() {
    		for v := range ch {
    			fmt.Println(v)
    		}
    	}()
    }
    
Iterator pattern
  • Another common case where we have to populate a channel with the items of a container type which contains an index-addressable field items . For this we can define a method which returns a read-only channel.

  • Inside the goroutine, a for-loop iterates over the elements in the container c (for tree or graph algorithms, this simple for-loop could be replaced with a depth-first search)

    func (c *container) Iter() <-chan items {
        ch := make(chan item)
        go func() {
            for i := 0; i < c.Len(); i++ {
                // or use a for-range loop
                ch <- c.items[i]
            }
        }()
        return ch
    }
    
    // The code which calls this method can then iterate over the container
    for x := range container.Iter() { ... }
    
    

Producer Consumer pattern

  • A Produce() function which delivers the values needed by a Consume function. Both could be run as a separate goroutine, Produce putting the values on a channel which is read by Consume.

    package main
    /* producer-consumer problem in Go */
    
    import ("fmt")
    
    var done = make(chan bool)
    var msgs = make(chan int)
    
    func produce () {
        for i := 0; i < 10; i++ {
            msgs <- i
        }
        done <- true
    }
    
    func consume () {
        for {
            msg := <-msgs
            fmt.Println(msg)
        }
    }
    
    func main () {
        go produce()
        go consume()
        <- done
    }