diff --git a/CHANGELOG.md b/CHANGELOG.md index 73e5429c8a..0097fb829b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changes + +- Optimization of mutex usage in cache for reaper [#3286](https://github.com/evstack/ev-node/pull/3286) + ## v1.1.1 ### Changes diff --git a/block/internal/cache/generic_cache.go b/block/internal/cache/generic_cache.go index 5e9f5ff4f5..96ee82d8b2 100644 --- a/block/internal/cache/generic_cache.go +++ b/block/internal/cache/generic_cache.go @@ -58,6 +58,19 @@ func (c *Cache) isSeen(hash string) bool { return c.hashes[hash] } +// areSeen checks which hashes have been seen. Returns a boolean slice +// parallel to the input where result[i] is true if hashes[i] is in the +// cache. Acquires the read lock once for the entire batch. +func (c *Cache) areSeen(hashes []string) []bool { + c.mu.RLock() + defer c.mu.RUnlock() + result := make([]bool, len(hashes)) + for i, h := range hashes { + result[i] = c.hashes[h] + } + return result +} + func (c *Cache) setSeen(hash string, height uint64) { c.mu.Lock() defer c.mu.Unlock() @@ -75,6 +88,31 @@ func (c *Cache) removeSeen(hash string) { delete(c.hashes, hash) } +// setSeenBatch marks all hashes as seen under a single write lock. +// For height 0 (transactions), the hashByHeight bookkeeping is skipped +// since all txs share the same sentinel height — the map lookup and +// overwrite on every entry is pure overhead with no benefit. +func (c *Cache) setSeenBatch(hashes []string, height uint64) { + c.mu.Lock() + defer c.mu.Unlock() + if height == 0 { + for _, h := range hashes { + c.hashes[h] = true + } + return + } + + // currently not used, but there for compleness against setSeen + for _, h := range hashes { + if existing, ok := c.hashByHeight[height]; ok && existing == h { + c.hashes[existing] = true + continue + } + c.hashes[h] = true + c.hashByHeight[height] = h + } +} + func (c *Cache) getDAIncluded(hash string) (uint64, bool) { c.mu.RLock() defer c.mu.RUnlock() diff --git a/block/internal/cache/manager.go b/block/internal/cache/manager.go index a91b1410d7..e907002600 100644 --- a/block/internal/cache/manager.go +++ b/block/internal/cache/manager.go @@ -52,6 +52,7 @@ type CacheManager interface { // Transaction operations IsTxSeen(hash string) bool + AreTxsSeen(hashes []string) []bool SetTxSeen(hash string) SetTxsSeen(hashes []string) CleanupOldTxs(olderThan time.Duration) int @@ -204,6 +205,10 @@ func (m *implementation) IsTxSeen(hash string) bool { return m.txCache.isSeen(hash) } +func (m *implementation) AreTxsSeen(hashes []string) []bool { + return m.txCache.areSeen(hashes) +} + func (m *implementation) SetTxSeen(hash string) { // Use 0 as height since transactions don't have a block height yet m.txCache.setSeen(hash, 0) @@ -212,9 +217,9 @@ func (m *implementation) SetTxSeen(hash string) { } func (m *implementation) SetTxsSeen(hashes []string) { + m.txCache.setSeenBatch(hashes, 0) now := time.Now() for _, hash := range hashes { - m.txCache.setSeen(hash, 0) m.txTimestamps.Store(hash, now) } } diff --git a/block/internal/reaping/reaper.go b/block/internal/reaping/reaper.go index 796505e462..d35dbfff3e 100644 --- a/block/internal/reaping/reaper.go +++ b/block/internal/reaping/reaper.go @@ -143,11 +143,6 @@ func (r *Reaper) Stop() error { return nil } -type pendingTx struct { - tx []byte - hash string -} - func (r *Reaper) drainMempool(cleanupCh <-chan time.Time) (bool, error) { var totalSubmitted int @@ -175,16 +170,32 @@ func (r *Reaper) drainMempool(cleanupCh <-chan time.Time) (bool, error) { break } - filtered := r.filterNewTxs(txs) - if len(filtered) == 0 { + hashes := hashTxs(txs) + seen := r.cache.AreTxsSeen(hashes) + + newTxs := make([][]byte, 0, len(txs)) + newHashes := make([]string, 0, len(txs)) + for i, tx := range txs { + if !seen[i] { + newTxs = append(newTxs, tx) + newHashes = append(newHashes, hashes[i]) + } + } + + if len(newTxs) == 0 { break } - n, err := r.submitFiltered(filtered) + _, err = r.sequencer.SubmitBatchTxs(r.ctx, coresequencer.SubmitBatchTxsRequest{ + Id: []byte(r.chainID), + Batch: &coresequencer.Batch{Transactions: newTxs}, + }) if err != nil { - return totalSubmitted > 0, err + return totalSubmitted > 0, fmt.Errorf("failed to submit txs to sequencer: %w", err) } - totalSubmitted += n + + r.cache.SetTxsSeen(newHashes) + totalSubmitted += len(newTxs) } if totalSubmitted > 0 { @@ -194,38 +205,16 @@ func (r *Reaper) drainMempool(cleanupCh <-chan time.Time) (bool, error) { return totalSubmitted > 0, nil } -func (r *Reaper) filterNewTxs(txs [][]byte) []pendingTx { - pending := make([]pendingTx, 0, len(txs)) - for _, tx := range txs { - h := hashTx(tx) - if !r.cache.IsTxSeen(h) { - pending = append(pending, pendingTx{tx: tx, hash: h}) - } - } - return pending -} - -func (r *Reaper) submitFiltered(batch []pendingTx) (int, error) { - txs := make([][]byte, len(batch)) - hashes := make([]string, len(batch)) - for i, p := range batch { - txs[i] = p.tx - hashes[i] = p.hash +func hashTxs(txs [][]byte) []string { + hashes := make([]string, len(txs)) + for i, tx := range txs { + h := sha256.Sum256(tx) + hashes[i] = hex.EncodeToString(h[:]) } - - _, err := r.sequencer.SubmitBatchTxs(r.ctx, coresequencer.SubmitBatchTxsRequest{ - Id: []byte(r.chainID), - Batch: &coresequencer.Batch{Transactions: txs}, - }) - if err != nil { - return 0, fmt.Errorf("failed to submit txs to sequencer: %w", err) - } - - r.cache.SetTxsSeen(hashes) - return len(txs), nil + return hashes } func hashTx(tx []byte) string { - hash := sha256.Sum256(tx) - return hex.EncodeToString(hash[:]) + h := sha256.Sum256(tx) + return hex.EncodeToString(h[:]) }