Prefer returning a abort indexation rather than throwing a panic

This commit is contained in:
Clément Renault 2024-12-02 11:53:42 +01:00
parent e9f34fb4b1
commit 767259be7e
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@ -25,7 +25,7 @@ use crate::index::db_name;
use crate::index::main_key::{GEO_FACETED_DOCUMENTS_IDS_KEY, GEO_RTREE_KEY};
use crate::update::new::KvReaderFieldId;
use crate::vector::Embedding;
use crate::{CboRoaringBitmapCodec, DocumentId, Index, InternalError};
use crate::{CboRoaringBitmapCodec, DocumentId, Error, Index, InternalError};
/// Creates a tuple of senders/receiver to be used by
/// the extractors and the writer loop.
@ -454,7 +454,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
}
// Spin loop to have a frame the size we requested.
let mut grant = reserve_grant(&mut producer, total_length, &self.sender);
let mut grant = reserve_grant(&mut producer, total_length, &self.sender)?;
payload_header.serialize_into(&mut grant);
// We only send a wake up message when the channel is empty
@ -500,7 +500,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
}
// Spin loop to have a frame the size we requested.
let mut grant = reserve_grant(&mut producer, total_length, &self.sender);
let mut grant = reserve_grant(&mut producer, total_length, &self.sender)?;
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
@ -575,7 +575,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
}
// Spin loop to have a frame the size we requested.
let mut grant = reserve_grant(&mut producer, total_length, &self.sender);
let mut grant = reserve_grant(&mut producer, total_length, &self.sender)?;
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
@ -629,7 +629,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
}
// Spin loop to have a frame the size we requested.
let mut grant = reserve_grant(&mut producer, total_length, &self.sender);
let mut grant = reserve_grant(&mut producer, total_length, &self.sender)?;
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
@ -652,21 +652,21 @@ fn reserve_grant<'b>(
producer: &mut FrameProducer<'b>,
total_length: usize,
sender: &flume::Sender<ReceiverAction>,
) -> FrameGrantW<'b> {
) -> crate::Result<FrameGrantW<'b>> {
loop {
for _ in 0..10_000 {
match producer.grant(total_length) {
Ok(mut grant) => {
// We could commit only the used memory.
grant.to_commit(total_length);
return grant;
return Ok(grant);
}
Err(bbqueue::Error::InsufficientSize) => continue,
Err(e) => unreachable!("{e:?}"),
}
}
if sender.is_disconnected() {
panic!("channel is disconnected");
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
}
}