Golang超全面讲解并发

 

1. goroutine

1.1 定义

func main() {
	for i := 0; i < 10; i++ {
		//开启并发打印
		go func(i int) {
			fmt.Printf("hello goroutine : %d \n", i)
		}(i)
	}
	time.Sleep(time.Millisecond)
}

go语言是采用一种叫 协程(Coroutine)

轻量级 “线程”

非抢占式 多任务处理,由协程主动交出CPU控制权

  • 线程是由CPU来决定是否移交控制权,做到一半可能线程就会进行切换
  • 协程则是由内部进行决定是否要移交CPU的控制权

编译器/解释器/虚拟机层面的多任务

多个协程可以在一个或者多个线程上运行 (由调度器来决定)

以下例子,通过 runtime.Gosched() 可以手动交出控制权,如果不交出控制权;还有如果在 gorotine 里面使用外部函数,如果不传入的话,就是一个闭包的变量,会导致数据冲突,就是通过不同的协程写入数据。检查数据是否有冲突可以通过以下语句进行检测

go run -race

func main() {
	var a [10]int
	for i := 0; i < 10; i++ {
		//如果这里不将i传入参数直接引用外部的参数会出现:数据冲突(race condition)
		go func(i int) {
			// 打印语句会进行协程的调度,会交出控制权:fmt.Printf("hello goroutine : %d \n", i)
			a[i]++
			//通过 Gosched() 可以手动交出协程的控制权;如果不写这个语句协程就不会交出控制权进行调度执行,就会一直卡死在这里
			runtime.Gosched()
		}(i)
	}
	time.Sleep(time.Millisecond)
	fmt.Println(a)
}

普通函数:在一个线程里面执行,调用完后释放资源,单向调用

协程:双向流通

但go的程序启动时,一个线程里面可能有多个 goroutine 执行,具体在哪个线程执行,由调度器决定;传统意义上的 routine 需要显示的写出释放控制权,而 goroutine 不需要写出来,调度器会进行切换

1.2 goroutine切换点

只是参考,不能保证肯定会切换

  • I/O, select:打印数据的时候
  • channel
  • 等待锁
  • 函数调用(有时)
  • runtime.Gosched()

 

2. channel

协程与协程之间的双向通信

2.1 语法

func chanDemo() {
	//var c chan int //为 nil 的chan不能使用
	c := make(chan int)
  go func() {
		for  {
			n := <- c
			fmt.Println(n)
		}
	}()
  c <- 1 //向 c 里面发送数据(在发送时需要 goroutine 来进行接收,否则就会死锁)
  c <- 2
}

2.2 channel作为参数

func worker(i int, c chan int) {
	for  {
		fmt.Printf("Worker :%d, accpet: %c \n", i, <-c)
	}
}
func chanDemo() {
	var channels [10]chan int
	for i := 0; i < 10; i++ {
		channels[i] = make(chan int)
		go worker(i, channels[i])
	}
	for i := 0; i < 10; i++ {
		channels[i] <- 'a' + i
	}
	for i := 0; i < 10; i++ {
		channels[i] <- 'A' + i
	}
}

2.3 channel作为返回值

返回值定义:chan<- int :代表只能发数据; <-chan int:代表只能收数据

// chan<- int :代表只能发数据; <-chan int:代表只能收数据
func worker(i int) chan int {
	c := make(chan int)
	go func() {
		for  {
			fmt.Printf("Worker :%d, accpet: %c \n", i, <-c)
		}
	}()
	return c
}
func chanDemo() {
	var channels [10]chan int
	for i := 0; i < 10; i++ {
		channels[i] = worker(i) 
	}
	for i := 0; i < 10; i++ {
		channels[i] <- 'a' + i
	}
	for i := 0; i < 10; i++ {
		channels[i] <- 'A' + i
	}
}

创建时可以指定 channel 的大小,make(chan int, 3) 如果数据超过3个就会死锁,对提升性能有好处。

2.4 chan关闭

如果不 close 那么会一直接收下去,可以是否 if、range 进行判断是否 close

不要通过共享内存来进行通信,要通过通信来共享内存

func chanClose() {
	//创建一个缓冲区
	c := make(chan int, 3)
	c <- 1
	c <- 2
	c <- 3
	//方法一:
	go func() {
		for  {
			//如果没有接收到数据就直接返回
			if i, ok := <-c; !ok {
				fmt.Println(" close channel.....")
				break
			} else {
				//死循环读取,如果外部 chan 已经关闭了,这里会一直接收具体数据的默认值
				fmt.Printf("%d\n", i)
			}
		}
	}()
  //方法二:
  go func() {
		for n := range c {
			//如果没有接收到数据就直接返回
			fmt.Printf("%d\n", n)
		}
	}()
	//close关闭 channel
	close(c)
}

2.5 等待goroutine

如何等待所有的 goroutine 执行完之后才退出程序?

方式一:

func ChanDemo() {
	workers := make([]worker, 10)
	for i, _ := range workers {
		workers[i] = createWorker(i)
	}
	for i, worker := range workers{
		worker.in <- 'a' + i
	}
	for i, worker := range workers{
		worker.in <- 'A' + i
	}
	//这种方式可以接收消费者传入的数据,如果消费者传入的数据是同步的话,这里也会出现死锁
	for _, worker := range workers {
		<- worker.done
		<- worker.done
	}
}
//定义一个worker的结构用来存放 chan数据
type worker struct {
	done chan bool
	in chan int
}
func createWorker(id int) worker  {
	w := worker{
		done: make(chan bool),
		in: make(chan int),
	}
	go doWorker(id, w.in, w.done)
	return w
}
func doWorker(id int, in chan int, done chan bool)  {
	for i := range in {
		fmt.Printf("Worker:%d, accept:%c \n", id, i)
		go func() {
			//异步发送,否则会出现死锁,因为发送会加锁
			done <- true
		}()
	}
}

方式二:

使用 sync.WaitGroup 等待所有的 gorounte 执行结束

//定义一个worker的结构用来存放 chan数据
type done2Worker struct {
	in chan int
	//使用指针传递
	wg *sync.WaitGroup
}
func create2Worker(id int, group *sync.WaitGroup) done2Worker  {
	w := done2Worker{
		wg: group,
		in: make(chan int),
	}
	go do2Worker(id, w)
	return w
}
func do2Worker(id int, worker2 done2Worker)  {
	for i := range worker2.in {
		fmt.Printf("Worker:%d, accept:%c \n", id, i)
		//执行完成
		worker2.wg.Done()
	}
}
func Chan2Demo() {
	var wg sync.WaitGroup
	workers := make([]done2Worker, 10)
	for i, _ := range workers {
		workers[i] = create2Worker(i, &wg)
	}
	wg.Add(20)
	for i, worker := range workers {
		worker.in <- 'a' + i
	}
	for i, worker := range workers {
		worker.in <- 'A' + i
	}
	wg.Wait()
}

 

3. select

select 可以对 channel 进行非阻塞式调用,谁先来执行谁,在 select 中也可以使用 nil 进行调度

func generator() chan int {
	out := make(chan int)
	go func() {
		i := 0
		for {
			//休眠随机数
			time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
			out <- i
			i++
		}
	}()
	return out
}
func main() {
	//非阻塞式获取数据,谁先出数据就执行哪一段逻辑
	var c1, c2 = generator(), generator()
	// 10秒钟后发送一次数据
	after := time.After(time.Second * 10)
	//每秒钟都会写一次数据
	tick := time.Tick(time.Second)
	//死循环获取channel中的数据
	for  {
		select {
		//会通过select关键字进行调用,谁先来数据,就执行谁
		case n := <-c1:
			fmt.Println("Received from c1:", n)
		case n := <-c2:
			fmt.Println("Received from c2:", n)
		case <-after:
			fmt.Println("ten second after......")
		case <-tick:
			fmt.Println("tick task exec .....")
		default:
			fmt.Println("No value received")
		}
	}
}

 

4. 传统同步机制

CSP : 模型下面尽量少用传统的同步方式,传统的方式使用共享变量进行使用

  • WaitGroup
  • Mutex
type atomicInt struct {
	a int
  //定义互斥量进行同步
	lock sync.Mutex
}
func (a *atomicInt) add()  {
	a.lock.Lock()
	defer a.lock.Unlock()
	a.a++
}
func main() {
	a := atomicInt{
		a: 0,
	}
	a.add()
	go a.add()
	time.Sleep(time.Millisecond)
	fmt.Println("value : ", a.a)
}

Cond

 

5. 并发模式

5.1 生成器

//传入多个chan,返回一个只能输出的chan
func fanIn(chs...chan string) <-chan string {
	//创建管道
	c := make(chan string)
  //这里循环读取 chs 管道传入的数据
	for _ , ch := range chs {
		go func(in chan string) {
			for  {
				//循环从chCopy里面读取数据后传入到返回出去的chan
              //这里不能直接使用 ch ,因为该变量是一个闭包,后续遍历的管道会将其覆盖
				c <- <-in
			}
		}(ch)
	}
	return c
}
//创建一个channel,循环的发送数据
func msgGen(serviceName string) chan string {
	ch := make(chan string)
	go func() {
		for  {
			ch<- fmt.Sprintf("hello:%s", serviceName)
		}
	}()
	return ch
}
func main() {
	s1 := msgGen("service1")
	s2 := msgGen("service2")
	s3 := msgGen("service3")
	//可以拿到返回出来的 channel 跟服务继续做交互
	m := fanIn(s1, s2, s3)
	for  {
		fmt.Println(<-m)
	}
}

5.2 定义接口

// ChannelCreateFunc 创建接口,需要传入管道,以及参数
type ChannelCreateFunc interface {
	// Create 创建函数传入一个任何类型的管道,后面参数选择性传入
	Create(ch <-chan any, V...any) (any, bool)
}
func Creator(c ChannelCreateFunc, ch <-chan any) {
	if r, ok := c.Create(ch, time.Duration(time.Second)); ok {
		fmt.Println(r)
	} else {
		fmt.Println("未接收到数据")
	}
}
func main() {
	s1 := msgGen("service1")
	m := fanIn(s1)
	Creator(timeout.TimeoutCreator{}, m)
	Creator(noblock.NotBlockCreator{}, m)
}

5.3 非阻塞管道

新建 noblock.go,定义下面这样的格式

type NotBlockCreator struct {
}
func (n NotBlockCreator) Create(ch <-chan any, V...any) (any, bool) {
	select {
	case m := <-ch:
		return m, true
	default:
		return "", false
	}
}

5.4 超时管道

新建 timeout.go

type TimeoutCreator struct {
}
func (t TimeoutCreator) Create(ch <-chan any, V...any) (any, bool) {
	size := len(V)
	if size == 1 {
		var timeoutValue = V[0]
		switch v := timeoutValue.(type) {
		case time.Duration:
			for  {
				select {
				case m := <-ch:
					return m, true
				case <-time.After(v):
					fmt.Println("数据超时接收,直接返回false")
					return "", false
				}
			}
		}
	}
	return "", false
}

 

6. 广度优先算法(迷宫)

每次探索都是一层一层的向外进行探索,如果起始为0,那么先将周边的 1 进行探索完毕,探索1时会将1的点位先存入到队列中,等后续所有的1都探索完成之后,再取出1的点位进行1周边的探索

通过上面的这种点位算法,就可以将迷宫的路画出来

6.1 代码实现

创建文本,这里需要注意,idea创建文件分隔符编码需要设置以下,否则后续读取文件时会有问题

6 5
0 1 0 0 0
0 0 0 1 0
0 1 0 1 0
1 1 1 0 0
0 1 0 0 1
0 1 0 0 0

读取文件成二维数组

// Fscanf函数在读取文件时,遇到\r为默认替换为0,读取\n结束,如果编码不对,这里读取就会出问题
func readMaze(path string) [][]int {
	file, err := os.Open(path)
	if err != nil {
		panic(err)
	}
	var row, col int
	//这里需要取地址,函数里面会更改row和col的值
	fmt.Fscanf(file, "%d %d", &row, &col)
	fmt.Printf("%d\t%d\n", row, col)
	//创建一个二位数组,一共有多少行
	maze := make([][]int, row)
	for i := range maze {
		//创建列
		maze[i] = make([]int, col)
		for j := range maze[i] {
			fmt.Fscanf(file, "%d", &maze[i][j])
		}
	}
	return maze
}
func main() {
	//读取迷宫文件
	maze := readMaze("maze/maze.in")
	for _, row := range maze {
		for _, col := range row {
			fmt.Printf("%d\t", col)
		}
		fmt.Println()
	}
}
//点位的结构体
type point struct {
	i, j int
}
//定义需要探索的方向
var dirs = [4]point {
	//当前位置-1,就是向上
	{-1, 0},
	//左边的点位
	{0, -1},
	//向下的点位
	{1, 0},
	//向右的点位
	{0, 1},
}
//将两个点位相加,就可以获取到下一个点位
func (p point) add(r point) point {
	return point{p.i + r.i, p.j + r.j}
}
func (p point) at(grid [][]int) (int, bool) {
	//首先判断点位是否越界了,例如传入的点位 (-1,0)或者(1, -1)
	if p.i < 0 || p.i >= len(grid) {
		return 0, false
	}
	//判断j列是否越界了
	if p.j < 0 || p.j >= len(grid[p.i]) {
		return 0, false
	}
	//返回数据
	return grid[p.i][p.j], true
}
// walk 传入迷宫,指定迷宫开始的点位,以及出口的点位
func walk(maze [][]int, start, end point) [][]int  {
	//创建走过的步
	steps := make([][]int, len(maze))
	for i := range steps {
		steps[i] = make([]int, len(maze[i]))
	}
	//创建需要探索的队列,初始的点位(0,0)
	Q := []point{start}
	for len(Q) > 0 {
		cur := Q[0]
		//截取出队列中的头部
		Q = Q[1:]
		//判断如果点位等于出口的点位,那么直接退出
		if cur == end {
			break
		}
		//dirs为点位周边的四个方向,我这里采用的是 上、左、下、右 的方向进行探索
		for _, dir := range dirs {
			//将当前点位跟四个方向相加,例如 (0,0) 向上的方向就是(-1,0),将i的值进行减1
			next := cur.add(dir)
			//判断向上的点位不能超出迷宫的界限,并且返回在迷宫中的值,因为如果返回的值为1,就证明是墙
			val, ok := next.at(maze)
			if !ok || val == 1 {
				continue
			}
			//不等0就证明是墙
			val, ok = next.at(steps)
			if !ok || val != 0 {
				continue
			}
			//如果是起点,就跳过
			if next == start {
				continue
			}
			//获取到当前步数的值
			curSteps, _ := cur.at(steps)
			//将走过的点位追加到切片中
			steps[next.i][next.j] = curSteps + 1
			//继续将下一个点位添加到需要探索的队列当中
			Q = append(Q, next)
		}
	}
	return steps
}

关于Golang超全面讲解并发的文章就介绍至此,更多相关Golang并发内容请搜索编程宝库以前的文章,希望以后支持编程宝库

 常用指令agent指令-bind=0.0.0.0 指定consul所在机器的ip地址-http-port 指定web接口服务端口-client 指定哪些机器可以访问consul, 0.0.0.0 ...