Conversation
…pand_available

Refactors ExpandBeam trait to support pipelined IO through queue semantics:

- submit_expand(): queue node IDs for async IO (no-op for non-pipelined)
- expand_available(): expand nodes with completed IO (sync for non-pipelined)
- has_pending(): check for in-flight IO operations

Refactors search_internal() to use the queue-based loop with adaptive beam width and relaxed monotonicity support. Creates PipelinedDiskAccessor implementing ExpandBeam with io_uring-based IO, integrated via DiskIndexSearcher.search_pipelined().

Key advantage over PrefetchBeam approach: single trait instead of two, simpler loop, no leaked abstraction. All 14 existing ExpandBeam impls work unchanged via backwards-compatible defaults.

Benchmark results (SIFT-128d, 10 queries):

- BeamSearch: 597-202 QPS, 89-92% recall
- PipeSearch: 734-286 QPS, 72-91% recall
- UnifiedPipeSearch: 624-258 QPS, 93-92% recall
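For reference, a minimal sketch of the trait shape this commit describes (simplified, synchronous signatures; the actual trait in the repository is generic over node data and may differ):

```rust
/// Sketch of the queue-semantics trait described above. `Id` is a stand-in
/// for the crate's node identifier type; error handling is omitted.
pub trait ExpandBeam {
    type Id;

    /// Queue node IDs for asynchronous IO. Non-pipelined implementations can
    /// keep the default no-op and do all their work in `expand_available`.
    fn submit_expand(&mut self, _ids: &[Self::Id]) {}

    /// Expand nodes whose IO has completed and return how many were expanded.
    /// For non-pipelined implementations this is a synchronous read + expand.
    fn expand_available(&mut self) -> usize;

    /// Whether any submitted IOs are still in flight; false by default,
    /// so existing implementations compile unchanged.
    fn has_pending(&self) -> bool {
        false
    }
}
```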
- submit_expand() now checks the shared node cache (Arc&lt;Cache&lt;Data&gt;&gt;) before issuing io_uring reads. Cached nodes are served instantly without disk IO, matching the DiskAccessor's CachedDiskVertexProvider behavior.
- Added io_count and cache_hits tracking with shared PipelinedIoStats (atomic counters) so search_pipelined() can populate QueryStatistics accurately. IO counts now show in benchmark output.
- PipelinedConfig now carries Arc&lt;Cache&lt;Data&gt;&gt; and the benchmark extracts the cache from DiskVertexProviderFactory before passing it.
- Guard submit_expand against reusing io_uring slots still in-flight
- Add Drop impl to PipelinedReader to drain all in-flight IOs before freeing buffers (prevents kernel DMA into freed memory)
- Use f32::total_cmp in post-processor sort for NaN safety
Wire the existing relaxed monotonicity support in search_internal through PipelinedConfig and the benchmark SearchMode enum, giving UnifiedPipeSearch feature parity with PipeSearch.
Add adaptive_beam_width bool to PipelinedConfig and SearchMode enum (defaults to true for backwards compatibility). Benchmark ablation shows SQPOLL is harmful for the queue-based submit/poll pattern.
- expand_available now takes an up_to limit on nodes to expand per call
- search_internal uses process-one-submit-N pattern when pipelining: expand 1 node, then submit as many new IOs as were expanded (max 1)
- This matches PipeSearch's tight loop behavior while keeping the generic search loop clean
- Result: recall restored to 96.6% at L=100, IOs match BeamSearch, QPS within 15% of hand-tuned PipeSearch
PipelinedDiskAccessor now borrows a pooled PipelinedScratch (io_uring ring + file descriptor + PQ buffers) via ObjectPool instead of creating them fresh per query. This eliminates the two largest per-query allocation costs. Closes the QPS gap with PipeSearch from ~15% to <5%.
- Make expand_available non-blocking: return 0 instead of blocking when no data is loaded, letting the search loop submit more IOs
- Add submission rank to LoadedNode/InFlightIo so expand_available processes the highest-priority (earliest-submitted) node first
- Add std::hint::spin_loop() when nothing to submit or expand, reducing tail latency by ~50% at p99.9
- Add inflight_count() to ExpandBeam trait for submission gating
- Keep inflight cap at cur_beam_width to avoid priority queue over-commitment
- New search_trace module in diskann-disk with SearchTrace, TraceEvent, SearchProfile, and OptionalTrace (zero-cost when disabled)
- Events: Submit, Complete, CacheHit, Expand (with FP distance + neighbor counts), SpinWait, Done
- Profile counters: io_poll, io_submit, fp_distance, pq_distance, queue_ops, spin_wait, parse_node (all in microseconds)
- Instrumented PipeSearch: poll, submit, complete, expand phases
- Instrumented UnifiedPipe: drain_completions, submit_expand, expand_available with per-phase profiling
- Both accept optional trace (None = zero overhead in production)
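A rough sketch of the zero-cost-when-disabled pattern described here (event payloads are trimmed; the real module also carries distances, neighbor counts, and microsecond counters):

```rust
/// Trimmed-down trace types; names mirror the commit text but payloads are
/// simplified for illustration.
pub enum TraceEvent {
    Submit(u32),
    Complete(u32),
    CacheHit(u32),
    Expand(u32),
    SpinWait,
    Done,
}

#[derive(Default)]
pub struct SearchTrace {
    pub events: Vec<TraceEvent>,
}

/// `None` disables tracing: the hot path pays only a branch on the Option.
pub type OptionalTrace = Option<SearchTrace>;

#[inline]
pub fn record(trace: &mut OptionalTrace, event: TraceEvent) {
    if let Some(t) = trace.as_mut() {
        t.events.push(event);
    }
}
```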
When DISKANN_TRACE=1, both PipeSearch and UnifiedPipeSearch print per-query profile summaries to stderr. PipelinedConfig gets a trace_enabled field. Enables aggregate profiling analysis.
Speculative submission: nodes are no longer marked visited when their IO is submitted. Instead, a separate 'submitted' HashSet tracks which nodes have been submitted but not yet expanded. Nodes are only marked visited after actual expansion, matching PipeSearch's decoupled flag/visited model.

Key changes:
- Queue: add peek_best_unsubmitted() and mark_visited_by_id() (13 new tests)
- Search loop: use peek_best_unsubmitted for pipelined submission, mark_visited_by_id after expansion, pass queue-ordered IDs to expand_available for best-available node selection
- Accessor: expand_available prefers nodes by caller-supplied queue order (first loaded match), falling back to submission rank. Tracks expanded IDs via last_expanded_ids() for the search loop.
- ExpandBeam trait: add last_expanded_ids() with empty default
- Non-pipelined path completely unchanged (uses closest_notvisited)
- Add is_pipelined() to ExpandBeam trait (default false), true for pipelined
- Use peek_best_unsubmitted from first iteration when pipelined
- has_pending() only counts in-flight IOs, not stale loaded_nodes
- Remove rank fallback in expand_available — stale nodes are abandoned
- Result: IO/hop pattern matches PipeSearch (35/20 at L=10)
…letion

Four key changes that close the recall gap between UnifiedPipe and PipeSearch:

1. Full retset reranking: post_process now uses ALL entries in distance_cache (every expanded node's fp-distance), not just candidates remaining in the priority queue. This matches PipeANN's full_retset approach where every expanded node contributes to results regardless of its PQ distance ranking.
2. Send-1 IO pacing: when pipelining, submit exactly 1 IO per iteration (after the initial burst), matching PipeANN's send_best_read_req(1). Each IO decision benefits from the most recent neighbor information.
3. Reordered loop: the pipelined path now does expand→insert→submit (matching PipeANN's poll→calc→send order) instead of submit→expand.
4. FIFO completion processing: drain_completions processes one completion per call in submission order using a pending_cqe_slots buffer, matching PipeANN's while(front().finished()) pattern.

Results at L=10, BW=4 (4 threads):
- PipeSearch: 57.86% recall, 1757 QPS
- UnifiedPipe: 59.49% recall, 1643 QPS (was 52.3%)
- Gap: UnifiedPipe now +1.6% recall (was -5.6%)
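Change 1 amounts to something like the following (a sketch only; the real post_process works on the crate's neighbor types rather than a bare id/distance map):

```rust
use std::collections::HashMap;

/// Rerank over every full-precision distance computed during the search,
/// not just the candidates still sitting in the priority queue.
fn post_process_full_retset(distance_cache: &HashMap<u32, f32>, k: usize) -> Vec<(u32, f32)> {
    let mut all: Vec<(u32, f32)> = distance_cache.iter().map(|(&id, &d)| (id, d)).collect();
    // total_cmp keeps the ordering total even if a NaN slipped in.
    all.sort_by(|a, b| a.1.total_cmp(&b.1));
    all.truncate(k);
    all
}
```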
…O timing

Performance optimizations:
- Batch drain_completions: process ALL CQEs per poll, not one-at-a-time. Removes pending_cqe_slots buffer and per-drain HashSet allocation.
- Rank-based expansion fallback: when the caller passes no priority hints, expand the loaded node with lowest rank (earliest submitted = best PQ distance at submission time). Eliminates per-iteration queue_ordered Vec allocation in the pipelined search loop.
- Simplified non-pipelined expand: pass beam_nodes directly instead of rebuilding queue_ordered Vec.

Statistics fix:
- Track io_time and cpu_time in PipelinedDiskAccessor (accumulated Durations)
- Report io_us and cpu_us through PipelinedIoStats atomics
- search_pipelined now populates io_time_us and cpu_time_us in QueryStatistics

Results (4 threads, BW=4, sift1m, UnifiedPipe):
- L=10: QPS 1637→1789 (+9%), recall 59.6%→61.8%, p999 4106→3671us
- L=50: QPS 829→896 (+8%), p999 24151→6048us
- QPS gap vs PipeSearch: -10%→-4% at L=10
- Merge old PipeSearch and UnifiedPipeSearch enum variants into a single PipeSearch variant that uses the queue-based ExpandBeam implementation. The old standalone PipelinedSearcher is kept (deprecated) for existing tests but removed from the benchmark SearchMode enum.
- Remove DISKANN_TRACE_EVENTS debug output and per-query RESULT printing from the benchmark. DISKANN_TRACE=1 profile summaries are retained.
- Update pipe-search.json example config for the new API.
- serde(alias = "UnifiedPipeSearch") preserves backward-compat with old configs.
Adds diskann-benchmark/scripts/sift1m_benchmark.sh that:

1. Downloads SIFT1M from the texmex corpus (fvecs/ivecs format)
2. Converts to DiskANN binary format (fbin) using numpy
3. Builds a disk index (R=64, L=100, PQ_16)
4. Runs BeamSearch vs PipeSearch ablation at L=10,20,50,100

Supports --skip-download, --skip-build, --skip-index flags for incremental runs on the same machine. Configurable via --threads and --beam-width.

Ablation results (SIFT1M, 4 threads, BW=4):
- L=10: Recall 52.5%→62.0%, QPS +20%, p95 -15%, p999 -14%
- L=50: Recall 89.9%→90.6%, QPS +23%, p95 -19%, p999 -38%
- L=100: Recall 96.6%→96.7%, QPS +27%, p95 -21%, p999 -48%
…with charts

Fixes:
- Build config now includes required search_phase (was missing, caused 'missing field search_phase' error)

New features:
- Thread sweep: runs BeamSearch vs PipeSearch for 1..max_threads (configurable --max-threads, --thread-stride)
- Generates 2x2 chart (QPS, mean latency, p95, p99.9 vs threads) with both modes as colored lines on same plot
- Outputs CSV for external plotting tools
- Configurable --search-l for single L value per sweep point
This reverts commit 545ee2c.
Move VecDeque, HashMaps, and Vec scratch collections into pooled PipelinedScratch so they retain capacity across queries. Replace per-poll HashSet+VecDeque in drain_completions/wait_and_drain with linear scan and in-place swap_remove_back. Reuse neighbor_buf for PQ distance computation via split borrows. Also tracks PQ preprocess time through PipelinedIoStats so it is correctly reported (was previously 0 on the pipelined path).
Add node_pool freelist to PipelinedScratch. LoadedNode instances are acquired from the pool before parsing (reusing Vec<u8>/Vec<u32> capacity) and returned after expansion. parse_from() clears and extends existing Vecs instead of allocating new ones. Results at L=10: +10.8% QPS, -84% p999 tail latency vs prior commit.
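The pooling described here looks roughly like this (field names are illustrative; the real LoadedNode holds the parsed vector, neighbor list, and metadata):

```rust
/// Simplified pooled node; reusing it preserves Vec capacity across queries.
#[derive(Default)]
struct LoadedNode {
    vector: Vec<u8>,
    neighbors: Vec<u32>,
}

impl LoadedNode {
    /// Parse into the existing buffers instead of allocating fresh Vecs.
    fn parse_from(&mut self, raw_vector: &[u8], raw_neighbors: &[u32]) {
        self.vector.clear();
        self.vector.extend_from_slice(raw_vector);
        self.neighbors.clear();
        self.neighbors.extend_from_slice(raw_neighbors);
    }
}

/// Free-list of nodes kept inside the pooled scratch (sketch).
#[derive(Default)]
struct NodePool {
    free: Vec<LoadedNode>,
}

impl NodePool {
    /// Take a pooled node (with its capacity) or fall back to a fresh one.
    fn acquire(&mut self) -> LoadedNode {
        self.free.pop().unwrap_or_default()
    }

    /// Return the node after expansion so its buffers can be reused.
    fn release(&mut self, node: LoadedNode) {
        self.free.push(node);
    }
}
```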
Delete pipelined_search.rs and pipelined_searcher.rs — the standalone search loop is fully replaced by PipelinedDiskAccessor which plugs into the generic search_internal() loop via the ExpandBeam trait. Remove associated tests from disk_provider.rs and builder/core.rs. Keep pipelined_reader.rs (io_uring reader used by the accessor).
Replace unconditional beam growth with PipeANN's waste ratio tracking:

- Start at initial_beam_width (default 4), grow to beam_width
- Track useful vs wasted IOs after 5-hop convergence gate
- Grow beam +1 only when waste ratio ≤ 10%
- initial_beam_width configurable via SearchParams

At BW=8: +43% QPS at L=100 vs BW=4 with minimal IO increase.
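A sketch of the waste-ratio heuristic, under the assumption that a "wasted" IO is one whose node did not improve the candidate set (the exact definition and call sites live in the search loop):

```rust
/// Adaptive beam width state (sketch); thresholds follow the commit text.
struct AdaptiveBeam {
    cur_beam_width: usize,
    max_beam_width: usize,
    useful_ios: usize,
    wasted_ios: usize,
    converged: bool, // set once the 5-hop convergence gate is passed
}

impl AdaptiveBeam {
    fn record_io(&mut self, was_useful: bool) {
        if !self.converged {
            return; // waste is only tracked after convergence
        }
        if was_useful {
            self.useful_ios += 1;
        } else {
            self.wasted_ios += 1;
        }
    }

    /// Called once per hop: grow the beam by one while waste stays ≤ 10%.
    fn maybe_grow(&mut self) {
        let total = self.useful_ios + self.wasted_ios;
        if !self.converged || total == 0 || self.cur_beam_width >= self.max_beam_width {
            return;
        }
        if self.wasted_ios as f64 / total as f64 <= 0.10 {
            self.cur_beam_width += 1;
        }
    }
}
```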
- Add peek_best_unsubmitted_with_position to track queue depth (max_marker)
- Both ABW waste tracking and RM convergence now gate on max_marker reaching abw_convergence_depth (default 5), matching PipeANN's max_marker >= 5 convergence gate
- Rename abw_convergence_hops → abw_convergence_depth (it's queue depth)
- RM counter uses hops (node expansions) matching PipeANN semantics
- Change expand_available return type from usize to Vec&lt;Id&gt;, eliminating last_expanded_ids() from the ExpandBeam trait entirely. Both pipelined and non-pipelined paths now uniformly iterate the returned IDs.
- Add search_with_params() to DiskIndexSearcher for full SearchParams control.
- Add adaptive_beam_width and relaxed_monotonicity_l fields to SearchMode::BeamSearch in the benchmark config.
- ABW with BW=8 gives +21% QPS for BeamSearch at L=100 with same recall.
- Add --abw and --rm-l N flags to enable adaptive beam width and relaxed
monotonicity variants for both BeamSearch and PipeSearch.
- Dynamic mode generation: baseline (2 modes), +ABW adds 2 more, +RM adds 2 more.
- Charts and CSV now handle arbitrary number of modes with distinct colors/markers.
- Plot title includes ABW/RM parameters when enabled.
- Switch search_mode JSON output from Debug to Display format for cleaner labels
(e.g. 'BeamSearch(abw, rm_l=200)' instead of 'BeamSearch { adaptive_beam_width: true, ... }').
1. CRITICAL: Replace round-robin slot allocation with free-list (VecDeque) in PipelinedDiskAccessor. Prevents reusing an in-flight slot when NVMe completions arrive out-of-order. Mark submit_read as unsafe fn.
2. HIGH: Change PipelinedReader::reset() to call drain_all(), blocking until all kernel IOs complete before the scratch is reused. Prevents stale CQEs from corrupting subsequent queries.
3. HIGH: Use cursor in peek_best_unsubmitted_with_position and mark_visited_by_id (scan from self.cursor instead of 0). Restores O(1) amortized node selection for non-pipelined search, fixing O(L²) regression from the unified loop.
4. MEDIUM: Replace expanded_ids.clone() with std::mem::take() in pipelined expand_available, eliminating per-call Vec allocation.
- Collapse nested if-let chains in ABW/RM logic (index.rs)
- Remove dead code: max_slots() fn and pq_distances() wrapper (pipelined_accessor.rs)
- Simplify async fn signatures (pipelined_accessor.rs)
- Remove unnecessary f64 cast (search_disk_index.rs)
- Remove unused MAX_IO_CONCURRENCY import
- Fix indentation after async move removal
67c5ea0 to 2ee89bc
This is really cool! Awesome work! One thing I can't help but think is that it adds considerable complexity to the core search algorithm, even in situations where this pattern is not needed. This makes me think that we need to restructure the search interface to keep it more open ended, allowing extensions like this to exist but not mandating it for simple cases. Otherwise, if we use an approach like this for every new feature that gets added, the complexity will easily get out of hand.
Yeah, this is something I was struggling with. One thing is I think most of the complexity is due to adjustable beam width and relaxed mono rather than the pipelining -- especially because the details of those heuristics are tied up in the search loop instead of being pluggable. I can factor them out for now and see how minimal I can make the change to the core loop. In the future, if there is a clean representation of all query state (current iteration beamwidth, neighbor queue, IOs in flight, etc.), then each heuristic like early-stop or beamwidth adjustment can be a state->state function called each iteration, and they will be composable.
Strip ABW and RM from the core search loop to minimize changes needed for the pipelining PR. These optimizations can be added back in a separate PR.

- Remove adaptive_beam_width, relaxed_monotonicity_l, initial_beam_width, abw_convergence_depth from SearchParams
- Remove ABW waste tracking and RM convergence logic from search_internal
- Simplify SearchMode enum (BeamSearch is now a unit variant)
- Simplify PipelinedConfig (no ABW/RM fields)
- Update benchmark to use searcher.search() directly
The position return value was only used for ABW's max_marker convergence tracking, which was removed. Simplify to peek_best_unsubmitted everywhere.
Replace the two-branch pipelined/non-pipelined loop with a single three-phase iteration:

1. Expand — process whatever data is available
2. Select + Submit — fill pipeline up to beam_width
3. Wait — block only when idle with pending IOs

Both pipelined and non-pipelined accessors follow the same path; the ExpandBeam trait defaults (no-op submit, synchronous expand) make it transparent.

- Remove is_pipelined() from ExpandBeam trait
- Use beam_width - inflight_count() for unified submission cap
- Pass beam_nodes from previous submit as priority hint to expand_available (improves pipelined node selection)
- Net -61 lines from search_internal
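In sketch form, the unified loop looks like this (the trait and method names below are illustrative stand-ins for ExpandBeam and the neighbor queue, with all distance bookkeeping omitted):

```rust
/// Just enough interface to express the three phases; not the real traits.
trait Beam<I: Copy> {
    fn expand_available(&mut self) -> usize; // phase 1: synchronous for simple accessors
    fn submit_expand(&mut self, ids: &[I]);  // phase 2: no-op for simple accessors
    fn inflight_count(&self) -> usize;
    fn has_pending(&self) -> bool;
    fn wait_for_io(&mut self);               // phase 3: block until a completion arrives
}

trait Frontier<I: Copy> {
    /// Best candidate not yet submitted or visited, if any.
    fn next_unsubmitted(&mut self) -> Option<I>;
}

fn search_loop<I: Copy>(queue: &mut impl Frontier<I>, beam: &mut impl Beam<I>, beam_width: usize) {
    loop {
        // Phase 1: expand whatever data is already available.
        let expanded = beam.expand_available();

        // Phase 2: fill the pipeline up to beam_width.
        let mut submitted = 0;
        while beam.inflight_count() < beam_width {
            match queue.next_unsubmitted() {
                Some(id) => {
                    beam.submit_expand(&[id]);
                    submitted += 1;
                }
                None => break,
            }
        }

        // Phase 3: block only when idle with IOs still in flight; otherwise done.
        if expanded == 0 && submitted == 0 {
            if beam.has_pending() {
                beam.wait_for_io();
            } else {
                break;
            }
        }
    }
}
```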
Thinking more broadly, I wonder if there are changes we can make to how search dispatches in general to make your/other contributors' lives easier. I know @narendatha and I discussed some ideas last week to try to consolidate the user-facing API for range search/multi-hop/diversity search, but I'm not sure what the status of that is.
…cratch

- Remove dead id_scratch field (was never read by any search code)
- Add neighbors: Vec&lt;Neighbor&lt;I&gt;&gt; buffer (reused across search hops)
- Add submitted: HashSet&lt;I&gt; for pipelined submission tracking
- Update search_internal and range_search_internal to use scratch buffers instead of allocating per-call
- Update diverse search scratch construction and tests
Replace the (I, bool) visited flag in NeighborPriorityQueue with a tri-state NodeState enum (Unvisited, Submitted, Visited). This:

1. Eliminates the submitted HashSet from SearchScratch — node state is tracked directly in the queue with no hashing overhead.
2. Fixes silent drop infinite loops — submit_expand now returns rejected IDs, and the search loop calls revert_submitted() to transition them back to Unvisited for retry on the next iteration.
3. Implements peek_best_unsubmitted/mark_submitted/revert_submitted/mark_visited_by_id on DiverseNeighborQueue by delegating to the inner global_queue.
4. Updates all tests to use the new tri-state API (246 pass).

SIFT1M benchmark shows no regression:
- BeamSearch: L=40 246 QPS, 86.4% recall
- PipeSearch: L=40 308 QPS, 87.7% recall (+25% QPS)
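A condensed sketch of the tri-state tracking (the real queue stores NodeState alongside each neighbor entry and scans in sorted order rather than over a flat Vec):

```rust
/// Per-node lifecycle stored directly in the queue entry (sketch).
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum NodeState {
    Unvisited, // eligible for selection
    Submitted, // IO issued, expansion pending (pipelined path only)
    Visited,   // expanded; never selected again
}

struct Entry {
    id: u32,
    distance: f32,
    state: NodeState,
}

/// Best candidate that has not been submitted or visited yet.
fn peek_best_unsubmitted(entries: &[Entry]) -> Option<u32> {
    entries
        .iter()
        .filter(|e| e.state == NodeState::Unvisited)
        .min_by(|a, b| a.distance.total_cmp(&b.distance))
        .map(|e| e.id)
}

/// Rejected submissions are reverted so they can be retried next iteration
/// instead of being silently dropped (the infinite-loop fix in point 2).
fn revert_submitted(entries: &mut [Entry], id: u32) {
    if let Some(e) = entries
        .iter_mut()
        .find(|e| e.id == id && e.state == NodeState::Submitted)
    {
        e.state = NodeState::Unvisited;
    }
}
```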
Address findings from multi-model code review:

1. Fuse peek_best_unsubmitted + mark_submitted into pop_best_unsubmitted (eliminates redundant double-scan in Phase 2 submit loop)
2. Propagate wait_for_io errors instead of silently dropping them (prevents infinite spin on io_uring failures)
3. Fix set_state assert off-by-one: index <= size -> index < size
4. Eliminate per-hop expanded_ids Vec allocation by passing &mut Vec output parameter through expand_available trait method
5. Eliminate double-scan in DiverseNeighborQueue tri-state methods by setting state directly at found index

SIFT1M benchmark: +3-7% QPS improvement, identical recall.
I'll need to think through this more in the morning, but it's possible we can tweak the semantics of … Also, instead of having a blocking call …
    }

    /// Drain all available CQEs from the completion queue.
    fn drain_cqes(&mut self) -> ANNResult<Vec<usize>> {
Could this take a closure to invoke on each completed slot rather than returning a Vec? You may be able to avoid allocations in that context.
You'll have to be careful, though, to correctly handle situations where the closure panics.
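One way the suggestion could look, using the io-uring crate's completion iterator (everything other than IoUring and user_data is a made-up name; panic safety still needs care, since CQEs already popped are lost if the closure unwinds mid-drain):

```rust
use io_uring::IoUring;

/// Minimal reader shape, just to show the closure-based drain.
struct Reader {
    ring: IoUring,
}

impl Reader {
    /// Drain all available CQEs, handing each completed slot id to the
    /// caller's closure instead of collecting them into a Vec.
    fn drain_cqes_with<F: FnMut(usize)>(&mut self, mut on_complete: F) -> usize {
        let mut drained = 0;
        for cqe in self.ring.completion() {
            // user_data was set to the slot id when the read was submitted
            on_complete(cqe.user_data() as usize);
            drained += 1;
        }
        drained
    }
}
```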
pub struct PipelinedReader {
    ring: IoUring,
    /// Pre-allocated sector-aligned read buffers, one per slot.
    slot_bufs: AlignedBoxWithSlice<u8>,
This almost surely needs to be in an UnsafeCell, and possibly the allocation needs to be behind a raw pointer as well. With IOUring writing to the other side of it, this is a violation of the exclusiveness of mutable references.
If this is behind a raw pointer, then we only materialize slices when we know that the kernel won't be mutating the backing data, which should avoid language level UB.
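A sketch of what that could look like: keep the backing storage behind a raw pointer and only materialize a slice once the kernel is known to be done with that slot. Struct and method names here are illustrative, not the actual code:

```rust
use std::alloc::{alloc, dealloc, Layout};

/// Raw-pointer-backed slot buffer area (sketch). No &mut [u8] to the whole
/// allocation ever exists, so kernel writes via io_uring do not alias a Rust
/// mutable reference.
struct SlotBufs {
    ptr: *mut u8,
    layout: Layout,
    slot_size: usize,
}

impl SlotBufs {
    fn new(num_slots: usize, slot_size: usize) -> Self {
        assert!(num_slots > 0 && slot_size > 0);
        // 4096-byte alignment as a stand-in for sector alignment.
        let layout = Layout::from_size_align(num_slots * slot_size, 4096).unwrap();
        let ptr = unsafe { alloc(layout) };
        assert!(!ptr.is_null());
        Self { ptr, layout, slot_size }
    }

    /// Raw pointer handed to io_uring for a given slot; never a reference.
    fn slot_ptr(&self, slot: usize) -> *mut u8 {
        unsafe { self.ptr.add(slot * self.slot_size) }
    }

    /// SAFETY: caller must guarantee the kernel has completed all writes to
    /// this slot (its CQE has been reaped) before materializing a slice.
    unsafe fn slot_buf(&self, slot: usize) -> &[u8] {
        unsafe { std::slice::from_raw_parts(self.slot_ptr(slot), self.slot_size) }
    }
}

impl Drop for SlotBufs {
    fn drop(&mut self) {
        unsafe { dealloc(self.ptr, self.layout) };
    }
}
```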
    /// this invariant allows the kernel to DMA into a buffer being read, causing
    /// data corruption. When using a free-list for slot management (see
    /// `PipelinedScratch::free_slots`), this invariant is structurally guaranteed.
    pub unsafe fn submit_read(&mut self, sector_offset: u64, slot_id: usize) -> ANNResult<()> {
Could we make this safer by having the reader manage free slots itself? These slots can be reclaimed in drained CQEs. My worry with forcing the caller to do this correctly is that (1) they won't and there are no safeguards against it, and (2) even if they do manage it correctly, they will almost surely not do so in a way that doesn't lose a slot ID in the event of a panic.
One other option would be merging pipelined_reader into pipelined_accessor which has the slot management currently. Originally they were separate because I had a monolithic port too and wanted to compare the perf of both with the same reader implementation.
I will think about what's easier/clearer in the morning. Do you think there's likely to be another consumer for this kind of reader? I need to think about whether other storage backends would work correctly with pipelined_accessor.
    pub fn get_slot_buf(&self, slot_id: usize) -> &[u8] {
        let start = slot_id * self.slot_size;
        &self.slot_bufs[start..start + self.slot_size]
    }
I don't think this can be a safe function since there could be an inflight write for this slot.
I am still working on a consolidated API but it is proving a bit tricky to support all search calls. Will try for another couple of days and update on status.
…view

- Replace AlignedBoxWithSlice with raw pointer backing to fix aliasing UB
- Add SlotState machine (Free/InFlight/Completed) with internal free-list
- Make get_slot_buf state-checked (panics on non-Completed slots)
- Separate enqueue_read/flush to batch submissions and prevent SQE leaks
- Add short-read detection in drain_cqes
- Make drain_all retry on EINTR, abort on fatal errors
- Replace Vec-returning drain with caller-provided buffer
- Update PipelinedDiskAccessor to use new safe API
- Add 12 comprehensive safety tests
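A sketch of the slot state machine and state-checked access described in this commit (names and structure approximate; the real implementation ties these states to SQE submission and CQE draining):

```rust
/// Per-slot lifecycle (sketch).
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum SlotState {
    Free,      // available for a new read
    InFlight,  // submitted to the kernel; buffer must not be read
    Completed, // CQE reaped; buffer contents are valid
}

struct Slots {
    states: Vec<SlotState>,
    free_list: Vec<usize>,
}

impl Slots {
    fn new(n: usize) -> Self {
        Self { states: vec![SlotState::Free; n], free_list: (0..n).rev().collect() }
    }

    /// Take a free slot for a new read, if any.
    fn acquire(&mut self) -> Option<usize> {
        let slot = self.free_list.pop()?;
        self.states[slot] = SlotState::InFlight;
        Some(slot)
    }

    /// Called when the slot's CQE has been reaped.
    fn complete(&mut self, slot: usize) {
        assert_eq!(self.states[slot], SlotState::InFlight);
        self.states[slot] = SlotState::Completed;
    }

    /// State check before handing out the buffer: panic unless the kernel is
    /// done with the slot (the "state-checked get_slot_buf" above).
    fn check_readable(&self, slot: usize) {
        assert_eq!(self.states[slot], SlotState::Completed, "slot not completed");
    }

    /// Return the slot to the free list after its data has been consumed.
    fn release(&mut self, slot: usize) {
        assert_eq!(self.states[slot], SlotState::Completed);
        self.states[slot] = SlotState::Free;
        self.free_list.push(slot);
    }
}
```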
What does this implement/fix? Briefly explain your changes.
This PR is heavily inspired by PipeANN, as described in the paper/open-source implementation. The goal of PipeANN is to process nodes as their individual IOs complete rather than waiting for a full beam, in order to overlap IO and computation and try to keep at least BW IOs in flight at all times. This slide from the PipeANN presentation compares the approach to the existing beam search. One important note is that the search becomes nondeterministic, because the node expansion pattern becomes dependent on IO completion timing. From the PipeANN slides:

In order to accomplish this pipelining, we split the ExpandBeam trait into two phases: `submit_expand` and `expand_available`. The old behavior can be maintained by having `submit_expand` be a no-op and implementing the existing expand_beam logic in `expand_available`. But by having two phases, we are able to process individual reads from a beam as they complete rather than processing the whole beam monolithically. We then modify the implementation of `search_internal` to manage the pipeline, submitting new IOs as old ones are processed. I think this two-phase approach should be generic enough to be useful in other search patterns, such as optimistically prefetching some nodes from disk to trade IOPS for latency.

I did benchmarking on a SIFT1M index built with R=64, L=100. In my testing, the pipelined search achieves on-par or better recall at all L, and has meaningful latency and throughput improvements. See this benchmark plot for reference, which was run on the local nvme of an Azure L48s v3 machine:

Any other comments?
I initially did a direct port on this branch. I then tried to integrate it more closely with the existing search loop/index abstractions. In my testing, this integrated version has similar performance to the direct port. I'm very open to suggestions about how to better integrate into the existing structure -- I think the ExpandBeam modification is necessary, but ideally more code could be shared between the accessors and the parameter handling could be cleaned up. I'm also not sure if some of the pipelining logic in the accessor could be factored out to make it easier to implement pipelined accessors for other storage backends. Please let me know if you have any suggestions or requests here.
Another thing to note is we update some of the neighbor queue semantics to allow doing adaptive beam width and relaxed monotonic early-termination. I am not fully confident these are correct, especially since adaptive beam is not giving the expected QPS bump when disk IOPS are saturated. I will do some further review, but please let me know if you prefer these optimizations to be separated into a different PR.

Update: these optimizations have since been removed and can be added in a later PR when it's clearer how to implement them cleanly.