package main
import (
"fmt"
"sync"
"time"
)
type queue struct {
items []interface{}
mutex *sync.Mutex
cap int
readerCond *sync.Cond
writerCond *sync.Cond
}
func initQueue(cap int) *queue {
q := &queue{}
q.items = make([]interface{}, 0, cap)
q.mutex = &sync.Mutex{}
q.cap = cap
q.writerCond = sync.NewCond(q.mutex)
q.readerCond = sync.NewCond(q.mutex)
return q
}
func (q *queue) addItem(item interface{}) {
q.mutex.Lock()
defer q.mutex.Unlock()
for len(q.items) == q.cap {
fmt.Println("add wait")
q.writerCond.Wait()
}
q.items = append(q.items, item)
q.readerCond.Signal()
}
func (q *queue) getItem() interface{} {
q.mutex.Lock()
defer q.mutex.Unlock()
for len(q.items) == 0 {
fmt.Println("get wait")
q.readerCond.Wait()
}
res := q.items[0]
q.items = q.items[1:len(q.items)]
q.writerCond.Signal()
return res
}
func main() {
q := initQueue(2)
go func(q *queue) {
id := 1
for {
q.addItem(id)
id++
time.Sleep(100 * time.Millisecond)
}
}(q)
go func(q *queue) {
for {
fmt.Println(q.getItem())
time.Sleep(500 * time.Millisecond)
}
}(q)
go func(q *queue) {
for {
fmt.Println(q.getItem())
time.Sleep(300 * time.Millisecond)
}
}(q)
time.Sleep(3 * time.Second)
}
运行结果
1
get wait
2
3
4
add wait
5
add wait
6
7
add wait
8
add wait
9
10
add wait
11
12
add wait
13
add wait
14
15
add wait
16
add wait