From f6e494e73f5d51d29fec4f40ae52aa5d77ee4c47 Mon Sep 17 00:00:00 2001 From: wwqgtxx Date: Fri, 14 Nov 2025 00:11:01 +0800 Subject: [PATCH] chore: upgrade the embedded xsync.Map to v4.2.0 --- common/xsync/map.go | 364 ++++++++++++++++++++++++++++----------- common/xsync/map_test.go | 204 +++++++++++++++++++++- 2 files changed, 465 insertions(+), 103 deletions(-) diff --git a/common/xsync/map.go b/common/xsync/map.go index b85bf421..fd8faa8b 100644 --- a/common/xsync/map.go +++ b/common/xsync/map.go @@ -1,16 +1,17 @@ package xsync -// copy and modified from https://github.com/puzpuzpuz/xsync/blob/v4.1.0/map.go +// copy and modified from https://github.com/puzpuzpuz/xsync/blob/v4.2.0/map.go // which is licensed under Apache v2. // // mihomo modified: -// 1. parallel Map resize has been removed to decrease the memory using. +// 1. restore xsync/v3's LoadOrCompute api and rename to LoadOrStoreFn. // 2. the zero Map is ready for use. import ( "fmt" "math" "math/bits" + "runtime" "strings" "sync" "sync/atomic" @@ -41,8 +42,28 @@ const ( metaMask uint64 = 0xffffffffff defaultMetaMasked uint64 = defaultMeta & metaMask emptyMetaSlot uint8 = 0x80 + // minimal number of buckets to transfer when participating in cooperative + // resize; should be at least defaultMinMapTableLen + minResizeTransferStride = 64 + // upper limit for max number of additional goroutines that participate + // in cooperative resize; must be changed simultaneously with resizeCtl + // and the related code + maxResizeHelpersLimit = (1 << 5) - 1 ) +// max number of additional goroutines that participate in cooperative resize; +// "resize owner" goroutine isn't counted +var maxResizeHelpers = func() int32 { + v := int32(parallelism() - 1) + if v < 1 { + v = 1 + } + if v > maxResizeHelpersLimit { + v = maxResizeHelpersLimit + } + return v +}() + type mapResizeHint int const ( @@ -100,16 +121,25 @@ type Map[K comparable, V any] struct { initOnce sync.Once totalGrowths atomic.Int64 totalShrinks atomic.Int64 - resizing atomic.Bool // resize in progress flag - resizeMu sync.Mutex // only used along with resizeCond - resizeCond sync.Cond // used to wake up resize waiters (concurrent modifications) table atomic.Pointer[mapTable[K, V]] - minTableLen int - growOnly bool + // table being transferred to + nextTable atomic.Pointer[mapTable[K, V]] + // resize control state: combines resize sequence number (upper 59 bits) and + // the current number of resize helpers (lower 5 bits); + // odd values of resize sequence mean in-progress resize + resizeCtl atomic.Uint64 + // only used along with resizeCond + resizeMu sync.Mutex + // used to wake up resize waiters (concurrent writes) + resizeCond sync.Cond + // transfer progress index for resize + resizeIdx atomic.Int64 + minTableLen int + growOnly bool } type mapTable[K comparable, V any] struct { - buckets []bucketPadded[K, V] + buckets []bucketPadded // striped counter for number of table entries; // used to determine if a table shrinking is needed // occupies min(buckets_memory/1024, 64KB) of memory @@ -125,16 +155,16 @@ type counterStripe struct { // bucketPadded is a CL-sized map bucket holding up to // entriesPerMapBucket entries. -type bucketPadded[K comparable, V any] struct { +type bucketPadded struct { //lint:ignore U1000 ensure each bucket takes two cache lines on both 32 and 64-bit archs - pad [cacheLineSize - unsafe.Sizeof(bucket[K, V]{})]byte - bucket[K, V] + pad [cacheLineSize - unsafe.Sizeof(bucket{})]byte + bucket } -type bucket[K comparable, V any] struct { - meta atomic.Uint64 - entries [entriesPerMapBucket]atomic.Pointer[entry[K, V]] // *entry - next atomic.Pointer[bucketPadded[K, V]] // *bucketPadded +type bucket struct { + meta uint64 + entries [entriesPerMapBucket]unsafe.Pointer // *entry + next unsafe.Pointer // *bucketPadded mu sync.Mutex } @@ -194,15 +224,15 @@ func (m *Map[K, V]) init() { m.minTableLen = defaultMinMapTableLen } m.resizeCond = *sync.NewCond(&m.resizeMu) - table := newMapTable[K, V](m.minTableLen) + table := newMapTable[K, V](m.minTableLen, maphash.MakeSeed()) m.minTableLen = len(table.buckets) m.table.Store(table) } -func newMapTable[K comparable, V any](minTableLen int) *mapTable[K, V] { - buckets := make([]bucketPadded[K, V], minTableLen) +func newMapTable[K comparable, V any](minTableLen int, seed maphash.Seed) *mapTable[K, V] { + buckets := make([]bucketPadded, minTableLen) for i := range buckets { - buckets[i].meta.Store(defaultMeta) + buckets[i].meta = defaultMeta } counterLen := minTableLen >> 10 if counterLen < minMapCounterLen { @@ -214,7 +244,7 @@ func newMapTable[K comparable, V any](minTableLen int) *mapTable[K, V] { t := &mapTable[K, V]{ buckets: buckets, size: counter, - seed: maphash.MakeSeed(), + seed: seed, } return t } @@ -246,22 +276,24 @@ func (m *Map[K, V]) Load(key K) (value V, ok bool) { bidx := uint64(len(table.buckets)-1) & h1 b := &table.buckets[bidx] for { - metaw := b.meta.Load() + metaw := atomic.LoadUint64(&b.meta) markedw := markZeroBytes(metaw^h2w) & metaMask for markedw != 0 { idx := firstMarkedByteIndex(markedw) - e := b.entries[idx].Load() - if e != nil { + eptr := atomic.LoadPointer(&b.entries[idx]) + if eptr != nil { + e := (*entry[K, V])(eptr) if e.key == key { return e.value, true } } markedw &= markedw - 1 } - b = b.next.Load() - if b == nil { + bptr := atomic.LoadPointer(&b.next) + if bptr == nil { return } + b = (*bucketPadded)(bptr) } } @@ -399,7 +431,7 @@ func (m *Map[K, V]) doCompute( for { compute_attempt: var ( - emptyb *bucketPadded[K, V] + emptyb *bucketPadded emptyidx int ) table := m.table.Load() @@ -415,12 +447,13 @@ func (m *Map[K, V]) doCompute( b := rootb load: for { - metaw := b.meta.Load() + metaw := atomic.LoadUint64(&b.meta) markedw := markZeroBytes(metaw^h2w) & metaMask for markedw != 0 { idx := firstMarkedByteIndex(markedw) - e := b.entries[idx].Load() - if e != nil { + eptr := atomic.LoadPointer(&b.entries[idx]) + if eptr != nil { + e := (*entry[K, V])(eptr) if e.key == key { if loadOp == loadOrComputeOp { return e.value, true @@ -430,23 +463,24 @@ func (m *Map[K, V]) doCompute( } markedw &= markedw - 1 } - b = b.next.Load() - if b == nil { + bptr := atomic.LoadPointer(&b.next) + if bptr == nil { if loadOp == loadAndDeleteOp { return *new(V), false } break load } + b = (*bucketPadded)(bptr) } } rootb.mu.Lock() // The following two checks must go in reverse to what's // in the resize method. - if m.resizeInProgress() { - // Resize is in progress. Wait, then go for another attempt. + if seq := resizeSeq(m.resizeCtl.Load()); seq&1 == 1 { + // Resize is in progress. Help with the transfer, then go for another attempt. rootb.mu.Unlock() - m.waitForResize() + m.helpResize(seq) goto compute_attempt } if m.newerTableExists(table) { @@ -456,12 +490,13 @@ func (m *Map[K, V]) doCompute( } b := rootb for { - metaw := b.meta.Load() + metaw := b.meta markedw := markZeroBytes(metaw^h2w) & metaMask for markedw != 0 { idx := firstMarkedByteIndex(markedw) - e := b.entries[idx].Load() - if e != nil { + eptr := b.entries[idx] + if eptr != nil { + e := (*entry[K, V])(eptr) if e.key == key { // In-place update/delete. // We get a copy of the value via an interface{} on each call, @@ -475,8 +510,8 @@ func (m *Map[K, V]) doCompute( // Deletion. // First we update the hash, then the entry. newmetaw := setByte(metaw, emptyMetaSlot, idx) - b.meta.Store(newmetaw) - b.entries[idx].Store(nil) + atomic.StoreUint64(&b.meta, newmetaw) + atomic.StorePointer(&b.entries[idx], nil) rootb.mu.Unlock() table.addSize(bidx, -1) // Might need to shrink the table if we left bucket empty. @@ -488,7 +523,7 @@ func (m *Map[K, V]) doCompute( newe := new(entry[K, V]) newe.key = key newe.value = newv - b.entries[idx].Store(newe) + atomic.StorePointer(&b.entries[idx], unsafe.Pointer(newe)) case CancelOp: newv = oldv } @@ -512,7 +547,7 @@ func (m *Map[K, V]) doCompute( emptyidx = idx } } - if b.next.Load() == nil { + if b.next == nil { if emptyb != nil { // Insertion into an existing bucket. var zeroV V @@ -526,8 +561,8 @@ func (m *Map[K, V]) doCompute( newe.key = key newe.value = newValue // First we update meta, then the entry. - emptyb.meta.Store(setByte(emptyb.meta.Load(), h2, emptyidx)) - emptyb.entries[emptyidx].Store(newe) + atomic.StoreUint64(&emptyb.meta, setByte(emptyb.meta, h2, emptyidx)) + atomic.StorePointer(&emptyb.entries[emptyidx], unsafe.Pointer(newe)) rootb.mu.Unlock() table.addSize(bidx, 1) return newValue, computeOnly @@ -549,19 +584,19 @@ func (m *Map[K, V]) doCompute( return newValue, false default: // Create and append a bucket. - newb := new(bucketPadded[K, V]) - newb.meta.Store(setByte(defaultMeta, h2, 0)) + newb := new(bucketPadded) + newb.meta = setByte(defaultMeta, h2, 0) newe := new(entry[K, V]) newe.key = key newe.value = newValue - newb.entries[0].Store(newe) - b.next.Store(newb) + newb.entries[0] = unsafe.Pointer(newe) + atomic.StorePointer(&b.next, unsafe.Pointer(newb)) rootb.mu.Unlock() table.addSize(bidx, 1) return newValue, computeOnly } } - b = b.next.Load() + b = (*bucketPadded)(b.next) } } } @@ -570,13 +605,21 @@ func (m *Map[K, V]) newerTableExists(table *mapTable[K, V]) bool { return table != m.table.Load() } -func (m *Map[K, V]) resizeInProgress() bool { - return m.resizing.Load() +func resizeSeq(ctl uint64) uint64 { + return ctl >> 5 +} + +func resizeHelpers(ctl uint64) uint64 { + return ctl & maxResizeHelpersLimit +} + +func resizeCtl(seq uint64, helpers uint64) uint64 { + return (seq << 5) | (helpers & maxResizeHelpersLimit) } func (m *Map[K, V]) waitForResize() { m.resizeMu.Lock() - for m.resizeInProgress() { + for resizeSeq(m.resizeCtl.Load())&1 == 1 { m.resizeCond.Wait() } m.resizeMu.Unlock() @@ -593,9 +636,9 @@ func (m *Map[K, V]) resize(knownTable *mapTable[K, V], hint mapResizeHint) { } } // Slow path. - if !m.resizing.CompareAndSwap(false, true) { - // Someone else started resize. Wait for it to finish. - m.waitForResize() + seq := resizeSeq(m.resizeCtl.Load()) + if seq&1 == 1 || !m.resizeCtl.CompareAndSwap(resizeCtl(seq, 0), resizeCtl(seq+1, 0)) { + m.helpResize(seq) return } var newTable *mapTable[K, V] @@ -604,64 +647,189 @@ func (m *Map[K, V]) resize(knownTable *mapTable[K, V], hint mapResizeHint) { switch hint { case mapGrowHint: // Grow the table with factor of 2. + // We must keep the same table seed here to keep the same hash codes + // allowing us to avoid locking destination buckets when resizing. m.totalGrowths.Add(1) - newTable = newMapTable[K, V](tableLen << 1) + newTable = newMapTable[K, V](tableLen<<1, table.seed) case mapShrinkHint: shrinkThreshold := int64((tableLen * entriesPerMapBucket) / mapShrinkFraction) if tableLen > m.minTableLen && table.sumSize() <= shrinkThreshold { // Shrink the table with factor of 2. + // It's fine to generate a new seed since full locking + // is required anyway. m.totalShrinks.Add(1) - newTable = newMapTable[K, V](tableLen >> 1) + newTable = newMapTable[K, V](tableLen>>1, maphash.MakeSeed()) } else { // No need to shrink. Wake up all waiters and give up. m.resizeMu.Lock() - m.resizing.Store(false) + m.resizeCtl.Store(resizeCtl(seq+2, 0)) m.resizeCond.Broadcast() m.resizeMu.Unlock() return } case mapClearHint: - newTable = newMapTable[K, V](m.minTableLen) + newTable = newMapTable[K, V](m.minTableLen, maphash.MakeSeed()) default: panic(fmt.Sprintf("unexpected resize hint: %d", hint)) } + // Copy the data only if we're not clearing the map. if hint != mapClearHint { - for i := 0; i < tableLen; i++ { - copied := copyBucket(&table.buckets[i], newTable) - newTable.addSizePlain(uint64(i), copied) - } + // Set up cooperative transfer state. + // Next table must be published as the last step. + m.resizeIdx.Store(0) + m.nextTable.Store(newTable) + // Copy the buckets. + m.transfer(table, newTable) + } + + // We're about to publish the new table, but before that + // we must wait for all helpers to finish. + for resizeHelpers(m.resizeCtl.Load()) != 0 { + runtime.Gosched() } - // Publish the new table and wake up all waiters. m.table.Store(newTable) + m.nextTable.Store(nil) + ctl := resizeCtl(seq+1, 0) + newCtl := resizeCtl(seq+2, 0) + // Increment the sequence number and wake up all waiters. m.resizeMu.Lock() - m.resizing.Store(false) + // There may be slowpoke helpers who have just incremented + // the helper counter. This CAS loop makes sure to wait + // for them to back off. + for !m.resizeCtl.CompareAndSwap(ctl, newCtl) { + runtime.Gosched() + } m.resizeCond.Broadcast() m.resizeMu.Unlock() } -func copyBucket[K comparable, V any]( - b *bucketPadded[K, V], +func (m *Map[K, V]) helpResize(seq uint64) { + for { + table := m.table.Load() + nextTable := m.nextTable.Load() + if resizeSeq(m.resizeCtl.Load()) == seq { + if nextTable == nil || nextTable == table { + // Carry on until the next table is set by the main + // resize goroutine or until the resize finishes. + runtime.Gosched() + continue + } + // The resize is still in-progress, so let's try registering + // as a helper. + for { + ctl := m.resizeCtl.Load() + if resizeSeq(ctl) != seq || resizeHelpers(ctl) >= uint64(maxResizeHelpers) { + // The resize has ended or there are too many helpers. + break + } + if m.resizeCtl.CompareAndSwap(ctl, ctl+1) { + // Yay, we're a resize helper! + m.transfer(table, nextTable) + // Don't forget to unregister as a helper. + m.resizeCtl.Add(^uint64(0)) + break + } + } + m.waitForResize() + } + break + } +} + +func (m *Map[K, V]) transfer(table, newTable *mapTable[K, V]) { + tableLen := len(table.buckets) + newTableLen := len(newTable.buckets) + stride := (tableLen >> 3) / int(maxResizeHelpers) + if stride < minResizeTransferStride { + stride = minResizeTransferStride + } + for { + // Claim work by incrementing resizeIdx. + nextIdx := m.resizeIdx.Add(int64(stride)) + start := int(nextIdx) - stride + if start < 0 { + start = 0 + } + if start > tableLen { + break + } + end := int(nextIdx) + if end > tableLen { + end = tableLen + } + // Transfer buckets in this range. + total := 0 + if newTableLen > tableLen { + // We're growing the table with 2x multiplier, so entries from a N bucket can + // only be transferred to N and 2*N buckets in the new table. Thus, destination + // buckets written by the resize helpers don't intersect, so we don't need to + // acquire locks in the destination buckets. + for i := start; i < end; i++ { + total += transferBucketUnsafe(&table.buckets[i], newTable) + } + } else { + // We're shrinking the table, so all locks must be acquired. + for i := start; i < end; i++ { + total += transferBucket(&table.buckets[i], newTable) + } + } + // The exact counter stripe doesn't matter here, so pick up the one + // that corresponds to the start value to avoid contention. + newTable.addSize(uint64(start), total) + } +} + +// Doesn't acquire dest bucket lock. +func transferBucketUnsafe[K comparable, V any]( + b *bucketPadded, destTable *mapTable[K, V], ) (copied int) { rootb := b rootb.mu.Lock() for { for i := 0; i < entriesPerMapBucket; i++ { - if e := b.entries[i].Load(); e != nil { + if eptr := b.entries[i]; eptr != nil { + e := (*entry[K, V])(eptr) hash := maphash.Comparable(destTable.seed, e.key) bidx := uint64(len(destTable.buckets)-1) & h1(hash) destb := &destTable.buckets[bidx] - appendToBucket(h2(hash), b.entries[i].Load(), destb) + appendToBucket(h2(hash), e, destb) copied++ } } - if next := b.next.Load(); next == nil { + if b.next == nil { rootb.mu.Unlock() return - } else { - b = next } + b = (*bucketPadded)(b.next) + } +} + +func transferBucket[K comparable, V any]( + b *bucketPadded, + destTable *mapTable[K, V], +) (copied int) { + rootb := b + rootb.mu.Lock() + for { + for i := 0; i < entriesPerMapBucket; i++ { + if eptr := b.entries[i]; eptr != nil { + e := (*entry[K, V])(eptr) + hash := maphash.Comparable(destTable.seed, e.key) + bidx := uint64(len(destTable.buckets)-1) & h1(hash) + destb := &destTable.buckets[bidx] + destb.mu.Lock() + appendToBucket(h2(hash), e, destb) + destb.mu.Unlock() + copied++ + } + } + if b.next == nil { + rootb.mu.Unlock() + return + } + b = (*bucketPadded)(b.next) } } @@ -691,16 +859,15 @@ func (m *Map[K, V]) Range(f func(key K, value V) bool) { rootb.mu.Lock() for { for i := 0; i < entriesPerMapBucket; i++ { - if entry := b.entries[i].Load(); entry != nil { - bentries = append(bentries, entry) + if b.entries[i] != nil { + bentries = append(bentries, (*entry[K, V])(b.entries[i])) } } - if next := b.next.Load(); next == nil { + if b.next == nil { rootb.mu.Unlock() break - } else { - b = next } + b = (*bucketPadded)(b.next) } // Call the function for all copied entries. for j, e := range bentries { @@ -727,24 +894,25 @@ func (m *Map[K, V]) Size() int { return int(m.table.Load().sumSize()) } -func appendToBucket[K comparable, V any](h2 uint8, e *entry[K, V], b *bucketPadded[K, V]) { +// It is safe to use plain stores here because the destination bucket must be +// either locked or exclusively written to by the helper during resize. +func appendToBucket[K comparable, V any](h2 uint8, e *entry[K, V], b *bucketPadded) { for { for i := 0; i < entriesPerMapBucket; i++ { - if b.entries[i].Load() == nil { - b.meta.Store(setByte(b.meta.Load(), h2, i)) - b.entries[i].Store(e) + if b.entries[i] == nil { + b.meta = setByte(b.meta, h2, i) + b.entries[i] = unsafe.Pointer(e) return } } - if next := b.next.Load(); next == nil { - newb := new(bucketPadded[K, V]) - newb.meta.Store(setByte(defaultMeta, h2, 0)) - newb.entries[0].Store(e) - b.next.Store(newb) + if b.next == nil { + newb := new(bucketPadded) + newb.meta = setByte(defaultMeta, h2, 0) + newb.entries[0] = unsafe.Pointer(e) + b.next = unsafe.Pointer(newb) return - } else { - b = next } + b = (*bucketPadded)(b.next) } } @@ -753,11 +921,6 @@ func (table *mapTable[K, V]) addSize(bucketIdx uint64, delta int) { atomic.AddInt64(&table.size[cidx].c, int64(delta)) } -func (table *mapTable[K, V]) addSizePlain(bucketIdx uint64, delta int) { - cidx := uint64(len(table.size)-1) & bucketIdx - table.size[cidx].c += int64(delta) -} - func (table *mapTable[K, V]) sumSize() int64 { sum := int64(0) for i := range table.size { @@ -856,7 +1019,7 @@ func (m *Map[K, V]) Stats() MapStats { nentriesLocal := 0 stats.Capacity += entriesPerMapBucket for i := 0; i < entriesPerMapBucket; i++ { - if b.entries[i].Load() != nil { + if atomic.LoadPointer(&b.entries[i]) != nil { stats.Size++ nentriesLocal++ } @@ -865,11 +1028,10 @@ func (m *Map[K, V]) Stats() MapStats { if nentriesLocal == 0 { stats.EmptyBuckets++ } - if next := b.next.Load(); next == nil { + if b.next == nil { break - } else { - b = next } + b = (*bucketPadded)(atomic.LoadPointer(&b.next)) stats.TotalBuckets++ } if nentries < stats.MinEntries { @@ -906,6 +1068,15 @@ func nextPowOf2(v uint32) uint32 { return v } +func parallelism() uint32 { + maxProcs := uint32(runtime.GOMAXPROCS(0)) + numCores := uint32(runtime.NumCPU()) + if maxProcs < numCores { + return maxProcs + } + return numCores +} + func broadcast(b uint8) uint64 { return 0x101010101010101 * uint64(b) } @@ -920,6 +1091,7 @@ func markZeroBytes(w uint64) uint64 { return ((w - 0x0101010101010101) & (^w) & 0x8080808080808080) } +// Sets byte of the input word at the specified index to the given value. func setByte(w uint64, b uint8, idx int) uint64 { shift := idx << 3 return (w &^ (0xff << shift)) | (uint64(b) << shift) diff --git a/common/xsync/map_test.go b/common/xsync/map_test.go index b40d412b..72ebfaea 100644 --- a/common/xsync/map_test.go +++ b/common/xsync/map_test.go @@ -3,6 +3,7 @@ package xsync import ( "math" "math/rand" + "runtime" "strconv" "sync" "sync/atomic" @@ -53,11 +54,11 @@ func runParallel(b *testing.B, benchFn func(pb *testing.PB)) { } func TestMap_BucketStructSize(t *testing.T) { - size := unsafe.Sizeof(bucketPadded[string, int64]{}) + size := unsafe.Sizeof(bucketPadded{}) if size != 64 { t.Fatalf("size of 64B (one cache line) is expected, got: %d", size) } - size = unsafe.Sizeof(bucketPadded[struct{}, int32]{}) + size = unsafe.Sizeof(bucketPadded{}) if size != 64 { t.Fatalf("size of 64B (one cache line) is expected, got: %d", size) } @@ -743,10 +744,7 @@ func TestNewMapGrowOnly_OnlyShrinksOnClear(t *testing.T) { } func TestMapResize(t *testing.T) { - testMapResize(t, NewMap[string, int]()) -} - -func testMapResize(t *testing.T, m *Map[string, int]) { + m := NewMap[string, int]() const numEntries = 100_000 for i := 0; i < numEntries; i++ { @@ -810,6 +808,147 @@ func TestMapResize_CounterLenLimit(t *testing.T) { } } +func testParallelResize(t *testing.T, numGoroutines int) { + m := NewMap[int, int]() + + // Fill the map to trigger resizing + const initialEntries = 10000 + const newEntries = 5000 + for i := 0; i < initialEntries; i++ { + m.Store(i, i*2) + } + + // Start concurrent operations that should trigger helping behavior + var wg sync.WaitGroup + + // Launch goroutines that will encounter resize operations + for g := 0; g < numGoroutines; g++ { + wg.Add(1) + go func(goroutineID int) { + defer wg.Done() + + // Perform many operations to trigger resize and helping + for i := 0; i < newEntries; i++ { + key := goroutineID*newEntries + i + initialEntries + m.Store(key, key*2) + + // Verify the value + if val, ok := m.Load(key); !ok || val != key*2 { + t.Errorf("Failed to load key %d: got %v, %v", key, val, ok) + return + } + } + }(g) + } + + wg.Wait() + + // Verify all entries are present + finalSize := m.Size() + expectedSize := initialEntries + numGoroutines*newEntries + if finalSize != expectedSize { + t.Errorf("Expected size %d, got %d", expectedSize, finalSize) + } + + stats := m.Stats() + if stats.TotalGrowths == 0 { + t.Error("Expected at least one table growth due to concurrent operations") + } +} + +func TestMapParallelResize(t *testing.T) { + testParallelResize(t, 1) + testParallelResize(t, runtime.GOMAXPROCS(0)) + testParallelResize(t, 100) +} + +func testParallelResizeWithSameKeys(t *testing.T, numGoroutines int) { + m := NewMap[int, int]() + + // Fill the map to trigger resizing + const entries = 1000 + for i := 0; i < entries; i++ { + m.Store(2*i, 2*i) + } + + // Start concurrent operations that should trigger helping behavior + var wg sync.WaitGroup + + // Launch goroutines that will encounter resize operations + for g := 0; g < numGoroutines; g++ { + wg.Add(1) + go func(goroutineID int) { + defer wg.Done() + for i := 0; i < 10*entries; i++ { + m.Store(i, i) + } + }(g) + } + + wg.Wait() + + // Verify all entries are present + finalSize := m.Size() + expectedSize := 10 * entries + if finalSize != expectedSize { + t.Errorf("Expected size %d, got %d", expectedSize, finalSize) + } + + stats := m.Stats() + if stats.TotalGrowths == 0 { + t.Error("Expected at least one table growth due to concurrent operations") + } +} + +func TestMapParallelResize_IntersectingKeys(t *testing.T) { + testParallelResizeWithSameKeys(t, 1) + testParallelResizeWithSameKeys(t, runtime.GOMAXPROCS(0)) + testParallelResizeWithSameKeys(t, 100) +} + +func testParallelShrinking(t *testing.T, numGoroutines int) { + m := NewMap[int, int]() + + // Fill the map to trigger resizing + const entries = 100000 + for i := 0; i < entries; i++ { + m.Store(i, i) + } + + // Start concurrent operations that should trigger helping behavior + var wg sync.WaitGroup + + // Launch goroutines that will encounter resize operations + for g := 0; g < numGoroutines; g++ { + wg.Add(1) + go func(goroutineID int) { + defer wg.Done() + for i := 0; i < entries; i++ { + m.Delete(i) + } + }(g) + } + + wg.Wait() + + // Verify all entries are present + finalSize := m.Size() + if finalSize != 0 { + t.Errorf("Expected size 0, got %d", finalSize) + } + + stats := m.Stats() + if stats.TotalShrinks == 0 { + t.Error("Expected at least one table shrinking due to concurrent operations") + } +} + +func TestMapParallelShrinking(t *testing.T) { + testParallelShrinking(t, 1) + testParallelShrinking(t, runtime.GOMAXPROCS(0)) + testParallelShrinking(t, 100) +} + func parallelSeqMapGrower(m *Map[int, int], numEntries int, positive bool, cdone chan bool) { for i := 0; i < numEntries; i++ { if positive { @@ -1459,7 +1598,7 @@ func BenchmarkMapRange(b *testing.B) { } // Benchmarks noop performance of Compute -func BenchmarkCompute(b *testing.B) { +func BenchmarkMapCompute(b *testing.B) { tests := []struct { Name string Op ComputeOp @@ -1487,6 +1626,57 @@ func BenchmarkCompute(b *testing.B) { } } +func BenchmarkMapParallelRehashing(b *testing.B) { + tests := []struct { + name string + goroutines int + numEntries int + }{ + {"1goroutine_10M", 1, 10_000_000}, + {"4goroutines_10M", 4, 10_000_000}, + {"8goroutines_10M", 8, 10_000_000}, + } + for _, test := range tests { + b.Run(test.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + m := NewMap[int, int]() + + var wg sync.WaitGroup + entriesPerGoroutine := test.numEntries / test.goroutines + + start := time.Now() + + for g := 0; g < test.goroutines; g++ { + wg.Add(1) + go func(goroutineID int) { + defer wg.Done() + base := goroutineID * entriesPerGoroutine + for j := 0; j < entriesPerGoroutine; j++ { + key := base + j + m.Store(key, key) + } + }(g) + } + + wg.Wait() + duration := time.Since(start) + + b.ReportMetric(float64(test.numEntries)/duration.Seconds(), "entries/s") + + finalSize := m.Size() + if finalSize != test.numEntries { + b.Fatalf("Expected size %d, got %d", test.numEntries, finalSize) + } + + stats := m.Stats() + if stats.TotalGrowths == 0 { + b.Error("Expected at least one table growth during rehashing") + } + } + }) + } +} + func TestNextPowOf2(t *testing.T) { if nextPowOf2(0) != 1 { t.Error("nextPowOf2 failed")