From c4bd13bcdfbbc99aab3e57ba983a3ab6a1d0f587 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?=
Date: Thu, 3 Oct 2019 15:04:11 +0200
Subject: [PATCH] Introduce many SingleStore wrappers

---
 src/error.rs                              |  73 ++++++++++++
 src/lib.rs                                |   5 +-
 src/main.rs                               |  42 +++----
 src/query_builder.rs                      |  31 +++--
 src/serde/mod.rs                          |   4 +-
 src/serde/serializer.rs                   |   4 +-
 src/store/docs_words.rs                   |  37 ++++++
 src/store/documents_fields.rs             |  16 ++-
 src/store/main.rs                         |  63 ++++++++++
 src/store/mod.rs                          |  79 ++++++++-----
 src/store/{words.rs => postings_lists.rs} |  64 ++++------
 src/store/synonyms.rs                     |   2 +-
 src/store/updates.rs                      |  27 +++++
 src/update/documents_addition.rs          | 138 ++++++++++++++++++++++
 src/update/documents_deletion.rs          | 137 +++++++++++++++++++++
 src/update/mod.rs                         |  65 ++++++++++
 16 files changed, 678 insertions(+), 109 deletions(-)
 create mode 100644 src/error.rs
 create mode 100644 src/store/docs_words.rs
 create mode 100644 src/store/main.rs
 rename src/store/{words.rs => postings_lists.rs} (54%)
 create mode 100644 src/store/updates.rs
 create mode 100644 src/update/documents_addition.rs
 create mode 100644 src/update/documents_deletion.rs
 create mode 100644 src/update/mod.rs

diff --git a/src/error.rs b/src/error.rs
new file mode 100644
index 000000000..cf140c5e8
--- /dev/null
+++ b/src/error.rs
@@ -0,0 +1,73 @@
+use std::{error, fmt};
+use crate::serde::SerializerError;
+
+#[derive(Debug)]
+pub enum Error {
+    SchemaDiffer,
+    SchemaMissing,
+    WordIndexMissing,
+    MissingDocumentId,
+    RkvError(rkv::StoreError),
+    FstError(fst::Error),
+    RmpDecodeError(rmp_serde::decode::Error),
+    RmpEncodeError(rmp_serde::encode::Error),
+    BincodeError(bincode::Error),
+    SerializerError(SerializerError),
+}
+
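+// these From implementations let callers use the `?` operator on the
+// underlying rkv, fst, rmp-serde, bincode and serializer errors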
+impl From<rkv::StoreError> for Error {
+    fn from(error: rkv::StoreError) -> Error {
+        Error::RkvError(error)
+    }
+}
+
+impl From<fst::Error> for Error {
+    fn from(error: fst::Error) -> Error {
+        Error::FstError(error)
+    }
+}
+
+impl From<rmp_serde::decode::Error> for Error {
+    fn from(error: rmp_serde::decode::Error) -> Error {
+        Error::RmpDecodeError(error)
+    }
+}
+
+impl From<rmp_serde::encode::Error> for Error {
+    fn from(error: rmp_serde::encode::Error) -> Error {
+        Error::RmpEncodeError(error)
+    }
+}
+
+impl From<bincode::Error> for Error {
+    fn from(error: bincode::Error) -> Error {
+        Error::BincodeError(error)
+    }
+}
+
+impl From<SerializerError> for Error {
+    fn from(error: SerializerError) -> Error {
+        Error::SerializerError(error)
+    }
+}
+
+impl fmt::Display for Error {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        use self::Error::*;
+        match self {
+            SchemaDiffer => write!(f, "schemas differ"),
+            SchemaMissing => write!(f, "this index does not have a schema"),
+            WordIndexMissing => write!(f, "this index does not have a word index"),
+            MissingDocumentId => write!(f, "document id is missing"),
+            RkvError(e) => write!(f, "rkv error; {}", e),
+            FstError(e) => write!(f, "fst error; {}", e),
+            RmpDecodeError(e) => write!(f, "rmp decode error; {}", e),
+            RmpEncodeError(e) => write!(f, "rmp encode error; {}", e),
+            BincodeError(e) => write!(f, "bincode error; {}", e),
+            SerializerError(e) => write!(f, "serializer error; {}", e),
+        }
+    }
+}
+
+impl error::Error for Error { }
diff --git a/src/lib.rs b/src/lib.rs
index f9867e5d9..d70a428d7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,9 +1,11 @@
 mod automaton;
+mod error;
 mod number;
 mod query_builder;
+mod ranked_map;
 mod raw_document;
 mod reordered_attrs;
-mod ranked_map;
+mod update;
 pub mod criterion;
 pub mod raw_indexer;
 pub mod serde;
@@ -11,6 +13,7 @@ pub mod store;
 
 pub use self::query_builder::QueryBuilder;
 pub use self::raw_document::RawDocument;
+pub use self::error::Error;
 
 use self::number::{Number, ParseNumberError};
 use self::ranked_map::RankedMap;
diff --git a/src/main.rs b/src/main.rs
index 4d8d8ed63..527156aa0 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -16,36 +16,36 @@ fn main() {
     let created_arc = Manager::singleton().write().unwrap().get_or_create(path, Rkv::new).unwrap();
     let env = created_arc.read().unwrap();
 
-    let (words, synonyms, documents_fields) = store::create(&env, "test").unwrap();
+    let index = store::create(&env, "test").unwrap();
 
-    {
-        let mut writer = env.write().unwrap();
-        let mut raw_indexer = RawIndexer::new();
+    // {
+    //     let mut writer = env.write().unwrap();
+    //     let mut raw_indexer = RawIndexer::new();
 
-        let docid = DocumentId(0);
-        let attr = SchemaAttr(0);
-        let text = "Zut, l’aspirateur, j’ai oublié de l’éteindre !";
-        raw_indexer.index_text(docid, attr, text);
+    //     let docid = DocumentId(0);
+    //     let attr = SchemaAttr(0);
+    //     let text = "Zut, l’aspirateur, j’ai oublié de l’éteindre !";
+    //     raw_indexer.index_text(docid, attr, text);
 
-        let Indexed { words_doc_indexes, .. } = raw_indexer.build();
+    //     let Indexed { words_doc_indexes, .. } = raw_indexer.build();
 
-        let mut fst_builder = fst::SetBuilder::memory();
-        fst_builder.extend_iter(words_doc_indexes.keys());
-        let bytes = fst_builder.into_inner().unwrap();
-        let fst = fst::raw::Fst::from_bytes(bytes).unwrap();
-        let fst = fst::Set::from(fst);
+    //     let mut fst_builder = fst::SetBuilder::memory();
+    //     fst_builder.extend_iter(words_doc_indexes.keys()).unwrap();
+    //     let bytes = fst_builder.into_inner().unwrap();
+    //     let fst = fst::raw::Fst::from_bytes(bytes).unwrap();
+    //     let fst = fst::Set::from(fst);
 
-        words.put_words_fst(&mut writer, &fst).unwrap();
+    //     words.put_words_fst(&mut writer, &fst).unwrap();
 
-        for (word, indexes) in words_doc_indexes {
-            words.put_words_indexes(&mut writer, &word, &indexes).unwrap();
-        }
+    //     for (word, indexes) in words_doc_indexes {
+    //         words.put_words_indexes(&mut writer, &word, &indexes).unwrap();
+    //     }
 
-        writer.commit().unwrap();
-    }
+    //     writer.commit().unwrap();
+    // }
 
     let reader = env.read().unwrap();
-    let builder = QueryBuilder::new(words, synonyms);
+    let builder = QueryBuilder::new(index.main, index.postings_lists, index.synonyms);
     let documents = builder.query(&reader, "oubli", 0..20).unwrap();
 
     println!("{:?}", documents);
diff --git a/src/query_builder.rs b/src/query_builder.rs
index 0fb146825..5812ba60b 100644
--- a/src/query_builder.rs
+++ b/src/query_builder.rs
@@ -11,16 +11,12 @@ use crate::raw_document::{RawDocument, raw_documents_from};
 use crate::{Document, DocumentId, Highlight, TmpMatch, criterion::Criteria};
 use crate::{store, reordered_attrs::ReorderedAttrs};
 
-pub struct Automatons {
-    // TODO better use Vec of SmallVec
-    automatons: Vec<Vec<Automaton>>,
-}
-
 pub struct QueryBuilder<'a> {
     criteria: Criteria<'a>,
     searchables_attrs: Option<ReorderedAttrs>,
     timeout: Duration,
-    words_store: store::Words,
+    main_store: store::Main,
+    postings_lists_store: store::PostingsLists,
     synonyms_store: store::Synonyms,
 }
 
@@ -34,7 +30,6 @@ fn multiword_rewrite_matches(
 
     // we sort the matches by word index to make them rewritable
     matches.sort_unstable_by_key(|(id, match_)| (*id, match_.attribute, match_.word_index));
-    let start = Instant::now();
 
     // for each attribute of each document
     for same_document_attribute in matches.linear_group_by_key(|(id, m)| (*id, m.attribute)) {
@@ -128,7 +123,8 @@ fn fetch_raw_documents(
     automatons: &[Automaton],
     query_enhancer: &QueryEnhancer,
     searchables: Option<&ReorderedAttrs>,
-    words_store: &store::Words,
+    main_store: &store::Main,
+    postings_lists_store: &store::PostingsLists,
 ) -> Result<Vec<RawDocument>, rkv::StoreError>
 {
     let mut matches = Vec::new();
@@ -138,14 +134,17 @@ fn fetch_raw_documents(
         let Automaton { index, is_exact, query_len, .. } = automaton;
         let dfa = automaton.dfa();
 
-        let words = words_store.words_fst(reader)?;
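+        // an index that contains no document has no words fst stored yet;
+        // nothing can match in that case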
+        let words = match main_store.words_fst(reader)? {
+            Some(words) => words,
+            None => return Ok(Vec::new()),
+        };
 
         let mut stream = words.search(&dfa).into_stream();
         while let Some(input) = stream.next() {
             let distance = dfa.eval(input).to_u8();
             let is_exact = *is_exact && distance == 0 && input.len() == *query_len;
 
-            let doc_indexes = match words_store.word_indexes(reader, input)? {
+            let doc_indexes = match postings_lists_store.postings_list(reader, input)? {
                 Some(doc_indexes) => doc_indexes,
                 None => continue,
             };
@@ -187,12 +186,17 @@ fn fetch_raw_documents(
 }
 
 impl<'a> QueryBuilder<'a> {
-    pub fn new(words: store::Words, synonyms: store::Synonyms) -> QueryBuilder<'a> {
+    pub fn new(
+        main: store::Main,
+        postings_lists: store::PostingsLists,
+        synonyms: store::Synonyms,
+    ) -> QueryBuilder<'a> {
         QueryBuilder {
             criteria: Criteria::default(),
             searchables_attrs: None,
             timeout: Duration::from_secs(1),
-            words_store: words,
+            main_store: main,
+            postings_lists_store: postings_lists,
             synonyms_store: synonyms,
         }
     }
@@ -222,7 +226,8 @@ impl<'a> QueryBuilder<'a> {
             &automatons,
             &query_enhancer,
             self.searchables_attrs.as_ref(),
-            &self.words_store,
+            &self.main_store,
+            &self.postings_lists_store,
         )?;
 
         let mut groups = vec![raw_documents.as_mut_slice()];
diff --git a/src/serde/mod.rs b/src/serde/mod.rs
index 52c37e4f6..e3af21f89 100644
--- a/src/serde/mod.rs
+++ b/src/serde/mod.rs
@@ -68,10 +68,10 @@ impl fmt::Display for SerializerError {
                 write!(f, "error while trying to parse a number: {}", e)
             },
             SerializerError::UnserializableType { type_name } => {
-                write!(f, "{} are not a serializable type", type_name)
+                write!(f, "{} is not a serializable type", type_name)
             },
             SerializerError::UnindexableType { type_name } => {
-                write!(f, "{} are not an indexable type", type_name)
+                write!(f, "{} is not an indexable type", type_name)
             },
             SerializerError::UnrankableType { type_name } => {
                 write!(f, "{} types can not be used for ranking", type_name)
diff --git a/src/serde/serializer.rs b/src/serde/serializer.rs
index aaade76ba..8764ce526 100644
--- a/src/serde/serializer.rs
+++ b/src/serde/serializer.rs
@@ -3,7 +3,9 @@ use serde::ser;
 
 use crate::{DocumentId, RankedMap};
 use crate::raw_indexer::RawIndexer;
-use super::{RamDocumentStore, SerializerError, ConvertToString, ConvertToNumber, Indexer};
+use crate::serde::RamDocumentStore;
+
+use super::{SerializerError, ConvertToString, ConvertToNumber, Indexer};
 
 pub struct Serializer<'a> {
     pub schema: &'a Schema,
diff --git a/src/store/docs_words.rs b/src/store/docs_words.rs
new file mode 100644
index 000000000..330051298
--- /dev/null
+++ b/src/store/docs_words.rs
@@ -0,0 +1,37 @@
+use crate::DocumentId;
+
+#[derive(Copy, Clone)]
+pub struct DocsWords {
+    pub(crate) docs_words: rkv::SingleStore,
+}
+
+impl DocsWords {
+    pub fn doc_words<T: rkv::Readable>(
+        &self,
+        reader: &T,
+        document_id: DocumentId,
+    ) -> Result<Option<fst::Set>, rkv::StoreError>
+    {
+        Ok(Some(fst::Set::default()))
+    }
+
+    pub fn put_doc_words(
+        &self,
+        writer: &mut rkv::Writer,
+        document_id: DocumentId,
+        words: &fst::Set,
+    ) -> Result<(), rkv::StoreError>
+    {
+        unimplemented!()
+    }
+
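+    // keys are the big-endian bytes of the document id,
+    // so entries sort in document id order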
+    pub fn del_doc_words(
+        &self,
+        writer: &mut rkv::Writer,
+        document_id: DocumentId,
+    ) -> Result<(), rkv::StoreError>
+    {
+        let document_id_bytes = document_id.0.to_be_bytes();
+        self.docs_words.delete(writer, document_id_bytes)
+    }
+}
diff --git a/src/store/documents_fields.rs b/src/store/documents_fields.rs
index 38f7a4256..ca6fd67fc 100644
--- a/src/store/documents_fields.rs
+++ b/src/store/documents_fields.rs
@@ -2,16 +2,28 @@ use std::convert::TryFrom;
 use meilidb_schema::SchemaAttr;
 use crate::DocumentId;
 
+#[derive(Copy, Clone)]
 pub struct DocumentsFields {
     pub(crate) documents_fields: rkv::SingleStore,
 }
 
 impl DocumentsFields {
     pub fn del_all_document_fields(
-        &mut self,
+        &self,
         writer: &mut rkv::Writer,
         document_id: DocumentId,
-    ) -> Result<(), rkv::StoreError>
+    ) -> Result<usize, rkv::StoreError>
+    {
+        unimplemented!()
+    }
+
+    pub fn put_document_field(
+        &self,
+        writer: &mut rkv::Writer,
+        document_id: DocumentId,
+        attribute: SchemaAttr,
+        value: &[u8],
+    ) -> Result<(), rkv::StoreError>
     {
         unimplemented!()
     }
diff --git a/src/store/main.rs b/src/store/main.rs
new file mode 100644
index 000000000..d0cce80e6
--- /dev/null
+++ b/src/store/main.rs
@@ -0,0 +1,63 @@
+use std::sync::Arc;
+use crate::store::WORDS_KEY;
+use crate::RankedMap;
+
+#[derive(Copy, Clone)]
+pub struct Main {
+    pub(crate) main: rkv::SingleStore,
+}
+
+impl Main {
+    pub fn put_words_fst(
+        &self,
+        writer: &mut rkv::Writer,
+        fst: &fst::Set,
+    ) -> Result<(), rkv::StoreError>
+    {
+        let blob = rkv::Value::Blob(fst.as_fst().as_bytes());
+        self.main.put(writer, WORDS_KEY, &blob)
+    }
+
+    pub fn words_fst<T: rkv::Readable>(
+        &self,
+        reader: &T,
+    ) -> Result<Option<fst::Set>, rkv::StoreError>
+    {
+        match self.main.get(reader, WORDS_KEY)? {
+            Some(rkv::Value::Blob(bytes)) => {
+                let len = bytes.len();
+                let bytes = Arc::from(bytes);
+                let fst = fst::raw::Fst::from_shared_bytes(bytes, 0, len).unwrap();
+                Ok(Some(fst::Set::from(fst)))
+            },
+            Some(value) => panic!("invalid type {:?}", value),
+            None => Ok(None),
+        }
+    }
+
+    pub fn put_ranked_map(
+        &self,
+        writer: &mut rkv::Writer,
+        ranked_map: &RankedMap,
+    ) -> Result<(), rkv::StoreError>
+    {
+        unimplemented!()
+    }
+
+    pub fn ranked_map<T: rkv::Readable>(
+        &self,
+        reader: &T,
+    ) -> Result<RankedMap, rkv::StoreError>
+    {
+        unimplemented!()
+    }
+
+    pub fn put_number_of_documents<F: Fn(u64) -> u64>(
+        &self,
+        writer: &mut rkv::Writer,
+        func: F,
+    ) -> Result<(), rkv::StoreError>
+    {
+        unimplemented!()
+    }
+}
diff --git a/src/store/mod.rs b/src/store/mod.rs
index 903607fd1..49297b9ff 100644
--- a/src/store/mod.rs
+++ b/src/store/mod.rs
@@ -1,10 +1,16 @@
+mod docs_words;
 mod documents_fields;
+mod main;
+mod postings_lists;
 mod synonyms;
-mod words;
+mod updates;
 
+pub use self::docs_words::DocsWords;
 pub use self::documents_fields::{DocumentsFields, DocumentFieldsIter};
+pub use self::main::Main;
+pub use self::postings_lists::PostingsLists;
 pub use self::synonyms::Synonyms;
-pub use self::words::Words;
+pub use self::updates::Updates;
 
 const NUMBER_OF_DOCUMENTS_KEY: &str = "number-of-documents";
 const RANKED_MAP_KEY: &str = "ranked-map";
@@ -16,31 +22,41 @@ fn aligned_to(bytes: &[u8], align: usize) -> bool {
     (bytes as *const _ as *const () as usize) % align == 0
 }
 
-fn words_indexes_name(name: &str) -> String {
-    format!("{}-words-indexes", name)
-}
-
-fn synonyms_name(name: &str) -> String {
-    format!("{}-synonyms", name)
+fn postings_lists_name(name: &str) -> String {
+    format!("{}-postings-lists", name)
 }
 
 fn documents_fields_name(name: &str) -> String {
     format!("{}-documents-fields", name)
 }
 
-pub fn create(
-    env: &rkv::Rkv,
-    name: &str,
-) -> Result<(Words, Synonyms, DocumentsFields), rkv::StoreError>
-{
+fn synonyms_name(name: &str) -> String {
+    format!("{}-synonyms", name)
+}
+
+fn docs_words_name(name: &str) -> String {
+    format!("{}-docs-words", name)
+}
+
+fn updates_name(name: &str) -> String {
+    format!("{}-updates", name)
+}
+
+#[derive(Copy, Clone)]
+pub struct Index {
+    pub main: Main,
+    pub postings_lists: PostingsLists,
+    pub documents_fields: DocumentsFields,
+    pub synonyms: Synonyms,
+    pub docs_words: DocsWords,
+    pub updates: Updates,
+}
+
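+// expected usage of an `Index` (mirroring src/main.rs):
+//
+//     let index = store::create(&env, "test")?;
+//     let builder = QueryBuilder::new(index.main, index.postings_lists, index.synonyms);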
+pub fn create(env: &rkv::Rkv, name: &str) -> Result<Index, rkv::StoreError> {
     open_options(env, name, rkv::StoreOptions::create())
 }
 
-pub fn open(
-    env: &rkv::Rkv,
-    name: &str,
-) -> Result<(Words, Synonyms, DocumentsFields), rkv::StoreError>
-{
+pub fn open(env: &rkv::Rkv, name: &str) -> Result<Index, rkv::StoreError> {
     let mut options = rkv::StoreOptions::default();
     options.create = false;
     open_options(env, name, options)
@@ -50,23 +66,30 @@ fn open_options(
     env: &rkv::Rkv,
     name: &str,
     options: rkv::StoreOptions,
-) -> Result<(Words, Synonyms, DocumentsFields), rkv::StoreError>
+) -> Result<Index, rkv::StoreError>
 {
     // create all the database names
     let main_name = name;
-    let words_indexes_name = words_indexes_name(name);
-    let synonyms_name = synonyms_name(name);
+    let postings_lists_name = postings_lists_name(name);
     let documents_fields_name = documents_fields_name(name);
+    let synonyms_name = synonyms_name(name);
+    let docs_words_name = docs_words_name(name);
+    let updates_name = updates_name(name);
 
     // open all the database names
     let main = env.open_single(main_name, options)?;
-    let words_indexes = env.open_single(words_indexes_name.as_str(), options)?;
-    let synonyms = env.open_single(synonyms_name.as_str(), options)?;
+    let postings_lists = env.open_single(postings_lists_name.as_str(), options)?;
    let documents_fields = env.open_single(documents_fields_name.as_str(), options)?;
+    let synonyms = env.open_single(synonyms_name.as_str(), options)?;
+    let docs_words = env.open_single(docs_words_name.as_str(), options)?;
+    let updates = env.open_single(updates_name.as_str(), options)?;
 
-    let words = Words { main, words_indexes };
-    let synonyms = Synonyms { main, synonyms };
-    let documents_fields = DocumentsFields { documents_fields };
-
-    Ok((words, synonyms, documents_fields))
+    Ok(Index {
+        main: Main { main },
+        postings_lists: PostingsLists { postings_lists },
+        documents_fields: DocumentsFields { documents_fields },
+        synonyms: Synonyms { synonyms },
+        docs_words: DocsWords { docs_words },
+        updates: Updates { updates },
+    })
 }
diff --git a/src/store/words.rs b/src/store/postings_lists.rs
similarity index 54%
rename from src/store/words.rs
rename to src/store/postings_lists.rs
index face8a979..3af7077c0 100644
--- a/src/store/words.rs
+++ b/src/store/postings_lists.rs
@@ -1,46 +1,17 @@
 use std::borrow::Cow;
-use std::sync::Arc;
 use std::{mem, ptr};
 
 use zerocopy::{AsBytes, LayoutVerified};
 
 use crate::DocIndex;
 use crate::store::aligned_to;
-use crate::store::WORDS_KEY;
 
-pub struct Words {
-    pub(crate) main: rkv::SingleStore,
-    pub(crate) words_indexes: rkv::SingleStore,
+#[derive(Copy, Clone)]
+pub struct PostingsLists {
+    pub(crate) postings_lists: rkv::SingleStore,
 }
 
-impl Words {
-    pub fn put_words_fst(
-        &self,
-        writer: &mut rkv::Writer,
-        fst: &fst::Set,
-    ) -> Result<(), rkv::StoreError>
-    {
-        let blob = rkv::Value::Blob(fst.as_fst().as_bytes());
-        self.main.put(writer, WORDS_KEY, &blob)
-    }
-
-    pub fn words_fst<T: rkv::Readable>(
-        &self,
-        reader: &T,
-    ) -> Result<fst::Set, rkv::StoreError>
-    {
-        match self.main.get(reader, WORDS_KEY)? {
-            Some(rkv::Value::Blob(bytes)) => {
-                let len = bytes.len();
-                let bytes = Arc::from(bytes);
-                let fst = fst::raw::Fst::from_shared_bytes(bytes, 0, len).unwrap();
-                Ok(fst::Set::from(fst))
-            },
-            Some(value) => panic!("invalid type {:?}", value),
-            None => panic!("could not find word index"),
-        }
-    }
-
-    pub fn put_words_indexes(
+impl PostingsLists {
+    pub fn put_postings_list(
         &self,
         writer: &mut rkv::Writer,
         word: &[u8],
@@ -48,23 +19,35 @@ impl Words {
     ) -> Result<(), rkv::StoreError>
     {
         let blob = rkv::Value::Blob(words_indexes.as_bytes());
-        self.main.put(writer, word, &blob)
+        self.postings_lists.put(writer, word, &blob)
     }
 
-    pub fn word_indexes<'a, T: rkv::Readable>(
+    pub fn del_postings_list(
+        &self,
+        writer: &mut rkv::Writer,
+        word: &[u8],
+    ) -> Result<(), rkv::StoreError>
+    {
+        self.postings_lists.delete(writer, word)
+    }
+
+    pub fn postings_list<'a, T: rkv::Readable>(
         &self,
         reader: &'a T,
         word: &[u8],
-    ) -> Result<Option<Cow<'a, [DocIndex]>>, rkv::StoreError>
+    ) -> Result<Option<Cow<'a, sdset::Set<DocIndex>>>, rkv::StoreError>
     {
-        let bytes = match self.main.get(reader, word)? {
+        let bytes = match self.postings_lists.get(reader, word)? {
             Some(rkv::Value::Blob(bytes)) => bytes,
             Some(value) => panic!("invalid type {:?}", value),
             None => return Ok(None),
         };
 
         match LayoutVerified::new_slice(bytes) {
-            Some(layout) => Ok(Some(Cow::Borrowed(layout.into_slice()))),
+            Some(layout) => {
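+                // the blob is properly aligned for DocIndex: reinterpret it
+                // in place; the slice is stored sorted, hence the unwrap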
+                let set = sdset::Set::new(layout.into_slice()).unwrap();
+                Ok(Some(Cow::Borrowed(set)))
+            },
             None => {
                 let len = bytes.len();
                 let elem_size = mem::size_of::<DocIndex>();
@@ -81,7 +64,8 @@ impl Words {
                         vec.set_len(elems);
                     }
 
-                    return Ok(Some(Cow::Owned(vec)))
+                    let setbuf = sdset::SetBuf::new(vec).unwrap();
+                    return Ok(Some(Cow::Owned(setbuf)))
                 }
 
                 Ok(None)
diff --git a/src/store/synonyms.rs b/src/store/synonyms.rs
index 4cf1186cc..d605de35a 100644
--- a/src/store/synonyms.rs
+++ b/src/store/synonyms.rs
@@ -1,5 +1,5 @@
+#[derive(Copy, Clone)]
 pub struct Synonyms {
-    pub(crate) main: rkv::SingleStore,
     pub(crate) synonyms: rkv::SingleStore,
 }
 
diff --git a/src/store/updates.rs b/src/store/updates.rs
new file mode 100644
index 000000000..6279c1f7e
--- /dev/null
+++ b/src/store/updates.rs
@@ -0,0 +1,27 @@
+use crate::update::Update;
+
+#[derive(Copy, Clone)]
+pub struct Updates {
+    pub(crate) updates: rkv::SingleStore,
+}
+
+impl Updates {
+    pub fn push_back(
+        &self,
+        writer: &mut rkv::Writer,
+        update: &Update,
+    ) -> Result<u64, rkv::StoreError>
+    {
+        // let update = rmp_serde::to_vec_named(&addition)?;
+        unimplemented!()
+    }
+
+    pub fn alternatives_to<T: rkv::Readable>(
+        &self,
+        reader: &T,
+        word: &[u8],
+    ) -> Result<Option<fst::Set>, rkv::StoreError>
+    {
+        unimplemented!()
+    }
+}
diff --git a/src/update/documents_addition.rs b/src/update/documents_addition.rs
new file mode 100644
index 000000000..7e87d3121
--- /dev/null
+++ b/src/update/documents_addition.rs
@@ -0,0 +1,138 @@
+use std::collections::HashSet;
+
+use fst::{SetBuilder, set::OpBuilder};
+use meilidb_schema::Schema;
+use sdset::{SetOperation, duo::Union};
+use serde::Serialize;
+
+use crate::raw_indexer::RawIndexer;
+use crate::serde::{extract_document_id, Serializer, RamDocumentStore};
+use crate::store;
+use crate::update::{push_documents_addition, apply_documents_deletion};
+use crate::{Error, RankedMap};
+
+pub struct DocumentsAddition<D> {
+    updates_store: store::Updates,
+    documents: Vec<D>,
+}
+
+impl<D> DocumentsAddition<D> {
+    pub fn new(updates_store: store::Updates) -> DocumentsAddition<D> {
+        DocumentsAddition { updates_store, documents: Vec::new() }
+    }
+
+    pub fn update_document(&mut self, document: D) {
+        self.documents.push(document);
+    }
+
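+    // encodes the documents and enqueues the addition in the updates store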
+    pub fn finalize(self, writer: &mut rkv::Writer) -> Result<u64, Error>
+    where D: serde::Serialize
+    {
+        push_documents_addition(writer, self.updates_store, self.documents)
+    }
+}
+
+pub fn apply_documents_addition(
+    writer: &mut rkv::Writer,
+    main_store: store::Main,
+    documents_fields_store: store::DocumentsFields,
+    postings_lists_store: store::PostingsLists,
+    docs_words_store: store::DocsWords,
+    schema: &Schema,
+    mut ranked_map: RankedMap,
+    addition: Vec<serde_json::Value>,
+) -> Result<(), Error>
+{
+    let mut document_ids = HashSet::new();
+    let mut document_store = RamDocumentStore::new();
+    let mut indexer = RawIndexer::new();
+
+    let identifier = schema.identifier_name();
+
+    for document in addition {
+        let document_id = match extract_document_id(identifier, &document)? {
+            Some(id) => id,
+            None => return Err(Error::MissingDocumentId),
+        };
+
+        // 1. store the document id for future deletion
+        document_ids.insert(document_id);
+
+        // 2. index the document fields in ram stores
+        let serializer = Serializer {
+            schema,
+            document_store: &mut document_store,
+            indexer: &mut indexer,
+            ranked_map: &mut ranked_map,
+            document_id,
+        };
+
+        document.serialize(serializer)?;
+    }
+
+    // 1. remove the previous documents match indexes
+    let documents_to_insert = document_ids.iter().cloned().collect();
+    apply_documents_deletion(
+        writer,
+        main_store,
+        documents_fields_store,
+        postings_lists_store,
+        docs_words_store,
+        schema,
+        ranked_map.clone(),
+        documents_to_insert,
+    )?;
+
+    // 2. insert new document attributes in the database
+    for ((id, attr), value) in document_store.into_inner() {
+        documents_fields_store.put_document_field(writer, id, attr, &value)?;
+    }
+
+    let indexed = indexer.build();
+    let mut delta_words_builder = SetBuilder::memory();
+
+    for (word, delta_set) in indexed.words_doc_indexes {
+        delta_words_builder.insert(&word).unwrap();
+
+        let set = match postings_lists_store.postings_list(writer, &word)? {
+            Some(set) => Union::new(&set, &delta_set).into_set_buf(),
+            None => delta_set,
+        };
+
+        postings_lists_store.put_postings_list(writer, &word, &set)?;
+    }
+
+    for (id, words) in indexed.docs_words {
+        docs_words_store.put_doc_words(writer, id, &words)?;
+    }
+
+    let delta_words = delta_words_builder
+        .into_inner()
+        .and_then(fst::Set::from_bytes)
+        .unwrap();
+
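+    // merge the newly indexed words into the words fst already stored, if any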
+    let words = match main_store.words_fst(writer)? {
+        Some(words) => {
+            let op = OpBuilder::new()
+                .add(words.stream())
+                .add(delta_words.stream())
+                .r#union();
+
+            let mut words_builder = SetBuilder::memory();
+            words_builder.extend_stream(op).unwrap();
+            words_builder
+                .into_inner()
+                .and_then(fst::Set::from_bytes)
+                .unwrap()
+        },
+        None => delta_words,
+    };
+
+    main_store.put_words_fst(writer, &words)?;
+    main_store.put_ranked_map(writer, &ranked_map)?;
+
+    let inserted_documents_len = document_ids.len() as u64;
+    main_store.put_number_of_documents(writer, |old| old + inserted_documents_len)?;
+
+    Ok(())
+}
diff --git a/src/update/documents_deletion.rs b/src/update/documents_deletion.rs
new file mode 100644
index 000000000..8528eef06
--- /dev/null
+++ b/src/update/documents_deletion.rs
@@ -0,0 +1,137 @@
+use std::collections::{HashMap, HashSet, BTreeSet};
+
+use fst::{SetBuilder, Streamer};
+use meilidb_schema::Schema;
+use sdset::{SetBuf, SetOperation, duo::DifferenceByKey};
+
+use crate::{DocumentId, RankedMap, Error};
+use crate::serde::extract_document_id;
+use crate::update::push_documents_deletion;
+use crate::store;
+
+pub struct DocumentsDeletion {
+    updates_store: store::Updates,
+    documents: Vec<DocumentId>,
+}
+
+impl DocumentsDeletion {
+    pub fn new(updates_store: store::Updates) -> DocumentsDeletion {
+        DocumentsDeletion { updates_store, documents: Vec::new() }
+    }
+
+    pub fn delete_document_by_id(&mut self, document_id: DocumentId) {
+        self.documents.push(document_id);
+    }
+
+    pub fn delete_document<D>(&mut self, schema: &Schema, document: D) -> Result<(), Error>
+    where D: serde::Serialize,
+    {
+        let identifier = schema.identifier_name();
+        let document_id = match extract_document_id(identifier, &document)? {
+            Some(id) => id,
+            None => return Err(Error::MissingDocumentId),
+        };
+
+        self.delete_document_by_id(document_id);
+
+        Ok(())
+    }
+
+    pub fn finalize(self, writer: &mut rkv::Writer) -> Result<u64, Error> {
+        push_documents_deletion(writer, self.updates_store, self.documents)
+    }
+}
+
+impl Extend<DocumentId> for DocumentsDeletion {
+    fn extend<T: IntoIterator<Item=DocumentId>>(&mut self, iter: T) {
+        self.documents.extend(iter)
+    }
+}
+
+pub fn apply_documents_deletion(
+    writer: &mut rkv::Writer,
+    main_store: store::Main,
+    documents_fields_store: store::DocumentsFields,
+    postings_lists_store: store::PostingsLists,
+    docs_words_store: store::DocsWords,
+    schema: &Schema,
+    mut ranked_map: RankedMap,
+    deletion: Vec<DocumentId>,
+) -> Result<(), Error>
+{
+    let idset = SetBuf::from_dirty(deletion);
+
+    // collect the ranked attributes according to the schema
+    let ranked_attrs: Vec<_> = schema.iter()
+        .filter_map(|(_, attr, prop)| {
+            if prop.is_ranked() { Some(attr) } else { None }
+        })
+        .collect();
+
+    let mut words_document_ids = HashMap::new();
+    for id in idset {
+        // remove all the ranked attributes from the ranked_map
+        for ranked_attr in &ranked_attrs {
+            ranked_map.remove(id, *ranked_attr);
+        }
+
+        if let Some(words) = docs_words_store.doc_words(writer, id)? {
+            let mut stream = words.stream();
+            while let Some(word) = stream.next() {
+                let word = word.to_vec();
+                words_document_ids.entry(word).or_insert_with(Vec::new).push(id);
+            }
+        }
+    }
+
+    let mut deleted_documents = HashSet::new();
+    let mut removed_words = BTreeSet::new();
+    for (word, document_ids) in words_document_ids {
+        let document_ids = SetBuf::from_dirty(document_ids);
+
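+        // remove the deleted documents from this word's postings list,
+        // dropping the list (and the word) entirely when it becomes empty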
+        if let Some(doc_indexes) = postings_lists_store.postings_list(writer, &word)? {
+            let op = DifferenceByKey::new(&doc_indexes, &document_ids, |d| d.document_id, |id| *id);
+            let doc_indexes = op.into_set_buf();
+
+            if !doc_indexes.is_empty() {
+                postings_lists_store.put_postings_list(writer, &word, &doc_indexes)?;
+            } else {
+                postings_lists_store.del_postings_list(writer, &word)?;
+                removed_words.insert(word);
+            }
+        }
+
+        for id in document_ids {
+            if documents_fields_store.del_all_document_fields(writer, id)? != 0 {
+                deleted_documents.insert(id);
+            }
+            docs_words_store.del_doc_words(writer, id)?;
+        }
+    }
+
+    let removed_words = fst::Set::from_iter(removed_words).unwrap();
+    let words = match main_store.words_fst(writer)? {
+        Some(words_set) => {
+            let op = fst::set::OpBuilder::new()
+                .add(words_set.stream())
+                .add(removed_words.stream())
+                .difference();
+
+            let mut words_builder = SetBuilder::memory();
+            words_builder.extend_stream(op).unwrap();
+            words_builder
+                .into_inner()
+                .and_then(fst::Set::from_bytes)
+                .unwrap()
+        },
+        None => fst::Set::default(),
+    };
+
+    main_store.put_words_fst(writer, &words)?;
+    main_store.put_ranked_map(writer, &ranked_map)?;
+
+    let deleted_documents_len = deleted_documents.len() as u64;
+    main_store.put_number_of_documents(writer, |old| old - deleted_documents_len)?;
+
+    Ok(())
+}
diff --git a/src/update/mod.rs b/src/update/mod.rs
new file mode 100644
index 000000000..85621a9fd
--- /dev/null
+++ b/src/update/mod.rs
@@ -0,0 +1,65 @@
+mod documents_addition;
+mod documents_deletion;
+
+pub use self::documents_addition::{DocumentsAddition, apply_documents_addition};
+pub use self::documents_deletion::{DocumentsDeletion, apply_documents_deletion};
+
+use std::collections::BTreeMap;
+use serde::{Serialize, Deserialize};
+use crate::{store, DocumentId};
+use super::Error;
+
+#[derive(Serialize, Deserialize)]
+pub enum Update {
+    DocumentsAddition(Vec<serde_json::Value>),
+    DocumentsDeletion(Vec<DocumentId>),
+    SynonymsAddition(BTreeMap<String, Vec<String>>),
+    SynonymsDeletion(BTreeMap<String, Option<Vec<String>>>),
+}
+
+pub fn push_documents_addition<D: serde::Serialize>(
+    writer: &mut rkv::Writer,
+    updates_store: store::Updates,
+    addition: Vec<D>,
+) -> Result<u64, Error>
+{
+    let mut values = Vec::with_capacity(addition.len());
+    for add in addition {
+        let vec = rmp_serde::to_vec_named(&add)?;
+        let add = rmp_serde::from_read(&vec[..])?;
+        values.push(add);
+    }
+
+    let update = Update::DocumentsAddition(values);
+    Ok(updates_store.push_back(writer, &update)?)
+}
+
+pub fn push_documents_deletion(
+    writer: &mut rkv::Writer,
+    updates_store: store::Updates,
+    deletion: Vec<DocumentId>,
+) -> Result<u64, Error>
+{
+    let update = Update::DocumentsDeletion(deletion);
+    Ok(updates_store.push_back(writer, &update)?)
+}
+
+pub fn push_synonyms_addition(
+    writer: &mut rkv::Writer,
+    updates_store: store::Updates,
+    addition: BTreeMap<String, Vec<String>>,
+) -> Result<u64, Error>
+{
+    let update = Update::SynonymsAddition(addition);
+    Ok(updates_store.push_back(writer, &update)?)
+}
+
+pub fn push_synonyms_deletion(
+    writer: &mut rkv::Writer,
+    updates_store: store::Updates,
+    deletion: BTreeMap<String, Option<Vec<String>>>,
+) -> Result<u64, Error>
+{
+    let update = Update::SynonymsDeletion(deletion);
+    Ok(updates_store.push_back(writer, &update)?)
+}