[3分钟]GO:不要错误的使用并发

发布时间 2023-12-07 15:36:50作者: 姜七七

七、并发

由于 Go 语言的并发功能,经常被选作项目编程语言。 Go 语言团队已经竭尽全力以廉价(在硬件资源方面)和高性能来实现并发,但是 Go 语言的并发功能也可以被用来编写性能不高同时也不太可靠的代码。

这里有一个二分法; Go 语言的最大特点是简单、轻量级的并发模型。作为一种产品,我们的语言几乎只推广这个功能。

本节讨论了 Go 语言的并发功能的“坑”。

7.1 保持自己忙碌或做自己的工作

下面的代码存在什么问题?

package main

import (
	"fmt"
	"log"
	"net/http"
)

func main() {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "Hello, GopherCon SG")
	})
	go func() {
		if err := http.ListenAndServe(":8080", nil); err != nil {
			log.Fatal(err)
		}
	}()

	for {
	}
}

该程序实现了我们的预期,它提供简单的 Web 服务。 然而,它同时也做了其他事情,它在无限循环中浪费 CPU 资源
这是因为 main 的最后一行上的 for {} 将阻塞 main goroutine,因为它不执行任何 IO、等待锁定、发送或接收通道数据或以其他方式与调度器通信

由于 Go 语言运行时主要是协同调度,该程序将在单个 CPU 上做无效地旋转,并可能最终实时锁定。

试想以上代码是整个系统中的一小部分,这部分代码会在无限循环中占用线程。这会导致很大的资源浪费。

我们如何解决这个问题? 这是一个建议。

package main

import (
	"fmt"
	"log"
	"net/http"
	"runtime"
)

func main() {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "Hello, GopherCon SG")
	})
	go func() {
		if err := http.ListenAndServe(":8080", nil); err != nil {
			log.Fatal(err)
		}
	}()

	for {
		// runtime.Gosched() 是 Go 语言 runtime 包中的一个函数,它的作用是让出当前 goroutine 的执行权限,让其他 goroutine 有机会运行。
		runtime.Gosched()
	}
}

这看起来很愚蠢,但这是我看过的一种常见解决方案。 这是不了解潜在问题的症状。

现在,如果你有更多的经验,你可能会写这样的东西。

package main

import (
	"fmt"
	"log"
	"net/http"
)

func main() {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "Hello, GopherCon SG")
	})
	go func() {
		if err := http.ListenAndServe(":8080", nil); err != nil {
			log.Fatal(err)
		}
	}()

	select {}
}

空的 select 语句将永远阻塞。 这是一个有用的属性,因为现在我们不再调用 runtime.GoSched() 而耗费整个 CPU。 但是这也只是治疗了症状,而不是病根。

我想向你提出另一种你可能在用的解决方案。 与其在 goroutine 中运行 http.ListenAndServe,会给我们留下处理 main goroutine 的问题,
不如在 main goroutine 本身上运行 http.ListenAndServe

贴士: 如果 Go 语言程序的 main.main 函数返回,无论程序在一段时间内启动的其他 goroutine 在做什么, Go 语言程序会无条件地退出。

package main

import (
	"fmt"
	"log"
	"net/http"
)

func main() {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "Hello, GopherCon SG")
	})
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatal(err)
	}
}

所以这是我的第一条建议:如果你的 goroutine 在得到另一个结果之前(在这个例子中即 main 进程结束之前)无法取得进展,那么让自己完成此工作而不是委托给其他 goroutine 会更简单。

这通常会消除将结果从 goroutine 返回到其启动程序所需的大量状态跟踪和通道操作

贴士: 许多 Go 程序员过度使用 goroutine,特别是刚开始时。与生活中的所有事情一样,适度是成功的关键。

7.2 将并发性留给调用者

以下两个 API 有什么区别?

// ListDirectory returns the contents of dir.
func ListDirectory(dir string) ([]string, error)
// ListDirectory returns a channel over which
// directory entries will be published. When the list
// of entries is exhausted, the channel will be closed.
func ListDirectory(dir string) chan string

首先,最明显的不同: 第一个示例将目录读入切片然后返回整个切片,如果出错则返回错误。这是同步发生的,ListDirectory 的调用者会阻塞,
直到读取了所有目录条目。根据目录的大小,这可能需要很长时间,并且可能会分配大量内存来构建目录条目。

让我们看看第二个例子。 这个示例更像是 Go 语言风格,ListDirectory 返回一个通道,通过该通道传递目录条目。当通道关闭时,表明没有更多目录条目。
由于在 ListDirectory 返回后发生了通道的填充,ListDirectory 可能会启动一个 goroutine 来填充通道。

注意: 第二个版本实际上不必使用 Go 协程; 它可以分配一个足以保存所有目录条目而不阻塞的通道,填充通道,关闭它,然后将通道返回给调用者。
但这样做不太现实,因为会消耗大量内存来缓冲通道中的所有结果

通道版本的 ListDirectory 还有两个问题:

  • 通过使用关闭通道作为没有其他项目要处理的信号,在中途遇到了错误时, ListDirectory 无法告诉调用者通过通道返回的项目集是否完整。
    调用者无法区分空目录和读取目录的错误。两者都导致从 ListDirectory 返回的通道立即关闭。
  • 调用者必须持续从通道中读取,直到它被关闭,因为这是调用者知道此通道的是否停止的唯一方式。这是对 ListDirectory 使用的严重限制,
    即使可能已经收到了它想要的答案,调用者也必须花时间从通道中读取。就中型到大型目录的内存使用而言,它可能更有效,但这种方法并不比原始的基于切片的方法快。

以上两种实现所带来的问题的解决方案是使用回调,该回调是在执行时在每个目录条目的上下文中调用函数。

func ListDirectory(dir string, fn func(string))

毫不奇怪,这就是 filepath.WalkDir 函数的工作方式。

贴士: 如果你的函数启动了 goroutine,你必须为调用者提供一种明确停止 goroutine 的方法。 把异步执行函数的决定留给该函数的调用者通常会更容易些。

7.3 永远不要启动一个停止不了的 goroutine。

前面的例子显示当一个任务时没有必要时使用 goroutine。但使用 Go 语言的原因之一是该语言提供的并发功能。实际上,很多情况下你希望利用硬件中可用的并行性。为此,你必须使用 goroutines

这个简单的应用程序在两个不同的端口上提供 http 服务,端口 8080 用于应用程序服务,端口 8001 用于访问 /debug/pprof 终端。

package main

import (
	"fmt"
	"net/http"
	_ "net/http/pprof"
)

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
		fmt.Fprintln(resp, "Hello, QCon!")
	})
	go http.ListenAndServe("127.0.0.1:8001", http.DefaultServeMux) // debug
	http.ListenAndServe("0.0.0.0:8080", mux)                       // app traffic
}

虽然这个程序不是很复杂,但它代表了真实应用程序的基础

该应用程序存在一些问题,因为它随着应用程序的增长而显露出来,所以我们现在来解决其中的一些问题。


func serveApp() {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
		fmt.Fprintln(resp, "Hello, QCon!")
	})
	http.ListenAndServe("0.0.0.0:8080", mux)
}

func serveDebug() {
	http.ListenAndServe("127.0.0.1:8001", http.DefaultServeMux)
}

func main() {
	go serveDebug()
	serveApp()
}

通过将 serveAppserveDebug 处理程序分解成为它们自己的函数,我们将它们与 main.main 分离。
也遵循了上面的建议,并确保 serveAppserveDebug 将它们的并发性留给调用者

但是这个程序存在一些可操作性问题。 如果 serveApp 返回,那么 main.main 将返回,导致程序关闭并由你使用的进程管理器来重新启动。

贴士: 正如 Go 语言中的函数将并发性留给调用者一样,应用程序应该将监视其状态和检测是否重启的工作留给另外的程序来做。
不要让你的应用程序负责重新启动自己,最好从应用程序外部处理该过程

然而,serveDebug 是在一个单独的 goroutine 中运行的,返回后该 goroutine 将退出,而程序的其余部分继续。
由于 /debug 处理程序已停止工作很久,因此操作人员会很不高兴,他们发现无法在你的应用程序中获取统计信息。

我们想要确保的是,如果任何负责提供此应用程序的 goroutine 停止,我们将关闭该应用程序。
【即:如果服务中某些 goroutine 负责的部分服务停止,我们需要关闭整个服务】

func serveApp() {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
		fmt.Fprintln(resp, "Hello, QCon!")
	})
	if err := http.ListenAndServe("0.0.0.0:8080", mux); err != nil {
		// log.Fatal(err) 是一个用于错误处理的函数调用。它的作用是在打印错误信息后,调用 os.Exit(1) 来终止程序的执行。
		log.Fatal(err)
	}
}

func serveDebug() {
	if err := http.ListenAndServe("127.0.0.1:8001", http.DefaultServeMux); err != nil {
		log.Fatal(err)
	}
}

func main() {
	go serveDebug()
	go serveApp()
	select {}
}

现在 serverAppserveDebug 检查从 ListenAndServe 返回的错误,并在需要时调用 log.Fatal
因为两个处理程序都在 goroutine 中运行,所以我们将 main goroutine 停在 select{} 中。

这种方法存在许多问题:

  1. 如果 ListenAndServer 返回 nil 错误,则不会调用 log.Fatal,并且该端口上的 HTTP 服务将在不停止应用程序的情况下关闭
  2. log.Fatal 调用 os.Exit,它将无条件地退出程序; defer 不会被调用,其他 goroutines 也不会被通知关闭,程序就停止了。 这使得编写这些函数的测试变得困难

贴士: 只在 main.maininit 函数中的使用 log.Fatal

我们真正想要的是任何错误发送回 goroutine 的调用者,以便它可以知道 goroutine 停止的原因,可以干净地关闭程序进程。

func serveApp() error {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
		fmt.Fprintln(resp, "Hello, QCon!")
	})
	return http.ListenAndServe("0.0.0.0:8080", mux)
}

func serveDebug() error {
	return http.ListenAndServe("127.0.0.1:8001", http.DefaultServeMux)
}

func main() {
	done := make(chan error, 2)
	go func() {
		done <- serveDebug()
	}()
	go func() {
		done <- serveApp()
	}()

	for i := 0; i < cap(done); i++ {
		if err := <-done; err != nil {
			fmt.Println("error: %v", err)
		}
	}
}

我们可以使用通道来收集 goroutine 的返回状态。通道的大小等于我们想要管理的 goroutine 的数量,这样发送到 done 通道就不会阻塞,因为这会阻止 goroutine 的关闭,导致它泄漏。

由于没有办法安全地关闭 done 通道,我们不能使用 for range 来循环通道直到获取所有 goroutine 发来的报告,而是循环我们开启的多个 goroutine,即通道的容量。

现在我们有办法等待每个 goroutine 干净地退出并记录他们遇到的错误。所需要的只是一种从第一个 goroutine 转发关闭信号到其他 goroutine 的方法

事实证明,要求 http.Server 关闭是有点牵扯的,所以我将这个逻辑转给辅助函数serve 助手使用一个地址和 http.Handler,类似于 http.ListenAndServe,还有一个 stop 通道,我们用它来触发 Shutdown 方法。

func serve(addr string, handler http.Handler, stop <-chan struct{}) error {
	s := http.Server{
		Addr:    addr,
		Handler: handler,
	}

	go func() {
		<-stop // wait for stop signal 获取一个关闭的 channel 会获取一个空值,所以这里就不阻塞了,就会执行下一行的关闭服务操作
		s.Shutdown(context.Background())
	}()

	return s.ListenAndServe()
}

func serveApp(stop <-chan struct{}) error {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
		fmt.Fprintln(resp, "Hello, QCon!")
	})
	return serve("0.0.0.0:8080", mux, stop)
}

func serveDebug(stop <-chan struct{}) error {
	return serve("127.0.0.1:8001", http.DefaultServeMux, stop)
}

func main() {
	done := make(chan error, 2)
	stop := make(chan struct{})
	go func() {
		done <- serveDebug(stop)
	}()
	go func() {
		done <- serveApp(stop)
	}()

	var stopped bool
	for i := 0; i < cap(done); i++ {
		if err := <-done; err != nil {
			fmt.Println("error: %v", err)
		}
		if !stopped {
			stopped = true
			close(stop)
		}
	}
}

现在,每次我们在 done 通道上收到一个值时,我们关闭 stop 通道,这会导致在该通道上等待的所有 goroutine 关闭其 http.Server
这反过来将导致其余所有的 ListenAndServe goroutines 返回。 一旦我们开启的所有 goroutine 都停止了,main.main 就会返回并且进程会干净地停止。

贴士: 自己编写这种逻辑是重复而微妙的。 参考下这个包: https://github.com/heptio/workgroup ,它会为你完成大部分工作。

内容学习于该博客:英文博客

同时借鉴于该翻译:中文翻译