10_协程与管道

发布时间 2023-10-08 13:19:46作者: Stitches

1、线程与协程的关系

  • 创建时默认的栈空间大小,JDK5以后 Java Thread Stack 默认大小为1M;Goroutine 的 Stack 初始化大小为 2K;
  • Java Thread 和 内核系统线程是 1 : 1 关系;Goroutine 和 内核系统线程关系是 M : N关系;

image-20221006202111470

​ Java操作的用户态线程实际是映射到了内核态的系统线程,当发生线程之间的切换时会引起内核系统线程之间的切换,就会涉及到运行状态(寄存器地址、程序计数器地址、栈空间变量地址等)的保存和切换;

​ 而多个Goroutine对应一个或者多个系统线程,映射到同一个系统线程的协程间的切换消耗就很少了。

2、协程机制

image-20221006202814164

Go 语言实现的线程处理器来控制多个协程G 在 系统线程 M 上的运行,多个协程任务存储在队列中按序执行。

  • 那么如果某一个协程任务占用时间过长占住整个Process,其它协程任务就不能执行?

    当Process开始处理任务时 Go会生成一个守护线程去统计每个Process处理完的任务数,如果某个Process统计的值长时间不变,那么就会给该协程任务栈中添加一个标记,当协程运行时遇到非内联函数就会遇到该标记,它会把自己中断下来放入到任务队尾等待执行。

  • 另外当任务栈中某个协程需要中断等待系统调用(比如IO),为了提高并发率,Process会把自己移动到另外一个可执行队列中去处理任务,而当被中断的协程被唤醒时,它会将自己加入到某个可执行任务队列中等待执行。协程被中断时的一些数据状态会保存到协程数据栈中,等待再次被调用时恢复到寄存器中。

3、共享内存并发机制

3.1 锁控制访问数据的安全性

// 通过互斥锁 sync.Mutex + sync.WaitGroup
func TestCounterWaitGroup(t *testing.T) {
	var mux sync.Mutex
	var wg sync.WaitGroup
	count := 0
	for i := 0; i < 5000; i++ {
		wg.Add(1)
		go func() {
			defer func() {
				mux.Unlock()
			}()
			mux.Lock()
			count++
			wg.Done()
		}()
	}
	wg.Wait()
	t.Log(count)
}

3.2 异步返回结果的方式

func service() string {
	time.Sleep(time.Millisecond * 50)
	return "Done"
}
func otherTask() {
	fmt.Println("working on something else")
	time.Sleep(time.Millisecond * 100)
	fmt.Println("Task is done.")
}
func AsyncService() chan string {        //执行完后数据写入通道并返回通道,需要结果时从通道中获取,实现了异步请求
	retCh := make(chan string, 1)    //这里不指定通道大小就会一直阻塞等待数据从通道中拿走;指定后就是缓冲通道发送完立即结束
	go func() {						  
		ret := service()
		fmt.Println("returned result.")
		retCh <- ret
		fmt.Println("service exited.")
	}()
	return retCh
}
func TestAsyncTask(t *testing.T) {
	retCh := AsyncService()
	otherTask()
	fmt.Println(<-retCh)    //从通道中获取数据
}
image-20221007102706284

3.3 多路选择和超时机制

image-20221007102632908

func service() string {
	time.Sleep(time.Millisecond * 500)      //这里设置为睡眠500
	return "Done"
}
func AsyncService() chan string {
	retCh := make(chan string, 1)
	go func() {
		ret := service()
		fmt.Println("returned result.")
		retCh <- ret
		fmt.Println("service exited.")
	}()
	return retCh
}
func TestAsyncTask(t *testing.T) {
	select {
	case ret := <-AsyncService():
		t.Log(ret)
	case <-time.After(time.Millisecond * 100):  //超时就输出time out
		t.Error("time out")
	}
}


//output:
=== RUN   TestAsyncTask
    SyncTest_test.go:53: time out
--- FAIL: TestAsyncTask (0.11s)

FAIL

三种方式实现超时退出:

方法一: ctx.Done() 搭配 time.After()

原理:

1、通过context的WithTimeout设置一个有效时间为800毫秒的context。

2、该context会在耗尽800毫秒后或者方法执行完成后结束,结束的时候会向通道ctx.Done发送信号。

3、有人可能要问,你这里已经设置了context的有效时间,为什么还要加上这个time.After呢?

这是因为该方法内的context是自己申明的,可以手动设置对应的超时时间,但是在大多数场景,这里的ctx是从上游一直传递过来的对于上游传递过来的context还剩多少时间我们是不知道的,所以这时候通过time.After设置一个自己预期的超时时间就很有必要了。

4、注意,这里要记得调用cancel(),不然即使提前执行完了,还要傻傻等到800毫秒后context才会被释放。

func AsyncCall() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*800))
	defer cancel()
	go func(ctx context.Context) {
		// 发送HTTP请求
	}()

	select {
	case <-ctx.Done():
		fmt.Println("call successfully!!!")
		return
	case <-time.After(time.Duration(time.Millisecond * 900)):
		fmt.Println("timeout!!!")
		return
	}
}

方法二:time.NewTimer()

原理:

这里的主要区别是将time.After换成了time.NewTimer,也是同样的思路如果接口调用提前完成,则监听到Done信号,然后关闭定时器。

否则的话,会在指定的timer即900毫秒后执行超时后的业务逻辑。

func AsyncCall() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond * 800))
	defer cancel()
	timer := time.NewTimer(time.Duration(time.Millisecond * 900))

	go func(ctx context.Context) {
		// 发送HTTP请求
	}()

	select {
	case <-ctx.Done():
		timer.Stop()
		timer.Reset(time.Second)
		fmt.Println("call successfully!!!")
		return
	case <-timer.C:
		fmt.Println("timeout!!!")
		return
	}
}

方法三:使用通道

func AsyncCall() {
  ctx := context.Background()
	done := make(chan struct{}, 1)

	go func(ctx context.Context) {
		// 发送HTTP请求
		done <- struct{}{}
	}()

	select {
	case <-done:
		fmt.Println("call successfully!!!")
		return
	case <-time.After(time.Duration(800 * time.Millisecond)):
		fmt.Println("timeout!!!")
		return
	}
}

3.4 Channel 的关闭和广播

​ 一个生产者对应多个消费者,生产者往通道中发送数据,消费者从通道中接收数据。那么如何保证在数据发送完毕时通知消费者呢?

  • 可以通过关闭 Channel,向关闭的 Channel 发送数据会导致 panic;
  • v,ok := <-ch,ok 为 bool 值,true 表示正常接收,false 表示通道关闭;
  • 所有 channel 接收者都会在 channel 关闭时,立刻从阻塞等待中返回且上述 ok 值为 false。这个广播机制常被用来向多个订阅者发送信号,如退出信号。
//生产者-消费者-通道
func procedureMsg(ch chan int, wg *sync.WaitGroup) {
	go func() {
		for i := 0; i < 10; i++ {
			ch <- i
		}
		close(ch) //注意这里发送完数据需要关闭channel
		wg.Done()
	}()
}

func consumerMsg(name string, ch chan int, wg *sync.WaitGroup) {
	go func() {
		for {
			if num, ok := <-ch; ok { //ok为true表示正常接收; ok为false表示发送方关闭了通道
				fmt.Printf("%s : %d\n", name, num)
			} else {
				break
			}
		}
		wg.Done()
	}()
}

func TestChannel(t *testing.T) {
	var wg sync.WaitGroup
	ch := make(chan int)
	wg.Add(1)
	procedureMsg(ch, &wg)
	wg.Add(1)
	consumerMsg("consumer1", ch, &wg)
	wg.Add(1)
	consumerMsg("consumer2", ch, &wg)
	wg.Wait()
}

3.5 任务的取消

方式一:关闭通道

​ 取消任务可以考虑单独向通道 cancelledChannel 中发送数据来通知正在进行任务的取消;但是有多少个消费者就需要发送多少份数据,这样会在代码中耦合消费者数量。如果采用关闭通道,那么每个消费者从通道中获取的都为数据的零值,加以逻辑处理就能够实现广播通知任务取消了。

//任务的取消
func TestTaskCancel(t *testing.T) {
	cancelChannel := make(chan struct{}, 2)
	for i := 0; i < 5; i++ {
		go func(i int, ch chan struct{}) {
			for {
				if isCancelled(ch) {
					break
				}
				time.Sleep(time.Millisecond * 5)
			}
			fmt.Println(i, " Cancelled")
		}(i, cancelChannel)
	}
	sendCancelled2(cancelChannel) //方式二关闭
	//sendCancelled1(cancelChannel) //方式一关闭
	time.Sleep(time.Second * 1)
}

func isCancelled(ch chan struct{}) bool { //接收到关闭通知就关闭通道
	select {
	case ret := <-ch:
		fmt.Println(ret)
		return true
	default:
		return false
	}
}

func sendCancelled1(ch chan struct{}) {
	ch <- struct{}{}
}

func sendCancelled2(ch chan struct{}) {
	close(ch)   //通道关闭后,从通道中获取的是数据的默认零值
}
image-20221007114251946

方式二:context上下文

​ 如果存在这样的情况,某个goroutine执行时又开启了新的 goroutine 执行,那么结束上层 goroutine 时是否应该先结束底层 goroutine,显然这一点仅仅关闭通道是做不到的。由此 Go 实现了 context调度树来追踪各个执行中的 goroutine,实现如下:

//Context任务的取消
func TestTaskCancel(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())  //返回上下文、当前goroutine结束的方法
	for i := 0; i < 5; i++ {
		go func(i int, context context.Context) {
			for {
				if isCancelled(ctx) {
					break
				}
				time.Sleep(time.Millisecond * 5)
			}
			fmt.Println(i, " Cancelled")
		}(i, ctx)
	}
	cancel()
	time.Sleep(time.Second * 1)
}

func isCancelled(ctx context.Context) bool { 
	select {
	case <-ctx.Done():  //能够获取数据说明 goroutine结束了
		return true
	default:
		return false
	}
}

3.6 单例模式只执行一次

public class Singleton {
    private static Singleton INSTANCE=null;
    private Singleton() {}
    public static Singleton getInstance() {
        if (INSTANCE==null) {
            synchronized(Singleton.class) {
                if (INSTANCE==null) {
                    INSTANCE=new Singleton();
                }
            }
        }
        return INSTANCE;
    }
}

​ 上述是 Java 方式的单例模式,通过懒汉式 double check 来生成单例对象。Go 语言中通过 sync.Once 对象的 Do() 方法来执行一次方法。

type Singleton struct {
}

var instance *Singleton
var once sync.Once

func GetSingletonObj() *Singleton {
	once.Do(func() {
		fmt.Println("Create Obj")
		instance = new(Singleton)
	})
	return instance
}

func TestSingletonObj(t *testing.T) {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			obj := GetSingletonObj()
			fmt.Printf("%x\n", unsafe.Pointer(&obj))
			wg.Done()
		}()
	}
}
image-20221007232625983

​ 可以发现对象只创建了一次,实现了 Go 语言的单例模式。

3.7 仅需任意任务完成

​ 有时候我们需要这样的场景,多个任务并发执行,我们仅需任意一个任务完毕时返回结果就结束。比如我们采用谷歌、百度等搜索引擎查询某信息,任意一个搜索引擎得到结果时就返回结束。

​ Go 中通过通道实现,多个 goroutine 并发执行,任意任务执行结束时就往通道中传入数据,通道中一旦有数据就消费返回。

func runTask(i int) string {
	var id int
	time.Sleep(10 * time.Millisecond)
	return fmt.Sprintf("The result is from %d", i)
}

//多个任务执行,当其中任意一个任务执行完毕就返回
func FirstResponse() string {
	numOfRunner := 10
	ch := make(chan string, numOfRunner)
	for i := 0; i < numOfRunner; i++ {
		go func(i int) {
			ret := runTask(i)
			ch <- ret
		}(i)
	}
	return <-ch
}

func TestFirstResponse(t *testing.T) {
	t.Log("Before:", runtime.NumGoroutine())
	t.Log(FirstResponse())
	time.Sleep(1 * time.Second)
	t.Log("After:", runtime.NumGoroutine())
}
image-20221008010135324

​ 注意通道大小必须设置,这样采用缓冲通道保证在函数结束时不会有协程阻塞住,如果将代码中的通道换成非缓冲通道,结果如下:

image-20221008010337346

3.8 等待所有任务完成后一起返回

​ 有两种实现方式,一种是采用 sync.WaitGroup 等待所有 goroutine 处理完毕后返回;另一种是采用 CSP机制,通过从通道中获取指定数量的消息后才返回。

//方式一:CSP机制
func runTask(i int) string {
	time.Sleep(3 * time.Second)
	return fmt.Sprintf("The result is from %d", i)
}

func FirstResponse() string {
	numOfRunner := 10
	ch := make(chan string, numOfRunner)
	for i := 0; i < numOfRunner; i++ {
		go func(i int) {
			ret := runTask(i)
			ch <- ret
		}(i)
	}
	finalRet := ""
	for j := 0; j < numOfRunner; j++ {  //从通道中循环获取到numOfRunner个对象后才返回
		finalRet += <-ch + "\n"
	}
	return finalRet
}

func TestFirstResponse(t *testing.T) {
	t.Log("Before:", runtime.NumGoroutine())
	t.Log(FirstResponse())
	t.Log("After:", runtime.NumGoroutine())
}
image-20221008014710103
//方式二:sync.WaitGroup
func runTask(i int) string {
	time.Sleep(3 * time.Second)
	return fmt.Sprintf("The result is from %d", i)
}

func FirstResponse(outWg *sync.WaitGroup) string {
	defer outWg.Done()
	res := make([]string, 0, 20)
	count := 0
	var innerWg sync.WaitGroup
	for i := 0; i < 10; i++ {
		innerWg.Add(1)
		go func(i int) {
			tmp := runTask(i)
			res = append(res, tmp)
			count++
			defer innerWg.Done()
		}(i)
	}
	innerWg.Wait()
	for _, item := range res {
		fmt.Println(item)
	}
	return ""
}

func TestFirstResponse(t *testing.T) {
	var outWg sync.WaitGroup
	outWg.Add(1)
	go FirstResponse(&outWg)   //注意外部传入的是 WaitGroup的引用
	outWg.Wait()
	fmt.Println("全部任务执行完毕.")
}
image-20221008100845372

3.9 创建对象池

​ 对于一些难以创建的对象,为了避免多次重复创建对象的性能消耗,可以将创建好的对象存储到对象池,使用时取出,使用完再放回对象池中。

//对象池中的存储对象
type ReusableObj struct {
}
type ObjPool struct {
	bufChan chan *ReusableObj //利用通道存储
}

//创建对象池
func NewObjPool(numOfObj int) *ObjPool {
	objPool := ObjPool{}
	objPool.bufChan = make(chan *ReusableObj, numOfObj)
	for i := 0; i < numOfObj; i++ {
		objPool.bufChan <- &ReusableObj{}
	}
	return &objPool
}

//取对象
func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) {
	select {
	case ret := <-p.bufChan:
		return ret, nil
	case <-time.After(timeout): //注意需要添加超时操作,防止取不到对象引起的超时
		return nil, errors.New("time out")
	}
}

//还对象
func (p *ObjPool) ReleaseObj(obj *ReusableObj) error {
	select {
	case p.bufChan <- obj:
		return nil
	default: //放错对象或者对象池已满
		return errors.New("overflow")
	}
}

func TestObjPool(t *testing.T) {
	pool := NewObjPool(10)
	for i := 0; i < 11; i++ {
		if obj, err := pool.GetObj(time.Second * 1); err != nil {
			t.Error(err)
		} else {
			fmt.Println(obj)
			if err := pool.ReleaseObj(obj); err != nil {
				t.Error(err)
			}
		}
	}
	fmt.Println("Done")
}

​ 在 Go 并发包中有一个 sync.Pool,它能否用作 Go 实现了的对象池呢?先来了解下 sync.Pool 的结构:

sync.Pool 对象获取:

image-20221008111748408

​ 每个 Processor 实际上包含了一个私有对象、一个共享对象池;在从 Processor 中获取对象时,如果私有对象不存在,那么就会尝试从当前 Processor 的共享池中获取;如果当前 Processor 共享池是空的,那么就尝试从其它 Processor 共享池中获取;如果所有 Processor 的共享池都是空的,就用用户指定的 New 函数产生一个新的对象返回。

​ 私有对象是协程安全的,共享池是协程不安全的,从共享池中获取对象需要结合锁来实现。

sync.Pool 对象返回:

​ 如果私有对象不存在,就保存为私有对象;

​ 如果私有对象存在,则放入当前 Processor 子池的共享池中;

sync.Pool 对象的生命周期:

​ 每一次 GC 都会清理掉 sync.Pool 缓存的对象,对象缓存的有效期为下一次 GC 之前。GC由系统自动触发,我们无法控制GC的触发时间,因此 sync.Pool 不适合作为对象池来使用。

func TestSyncPool(t *testing.T) {
	pool := &sync.Pool{
		New: func() interface{} { //定义好创建对象的New方法
			fmt.Println("Create new Object.")
			return 100
		},
	}
	v := pool.Get().(int)
	fmt.Println(v)
	pool.Put(3)
	v1, _ := pool.Get().(int) //调用默认New方法创建好的对象并不会放在对象池中
	fmt.Println(v1)
	v2, _ := pool.Get().(int)
	fmt.Println(v2)
}
image-20221008113154086

总结:

​ sync.Pool 适合于通过复用,降低复杂对象的创建和GC代价;但是 sync.Pool 共享池为非协程安全的,使用时需要结合锁操作实现,会有锁方面的开销。sync.Pool 生命周期受 GC 影响,不适合用作连接池。

参考

https://juejin.cn/post/6844904099662692360