Take the BBQueue capacity into account in the max memory

This commit is contained in:
Clément Renault 2024-11-28 15:43:14 +01:00
parent b57dd5c58e
commit 3c7ac093d3
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
2 changed files with 22 additions and 12 deletions

View File

@ -27,8 +27,9 @@ use crate::{CboRoaringBitmapCodec, DocumentId, Index};
/// Creates a tuple of senders/receiver to be used by
/// the extractors and the writer loop.
///
/// The `bbqueue_capacity` represent the number of bytes allocated
/// to each BBQueue buffer and is not the sum of all of them.
/// The `total_bbbuffer_capacity` represent the number of bytes
/// allocated to all BBQueue buffer. It will be split by the
/// number of thread.
///
/// The `channel_capacity` parameter defines the number of
/// too-large-to-fit-in-BBQueue entries that can be sent through
@ -46,10 +47,12 @@ use crate::{CboRoaringBitmapCodec, DocumentId, Index};
/// to the number of available threads in the rayon threadpool.
pub fn extractor_writer_bbqueue(
bbbuffers: &mut Vec<BBBuffer>,
bbbuffer_capacity: usize,
total_bbbuffer_capacity: usize,
channel_capacity: usize,
) -> (ExtractorBbqueueSender, WriterBbqueueReceiver) {
bbbuffers.resize_with(rayon::current_num_threads(), || BBBuffer::new(bbbuffer_capacity));
let current_num_threads = rayon::current_num_threads();
let bbbuffer_capacity = total_bbbuffer_capacity.checked_div(current_num_threads).unwrap();
bbbuffers.resize_with(current_num_threads, || BBBuffer::new(bbbuffer_capacity));
let capacity = bbbuffers.first().unwrap().capacity();
// Read the field description to understand this

View File

@ -79,15 +79,22 @@ where
{
let mut bbbuffers = Vec::new();
let finished_extraction = AtomicBool::new(false);
// We compute and remove the allocated BBQueues buffers capacity from the indexing memory.
let (grenad_parameters, total_bbbuffer_capacity) = grenad_parameters.max_memory.map_or(
(grenad_parameters, 100 * 1024 * 1024 * pool.current_num_threads()), // 100 MiB by thread by default
|max_memory| {
let total_bbbuffer_capacity = max_memory / 10; // 10% of the indexing memory
let new_grenad_parameters = GrenadParameters {
max_memory: Some(max_memory - total_bbbuffer_capacity),
..grenad_parameters
};
(new_grenad_parameters, total_bbbuffer_capacity)
},
);
let (extractor_sender, mut writer_receiver) = pool
.install(|| {
/// TODO restrict memory and remove this memory from the extractors bump allocators
extractor_writer_bbqueue(
&mut bbbuffers,
100 * 1024 * 1024, // 100 MiB
1000,
)
})
.install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
.unwrap();
let metadata_builder = MetadataBuilder::from_index(index, wtxn)?;