5113: Fix the Minimum BBQueue channel threshold r=Kerollmops a=Kerollmops



Co-authored-by: Kerollmops <clement@meilisearch.com>
Co-authored-by: Louis Dureuil <louis@meilisearch.com>
This commit is contained in:
meili-bors[bot] 2024-12-05 09:01:02 +00:00 committed by GitHub
commit 6298db5bea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 23 additions and 15 deletions

View File

@@ -1,6 +1,6 @@
use std::cell::RefCell; use std::cell::RefCell;
use std::fs::File; use std::fs::File;
use std::io::{self, BufReader, BufWriter, ErrorKind, Read, Write as _}; use std::io::{self, BufReader, BufWriter, ErrorKind, Read, Seek as _, Write as _};
use std::{iter, mem, result}; use std::{iter, mem, result};
use bumpalo::Bump; use bumpalo::Bump;
@@ -97,30 +97,34 @@ pub struct FrozenGeoExtractorData<'extractor> {
impl<'extractor> FrozenGeoExtractorData<'extractor> { impl<'extractor> FrozenGeoExtractorData<'extractor> {
pub fn iter_and_clear_removed( pub fn iter_and_clear_removed(
&mut self, &mut self,
) -> impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_ { ) -> io::Result<impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_> {
mem::take(&mut self.removed) Ok(mem::take(&mut self.removed)
.iter() .iter()
.copied() .copied()
.map(Ok) .map(Ok)
.chain(iterator_over_spilled_geopoints(&mut self.spilled_removed)) .chain(iterator_over_spilled_geopoints(&mut self.spilled_removed)?))
} }
pub fn iter_and_clear_inserted( pub fn iter_and_clear_inserted(
&mut self, &mut self,
) -> impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_ { ) -> io::Result<impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_> {
mem::take(&mut self.inserted) Ok(mem::take(&mut self.inserted)
.iter() .iter()
.copied() .copied()
.map(Ok) .map(Ok)
.chain(iterator_over_spilled_geopoints(&mut self.spilled_inserted)) .chain(iterator_over_spilled_geopoints(&mut self.spilled_inserted)?))
} }
} }
fn iterator_over_spilled_geopoints( fn iterator_over_spilled_geopoints(
spilled: &mut Option<BufReader<File>>, spilled: &mut Option<BufReader<File>>,
) -> impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_ { ) -> io::Result<impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_> {
let mut spilled = spilled.take(); let mut spilled = spilled.take();
iter::from_fn(move || match &mut spilled { if let Some(spilled) = &mut spilled {
spilled.rewind()?;
}
Ok(iter::from_fn(move || match &mut spilled {
Some(file) => { Some(file) => {
let geopoint_bytes = &mut [0u8; mem::size_of::<ExtractedGeoPoint>()]; let geopoint_bytes = &mut [0u8; mem::size_of::<ExtractedGeoPoint>()];
match file.read_exact(geopoint_bytes) { match file.read_exact(geopoint_bytes) {
@@ -130,7 +134,7 @@ fn iterator_over_spilled_geopoints(
} }
} }
None => None, None => None,
}) }))
} }
impl<'extractor> Extractor<'extractor> for GeoExtractor { impl<'extractor> Extractor<'extractor> for GeoExtractor {
@@ -157,7 +161,9 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor {
let mut data_ref = context.data.borrow_mut_or_yield(); let mut data_ref = context.data.borrow_mut_or_yield();
for change in changes { for change in changes {
if max_memory.map_or(false, |mm| context.extractor_alloc.allocated_bytes() >= mm) { if data_ref.spilled_removed.is_none()
&& max_memory.map_or(false, |mm| context.extractor_alloc.allocated_bytes() >= mm)
{
// We must spill as we allocated too much memory // We must spill as we allocated too much memory
data_ref.spilled_removed = tempfile::tempfile().map(BufWriter::new).map(Some)?; data_ref.spilled_removed = tempfile::tempfile().map(BufWriter::new).map(Some)?;
data_ref.spilled_inserted = tempfile::tempfile().map(BufWriter::new).map(Some)?; data_ref.spilled_inserted = tempfile::tempfile().map(BufWriter::new).map(Some)?;

View File

@@ -86,9 +86,11 @@ where
(grenad_parameters, 2 * minimum_capacity), // 100 MiB by thread by default (grenad_parameters, 2 * minimum_capacity), // 100 MiB by thread by default
|max_memory| { |max_memory| {
// 2% of the indexing memory // 2% of the indexing memory
let total_bbbuffer_capacity = (max_memory / 100 / 2).min(minimum_capacity); let total_bbbuffer_capacity = (max_memory / 100 / 2).max(minimum_capacity);
let new_grenad_parameters = GrenadParameters { let new_grenad_parameters = GrenadParameters {
max_memory: Some(max_memory - total_bbbuffer_capacity), max_memory: Some(
max_memory.saturating_sub(total_bbbuffer_capacity).max(100 * 1024 * 1024),
),
..grenad_parameters ..grenad_parameters
}; };
(new_grenad_parameters, total_bbbuffer_capacity) (new_grenad_parameters, total_bbbuffer_capacity)

View File

@@ -34,7 +34,7 @@ where
} }
let mut frozen = data.into_inner().freeze()?; let mut frozen = data.into_inner().freeze()?;
for result in frozen.iter_and_clear_removed() { for result in frozen.iter_and_clear_removed()? {
let extracted_geo_point = result?; let extracted_geo_point = result?;
let removed = rtree.remove(&GeoPoint::from(extracted_geo_point)); let removed = rtree.remove(&GeoPoint::from(extracted_geo_point));
debug_assert!(removed.is_some()); debug_assert!(removed.is_some());
@@ -42,7 +42,7 @@ where
debug_assert!(removed); debug_assert!(removed);
} }
for result in frozen.iter_and_clear_inserted() { for result in frozen.iter_and_clear_inserted()? {
let extracted_geo_point = result?; let extracted_geo_point = result?;
rtree.insert(GeoPoint::from(extracted_geo_point)); rtree.insert(GeoPoint::from(extracted_geo_point));
let inserted = faceted.insert(extracted_geo_point.docid); let inserted = faceted.insert(extracted_geo_point.docid);