chore: upgrade the embedded xsync.Map to v4.2.0

Commit f6e494e73f (parent 0b3159bf9b) by wwqgtxx, 2025-11-14 00:11:01 +08:00
2 changed files with 465 additions and 103 deletions

View File

@@ -1,16 +1,17 @@
 package xsync
 
-// copy and modified from https://github.com/puzpuzpuz/xsync/blob/v4.1.0/map.go
+// copy and modified from https://github.com/puzpuzpuz/xsync/blob/v4.2.0/map.go
 // which is licensed under Apache v2.
 //
 // mihomo modified:
-// 1. parallel Map resize has been removed to decrease the memory using.
+// 1. restore xsync/v3's LoadOrCompute api and rename to LoadOrStoreFn.
 // 2. the zero Map is ready for use.
 
 import (
 	"fmt"
 	"math"
 	"math/bits"
+	"runtime"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -41,8 +42,28 @@ const (
 	metaMask          uint64 = 0xffffffffff
 	defaultMetaMasked uint64 = defaultMeta & metaMask
 	emptyMetaSlot     uint8  = 0x80
+	// minimal number of buckets to transfer when participating in cooperative
+	// resize; should be at least defaultMinMapTableLen
+	minResizeTransferStride = 64
+	// upper limit for max number of additional goroutines that participate
+	// in cooperative resize; must be changed simultaneously with resizeCtl
+	// and the related code
+	maxResizeHelpersLimit = (1 << 5) - 1
 )
 
+// max number of additional goroutines that participate in cooperative resize;
+// "resize owner" goroutine isn't counted
+var maxResizeHelpers = func() int32 {
+	v := int32(parallelism() - 1)
+	if v < 1 {
+		v = 1
+	}
+	if v > maxResizeHelpersLimit {
+		v = maxResizeHelpersLimit
+	}
+	return v
+}()
+
 type mapResizeHint int
 
 const (
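
Editor's note: the helper cap above works out to min(GOMAXPROCS, NumCPU) minus the resize owner, clamped to [1, 31]. A standalone sketch of the same computation; parallelism mirrors the helper added near the end of this diff.

package main

import (
	"fmt"
	"runtime"
)

// Mirrors the parallelism() helper added further down in the diff:
// the smaller of GOMAXPROCS and the CPU count.
func parallelism() uint32 {
	maxProcs := uint32(runtime.GOMAXPROCS(0))
	numCores := uint32(runtime.NumCPU())
	if maxProcs < numCores {
		return maxProcs
	}
	return numCores
}

func main() {
	const maxResizeHelpersLimit = (1 << 5) - 1 // 31; must fit the 5-bit helper field
	v := int32(parallelism() - 1)              // the "resize owner" goroutine isn't counted
	if v < 1 {
		v = 1
	}
	if v > maxResizeHelpersLimit {
		v = maxResizeHelpersLimit
	}
	fmt.Println("maxResizeHelpers =", v) // e.g. 7 on an 8-core machine with GOMAXPROCS=8
}
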
@@ -100,16 +121,25 @@ type Map[K comparable, V any] struct {
 	initOnce     sync.Once
 	totalGrowths atomic.Int64
 	totalShrinks atomic.Int64
-	resizing   atomic.Bool // resize in progress flag
-	resizeMu   sync.Mutex  // only used along with resizeCond
-	resizeCond sync.Cond   // used to wake up resize waiters (concurrent modifications)
 	table atomic.Pointer[mapTable[K, V]]
-	minTableLen int
-	growOnly    bool
+	// table being transferred to
+	nextTable atomic.Pointer[mapTable[K, V]]
+	// resize control state: combines resize sequence number (upper 59 bits) and
+	// the current number of resize helpers (lower 5 bits);
+	// odd values of resize sequence mean in-progress resize
+	resizeCtl atomic.Uint64
+	// only used along with resizeCond
+	resizeMu sync.Mutex
+	// used to wake up resize waiters (concurrent writes)
+	resizeCond sync.Cond
+	// transfer progress index for resize
+	resizeIdx atomic.Int64
+	minTableLen int
+	growOnly    bool
 }
 
 type mapTable[K comparable, V any] struct {
-	buckets []bucketPadded[K, V]
+	buckets []bucketPadded
 	// striped counter for number of table entries;
 	// used to determine if a table shrinking is needed
 	// occupies min(buckets_memory/1024, 64KB) of memory
@@ -125,16 +155,16 @@ type counterStripe struct {
 // bucketPadded is a CL-sized map bucket holding up to
 // entriesPerMapBucket entries.
-type bucketPadded[K comparable, V any] struct {
+type bucketPadded struct {
 	//lint:ignore U1000 ensure each bucket takes two cache lines on both 32 and 64-bit archs
-	pad [cacheLineSize - unsafe.Sizeof(bucket[K, V]{})]byte
-	bucket[K, V]
+	pad [cacheLineSize - unsafe.Sizeof(bucket{})]byte
+	bucket
 }
 
-type bucket[K comparable, V any] struct {
-	meta atomic.Uint64
-	entries [entriesPerMapBucket]atomic.Pointer[entry[K, V]] // *entry
-	next atomic.Pointer[bucketPadded[K, V]] // *bucketPadded
+type bucket struct {
+	meta uint64
+	entries [entriesPerMapBucket]unsafe.Pointer // *entry
+	next unsafe.Pointer // *bucketPadded
 	mu sync.Mutex
 }
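
Editor's note: the meta word in the bucket above packs one byte per entry slot. A standalone sketch of the SWAR probe used by lookups, reusing the broadcast/markZeroBytes/setByte helpers shown near the end of this diff; firstMarkedByteIndex is not shown there, so the bits.TrailingZeros64 version below is an assumption.

package main

import (
	"fmt"
	"math/bits"
)

const emptyMetaSlot uint8 = 0x80

func broadcast(b uint8) uint64 { return 0x101010101010101 * uint64(b) }

// markZeroBytes marks each zero byte of w with 0x80 in the result.
func markZeroBytes(w uint64) uint64 {
	return (w - 0x0101010101010101) & (^w) & 0x8080808080808080
}

func setByte(w uint64, b uint8, idx int) uint64 {
	shift := idx << 3
	return (w &^ (0xff << shift)) | (uint64(b) << shift)
}

// Assumed equivalent of the firstMarkedByteIndex helper referenced in the diff.
func firstMarkedByteIndex(w uint64) int { return bits.TrailingZeros64(w) >> 3 }

func main() {
	// A meta word where every slot is marked empty.
	meta := broadcast(emptyMetaSlot)
	// Place an entry whose h2 hash byte is 0x5c into slot 3.
	meta = setByte(meta, 0x5c, 3)

	// Lookup: XOR with the broadcast h2 zeroes out matching bytes,
	// then markZeroBytes turns each match into 0x80 at that byte.
	marked := markZeroBytes(meta ^ broadcast(0x5c))
	for marked != 0 {
		idx := firstMarkedByteIndex(marked)
		fmt.Println("candidate slot:", idx) // candidate slot: 3
		marked &= marked - 1                // clear the lowest marked byte
	}
}
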
@@ -194,15 +224,15 @@ func (m *Map[K, V]) init() {
 		m.minTableLen = defaultMinMapTableLen
 	}
 	m.resizeCond = *sync.NewCond(&m.resizeMu)
-	table := newMapTable[K, V](m.minTableLen)
+	table := newMapTable[K, V](m.minTableLen, maphash.MakeSeed())
 	m.minTableLen = len(table.buckets)
 	m.table.Store(table)
 }
 
-func newMapTable[K comparable, V any](minTableLen int) *mapTable[K, V] {
-	buckets := make([]bucketPadded[K, V], minTableLen)
+func newMapTable[K comparable, V any](minTableLen int, seed maphash.Seed) *mapTable[K, V] {
+	buckets := make([]bucketPadded, minTableLen)
 	for i := range buckets {
-		buckets[i].meta.Store(defaultMeta)
+		buckets[i].meta = defaultMeta
 	}
 	counterLen := minTableLen >> 10
 	if counterLen < minMapCounterLen {
@@ -214,7 +244,7 @@ func newMapTable[K comparable, V any](minTableLen int) *mapTable[K, V] {
 	t := &mapTable[K, V]{
 		buckets: buckets,
 		size:    counter,
-		seed:    maphash.MakeSeed(),
+		seed:    seed,
 	}
 	return t
 }
@@ -246,22 +276,24 @@ func (m *Map[K, V]) Load(key K) (value V, ok bool) {
 	bidx := uint64(len(table.buckets)-1) & h1
 	b := &table.buckets[bidx]
 	for {
-		metaw := b.meta.Load()
+		metaw := atomic.LoadUint64(&b.meta)
 		markedw := markZeroBytes(metaw^h2w) & metaMask
 		for markedw != 0 {
 			idx := firstMarkedByteIndex(markedw)
-			e := b.entries[idx].Load()
-			if e != nil {
+			eptr := atomic.LoadPointer(&b.entries[idx])
+			if eptr != nil {
+				e := (*entry[K, V])(eptr)
 				if e.key == key {
 					return e.value, true
 				}
 			}
 			markedw &= markedw - 1
 		}
-		b = b.next.Load()
-		if b == nil {
+		bptr := atomic.LoadPointer(&b.next)
+		if bptr == nil {
 			return
 		}
+		b = (*bucketPadded)(bptr)
 	}
 }
@@ -399,7 +431,7 @@ func (m *Map[K, V]) doCompute(
 	for {
 	compute_attempt:
 		var (
-			emptyb *bucketPadded[K, V]
+			emptyb *bucketPadded
 			emptyidx int
 		)
 		table := m.table.Load()
@@ -415,12 +447,13 @@
 		b := rootb
 	load:
 		for {
-			metaw := b.meta.Load()
+			metaw := atomic.LoadUint64(&b.meta)
 			markedw := markZeroBytes(metaw^h2w) & metaMask
 			for markedw != 0 {
 				idx := firstMarkedByteIndex(markedw)
-				e := b.entries[idx].Load()
-				if e != nil {
+				eptr := atomic.LoadPointer(&b.entries[idx])
+				if eptr != nil {
+					e := (*entry[K, V])(eptr)
 					if e.key == key {
 						if loadOp == loadOrComputeOp {
 							return e.value, true
@@ -430,23 +463,24 @@
 				}
 				markedw &= markedw - 1
 			}
-			b = b.next.Load()
-			if b == nil {
+			bptr := atomic.LoadPointer(&b.next)
+			if bptr == nil {
 				if loadOp == loadAndDeleteOp {
 					return *new(V), false
 				}
 				break load
 			}
+			b = (*bucketPadded)(bptr)
 		}
 	}
 		rootb.mu.Lock()
 		// The following two checks must go in reverse to what's
 		// in the resize method.
-		if m.resizeInProgress() {
-			// Resize is in progress. Wait, then go for another attempt.
+		if seq := resizeSeq(m.resizeCtl.Load()); seq&1 == 1 {
+			// Resize is in progress. Help with the transfer, then go for another attempt.
 			rootb.mu.Unlock()
-			m.waitForResize()
+			m.helpResize(seq)
 			goto compute_attempt
 		}
 		if m.newerTableExists(table) {
@@ -456,12 +490,13 @@
 		}
 		b := rootb
 		for {
-			metaw := b.meta.Load()
+			metaw := b.meta
 			markedw := markZeroBytes(metaw^h2w) & metaMask
 			for markedw != 0 {
 				idx := firstMarkedByteIndex(markedw)
-				e := b.entries[idx].Load()
-				if e != nil {
+				eptr := b.entries[idx]
+				if eptr != nil {
+					e := (*entry[K, V])(eptr)
 					if e.key == key {
 						// In-place update/delete.
 						// We get a copy of the value via an interface{} on each call,
@@ -475,8 +510,8 @@
 							// Deletion.
 							// First we update the hash, then the entry.
 							newmetaw := setByte(metaw, emptyMetaSlot, idx)
-							b.meta.Store(newmetaw)
-							b.entries[idx].Store(nil)
+							atomic.StoreUint64(&b.meta, newmetaw)
+							atomic.StorePointer(&b.entries[idx], nil)
 							rootb.mu.Unlock()
 							table.addSize(bidx, -1)
 							// Might need to shrink the table if we left bucket empty.
@@ -488,7 +523,7 @@
 							newe := new(entry[K, V])
 							newe.key = key
 							newe.value = newv
-							b.entries[idx].Store(newe)
+							atomic.StorePointer(&b.entries[idx], unsafe.Pointer(newe))
 						case CancelOp:
 							newv = oldv
 						}
@@ -512,7 +547,7 @@
 					emptyidx = idx
 				}
 			}
-			if b.next.Load() == nil {
+			if b.next == nil {
 				if emptyb != nil {
 					// Insertion into an existing bucket.
 					var zeroV V
@@ -526,8 +561,8 @@
 					newe.key = key
 					newe.value = newValue
 					// First we update meta, then the entry.
-					emptyb.meta.Store(setByte(emptyb.meta.Load(), h2, emptyidx))
-					emptyb.entries[emptyidx].Store(newe)
+					atomic.StoreUint64(&emptyb.meta, setByte(emptyb.meta, h2, emptyidx))
+					atomic.StorePointer(&emptyb.entries[emptyidx], unsafe.Pointer(newe))
 					rootb.mu.Unlock()
 					table.addSize(bidx, 1)
 					return newValue, computeOnly
@@ -549,19 +584,19 @@
 					return newValue, false
 				default:
 					// Create and append a bucket.
-					newb := new(bucketPadded[K, V])
-					newb.meta.Store(setByte(defaultMeta, h2, 0))
+					newb := new(bucketPadded)
+					newb.meta = setByte(defaultMeta, h2, 0)
 					newe := new(entry[K, V])
 					newe.key = key
 					newe.value = newValue
-					newb.entries[0].Store(newe)
-					b.next.Store(newb)
+					newb.entries[0] = unsafe.Pointer(newe)
+					atomic.StorePointer(&b.next, unsafe.Pointer(newb))
 					rootb.mu.Unlock()
 					table.addSize(bidx, 1)
 					return newValue, computeOnly
 				}
 			}
-			b = b.next.Load()
+			b = (*bucketPadded)(b.next)
 		}
 	}
 }
@@ -570,13 +605,21 @@ func (m *Map[K, V]) newerTableExists(table *mapTable[K, V]) bool {
 	return table != m.table.Load()
 }
 
-func (m *Map[K, V]) resizeInProgress() bool {
-	return m.resizing.Load()
+func resizeSeq(ctl uint64) uint64 {
+	return ctl >> 5
+}
+
+func resizeHelpers(ctl uint64) uint64 {
+	return ctl & maxResizeHelpersLimit
+}
+
+func resizeCtl(seq uint64, helpers uint64) uint64 {
+	return (seq << 5) | (helpers & maxResizeHelpersLimit)
 }
 
 func (m *Map[K, V]) waitForResize() {
 	m.resizeMu.Lock()
-	for m.resizeInProgress() {
+	for resizeSeq(m.resizeCtl.Load())&1 == 1 {
 		m.resizeCond.Wait()
 	}
 	m.resizeMu.Unlock()
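
Editor's note: a worked example of the control-word encoding implemented by resizeSeq/resizeHelpers/resizeCtl above; the resize sequence lives in the upper 59 bits (odd means a resize is in progress) and the helper count in the low 5 bits.

package main

import "fmt"

const maxResizeHelpersLimit = (1 << 5) - 1

func resizeSeq(ctl uint64) uint64     { return ctl >> 5 }
func resizeHelpers(ctl uint64) uint64 { return ctl & maxResizeHelpersLimit }
func resizeCtl(seq, helpers uint64) uint64 {
	return (seq << 5) | (helpers & maxResizeHelpersLimit)
}

func main() {
	// Idle map: even sequence, no helpers.
	ctl := resizeCtl(4, 0)
	fmt.Println(ctl, resizeSeq(ctl), resizeSeq(ctl)&1 == 1) // 128 4 false

	// The resize owner CASes seq 4 -> 5 (odd = in progress);
	// two helpers then register by adding 1 each.
	ctl = resizeCtl(5, 0) + 2
	fmt.Println(resizeSeq(ctl), resizeHelpers(ctl)) // 5 2

	// A helper unregisters by adding ^uint64(0), i.e. subtracting 1.
	ctl += ^uint64(0)
	fmt.Println(resizeHelpers(ctl)) // 1

	// Once the transfer is done the owner publishes seq 6 (even = idle again).
	ctl = resizeCtl(6, 0)
	fmt.Println(resizeSeq(ctl)&1 == 1) // false
}
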
@@ -593,9 +636,9 @@ func (m *Map[K, V]) resize(knownTable *mapTable[K, V], hint mapResizeHint) {
 		}
 	}
 	// Slow path.
-	if !m.resizing.CompareAndSwap(false, true) {
-		// Someone else started resize. Wait for it to finish.
-		m.waitForResize()
+	seq := resizeSeq(m.resizeCtl.Load())
+	if seq&1 == 1 || !m.resizeCtl.CompareAndSwap(resizeCtl(seq, 0), resizeCtl(seq+1, 0)) {
+		m.helpResize(seq)
 		return
 	}
 	var newTable *mapTable[K, V]
@@ -604,64 +647,189 @@ func (m *Map[K, V]) resize(knownTable *mapTable[K, V], hint mapResizeHint) {
 	switch hint {
 	case mapGrowHint:
 		// Grow the table with factor of 2.
+		// We must keep the same table seed here to keep the same hash codes
+		// allowing us to avoid locking destination buckets when resizing.
 		m.totalGrowths.Add(1)
-		newTable = newMapTable[K, V](tableLen << 1)
+		newTable = newMapTable[K, V](tableLen<<1, table.seed)
 	case mapShrinkHint:
 		shrinkThreshold := int64((tableLen * entriesPerMapBucket) / mapShrinkFraction)
 		if tableLen > m.minTableLen && table.sumSize() <= shrinkThreshold {
 			// Shrink the table with factor of 2.
+			// It's fine to generate a new seed since full locking
+			// is required anyway.
 			m.totalShrinks.Add(1)
-			newTable = newMapTable[K, V](tableLen >> 1)
+			newTable = newMapTable[K, V](tableLen>>1, maphash.MakeSeed())
 		} else {
 			// No need to shrink. Wake up all waiters and give up.
 			m.resizeMu.Lock()
-			m.resizing.Store(false)
+			m.resizeCtl.Store(resizeCtl(seq+2, 0))
 			m.resizeCond.Broadcast()
 			m.resizeMu.Unlock()
 			return
 		}
 	case mapClearHint:
-		newTable = newMapTable[K, V](m.minTableLen)
+		newTable = newMapTable[K, V](m.minTableLen, maphash.MakeSeed())
 	default:
 		panic(fmt.Sprintf("unexpected resize hint: %d", hint))
 	}
 	// Copy the data only if we're not clearing the map.
 	if hint != mapClearHint {
-		for i := 0; i < tableLen; i++ {
-			copied := copyBucket(&table.buckets[i], newTable)
-			newTable.addSizePlain(uint64(i), copied)
-		}
+		// Set up cooperative transfer state.
+		// Next table must be published as the last step.
+		m.resizeIdx.Store(0)
+		m.nextTable.Store(newTable)
+		// Copy the buckets.
+		m.transfer(table, newTable)
 	}
-	// Publish the new table and wake up all waiters.
+	// We're about to publish the new table, but before that
+	// we must wait for all helpers to finish.
+	for resizeHelpers(m.resizeCtl.Load()) != 0 {
+		runtime.Gosched()
+	}
 	m.table.Store(newTable)
+	m.nextTable.Store(nil)
+	ctl := resizeCtl(seq+1, 0)
+	newCtl := resizeCtl(seq+2, 0)
+	// Increment the sequence number and wake up all waiters.
 	m.resizeMu.Lock()
-	m.resizing.Store(false)
+	// There may be slowpoke helpers who have just incremented
+	// the helper counter. This CAS loop makes sure to wait
+	// for them to back off.
+	for !m.resizeCtl.CompareAndSwap(ctl, newCtl) {
+		runtime.Gosched()
+	}
 	m.resizeCond.Broadcast()
 	m.resizeMu.Unlock()
 }
 
-func copyBucket[K comparable, V any](
-	b *bucketPadded[K, V],
+func (m *Map[K, V]) helpResize(seq uint64) {
+	for {
+		table := m.table.Load()
+		nextTable := m.nextTable.Load()
+		if resizeSeq(m.resizeCtl.Load()) == seq {
+			if nextTable == nil || nextTable == table {
+				// Carry on until the next table is set by the main
+				// resize goroutine or until the resize finishes.
+				runtime.Gosched()
+				continue
+			}
+			// The resize is still in-progress, so let's try registering
+			// as a helper.
+			for {
+				ctl := m.resizeCtl.Load()
+				if resizeSeq(ctl) != seq || resizeHelpers(ctl) >= uint64(maxResizeHelpers) {
+					// The resize has ended or there are too many helpers.
+					break
+				}
+				if m.resizeCtl.CompareAndSwap(ctl, ctl+1) {
+					// Yay, we're a resize helper!
+					m.transfer(table, nextTable)
+					// Don't forget to unregister as a helper.
+					m.resizeCtl.Add(^uint64(0))
+					break
+				}
+			}
+			m.waitForResize()
+		}
+		break
+	}
+}
+
+func (m *Map[K, V]) transfer(table, newTable *mapTable[K, V]) {
+	tableLen := len(table.buckets)
+	newTableLen := len(newTable.buckets)
+	stride := (tableLen >> 3) / int(maxResizeHelpers)
+	if stride < minResizeTransferStride {
+		stride = minResizeTransferStride
+	}
+	for {
+		// Claim work by incrementing resizeIdx.
+		nextIdx := m.resizeIdx.Add(int64(stride))
+		start := int(nextIdx) - stride
+		if start < 0 {
+			start = 0
+		}
+		if start > tableLen {
+			break
+		}
+		end := int(nextIdx)
+		if end > tableLen {
+			end = tableLen
+		}
+		// Transfer buckets in this range.
+		total := 0
+		if newTableLen > tableLen {
+			// We're growing the table with 2x multiplier, so entries from a N bucket can
+			// only be transferred to N and 2*N buckets in the new table. Thus, destination
+			// buckets written by the resize helpers don't intersect, so we don't need to
+			// acquire locks in the destination buckets.
+			for i := start; i < end; i++ {
+				total += transferBucketUnsafe(&table.buckets[i], newTable)
+			}
+		} else {
+			// We're shrinking the table, so all locks must be acquired.
+			for i := start; i < end; i++ {
+				total += transferBucket(&table.buckets[i], newTable)
+			}
+		}
+		// The exact counter stripe doesn't matter here, so pick up the one
+		// that corresponds to the start value to avoid contention.
+		newTable.addSize(uint64(start), total)
+	}
+}
+
+// Doesn't acquire dest bucket lock.
+func transferBucketUnsafe[K comparable, V any](
+	b *bucketPadded,
 	destTable *mapTable[K, V],
 ) (copied int) {
 	rootb := b
 	rootb.mu.Lock()
 	for {
 		for i := 0; i < entriesPerMapBucket; i++ {
-			if e := b.entries[i].Load(); e != nil {
+			if eptr := b.entries[i]; eptr != nil {
+				e := (*entry[K, V])(eptr)
 				hash := maphash.Comparable(destTable.seed, e.key)
 				bidx := uint64(len(destTable.buckets)-1) & h1(hash)
 				destb := &destTable.buckets[bidx]
-				appendToBucket(h2(hash), b.entries[i].Load(), destb)
+				appendToBucket(h2(hash), e, destb)
 				copied++
 			}
 		}
-		if next := b.next.Load(); next == nil {
+		if b.next == nil {
 			rootb.mu.Unlock()
 			return
-		} else {
-			b = next
 		}
+		b = (*bucketPadded)(b.next)
+	}
+}
+
+func transferBucket[K comparable, V any](
+	b *bucketPadded,
+	destTable *mapTable[K, V],
+) (copied int) {
+	rootb := b
+	rootb.mu.Lock()
+	for {
+		for i := 0; i < entriesPerMapBucket; i++ {
+			if eptr := b.entries[i]; eptr != nil {
+				e := (*entry[K, V])(eptr)
+				hash := maphash.Comparable(destTable.seed, e.key)
+				bidx := uint64(len(destTable.buckets)-1) & h1(hash)
+				destb := &destTable.buckets[bidx]
+				destb.mu.Lock()
+				appendToBucket(h2(hash), e, destb)
+				destb.mu.Unlock()
+				copied++
+			}
+		}
+		if b.next == nil {
+			rootb.mu.Unlock()
+			return
+		}
+		b = (*bucketPadded)(b.next)
 	}
 }
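
Editor's note: the transfer loop above hands out work by letting the owner and each helper atomically claim a stride of source buckets through resizeIdx.Add. A minimal standalone sketch of that claiming scheme, with the bucket copy replaced by a counter.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	const tableLen = 1000
	const stride = 64 // cf. minResizeTransferStride

	var resizeIdx atomic.Int64
	var transferred atomic.Int64
	var wg sync.WaitGroup

	worker := func(id int) {
		defer wg.Done()
		for {
			// Claim the next [start, end) range of source buckets.
			nextIdx := resizeIdx.Add(stride)
			start := int(nextIdx) - stride
			if start > tableLen {
				return // nothing left to claim
			}
			end := int(nextIdx)
			if end > tableLen {
				end = tableLen
			}
			// Stand-in for copying buckets[start:end] into the new table.
			transferred.Add(int64(end - start))
			fmt.Printf("worker %d claimed [%d, %d)\n", id, start, end)
		}
	}

	// One "resize owner" plus a few helpers, as in the cooperative resize.
	for id := 0; id < 4; id++ {
		wg.Add(1)
		go worker(id)
	}
	wg.Wait()
	fmt.Println("buckets transferred:", transferred.Load()) // 1000
}
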
@@ -691,16 +859,15 @@ func (m *Map[K, V]) Range(f func(key K, value V) bool) {
 		rootb.mu.Lock()
 		for {
 			for i := 0; i < entriesPerMapBucket; i++ {
-				if entry := b.entries[i].Load(); entry != nil {
-					bentries = append(bentries, entry)
+				if b.entries[i] != nil {
+					bentries = append(bentries, (*entry[K, V])(b.entries[i]))
 				}
 			}
-			if next := b.next.Load(); next == nil {
+			if b.next == nil {
 				rootb.mu.Unlock()
 				break
-			} else {
-				b = next
 			}
+			b = (*bucketPadded)(b.next)
 		}
 		// Call the function for all copied entries.
 		for j, e := range bentries {
@@ -727,24 +894,25 @@ func (m *Map[K, V]) Size() int {
 	return int(m.table.Load().sumSize())
 }
 
-func appendToBucket[K comparable, V any](h2 uint8, e *entry[K, V], b *bucketPadded[K, V]) {
+// It is safe to use plain stores here because the destination bucket must be
+// either locked or exclusively written to by the helper during resize.
+func appendToBucket[K comparable, V any](h2 uint8, e *entry[K, V], b *bucketPadded) {
 	for {
 		for i := 0; i < entriesPerMapBucket; i++ {
-			if b.entries[i].Load() == nil {
-				b.meta.Store(setByte(b.meta.Load(), h2, i))
-				b.entries[i].Store(e)
+			if b.entries[i] == nil {
+				b.meta = setByte(b.meta, h2, i)
+				b.entries[i] = unsafe.Pointer(e)
 				return
 			}
 		}
-		if next := b.next.Load(); next == nil {
-			newb := new(bucketPadded[K, V])
-			newb.meta.Store(setByte(defaultMeta, h2, 0))
-			newb.entries[0].Store(e)
-			b.next.Store(newb)
+		if b.next == nil {
+			newb := new(bucketPadded)
+			newb.meta = setByte(defaultMeta, h2, 0)
+			newb.entries[0] = unsafe.Pointer(e)
+			b.next = unsafe.Pointer(newb)
 			return
-		} else {
-			b = next
 		}
+		b = (*bucketPadded)(b.next)
 	}
 }
@@ -753,11 +921,6 @@ func (table *mapTable[K, V]) addSize(bucketIdx uint64, delta int) {
 	atomic.AddInt64(&table.size[cidx].c, int64(delta))
 }
 
-func (table *mapTable[K, V]) addSizePlain(bucketIdx uint64, delta int) {
-	cidx := uint64(len(table.size)-1) & bucketIdx
-	table.size[cidx].c += int64(delta)
-}
-
 func (table *mapTable[K, V]) sumSize() int64 {
 	sum := int64(0)
 	for i := range table.size {
@@ -856,7 +1019,7 @@ func (m *Map[K, V]) Stats() MapStats {
 			nentriesLocal := 0
 			stats.Capacity += entriesPerMapBucket
 			for i := 0; i < entriesPerMapBucket; i++ {
-				if b.entries[i].Load() != nil {
+				if atomic.LoadPointer(&b.entries[i]) != nil {
 					stats.Size++
 					nentriesLocal++
 				}
@@ -865,11 +1028,10 @@
 			if nentriesLocal == 0 {
 				stats.EmptyBuckets++
 			}
-			if next := b.next.Load(); next == nil {
+			if b.next == nil {
 				break
-			} else {
-				b = next
 			}
+			b = (*bucketPadded)(atomic.LoadPointer(&b.next))
 			stats.TotalBuckets++
 		}
 		if nentries < stats.MinEntries {
@@ -906,6 +1068,15 @@ func nextPowOf2(v uint32) uint32 {
 	return v
 }
 
+func parallelism() uint32 {
+	maxProcs := uint32(runtime.GOMAXPROCS(0))
+	numCores := uint32(runtime.NumCPU())
+	if maxProcs < numCores {
+		return maxProcs
+	}
+	return numCores
+}
+
 func broadcast(b uint8) uint64 {
 	return 0x101010101010101 * uint64(b)
 }
@@ -920,6 +1091,7 @@ func markZeroBytes(w uint64) uint64 {
 	return ((w - 0x0101010101010101) & (^w) & 0x8080808080808080)
 }
 
+// Sets byte of the input word at the specified index to the given value.
 func setByte(w uint64, b uint8, idx int) uint64 {
 	shift := idx << 3
 	return (w &^ (0xff << shift)) | (uint64(b) << shift)

View File

@@ -3,6 +3,7 @@ package xsync
 import (
 	"math"
 	"math/rand"
+	"runtime"
 	"strconv"
 	"sync"
 	"sync/atomic"
@@ -53,11 +54,11 @@ func runParallel(b *testing.B, benchFn func(pb *testing.PB)) {
 }
 
 func TestMap_BucketStructSize(t *testing.T) {
-	size := unsafe.Sizeof(bucketPadded[string, int64]{})
+	size := unsafe.Sizeof(bucketPadded{})
 	if size != 64 {
 		t.Fatalf("size of 64B (one cache line) is expected, got: %d", size)
 	}
-	size = unsafe.Sizeof(bucketPadded[struct{}, int32]{})
+	size = unsafe.Sizeof(bucketPadded{})
 	if size != 64 {
 		t.Fatalf("size of 64B (one cache line) is expected, got: %d", size)
 	}
@@ -743,10 +744,7 @@ func TestNewMapGrowOnly_OnlyShrinksOnClear(t *testing.T) {
 }
 
 func TestMapResize(t *testing.T) {
-	testMapResize(t, NewMap[string, int]())
-}
-
-func testMapResize(t *testing.T, m *Map[string, int]) {
+	m := NewMap[string, int]()
 	const numEntries = 100_000
 
 	for i := 0; i < numEntries; i++ {
@@ -810,6 +808,147 @@ func TestMapResize_CounterLenLimit(t *testing.T) {
 	}
 }
 
+func testParallelResize(t *testing.T, numGoroutines int) {
+	m := NewMap[int, int]()
+	// Fill the map to trigger resizing
+	const initialEntries = 10000
+	const newEntries = 5000
+	for i := 0; i < initialEntries; i++ {
+		m.Store(i, i*2)
+	}
+	// Start concurrent operations that should trigger helping behavior
+	var wg sync.WaitGroup
+	// Launch goroutines that will encounter resize operations
+	for g := 0; g < numGoroutines; g++ {
+		wg.Add(1)
+		go func(goroutineID int) {
+			defer wg.Done()
+			// Perform many operations to trigger resize and helping
+			for i := 0; i < newEntries; i++ {
+				key := goroutineID*newEntries + i + initialEntries
+				m.Store(key, key*2)
+				// Verify the value
+				if val, ok := m.Load(key); !ok || val != key*2 {
+					t.Errorf("Failed to load key %d: got %v, %v", key, val, ok)
+					return
+				}
+			}
+		}(g)
+	}
+	wg.Wait()
+	// Verify all entries are present
+	finalSize := m.Size()
+	expectedSize := initialEntries + numGoroutines*newEntries
+	if finalSize != expectedSize {
+		t.Errorf("Expected size %d, got %d", expectedSize, finalSize)
+	}
+	stats := m.Stats()
+	if stats.TotalGrowths == 0 {
+		t.Error("Expected at least one table growth due to concurrent operations")
+	}
+}
+
+func TestMapParallelResize(t *testing.T) {
+	testParallelResize(t, 1)
+	testParallelResize(t, runtime.GOMAXPROCS(0))
+	testParallelResize(t, 100)
+}
+
+func testParallelResizeWithSameKeys(t *testing.T, numGoroutines int) {
+	m := NewMap[int, int]()
+	// Fill the map to trigger resizing
+	const entries = 1000
+	for i := 0; i < entries; i++ {
+		m.Store(2*i, 2*i)
+	}
+	// Start concurrent operations that should trigger helping behavior
+	var wg sync.WaitGroup
+	// Launch goroutines that will encounter resize operations
+	for g := 0; g < numGoroutines; g++ {
+		wg.Add(1)
+		go func(goroutineID int) {
+			defer wg.Done()
+			for i := 0; i < 10*entries; i++ {
+				m.Store(i, i)
+			}
+		}(g)
+	}
+	wg.Wait()
+	// Verify all entries are present
+	finalSize := m.Size()
+	expectedSize := 10 * entries
+	if finalSize != expectedSize {
+		t.Errorf("Expected size %d, got %d", expectedSize, finalSize)
+	}
+	stats := m.Stats()
+	if stats.TotalGrowths == 0 {
+		t.Error("Expected at least one table growth due to concurrent operations")
+	}
+}
+
+func TestMapParallelResize_IntersectingKeys(t *testing.T) {
+	testParallelResizeWithSameKeys(t, 1)
+	testParallelResizeWithSameKeys(t, runtime.GOMAXPROCS(0))
+	testParallelResizeWithSameKeys(t, 100)
+}
+
+func testParallelShrinking(t *testing.T, numGoroutines int) {
+	m := NewMap[int, int]()
+	// Fill the map to trigger resizing
+	const entries = 100000
+	for i := 0; i < entries; i++ {
+		m.Store(i, i)
+	}
+	// Start concurrent operations that should trigger helping behavior
+	var wg sync.WaitGroup
+	// Launch goroutines that will encounter resize operations
+	for g := 0; g < numGoroutines; g++ {
+		wg.Add(1)
+		go func(goroutineID int) {
+			defer wg.Done()
+			for i := 0; i < entries; i++ {
+				m.Delete(i)
+			}
+		}(g)
+	}
+	wg.Wait()
+	// Verify all entries are present
+	finalSize := m.Size()
+	if finalSize != 0 {
+		t.Errorf("Expected size 0, got %d", finalSize)
+	}
+	stats := m.Stats()
+	if stats.TotalShrinks == 0 {
+		t.Error("Expected at least one table shrinking due to concurrent operations")
+	}
+}
+
+func TestMapParallelShrinking(t *testing.T) {
+	testParallelShrinking(t, 1)
+	testParallelShrinking(t, runtime.GOMAXPROCS(0))
+	testParallelShrinking(t, 100)
+}
+
 func parallelSeqMapGrower(m *Map[int, int], numEntries int, positive bool, cdone chan bool) {
 	for i := 0; i < numEntries; i++ {
 		if positive {
@@ -1459,7 +1598,7 @@ func BenchmarkMapRange(b *testing.B) {
 }
 
 // Benchmarks noop performance of Compute
-func BenchmarkCompute(b *testing.B) {
+func BenchmarkMapCompute(b *testing.B) {
 	tests := []struct {
 		Name string
 		Op   ComputeOp
@@ -1487,6 +1626,57 @@
 	}
 }
 
+func BenchmarkMapParallelRehashing(b *testing.B) {
+	tests := []struct {
+		name       string
+		goroutines int
+		numEntries int
+	}{
+		{"1goroutine_10M", 1, 10_000_000},
+		{"4goroutines_10M", 4, 10_000_000},
+		{"8goroutines_10M", 8, 10_000_000},
+	}
+	for _, test := range tests {
+		b.Run(test.name, func(b *testing.B) {
+			for i := 0; i < b.N; i++ {
+				m := NewMap[int, int]()
+				var wg sync.WaitGroup
+				entriesPerGoroutine := test.numEntries / test.goroutines
+				start := time.Now()
+				for g := 0; g < test.goroutines; g++ {
+					wg.Add(1)
+					go func(goroutineID int) {
+						defer wg.Done()
+						base := goroutineID * entriesPerGoroutine
+						for j := 0; j < entriesPerGoroutine; j++ {
+							key := base + j
+							m.Store(key, key)
+						}
+					}(g)
+				}
+				wg.Wait()
+				duration := time.Since(start)
+				b.ReportMetric(float64(test.numEntries)/duration.Seconds(), "entries/s")
+				finalSize := m.Size()
+				if finalSize != test.numEntries {
+					b.Fatalf("Expected size %d, got %d", test.numEntries, finalSize)
+				}
+				stats := m.Stats()
+				if stats.TotalGrowths == 0 {
+					b.Error("Expected at least one table growth during rehashing")
+				}
+			}
+		})
+	}
+}
+
 func TestNextPowOf2(t *testing.T) {
 	if nextPowOf2(0) != 1 {
 		t.Error("nextPowOf2 failed")