diff --git a/crates/milli/src/update/new/extract/geo/mod.rs b/crates/milli/src/update/new/extract/geo/mod.rs index 09d2ce0f8..a3820609d 100644 --- a/crates/milli/src/update/new/extract/geo/mod.rs +++ b/crates/milli/src/update/new/extract/geo/mod.rs @@ -1,6 +1,6 @@ use std::cell::RefCell; use std::fs::File; -use std::io::{self, BufReader, BufWriter, ErrorKind, Read, Write as _}; +use std::io::{self, BufReader, BufWriter, ErrorKind, Read, Seek as _, Write as _}; use std::{iter, mem, result}; use bumpalo::Bump; @@ -97,30 +97,34 @@ pub struct FrozenGeoExtractorData<'extractor> { impl<'extractor> FrozenGeoExtractorData<'extractor> { pub fn iter_and_clear_removed( &mut self, - ) -> impl IntoIterator> + '_ { - mem::take(&mut self.removed) + ) -> io::Result> + '_> { + Ok(mem::take(&mut self.removed) .iter() .copied() .map(Ok) - .chain(iterator_over_spilled_geopoints(&mut self.spilled_removed)) + .chain(iterator_over_spilled_geopoints(&mut self.spilled_removed)?)) } pub fn iter_and_clear_inserted( &mut self, - ) -> impl IntoIterator> + '_ { - mem::take(&mut self.inserted) + ) -> io::Result> + '_> { + Ok(mem::take(&mut self.inserted) .iter() .copied() .map(Ok) - .chain(iterator_over_spilled_geopoints(&mut self.spilled_inserted)) + .chain(iterator_over_spilled_geopoints(&mut self.spilled_inserted)?)) } } fn iterator_over_spilled_geopoints( spilled: &mut Option>, -) -> impl IntoIterator> + '_ { +) -> io::Result> + '_> { let mut spilled = spilled.take(); - iter::from_fn(move || match &mut spilled { + if let Some(spilled) = &mut spilled { + spilled.rewind()?; + } + + Ok(iter::from_fn(move || match &mut spilled { Some(file) => { let geopoint_bytes = &mut [0u8; mem::size_of::()]; match file.read_exact(geopoint_bytes) { @@ -130,7 +134,7 @@ fn iterator_over_spilled_geopoints( } } None => None, - }) + })) } impl<'extractor> Extractor<'extractor> for GeoExtractor { @@ -157,7 +161,9 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor { let mut data_ref = context.data.borrow_mut_or_yield(); for change in changes { - if max_memory.map_or(false, |mm| context.extractor_alloc.allocated_bytes() >= mm) { + if data_ref.spilled_removed.is_none() + && max_memory.map_or(false, |mm| context.extractor_alloc.allocated_bytes() >= mm) + { // We must spill as we allocated too much memory data_ref.spilled_removed = tempfile::tempfile().map(BufWriter::new).map(Some)?; data_ref.spilled_inserted = tempfile::tempfile().map(BufWriter::new).map(Some)?; diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index 7262c65cb..9ee7577a5 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -86,9 +86,11 @@ where (grenad_parameters, 2 * minimum_capacity), // 100 MiB by thread by default |max_memory| { // 2% of the indexing memory - let total_bbbuffer_capacity = (max_memory / 100 / 2).min(minimum_capacity); + let total_bbbuffer_capacity = (max_memory / 100 / 2).max(minimum_capacity); let new_grenad_parameters = GrenadParameters { - max_memory: Some(max_memory - total_bbbuffer_capacity), + max_memory: Some( + max_memory.saturating_sub(total_bbbuffer_capacity).max(100 * 1024 * 1024), + ), ..grenad_parameters }; (new_grenad_parameters, total_bbbuffer_capacity) diff --git a/crates/milli/src/update/new/merger.rs b/crates/milli/src/update/new/merger.rs index b650b6b53..512e094fb 100644 --- a/crates/milli/src/update/new/merger.rs +++ b/crates/milli/src/update/new/merger.rs @@ -34,7 +34,7 @@ where } let mut frozen = data.into_inner().freeze()?; - for result in frozen.iter_and_clear_removed() { + for result in frozen.iter_and_clear_removed()? { let extracted_geo_point = result?; let removed = rtree.remove(&GeoPoint::from(extracted_geo_point)); debug_assert!(removed.is_some()); @@ -42,7 +42,7 @@ where debug_assert!(removed); } - for result in frozen.iter_and_clear_inserted() { + for result in frozen.iter_and_clear_inserted()? { let extracted_geo_point = result?; rtree.insert(GeoPoint::from(extracted_geo_point)); let inserted = faceted.insert(extracted_geo_point.docid);