mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-25 12:47:28 +01:00
Clean up and remove the non-sorted merge_caches function
This commit is contained in:
parent
2e32d0474c
commit
52843123d4
@ -466,110 +466,13 @@ pub fn transpose_and_freeze_caches<'a, 'extractor>(
|
||||
Ok(bucket_caches)
|
||||
}
|
||||
|
||||
/// Merges the caches that must be all associated to the same bucket.
|
||||
/// Merges the caches that must be all associated to the same bucket
|
||||
/// but make sure to sort the different buckets before performing the merges.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// - If the bucket IDs in these frozen caches are not exactly the same.
|
||||
pub fn merge_caches<F>(frozen: Vec<FrozenCache>, mut f: F) -> Result<()>
|
||||
where
|
||||
F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> Result<()>,
|
||||
{
|
||||
let mut maps = Vec::new();
|
||||
let mut readers = Vec::new();
|
||||
let mut current_bucket = None;
|
||||
for FrozenCache { bucket, cache, ref mut spilled } in frozen {
|
||||
assert_eq!(*current_bucket.get_or_insert(bucket), bucket);
|
||||
maps.push(cache);
|
||||
readers.append(spilled);
|
||||
}
|
||||
|
||||
// First manage the spilled entries by looking into the HashMaps,
|
||||
// merge them and mark them as dummy.
|
||||
let mut heap = BinaryHeap::new();
|
||||
for (source_index, source) in readers.into_iter().enumerate() {
|
||||
let mut cursor = source.into_cursor()?;
|
||||
if cursor.move_on_next()?.is_some() {
|
||||
heap.push(Entry { cursor, source_index });
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
let mut first_entry = match heap.pop() {
|
||||
Some(entry) => entry,
|
||||
None => break,
|
||||
};
|
||||
|
||||
let (first_key, first_value) = match first_entry.cursor.current() {
|
||||
Some((key, value)) => (key, value),
|
||||
None => break,
|
||||
};
|
||||
|
||||
let mut output = DelAddRoaringBitmap::from_bytes(first_value)?;
|
||||
while let Some(mut entry) = heap.peek_mut() {
|
||||
if let Some((key, _value)) = entry.cursor.current() {
|
||||
if first_key == key {
|
||||
let new = DelAddRoaringBitmap::from_bytes(first_value)?;
|
||||
output = output.merge(new);
|
||||
// When we are done we the current value of this entry move make
|
||||
// it move forward and let the heap reorganize itself (on drop)
|
||||
if entry.cursor.move_on_next()?.is_none() {
|
||||
PeekMut::pop(entry);
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Once we merged all of the spilled bitmaps we must also
|
||||
// fetch the entries from the non-spilled entries (the HashMaps).
|
||||
for (map_index, map) in maps.iter_mut().enumerate() {
|
||||
if first_entry.source_index != map_index {
|
||||
if let Some(new) = map.get_mut(first_key) {
|
||||
output.union_and_clear_bbbul(new);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We send the merged entry outside.
|
||||
(f)(first_key, output)?;
|
||||
|
||||
// Don't forget to put the first entry back into the heap.
|
||||
if first_entry.cursor.move_on_next()?.is_some() {
|
||||
heap.push(first_entry)
|
||||
}
|
||||
}
|
||||
|
||||
// Then manage the content on the HashMap entries that weren't taken (mem::take).
|
||||
while let Some(mut map) = maps.pop() {
|
||||
for (key, bbbul) in map.iter_mut() {
|
||||
// Make sure we don't try to work with entries already managed by the spilled
|
||||
if bbbul.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut output = DelAddRoaringBitmap::empty();
|
||||
output.union_and_clear_bbbul(bbbul);
|
||||
|
||||
for rhs in maps.iter_mut() {
|
||||
if let Some(new) = rhs.get_mut(key) {
|
||||
output.union_and_clear_bbbul(new);
|
||||
}
|
||||
}
|
||||
|
||||
// We send the merged entry outside.
|
||||
(f)(key, output)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Merges the caches that must be all associated to the same bucket.
|
||||
///
|
||||
/// It merges entries like the `merge_caches` function
|
||||
pub fn merge_caches_alt<F>(frozen: Vec<FrozenCache>, mut f: F) -> Result<()>
|
||||
pub fn merge_caches_sorted<F>(frozen: Vec<FrozenCache>, mut f: F) -> Result<()>
|
||||
where
|
||||
F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> Result<()>,
|
||||
{
|
||||
|
@ -7,8 +7,7 @@ mod vectors;
|
||||
|
||||
use bumpalo::Bump;
|
||||
pub use cache::{
|
||||
merge_caches, merge_caches_alt, transpose_and_freeze_caches, BalancedCaches,
|
||||
DelAddRoaringBitmap,
|
||||
merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap,
|
||||
};
|
||||
pub use documents::*;
|
||||
pub use faceted::*;
|
||||
|
@ -9,8 +9,8 @@ use roaring::RoaringBitmap;
|
||||
|
||||
use super::channel::*;
|
||||
use super::extract::{
|
||||
merge_caches, merge_caches_alt, transpose_and_freeze_caches, BalancedCaches,
|
||||
DelAddRoaringBitmap, FacetKind, GeoExtractorData,
|
||||
merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap,
|
||||
FacetKind, GeoExtractorData,
|
||||
};
|
||||
use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result};
|
||||
|
||||
@ -78,7 +78,7 @@ where
|
||||
if must_stop_processing() {
|
||||
return Err(InternalError::AbortedIndexation.into());
|
||||
}
|
||||
merge_caches_alt(frozen, |key, DelAddRoaringBitmap { del, add }| {
|
||||
merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| {
|
||||
let current = database.get(&rtxn, key)?;
|
||||
match merge_cbo_bitmaps(current, del, add)? {
|
||||
Operation::Write(bitmap) => {
|
||||
@ -107,7 +107,7 @@ pub fn merge_and_send_facet_docids<'extractor>(
|
||||
.map(|frozen| {
|
||||
let mut facet_field_ids_delta = FacetFieldIdsDelta::default();
|
||||
let rtxn = index.read_txn()?;
|
||||
merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| {
|
||||
merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| {
|
||||
let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?;
|
||||
match merge_cbo_bitmaps(current, del, add)? {
|
||||
Operation::Write(bitmap) => {
|
||||
|
Loading…
x
Reference in New Issue
Block a user