通过cond实现阻塞队列

发布时间: 2023-06-09 18:07:30　作者: 王景迁
package main

import (
	"fmt"
	"sync"
	"time"
)

// queue is a bounded FIFO buffer shared between goroutines. All fields are
// meant to be accessed only while mutex is held.
type queue struct {
	// cap is the maximum number of items the queue may hold at once.
	cap int

	// items holds the buffered values, oldest first.
	items []interface{}

	// mutex guards items and is the Locker behind both condition variables.
	mutex *sync.Mutex

	// readerCond is signalled after an item is added, waking a goroutine
	// that is waiting for the queue to become non-empty.
	readerCond *sync.Cond

	// writerCond is signalled after an item is removed, waking a goroutine
	// that is waiting for free space.
	writerCond *sync.Cond
}

// initQueue returns an empty queue that can buffer up to cap items.
//
// A single mutex is created and shared by both condition variables, so
// readers and writers are woken under the same lock that guards the items.
//
// cap is expected to be positive: with cap == 0 the length of the empty
// queue already equals its capacity, so addItem would wait forever, and a
// negative cap makes the slice allocation panic.
func initQueue(cap int) *queue {
	lock := &sync.Mutex{}
	return &queue{
		items:      make([]interface{}, 0, cap),
		mutex:      lock,
		cap:        cap,
		readerCond: sync.NewCond(lock),
		writerCond: sync.NewCond(lock),
	}
}

// addItem appends item to the tail of the queue. While the queue is at
// capacity it prints "add wait" and blocks on writerCond until woken.
func (q *queue) addItem(item interface{}) {
	q.mutex.Lock()
	defer q.mutex.Unlock()

	atCapacity := func() bool { return len(q.items) == q.cap }

	// Wait releases the mutex while parked and re-acquires it before
	// returning, so the state may have changed again by the time this
	// goroutine runs; the predicate is re-evaluated after every wake-up.
	for atCapacity() {
		fmt.Println("add wait")
		q.writerCond.Wait()
	}

	q.items = append(q.items, item)

	// Exactly one item became available, so waking one reader is enough.
	q.readerCond.Signal()
}

// getItem removes and returns the item at the head of the queue. While the
// queue is empty it prints "get wait" and blocks on readerCond until woken.
func (q *queue) getItem() interface{} {
	q.mutex.Lock()
	defer q.mutex.Unlock()

	// Wait releases the mutex while parked and re-acquires it before
	// returning; another reader may have taken the item in between, so the
	// emptiness check is repeated after every wake-up.
	for len(q.items) == 0 {
		fmt.Println("get wait")
		q.readerCond.Wait()
	}

	res := q.items[0]
	// Clear the vacated slot before re-slicing. Re-slicing alone only moves
	// the slice header; the backing array would keep referencing the popped
	// value and prevent it from being garbage collected until the array
	// itself is replaced.
	q.items[0] = nil
	q.items = q.items[1:]

	// Exactly one slot was freed, so waking one writer is enough.
	q.writerCond.Signal()
	return res
}

// main demonstrates the blocking queue with one producer and two consumers
// sharing a queue of capacity 2, and lets them run for three seconds.
func main() {
	q := initQueue(2)

	// consume prints items from q forever, pausing for the given duration
	// after each one.
	consume := func(pause time.Duration) {
		for {
			fmt.Println(q.getItem())
			time.Sleep(pause)
		}
	}

	// Producer: enqueue 1, 2, 3, ... with 100ms between attempts. It is
	// faster than the consumers combined, so it regularly hits "add wait".
	go func() {
		for id := 1; ; id++ {
			q.addItem(id)
			time.Sleep(100 * time.Millisecond)
		}
	}()

	go consume(500 * time.Millisecond)
	go consume(300 * time.Millisecond)

	// The goroutines never return on their own; the program ends when main
	// returns after this sleep.
	time.Sleep(3 * time.Second)
}

运行结果（具体的输出顺序取决于 goroutine 的调度，每次运行可能略有不同）

1
get wait
2
3
4
add wait
5
add wait
6
7
add wait
8
add wait
9
10
add wait
11
12
add wait
13
add wait
14
15
add wait
16
add wait