Go语言并发编程深度解析

编程技术

Go语言并发编程深度解析

Go语言的并发模型是其最大的特色之一。本文将深入讲解Go语言并发编程的核心概念和最佳实践。

Goroutine基础

1. 创建Goroutine

package main

import (
	"fmt"
	"time"
)

func sayHello() {
	for i := 0; i < 5; i++ {
		fmt.Println("Hello")
		time.Sleep(100 * time.Millisecond)
	}
}

func sayWorld() {
	for i := 0; i < 5; i++ {
		fmt.Println("World")
		time.Sleep(100 * time.Millisecond)
	}
}

func main() {
	// 启动两个goroutine
	go sayHello()
	go sayWorld()

	// 等待goroutine执行完成
	time.Sleep(1 * time.Second)
	fmt.Println("Done")
}

2. 匿名函数Goroutine

package main

import (
	"fmt"
	"time"
)

func main() {
	// 使用匿名函数
	go func() {
		fmt.Println("Anonymous goroutine")
	}()

	// 带参数的匿名函数
	msg := "Hello"
	go func(m string) {
		fmt.Println(m)
	}(msg)

	time.Sleep(100 * time.Millisecond)
}

3. 等待Goroutine完成

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup

	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			fmt.Printf("Goroutine %d is running\n", id)
		}(i)
	}

	wg.Wait()
	fmt.Println("All goroutines completed")
}

Channel通信

1. Channel基础

package main

import "fmt"

func main() {
	// 创建channel
	ch := make(chan int)

	// 发送数据
	go func() {
		ch <- 42
	}()

	// 接收数据
	value := <-ch
	fmt.Println(value) // 42
}

2. 缓冲Channel

package main

import "fmt"

func main() {
	// 创建缓冲channel
	ch := make(chan int, 3)

	// 发送数据(不会阻塞,直到缓冲区满)
	ch <- 1
	ch <- 2
	ch <- 3

	// 接收数据
	fmt.Println(<-ch) // 1
	fmt.Println(<-ch) // 2
	fmt.Println(<-ch) // 3
}

3. Channel方向

package main

import "fmt"

// 只发送channel
func sendData(ch chan<- int) {
	ch <- 42
}

// 只接收channel
func receiveData(ch <-chan int) {
	value := <-ch
	fmt.Println(value)
}

func main() {
	ch := make(chan int)
	go sendData(ch)
	receiveData(ch)
}

4. 关闭Channel

package main

import "fmt"

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	ch <- 3
	close(ch)

	// 使用range遍历channel
	for value := range ch {
		fmt.Println(value)
	}

	// 检查channel是否关闭
	v, ok := <-ch
	fmt.Printf("Value: %d, OK: %v\n", v, ok) // 0, false
}

5. Select多路复用

package main

import (
	"fmt"
	"time"
)

func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)

	go func() {
		time.Sleep(1 * time.Second)
		ch1 <- "from ch1"
	}()

	go func() {
		time.Sleep(2 * time.Second)
		ch2 <- "from ch2"
	}()

	// 使用select等待多个channel
	for i := 0; i < 2; i++ {
		select {
		case msg1 := <-ch1:
			fmt.Println(msg1)
		case msg2 := <-ch2:
			fmt.Println(msg2)
		}
	}
}

6. 超时处理

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan string)

	go func() {
		time.Sleep(2 * time.Second)
		ch <- "result"
	}()

	select {
	case result := <-ch:
		fmt.Println(result)
	case <-time.After(1 * time.Second):
		fmt.Println("Timeout")
	}
}

7. 非阻塞Channel操作

package main

import "fmt"

func main() {
	ch := make(chan int)

	// 非阻塞发送
	select {
	case ch <- 1:
		fmt.Println("Sent")
	default:
		fmt.Println("Channel is full or no receiver")
	}

	// 非阻塞接收
	select {
	case v := <-ch:
		fmt.Println("Received:", v)
	default:
		fmt.Println("No data available")
	}
}

并发模式

1. 扇出/扇入模式

package main

import (
	"fmt"
	"sync"
)

// 生产者
func producer(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

// 处理者
func processor(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}

// 合并多个channel
func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	output := func(c <-chan int) {
		defer wg.Done()
		for n := range