From 4e06040d80875986433538e22a6ab5144b44d558 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 2 Feb 2023 18:51:50 -0800 Subject: [PATCH 1/5] feat(blocks): cache all cids in memory in pruner for performance, keep a list of cids in memory --- blocks/randompruner.go | 111 ++++++++++++++++++++++++++++------------- 1 file changed, 77 insertions(+), 34 deletions(-) diff --git a/blocks/randompruner.go b/blocks/randompruner.go index 0cfdc13..de6266d 100644 --- a/blocks/randompruner.go +++ b/blocks/randompruner.go @@ -35,6 +35,11 @@ type RandomPrunerConfig struct { PinDuration time.Duration } +type cidStatus struct { + pinned bool + pinTime time.Time +} + // RandomPruner is a blockstore wrapper which removes blocks at random when disk // space is exhausted type RandomPruner struct { @@ -48,12 +53,25 @@ type RandomPruner struct { // A list of "hot" CIDs which should not be deleted, and when they were last // used - pins map[cid.Cid]time.Time - pinsLk sync.Mutex + allCids map[cid.Cid]*cidStatus + allCidsLk sync.Mutex + lastAllCidsUpdate time.Time pruneLk sync.Mutex } +var cidStatusPool = sync.Pool{ + New: func() any { + return new(cidStatus) + }, +} + +// 128K +const approxBlockSizeBytes = 1 << 17 + +// 40 Bytes +const approxCidSizeBytes = 40 + // The datastore that was used to create the blockstore is a required parameter // used for calculating remaining disk space - the Blockstore interface itself // does not provide this information @@ -82,6 +100,8 @@ func NewRandomPruner( log.Infof("Initialized pruner's tracked size as %s", humanize.IBytes(size)) + var cidMapApproxSize uint64 = cfg.Threshold * approxCidSizeBytes / approxBlockSizeBytes + pruner := &RandomPruner{ Blockstore: inner, datastore: datastore, @@ -89,8 +109,7 @@ func NewRandomPruner( pruneBytes: cfg.PruneBytes, pinDuration: cfg.PinDuration, size: size, - - pins: make(map[cid.Cid]time.Time), + allCids: make(map[cid.Cid]*cidStatus, cidMapApproxSize), } // Poll immediately on startup and then periodically @@ -115,6 +134,13 @@ func (pruner *RandomPruner) DeleteBlock(ctx context.Context, cid cid.Cid) error } pruner.size -= uint64(blockSize) + pruner.allCidsLk.Lock() + cs, ok := pruner.allCids[cid] + delete(pruner.allCids, cid) + if ok { + cidStatusPool.Put(cs) + } + pruner.allCidsLk.Unlock() return nil } @@ -190,12 +216,40 @@ func (pruner *RandomPruner) prune(ctx context.Context, bytesToPrune uint64) erro log.Infof("Starting prune operation with original datastore size of %s", humanize.IBytes(pruner.size)) start := time.Now() - // Get all the keys in the blockstore - allCids, err := pruner.AllKeysChan(ctx) - if err != nil { - return err + // periodically sync the in memory list of all cids with disk + if time.Since(pruner.lastAllCidsUpdate) > time.Hour { + diskCids, err := pruner.AllKeysChan(ctx) + if err != nil { + return err + } + pruner.allCidsLk.Lock() + inMemoryCids := make(map[cid.Cid]struct{}, len(pruner.allCids)) + for cid := range pruner.allCids { + inMemoryCids[cid] = struct{}{} + } + for diskCid := range diskCids { + if ctx.Err() != nil { + pruner.allCidsLk.Unlock() + return ctx.Err() + } + _, existing := pruner.allCids[diskCid] + if existing { + delete(inMemoryCids, diskCid) + } else { + cs := cidStatusPool.Get().(*cidStatus) + cs.pinned = false + pruner.allCids[diskCid] = cs + } + } + // delete remaining in memory cids that aren't on disk + for inMemoryCid := range inMemoryCids { + cs := pruner.allCids[inMemoryCid] + delete(pruner.allCids, inMemoryCid) + cidStatusPool.Put(cs) + } + pruner.allCidsLk.Unlock() + pruner.lastAllCidsUpdate = time.Now() } - tmpFile, err := os.Create(path.Join(os.TempDir(), "autoretrieve-prune.txt")) if err != nil { return err @@ -206,18 +260,19 @@ func (pruner *RandomPruner) prune(ctx context.Context, bytesToPrune uint64) erro const cidPadLength = 64 writer := bufio.NewWriter(tmpFile) cidCount := 0 - for cid := range allCids { - if ctx.Err() != nil { - return ctx.Err() - } - // Don't consider pinned blocks for deletion - if pruner.isPinned(cid) { + pruner.allCidsLk.Lock() + for cid, status := range pruner.allCids { + + if status.pinned && time.Since(status.pinTime) < pruner.pinDuration { continue } + status.pinned = false + paddedCidStr := fmt.Sprintf("%-*s", cidPadLength, cid.String()) if _, err := writer.WriteString(paddedCidStr + "\n"); err != nil { + pruner.allCidsLk.Unlock() return fmt.Errorf("failed to write cid to tmp file: %w", err) } cidCount++ @@ -295,25 +350,13 @@ func (pruner *RandomPruner) updatePin(pin cid.Cid) { emptyPinClone := make([]byte, 0, len(pin.Bytes())) pinClone := append(emptyPinClone, pin.Bytes()...) pin = cid.MustParse(pinClone) - pruner.pinsLk.Lock() - pruner.pins[pin] = time.Now() - pruner.pinsLk.Unlock() -} - -func (pruner *RandomPruner) isPinned(cid cid.Cid) bool { - pruner.pinsLk.Lock() - defer pruner.pinsLk.Unlock() - - lastUse, ok := pruner.pins[cid] + pruner.allCidsLk.Lock() + cs, ok := pruner.allCids[pin] if !ok { - return false + cs = cidStatusPool.Get().(*cidStatus) + pruner.allCids[pin] = cs } - - if time.Since(lastUse) < pruner.pinDuration { - return true - } - - delete(pruner.pins, cid) - return false - + cs.pinned = true + cs.pinTime = time.Now() + pruner.allCidsLk.Unlock() } From f1ef9540ad5c9334a2de38e446bd9acafd29a587 Mon Sep 17 00:00:00 2001 From: Hannah Howard Date: Fri, 3 Feb 2023 10:32:39 -0800 Subject: [PATCH 2/5] Update blocks/randompruner.go Co-authored-by: Rod Vagg --- blocks/randompruner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/blocks/randompruner.go b/blocks/randompruner.go index de6266d..a8e401b 100644 --- a/blocks/randompruner.go +++ b/blocks/randompruner.go @@ -136,8 +136,8 @@ func (pruner *RandomPruner) DeleteBlock(ctx context.Context, cid cid.Cid) error pruner.size -= uint64(blockSize) pruner.allCidsLk.Lock() cs, ok := pruner.allCids[cid] - delete(pruner.allCids, cid) if ok { + delete(pruner.allCids, cid) cidStatusPool.Put(cs) } pruner.allCidsLk.Unlock() From 0f533385b60aef44c48172c08b8c5e5644588e44 Mon Sep 17 00:00:00 2001 From: Hannah Howard Date: Fri, 3 Feb 2023 10:32:56 -0800 Subject: [PATCH 3/5] Update blocks/randompruner.go Co-authored-by: Rod Vagg --- blocks/randompruner.go | 1 - 1 file changed, 1 deletion(-) diff --git a/blocks/randompruner.go b/blocks/randompruner.go index a8e401b..a35ac02 100644 --- a/blocks/randompruner.go +++ b/blocks/randompruner.go @@ -263,7 +263,6 @@ func (pruner *RandomPruner) prune(ctx context.Context, bytesToPrune uint64) erro pruner.allCidsLk.Lock() for cid, status := range pruner.allCids { - if status.pinned && time.Since(status.pinTime) < pruner.pinDuration { continue } From 8e60efc5736697757aefaed7d1497c633fdfcc17 Mon Sep 17 00:00:00 2001 From: Hannah Howard Date: Fri, 3 Feb 2023 10:33:36 -0800 Subject: [PATCH 4/5] Update blocks/randompruner.go Co-authored-by: Rod Vagg --- blocks/randompruner.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/blocks/randompruner.go b/blocks/randompruner.go index a35ac02..35006e8 100644 --- a/blocks/randompruner.go +++ b/blocks/randompruner.go @@ -270,6 +270,10 @@ func (pruner *RandomPruner) prune(ctx context.Context, bytesToPrune uint64) erro status.pinned = false paddedCidStr := fmt.Sprintf("%-*s", cidPadLength, cid.String()) + if len(paddedCidStr) > cidPadLength { + paddedCidStr = paddedCidStr[:cidPadLength] + } +) if _, err := writer.WriteString(paddedCidStr + "\n"); err != nil { pruner.allCidsLk.Unlock() return fmt.Errorf("failed to write cid to tmp file: %w", err) From a1610e9a0f04c28bda3308c0eb28311ae7508c01 Mon Sep 17 00:00:00 2001 From: Hannah Howard Date: Tue, 7 Feb 2023 13:04:53 -0800 Subject: [PATCH 5/5] Update blocks/randompruner.go Co-authored-by: Rod Vagg --- blocks/randompruner.go | 1 - 1 file changed, 1 deletion(-) diff --git a/blocks/randompruner.go b/blocks/randompruner.go index 35006e8..db24154 100644 --- a/blocks/randompruner.go +++ b/blocks/randompruner.go @@ -273,7 +273,6 @@ func (pruner *RandomPruner) prune(ctx context.Context, bytesToPrune uint64) erro if len(paddedCidStr) > cidPadLength { paddedCidStr = paddedCidStr[:cidPadLength] } -) if _, err := writer.WriteString(paddedCidStr + "\n"); err != nil { pruner.allCidsLk.Unlock() return fmt.Errorf("failed to write cid to tmp file: %w", err)