From 739c52a3cdc420f929e45ce6189f18d624dc904f Mon Sep 17 00:00:00 2001 From: ManyTheFish Date: Wed, 4 Dec 2024 16:16:48 +0100 Subject: [PATCH 1/6] Replace HashSets by BTreeSets for the prefixes --- .../milli/src/update/new/word_fst_builder.rs | 12 +++---- .../src/update/new/words_prefix_docids.rs | 36 +++++++++---------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/crates/milli/src/update/new/word_fst_builder.rs b/crates/milli/src/update/new/word_fst_builder.rs index 6bc72d91d..a9a5222be 100644 --- a/crates/milli/src/update/new/word_fst_builder.rs +++ b/crates/milli/src/update/new/word_fst_builder.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::collections::BTreeSet; use std::io::BufWriter; use fst::{Set, SetBuilder, Streamer}; @@ -75,8 +75,8 @@ pub struct PrefixData { #[derive(Debug)] pub struct PrefixDelta { - pub modified: HashSet, - pub deleted: HashSet, + pub modified: BTreeSet, + pub deleted: BTreeSet, } struct PrefixFstBuilder { @@ -86,7 +86,7 @@ struct PrefixFstBuilder { prefix_fst_builders: Vec>>, current_prefix: Vec, current_prefix_count: Vec, - modified_prefixes: HashSet, + modified_prefixes: BTreeSet, current_prefix_is_modified: Vec, } @@ -110,7 +110,7 @@ impl PrefixFstBuilder { prefix_fst_builders, current_prefix: vec![Prefix::new(); max_prefix_length], current_prefix_count: vec![0; max_prefix_length], - modified_prefixes: HashSet::new(), + modified_prefixes: BTreeSet::new(), current_prefix_is_modified: vec![false; max_prefix_length], }) } @@ -180,7 +180,7 @@ impl PrefixFstBuilder { let prefix_fst_mmap = unsafe { Mmap::map(&prefix_fst_file)? 
}; let new_prefix_fst = Set::new(&prefix_fst_mmap)?; let old_prefix_fst = index.words_prefixes_fst(rtxn)?; - let mut deleted_prefixes = HashSet::new(); + let mut deleted_prefixes = BTreeSet::new(); { let mut deleted_prefixes_stream = old_prefix_fst.op().add(&new_prefix_fst).difference(); while let Some(prefix) = deleted_prefixes_stream.next() { diff --git a/crates/milli/src/update/new/words_prefix_docids.rs b/crates/milli/src/update/new/words_prefix_docids.rs index 7e56beeae..bf64049c3 100644 --- a/crates/milli/src/update/new/words_prefix_docids.rs +++ b/crates/milli/src/update/new/words_prefix_docids.rs @@ -1,5 +1,5 @@ use std::cell::RefCell; -use std::collections::HashSet; +use std::collections::BTreeSet; use std::io::{BufReader, BufWriter, Read, Seek, Write}; use hashbrown::HashMap; @@ -37,8 +37,8 @@ impl WordPrefixDocids { fn execute( self, wtxn: &mut heed::RwTxn, - prefix_to_compute: &HashSet, - prefix_to_delete: &HashSet, + prefix_to_compute: &BTreeSet, + prefix_to_delete: &BTreeSet, ) -> Result<()> { delete_prefixes(wtxn, &self.prefix_database, prefix_to_delete)?; self.recompute_modified_prefixes(wtxn, prefix_to_compute) @@ -48,7 +48,7 @@ impl WordPrefixDocids { fn recompute_modified_prefixes( &self, wtxn: &mut RwTxn, - prefixes: &HashSet, + prefixes: &BTreeSet, ) -> Result<()> { // We fetch the docids associated to the newly added word prefix fst only. // And collect the CboRoaringBitmaps pointers in an HashMap. 
@@ -127,7 +127,7 @@ impl<'a, 'rtxn> FrozenPrefixBitmaps<'a, 'rtxn> { pub fn from_prefixes( database: Database, rtxn: &'rtxn RoTxn, - prefixes: &'a HashSet, + prefixes: &'a BTreeSet, ) -> heed::Result { let database = database.remap_data_type::(); @@ -173,8 +173,8 @@ impl WordPrefixIntegerDocids { fn execute( self, wtxn: &mut heed::RwTxn, - prefix_to_compute: &HashSet, - prefix_to_delete: &HashSet, + prefix_to_compute: &BTreeSet, + prefix_to_delete: &BTreeSet, ) -> Result<()> { delete_prefixes(wtxn, &self.prefix_database, prefix_to_delete)?; self.recompute_modified_prefixes(wtxn, prefix_to_compute) @@ -184,7 +184,7 @@ impl WordPrefixIntegerDocids { fn recompute_modified_prefixes( &self, wtxn: &mut RwTxn, - prefixes: &HashSet, + prefixes: &BTreeSet, ) -> Result<()> { // We fetch the docids associated to the newly added word prefix fst only. // And collect the CboRoaringBitmaps pointers in an HashMap. @@ -262,7 +262,7 @@ impl<'a, 'rtxn> FrozenPrefixIntegerBitmaps<'a, 'rtxn> { pub fn from_prefixes( database: Database, rtxn: &'rtxn RoTxn, - prefixes: &'a HashSet, + prefixes: &'a BTreeSet, ) -> heed::Result { let database = database.remap_data_type::(); @@ -291,7 +291,7 @@ unsafe impl<'a, 'rtxn> Sync for FrozenPrefixIntegerBitmaps<'a, 'rtxn> {} fn delete_prefixes( wtxn: &mut RwTxn, prefix_database: &Database, - prefixes: &HashSet, + prefixes: &BTreeSet, ) -> Result<()> { // We remove all the entries that are no more required in this word prefix docids database. 
for prefix in prefixes { @@ -309,8 +309,8 @@ fn delete_prefixes( pub fn compute_word_prefix_docids( wtxn: &mut RwTxn, index: &Index, - prefix_to_compute: &HashSet, - prefix_to_delete: &HashSet, + prefix_to_compute: &BTreeSet, + prefix_to_delete: &BTreeSet, grenad_parameters: GrenadParameters, ) -> Result<()> { WordPrefixDocids::new( @@ -325,8 +325,8 @@ pub fn compute_word_prefix_docids( pub fn compute_exact_word_prefix_docids( wtxn: &mut RwTxn, index: &Index, - prefix_to_compute: &HashSet, - prefix_to_delete: &HashSet, + prefix_to_compute: &BTreeSet, + prefix_to_delete: &BTreeSet, grenad_parameters: GrenadParameters, ) -> Result<()> { WordPrefixDocids::new( @@ -341,8 +341,8 @@ pub fn compute_exact_word_prefix_docids( pub fn compute_word_prefix_fid_docids( wtxn: &mut RwTxn, index: &Index, - prefix_to_compute: &HashSet, - prefix_to_delete: &HashSet, + prefix_to_compute: &BTreeSet, + prefix_to_delete: &BTreeSet, grenad_parameters: GrenadParameters, ) -> Result<()> { WordPrefixIntegerDocids::new( @@ -357,8 +357,8 @@ pub fn compute_word_prefix_fid_docids( pub fn compute_word_prefix_position_docids( wtxn: &mut RwTxn, index: &Index, - prefix_to_compute: &HashSet, - prefix_to_delete: &HashSet, + prefix_to_compute: &BTreeSet, + prefix_to_delete: &BTreeSet, grenad_parameters: GrenadParameters, ) -> Result<()> { WordPrefixIntegerDocids::new( From 29ef1645305b5b1f1d37011fec05f7c2b8ca66f7 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Wed, 4 Dec 2024 16:33:35 +0100 Subject: [PATCH 2/6] Introduce a new semi ordered merge function --- crates/milli/src/update/new/extract/cache.rs | 110 +++++++++++++++++++ 1 file changed, 110 insertions(+) diff --git a/crates/milli/src/update/new/extract/cache.rs b/crates/milli/src/update/new/extract/cache.rs index be077d142..ae5ade17e 100644 --- a/crates/milli/src/update/new/extract/cache.rs +++ b/crates/milli/src/update/new/extract/cache.rs @@ -566,6 +566,116 @@ where Ok(()) } +/// Merges the caches that must be all associated to the same 
bucket. +/// +/// It merges entries like the `merge_caches` function +pub fn merge_caches_alt(frozen: Vec, mut f: F) -> Result<()> +where + F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> Result<()>, +{ + let mut maps = Vec::new(); + let mut readers = Vec::new(); + let mut current_bucket = None; + for FrozenCache { bucket, cache, ref mut spilled } in frozen { + assert_eq!(*current_bucket.get_or_insert(bucket), bucket); + maps.push(cache); + readers.append(spilled); + } + + // First manage the spilled entries by looking into the HashMaps, + // merge them and mark them as dummy. + let mut heap = BinaryHeap::new(); + for (source_index, source) in readers.into_iter().enumerate() { + let mut cursor = source.into_cursor()?; + if cursor.move_on_next()?.is_some() { + heap.push(Entry { cursor, source_index }); + } + } + + loop { + let mut first_entry = match heap.pop() { + Some(entry) => entry, + None => break, + }; + + let (first_key, first_value) = match first_entry.cursor.current() { + Some((key, value)) => (key, value), + None => break, + }; + + let mut output = DelAddRoaringBitmap::from_bytes(first_value)?; + while let Some(mut entry) = heap.peek_mut() { + if let Some((key, _value)) = entry.cursor.current() { + if first_key == key { + let new = DelAddRoaringBitmap::from_bytes(first_value)?; + output = output.merge(new); + // When we are done we the current value of this entry move make + // it move forward and let the heap reorganize itself (on drop) + if entry.cursor.move_on_next()?.is_none() { + PeekMut::pop(entry); + } + } else { + break; + } + } + } + + // Once we merged all of the spilled bitmaps we must also + // fetch the entries from the non-spilled entries (the HashMaps). + for (map_index, map) in maps.iter_mut().enumerate() { + if first_entry.source_index != map_index { + if let Some(new) = map.get_mut(first_key) { + output.union_and_clear_bbbul(new); + } + } + } + + // We send the merged entry outside. 
+ (f)(first_key, output)?; + + // Don't forget to put the first entry back into the heap. + if first_entry.cursor.move_on_next()?.is_some() { + heap.push(first_entry) + } + } + + // Then manage the content on the HashMap entries that weren't taken (mem::take). + let order_count = 1000; + while let Some(mut map) = maps.pop() { + let mut iter = map.iter_mut(); + + loop { + let mut ordered_buffer: Vec<_> = iter.by_ref().take(order_count).collect(); + ordered_buffer.sort_unstable_by_key(|(key, _)| *key); + + if ordered_buffer.is_empty() { + break; + } + + for (key, bbbul) in ordered_buffer.drain(..) { + // Make sure we don't try to work with entries already managed by the spilled + if bbbul.is_empty() { + continue; + } + + let mut output = DelAddRoaringBitmap::empty(); + output.union_and_clear_bbbul(bbbul); + + for rhs in maps.iter_mut() { + if let Some(new) = rhs.get_mut(key) { + output.union_and_clear_bbbul(new); + } + } + + // We send the merged entry outside. + (f)(key, output)?; + } + } + } + + Ok(()) +} + struct Entry { cursor: ReaderCursor, source_index: usize, From be411435f5248531f9b5b7891016e5e7304d5a83 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Wed, 4 Dec 2024 16:37:29 +0100 Subject: [PATCH 3/6] Use the merge_caches_alt function in the docids merging --- crates/milli/src/update/new/extract/mod.rs | 5 ++++- crates/milli/src/update/new/merger.rs | 6 +++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/crates/milli/src/update/new/extract/mod.rs b/crates/milli/src/update/new/extract/mod.rs index e67f70db1..3601dd9c6 100644 --- a/crates/milli/src/update/new/extract/mod.rs +++ b/crates/milli/src/update/new/extract/mod.rs @@ -6,7 +6,10 @@ mod searchable; mod vectors; use bumpalo::Bump; -pub use cache::{merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap}; +pub use cache::{ + merge_caches, merge_caches_alt, transpose_and_freeze_caches, BalancedCaches, + DelAddRoaringBitmap, +}; pub use documents::*; pub use faceted::*; 
pub use geo::*; diff --git a/crates/milli/src/update/new/merger.rs b/crates/milli/src/update/new/merger.rs index b650b6b53..9f2aae5a8 100644 --- a/crates/milli/src/update/new/merger.rs +++ b/crates/milli/src/update/new/merger.rs @@ -9,8 +9,8 @@ use roaring::RoaringBitmap; use super::channel::*; use super::extract::{ - merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, FacetKind, - GeoExtractorData, + merge_caches, merge_caches_alt, transpose_and_freeze_caches, BalancedCaches, + DelAddRoaringBitmap, FacetKind, GeoExtractorData, }; use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result}; @@ -78,7 +78,7 @@ where if must_stop_processing() { return Err(InternalError::AbortedIndexation.into()); } - merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| { + merge_caches_alt(frozen, |key, DelAddRoaringBitmap { del, add }| { let current = database.get(&rtxn, key)?; match merge_cbo_bitmaps(current, del, add)? { Operation::Write(bitmap) => { From cb99ac6f7eddef97bb4386987b3151ecd40219f4 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Wed, 4 Dec 2024 17:00:22 +0100 Subject: [PATCH 4/6] Consume vec instead of draining --- crates/milli/src/update/new/extract/cache.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/milli/src/update/new/extract/cache.rs b/crates/milli/src/update/new/extract/cache.rs index ae5ade17e..b57ba6b9b 100644 --- a/crates/milli/src/update/new/extract/cache.rs +++ b/crates/milli/src/update/new/extract/cache.rs @@ -652,7 +652,7 @@ where break; } - for (key, bbbul) in ordered_buffer.drain(..) 
{ + for (key, bbbul) in ordered_buffer { // Make sure we don't try to work with entries already managed by the spilled if bbbul.is_empty() { continue; From 2e32d0474ccc846bbe86c0bbafd88368f82e8a3e Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Wed, 4 Dec 2024 17:05:07 +0100 Subject: [PATCH 5/6] Lexicographically sort all the maps to merge --- crates/milli/src/update/new/extract/cache.rs | 38 +++++++------------- 1 file changed, 13 insertions(+), 25 deletions(-) diff --git a/crates/milli/src/update/new/extract/cache.rs b/crates/milli/src/update/new/extract/cache.rs index b57ba6b9b..325a72280 100644 --- a/crates/milli/src/update/new/extract/cache.rs +++ b/crates/milli/src/update/new/extract/cache.rs @@ -640,36 +640,24 @@ where } // Then manage the content on the HashMap entries that weren't taken (mem::take). - let order_count = 1000; while let Some(mut map) = maps.pop() { - let mut iter = map.iter_mut(); + // Make sure we don't try to work with entries already managed by the spilled + let mut ordered_entries: Vec<_> = + map.iter_mut().filter(|(_, bbbul)| !bbbul.is_empty()).collect(); + ordered_entries.sort_unstable_by_key(|(key, _)| *key); - loop { - let mut ordered_buffer: Vec<_> = iter.by_ref().take(order_count).collect(); - ordered_buffer.sort_unstable_by_key(|(key, _)| *key); + for (key, bbbul) in ordered_entries { + let mut output = DelAddRoaringBitmap::empty(); + output.union_and_clear_bbbul(bbbul); - if ordered_buffer.is_empty() { - break; + for rhs in maps.iter_mut() { + if let Some(new) = rhs.get_mut(key) { + output.union_and_clear_bbbul(new); + } } - for (key, bbbul) in ordered_buffer { - // Make sure we don't try to work with entries already managed by the spilled - if bbbul.is_empty() { - continue; - } - - let mut output = DelAddRoaringBitmap::empty(); - output.union_and_clear_bbbul(bbbul); - - for rhs in maps.iter_mut() { - if let Some(new) = rhs.get_mut(key) { - output.union_and_clear_bbbul(new); - } - } - - // We send the merged entry outside.
- (f)(key, output)?; - } + // We send the merged entry outside. + (f)(key, output)?; } } From 52843123d49d5b8a7903a9c6f95ae584f7e87a8c Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Thu, 5 Dec 2024 10:03:05 +0100 Subject: [PATCH 6/6] Clean up and remove the non-sorted merge_caches function --- crates/milli/src/update/new/extract/cache.rs | 103 +------------------ crates/milli/src/update/new/extract/mod.rs | 3 +- crates/milli/src/update/new/merger.rs | 8 +- 3 files changed, 8 insertions(+), 106 deletions(-) diff --git a/crates/milli/src/update/new/extract/cache.rs b/crates/milli/src/update/new/extract/cache.rs index 325a72280..658a3127c 100644 --- a/crates/milli/src/update/new/extract/cache.rs +++ b/crates/milli/src/update/new/extract/cache.rs @@ -466,110 +466,13 @@ pub fn transpose_and_freeze_caches<'a, 'extractor>( Ok(bucket_caches) } -/// Merges the caches that must be all associated to the same bucket. +/// Merges the caches that must be all associated to the same bucket +/// but make sure to sort the different buckets before performing the merges. /// /// # Panics /// /// - If the bucket IDs in these frozen caches are not exactly the same. -pub fn merge_caches(frozen: Vec, mut f: F) -> Result<()> -where - F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> Result<()>, -{ - let mut maps = Vec::new(); - let mut readers = Vec::new(); - let mut current_bucket = None; - for FrozenCache { bucket, cache, ref mut spilled } in frozen { - assert_eq!(*current_bucket.get_or_insert(bucket), bucket); - maps.push(cache); - readers.append(spilled); - } - - // First manage the spilled entries by looking into the HashMaps, - // merge them and mark them as dummy. 
- let mut heap = BinaryHeap::new(); - for (source_index, source) in readers.into_iter().enumerate() { - let mut cursor = source.into_cursor()?; - if cursor.move_on_next()?.is_some() { - heap.push(Entry { cursor, source_index }); - } - } - - loop { - let mut first_entry = match heap.pop() { - Some(entry) => entry, - None => break, - }; - - let (first_key, first_value) = match first_entry.cursor.current() { - Some((key, value)) => (key, value), - None => break, - }; - - let mut output = DelAddRoaringBitmap::from_bytes(first_value)?; - while let Some(mut entry) = heap.peek_mut() { - if let Some((key, _value)) = entry.cursor.current() { - if first_key == key { - let new = DelAddRoaringBitmap::from_bytes(first_value)?; - output = output.merge(new); - // When we are done we the current value of this entry move make - // it move forward and let the heap reorganize itself (on drop) - if entry.cursor.move_on_next()?.is_none() { - PeekMut::pop(entry); - } - } else { - break; - } - } - } - - // Once we merged all of the spilled bitmaps we must also - // fetch the entries from the non-spilled entries (the HashMaps). - for (map_index, map) in maps.iter_mut().enumerate() { - if first_entry.source_index != map_index { - if let Some(new) = map.get_mut(first_key) { - output.union_and_clear_bbbul(new); - } - } - } - - // We send the merged entry outside. - (f)(first_key, output)?; - - // Don't forget to put the first entry back into the heap. - if first_entry.cursor.move_on_next()?.is_some() { - heap.push(first_entry) - } - } - - // Then manage the content on the HashMap entries that weren't taken (mem::take). 
- while let Some(mut map) = maps.pop() { - for (key, bbbul) in map.iter_mut() { - // Make sure we don't try to work with entries already managed by the spilled - if bbbul.is_empty() { - continue; - } - - let mut output = DelAddRoaringBitmap::empty(); - output.union_and_clear_bbbul(bbbul); - - for rhs in maps.iter_mut() { - if let Some(new) = rhs.get_mut(key) { - output.union_and_clear_bbbul(new); - } - } - - // We send the merged entry outside. - (f)(key, output)?; - } - } - - Ok(()) -} - -/// Merges the caches that must be all associated to the same bucket. -/// -/// It merges entries like the `merge_caches` function -pub fn merge_caches_alt(frozen: Vec, mut f: F) -> Result<()> +pub fn merge_caches_sorted(frozen: Vec, mut f: F) -> Result<()> where F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> Result<()>, { diff --git a/crates/milli/src/update/new/extract/mod.rs b/crates/milli/src/update/new/extract/mod.rs index 3601dd9c6..0bdf31635 100644 --- a/crates/milli/src/update/new/extract/mod.rs +++ b/crates/milli/src/update/new/extract/mod.rs @@ -7,8 +7,7 @@ mod vectors; use bumpalo::Bump; pub use cache::{ - merge_caches, merge_caches_alt, transpose_and_freeze_caches, BalancedCaches, - DelAddRoaringBitmap, + merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, }; pub use documents::*; pub use faceted::*; diff --git a/crates/milli/src/update/new/merger.rs b/crates/milli/src/update/new/merger.rs index 9f2aae5a8..85f5a70f7 100644 --- a/crates/milli/src/update/new/merger.rs +++ b/crates/milli/src/update/new/merger.rs @@ -9,8 +9,8 @@ use roaring::RoaringBitmap; use super::channel::*; use super::extract::{ - merge_caches, merge_caches_alt, transpose_and_freeze_caches, BalancedCaches, - DelAddRoaringBitmap, FacetKind, GeoExtractorData, + merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, + FacetKind, GeoExtractorData, }; use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result}; 
@@ -78,7 +78,7 @@ where if must_stop_processing() { return Err(InternalError::AbortedIndexation.into()); } - merge_caches_alt(frozen, |key, DelAddRoaringBitmap { del, add }| { + merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| { let current = database.get(&rtxn, key)?; match merge_cbo_bitmaps(current, del, add)? { Operation::Write(bitmap) => { @@ -107,7 +107,7 @@ pub fn merge_and_send_facet_docids<'extractor>( .map(|frozen| { let mut facet_field_ids_delta = FacetFieldIdsDelta::default(); let rtxn = index.read_txn()?; - merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| { + merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| { let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?; match merge_cbo_bitmaps(current, del, add)? { Operation::Write(bitmap) => {