Mirror of https://github.com/meilisearch/MeiliSearch (synced 2025-05-18 18:23:57 +02:00)
Fix geo when spilling
This commit is contained in:
parent 8ecb726683
commit 5f896b1050
@@ -1,6 +1,6 @@
|
|||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::{self, BufReader, BufWriter, ErrorKind, Read, Write as _};
|
use std::io::{self, BufReader, BufWriter, ErrorKind, Read, Seek as _, Write as _};
|
||||||
use std::{iter, mem, result};
|
use std::{iter, mem, result};
|
||||||
|
|
||||||
use bumpalo::Bump;
|
use bumpalo::Bump;
|
||||||
@@ -97,30 +97,34 @@ pub struct FrozenGeoExtractorData<'extractor> {
|
|||||||
impl<'extractor> FrozenGeoExtractorData<'extractor> {
|
impl<'extractor> FrozenGeoExtractorData<'extractor> {
|
||||||
pub fn iter_and_clear_removed(
|
pub fn iter_and_clear_removed(
|
||||||
&mut self,
|
&mut self,
|
||||||
) -> impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_ {
|
) -> io::Result<impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_> {
|
||||||
mem::take(&mut self.removed)
|
Ok(mem::take(&mut self.removed)
|
||||||
.iter()
|
.iter()
|
||||||
.copied()
|
.copied()
|
||||||
.map(Ok)
|
.map(Ok)
|
||||||
.chain(iterator_over_spilled_geopoints(&mut self.spilled_removed))
|
.chain(iterator_over_spilled_geopoints(&mut self.spilled_removed)?))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn iter_and_clear_inserted(
|
pub fn iter_and_clear_inserted(
|
||||||
&mut self,
|
&mut self,
|
||||||
) -> impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_ {
|
) -> io::Result<impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_> {
|
||||||
mem::take(&mut self.inserted)
|
Ok(mem::take(&mut self.inserted)
|
||||||
.iter()
|
.iter()
|
||||||
.copied()
|
.copied()
|
||||||
.map(Ok)
|
.map(Ok)
|
||||||
.chain(iterator_over_spilled_geopoints(&mut self.spilled_inserted))
|
.chain(iterator_over_spilled_geopoints(&mut self.spilled_inserted)?))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn iterator_over_spilled_geopoints(
|
fn iterator_over_spilled_geopoints(
|
||||||
spilled: &mut Option<BufReader<File>>,
|
spilled: &mut Option<BufReader<File>>,
|
||||||
) -> impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_ {
|
) -> io::Result<impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_> {
|
||||||
let mut spilled = spilled.take();
|
let mut spilled = spilled.take();
|
||||||
iter::from_fn(move || match &mut spilled {
|
if let Some(spilled) = &mut spilled {
|
||||||
|
spilled.rewind()?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(iter::from_fn(move || match &mut spilled {
|
||||||
Some(file) => {
|
Some(file) => {
|
||||||
let geopoint_bytes = &mut [0u8; mem::size_of::<ExtractedGeoPoint>()];
|
let geopoint_bytes = &mut [0u8; mem::size_of::<ExtractedGeoPoint>()];
|
||||||
match file.read_exact(geopoint_bytes) {
|
match file.read_exact(geopoint_bytes) {
|
||||||
@@ -130,7 +134,7 @@ fn iterator_over_spilled_geopoints(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
None => None,
|
None => None,
|
||||||
})
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'extractor> Extractor<'extractor> for GeoExtractor {
|
impl<'extractor> Extractor<'extractor> for GeoExtractor {
|
||||||
@@ -157,7 +161,9 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor {
|
|||||||
let mut data_ref = context.data.borrow_mut_or_yield();
|
let mut data_ref = context.data.borrow_mut_or_yield();
|
||||||
|
|
||||||
for change in changes {
|
for change in changes {
|
||||||
if max_memory.map_or(false, |mm| context.extractor_alloc.allocated_bytes() >= mm) {
|
if data_ref.spilled_removed.is_none()
|
||||||
|
&& max_memory.map_or(false, |mm| context.extractor_alloc.allocated_bytes() >= mm)
|
||||||
|
{
|
||||||
// We must spill as we allocated too much memory
|
// We must spill as we allocated too much memory
|
||||||
data_ref.spilled_removed = tempfile::tempfile().map(BufWriter::new).map(Some)?;
|
data_ref.spilled_removed = tempfile::tempfile().map(BufWriter::new).map(Some)?;
|
||||||
data_ref.spilled_inserted = tempfile::tempfile().map(BufWriter::new).map(Some)?;
|
data_ref.spilled_inserted = tempfile::tempfile().map(BufWriter::new).map(Some)?;
|
||||||
|
@@ -34,7 +34,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
let mut frozen = data.into_inner().freeze()?;
|
let mut frozen = data.into_inner().freeze()?;
|
||||||
for result in frozen.iter_and_clear_removed() {
|
for result in frozen.iter_and_clear_removed()? {
|
||||||
let extracted_geo_point = result?;
|
let extracted_geo_point = result?;
|
||||||
let removed = rtree.remove(&GeoPoint::from(extracted_geo_point));
|
let removed = rtree.remove(&GeoPoint::from(extracted_geo_point));
|
||||||
debug_assert!(removed.is_some());
|
debug_assert!(removed.is_some());
|
||||||
@@ -42,7 +42,7 @@ where
|
|||||||
debug_assert!(removed);
|
debug_assert!(removed);
|
||||||
}
|
}
|
||||||
|
|
||||||
for result in frozen.iter_and_clear_inserted() {
|
for result in frozen.iter_and_clear_inserted()? {
|
||||||
let extracted_geo_point = result?;
|
let extracted_geo_point = result?;
|
||||||
rtree.insert(GeoPoint::from(extracted_geo_point));
|
rtree.insert(GeoPoint::from(extracted_geo_point));
|
||||||
let inserted = faceted.insert(extracted_geo_point.docid);
|
let inserted = faceted.insert(extracted_geo_point.docid);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user