php小编子墨在软件开发过程中,消息队列是一种常见的通信机制,用于实现生产者和消费者之间的异步通信。然而,有时候我们希望控制生产者和消费者对消息的读取,以便更好地管理系统资源和处理高峰时段的请求。本文将介绍一些限制生产者和消费者读取消息的方法,帮助开发者优化系统性能和提高应用的稳定性。
问题内容
我想用 go 获得应用程序生产者-消费者(通过信号关闭)。
生产者不断在队列中生成消息,限制为 10 条。 一些消费者阅读并处理该频道。 如果队列中的消息数为0,生产者再次生成10条消息。 当收到停止信号时,生产者停止生成新消息,消费者处理通道中的所有内容。
我找到了一段代码,但无法理解它是否正常工作,因为发现了奇怪的东西:
- 为什么停止程序后,队列中的消息并没有全部处理完,好像丢失了部分数据。 (在屏幕截图中,发送了 15 条消息,但处理了 5 条消息)
- 如何正确地将队列限制为10条消息,即必须写入10条消息,等待队列计数器变为0时处理,然后再写入10条?
- 是否可以在停止信号后通知生产者,以便他不再向通道生成新消息? (在屏幕截图中,生产者成功写入队列 - 12,13,14,15)
结果:
代码示例:
package main
import (
"context"
"fmt"
"math/rand"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
func main() {
const nConsumers = 2
in := make(chan int, 10)
p := Producer{&in}
c := Consumer{&in, make(chan int, nConsumers)}
go p.Produce()
ctx, cancelFunc := context.WithCancel(context.Background())
go c.Consume(ctx)
wg := &sync.WaitGroup{}
wg.Add(nConsumers)
for i := 1; i <= nConsumers; i++ {
go c.Work(wg, i)
}
termChan := make(chan os.Signal, 1)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
<-termChan
cancelFunc()
wg.Wait()
}
type Consumer struct {
in *chan int
jobs chan int
}
func (c Consumer) Work(wg *sync.WaitGroup, i int) {
defer wg.Done()
for job := range c.jobs {
fmt.Printf("Worker #%d start job %d\n", i, job)
time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
fmt.Printf("Worker #%d finish job %d\n", i, job)
}
fmt.Printf("Worker #%d interrupted\n", i)
}
func (c Consumer) Consume(ctx context.Context) {
for {
select {
case job := <-*c.in:
c.jobs <- job
case <-ctx.Done():
close(c.jobs)
fmt.Println("Consumer close channel")
return
}
}
}
type Producer struct {
in *chan int
}
func (p Producer) Produce() {
task := 1
for {
*p.in <- task
fmt.Printf("Send value %d\n", task)
task++
time.Sleep(time.Millisecond * 500)
}
}
解决方法
为什么停止程序后,队列中的消息并没有全部处理完,好像丢失了部分数据。
这是因为当 ctx
完成后,(consumer).consume
停止从 in
通道读取,但 go p.produce()
创建的 goroutine 仍然写入 in
通道。
下面的演示解决了这个问题并简化了源代码。
注释:
produce
在ctx
完成后停止。并且它关闭了in
通道。字段
jobs
已从consumer
中删除,工作人员直接从in
通道读取。以下要求被忽略,因为它很奇怪。常见的行为是,当作业产生时,如果
in
通道未满,则作业会立即发送到in
通道;当它已满时,发送操作将阻塞,直到从in
通道读取作业为止。如果队列中的消息数为0,生产者再次生成10条消息
package main
import (
"context"
"fmt"
"math/rand"
"os/signal"
"sync"
"syscall"
"time"
)
func main() {
const nConsumers = 2
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
in := make(chan int, 10)
p := Producer{in}
c := Consumer{in}
go p.Produce(ctx)
var wg sync.WaitGroup
wg.Add(nConsumers)
for i := 1; i <= nConsumers; i++ {
go c.Work(&wg, i)
}
<-ctx.Done()
fmt.Printf("\nGot end signal, waiting for %d jobs to finish\n", len(in))
wg.Wait()
}
type Consumer struct {
in chan int
}
func (c *Consumer) Work(wg *sync.WaitGroup, i int) {
defer wg.Done()
for job := range c.in {
fmt.Printf("Worker #%d start job %d\n", i, job)
time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
fmt.Printf("Worker #%d finish job %d\n", i, job)
}
fmt.Printf("Worker #%d interrupted\n", i)
}
type Producer struct {
in chan int
}
func (p *Producer) Produce(ctx context.Context) {
task := 1
for {
select {
case p.in <- task:
fmt.Printf("Send value %d\n", task)
task++
time.Sleep(time.Millisecond * 500)
case <-ctx.Done():
close(p.in)
return
}
}
}
以上就是如何限制生产者和消费者读取消息?的详细内容,更多请关注编程网其它相关文章!