Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions vortex-file/src/read/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use pin_project_lite::pin_project;
use vortex_buffer::Alignment;
use vortex_error::VortexExpect;
use vortex_io::CoalesceConfig;
use vortex_utils::aliases::hash_set::HashSet;

use crate::read::ReadRequest;
use crate::read::RequestId;
Expand Down Expand Up @@ -209,7 +210,10 @@ impl State {
let mut current_end = requests[0].offset + requests[0].length as u64;
let align = *self.coalesced_buffer_alignment as u64;

let mut keys_to_remove = Vec::new();
// Track requests that we've already decided to remove (or that were cancelled) so that
// we don't repeatedly process them during range scans.
let mut keys_to_remove: Vec<(u64, RequestId)> = Vec::new();
let mut ids_to_remove: HashSet<RequestId> = HashSet::new();
let mut found_new_requests = true;

// Keep expanding the window while we can find new requests within constraints
Expand All @@ -226,7 +230,7 @@ impl State {
.range((scan_start, RequestId::MIN)..=(scan_end, RequestId::MAX))
{
// Skip if we've already marked this request for removal
if keys_to_remove.iter().any(|&(_, id)| id == req_id) {
if ids_to_remove.contains(&req_id) {
continue;
}

Expand All @@ -238,7 +242,9 @@ impl State {

// Skip any cancelled requests
if req.callback.is_closed() {
keys_to_remove.push((req_offset, req_id));
if ids_to_remove.insert(req_id) {
keys_to_remove.push((req_offset, req_id));
}
continue;
}

Expand Down Expand Up @@ -267,7 +273,9 @@ impl State {
.vortex_expect("Missing request in requests_by_offset");

requests.push(req);
keys_to_remove.push((req_offset, req_id));
if ids_to_remove.insert(req_id) {
keys_to_remove.push((req_offset, req_id));
}
found_new_requests = true;
}
}
Expand Down
Loading