mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-12-24 13:40:31 +01:00
Introduce the ConcurrentAvailableIds struct and rename the other to AvailableIds
This commit is contained in:
parent
271ce91b3b
commit
6487a67f2b
@ -3,12 +3,12 @@ use std::ops::RangeInclusive;
|
||||
|
||||
use roaring::bitmap::{IntoIter, RoaringBitmap};
|
||||
|
||||
pub struct AvailableDocumentsIds {
|
||||
pub struct AvailableIds {
|
||||
iter: Chain<IntoIter, RangeInclusive<u32>>,
|
||||
}
|
||||
|
||||
impl AvailableDocumentsIds {
|
||||
pub fn from_documents_ids(docids: &RoaringBitmap) -> AvailableDocumentsIds {
|
||||
impl AvailableIds {
|
||||
pub fn new(docids: &RoaringBitmap) -> AvailableIds {
|
||||
match docids.max() {
|
||||
Some(last_id) => {
|
||||
let mut available = RoaringBitmap::from_iter(0..last_id);
|
||||
@ -20,17 +20,17 @@ impl AvailableDocumentsIds {
|
||||
None => 1..=0, // empty range iterator
|
||||
};
|
||||
|
||||
AvailableDocumentsIds { iter: available.into_iter().chain(iter) }
|
||||
AvailableIds { iter: available.into_iter().chain(iter) }
|
||||
}
|
||||
None => {
|
||||
let empty = RoaringBitmap::new().into_iter();
|
||||
AvailableDocumentsIds { iter: empty.chain(0..=u32::MAX) }
|
||||
AvailableIds { iter: empty.chain(0..=u32::MAX) }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Iterator for AvailableDocumentsIds {
|
||||
impl Iterator for AvailableIds {
|
||||
type Item = u32;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
@ -45,7 +45,7 @@ mod tests {
|
||||
#[test]
|
||||
fn empty() {
|
||||
let base = RoaringBitmap::new();
|
||||
let left = AvailableDocumentsIds::from_documents_ids(&base);
|
||||
let left = AvailableIds::new(&base);
|
||||
let right = 0..=u32::MAX;
|
||||
left.zip(right).take(500).for_each(|(l, r)| assert_eq!(l, r));
|
||||
}
|
||||
@ -58,7 +58,7 @@ mod tests {
|
||||
base.insert(100);
|
||||
base.insert(405);
|
||||
|
||||
let left = AvailableDocumentsIds::from_documents_ids(&base);
|
||||
let left = AvailableIds::new(&base);
|
||||
let right = (0..=u32::MAX).filter(|&n| n != 0 && n != 10 && n != 100 && n != 405);
|
||||
left.zip(right).take(500).for_each(|(l, r)| assert_eq!(l, r));
|
||||
}
|
59
milli/src/update/concurrent_available_ids.rs
Normal file
59
milli/src/update/concurrent_available_ids.rs
Normal file
@ -0,0 +1,59 @@
|
||||
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
|
||||
|
||||
use roaring::RoaringBitmap;
|
||||
|
||||
/// A concurrent ID generate that will never return the same ID twice.
|
||||
#[derive(Debug)]
|
||||
pub struct ConcurrentAvailableIds {
|
||||
/// The current tree node ID we should use if there is no other IDs available.
|
||||
current: AtomicU32,
|
||||
/// The total number of tree node IDs used.
|
||||
used: AtomicU64,
|
||||
|
||||
/// A list of IDs to exhaust before picking IDs from `current`.
|
||||
available: RoaringBitmap,
|
||||
/// The current Nth ID to select in the bitmap.
|
||||
select_in_bitmap: AtomicU32,
|
||||
/// Tells if you should look in the roaring bitmap or if all the IDs are already exhausted.
|
||||
look_into_bitmap: AtomicBool,
|
||||
}
|
||||
|
||||
impl ConcurrentAvailableIds {
|
||||
/// Creates an ID generator returning unique IDs, avoiding the specified used IDs.
|
||||
pub fn new(used: RoaringBitmap) -> ConcurrentAvailableIds {
|
||||
let last_id = used.max().map_or(0, |id| id + 1);
|
||||
let used_ids = used.len();
|
||||
let available = RoaringBitmap::from_sorted_iter(0..last_id).unwrap() - used;
|
||||
|
||||
ConcurrentAvailableIds {
|
||||
current: AtomicU32::new(last_id),
|
||||
used: AtomicU64::new(used_ids),
|
||||
select_in_bitmap: AtomicU32::new(0),
|
||||
look_into_bitmap: AtomicBool::new(!available.is_empty()),
|
||||
available,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a new unique ID and increase the count of IDs used.
|
||||
pub fn next(&self) -> Option<u32> {
|
||||
if self.used.fetch_add(1, Ordering::Relaxed) > u32::MAX as u64 {
|
||||
None
|
||||
} else if self.look_into_bitmap.load(Ordering::Relaxed) {
|
||||
let current = self.select_in_bitmap.fetch_add(1, Ordering::Relaxed);
|
||||
match self.available.select(current) {
|
||||
Some(id) => Some(id),
|
||||
None => {
|
||||
self.look_into_bitmap.store(false, Ordering::Relaxed);
|
||||
Some(self.current.fetch_add(1, Ordering::Relaxed))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Some(self.current.fetch_add(1, Ordering::Relaxed))
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the number of used ids in total.
|
||||
pub fn used(&self) -> u64 {
|
||||
self.used.load(Ordering::Relaxed)
|
||||
}
|
||||
}
|
@ -27,7 +27,7 @@ use crate::update::del_add::{
|
||||
};
|
||||
use crate::update::index_documents::GrenadParameters;
|
||||
use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff};
|
||||
use crate::update::{AvailableDocumentsIds, UpdateIndexingStep};
|
||||
use crate::update::{AvailableIds, UpdateIndexingStep};
|
||||
use crate::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors};
|
||||
use crate::vector::settings::{EmbedderAction, WriteBackToDocuments};
|
||||
use crate::{
|
||||
@ -55,7 +55,7 @@ pub struct Transform<'a, 'i> {
|
||||
|
||||
indexer_settings: &'a IndexerConfig,
|
||||
pub index_documents_method: IndexDocumentsMethod,
|
||||
available_documents_ids: AvailableDocumentsIds,
|
||||
available_documents_ids: AvailableIds,
|
||||
|
||||
// Both grenad follows the same format:
|
||||
// key | value
|
||||
@ -143,7 +143,7 @@ impl<'a, 'i> Transform<'a, 'i> {
|
||||
index,
|
||||
fields_ids_map: index.fields_ids_map(wtxn)?,
|
||||
indexer_settings,
|
||||
available_documents_ids: AvailableDocumentsIds::from_documents_ids(&documents_ids),
|
||||
available_documents_ids: AvailableIds::new(&documents_ids),
|
||||
original_sorter,
|
||||
flattened_sorter,
|
||||
index_documents_method,
|
||||
|
@ -1,5 +1,6 @@
|
||||
pub use self::available_documents_ids::AvailableDocumentsIds;
|
||||
pub use self::available_ids::AvailableIds;
|
||||
pub use self::clear_documents::ClearDocuments;
|
||||
pub use self::concurrent_available_ids::ConcurrentAvailableIds;
|
||||
pub use self::facet::bulk::FacetsUpdateBulk;
|
||||
pub use self::facet::incremental::FacetsUpdateIncrementalInner;
|
||||
pub use self::index_documents::*;
|
||||
@ -10,8 +11,9 @@ pub use self::word_prefix_docids::WordPrefixDocids;
|
||||
pub use self::words_prefix_integer_docids::WordPrefixIntegerDocids;
|
||||
pub use self::words_prefixes_fst::WordsPrefixesFst;
|
||||
|
||||
mod available_documents_ids;
|
||||
mod available_ids;
|
||||
mod clear_documents;
|
||||
mod concurrent_available_ids;
|
||||
pub(crate) mod del_add;
|
||||
pub(crate) mod facet;
|
||||
mod index_documents;
|
||||
|
Loading…
x
Reference in New Issue
Block a user