php小编草莓今天要为大家介绍一种新的技术——使用新的atomic.Pointer类型实现无锁无界队列。在并发编程中,队列是一种常见的数据结构,但传统的队列实现通常需要使用锁来保证线程安全,这会带来性能的损耗。而新的atomic.Pointer类型则提供了一种无锁的解决方案,可以实现高效的并发队列操作。下面我们将详细介绍这种新的实现方式,以及它的优势和使用方法。
问题内容
我正在尝试实现 michael 和 scott 的这个非阻塞队列。
我正在尝试使用 go 1.19 中引入的新的atomic.pointer 类型,但我的应用程序中出现了数据争用。
这是我的实现:
package queue
import (
"errors"
"sync/atomic"
)
// LockfreeQueue represents a FIFO structure with operations to enqueue
// and dequeue generic values.
// Reference: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
type LockFreeQueue[T any] struct {
head atomic.Pointer[node[T]]
tail atomic.Pointer[node[T]]
}
// node represents a node in the queue
type node[T any] struct {
value T
next atomic.Pointer[node[T]]
}
// newNode creates and initializes a node
func newNode[T any](v T) *node[T] {
return &node[T]{value: v}
}
// NewQueue creates and initializes a LockFreeQueue
func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
var head atomic.Pointer[node[T]]
var tail atomic.Pointer[node[T]]
n := &node[T]{}
head.Store(n)
tail.Store(n)
return &LockFreeQueue[T]{
head: head,
tail: tail,
}
}
// Enqueue adds a series of Request to the queue
func (q *LockFreeQueue[T]) Enqueue(v T) {
n := newNode(v)
for {
tail := q.tail.Load()
next := tail.next.Load()
if tail == q.tail.Load() {
if next == nil {
if tail.next.CompareAndSwap(next, n) {
q.tail.CompareAndSwap(tail, n)
return
}
} else {
q.tail.CompareAndSwap(tail, next)
}
}
}
}
// Dequeue removes a Request from the queue
func (q *LockFreeQueue[T]) Dequeue() (T, error) {
for {
head := q.head.Load()
tail := q.tail.Load()
next := head.next.Load()
if head == q.head.Load() {
if head == tail {
if next == nil {
return head.value, errors.New("queue is empty")
}
q.tail.CompareAndSwap(tail, next)
} else {
v := next.value
if q.head.CompareAndSwap(head, next) {
return v, nil
}
}
}
}
}
// Check if the queue is empty.
func (q *LockFreeQueue[T]) IsEmpty() bool {
return q.head.Load() == q.tail.Load()
}
我在这里发现了一个不同的实现,它可以在我的应用程序中工作而无需数据竞争,但我似乎无法弄清楚两者之间到底有什么不同。
感谢任何帮助或反馈!
解决方法
事实证明,更改一些内容可以解决问题。
第一个变化:
var n = node[t]{}
head.store(&n)
tail.store(&n)
第二个更改是更改 dequeue()
返回签名。
最终文件如下所示:
package queue
import (
"sync/atomic"
)
// LockfreeQueue represents a FIFO structure with operations to enqueue
// and dequeue generic values.
// Reference: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
type LockFreeQueue[T any] struct {
head atomic.Pointer[node[T]]
tail atomic.Pointer[node[T]]
}
// node represents a node in the queue
type node[T any] struct {
value T
next atomic.Pointer[node[T]]
}
// newNode creates and initializes a node
func newNode[T any](v T) *node[T] {
return &node[T]{value: v}
}
// NewQueue creates and initializes a LockFreeQueue
func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
var head atomic.Pointer[node[T]]
var tail atomic.Pointer[node[T]]
var n = node[T]{}
head.Store(&n)
tail.Store(&n)
return &LockFreeQueue[T]{
head: head,
tail: tail,
}
}
// Enqueue adds a series of Request to the queue
func (q *LockFreeQueue[T]) Enqueue(v T) {
n := newNode(v)
for {
tail := q.tail.Load()
next := tail.next.Load()
if tail == q.tail.Load() {
if next == nil {
if tail.next.CompareAndSwap(next, n) {
q.tail.CompareAndSwap(tail, n)
return
}
} else {
q.tail.CompareAndSwap(tail, next)
}
}
}
}
// Dequeue removes a Request from the queue
func (q *LockFreeQueue[T]) Dequeue() T {
var t T
for {
head := q.head.Load()
tail := q.tail.Load()
next := head.next.Load()
if head == q.head.Load() {
if head == tail {
if next == nil {
return t
}
q.tail.CompareAndSwap(tail, next)
} else {
v := next.value
if q.head.CompareAndSwap(head, next) {
return v
}
}
}
}
}
// Check if the queue is empty.
func (q *LockFreeQueue[T]) IsEmpty() bool {
return q.head.Load() == q.tail.Load()
}
以上就是使用新的atomic.Pointer类型实现无锁无界队列的详细内容,更多请关注编程网其它相关文章!