在Go语言中,可以使用`sync.WaitGroup`来处理实时数据流。
`sync.WaitGroup`是一个计数信号量,用来等待一组goroutine的结束。它的主要方法有三个:`Add()`、`Done()`和`Wait()`。
首先,需要创建一个`sync.WaitGroup`对象,然后在每个goroutine开始之前调用`Add()`方法,表示要等待的goroutine数量增加1。在goroutine的结尾处调用`Done()`方法,表示该goroutine已经结束。最后,在主goroutine中调用`Wait()`方法,等待所有的goroutine都结束。
下面是一个简单的例子,假设有一个数据流需要实时处理,处理的任务是打印每个数据的平方值:
```go
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
dataStream := []int{1, 2, 3, 4, 5}
for _, data := range dataStream {
wg.Add(1)
go process(data, &wg)
}
wg.Wait()
}
func process(data int, wg *sync.WaitGroup) {
defer wg.Done()
result := data * data
fmt.Println(result)
}
```
在主函数中,首先创建了一个`sync.WaitGroup`对象`wg`。然后,遍历数据流,并为每个数据增加了一个等待任务。接着,启动了一个goroutine来处理每个数据,并传递了`&wg`作为参数。在`process()`函数中,打印了每个数据的平方值,并且在函数结尾处调用了`wg.Done()`来表示该goroutine已经结束。
最后,在主函数中调用`wg.Wait()`来等待所有的goroutine都结束。这样就可以保证在所有的数据都被处理完之前,主函数不会退出。