diff --git a/milli/src/update/available_documents_ids.rs b/milli/src/update/available_ids.rs similarity index 74% rename from milli/src/update/available_documents_ids.rs rename to milli/src/update/available_ids.rs index 3b05c5d6e..68e3dd5a6 100644 --- a/milli/src/update/available_documents_ids.rs +++ b/milli/src/update/available_ids.rs @@ -3,12 +3,12 @@ use std::ops::RangeInclusive; use roaring::bitmap::{IntoIter, RoaringBitmap}; -pub struct AvailableDocumentsIds { +pub struct AvailableIds { iter: Chain>, } -impl AvailableDocumentsIds { - pub fn from_documents_ids(docids: &RoaringBitmap) -> AvailableDocumentsIds { +impl AvailableIds { + pub fn new(docids: &RoaringBitmap) -> AvailableIds { match docids.max() { Some(last_id) => { let mut available = RoaringBitmap::from_iter(0..last_id); @@ -20,17 +20,17 @@ impl AvailableDocumentsIds { None => 1..=0, // empty range iterator }; - AvailableDocumentsIds { iter: available.into_iter().chain(iter) } + AvailableIds { iter: available.into_iter().chain(iter) } } None => { let empty = RoaringBitmap::new().into_iter(); - AvailableDocumentsIds { iter: empty.chain(0..=u32::MAX) } + AvailableIds { iter: empty.chain(0..=u32::MAX) } } } } } -impl Iterator for AvailableDocumentsIds { +impl Iterator for AvailableIds { type Item = u32; fn next(&mut self) -> Option { @@ -45,7 +45,7 @@ mod tests { #[test] fn empty() { let base = RoaringBitmap::new(); - let left = AvailableDocumentsIds::from_documents_ids(&base); + let left = AvailableIds::new(&base); let right = 0..=u32::MAX; left.zip(right).take(500).for_each(|(l, r)| assert_eq!(l, r)); } @@ -58,7 +58,7 @@ mod tests { base.insert(100); base.insert(405); - let left = AvailableDocumentsIds::from_documents_ids(&base); + let left = AvailableIds::new(&base); let right = (0..=u32::MAX).filter(|&n| n != 0 && n != 10 && n != 100 && n != 405); left.zip(right).take(500).for_each(|(l, r)| assert_eq!(l, r)); } diff --git a/milli/src/update/concurrent_available_ids.rs b/milli/src/update/concurrent_available_ids.rs new file mode 100644 index 000000000..f3b15ac45 --- /dev/null +++ b/milli/src/update/concurrent_available_ids.rs @@ -0,0 +1,59 @@ +use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; + +use roaring::RoaringBitmap; + +/// A concurrent ID generate that will never return the same ID twice. +#[derive(Debug)] +pub struct ConcurrentAvailableIds { + /// The current tree node ID we should use if there is no other IDs available. + current: AtomicU32, + /// The total number of tree node IDs used. + used: AtomicU64, + + /// A list of IDs to exhaust before picking IDs from `current`. + available: RoaringBitmap, + /// The current Nth ID to select in the bitmap. + select_in_bitmap: AtomicU32, + /// Tells if you should look in the roaring bitmap or if all the IDs are already exhausted. + look_into_bitmap: AtomicBool, +} + +impl ConcurrentAvailableIds { + /// Creates an ID generator returning unique IDs, avoiding the specified used IDs. + pub fn new(used: RoaringBitmap) -> ConcurrentAvailableIds { + let last_id = used.max().map_or(0, |id| id + 1); + let used_ids = used.len(); + let available = RoaringBitmap::from_sorted_iter(0..last_id).unwrap() - used; + + ConcurrentAvailableIds { + current: AtomicU32::new(last_id), + used: AtomicU64::new(used_ids), + select_in_bitmap: AtomicU32::new(0), + look_into_bitmap: AtomicBool::new(!available.is_empty()), + available, + } + } + + /// Returns a new unique ID and increase the count of IDs used. + pub fn next(&self) -> Option { + if self.used.fetch_add(1, Ordering::Relaxed) > u32::MAX as u64 { + None + } else if self.look_into_bitmap.load(Ordering::Relaxed) { + let current = self.select_in_bitmap.fetch_add(1, Ordering::Relaxed); + match self.available.select(current) { + Some(id) => Some(id), + None => { + self.look_into_bitmap.store(false, Ordering::Relaxed); + Some(self.current.fetch_add(1, Ordering::Relaxed)) + } + } + } else { + Some(self.current.fetch_add(1, Ordering::Relaxed)) + } + } + + /// Returns the number of used ids in total. + pub fn used(&self) -> u64 { + self.used.load(Ordering::Relaxed) + } +} diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index c3c48a6eb..49bada8e7 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -27,7 +27,7 @@ use crate::update::del_add::{ }; use crate::update::index_documents::GrenadParameters; use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff}; -use crate::update::{AvailableDocumentsIds, UpdateIndexingStep}; +use crate::update::{AvailableIds, UpdateIndexingStep}; use crate::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors}; use crate::vector::settings::{EmbedderAction, WriteBackToDocuments}; use crate::{ @@ -55,7 +55,7 @@ pub struct Transform<'a, 'i> { indexer_settings: &'a IndexerConfig, pub index_documents_method: IndexDocumentsMethod, - available_documents_ids: AvailableDocumentsIds, + available_documents_ids: AvailableIds, // Both grenad follows the same format: // key | value @@ -143,7 +143,7 @@ impl<'a, 'i> Transform<'a, 'i> { index, fields_ids_map: index.fields_ids_map(wtxn)?, indexer_settings, - available_documents_ids: AvailableDocumentsIds::from_documents_ids(&documents_ids), + available_documents_ids: AvailableIds::new(&documents_ids), original_sorter, flattened_sorter, index_documents_method, diff --git a/milli/src/update/mod.rs b/milli/src/update/mod.rs index 9842002a4..c5e9272de 100644 --- a/milli/src/update/mod.rs +++ b/milli/src/update/mod.rs @@ -1,5 +1,6 @@ -pub use self::available_documents_ids::AvailableDocumentsIds; +pub use self::available_ids::AvailableIds; pub use self::clear_documents::ClearDocuments; +pub use self::concurrent_available_ids::ConcurrentAvailableIds; pub use self::facet::bulk::FacetsUpdateBulk; pub use self::facet::incremental::FacetsUpdateIncrementalInner; pub use self::index_documents::*; @@ -10,8 +11,9 @@ pub use self::word_prefix_docids::WordPrefixDocids; pub use self::words_prefix_integer_docids::WordPrefixIntegerDocids; pub use self::words_prefixes_fst::WordsPrefixesFst; -mod available_documents_ids; +mod available_ids; mod clear_documents; +mod concurrent_available_ids; pub(crate) mod del_add; pub(crate) mod facet; mod index_documents;