From dfab6293c9f8829c41833e95d92963a6323f9b03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Sat, 28 Oct 2023 12:56:46 +0200 Subject: [PATCH] Use an LMDB database to store the external documents ids --- index-scheduler/src/batch.rs | 7 +- meilisearch/src/routes/indexes/documents.rs | 4 +- milli/src/external_documents_ids.rs | 157 ++++++------------ milli/src/index.rs | 33 ++-- milli/src/update/clear_documents.rs | 5 +- milli/src/update/index_documents/transform.rs | 8 +- .../src/update/index_documents/typed_chunk.rs | 6 +- 7 files changed, 79 insertions(+), 141 deletions(-) diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index c4f9c12be..c273d8ebb 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -1575,11 +1575,14 @@ fn delete_document_by_filter<'a>( } e => e.into(), })?; - let external_documents_ids = index.external_documents_ids(wtxn)?; + let external_documents_ids = index.external_documents_ids(); // FIXME: for filters matching a lot of documents, this will allocate a huge vec of external docids (strings). // Since what we have is an iterator, it would be better to delete in chunks let external_to_internal: std::result::Result, RoaringBitmap> = - external_documents_ids.find_external_id_of(candidates).only_external_ids().collect(); + external_documents_ids + .find_external_id_of(wtxn, candidates)? + .only_external_ids() + .collect(); let document_ids = match external_to_internal { Ok(external_ids) => external_ids, Err(remaining_ids) => panic!("Couldn't find some external ids {:?}", remaining_ids), diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs index 2afc1b5fb..b6950ae6e 100644 --- a/meilisearch/src/routes/indexes/documents.rs +++ b/meilisearch/src/routes/indexes/documents.rs @@ -612,8 +612,8 @@ fn retrieve_document>( let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect(); let internal_id = index - .external_documents_ids(&txn)? - .get(doc_id.as_bytes()) + .external_documents_ids() + .get(&txn, doc_id)? .ok_or_else(|| MeilisearchHttpError::DocumentNotFound(doc_id.to_string()))?; let document = index diff --git a/milli/src/external_documents_ids.rs b/milli/src/external_documents_ids.rs index 02794609f..1bf08396a 100644 --- a/milli/src/external_documents_ids.rs +++ b/milli/src/external_documents_ids.rs @@ -1,12 +1,11 @@ -use std::borrow::Cow; use std::collections::HashMap; use std::convert::TryInto; -use std::fmt; -use fst::Streamer; +use heed::types::{OwnedType, Str}; +use heed::{Database, RoIter, RoTxn, RwTxn}; use roaring::RoaringBitmap; -use crate::DocumentId; +use crate::{DocumentId, BEU32}; pub enum DocumentOperationKind { Create, @@ -19,41 +18,31 @@ pub struct DocumentOperation { pub kind: DocumentOperationKind, } -pub struct ExternalDocumentsIds<'a>(fst::Map>); +pub struct ExternalDocumentsIds(Database>); -impl<'a> ExternalDocumentsIds<'a> { - pub fn new(fst: fst::Map>) -> ExternalDocumentsIds<'a> { - ExternalDocumentsIds(fst) - } - - pub fn into_static(self) -> ExternalDocumentsIds<'static> { - ExternalDocumentsIds(self.0.map_data(|c| Cow::Owned(c.into_owned())).unwrap()) +impl ExternalDocumentsIds { + pub fn new(db: Database>) -> ExternalDocumentsIds { + ExternalDocumentsIds(db) } /// Returns `true` if hard and soft external documents lists are empty. - pub fn is_empty(&self) -> bool { - self.0.is_empty() + pub fn is_empty(&self, rtxn: &RoTxn) -> heed::Result { + self.0.is_empty(rtxn).map_err(Into::into) } - pub fn get>(&self, external_id: A) -> Option { - let external_id = external_id.as_ref(); - self.0.get(external_id).map(|x| x.try_into().unwrap()) + pub fn get>(&self, rtxn: &RoTxn, external_id: A) -> heed::Result> { + Ok(self.0.get(rtxn, external_id.as_ref())?.map(|x| x.get().try_into().unwrap())) } /// An helper function to debug this type, returns an `HashMap` of both, /// soft and hard fst maps, combined. - pub fn to_hash_map(&self) -> HashMap { + pub fn to_hash_map(&self, rtxn: &RoTxn) -> heed::Result> { let mut map = HashMap::default(); - let mut stream = self.0.stream(); - while let Some((k, v)) = stream.next() { - let k = String::from_utf8(k.to_vec()).unwrap(); - map.insert(k, v.try_into().unwrap()); + for result in self.0.iter(rtxn)? { + let (external, internal) = result?; + map.insert(external.to_owned(), internal.get().try_into().unwrap()); } - map - } - - pub fn as_bytes(&self) -> &[u8] { - self.0.as_fst().as_bytes() + Ok(map) } /// Looks for the internal ids in the passed bitmap, and returns an iterator over the mapping between @@ -65,12 +54,12 @@ impl<'a> ExternalDocumentsIds<'a> { /// - `Err(remaining_ids)`: if the external ids for some of the requested internal ids weren't found. /// In that case the returned bitmap contains the internal ids whose external ids were not found after traversing /// the entire fst. - pub fn find_external_id_of( + pub fn find_external_id_of<'t>( &self, + rtxn: &'t RoTxn, internal_ids: RoaringBitmap, - ) -> ExternalToInternalOwnedIterator<'_> { - let it = ExternalToInternalOwnedIterator { stream: self.0.stream(), internal_ids }; - it + ) -> heed::Result> { + self.0.iter(rtxn).map(|iter| ExternalToInternalOwnedIterator { iter, internal_ids }) } /// Applies the list of operations passed as argument, modifying the current external to internal id mapping. @@ -81,84 +70,39 @@ impl<'a> ExternalDocumentsIds<'a> { /// /// - If attempting to delete a document that doesn't exist /// - If attempting to create a document that already exists - pub fn apply(&mut self, mut operations: Vec) { - operations.sort_unstable_by(|left, right| left.external_id.cmp(&right.external_id)); - operations.dedup_by(|left, right| left.external_id == right.external_id); - - let mut builder = fst::MapBuilder::memory(); - - let mut stream = self.0.stream(); - let mut next_stream = stream.next(); - let mut operations = operations.iter(); - let mut next_operation = operations.next(); - - loop { - (next_stream, next_operation) = match (next_stream.take(), next_operation.take()) { - (None, None) => break, - (None, Some(DocumentOperation { external_id, internal_id, kind })) => { - if matches!(kind, DocumentOperationKind::Delete) { + pub fn apply(&self, wtxn: &mut RwTxn, operations: Vec) -> heed::Result<()> { + for DocumentOperation { external_id, internal_id, kind } in operations { + match kind { + DocumentOperationKind::Create => { + // TODO should we get before insert to be able to detect bugs? + // if matches!(kind, DocumentOperationKind::Create) { + // panic!("Attempting to create an already-existing document"); + // } + self.0.put(wtxn, &external_id, &BEU32::new(internal_id))?; + } + DocumentOperationKind::Delete => { + if !self.0.delete(wtxn, &external_id)? { panic!("Attempting to delete a non-existing document") } - builder.insert(external_id, (*internal_id).into()).unwrap(); - (None, operations.next()) } - (Some((k, v)), None) => { - builder.insert(k, v).unwrap(); - (stream.next(), None) - } - ( - current_stream @ Some((left_external_id, left_internal_id)), - current_operation @ Some(DocumentOperation { - external_id: right_external_id, - internal_id: right_internal_id, - kind, - }), - ) => match left_external_id.cmp(right_external_id.as_bytes()) { - std::cmp::Ordering::Less => { - builder.insert(left_external_id, left_internal_id).unwrap(); - (stream.next(), current_operation) - } - std::cmp::Ordering::Greater => { - builder.insert(right_external_id, (*right_internal_id).into()).unwrap(); - (current_stream, operations.next()) - } - std::cmp::Ordering::Equal => { - if matches!(kind, DocumentOperationKind::Create) { - panic!("Attempting to create an already-existing document"); - } - // we delete the document, so we just advance both iterators to skip in stream - (stream.next(), operations.next()) - } - }, } } - self.0 = builder.into_map().map_data(Cow::Owned).unwrap(); - } -} -impl fmt::Debug for ExternalDocumentsIds<'_> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_tuple("ExternalDocumentsIds").field(&self.to_hash_map()).finish() - } -} - -impl Default for ExternalDocumentsIds<'static> { - fn default() -> Self { - ExternalDocumentsIds(fst::Map::default().map_data(Cow::Owned).unwrap()) + Ok(()) } } /// An iterator over mappings between requested internal ids and external ids. /// /// See [`ExternalDocumentsIds::find_external_id_of`] for details. -pub struct ExternalToInternalOwnedIterator<'it> { - stream: fst::map::Stream<'it>, +pub struct ExternalToInternalOwnedIterator<'t> { + iter: RoIter<'t, Str, OwnedType>, internal_ids: RoaringBitmap, } -impl<'it> Iterator for ExternalToInternalOwnedIterator<'it> { +impl<'t> Iterator for ExternalToInternalOwnedIterator<'t> { /// A result indicating if a mapping was found, or if the stream was exhausted without finding all internal ids. - type Item = Result<(String, DocumentId), RoaringBitmap>; + type Item = Result<(&'t str, DocumentId), RoaringBitmap>; fn next(&mut self) -> Option { // if all requested ids were found, we won't find any other, so short-circuit @@ -166,23 +110,28 @@ impl<'it> Iterator for ExternalToInternalOwnedIterator<'it> { return None; } loop { - let Some((external, internal)) = self.stream.next() else { - // we exhausted the stream but we still have some internal ids to find - let remaining_ids = std::mem::take(&mut self.internal_ids); - return Some(Err(remaining_ids)); - // note: next calls to `next` will return `None` since we replaced the internal_ids - // with the default empty bitmap + let (external, internal) = match self.iter.next() { + Some(Ok((external, internal))) => (external, internal), + // TODO manage this better, remove panic + Some(Err(e)) => panic!("{}", e), + _ => { + // we exhausted the stream but we still have some internal ids to find + let remaining_ids = std::mem::take(&mut self.internal_ids); + return Some(Err(remaining_ids)); + // note: next calls to `next` will return `None` since we replaced the internal_ids + // with the default empty bitmap + } }; - let internal = internal.try_into().unwrap(); + let internal = internal.get(); let was_contained = self.internal_ids.remove(internal); if was_contained { - return Some(Ok((std::str::from_utf8(external).unwrap().to_owned(), internal))); + return Some(Ok((external, internal))); } } } } -impl<'it> ExternalToInternalOwnedIterator<'it> { +impl<'t> ExternalToInternalOwnedIterator<'t> { /// Returns the bitmap of internal ids whose external id are yet to be found pub fn remaining_internal_ids(&self) -> &RoaringBitmap { &self.internal_ids @@ -191,7 +140,7 @@ impl<'it> ExternalToInternalOwnedIterator<'it> { /// Consumes this iterator and returns an iterator over only the external ids, ignoring the internal ids. /// /// Use this when you don't need the mapping between the external and the internal ids. - pub fn only_external_ids(self) -> impl Iterator> + 'it { - self.map(|res| res.map(|(external, _internal)| external)) + pub fn only_external_ids(self) -> impl Iterator> + 't { + self.map(|res| res.map(|(external, _internal)| external.to_owned())) } } diff --git a/milli/src/index.rs b/milli/src/index.rs index d99c36b65..f8a37fb2b 100644 --- a/milli/src/index.rs +++ b/milli/src/index.rs @@ -51,7 +51,6 @@ pub mod main_key { /// It is concatenated with a big-endian encoded number (non-human readable). /// e.g. vector-hnsw0x0032. pub const VECTOR_HNSW_KEY_PREFIX: &str = "vector-hnsw"; - pub const EXTERNAL_DOCUMENTS_IDS_KEY: &str = "external-documents-ids"; pub const PRIMARY_KEY_KEY: &str = "primary-key"; pub const SEARCHABLE_FIELDS_KEY: &str = "searchable-fields"; pub const USER_DEFINED_SEARCHABLE_FIELDS_KEY: &str = "user-defined-searchable-fields"; @@ -81,6 +80,7 @@ pub mod db_name { pub const EXACT_WORD_DOCIDS: &str = "exact-word-docids"; pub const WORD_PREFIX_DOCIDS: &str = "word-prefix-docids"; pub const EXACT_WORD_PREFIX_DOCIDS: &str = "exact-word-prefix-docids"; + pub const EXTERNAL_DOCUMENTS_IDS: &str = "external-documents-ids"; pub const DOCID_WORD_POSITIONS: &str = "docid-word-positions"; pub const WORD_PAIR_PROXIMITY_DOCIDS: &str = "word-pair-proximity-docids"; pub const WORD_PREFIX_PAIR_PROXIMITY_DOCIDS: &str = "word-prefix-pair-proximity-docids"; @@ -112,6 +112,9 @@ pub struct Index { /// Contains many different types (e.g. the fields ids map). pub(crate) main: PolyDatabase, + /// Maps the external documents ids with the internal document id. + pub external_documents_ids: Database>, + /// A word and all the documents ids containing the word. pub word_docids: Database, @@ -183,13 +186,15 @@ impl Index { ) -> Result { use db_name::*; - options.max_dbs(25); + options.max_dbs(26); unsafe { options.flag(Flags::MdbAlwaysFreePages) }; let env = options.open(path)?; let mut wtxn = env.write_txn()?; let main = env.create_poly_database(&mut wtxn, Some(MAIN))?; let word_docids = env.create_database(&mut wtxn, Some(WORD_DOCIDS))?; + let external_documents_ids = + env.create_database(&mut wtxn, Some(EXTERNAL_DOCUMENTS_IDS))?; let exact_word_docids = env.create_database(&mut wtxn, Some(EXACT_WORD_DOCIDS))?; let word_prefix_docids = env.create_database(&mut wtxn, Some(WORD_PREFIX_DOCIDS))?; let exact_word_prefix_docids = @@ -235,6 +240,7 @@ impl Index { Ok(Index { env, main, + external_documents_ids, word_docids, exact_word_docids, word_prefix_docids, @@ -386,29 +392,10 @@ impl Index { /* external documents ids */ - /// Writes the external documents ids and internal ids (i.e. `u32`). - pub(crate) fn put_external_documents_ids( - &self, - wtxn: &mut RwTxn, - external_documents_ids: &ExternalDocumentsIds<'_>, - ) -> heed::Result<()> { - self.main.put::<_, Str, ByteSlice>( - wtxn, - main_key::EXTERNAL_DOCUMENTS_IDS_KEY, - external_documents_ids.as_bytes(), - )?; - Ok(()) - } - /// Returns the external documents ids map which associate the external ids /// with the internal ids (i.e. `u32`). - pub fn external_documents_ids<'t>(&self, rtxn: &'t RoTxn) -> Result> { - let fst = self.main.get::<_, Str, ByteSlice>(rtxn, main_key::EXTERNAL_DOCUMENTS_IDS_KEY)?; - let fst = match fst { - Some(fst) => fst::Map::new(fst)?.map_data(Cow::Borrowed)?, - None => fst::Map::default().map_data(Cow::Owned)?, - }; - Ok(ExternalDocumentsIds::new(fst)) + pub fn external_documents_ids(&self) -> ExternalDocumentsIds { + ExternalDocumentsIds::new(self.external_documents_ids) } /* fields ids map */ diff --git a/milli/src/update/clear_documents.rs b/milli/src/update/clear_documents.rs index ca5f69808..7f528e928 100644 --- a/milli/src/update/clear_documents.rs +++ b/milli/src/update/clear_documents.rs @@ -1,7 +1,7 @@ use roaring::RoaringBitmap; use time::OffsetDateTime; -use crate::{ExternalDocumentsIds, FieldDistribution, Index, Result}; +use crate::{FieldDistribution, Index, Result}; pub struct ClearDocuments<'t, 'u, 'i> { wtxn: &'t mut heed::RwTxn<'i, 'u>, @@ -20,6 +20,7 @@ impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> { let Index { env: _env, main: _main, + external_documents_ids, word_docids, exact_word_docids, word_prefix_docids, @@ -54,7 +55,6 @@ impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> { // We clean some of the main engine datastructures. self.index.put_words_fst(self.wtxn, &fst::Set::default())?; self.index.put_words_prefixes_fst(self.wtxn, &fst::Set::default())?; - self.index.put_external_documents_ids(self.wtxn, &ExternalDocumentsIds::default())?; self.index.put_documents_ids(self.wtxn, &empty_roaring)?; self.index.put_field_distribution(self.wtxn, &FieldDistribution::default())?; self.index.delete_geo_rtree(self.wtxn)?; @@ -62,6 +62,7 @@ impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> { self.index.delete_vector_hnsw(self.wtxn)?; // Clear the other databases. + external_documents_ids.clear(self.wtxn)?; word_docids.clear(self.wtxn)?; exact_word_docids.clear(self.wtxn)?; word_prefix_docids.clear(self.wtxn)?; diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index 872230d99..98079e07b 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -162,7 +162,7 @@ impl<'a, 'i> Transform<'a, 'i> { FA: Fn() -> bool + Sync, { let (mut cursor, fields_index) = reader.into_cursor_and_fields_index(); - let external_documents_ids = self.index.external_documents_ids(wtxn)?; + let external_documents_ids = self.index.external_documents_ids(); let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?; let primary_key = cursor.primary_key().to_string(); @@ -221,7 +221,7 @@ impl<'a, 'i> Transform<'a, 'i> { let docid = match self.new_external_documents_ids_builder.entry((*external_id).into()) { Entry::Occupied(entry) => *entry.get() as u32, Entry::Vacant(entry) => { - let docid = match external_documents_ids.get(entry.key()) { + let docid = match external_documents_ids.get(wtxn, entry.key())? { Some(docid) => { // If it was already in the list of replaced documents it means it was deleted // by the remove_document method. We should starts as if it never existed. @@ -373,7 +373,7 @@ impl<'a, 'i> Transform<'a, 'i> { to_remove.sort_unstable(); to_remove.dedup(); - let external_documents_ids = self.index.external_documents_ids(wtxn)?; + let external_documents_ids = self.index.external_documents_ids(); let mut documents_deleted = 0; let mut document_sorter_buffer = Vec::new(); @@ -410,7 +410,7 @@ impl<'a, 'i> Transform<'a, 'i> { // If the document was already in the db we mark it as a `to_delete` document. // Then we push the document in sorters in deletion mode. - let deleted_from_db = match external_documents_ids.get(&to_remove) { + let deleted_from_db = match external_documents_ids.get(wtxn, &to_remove)? { Some(docid) => { self.replaced_documents_ids.insert(docid); diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index 192f3d139..1b38be03b 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -194,10 +194,8 @@ pub(crate) fn write_typed_chunk_into_index( db.delete(wtxn, &BEU32::new(docid))?; } } - let mut external_documents_docids = index.external_documents_ids(wtxn)?.into_static(); - external_documents_docids.apply(operations); - index.put_external_documents_ids(wtxn, &external_documents_docids)?; - + let external_documents_docids = index.external_documents_ids(); + external_documents_docids.apply(wtxn, operations)?; index.put_documents_ids(wtxn, &docids)?; } TypedChunk::FieldIdWordCountDocids(fid_word_count_docids_iter) => {