From 8ecb726683bca6a2e2c837db8c187ddbe39554f6 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Tue, 3 Dec 2024 15:49:11 +0100 Subject: [PATCH 1/3] Fix the minimun BBQueue channel threshold --- crates/milli/src/update/new/indexer/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index 7262c65cb..383823de1 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -86,9 +86,9 @@ where (grenad_parameters, 2 * minimum_capacity), // 100 MiB by thread by default |max_memory| { // 2% of the indexing memory - let total_bbbuffer_capacity = (max_memory / 100 / 2).min(minimum_capacity); + let total_bbbuffer_capacity = (max_memory / 100 / 2).max(minimum_capacity); let new_grenad_parameters = GrenadParameters { - max_memory: Some(max_memory - total_bbbuffer_capacity), + max_memory: Some(max_memory.saturating_sub(total_bbbuffer_capacity)), ..grenad_parameters }; (new_grenad_parameters, total_bbbuffer_capacity) From 5f896b1050ebef939ab68b8ba569193278d61ebb Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Wed, 4 Dec 2024 17:51:12 +0100 Subject: [PATCH 2/3] Fix geo when spilling --- .../milli/src/update/new/extract/geo/mod.rs | 28 +++++++++++-------- crates/milli/src/update/new/merger.rs | 4 +-- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/crates/milli/src/update/new/extract/geo/mod.rs b/crates/milli/src/update/new/extract/geo/mod.rs index 09d2ce0f8..a3820609d 100644 --- a/crates/milli/src/update/new/extract/geo/mod.rs +++ b/crates/milli/src/update/new/extract/geo/mod.rs @@ -1,6 +1,6 @@ use std::cell::RefCell; use std::fs::File; -use std::io::{self, BufReader, BufWriter, ErrorKind, Read, Write as _}; +use std::io::{self, BufReader, BufWriter, ErrorKind, Read, Seek as _, Write as _}; use std::{iter, mem, result}; use bumpalo::Bump; @@ -97,30 +97,34 @@ pub struct FrozenGeoExtractorData<'extractor> { impl<'extractor> FrozenGeoExtractorData<'extractor> { pub fn iter_and_clear_removed( &mut self, - ) -> impl IntoIterator> + '_ { - mem::take(&mut self.removed) + ) -> io::Result> + '_> { + Ok(mem::take(&mut self.removed) .iter() .copied() .map(Ok) - .chain(iterator_over_spilled_geopoints(&mut self.spilled_removed)) + .chain(iterator_over_spilled_geopoints(&mut self.spilled_removed)?)) } pub fn iter_and_clear_inserted( &mut self, - ) -> impl IntoIterator> + '_ { - mem::take(&mut self.inserted) + ) -> io::Result> + '_> { + Ok(mem::take(&mut self.inserted) .iter() .copied() .map(Ok) - .chain(iterator_over_spilled_geopoints(&mut self.spilled_inserted)) + .chain(iterator_over_spilled_geopoints(&mut self.spilled_inserted)?)) } } fn iterator_over_spilled_geopoints( spilled: &mut Option>, -) -> impl IntoIterator> + '_ { +) -> io::Result> + '_> { let mut spilled = spilled.take(); - iter::from_fn(move || match &mut spilled { + if let Some(spilled) = &mut spilled { + spilled.rewind()?; + } + + Ok(iter::from_fn(move || match &mut spilled { Some(file) => { let geopoint_bytes = &mut [0u8; mem::size_of::()]; match file.read_exact(geopoint_bytes) { @@ -130,7 +134,7 @@ fn iterator_over_spilled_geopoints( } } None => None, - }) + })) } impl<'extractor> Extractor<'extractor> for GeoExtractor { @@ -157,7 +161,9 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor { let mut data_ref = context.data.borrow_mut_or_yield(); for change in changes { - if max_memory.map_or(false, |mm| context.extractor_alloc.allocated_bytes() >= mm) { + if data_ref.spilled_removed.is_none() + && max_memory.map_or(false, |mm| context.extractor_alloc.allocated_bytes() >= mm) + { // We must spill as we allocated too much memory data_ref.spilled_removed = tempfile::tempfile().map(BufWriter::new).map(Some)?; data_ref.spilled_inserted = tempfile::tempfile().map(BufWriter::new).map(Some)?; diff --git a/crates/milli/src/update/new/merger.rs b/crates/milli/src/update/new/merger.rs index b650b6b53..512e094fb 100644 --- a/crates/milli/src/update/new/merger.rs +++ b/crates/milli/src/update/new/merger.rs @@ -34,7 +34,7 @@ where } let mut frozen = data.into_inner().freeze()?; - for result in frozen.iter_and_clear_removed() { + for result in frozen.iter_and_clear_removed()? { let extracted_geo_point = result?; let removed = rtree.remove(&GeoPoint::from(extracted_geo_point)); debug_assert!(removed.is_some()); @@ -42,7 +42,7 @@ where debug_assert!(removed); } - for result in frozen.iter_and_clear_inserted() { + for result in frozen.iter_and_clear_inserted()? { let extracted_geo_point = result?; rtree.insert(GeoPoint::from(extracted_geo_point)); let inserted = faceted.insert(extracted_geo_point.docid); From 3a11e39c010d474129e1c4816c61d9f96bdead00 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Wed, 4 Dec 2024 17:52:53 +0100 Subject: [PATCH 3/3] Force max_memory to a min of 100MiB --- crates/milli/src/update/new/indexer/mod.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index 383823de1..9ee7577a5 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -88,7 +88,9 @@ where // 2% of the indexing memory let total_bbbuffer_capacity = (max_memory / 100 / 2).max(minimum_capacity); let new_grenad_parameters = GrenadParameters { - max_memory: Some(max_memory.saturating_sub(total_bbbuffer_capacity)), + max_memory: Some( + max_memory.saturating_sub(total_bbbuffer_capacity).max(100 * 1024 * 1024), + ), ..grenad_parameters }; (new_grenad_parameters, total_bbbuffer_capacity)