我是 GO 新手,我有一个关于使用通道信号停止 goroutine 的问题。
我有一个长期运行的 goroutine(超过 1000 个)和管理器来管理它:
func myThreadFunc(stop chan bool) {
for {
select {
case <- stop:
log.Debug("Stopping thread")
return
default:
callClientTask()
}
}
}
func callClientTask() {
// This can take long time up to 30 seconds - this is external HTTP API call
time.Sleep(5 * time.Second)
}
func manager() {
var cancelChannelSlice []chan bool
for i := 0; i < 1000; i++ {
cancelChannel := make(chan bool)
cancelChannelSlice = append(cancelChannelSlice, cancelChannel)
go myThreadFunc(cancelChannel)
}
var stopTest = func() {
for _, c := range cancelChannelSlice {
c <- true
}
}
timeout := time.After(time.Duration(300) * time.Second)
for {
select {
case <-timeout:
stopTest()
default:
time.Sleep(time.Second)
}
}
}
在这种情况下,每次我调用 c <- true
管理器都会等待 callClientTask()
完成,然后转到下一个 cancelChannel
我希望所有 goroutine 在 callClientTask()
的 1 次迭代中停止(不超过 30 秒)
我尝试的唯一方法是像这样投射新的 goroutine:
var stopTest = func() {
for _, c := range cancelChannelSlice {
go func(c chan bool) {
c <- true
close(c)
}(c)
}
}
我这是正确的方法吗?
正确答案
据我从您的问题中了解到,“您希望所有 goroutine 在 callClientTask() 的 1 次迭代中停止(不超过 30 秒)”并且工作线程同时运行而不会出现同步问题。
我重新组织了与等待组同时运行的代码。
示例代码:
package main
import (
"log"
"sync"
"time"
)
func worker(stop <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-stop:
log.Println("Stopping thread")
return
default:
callClientTask()
}
}
}
func callClientTask() {
time.Sleep(2 * time.Second) // simulate a long-running task (for testing purposes)
}
func main() {
var wg sync.WaitGroup
stop := make(chan struct{})
for i := 0; i < 1000; i++ {
wg.Add(1)
go worker(stop, &wg)
}
time.Sleep(5 * time.Second) // allow workers to run for a while
close(stop) // stop all workers, close channel
wg.Wait() // wait for all workers
}
输出:
2023/10/26 10:40:44 Stopping thread
2023/10/26 10:40:44 Stopping thread
....
2023/10/26 10:40:49 Stopping thread
2023/10/26 10:40:49 Stopping thread
编辑:
如果您想停止某些工作人员,则必须更新工作人员。以下代码包括具有“停止”和“停止”通道的工作人员以及启动/停止功能。
示例代码:
package main
import (
"log"
"sync"
"time"
)
type Worker struct {
stop chan struct{}
stopped chan struct{}
}
func NewWorker() *Worker {
return &Worker{
stop: make(chan struct{}),
stopped: make(chan struct{}),
}
}
func (w *Worker) Start(wg *sync.WaitGroup) {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-w.stop:
log.Println("Stopping thread")
close(w.stopped)
return
default:
callClientTask()
}
}
}()
}
func (w *Worker) Stop() {
close(w.stop)
<-w.stopped
}
func callClientTask() {
time.Sleep(2 * time.Second) // simulate a long-running task (for testing purposes)
}
func main() {
var wg sync.WaitGroup
workers := make([]*Worker, 1000)
for i := 0; i < 1000; i++ {
workers[i] = NewWorker()
workers[i].Start(&wg)
}
time.Sleep(5 * time.Second) // allow workers to run for a while
for i := 0; i < 100; i++ { // stop first 100 workers
workers[i].Stop()
}
for i := 100; i < 1000; i++ { // wait other workers to finish
workers[i].Stop()
}
wg.Wait()
}
输出:
2023/10/26 12:51:26 Stopping thread
2023/10/26 12:51:28 Stopping thread
2023/10/26 12:51:30 Stopping thread
....
以上就是使用通道更快地关闭 goroutine的详细内容,更多请关注编程网其它相关文章!
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
软考中级精品资料免费领
- 历年真题答案解析
- 备考技巧名师总结
- 高频考点精准押题
- 资料下载
- 历年真题
193.9 KB下载数265
191.63 KB下载数245
143.91 KB下载数1142
183.71 KB下载数642
644.84 KB下载数2755