From b14cca2ad9ac9c44cd0770ed38a1b4aa6ebb0466 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?=
Date: Sun, 25 Oct 2020 18:32:01 +0100
Subject: [PATCH] Introduce the UpdateBuilder type along with some update
 operations

---
 src/indexing/mod.rs              |  12 +-
 src/lib.rs                       |   4 +-
 src/search.rs                    |  13 +-
 src/subcommand/infos.rs          |  17 +-
 src/subcommand/search.rs         |   2 +-
 src/subcommand/serve.rs          |  12 +-
 src/update/mod.rs                |   5 +
 src/update/update_builder.rs     | 360 +++++++++++++++++++++++++++++++
 src/{ => update}/update_store.rs |   0
 9 files changed, 386 insertions(+), 39 deletions(-)
 create mode 100644 src/update/mod.rs
 create mode 100644 src/update/update_builder.rs
 rename src/{ => update}/update_store.rs (100%)

diff --git a/src/indexing/mod.rs b/src/indexing/mod.rs
index 471fb891c..6cbdccf33 100644
--- a/src/indexing/mod.rs
+++ b/src/indexing/mod.rs
@@ -340,7 +340,8 @@ where F: Fn(u32, u32) + Sync + Send,
 
     // to first delete the replaced documents for example.
     let mut wtxn = env.write_txn()?;
-    let contains_documents = index.documents_ids(&wtxn)?.map_or(false, |docids| !docids.is_empty());
+    let mut documents_ids = index.documents_ids(&wtxn)?;
+    let contains_documents = !documents_ids.is_empty();
     let write_method = if contains_documents {
         WriteMethod::GetMergePut
     } else {
@@ -354,13 +355,8 @@ where F: Fn(u32, u32) + Sync + Send,
     index.put_users_ids_documents_ids(&mut wtxn, &users_ids_documents_ids)?;
 
     // We merge the new documents ids with the existing ones.
-    match index.documents_ids(&wtxn)? {
-        Some(mut documents_ids) => {
-            documents_ids.union_with(&new_documents_ids);
-            index.put_documents_ids(&mut wtxn, &documents_ids)?;
-        },
-        None => index.put_documents_ids(&mut wtxn, &new_documents_ids)?,
-    }
+    documents_ids.union_with(&new_documents_ids);
+    index.put_documents_ids(&mut wtxn, &documents_ids)?;
 
     debug!("Writing the docid word positions into LMDB on disk...");
     merge_into_lmdb_database(
diff --git a/src/lib.rs b/src/lib.rs
index 8b4246ad7..53c7b19e7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -6,11 +6,11 @@ mod indexing;
 mod mdfs;
 mod query_tokens;
 mod search;
-mod update_store;
 pub mod heed_codec;
 pub mod proximity;
 pub mod subcommand;
 pub mod tokenizer;
+pub mod update;
 
 use std::collections::HashMap;
 use std::hash::BuildHasherDefault;
@@ -21,11 +21,11 @@ pub use self::criterion::{Criterion, default_criteria};
 pub use self::fields_ids_map::FieldsIdsMap;
 pub use self::index::Index;
 pub use self::search::{Search, SearchResult};
-pub use self::update_store::UpdateStore;
 pub use self::heed_codec::{
     RoaringBitmapCodec, BEU32StrCodec, StrStrU8Codec,
     ObkvCodec, BoRoaringBitmapCodec, CboRoaringBitmapCodec,
 };
+pub use self::update::UpdateStore;
 
 pub type FastMap4<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher32>>;
 pub type FastMap8<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher64>>;
diff --git a/src/search.rs b/src/search.rs
index 1e2329687..98376bf7e 100644
--- a/src/search.rs
+++ b/src/search.rs
@@ -1,3 +1,4 @@
+use std::borrow::Cow;
 use std::collections::{HashMap, HashSet};
 
 use fst::{IntoStreamer, Streamer};
@@ -81,7 +82,7 @@ impl<'a> Search<'a> {
     /// the associated documents ids.
     fn fetch_words_docids(
         &self,
-        fst: &fst::Set<&[u8]>,
+        fst: &fst::Set<Cow<[u8]>>,
         dfas: Vec<(String, bool, DFA)>,
     ) -> anyhow::Result<Vec<(HashSet<String>, RoaringBitmap)>>
     {
@@ -135,20 +136,14 @@ impl<'a> Search<'a> {
     pub fn execute(&self) -> anyhow::Result<SearchResult> {
         let limit = self.limit;
 
-        let fst = match self.index.fst(self.rtxn)? {
-            Some(fst) => fst,
-            None => return Ok(Default::default()),
-        };
+        let fst = self.index.words_fst(self.rtxn)?;
 
         // Construct the DFAs related to the query words.
         let dfas = match self.query.as_deref().map(Self::generate_query_dfas) {
             Some(dfas) if !dfas.is_empty() => dfas,
             _ => {
                 // If the query is not set or results in no DFAs we return a placeholder.
-                let documents_ids = match self.index.documents_ids(self.rtxn)? {
-                    Some(docids) => docids.iter().take(limit).collect(),
-                    None => Vec::new(),
-                };
+                let documents_ids = self.index.documents_ids(self.rtxn)?.iter().take(limit).collect();
                 return Ok(SearchResult { documents_ids, ..Default::default() })
             },
         };
diff --git a/src/subcommand/infos.rs b/src/subcommand/infos.rs
index 6e17b1f93..63dae5a95 100644
--- a/src/subcommand/infos.rs
+++ b/src/subcommand/infos.rs
@@ -199,10 +199,10 @@ fn biggest_value_sizes(index: &Index, rtxn: &heed::RoTxn, limit: usize) -> anyho
     let mut heap = BinaryHeap::with_capacity(limit + 1);
 
     if limit > 0 {
-        if let Some(fst) = index.fst(rtxn)? {
-            heap.push(Reverse((fst.as_fst().as_bytes().len(), format!("words-fst"), main_name)));
-            if heap.len() > limit { heap.pop(); }
-        }
+        let words_fst = index.words_fst(rtxn)?;
+
+        heap.push(Reverse((words_fst.as_fst().as_bytes().len(), format!("words-fst"), main_name)));
+        if heap.len() > limit { heap.pop(); }
 
         if let Some(documents) = index.main.get::<_, Str, ByteSlice>(rtxn, "documents")? {
             heap.push(Reverse((documents.len(), format!("documents"), main_name)));
@@ -265,13 +265,8 @@ fn export_words_fst(index: &Index, rtxn: &heed::RoTxn, output: PathBuf) -> anyho
     let mut output = File::create(&output)
         .with_context(|| format!("failed to create {} file", output.display()))?;
 
-    match index.fst(rtxn)? {
-        Some(fst) => output.write_all(fst.as_fst().as_bytes())?,
-        None => {
-            let fst = fst::Set::default();
-            output.write_all(fst.as_fst().as_bytes())?;
-        },
-    }
+    let words_fst = index.words_fst(rtxn)?;
+    output.write_all(words_fst.as_fst().as_bytes())?;
 
     Ok(())
 }
diff --git a/src/subcommand/search.rs b/src/subcommand/search.rs
index 74b214b5a..0b7341c30 100644
--- a/src/subcommand/search.rs
+++ b/src/subcommand/search.rs
@@ -62,7 +62,7 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
             let result = index.search(&rtxn).query(query).execute().unwrap();
 
             let mut stdout = io::stdout();
-            let fields_ids_map = index.fields_ids_map(&rtxn)?.unwrap_or_default();
+            let fields_ids_map = index.fields_ids_map(&rtxn)?;
             let documents = index.documents(&rtxn, result.documents_ids.iter().cloned())?;
 
             for (_id, record) in documents {
diff --git a/src/subcommand/serve.rs b/src/subcommand/serve.rs
index bdf601710..43302e7a1 100644
--- a/src/subcommand/serve.rs
+++ b/src/subcommand/serve.rs
@@ -1,4 +1,3 @@
-use std::borrow::Cow;
 use std::collections::HashSet;
 use std::fs::{File, create_dir_all};
 use std::{mem, io};
@@ -156,13 +155,10 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
             UpdateMeta::DocumentsAddition => {
                 // We must use the write transaction of the update here.
                 let rtxn = env_cloned.read_txn()?;
-                let fields_ids_map = index_cloned.fields_ids_map(&rtxn)?.unwrap_or_default();
-                let documents_ids = index_cloned.documents_ids(&rtxn)?.unwrap_or_default();
+                let fields_ids_map = index_cloned.fields_ids_map(&rtxn)?;
+                let documents_ids = index_cloned.documents_ids(&rtxn)?;
                 let available_documents_ids = AvailableDocumentsIds::from_documents_ids(&documents_ids);
-                let users_ids_documents_ids = match index_cloned.users_ids_documents_ids(&rtxn).unwrap() {
-                    Some(map) => map.map_data(Cow::Borrowed).unwrap(),
-                    None => fst::Map::default().map_data(Cow::Owned).unwrap(),
-                };
+                let users_ids_documents_ids = index_cloned.users_ids_documents_ids(&rtxn).unwrap();
 
                 let transform = Transform {
                     fields_ids_map,
@@ -395,7 +391,7 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
             let SearchResult { found_words, documents_ids } = search.execute().unwrap();
 
             let mut documents = Vec::new();
-            let fields_ids_map = index.fields_ids_map(&rtxn).unwrap().unwrap_or_default();
+            let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
 
             for (_id, record) in index.documents(&rtxn, documents_ids).unwrap() {
                 let mut record = record.iter()
diff --git a/src/update/mod.rs b/src/update/mod.rs
new file mode 100644
index 000000000..59b125020
--- /dev/null
+++ b/src/update/mod.rs
@@ -0,0 +1,5 @@
+mod update_builder;
+mod update_store;
+
+pub use self::update_builder::UpdateBuilder;
+pub use self::update_store::UpdateStore;
diff --git a/src/update/update_builder.rs b/src/update/update_builder.rs
new file mode 100644
index 000000000..1910a8909
--- /dev/null
+++ b/src/update/update_builder.rs
@@ -0,0 +1,360 @@
+use std::borrow::Cow;
+use std::convert::TryFrom;
+
+use fst::{IntoStreamer, Streamer};
+use grenad::CompressionType;
+use itertools::Itertools;
+use roaring::RoaringBitmap;
+
+use crate::{Index, BEU32};
+
+pub struct UpdateBuilder {
+    log_every_n: usize,
+    max_nb_chunks: Option<usize>,
+    max_memory: usize,
+    linked_hash_map_size: usize,
+    chunk_compression_type: CompressionType,
+    chunk_compression_level: Option<u32>,
+    chunk_fusing_shrink_size: u64,
+    enable_chunk_fusing: bool,
+    indexing_jobs: Option<usize>,
+}
+
+impl UpdateBuilder {
+    pub fn new() -> UpdateBuilder {
+        todo!()
+    }
+
+    pub fn log_every_n(&mut self, log_every_n: usize) -> &mut Self {
+        self.log_every_n = log_every_n;
+        self
+    }
+
+    pub fn max_nb_chunks(&mut self, max_nb_chunks: usize) -> &mut Self {
+        self.max_nb_chunks = Some(max_nb_chunks);
+        self
+    }
+
+    pub fn max_memory(&mut self, max_memory: usize) -> &mut Self {
+        self.max_memory = max_memory;
+        self
+    }
+
+    pub fn linked_hash_map_size(&mut self, linked_hash_map_size: usize) -> &mut Self {
+        self.linked_hash_map_size = linked_hash_map_size;
+        self
+    }
+
+    pub fn chunk_compression_type(&mut self, chunk_compression_type: CompressionType) -> &mut Self {
+        self.chunk_compression_type = chunk_compression_type;
+        self
+    }
+
+    pub fn chunk_compression_level(&mut self, chunk_compression_level: u32) -> &mut Self {
+        self.chunk_compression_level = Some(chunk_compression_level);
+        self
+    }
+
+    pub fn chunk_fusing_shrink_size(&mut self, chunk_fusing_shrink_size: u64) -> &mut Self {
+        self.chunk_fusing_shrink_size = chunk_fusing_shrink_size;
+        self
+    }
+
+    pub fn enable_chunk_fusing(&mut self, enable_chunk_fusing: bool) -> &mut Self {
+        self.enable_chunk_fusing = enable_chunk_fusing;
+        self
+    }
+
+    pub fn indexing_jobs(&mut self, indexing_jobs: usize) -> &mut Self {
+        self.indexing_jobs = Some(indexing_jobs);
+        self
+    }
+
+    pub fn clear_documents<'t, 'u, 'i>(
+        self,
+        wtxn: &'t mut heed::RwTxn<'u>,
+        index: &'i Index,
+    ) -> ClearDocuments<'t, 'u, 'i>
+    {
+        ClearDocuments::new(wtxn, index)
+    }
+
+    pub fn delete_documents<'t, 'u, 'i>(
+        self,
+        wtxn: &'t mut heed::RwTxn<'u>,
+        index: &'i Index,
+    ) -> anyhow::Result<DeleteDocuments<'t, 'u, 'i>>
+    {
+        DeleteDocuments::new(wtxn, index)
+    }
+
+    pub fn index_documents<'t, 'u, 'i>(
+        self,
+        wtxn: &'t mut heed::RwTxn<'u>,
+        index: &'i Index,
+    ) -> IndexDocuments<'t, 'u, 'i>
+    {
+        IndexDocuments::new(wtxn, index)
+    }
+}
+
+pub struct ClearDocuments<'t, 'u, 'i> {
+    wtxn: &'t mut heed::RwTxn<'u>,
+    index: &'i Index,
+}
+
+impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> {
+    fn new(wtxn: &'t mut heed::RwTxn<'u>, index: &'i Index) -> ClearDocuments<'t, 'u, 'i> {
+        ClearDocuments { wtxn, index }
+    }
+
+    pub fn execute(self) -> anyhow::Result<usize> {
+        let Index {
+            main: _main,
+            word_docids,
+            docid_word_positions,
+            word_pair_proximity_docids,
+            documents,
+        } = self.index;
+
+        // We clear the word fst.
+        self.index.put_words_fst(self.wtxn, &fst::Set::default())?;
+
+        // We clear the users ids documents ids.
+        self.index.put_users_ids_documents_ids(self.wtxn, &fst::Map::default())?;
+
+        // We retrieve the documents ids.
+        let documents_ids = self.index.documents_ids(self.wtxn)?;
+
+        // We clear the internal documents ids.
+        self.index.put_documents_ids(self.wtxn, &RoaringBitmap::default())?;
+
+        // We clear the word docids.
+        word_docids.clear(self.wtxn)?;
+
+        // We clear the docid word positions.
+        docid_word_positions.clear(self.wtxn)?;
+
+        // We clear the word pair proximity docids.
+        word_pair_proximity_docids.clear(self.wtxn)?;
+
+        // We clear the documents themselves.
+        documents.clear(self.wtxn)?;
+
+        Ok(documents_ids.len() as usize)
+    }
+}
+
+pub struct DeleteDocuments<'t, 'u, 'i> {
+    wtxn: &'t mut heed::RwTxn<'u>,
+    index: &'i Index,
+    users_ids_documents_ids: fst::Map<Vec<u8>>,
+    documents_ids: RoaringBitmap,
+}
+
+impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
+    fn new(wtxn: &'t mut heed::RwTxn<'u>, index: &'i Index) -> anyhow::Result<DeleteDocuments<'t, 'u, 'i>> {
+        let users_ids_documents_ids = index
+            .users_ids_documents_ids(wtxn)?
+            .map_data(Cow::into_owned)?;
+
+        Ok(DeleteDocuments {
+            wtxn,
+            index,
+            users_ids_documents_ids,
+            documents_ids: RoaringBitmap::new(),
+        })
+    }
+
+    pub fn delete_document(&mut self, docid: u32) {
+        self.documents_ids.insert(docid);
+    }
+
+    pub fn delete_documents(&mut self, docids: &RoaringBitmap) {
+        self.documents_ids.union_with(docids);
+    }
+
+    pub fn delete_user_id(&mut self, user_id: &str) -> Option<u32> {
+        let docid = self.users_ids_documents_ids.get(user_id).map(|id| u32::try_from(id).unwrap())?;
+        self.delete_document(docid);
+        Some(docid)
+    }
+
+    pub fn execute(mut self) -> anyhow::Result<usize> {
+        // We retrieve the documents ids that are currently in the database.
+        let mut documents_ids = self.index.documents_ids(self.wtxn)?;
+
+        // We can and must stop removing documents in a database that is empty.
+        if documents_ids.is_empty() {
+            return Ok(0);
+        }
+
+        // We keep only the ids of the documents that must be deleted and actually exist,
+        // remove them from the set of all the documents ids and write the result back.
+        self.documents_ids.intersect_with(&documents_ids);
+        documents_ids.difference_with(&self.documents_ids);
+        self.index.put_documents_ids(self.wtxn, &documents_ids)?;
+        let documents_ids = self.documents_ids;
+
+        let fields_ids_map = self.index.fields_ids_map(self.wtxn)?;
+        let id_field = fields_ids_map.id("id").expect(r#"the field "id" to be present"#);
+
+        let Index {
+            main: _main,
+            word_docids,
+            docid_word_positions,
+            word_pair_proximity_docids,
+            documents,
+        } = self.index;
+
+        // Retrieve the words and the users ids contained in the documents.
+        // TODO we must use a smallword instead of a string.
+        let mut words = Vec::new();
+        let mut users_ids = Vec::new();
+        for docid in &documents_ids {
+            // We create an iterator to be able to get the content of the document
+            // and delete it. It's faster to acquire a cursor to get and delete,
+            // as we avoid traversing the LMDB B-Tree twice.
+            let key = BEU32::new(docid);
+            let mut iter = documents.range_mut(self.wtxn, &(key..=key))?;
+            if let Some((_key, obkv)) = iter.next().transpose()? {
+                if let Some(content) = obkv.get(id_field) {
+                    let user_id: String = serde_json::from_slice(content).unwrap();
+                    users_ids.push(user_id);
+                }
+                iter.del_current()?;
+            }
+            drop(iter);
+
+            // We iterate through the words positions of the document id,
+            // retrieve the word and delete the positions.
+            let mut iter = docid_word_positions.prefix_iter_mut(self.wtxn, &(docid, ""))?;
+            while let Some(result) = iter.next() {
+                let ((_docid, word), _positions) = result?;
+                // This boolean will indicate if we must remove this word from the words FST.
+                words.push((String::from(word), false));
+                iter.del_current()?;
+            }
+        }
+
+        // We create the FST map of the users ids that we must delete.
+        users_ids.sort_unstable();
+        let users_ids_to_delete = fst::Set::from_iter(users_ids)?;
+        let users_ids_to_delete = fst::Map::from(users_ids_to_delete.into_fst());
+
+        let new_users_ids_documents_ids = {
+            // We acquire the current users ids documents ids map and create
+            // a difference operation between the current and to-delete users ids.
+            let users_ids_documents_ids = self.index.users_ids_documents_ids(self.wtxn)?;
+            let difference = users_ids_documents_ids.op().add(&users_ids_to_delete).difference();
+
+            // We stream the new users ids that no longer contain the to-delete users ids.
+            let mut iter = difference.into_stream();
+            let mut new_users_ids_documents_ids_builder = fst::MapBuilder::memory();
+            while let Some((userid, docids)) = iter.next() {
+                new_users_ids_documents_ids_builder.insert(userid, docids[0].value)?;
+            }
+
+            // We create an FST map from the above builder.
+            new_users_ids_documents_ids_builder.into_map()
+        };
+
+        // We write the new users ids into the main database.
+        self.index.put_users_ids_documents_ids(self.wtxn, &new_users_ids_documents_ids)?;
+
+        // Maybe we can improve the get performance of the words
+        // if we sort the words first, keeping the LMDB pages in cache.
+        words.sort_unstable();
+
+        // We iterate over the words and delete the documents ids
+        // from the word docids database.
+        for (word, must_remove) in &mut words {
+            // We create an iterator to be able to get the content and delete the word docids.
+            // It's faster to acquire a cursor to get and delete or put, as we avoid
+            // traversing the LMDB B-Tree twice.
+            let mut iter = word_docids.prefix_iter_mut(self.wtxn, &word)?;
+            if let Some((key, mut docids)) = iter.next().transpose()? {
+                if key == word {
+                    docids.difference_with(&documents_ids);
+                    if docids.is_empty() {
+                        iter.del_current()?;
+                        *must_remove = true;
+                    } else {
+                        iter.put_current(key, &docids)?;
+                    }
+                }
+            }
+        }
+
+        // We construct an FST set that contains the words to delete from the words FST.
+        let words_to_delete = words.iter().filter_map(|(w, d)| if *d { Some(w) } else { None });
+        let words_to_delete = fst::Set::from_iter(words_to_delete)?;
+
+        let new_words_fst = {
+            // We retrieve the current words FST from the database.
+            let words_fst = self.index.words_fst(self.wtxn)?;
+            let difference = words_fst.op().add(&words_to_delete).difference();
+
+            // We stream the new words that no longer contain the to-delete words.
+            let mut new_words_fst_builder = fst::SetBuilder::memory();
+            new_words_fst_builder.extend_stream(difference.into_stream())?;
+
+            // We create a words FST set from the above builder.
+            new_words_fst_builder.into_set()
+        };
+
+        // We write the new words FST into the main database.
+        self.index.put_words_fst(self.wtxn, &new_words_fst)?;
+
+        // We delete the documents ids that are under the pairs of words we found.
+        // TODO We can maybe improve this by using the `compute_words_pair_proximities`
+        // function instead of iterating over all the possible word pairs.
+        for ((w1, _), (w2, _)) in words.iter().cartesian_product(&words) {
+            let start = &(w1.as_str(), w2.as_str(), 0);
+            let end = &(w1.as_str(), w2.as_str(), 7);
+            let mut iter = word_pair_proximity_docids.range_mut(self.wtxn, &(start..=end))?;
+            while let Some(result) = iter.next() {
+                let ((w1, w2, prox), mut docids) = result?;
+                docids.difference_with(&documents_ids);
+                if docids.is_empty() {
+                    iter.del_current()?;
+                } else {
+                    iter.put_current(&(w1, w2, prox), &docids)?;
+                }
+            }
+        }
+
+        Ok(documents_ids.len() as usize)
+    }
+}
+
+pub enum IndexDocumentsMethod {
+    /// Replace the previous document with the new one,
+    /// removing all the already known attributes.
+    ReplaceDocuments,
+
+    /// Merge the previous version of the document with the new version,
+    /// replacing old attribute values with the new ones and adding the new attributes.
+    UpdateDocuments,
+}
+
+pub struct IndexDocuments<'t, 'u, 'i> {
+    wtxn: &'t mut heed::RwTxn<'u>,
+    index: &'i Index,
+    update_method: IndexDocumentsMethod,
+}
+
+impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> {
+    fn new(wtxn: &'t mut heed::RwTxn<'u>, index: &'i Index) -> IndexDocuments<'t, 'u, 'i> {
+        IndexDocuments { wtxn, index, update_method: IndexDocumentsMethod::ReplaceDocuments }
+    }
+
+    pub fn index_documents_method(&mut self, method: IndexDocumentsMethod) -> &mut Self {
+        self.update_method = method;
+        self
+    }
+
+    pub fn execute(self) -> anyhow::Result<()> {
+        todo!()
+    }
+}
diff --git a/src/update_store.rs b/src/update/update_store.rs
similarity index 100%
rename from src/update_store.rs
rename to src/update/update_store.rs
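Note: below is a minimal sketch of how the UpdateBuilder API added by this patch is meant to be driven from caller code. It is illustrative only: `UpdateBuilder::new` is still `todo!()` in this commit, and the `heed::Env`/`Index` setup and the `milli` crate name are assumptions taken from the surrounding codebase, not things defined by this patch.

    use milli::Index;
    use milli::update::UpdateBuilder;

    /// Deletes the documents matching the given user (external) ids and
    /// returns how many documents were actually removed.
    fn delete_by_user_ids(env: &heed::Env, index: &Index, user_ids: &[&str]) -> anyhow::Result<usize> {
        let mut wtxn = env.write_txn()?;

        // The builder carries the global indexing settings and hands out
        // the individual update operations (clear, delete, index).
        let builder = UpdateBuilder::new();
        let mut deletion = builder.delete_documents(&mut wtxn, index)?;

        // Resolve every user id to its internal document id; ids that are
        // unknown to the index are simply ignored.
        for user_id in user_ids {
            deletion.delete_user_id(user_id);
        }

        // Execute the deletion and commit the transaction to make it durable.
        let deleted = deletion.execute()?;
        wtxn.commit()?;
        Ok(deleted)
    }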