diff --git a/meilidb-data/Cargo.toml b/meilidb-data/Cargo.toml index e3e998cdf..1491cba31 100644 --- a/meilidb-data/Cargo.toml +++ b/meilidb-data/Cargo.toml @@ -5,28 +5,34 @@ authors = ["Kerollmops <renault.cle@gmail.com>"] edition = "2018" [dependencies] -arc-swap = "0.3.11" -bincode = "1.1.2" +arc-swap = "0.4.2" +bincode = "1.1.4" deunicode = "1.0.0" -hashbrown = { version = "0.2.2", features = ["serde"] } +hashbrown = { version = "0.6.0", features = ["serde"] } +log = "0.4.6" meilidb-core = { path = "../meilidb-core", version = "0.1.0" } meilidb-schema = { path = "../meilidb-schema", version = "0.1.0" } meilidb-tokenizer = { path = "../meilidb-tokenizer", version = "0.1.0" } ordered-float = { version = "1.0.2", features = ["serde"] } -rocksdb = { version = "0.12.2", default-features = false } sdset = "0.3.2" serde = { version = "1.0.99", features = ["derive"] } serde_json = "1.0.40" siphasher = "0.3.0" -zerocopy = "0.2.2" +sled = "0.26.0" +zerocopy = "0.2.8" [dependencies.rmp-serde] git = "https://github.com/3Hren/msgpack-rust.git" rev = "40b3d48" +[dependencies.rmpv] git = "https://github.com/3Hren/msgpack-rust.git" rev = "40b3d48" features = ["with-serde"] + [dependencies.fst] git = "https://github.com/Kerollmops/fst.git" branch = "arc-byte-slice" [dev-dependencies] -tempfile = "3.0.7" +tempfile = "3.1.0"
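The dependency swap above is the heart of this patch: the RocksDB binding is replaced by sled, a pure-Rust embedded store, and every column-family call site below becomes a sled::Tree call. A minimal sketch of the sled operations the rest of the diff relies on (the path and tree name are illustrative; the calls themselves all appear in the patched code):

fn sled_basics() -> sled::Result<()> {
    let db = sled::Db::open("example-path")?;    // whole database, replaces rocksdb::DB
    let tree = db.open_tree("example-tree")?;    // named keyspace, replaces a column family

    tree.insert(b"key", b"value".to_vec())?;     // put_cf becomes insert
    let _value = tree.get(b"key")?;              // get_cf becomes get, returns Option<IVec>
    tree.remove(b"key")?;                        // delete_cf becomes remove

    let _id = db.generate_id()?;                 // monotonic u64, used below for update ids
    Ok(())
}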
diff --git a/meilidb-data/src/database/custom_settings.rs b/meilidb-data/src/database/custom_settings.rs deleted file mode 100644 index 46653bfb0..000000000 --- a/meilidb-data/src/database/custom_settings.rs +++ /dev/null @@ -1,13 +0,0 @@ -use std::ops::Deref; -use crate::database::raw_index::InnerRawIndex; - -#[derive(Clone)] -pub struct CustomSettings(pub(crate) InnerRawIndex); - -impl Deref for CustomSettings { - type Target = InnerRawIndex; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} diff --git a/meilidb-data/src/database/documents_addition.rs b/meilidb-data/src/database/documents_addition.rs deleted file mode 100644 index 6e6c2003b..000000000 --- a/meilidb-data/src/database/documents_addition.rs +++ /dev/null @@ -1,134 +0,0 @@ -use std::collections::HashSet; -use std::sync::Arc; - -use meilidb_core::DocumentId; -use fst::{SetBuilder, set::OpBuilder}; -use sdset::{SetOperation, duo::Union}; - -use crate::indexer::Indexer; -use crate::serde::{extract_document_id, Serializer, RamDocumentStore}; -use crate::RankedMap; - -use super::{Error, Index, InnerIndex, DocumentsDeletion}; - -pub struct DocumentsAddition<'a> { - inner: &'a Index, - document_ids: HashSet<DocumentId>, - document_store: RamDocumentStore, - indexer: Indexer, - ranked_map: RankedMap, -} - -impl<'a> DocumentsAddition<'a> { - pub fn new(inner: &'a Index, ranked_map: RankedMap) -> DocumentsAddition<'a> { - DocumentsAddition { - inner, - document_ids: HashSet::new(), - document_store: RamDocumentStore::new(), - indexer: Indexer::new(), - ranked_map, - } - } - - pub fn update_document<D>(&mut self, document: D) -> Result<(), Error> - where D: serde::Serialize, - { - let schema = &self.inner.lease_inner().schema; - let identifier = schema.identifier_name(); - - let document_id = match extract_document_id(identifier, &document)? { - Some(id) => id, - None => return Err(Error::MissingDocumentId), - }; - - // 1. store the document id for future deletion - self.document_ids.insert(document_id); - - // 2. index the document fields in ram stores - let serializer = Serializer { - schema, - document_store: &mut self.document_store, - indexer: &mut self.indexer, - ranked_map: &mut self.ranked_map, - document_id, - }; - - document.serialize(serializer)?; - - Ok(()) - } - - pub fn finalize(self) -> Result<(), Error> { - let lease_inner = self.inner.lease_inner(); - let docs_words = &lease_inner.raw.docs_words; - let documents = &lease_inner.raw.documents; - let main = &lease_inner.raw.main; - let words = &lease_inner.raw.words; - - // 1. remove the previous documents match indexes - let mut documents_deletion = DocumentsDeletion::new(self.inner, self.ranked_map.clone()); - documents_deletion.extend(self.document_ids); - documents_deletion.finalize()?; - - // 2. insert new document attributes in the database - for ((id, attr), value) in self.document_store.into_inner() { - documents.set_document_field(id, attr, value)?; - } - - let indexed = self.indexer.build(); - let mut delta_words_builder = SetBuilder::memory(); - - for (word, delta_set) in indexed.words_doc_indexes { - delta_words_builder.insert(&word).unwrap(); - - let set = match words.doc_indexes(&word)? { - Some(set) => Union::new(&set, &delta_set).into_set_buf(), - None => delta_set, - }; - - words.set_doc_indexes(&word, &set)?; - } - - for (id, words) in indexed.docs_words { - docs_words.set_doc_words(id, &words)?; - } - - let delta_words = delta_words_builder - .into_inner() - .and_then(fst::Set::from_bytes) - .unwrap(); - - let words = match main.words_set()? { - Some(words) => { - let op = OpBuilder::new() - .add(words.stream()) - .add(delta_words.stream()) - .r#union(); - - let mut words_builder = SetBuilder::memory(); - words_builder.extend_stream(op).unwrap(); - words_builder - .into_inner() - .and_then(fst::Set::from_bytes) - .unwrap() - }, - None => delta_words, - }; - - main.set_words_set(&words)?; - main.set_ranked_map(&self.ranked_map)?; - - // update the "consistent" view of the Index - let words = Arc::new(words); - let ranked_map = self.ranked_map; - let synonyms = lease_inner.synonyms.clone(); - let schema = lease_inner.schema.clone(); - let raw = lease_inner.raw.clone(); - lease_inner.raw.compact(); - - let inner = InnerIndex { words, synonyms, schema, ranked_map, raw }; - self.inner.0.store(Arc::new(inner)); - - Ok(()) - } -} diff --git a/meilidb-data/src/database/documents_deletion.rs b/meilidb-data/src/database/documents_deletion.rs deleted file mode 100644 index d1413e353..000000000 --- a/meilidb-data/src/database/documents_deletion.rs +++ /dev/null @@ -1,139 +0,0 @@ -use std::collections::{HashMap, BTreeSet}; -use std::sync::Arc; - -use fst::{SetBuilder, Streamer}; -use meilidb_core::DocumentId; -use sdset::{SetBuf, SetOperation, duo::DifferenceByKey}; - -use crate::RankedMap; -use crate::serde::extract_document_id; - -use super::{Index, Error, InnerIndex}; - -pub struct DocumentsDeletion<'a> { - inner: &'a Index, - documents: Vec<DocumentId>, - ranked_map: RankedMap, -} - -impl<'a> DocumentsDeletion<'a> { - pub fn new(inner: &'a Index, ranked_map: RankedMap) -> DocumentsDeletion { - DocumentsDeletion { inner, documents: Vec::new(), ranked_map } - } - - fn delete_document_by_id(&mut self, id: DocumentId) { - self.documents.push(id); - } - - pub fn delete_document<D>(&mut self, document: D) -> Result<(), Error> - where D: serde::Serialize, - { - let schema = &self.inner.lease_inner().schema; - let identifier = schema.identifier_name(); - - let document_id = match extract_document_id(identifier, &document)? 
{ - Some(id) => id, - None => return Err(Error::MissingDocumentId), - }; - - self.delete_document_by_id(document_id); - - Ok(()) - } - - pub fn finalize(mut self) -> Result<(), Error> { - let lease_inner = self.inner.lease_inner(); - let docs_words = &lease_inner.raw.docs_words; - let documents = &lease_inner.raw.documents; - let main = &lease_inner.raw.main; - let schema = &lease_inner.schema; - let words = &lease_inner.raw.words; - - let idset = SetBuf::from_dirty(self.documents); - - // collect the ranked attributes according to the schema - let ranked_attrs: Vec<_> = schema.iter() - .filter_map(|(_, attr, prop)| { - if prop.is_ranked() { Some(attr) } else { None } - }) - .collect(); - - let mut words_document_ids = HashMap::new(); - for id in idset { - // remove all the ranked attributes from the ranked_map - for ranked_attr in &ranked_attrs { - self.ranked_map.remove(id, *ranked_attr); - } - - if let Some(words) = docs_words.doc_words(id)? { - let mut stream = words.stream(); - while let Some(word) = stream.next() { - let word = word.to_vec(); - words_document_ids.entry(word).or_insert_with(Vec::new).push(id); - } - } - } - - let mut removed_words = BTreeSet::new(); - for (word, document_ids) in words_document_ids { - let document_ids = SetBuf::from_dirty(document_ids); - - if let Some(doc_indexes) = words.doc_indexes(&word)? { - let op = DifferenceByKey::new(&doc_indexes, &document_ids, |d| d.document_id, |id| *id); - let doc_indexes = op.into_set_buf(); - - if !doc_indexes.is_empty() { - words.set_doc_indexes(&word, &doc_indexes)?; - } else { - words.del_doc_indexes(&word)?; - removed_words.insert(word); - } - } - - for id in document_ids { - documents.del_all_document_fields(id)?; - docs_words.del_doc_words(id)?; - } - } - - let removed_words = fst::Set::from_iter(removed_words).unwrap(); - let words = match main.words_set()? 
{ - Some(words_set) => { - let op = fst::set::OpBuilder::new() - .add(words_set.stream()) - .add(removed_words.stream()) - .difference(); - - let mut words_builder = SetBuilder::memory(); - words_builder.extend_stream(op).unwrap(); - words_builder - .into_inner() - .and_then(fst::Set::from_bytes) - .unwrap() - }, - None => fst::Set::default(), - }; - - main.set_words_set(&words)?; - main.set_ranked_map(&self.ranked_map)?; - - // update the "consistent" view of the Index - let words = Arc::new(words); - let ranked_map = lease_inner.ranked_map.clone(); - let synonyms = lease_inner.synonyms.clone(); - let schema = lease_inner.schema.clone(); - let raw = lease_inner.raw.clone(); - lease_inner.raw.compact(); - - let inner = InnerIndex { words, synonyms, schema, ranked_map, raw }; - self.inner.0.store(Arc::new(inner)); - - Ok(()) - } -} - -impl<'a> Extend<DocumentId> for DocumentsDeletion<'a> { - fn extend<T: IntoIterator<Item=DocumentId>>(&mut self, iter: T) { - self.documents.extend(iter) - } -}
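Both this deleted implementation and its replacement under update/ subtract document ids from a word's posting list with sdset's DifferenceByKey. A standalone sketch with a stand-in entry type (the real element type is meilidb_core::DocIndex):

use sdset::{SetBuf, SetOperation, duo::DifferenceByKey};

#[derive(Clone)]
struct Posting { document_id: u64 }

// Keep only the postings whose document id is NOT in `deleted`,
// comparing the two sorted sets through their extracted keys.
fn subtract(postings: &SetBuf<Posting>, deleted: &SetBuf<u64>) -> SetBuf<Posting> {
    DifferenceByKey::new(postings, deleted, |p| p.document_id, |id| *id).into_set_buf()
}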
- } - - let slice: &[u8] = key.as_ref(); - let array = slice.try_into().unwrap(); - let key = DocumentAttrKey::from_be_bytes(array); - Some(Ok((key.attribute, value))) - }, - None => None, - } - } -} diff --git a/meilidb-data/src/database/error.rs b/meilidb-data/src/database/error.rs index 99b90e056..1b5b11b02 100644 --- a/meilidb-data/src/database/error.rs +++ b/meilidb-data/src/database/error.rs @@ -7,15 +7,17 @@ pub enum Error { SchemaMissing, WordIndexMissing, MissingDocumentId, - RocksdbError(rocksdb::Error), + SledError(sled::Error), FstError(fst::Error), + RmpDecodeError(rmp_serde::decode::Error), + RmpEncodeError(rmp_serde::encode::Error), BincodeError(bincode::Error), SerializerError(SerializerError), } -impl From<rocksdb::Error> for Error { - fn from(error: rocksdb::Error) -> Error { - Error::RocksdbError(error) +impl From<sled::Error> for Error { + fn from(error: sled::Error) -> Error { + Error::SledError(error) } } @@ -25,6 +27,18 @@ impl From<fst::Error> for Error { } } +impl From<rmp_serde::decode::Error> for Error { + fn from(error: rmp_serde::decode::Error) -> Error { + Error::RmpDecodeError(error) + } +} + +impl From<rmp_serde::encode::Error> for Error { + fn from(error: rmp_serde::encode::Error) -> Error { + Error::RmpEncodeError(error) + } +} + impl From<bincode::Error> for Error { fn from(error: bincode::Error) -> Error { Error::BincodeError(error) } } @@ -45,8 +59,10 @@ impl fmt::Display for Error { SchemaMissing => write!(f, "this index does not have a schema"), WordIndexMissing => write!(f, "this index does not have a word index"), MissingDocumentId => write!(f, "document id is missing"), - RocksdbError(e) => write!(f, "RocksDB error; {}", e), + SledError(e) => write!(f, "Sled error; {}", e), FstError(e) => write!(f, "fst error; {}", e), + RmpDecodeError(e) => write!(f, "rmp decode error; {}", e), + RmpEncodeError(e) => write!(f, "rmp encode error; {}", e), BincodeError(e) => write!(f, "bincode error; {}", e), SerializerError(e) => write!(f, "serializer error; {}", e), }
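These From conversions are what keep the storage code terse: any sled or msgpack failure bubbles up through ? as a crate-level Error. A hypothetical helper showing two of the conversions at work:

// `tree.get` yields sled::Error and `from_read_ref` yields
// rmp_serde::decode::Error; each converts into Error via the impls above,
// so a single ? suffices at every call site.
fn read_value(tree: &sled::Tree, key: &[u8]) -> Result<Option<String>, Error> {
    match tree.get(key)? {
        Some(bytes) => Ok(Some(rmp_serde::from_read_ref(&bytes)?)),
        None => Ok(None),
    }
}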
diff --git a/meilidb-data/src/database/index.rs b/meilidb-data/src/database/index.rs deleted file mode 100644 index edec6a89b..000000000 --- a/meilidb-data/src/database/index.rs +++ /dev/null @@ -1,170 +0,0 @@ -use sdset::SetBuf; -use std::collections::HashSet; -use std::sync::Arc; - -use arc_swap::{ArcSwap, Lease}; -use meilidb_core::criterion::Criteria; -use meilidb_core::{DocIndex, Store, DocumentId, QueryBuilder}; -use meilidb_schema::Schema; -use rmp_serde::decode::Error as RmpError; -use serde::de; - -use crate::ranked_map::RankedMap; -use crate::serde::Deserializer; - -use super::{Error, CustomSettings}; -use super::{ - RawIndex, - DocumentsAddition, DocumentsDeletion, - SynonymsAddition, SynonymsDeletion, -}; - -#[derive(Copy, Clone)] -pub struct IndexStats { - pub number_of_words: usize, - pub number_of_documents: usize, - pub number_attrs_in_ranked_map: usize, -} - -#[derive(Clone)] -pub struct Index(pub ArcSwap<InnerIndex>); - -pub struct InnerIndex { - pub words: Arc<fst::Set>, - pub synonyms: Arc<fst::Set>, - pub schema: Schema, - pub ranked_map: RankedMap, - pub raw: RawIndex, // TODO this will be a snapshot in the future -} - -impl Index { - pub fn from_raw(raw: RawIndex) -> Result<Index, Error> { - let words = match raw.main.words_set()? { - Some(words) => Arc::new(words), - None => Arc::new(fst::Set::default()), - }; - - let synonyms = match raw.main.synonyms_set()? { - Some(synonyms) => Arc::new(synonyms), - None => Arc::new(fst::Set::default()), - }; - - let schema = match raw.main.schema()? { - Some(schema) => schema, - None => return Err(Error::SchemaMissing), - }; - - let ranked_map = match raw.main.ranked_map()? { - Some(map) => map, - None => RankedMap::default(), - }; - - let inner = InnerIndex { words, synonyms, schema, ranked_map, raw }; - let index = Index(ArcSwap::new(Arc::new(inner))); - - Ok(index) - } - - pub fn stats(&self) -> Result<IndexStats, Error> { - let lease = self.0.lease(); - - Ok(IndexStats { - number_of_words: lease.words.len(), - number_of_documents: lease.raw.documents.len()?, - number_attrs_in_ranked_map: lease.ranked_map.len(), - }) - } - - pub fn query_builder(&self) -> QueryBuilder<IndexLease> { - let lease = IndexLease(self.0.lease()); - QueryBuilder::new(lease) - } - - pub fn query_builder_with_criteria<'c>( - &self, - criteria: Criteria<'c>, - ) -> QueryBuilder<'c, IndexLease> - { - let lease = IndexLease(self.0.lease()); - QueryBuilder::with_criteria(lease, criteria) - } - - pub fn lease_inner(&self) -> Lease<Arc<InnerIndex>> { - self.0.lease() - } - - pub fn schema(&self) -> Schema { - self.0.lease().schema.clone() - } - - pub fn custom_settings(&self) -> CustomSettings { - self.0.lease().raw.custom.clone() - } - - pub fn documents_addition(&self) -> DocumentsAddition { - let ranked_map = self.0.lease().ranked_map.clone(); - DocumentsAddition::new(self, ranked_map) - } - - pub fn documents_deletion(&self) -> DocumentsDeletion { - let ranked_map = self.0.lease().ranked_map.clone(); - DocumentsDeletion::new(self, ranked_map) - } - - pub fn synonyms_addition(&self) -> SynonymsAddition { - SynonymsAddition::new(self) - } - - pub fn synonyms_deletion(&self) -> SynonymsDeletion { - SynonymsDeletion::new(self) - } - - pub fn document<T>( - &self, - fields: Option<&HashSet<&str>>, - id: DocumentId, - ) -> Result<Option<T>, RmpError> - where T: de::DeserializeOwned, - { - let schema = &self.lease_inner().schema; - let fields = fields - .map(|fields| { - fields - .iter() - .filter_map(|name| schema.attribute(name)) - .collect() - }); - - let mut deserializer = Deserializer { - document_id: id, - index: &self, - fields: fields.as_ref(), - }; - - // TODO: currently we return an error if all document fields are missing, - // returning None would have been better - T::deserialize(&mut deserializer).map(Some) - } -} - -pub struct IndexLease(Lease<Arc<InnerIndex>>); - -impl Store for IndexLease { - type Error = Error; - - fn words(&self) -> Result<&fst::Set, Self::Error> { - Ok(&self.0.words) - } - - fn word_indexes(&self, word: &[u8]) -> Result<Option<SetBuf<DocIndex>>, Self::Error> { - Ok(self.0.raw.words.doc_indexes(word)?) - } - - fn synonyms(&self) -> Result<&fst::Set, Self::Error> { - Ok(&self.0.synonyms) - } - - fn alternatives_to(&self, word: &[u8]) -> Result<Option<fst::Set>, Self::Error> { - Ok(self.0.raw.synonyms.alternatives_to(word)?) 
- } -} diff --git a/meilidb-data/src/database/index/custom_settings_index.rs b/meilidb-data/src/database/index/custom_settings_index.rs new file mode 100644 index 000000000..bab62bd2d --- /dev/null +++ b/meilidb-data/src/database/index/custom_settings_index.rs @@ -0,0 +1,13 @@ +use std::sync::Arc; +use std::ops::Deref; + +#[derive(Clone)] +pub struct CustomSettingsIndex(pub(crate) Arc<sled::Tree>); + +impl Deref for CustomSettingsIndex { + type Target = sled::Tree; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} diff --git a/meilidb-data/src/database/docs_words_index.rs b/meilidb-data/src/database/index/docs_words_index.rs similarity index 76% rename from meilidb-data/src/database/docs_words_index.rs rename to meilidb-data/src/database/index/docs_words_index.rs index f4af69ee8..5407f1cd7 100644 --- a/meilidb-data/src/database/docs_words_index.rs +++ b/meilidb-data/src/database/index/docs_words_index.rs @@ -1,17 +1,14 @@ use std::sync::Arc; - use meilidb_core::DocumentId; - -use crate::database::raw_index::InnerRawIndex; -use super::Error; +use crate::database::Error; #[derive(Clone)] -pub struct DocsWordsIndex(pub(crate) InnerRawIndex); +pub struct DocsWordsIndex(pub Arc<sled::Tree>); impl DocsWordsIndex { pub fn doc_words(&self, id: DocumentId) -> Result<Option<fst::Set>, Error> { let key = id.0.to_be_bytes(); - match self.0.get_pinned(key)? { + match self.0.get(key)? { Some(bytes) => { let len = bytes.len(); let value = Arc::from(bytes.as_ref()); @@ -24,13 +21,13 @@ impl DocsWordsIndex { pub fn set_doc_words(&self, id: DocumentId, words: &fst::Set) -> Result<(), Error> { let key = id.0.to_be_bytes(); - self.0.set(key, words.as_fst().as_bytes())?; + self.0.insert(key, words.as_fst().as_bytes())?; Ok(()) } pub fn del_doc_words(&self, id: DocumentId) -> Result<(), Error> { let key = id.0.to_be_bytes(); - self.0.delete(key)?; + self.0.remove(key)?; Ok(()) } } diff --git a/meilidb-data/src/database/index/documents_index.rs b/meilidb-data/src/database/index/documents_index.rs new file mode 100644 index 000000000..f9e5d6122 --- /dev/null +++ b/meilidb-data/src/database/index/documents_index.rs @@ -0,0 +1,91 @@ +use std::sync::Arc; +use std::convert::TryInto; +use std::ops::Bound; + +use meilidb_core::DocumentId; +use meilidb_schema::SchemaAttr; + +use crate::document_attr_key::DocumentAttrKey; + +fn document_fields_range(id: DocumentId) -> (Bound<[u8; 10]>, Bound<[u8; 10]>) { + let start = DocumentAttrKey::new(id, SchemaAttr::min()).to_be_bytes(); + let end = DocumentAttrKey::new(id, SchemaAttr::max()).to_be_bytes(); + + (Bound::Included(start), Bound::Included(end)) +} + +#[derive(Clone)] +pub struct DocumentsIndex(pub(crate) Arc<sled::Tree>); + +impl DocumentsIndex { + pub fn document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result<Option<sled::IVec>> { + let key = DocumentAttrKey::new(id, attr).to_be_bytes(); + self.0.get(key) + } + + pub fn set_document_field(&self, id: DocumentId, attr: SchemaAttr, value: Vec<u8>) -> sled::Result<()> { + let key = DocumentAttrKey::new(id, attr).to_be_bytes(); + self.0.insert(key, value)?; + Ok(()) + } + + pub fn del_document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result<()> { + let key = DocumentAttrKey::new(id, attr).to_be_bytes(); + self.0.remove(key)?; + Ok(()) + } + + pub fn del_all_document_fields(&self, id: DocumentId) -> sled::Result<()> { + let range = document_fields_range(id); + + for result in self.0.range(range) { + let (key, _) = result?; + self.0.remove(key)?; + } + + Ok(()) + } + + pub fn document_fields(&self, id: DocumentId) -> DocumentFieldsIter { + let range = 
document_fields_range(id); + + let iter = self.0.range(range); + DocumentFieldsIter(iter) + } + + pub fn len(&self) -> sled::Result<usize> { + let mut last_document_id = None; + let mut count = 0; + + for result in self.0.iter() { + let (key, _) = result?; + let array = key.as_ref().try_into().unwrap(); + let document_id = DocumentAttrKey::from_be_bytes(array).document_id; + + if Some(document_id) != last_document_id { + last_document_id = Some(document_id); + count += 1; + } + } + + Ok(count) + } +} + +pub struct DocumentFieldsIter<'a>(sled::Iter<'a>); + +impl Iterator for DocumentFieldsIter<'_> { + type Item = sled::Result<(SchemaAttr, sled::IVec)>; + + fn next(&mut self) -> Option<Self::Item> { + match self.0.next() { + Some(Ok((key, value))) => { + let array = key.as_ref().try_into().unwrap(); + let key = DocumentAttrKey::from_be_bytes(array); + Some(Ok((key.attribute, value))) + }, + Some(Err(e)) => return Some(Err(e)), + None => None, + } + } +}
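The range scans above depend entirely on DocumentAttrKey::to_be_bytes producing keys whose lexicographic order matches (document_id, attribute) order. The key type lives in document_attr_key.rs, outside this diff; judging from the [u8; 10] bounds and the 8-byte document ids used elsewhere, its layout is presumably:

// Presumed layout, not shown in this diff: 8 big-endian bytes of document
// id followed by 2 big-endian bytes of schema attribute. Big-endian is what
// makes sled's byte-wise key order agree with the numeric order, so one
// document's fields form one contiguous key range.
fn document_attr_key(document_id: u64, attribute: u16) -> [u8; 10] {
    let mut key = [0u8; 10];
    key[..8].copy_from_slice(&document_id.to_be_bytes());
    key[8..].copy_from_slice(&attribute.to_be_bytes());
    key
}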
diff --git a/meilidb-data/src/database/main_index.rs b/meilidb-data/src/database/index/main_index.rs similarity index 78% rename from meilidb-data/src/database/main_index.rs rename to meilidb-data/src/database/index/main_index.rs index c58b76d47..3ae03f26f 100644 --- a/meilidb-data/src/database/main_index.rs +++ b/meilidb-data/src/database/index/main_index.rs @@ -2,10 +2,8 @@ use std::sync::Arc; use meilidb_schema::Schema; -use crate::database::raw_index::InnerRawIndex; use crate::ranked_map::RankedMap; - -use super::Error; +use crate::database::Error; const SCHEMA_KEY: &str = "schema"; const WORDS_KEY: &str = "words"; @@ -13,11 +11,11 @@ const SYNONYMS_KEY: &str = "synonyms"; const RANKED_MAP_KEY: &str = "ranked-map"; #[derive(Clone)] -pub struct MainIndex(pub(crate) InnerRawIndex); +pub struct MainIndex(pub(crate) Arc<sled::Tree>); impl MainIndex { pub fn schema(&self) -> Result<Option<Schema>, Error> { - match self.0.get_pinned(SCHEMA_KEY)? { + match self.0.get(SCHEMA_KEY)? { Some(bytes) => { let schema = Schema::read_from_bin(bytes.as_ref())?; Ok(Some(schema)) @@ -29,12 +27,12 @@ impl MainIndex { pub fn set_schema(&self, schema: &Schema) -> Result<(), Error> { let mut bytes = Vec::new(); schema.write_to_bin(&mut bytes)?; - self.0.set(SCHEMA_KEY, bytes)?; + self.0.insert(SCHEMA_KEY, bytes)?; Ok(()) } pub fn words_set(&self) -> Result<Option<fst::Set>, Error> { - match self.0.get_pinned(WORDS_KEY)? { + match self.0.get(WORDS_KEY)? { Some(bytes) => { let len = bytes.len(); let value = Arc::from(bytes.as_ref()); @@ -46,11 +44,11 @@ } pub fn set_words_set(&self, value: &fst::Set) -> Result<(), Error> { - self.0.set(WORDS_KEY, value.as_fst().as_bytes()).map_err(Into::into) + self.0.insert(WORDS_KEY, value.as_fst().as_bytes()).map(drop).map_err(Into::into) } pub fn synonyms_set(&self) -> Result<Option<fst::Set>, Error> { - match self.0.get_pinned(SYNONYMS_KEY)? { + match self.0.get(SYNONYMS_KEY)? { Some(bytes) => { let len = bytes.len(); let value = Arc::from(bytes.as_ref()); @@ -62,11 +60,11 @@ } pub fn set_synonyms_set(&self, value: &fst::Set) -> Result<(), Error> { - self.0.set(SYNONYMS_KEY, value.as_fst().as_bytes()).map_err(Into::into) + self.0.insert(SYNONYMS_KEY, value.as_fst().as_bytes()).map(drop).map_err(Into::into) } pub fn ranked_map(&self) -> Result<Option<RankedMap>, Error> { - match self.0.get_pinned(RANKED_MAP_KEY)? { + match self.0.get(RANKED_MAP_KEY)? { Some(bytes) => { let ranked_map = RankedMap::read_from_bin(bytes.as_ref())?; Ok(Some(ranked_map)) @@ -78,7 +76,7 @@ pub fn set_ranked_map(&self, value: &RankedMap) -> Result<(), Error> { let mut bytes = Vec::new(); value.write_to_bin(&mut bytes)?; - self.0.set(RANKED_MAP_KEY, bytes)?; + self.0.insert(RANKED_MAP_KEY, bytes)?; Ok(()) } }
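Every MainIndex accessor follows the same shape: a typed value encoded to bytes under a well-known key, decoded again on read. A condensed sketch of the write side (error plumbing simplified to the crate Error; write_to_bin's bincode failure is assumed to convert through the From impl):

// Encode the RankedMap into a scratch buffer, then store it under the
// fixed "ranked-map" key, mirroring set_ranked_map above.
fn persist_ranked_map(tree: &sled::Tree, map: &RankedMap) -> Result<(), Error> {
    let mut bytes = Vec::new();
    map.write_to_bin(&mut bytes)?;
    tree.insert(RANKED_MAP_KEY, bytes)?;
    Ok(())
}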
diff --git a/meilidb-data/src/database/index/mod.rs b/meilidb-data/src/database/index/mod.rs new file mode 100644 index 000000000..769f0fd17 --- /dev/null +++ b/meilidb-data/src/database/index/mod.rs @@ -0,0 +1,469 @@ +use std::collections::{HashSet, BTreeMap}; +use std::convert::TryInto; +use std::sync::Arc; +use std::thread; +use std::time::{Duration, Instant}; + +use arc_swap::{ArcSwap, ArcSwapOption, Guard}; +use meilidb_core::criterion::Criteria; +use meilidb_core::{DocIndex, Store, DocumentId, QueryBuilder}; +use meilidb_schema::Schema; +use sdset::SetBuf; +use serde::{de, Serialize, Deserialize}; +use sled::Transactional; + +use crate::ranked_map::RankedMap; +use crate::serde::{Deserializer, DeserializerError}; + +pub use self::custom_settings_index::CustomSettingsIndex; +use self::docs_words_index::DocsWordsIndex; +use self::documents_index::DocumentsIndex; +use self::main_index::MainIndex; +use self::synonyms_index::SynonymsIndex; +use self::words_index::WordsIndex; + +use crate::database::{ Error, DocumentsAddition, DocumentsDeletion, SynonymsAddition, SynonymsDeletion, apply_documents_addition, apply_documents_deletion, apply_synonyms_addition, apply_synonyms_deletion, }; + +mod custom_settings_index; +mod docs_words_index; +mod documents_index; +mod main_index; +mod synonyms_index; +mod words_index; + +fn event_is_set(event: &sled::Event) -> bool { + match event { + sled::Event::Set(_, _) => true, + _ => false, + } +} + +#[derive(Deserialize)] +enum UpdateOwned { + DocumentsAddition(Vec<rmpv::Value>), + DocumentsDeletion(Vec<DocumentId>), + SynonymsAddition(BTreeMap<String, Vec<String>>), + SynonymsDeletion(BTreeMap<String, Option<Vec<String>>>), +} + +#[derive(Serialize)] +enum Update { + DocumentsAddition(Vec<rmpv::Value>), + DocumentsDeletion(Vec<DocumentId>), + SynonymsAddition(BTreeMap<String, Vec<String>>), + SynonymsDeletion(BTreeMap<String, Option<Vec<String>>>), +} + +#[derive(Clone, Serialize, Deserialize)] +pub enum UpdateType { + DocumentsAddition { number: usize }, + DocumentsDeletion { number: usize }, + SynonymsAddition { number: usize }, + SynonymsDeletion { number: usize }, +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct DetailedDuration { + main: Duration, +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct UpdateStatus { + pub update_id: u64, + pub update_type: UpdateType, + pub result: Result<(), String>, + pub detailed_duration: DetailedDuration, +} + +fn spawn_update_system(index: Index) -> thread::JoinHandle<()> { + thread::spawn(move || { + loop { + let subscription = index.updates_index.watch_prefix(vec![]); + while let Some(result) = index.updates_index.iter().next() { + let (key, _) = result.unwrap(); + let update_id = key.as_ref().try_into().map(u64::from_be_bytes).unwrap(); + + let updates = &index.updates_index; + let results = &index.updates_results_index; + + (updates, results).transaction(|(updates, results)| { + let update = updates.remove(&key)?.unwrap(); + + let (update_type, result, duration) = match rmp_serde::from_read_ref(&update).unwrap() { + UpdateOwned::DocumentsAddition(documents) => { + let update_type = UpdateType::DocumentsAddition { number: documents.len() }; + let ranked_map = index.cache.load().ranked_map.clone(); + let start = Instant::now(); + let result = apply_documents_addition(&index, ranked_map, documents); + (update_type, result, start.elapsed()) + }, + UpdateOwned::DocumentsDeletion(documents) => { + let update_type = UpdateType::DocumentsDeletion { number: documents.len() }; + let ranked_map = index.cache.load().ranked_map.clone(); + let start = Instant::now(); + let result = apply_documents_deletion(&index, ranked_map, documents); + (update_type, result, start.elapsed()) + }, + UpdateOwned::SynonymsAddition(synonyms) => { + let update_type = UpdateType::SynonymsAddition { number: synonyms.len() }; + let start = Instant::now(); + let result = apply_synonyms_addition(&index, synonyms); + (update_type, result, start.elapsed()) + }, + UpdateOwned::SynonymsDeletion(synonyms) => { + let update_type = UpdateType::SynonymsDeletion { number: synonyms.len() }; + let start = Instant::now(); + let result = apply_synonyms_deletion(&index, synonyms); + (update_type, result, start.elapsed()) + }, + }; + + let detailed_duration = DetailedDuration { main: duration }; + let status = UpdateStatus { + update_id, + update_type, + result: result.map_err(|e| e.to_string()), + detailed_duration, + }; + + if let Some(callback) = &*index.update_callback.load() { + (callback)(status.clone()); + } + + let value = bincode::serialize(&status).unwrap(); + results.insert(&key, value) + }) + .unwrap(); + } + + // this subscription is just used to block + // the loop until a new update is inserted + subscription.filter(event_is_set).next(); + } + }) +}
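spawn_update_system above is a tiny single-consumer work queue built from two sled trees: pending updates keyed by a monotonic big-endian id, results written back under the same key inside a transaction, and watch_prefix used to park the thread between updates. The protocol in miniature (names are illustrative):

use std::convert::TryInto;

// Producer side: enqueue a payload; big-endian keys make iteration FIFO.
fn enqueue(db: &sled::Db, updates: &sled::Tree, payload: Vec<u8>) -> sled::Result<u64> {
    let id = db.generate_id()?;
    updates.insert(id.to_be_bytes(), payload)?;
    Ok(id)
}

// Consumer side: drain pending entries, then block until the next Set event.
fn consume_forever(updates: &sled::Tree) -> sled::Result<()> {
    loop {
        let subscription = updates.watch_prefix(vec![]);
        while let Some(entry) = updates.iter().next() {
            let (key, _payload) = entry?;
            let _id = key.as_ref().try_into().map(u64::from_be_bytes).unwrap();
            // ... apply the payload and record its status here ...
            updates.remove(&key)?;
        }
        subscription.filter(|e| match e { sled::Event::Set(_, _) => true, _ => false }).next();
    }
}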
+#[derive(Copy, Clone)] +pub struct IndexStats { + pub number_of_words: usize, + pub number_of_documents: usize, + pub number_attrs_in_ranked_map: usize, +} + +#[derive(Clone)] +pub struct Index { + pub(crate) cache: Arc<ArcSwap<Cache>>, + + // TODO this will be a snapshot in the future + main_index: MainIndex, + synonyms_index: SynonymsIndex, + words_index: WordsIndex, + docs_words_index: DocsWordsIndex, + documents_index: DocumentsIndex, + custom_settings_index: CustomSettingsIndex, + + // used by the update system + db: sled::Db, + updates_index: Arc<sled::Tree>, + updates_results_index: Arc<sled::Tree>, + update_callback: Arc<ArcSwapOption<Box<dyn Fn(UpdateStatus) + Send + Sync + 'static>>>, +} + +pub(crate) struct Cache { + pub words: Arc<fst::Set>, + pub synonyms: Arc<fst::Set>, + pub schema: Schema, + pub ranked_map: RankedMap, +} + +impl Index { + pub fn new(db: sled::Db, name: &str) -> Result<Index, Error> { + Index::new_raw(db, name, None) + } + + pub fn with_schema(db: sled::Db, name: &str, schema: Schema) -> Result<Index, Error> { + Index::new_raw(db, name, Some(schema)) + } + + fn new_raw(db: sled::Db, name: &str, schema: Option<Schema>) -> Result<Index, Error> { + let main_index = db.open_tree(name).map(MainIndex)?; + let synonyms_index = db.open_tree(format!("{}-synonyms", name)).map(SynonymsIndex)?; + let words_index = db.open_tree(format!("{}-words", name)).map(WordsIndex)?; + let docs_words_index = db.open_tree(format!("{}-docs-words", name)).map(DocsWordsIndex)?; + let documents_index = db.open_tree(format!("{}-documents", name)).map(DocumentsIndex)?; + let custom_settings_index = db.open_tree(format!("{}-custom", name)).map(CustomSettingsIndex)?; + let updates_index = db.open_tree(format!("{}-updates", name))?; + let updates_results_index = db.open_tree(format!("{}-updates-results", name))?; + + let words = match main_index.words_set()? { + Some(words) => Arc::new(words), + None => Arc::new(fst::Set::default()), + }; + + let synonyms = match main_index.synonyms_set()? { + Some(synonyms) => Arc::new(synonyms), + None => Arc::new(fst::Set::default()), + }; + + let schema = match (schema, main_index.schema()?) { + (Some(ref expected), Some(ref current)) if current != expected => { + return Err(Error::SchemaDiffer) + }, + (Some(expected), Some(_)) => expected, + (Some(expected), None) => { + main_index.set_schema(&expected)?; + expected + }, + (None, Some(current)) => current, + (None, None) => return Err(Error::SchemaMissing), + }; + + let ranked_map = match main_index.ranked_map()? { + Some(map) => map, + None => RankedMap::default(), + }; + + let cache = Cache { words, synonyms, schema, ranked_map }; + let cache = Arc::new(ArcSwap::from_pointee(cache)); + + let index = Index { + cache, + main_index, + synonyms_index, + words_index, + docs_words_index, + documents_index, + custom_settings_index, + db, + updates_index, + updates_results_index, + update_callback: Arc::new(ArcSwapOption::empty()), + }; + + let _handle = spawn_update_system(index.clone()); + + Ok(index) + } + + pub fn set_update_callback<F>(&self, callback: F) + where F: Fn(UpdateStatus) + Send + Sync + 'static + { + self.update_callback.store(Some(Arc::new(Box::new(callback)))); + } + + pub fn unset_update_callback(&self) { + self.update_callback.store(None); + } + + pub fn stats(&self) -> sled::Result<IndexStats> { + let cache = self.cache.load(); + Ok(IndexStats { + number_of_words: cache.words.len(), + number_of_documents: self.documents_index.len()?, + number_attrs_in_ranked_map: cache.ranked_map.len(), + }) + } + + pub fn query_builder(&self) -> QueryBuilder<RefIndex> { + let ref_index = self.as_ref(); + QueryBuilder::new(ref_index) + } + + pub fn query_builder_with_criteria<'c>( + &self, + criteria: Criteria<'c>, + ) -> QueryBuilder<'c, RefIndex> + { + let ref_index = self.as_ref(); + QueryBuilder::with_criteria(ref_index, criteria) + } + + pub fn as_ref(&self) -> RefIndex { + RefIndex { + cache: self.cache.load(), + main_index: &self.main_index, + synonyms_index: &self.synonyms_index, + words_index: &self.words_index, + docs_words_index: &self.docs_words_index, + documents_index: &self.documents_index, + custom_settings_index: &self.custom_settings_index, + } + } + + pub fn schema(&self) -> Schema { + self.cache.load().schema.clone() + } + + pub fn custom_settings(&self) -> CustomSettingsIndex { + self.custom_settings_index.clone() + } + + pub fn documents_addition(&self) -> DocumentsAddition { + DocumentsAddition::new(self) + } + + pub fn documents_deletion(&self) -> DocumentsDeletion { + DocumentsDeletion::new(self) + } + + pub fn synonyms_addition(&self) -> SynonymsAddition { + SynonymsAddition::new(self) + } + + pub fn synonyms_deletion(&self) -> SynonymsDeletion { + SynonymsDeletion::new(self) + } + + pub fn update_status( + &self, + update_id: u64, + ) -> Result<Option<UpdateStatus>, Error> + { + let update_id = update_id.to_be_bytes(); + match self.updates_results_index.get(update_id)? { + Some(value) => { + let value = bincode::deserialize(&value)?; + Ok(Some(value)) + }, + None => Ok(None), + } + } + + pub fn update_status_blocking( + &self, + update_id: u64, + ) -> Result<UpdateStatus, Error> + { + let update_id_bytes = update_id.to_be_bytes().to_vec(); + let mut subscription = self.updates_results_index.watch_prefix(update_id_bytes); + + // if we find the update result return it now + if let Some(result) = self.update_status(update_id)? 
{ + return Ok(result) + } + + // this subscription is used to block the thread + // until the update_id is inserted in the tree + subscription.next(); + + // the thread has been unblocked, it means that the update result + // has been inserted in the tree, retrieve it + Ok(self.update_status(update_id)?.unwrap()) + } + + pub fn document<T>( + &self, + fields: Option<&HashSet<&str>>, + id: DocumentId, + ) -> Result<Option<T>, DeserializerError> + where T: de::DeserializeOwned, + { + let schema = self.schema(); + let fields = match fields { + Some(fields) => fields.into_iter().map(|name| schema.attribute(name)).collect(), + None => None, + }; + + let mut deserializer = Deserializer { + document_id: id, + index: &self, + fields: fields.as_ref(), + }; + + // TODO: currently we return an error if all document fields are missing, + // returning None would have been better + T::deserialize(&mut deserializer).map(Some) + } +} + +impl Index { + pub(crate) fn push_documents_addition<D>(&self, addition: Vec<D>) -> Result<u64, Error> + where D: serde::Serialize + { + let mut values = Vec::with_capacity(addition.len()); + for add in addition { + let vec = rmp_serde::to_vec_named(&add)?; + let add = rmp_serde::from_read(&vec[..])?; + values.push(add); + } + + let addition = Update::DocumentsAddition(values); + let update = rmp_serde::to_vec_named(&addition)?; + self.raw_push_update(update) + } + + pub(crate) fn push_documents_deletion( + &self, + deletion: Vec<DocumentId>, + ) -> Result<u64, Error> + { + let deletion = Update::DocumentsDeletion(deletion); + let update = rmp_serde::to_vec_named(&deletion)?; + self.raw_push_update(update) + } + + pub(crate) fn push_synonyms_addition( + &self, + addition: BTreeMap<String, Vec<String>>, + ) -> Result<u64, Error> + { + let addition = Update::SynonymsAddition(addition); + let update = rmp_serde::to_vec_named(&addition)?; + self.raw_push_update(update) + } + + pub(crate) fn push_synonyms_deletion( + &self, + deletion: BTreeMap<String, Option<Vec<String>>>, + ) -> Result<u64, Error> + { + let deletion = Update::SynonymsDeletion(deletion); + let update = rmp_serde::to_vec_named(&deletion)?; + self.raw_push_update(update) + } + + fn raw_push_update(&self, raw_update: Vec<u8>) -> Result<u64, Error> { + let update_id = self.db.generate_id()?; + let update_id_array = update_id.to_be_bytes(); + + self.updates_index.insert(update_id_array, raw_update)?; + + Ok(update_id) + } +} + +pub struct RefIndex<'a> { + pub(crate) cache: Guard<'static, Arc<Cache>>, + pub main_index: &'a MainIndex, + pub synonyms_index: &'a SynonymsIndex, + pub words_index: &'a WordsIndex, + pub docs_words_index: &'a DocsWordsIndex, + pub documents_index: &'a DocumentsIndex, + pub custom_settings_index: &'a CustomSettingsIndex, +} + +impl Store for RefIndex<'_> { + type Error = Error; + + fn words(&self) -> Result<&fst::Set, Self::Error> { + Ok(&self.cache.words) + } + + fn word_indexes(&self, word: &[u8]) -> Result<Option<SetBuf<DocIndex>>, Self::Error> { + Ok(self.words_index.doc_indexes(word)?) + } + + fn synonyms(&self) -> Result<&fst::Set, Self::Error> { + Ok(&self.cache.synonyms) + } + + fn alternatives_to(&self, word: &[u8]) -> Result<Option<fst::Set>, Self::Error> { + Ok(self.synonyms_index.alternatives_to(word)?) + } +}
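Note the concurrency discipline that replaces the old ArcSwap lease: every reader captures a Guard over the whole Cache via self.cache.load() (see as_ref above), while writers rebuild a Cache and publish it atomically with store. In miniature:

use std::sync::Arc;
use arc_swap::ArcSwap;

struct Snapshot { version: u64 }

// Readers pin the current snapshot cheaply; the writer publishes a new
// one atomically, so no reader ever observes a half-updated state.
fn bump(shared: &ArcSwap<Snapshot>) {
    let current = shared.load();                  // reader-side pin
    let next = Snapshot { version: current.version + 1 };
    shared.store(Arc::new(next));                 // writer-side publish
}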
diff --git a/meilidb-data/src/database/index/synonyms_index.rs b/meilidb-data/src/database/index/synonyms_index.rs new file mode 100644 index 000000000..0d9fd9d7d --- /dev/null +++ b/meilidb-data/src/database/index/synonyms_index.rs @@ -0,0 +1,21 @@ +use std::sync::Arc; + +#[derive(Clone)] +pub struct SynonymsIndex(pub(crate) Arc<sled::Tree>); + +impl SynonymsIndex { + pub fn alternatives_to(&self, word: &[u8]) -> sled::Result<Option<fst::Set>> { + match self.0.get(word)? { + Some(vector) => Ok(Some(fst::Set::from_bytes(vector.to_vec()).unwrap())), + None => Ok(None), + } + } + + pub fn set_alternatives_to(&self, word: &[u8], value: Vec<u8>) -> sled::Result<()> { + self.0.insert(word, value).map(drop) + } + + pub fn del_alternatives_of(&self, word: &[u8]) -> sled::Result<()> { + self.0.remove(word).map(drop) + } +} diff --git a/meilidb-data/src/database/words_index.rs b/meilidb-data/src/database/index/words_index.rs similarity index 75% rename from meilidb-data/src/database/words_index.rs rename to meilidb-data/src/database/index/words_index.rs index 4f2163650..5b538f273 100644 --- a/meilidb-data/src/database/words_index.rs +++ b/meilidb-data/src/database/index/words_index.rs @@ -1,14 +1,14 @@ +use std::sync::Arc; + use meilidb_core::DocIndex; use sdset::{Set, SetBuf}; use zerocopy::{LayoutVerified, AsBytes}; -use crate::database::raw_index::InnerRawIndex; - #[derive(Clone)] -pub struct WordsIndex(pub(crate) InnerRawIndex); +pub struct WordsIndex(pub(crate) Arc<sled::Tree>); impl WordsIndex { - pub fn doc_indexes(&self, word: &[u8]) -> Result<Option<SetBuf<DocIndex>>, rocksdb::Error> { + pub fn doc_indexes(&self, word: &[u8]) -> sled::Result<Option<SetBuf<DocIndex>>> { // we must force an allocation to make the memory aligned match self.0.get(word)? { Some(bytes) => { @@ -36,13 +36,11 @@ } } - pub fn set_doc_indexes(&self, word: &[u8], set: &Set<DocIndex>) -> Result<(), rocksdb::Error> { - self.0.set(word, set.as_bytes())?; - Ok(()) + pub fn set_doc_indexes(&self, word: &[u8], set: &Set<DocIndex>) -> sled::Result<()> { + self.0.insert(word, set.as_bytes()).map(drop) } - pub fn del_doc_indexes(&self, word: &[u8]) -> Result<(), rocksdb::Error> { - self.0.delete(word)?; - Ok(()) + pub fn del_doc_indexes(&self, word: &[u8]) -> sled::Result<()> { + self.0.remove(word).map(drop) } } diff --git a/meilidb-data/src/database/mod.rs b/meilidb-data/src/database/mod.rs index 2edf774e0..602d9492e 100644 --- a/meilidb-data/src/database/mod.rs +++ b/meilidb-data/src/database/mod.rs @@ -1,88 +1,64 @@ use std::collections::hash_map::Entry; use std::collections::{HashSet, HashMap}; use std::path::Path; -use std::sync::{Arc, RwLock}; +use std::sync::RwLock; use meilidb_schema::Schema; -mod custom_settings; -mod docs_words_index; -mod documents_addition; -mod documents_deletion; -mod documents_index; mod error; mod index; -mod main_index; -mod raw_index; -mod synonyms_addition; -mod synonyms_deletion; -mod synonyms_index; -mod words_index; +mod update; pub use self::error::Error; -pub use self::index::Index; -pub use self::custom_settings::CustomSettings; +pub use self::index::{Index, CustomSettingsIndex}; -use self::docs_words_index::DocsWordsIndex; -use self::documents_addition::DocumentsAddition; -use self::documents_deletion::DocumentsDeletion; -use self::synonyms_addition::SynonymsAddition; -use self::synonyms_deletion::SynonymsDeletion; -use self::documents_index::DocumentsIndex; -use self::index::InnerIndex; -use self::main_index::MainIndex; -use self::raw_index::{RawIndex, InnerRawIndex}; -use self::words_index::WordsIndex; -use self::synonyms_index::SynonymsIndex; +pub use self::update::DocumentsAddition; +pub use self::update::DocumentsDeletion; +pub use self::update::SynonymsAddition; +pub use self::update::SynonymsDeletion; + +use self::update::apply_documents_addition; +use self::update::apply_documents_deletion; +use self::update::apply_synonyms_addition; +use self::update::apply_synonyms_deletion; + +fn load_indexes(tree: &sled::Tree) -> Result<HashSet<String>, Error> { + match tree.get("indexes")? 
{ + Some(bytes) => Ok(bincode::deserialize(&bytes)?), + None => Ok(HashSet::new()) + } +} pub struct Database { - cache: RwLock<HashMap<String, Arc<Index>>>, - inner: Arc<rocksdb::DB>, + cache: RwLock<HashMap<String, Index>>, + inner: sled::Db, } impl Database { - pub fn start_default<P: AsRef<Path>>(path: P) -> Result<Database, Error> { - let path = path.as_ref(); + pub fn open<P: AsRef<Path>>(path: P) -> Result<Database, Error> { let cache = RwLock::new(HashMap::new()); + let inner = sled::Db::open(path)?; - let options = { - let mut options = rocksdb::Options::default(); - options.create_if_missing(true); - options - }; - let cfs = rocksdb::DB::list_cf(&options, path).unwrap_or(Vec::new()); - let inner = Arc::new(rocksdb::DB::open_cf(&options, path, &cfs)?); + let indexes = load_indexes(&inner)?; let database = Database { cache, inner }; - let mut indexes: Vec<_> = cfs.iter() - .filter_map(|c| c.split('-').nth(0).filter(|&c| c != "default")) - .collect(); - indexes.sort_unstable(); - indexes.dedup(); - for index in indexes { - database.open_index(index)?; + database.open_index(&index)?; } Ok(database) } - pub fn indexes(&self) -> Result<Option<HashSet<String>>, Error> { - let bytes = match self.inner.get("indexes")? { - Some(bytes) => bytes, - None => return Ok(None), - }; - - let indexes = bincode::deserialize(&bytes)?; - Ok(Some(indexes)) + pub fn indexes(&self) -> Result<HashSet<String>, Error> { + load_indexes(&self.inner) } fn set_indexes(&self, value: &HashSet<String>) -> Result<(), Error> { let bytes = bincode::serialize(value)?; - self.inner.put("indexes", bytes)?; + self.inner.insert("indexes", bytes)?; Ok(()) } - pub fn open_index(&self, name: &str) -> Result<Option<Arc<Index>>, Error> { + pub fn open_index(&self, name: &str) -> Result<Option<Index>, Error> { { let cache = self.cache.read().unwrap(); if let Some(index) = cache.get(name).cloned() { @@ -96,56 +72,19 @@ impl Database { occupied.get().clone() }, Entry::Vacant(vacant) => { - if !self.indexes()?.map_or(false, |x| x.contains(name)) { + if !self.indexes()?.contains(name) { return Ok(None) } - let main = { - self.inner.cf_handle(name).expect("cf not found"); - MainIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(name))) - }; - - let synonyms = { - let cf_name = format!("{}-synonyms", name); - self.inner.cf_handle(&cf_name).expect("cf not found"); - SynonymsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name))) - }; - - let words = { - let cf_name = format!("{}-words", name); - self.inner.cf_handle(&cf_name).expect("cf not found"); - WordsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name))) - }; - - let docs_words = { - let cf_name = format!("{}-docs-words", name); - self.inner.cf_handle(&cf_name).expect("cf not found"); - DocsWordsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name))) - }; - - let documents = { - let cf_name = format!("{}-documents", name); - self.inner.cf_handle(&cf_name).expect("cf not found"); - DocumentsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name))) - }; - - let custom = { - let cf_name = format!("{}-custom", name); - self.inner.cf_handle(&cf_name).expect("cf not found"); - CustomSettings(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name))) - }; - - let raw_index = RawIndex { main, synonyms, words, docs_words, documents, custom }; - let index = Index::from_raw(raw_index)?; - - vacant.insert(Arc::new(index)).clone() + let index = Index::new(self.inner.clone(), name)?; + vacant.insert(index).clone() }, }; Ok(Some(index)) } - pub fn create_index(&self, name: &str, schema: Schema) -> Result<Arc<Index>, Error> { + pub fn create_index(&self, name: &str, schema: Schema) -> Result<Index, Error> { let mut cache = self.cache.write().unwrap(); let index = match 
cache.entry(name.to_string()) { @@ -153,57 +92,13 @@ impl Database { occupied.get().clone() }, Entry::Vacant(vacant) => { - let main = { - self.inner.create_cf(name, &rocksdb::Options::default())?; - MainIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(name))) - }; + let index = Index::with_schema(self.inner.clone(), name, schema)?; - if let Some(prev_schema) = main.schema()? { - if prev_schema != schema { - return Err(Error::SchemaDiffer) - } - } - - main.set_schema(&schema)?; - - let synonyms = { - let cf_name = format!("{}-synonyms", name); - self.inner.create_cf(&cf_name, &rocksdb::Options::default())?; - SynonymsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name))) - }; - - let words = { - let cf_name = format!("{}-words", name); - self.inner.create_cf(&cf_name, &rocksdb::Options::default())?; - WordsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name))) - }; - - let docs_words = { - let cf_name = format!("{}-docs-words", name); - self.inner.create_cf(&cf_name, &rocksdb::Options::default())?; - DocsWordsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name))) - }; - - let documents = { - let cf_name = format!("{}-documents", name); - self.inner.create_cf(&cf_name, &rocksdb::Options::default())?; - DocumentsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name))) - }; - - let custom = { - let cf_name = format!("{}-custom", name); - self.inner.create_cf(&cf_name, &rocksdb::Options::default())?; - CustomSettings(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name))) - }; - - let mut indexes = self.indexes()?.unwrap_or_else(HashSet::new); + let mut indexes = self.indexes()?; indexes.insert(name.to_string()); self.set_indexes(&indexes)?; - let raw_index = RawIndex { main, synonyms, words, docs_words, documents, custom }; - let index = Index::from_raw(raw_index)?; - - vacant.insert(Arc::new(index)).clone() + vacant.insert(index).clone() }, };
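Database keeps its list of index names as one bincode-encoded HashSet<String> under the fixed "indexes" key of the default tree (load_indexes/set_indexes above). The round trip, condensed into a hypothetical helper:

use std::collections::HashSet;

// Read-modify-write of the index registry; sled::Db dereferences to the
// default tree, so get/insert work on it directly, as in the code above.
fn register_index(db: &sled::Db, name: &str) -> Result<(), Error> {
    let mut names: HashSet<String> = match db.get("indexes")? {
        Some(bytes) => bincode::deserialize(&bytes)?,
        None => HashSet::new(),
    };
    names.insert(name.to_string());
    db.insert("indexes", bincode::serialize(&names)?)?;
    Ok(())
}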
diff --git a/meilidb-data/src/database/raw_index.rs b/meilidb-data/src/database/raw_index.rs deleted file mode 100644 index 612fb0df1..000000000 --- a/meilidb-data/src/database/raw_index.rs +++ /dev/null @@ -1,88 +0,0 @@ -use std::sync::Arc; -use super::{MainIndex, SynonymsIndex, WordsIndex, DocsWordsIndex, DocumentsIndex, CustomSettings}; - -#[derive(Clone)] -pub struct RawIndex { - pub main: MainIndex, - pub synonyms: SynonymsIndex, - pub words: WordsIndex, - pub docs_words: DocsWordsIndex, - pub documents: DocumentsIndex, - pub custom: CustomSettings, -} - -impl RawIndex { - pub(crate) fn compact(&self) { - self.main.0.compact_range(None::<&[u8]>, None::<&[u8]>); - self.synonyms.0.compact_range(None::<&[u8]>, None::<&[u8]>); - self.words.0.compact_range(None::<&[u8]>, None::<&[u8]>); - self.docs_words.0.compact_range(None::<&[u8]>, None::<&[u8]>); - self.documents.0.compact_range(None::<&[u8]>, None::<&[u8]>); - self.custom.0.compact_range(None::<&[u8]>, None::<&[u8]>); - } -} - -#[derive(Clone)] -pub struct InnerRawIndex { - database: Arc<rocksdb::DB>, - name: Arc<str>, -} - -impl InnerRawIndex { - pub fn new(database: Arc<rocksdb::DB>, name: Arc<str>) -> InnerRawIndex { - InnerRawIndex { database, name } - } - - pub fn get<K>(&self, key: K) -> Result<Option<rocksdb::DBVector>, rocksdb::Error> - where K: AsRef<[u8]>, - { - let cf = self.database.cf_handle(&self.name).expect("cf not found"); - self.database.get_cf(cf, key) - } - - pub fn get_pinned<K>(&self, key: K) -> Result<Option<rocksdb::DBPinnableSlice>, rocksdb::Error> - where K: AsRef<[u8]>, - { - let cf = self.database.cf_handle(&self.name).expect("cf not found"); - self.database.get_pinned_cf(cf, key) - } - - pub fn iterator(&self, from: rocksdb::IteratorMode) -> Result<rocksdb::DBIterator, rocksdb::Error> { - let cf = self.database.cf_handle(&self.name).expect("cf not found"); - self.database.iterator_cf(cf, from) - } - - pub fn set<K, V>(&self, key: K, value: V) -> Result<(), rocksdb::Error> - where K: AsRef<[u8]>, - V: AsRef<[u8]>, - { - let cf = self.database.cf_handle(&self.name).expect("cf not found"); - self.database.put_cf(cf, key, value) - } - - pub fn delete<K>(&self, key: K) -> Result<(), rocksdb::Error> - where K: AsRef<[u8]> - { - let cf = self.database.cf_handle(&self.name).expect("cf not found"); - self.database.delete_cf(cf, key) - } - - pub fn delete_range<K>(&self, start: K, end: K) -> Result<(), rocksdb::Error> - where K: AsRef<[u8]>, - { - let mut batch = rocksdb::WriteBatch::default(); - - let cf = self.database.cf_handle(&self.name).expect("cf not found"); - batch.delete_range_cf(cf, start, end)?; - - self.database.write(batch) - } - - pub fn compact_range<S, E>(&self, start: Option<S>, end: Option<E>) - where S: AsRef<[u8]>, - E: AsRef<[u8]>, - { - let cf = self.database.cf_handle(&self.name).expect("cf not found"); - self.database.compact_range_cf(cf, start, end) - } -} diff --git a/meilidb-data/src/database/synonyms_addition.rs b/meilidb-data/src/database/synonyms_addition.rs deleted file mode 100644 index e4c364387..000000000 --- a/meilidb-data/src/database/synonyms_addition.rs +++ /dev/null @@ -1,87 +0,0 @@ -use std::collections::BTreeMap; -use std::sync::Arc; - -use fst::{SetBuilder, set::OpBuilder}; -use meilidb_core::normalize_str; -use sdset::SetBuf; - -use crate::database::index::InnerIndex; -use super::{Error, Index}; - -pub struct SynonymsAddition<'a> { - inner: &'a Index, - synonyms: BTreeMap<String, Vec<String>>, -} - -impl<'a> SynonymsAddition<'a> { - pub fn new(inner: &'a Index) -> SynonymsAddition<'a> { - SynonymsAddition { inner, synonyms: BTreeMap::new() } - } - - pub fn add_synonym<S, T, I>(&mut self, synonym: S, alternatives: I) - where S: AsRef<str>, - T: AsRef<str>, - I: IntoIterator<Item = T>, - { - let synonym = normalize_str(synonym.as_ref()); - let alternatives = alternatives.into_iter().map(|s| s.as_ref().to_lowercase()); - self.synonyms.entry(synonym).or_insert_with(Vec::new).extend(alternatives); - } - - pub fn finalize(self) -> Result<(), Error> { - let lease_inner = self.inner.lease_inner(); - let synonyms = &lease_inner.raw.synonyms; - let main = &lease_inner.raw.main; - - let mut synonyms_builder = SetBuilder::memory(); - - for (synonym, alternatives) in self.synonyms { - synonyms_builder.insert(&synonym).unwrap(); - - let alternatives = { - let alternatives = SetBuf::from_dirty(alternatives); - let mut alternatives_builder = SetBuilder::memory(); - alternatives_builder.extend_iter(alternatives).unwrap(); - alternatives_builder.into_inner().unwrap() - }; - synonyms.set_alternatives_to(synonym.as_bytes(), alternatives)?; - } - - let delta_synonyms = synonyms_builder - .into_inner() - .and_then(fst::Set::from_bytes) - .unwrap(); - - let synonyms = match main.synonyms_set()? 
{ - Some(synonyms) => { - let op = OpBuilder::new() - .add(synonyms.stream()) - .add(delta_synonyms.stream()) - .r#union(); - - let mut synonyms_builder = SetBuilder::memory(); - synonyms_builder.extend_stream(op).unwrap(); - synonyms_builder - .into_inner() - .and_then(fst::Set::from_bytes) - .unwrap() - }, - None => delta_synonyms, - }; - - main.set_synonyms_set(&synonyms)?; - - // update the "consistent" view of the Index - let words = Arc::new(main.words_set()?.unwrap_or_default()); - let ranked_map = lease_inner.ranked_map.clone(); - let synonyms = Arc::new(synonyms); - let schema = lease_inner.schema.clone(); - let raw = lease_inner.raw.clone(); - lease_inner.raw.compact(); - - let inner = InnerIndex { words, synonyms, schema, ranked_map, raw }; - self.inner.0.store(Arc::new(inner)); - - Ok(()) - } -}
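The finalize above shows the recurring fst merge idiom used throughout this patch: stream a union (or difference) of the current set and a delta set into a fresh SetBuilder, then rebuild a Set from the written bytes. As a standalone sketch (this codebase's fst fork keeps Set unparameterized):

use fst::{Set, SetBuilder, set::OpBuilder};

// Union two fst sets into a new one; extend_stream consumes the merged
// stream, and into_inner plus from_bytes rebuild the final Set.
fn union_sets(current: &Set, delta: &Set) -> Result<Set, fst::Error> {
    let op = OpBuilder::new()
        .add(current.stream())
        .add(delta.stream())
        .r#union();

    let mut builder = SetBuilder::memory();
    builder.extend_stream(op)?;
    builder.into_inner().and_then(Set::from_bytes)
}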
diff --git a/meilidb-data/src/database/synonyms_deletion.rs b/meilidb-data/src/database/synonyms_deletion.rs deleted file mode 100644 index 6056dba3f..000000000 --- a/meilidb-data/src/database/synonyms_deletion.rs +++ /dev/null @@ -1,130 +0,0 @@ -use std::collections::BTreeMap; -use std::iter::FromIterator; -use std::sync::Arc; - -use fst::{SetBuilder, set::OpBuilder}; -use meilidb_core::normalize_str; -use sdset::SetBuf; - -use crate::database::index::InnerIndex; -use super::{Error, Index}; - -pub struct SynonymsDeletion<'a> { - inner: &'a Index, - synonyms: BTreeMap<String, Option<Vec<String>>>, -} - -impl<'a> SynonymsDeletion<'a> { - pub fn new(inner: &'a Index) -> SynonymsDeletion<'a> { - SynonymsDeletion { inner, synonyms: BTreeMap::new() } - } - - pub fn delete_all_alternatives_of<S: AsRef<str>>(&mut self, synonym: S) { - let synonym = normalize_str(synonym.as_ref()); - self.synonyms.insert(synonym, None); - } - - pub fn delete_specific_alternatives_of<S, T, I>(&mut self, synonym: S, alternatives: I) - where S: AsRef<str>, - T: AsRef<str>, - I: Iterator<Item = T>, - { - let synonym = normalize_str(synonym.as_ref()); - let value = self.synonyms.entry(synonym).or_insert(None); - let alternatives = alternatives.map(|s| s.as_ref().to_lowercase()); - match value { - Some(v) => v.extend(alternatives), - None => *value = Some(Vec::from_iter(alternatives)), - } - } - - pub fn finalize(self) -> Result<(), Error> { - let lease_inner = self.inner.lease_inner(); - let synonyms = &lease_inner.raw.synonyms; - let main = &lease_inner.raw.main; - - let mut delete_whole_synonym_builder = SetBuilder::memory(); - - for (synonym, alternatives) in self.synonyms { - match alternatives { - Some(alternatives) => { - let prev_alternatives = synonyms.alternatives_to(synonym.as_bytes())?; - let prev_alternatives = match prev_alternatives { - Some(alternatives) => alternatives, - None => continue, - }; - - let delta_alternatives = { - let alternatives = SetBuf::from_dirty(alternatives); - let mut builder = SetBuilder::memory(); - builder.extend_iter(alternatives).unwrap(); - builder.into_inner() - .and_then(fst::Set::from_bytes) - .unwrap() - }; - - let op = OpBuilder::new() - .add(prev_alternatives.stream()) - .add(delta_alternatives.stream()) - .difference(); - - let (alternatives, empty_alternatives) = { - let mut builder = SetBuilder::memory(); - let len = builder.get_ref().len(); - builder.extend_stream(op).unwrap(); - let is_empty = len == builder.get_ref().len(); - let alternatives = builder.into_inner().unwrap(); - (alternatives, is_empty) - }; - - if empty_alternatives { - delete_whole_synonym_builder.insert(synonym.as_bytes())?; - } else { - synonyms.set_alternatives_to(synonym.as_bytes(), alternatives)?; - } - }, - None => { - delete_whole_synonym_builder.insert(&synonym).unwrap(); - synonyms.del_alternatives_of(synonym.as_bytes())?; - } - } - } - - let delta_synonyms = delete_whole_synonym_builder - .into_inner() - .and_then(fst::Set::from_bytes) - .unwrap(); - - let synonyms = match main.synonyms_set()? { - Some(synonyms) => { - let op = OpBuilder::new() - .add(synonyms.stream()) - .add(delta_synonyms.stream()) - .difference(); - - let mut synonyms_builder = SetBuilder::memory(); - synonyms_builder.extend_stream(op).unwrap(); - synonyms_builder - .into_inner() - .and_then(fst::Set::from_bytes) - .unwrap() - }, - None => fst::Set::default(), - }; - - main.set_synonyms_set(&synonyms)?; - - // update the "consistent" view of the Index - let words = Arc::new(main.words_set()?.unwrap_or_default()); - let ranked_map = lease_inner.ranked_map.clone(); - let synonyms = Arc::new(synonyms); - let schema = lease_inner.schema.clone(); - let raw = lease_inner.raw.clone(); - lease_inner.raw.compact(); - - let inner = InnerIndex { words, synonyms, schema, ranked_map, raw }; - self.inner.0.store(Arc::new(inner)); - - Ok(()) - } -} diff --git a/meilidb-data/src/database/synonyms_index.rs b/meilidb-data/src/database/synonyms_index.rs deleted file mode 100644 index dfc0182e4..000000000 --- a/meilidb-data/src/database/synonyms_index.rs +++ /dev/null @@ -1,23 +0,0 @@ -use crate::database::raw_index::InnerRawIndex; - -#[derive(Clone)] -pub struct SynonymsIndex(pub(crate) InnerRawIndex); - -impl SynonymsIndex { - pub fn alternatives_to(&self, word: &[u8]) -> Result<Option<fst::Set>, rocksdb::Error> { - match self.0.get(word)? { - Some(vector) => Ok(Some(fst::Set::from_bytes(vector.to_vec()).unwrap())), - None => Ok(None), - } - } - - pub fn set_alternatives_to(&self, word: &[u8], value: Vec<u8>) -> Result<(), rocksdb::Error> { - self.0.set(word, value)?; - Ok(()) - } - - pub fn del_alternatives_of(&self, word: &[u8]) -> Result<(), rocksdb::Error> { - self.0.delete(word)?; - Ok(()) - } -} diff --git a/meilidb-data/src/database/update/documents_addition.rs b/meilidb-data/src/database/update/documents_addition.rs new file mode 100644 index 000000000..2e4e94736 --- /dev/null +++ b/meilidb-data/src/database/update/documents_addition.rs @@ -0,0 +1,137 @@ +use std::collections::HashSet; +use std::sync::Arc; + +use fst::{SetBuilder, set::OpBuilder}; +use sdset::{SetOperation, duo::Union}; +use serde::Serialize; + +use crate::indexer::Indexer; +use crate::serde::{extract_document_id, Serializer, RamDocumentStore}; +use crate::RankedMap; + +use crate::database::{Error, Index, index::Cache, apply_documents_deletion}; + +pub struct DocumentsAddition<'a, D> { + index: &'a Index, + documents: Vec<D>, +} + +impl<'a, D> DocumentsAddition<'a, D> { + pub fn new(index: &'a Index) -> DocumentsAddition<'a, D> { + DocumentsAddition { index, documents: Vec::new() } + } + + pub fn update_document(&mut self, document: D) { + self.documents.push(document); + } + + pub fn finalize(self) -> Result<u64, Error> + where D: serde::Serialize + { + self.index.push_documents_addition(self.documents) + } +} + +pub fn apply_documents_addition( + index: &Index, + mut ranked_map: RankedMap, + addition: Vec<rmpv::Value>, +) -> Result<(), Error> +{ + let mut document_ids = HashSet::new(); + let mut document_store = RamDocumentStore::new(); + let mut indexer = Indexer::new(); + + let schema = &index.schema(); + let identifier = schema.identifier_name(); + + for document in addition { + let document_id = match extract_document_id(identifier, &document)? 
+            Some(id) => id,
+            None => return Err(Error::MissingDocumentId),
+        };
+
+        // 1. store the document id for future deletion
+        document_ids.insert(document_id);
+
+        // 2. index the document fields in ram stores
+        let serializer = Serializer {
+            schema,
+            document_store: &mut document_store,
+            indexer: &mut indexer,
+            ranked_map: &mut ranked_map,
+            document_id,
+        };
+
+        document.serialize(serializer)?;
+    }
+
+    let ref_index = index.as_ref();
+    let docs_words = ref_index.docs_words_index;
+    let documents = ref_index.documents_index;
+    let main = ref_index.main_index;
+    let words = ref_index.words_index;
+
+    // 1. remove the previous documents match indexes
+    let document_ids = document_ids.into_iter().collect();
+    apply_documents_deletion(index, ranked_map.clone(), document_ids)?;
+
+    // 2. insert new document attributes in the database
+    for ((id, attr), value) in document_store.into_inner() {
+        documents.set_document_field(id, attr, value)?;
+    }
+
+    let indexed = indexer.build();
+    let mut delta_words_builder = SetBuilder::memory();
+
+    for (word, delta_set) in indexed.words_doc_indexes {
+        delta_words_builder.insert(&word).unwrap();
+
+        let set = match words.doc_indexes(&word)? {
+            Some(set) => Union::new(&set, &delta_set).into_set_buf(),
+            None => delta_set,
+        };
+
+        words.set_doc_indexes(&word, &set)?;
+    }
+
+    for (id, words) in indexed.docs_words {
+        docs_words.set_doc_words(id, &words)?;
+    }
+
+    let delta_words = delta_words_builder
+        .into_inner()
+        .and_then(fst::Set::from_bytes)
+        .unwrap();
+
+    let words = match main.words_set()? {
+        Some(words) => {
+            let op = OpBuilder::new()
+                .add(words.stream())
+                .add(delta_words.stream())
+                .r#union();
+
+            let mut words_builder = SetBuilder::memory();
+            words_builder.extend_stream(op).unwrap();
+            words_builder
+                .into_inner()
+                .and_then(fst::Set::from_bytes)
+                .unwrap()
+        },
+        None => delta_words,
+    };
+
+    main.set_words_set(&words)?;
+    main.set_ranked_map(&ranked_map)?;
+
+    // update the "consistent" view of the Index
+    let cache = ref_index.cache;
+    let words = Arc::new(words);
+    let synonyms = cache.synonyms.clone();
+    let schema = cache.schema.clone();
+
+    let cache = Cache { words, synonyms, schema, ranked_map };
+    index.cache.store(Arc::new(cache));
+
+    Ok(())
+}
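Note: the addition path above merges each word's fresh doc indexes into the stored posting list with sdset's Union before rewriting the set. A minimal sketch of that merge pattern, assuming sdset 0.3's duo API and plain u32 keys standing in for the real DocIndex entries:

    use sdset::{SetBuf, SetOperation};
    use sdset::duo::Union;

    fn main() {
        // from_dirty sorts and deduplicates the input vectors
        let existing = SetBuf::from_dirty(vec![1u32, 3, 5]);
        let delta = SetBuf::from_dirty(vec![2u32, 3, 6]);

        // union the two sorted sets, as apply_documents_addition does per word
        let merged = Union::new(&existing, &delta).into_set_buf();
        assert_eq!(merged.as_slice(), &[1, 2, 3, 5, 6]);
    }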
diff --git a/meilidb-data/src/database/update/documents_deletion.rs b/meilidb-data/src/database/update/documents_deletion.rs
new file mode 100644
index 000000000..2c1036b79
--- /dev/null
+++ b/meilidb-data/src/database/update/documents_deletion.rs
@@ -0,0 +1,144 @@
+use std::collections::{HashMap, BTreeSet};
+use std::sync::Arc;
+
+use fst::{SetBuilder, Streamer};
+use meilidb_core::DocumentId;
+use sdset::{SetBuf, SetOperation, duo::DifferenceByKey};
+
+use crate::RankedMap;
+use crate::serde::extract_document_id;
+
+use crate::database::{Index, Error, index::Cache};
+
+pub struct DocumentsDeletion<'a> {
+    index: &'a Index,
+    documents: Vec<DocumentId>,
+}
+
+impl<'a> DocumentsDeletion<'a> {
+    pub fn new(index: &'a Index) -> DocumentsDeletion<'a> {
+        DocumentsDeletion { index, documents: Vec::new() }
+    }
+
+    pub fn delete_document_by_id(&mut self, document_id: DocumentId) {
+        self.documents.push(document_id);
+    }
+
+    pub fn delete_document<D>(&mut self, document: D) -> Result<(), Error>
+    where D: serde::Serialize,
+    {
+        let schema = self.index.schema();
+        let identifier = schema.identifier_name();
+        let document_id = match extract_document_id(identifier, &document)? {
+            Some(id) => id,
+            None => return Err(Error::MissingDocumentId),
+        };
+
+        self.delete_document_by_id(document_id);
+
+        Ok(())
+    }
+
+    pub fn finalize(self) -> Result<u64, Error> {
+        self.index.push_documents_deletion(self.documents)
+    }
+}
+
+impl Extend<DocumentId> for DocumentsDeletion<'_> {
+    fn extend<T: IntoIterator<Item=DocumentId>>(&mut self, iter: T) {
+        self.documents.extend(iter)
+    }
+}
+
+pub fn apply_documents_deletion(
+    index: &Index,
+    mut ranked_map: RankedMap,
+    deletion: Vec<DocumentId>,
+) -> Result<(), Error>
+{
+    let ref_index = index.as_ref();
+    let schema = index.schema();
+    let docs_words = ref_index.docs_words_index;
+    let documents = ref_index.documents_index;
+    let main = ref_index.main_index;
+    let words = ref_index.words_index;
+
+    let idset = SetBuf::from_dirty(deletion);
+
+    // collect the ranked attributes according to the schema
+    let ranked_attrs: Vec<_> = schema.iter()
+        .filter_map(|(_, attr, prop)| {
+            if prop.is_ranked() { Some(attr) } else { None }
+        })
+        .collect();
+
+    let mut words_document_ids = HashMap::new();
+    for id in idset {
+        // remove all the ranked attributes from the ranked_map
+        for ranked_attr in &ranked_attrs {
+            ranked_map.remove(id, *ranked_attr);
+        }
+
+        if let Some(words) = docs_words.doc_words(id)? {
+            let mut stream = words.stream();
+            while let Some(word) = stream.next() {
+                let word = word.to_vec();
+                words_document_ids.entry(word).or_insert_with(Vec::new).push(id);
+            }
+        }
+    }
+
+    let mut removed_words = BTreeSet::new();
+    for (word, document_ids) in words_document_ids {
+        let document_ids = SetBuf::from_dirty(document_ids);
+
+        if let Some(doc_indexes) = words.doc_indexes(&word)? {
+            let op = DifferenceByKey::new(&doc_indexes, &document_ids, |d| d.document_id, |id| *id);
+            let doc_indexes = op.into_set_buf();
+
+            if !doc_indexes.is_empty() {
+                words.set_doc_indexes(&word, &doc_indexes)?;
+            } else {
+                words.del_doc_indexes(&word)?;
+                removed_words.insert(word);
+            }
+        }
+
+        for id in document_ids {
+            documents.del_all_document_fields(id)?;
+            docs_words.del_doc_words(id)?;
+        }
+    }
+
+    let removed_words = fst::Set::from_iter(removed_words).unwrap();
+    let words = match main.words_set()? {
+        Some(words_set) => {
+            let op = fst::set::OpBuilder::new()
+                .add(words_set.stream())
+                .add(removed_words.stream())
+                .difference();
+
+            let mut words_builder = SetBuilder::memory();
+            words_builder.extend_stream(op).unwrap();
+            words_builder
+                .into_inner()
+                .and_then(fst::Set::from_bytes)
+                .unwrap()
+        },
+        None => fst::Set::default(),
+    };
+
+    main.set_words_set(&words)?;
+    main.set_ranked_map(&ranked_map)?;
+
+    // update the "consistent" view of the Index
+    let cache = ref_index.cache;
+    let words = Arc::new(words);
+    let synonyms = cache.synonyms.clone();
+    let schema = cache.schema.clone();
+
+    let cache = Cache { words, synonyms, schema, ranked_map };
+    index.cache.store(Arc::new(cache));
+
+    Ok(())
+}
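Note: deletion works in the opposite direction: for every touched word it subtracts the deleted document ids from the posting list with sdset's DifferenceByKey, matching entries on their document_id field. Roughly, with a hypothetical Entry type in place of the real DocIndex:

    use sdset::{Set, SetBuf, SetOperation};
    use sdset::duo::DifferenceByKey;

    #[derive(Debug, Clone, PartialEq)]
    struct Entry { document_id: u64, position: u32 }

    fn main() {
        // posting list, already sorted by document_id
        let entries = vec![
            Entry { document_id: 1, position: 0 },
            Entry { document_id: 2, position: 4 },
            Entry { document_id: 3, position: 1 },
        ];
        let indexes = Set::new_unchecked(&entries);
        let deleted = SetBuf::from_dirty(vec![2u64]);

        // keep only the entries whose document_id is not in the deleted set
        let remaining = DifferenceByKey::new(indexes, &deleted, |e| e.document_id, |id| *id)
            .into_set_buf();
        assert_eq!(remaining.len(), 2);
    }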
diff --git a/meilidb-data/src/database/update/mod.rs b/meilidb-data/src/database/update/mod.rs
new file mode 100644
index 000000000..3d849256d
--- /dev/null
+++ b/meilidb-data/src/database/update/mod.rs
@@ -0,0 +1,9 @@
+mod documents_addition;
+mod documents_deletion;
+mod synonyms_addition;
+mod synonyms_deletion;
+
+pub use self::documents_addition::{DocumentsAddition, apply_documents_addition};
+pub use self::documents_deletion::{DocumentsDeletion, apply_documents_deletion};
+pub use self::synonyms_addition::{SynonymsAddition, apply_synonyms_addition};
+pub use self::synonyms_deletion::{SynonymsDeletion, apply_synonyms_deletion};
diff --git a/meilidb-data/src/database/update/synonyms_addition.rs b/meilidb-data/src/database/update/synonyms_addition.rs
new file mode 100644
index 000000000..1995adc5a
--- /dev/null
+++ b/meilidb-data/src/database/update/synonyms_addition.rs
@@ -0,0 +1,93 @@
+use std::collections::BTreeMap;
+use std::sync::Arc;
+
+use fst::{SetBuilder, set::OpBuilder};
+use meilidb_core::normalize_str;
+use sdset::SetBuf;
+
+use crate::database::{Error, Index, index::Cache};
+
+pub struct SynonymsAddition<'a> {
+    index: &'a Index,
+    synonyms: BTreeMap<String, Vec<String>>,
+}
+
+impl<'a> SynonymsAddition<'a> {
+    pub fn new(index: &'a Index) -> SynonymsAddition<'a> {
+        SynonymsAddition { index, synonyms: BTreeMap::new() }
+    }
+
+    pub fn add_synonym<S, T, I>(&mut self, synonym: S, alternatives: I)
+        where S: AsRef<str>,
+              T: AsRef<str>,
+              I: IntoIterator<Item=T>,
+    {
+        let synonym = normalize_str(synonym.as_ref());
+        let alternatives = alternatives.into_iter().map(|s| s.as_ref().to_lowercase());
+        self.synonyms.entry(synonym).or_insert_with(Vec::new).extend(alternatives);
+    }
+
+    pub fn finalize(self) -> Result<u64, Error> {
+        self.index.push_synonyms_addition(self.synonyms)
+    }
+}
+
+pub fn apply_synonyms_addition(
+    index: &Index,
+    addition: BTreeMap<String, Vec<String>>,
+) -> Result<(), Error>
+{
+    let ref_index = index.as_ref();
+    let synonyms = ref_index.synonyms_index;
+    let main = ref_index.main_index;
+
+    let mut synonyms_builder = SetBuilder::memory();
+
+    for (synonym, alternatives) in addition {
+        synonyms_builder.insert(&synonym).unwrap();
+
+        let alternatives = {
+            let alternatives = SetBuf::from_dirty(alternatives);
+            let mut alternatives_builder = SetBuilder::memory();
+            alternatives_builder.extend_iter(alternatives).unwrap();
+            alternatives_builder.into_inner().unwrap()
+        };
+        synonyms.set_alternatives_to(synonym.as_bytes(), alternatives)?;
+    }
+
+    let delta_synonyms = synonyms_builder
+        .into_inner()
+        .and_then(fst::Set::from_bytes)
+        .unwrap();
+
+    let synonyms = match main.synonyms_set()? {
+        Some(synonyms) => {
+            let op = OpBuilder::new()
+                .add(synonyms.stream())
+                .add(delta_synonyms.stream())
+                .r#union();
+
+            let mut synonyms_builder = SetBuilder::memory();
+            synonyms_builder.extend_stream(op).unwrap();
+            synonyms_builder
+                .into_inner()
+                .and_then(fst::Set::from_bytes)
+                .unwrap()
+        },
+        None => delta_synonyms,
+    };
+
+    main.set_synonyms_set(&synonyms)?;
+
+    // update the "consistent" view of the Index
+    let cache = ref_index.cache;
+    let words = Arc::new(main.words_set()?.unwrap_or_default());
+    let ranked_map = cache.ranked_map.clone();
+    let synonyms = Arc::new(synonyms);
+    let schema = cache.schema.clone();
+
+    let cache = Cache { words, synonyms, schema, ranked_map };
+    index.cache.store(Arc::new(cache));
+
+    Ok(())
+}
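Note: all four update appliers share the same fst dance: stream a union or difference of two sets into an in-memory SetBuilder, then rebuild a Set from the finished bytes. A condensed sketch of the union case, assuming the fst 0.3-era fork pinned in Cargo.toml:

    use fst::{Set, SetBuilder};
    use fst::set::OpBuilder;

    fn union_sets(a: &Set, b: &Set) -> fst::Result<Set> {
        let op = OpBuilder::new()
            .add(a.stream())
            .add(b.stream())
            .r#union();

        // stream the merged keys into a memory builder, then
        // reinterpret the finished bytes as a brand new Set
        let mut builder = SetBuilder::memory();
        builder.extend_stream(op)?;
        builder.into_inner().and_then(Set::from_bytes)
    }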
diff --git a/meilidb-data/src/database/update/synonyms_deletion.rs b/meilidb-data/src/database/update/synonyms_deletion.rs
new file mode 100644
index 000000000..d472cc73c
--- /dev/null
+++ b/meilidb-data/src/database/update/synonyms_deletion.rs
@@ -0,0 +1,136 @@
+use std::collections::BTreeMap;
+use std::iter::FromIterator;
+use std::sync::Arc;
+
+use fst::{SetBuilder, set::OpBuilder};
+use meilidb_core::normalize_str;
+use sdset::SetBuf;
+
+use crate::database::{Error, Index, index::Cache};
+
+pub struct SynonymsDeletion<'a> {
+    index: &'a Index,
+    synonyms: BTreeMap<String, Option<Vec<String>>>,
+}
+
+impl<'a> SynonymsDeletion<'a> {
+    pub fn new(index: &'a Index) -> SynonymsDeletion<'a> {
+        SynonymsDeletion { index, synonyms: BTreeMap::new() }
+    }
+
+    pub fn delete_all_alternatives_of<S: AsRef<str>>(&mut self, synonym: S) {
+        let synonym = normalize_str(synonym.as_ref());
+        self.synonyms.insert(synonym, None);
+    }
+
+    pub fn delete_specific_alternatives_of<S, T, I>(&mut self, synonym: S, alternatives: I)
+        where S: AsRef<str>,
+              T: AsRef<str>,
+              I: Iterator<Item=T>,
+    {
+        let synonym = normalize_str(synonym.as_ref());
+        let value = self.synonyms.entry(synonym).or_insert(None);
+        let alternatives = alternatives.map(|s| s.as_ref().to_lowercase());
+        match value {
+            Some(v) => v.extend(alternatives),
+            None => *value = Some(Vec::from_iter(alternatives)),
+        }
+    }
+
+    pub fn finalize(self) -> Result<u64, Error> {
+        self.index.push_synonyms_deletion(self.synonyms)
+    }
+}
+
+pub fn apply_synonyms_deletion(
+    index: &Index,
+    deletion: BTreeMap<String, Option<Vec<String>>>,
+) -> Result<(), Error>
+{
+    let ref_index = index.as_ref();
+    let synonyms = ref_index.synonyms_index;
+    let main = ref_index.main_index;
+
+    let mut delete_whole_synonym_builder = SetBuilder::memory();
+
+    for (synonym, alternatives) in deletion {
+        match alternatives {
+            Some(alternatives) => {
+                let prev_alternatives = synonyms.alternatives_to(synonym.as_bytes())?;
+                let prev_alternatives = match prev_alternatives {
+                    Some(alternatives) => alternatives,
+                    None => continue,
+                };
+
+                let delta_alternatives = {
+                    let alternatives = SetBuf::from_dirty(alternatives);
+                    let mut builder = SetBuilder::memory();
+                    builder.extend_iter(alternatives).unwrap();
+                    builder.into_inner()
+                        .and_then(fst::Set::from_bytes)
+                        .unwrap()
+                };
+
+                let op = OpBuilder::new()
+                    .add(prev_alternatives.stream())
+                    .add(delta_alternatives.stream())
+                    .difference();
+
+                let (alternatives, empty_alternatives) = {
+                    let mut builder = SetBuilder::memory();
+                    let len = builder.get_ref().len();
+                    builder.extend_stream(op).unwrap();
+                    let is_empty = len == builder.get_ref().len();
+                    let alternatives = builder.into_inner().unwrap();
+                    (alternatives, is_empty)
+                };
+
+                if empty_alternatives {
+                    delete_whole_synonym_builder.insert(synonym.as_bytes())?;
+                } else {
+                    synonyms.set_alternatives_to(synonym.as_bytes(), alternatives)?;
+                }
+            },
+            None => {
+                delete_whole_synonym_builder.insert(&synonym).unwrap();
+                synonyms.del_alternatives_of(synonym.as_bytes())?;
+            }
+        }
+    }
+
+    let delta_synonyms = delete_whole_synonym_builder
+        .into_inner()
+        .and_then(fst::Set::from_bytes)
+        .unwrap();
+
+    let synonyms = match main.synonyms_set()? {
+        Some(synonyms) => {
+            let op = OpBuilder::new()
+                .add(synonyms.stream())
+                .add(delta_synonyms.stream())
+                .difference();
+
+            let mut synonyms_builder = SetBuilder::memory();
+            synonyms_builder.extend_stream(op).unwrap();
+            synonyms_builder
+                .into_inner()
+                .and_then(fst::Set::from_bytes)
+                .unwrap()
+        },
+        None => fst::Set::default(),
+    };
+
+    main.set_synonyms_set(&synonyms)?;
+
+    // update the "consistent" view of the Index
+    let cache = ref_index.cache;
+    let words = Arc::new(main.words_set()?.unwrap_or_default());
+    let ranked_map = cache.ranked_map.clone();
+    let synonyms = Arc::new(synonyms);
+    let schema = cache.schema.clone();
+
+    let cache = Cache { words, synonyms, schema, ranked_map };
+    index.cache.store(Arc::new(cache));
+
+    Ok(())
+}
diff --git a/meilidb-data/src/lib.rs b/meilidb-data/src/lib.rs
index da5c35e28..05c6041ed 100644
--- a/meilidb-data/src/lib.rs
+++ b/meilidb-data/src/lib.rs
@@ -5,8 +5,8 @@ mod number;
 mod ranked_map;
 mod serde;
 
-pub use rocksdb;
-pub use self::database::{Database, Index, CustomSettings};
+pub use sled;
+pub use self::database::{Database, Index, CustomSettingsIndex};
 pub use self::number::Number;
 pub use self::ranked_map::RankedMap;
 pub use self::serde::{compute_document_id, extract_document_id, value_to_string};
diff --git a/meilidb-data/src/serde/deserializer.rs b/meilidb-data/src/serde/deserializer.rs
index 24f00998c..4f35bdc34 100644
--- a/meilidb-data/src/serde/deserializer.rs
+++ b/meilidb-data/src/serde/deserializer.rs
@@ -1,5 +1,6 @@
 use std::collections::HashSet;
 use std::io::Cursor;
+use std::{fmt, error::Error};
 
 use meilidb_core::DocumentId;
 use meilidb_schema::SchemaAttr;
@@ -9,6 +10,43 @@ use serde::{de, forward_to_deserialize_any};
 
 use crate::database::Index;
 
+#[derive(Debug)]
+pub enum DeserializerError {
+    RmpError(RmpError),
+    SledError(sled::Error),
+    Custom(String),
+}
+
+impl de::Error for DeserializerError {
+    fn custom<T: fmt::Display>(msg: T) -> Self {
+        DeserializerError::Custom(msg.to_string())
+    }
+}
+
+impl fmt::Display for DeserializerError {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match self {
+            DeserializerError::RmpError(e) => write!(f, "rmp serde related error: {}", e),
+            DeserializerError::SledError(e) => write!(f, "Sled related error: {}", e),
+            DeserializerError::Custom(s) => f.write_str(s),
+        }
+    }
+}
+
+impl Error for DeserializerError {}
+
+impl From<RmpError> for DeserializerError {
+    fn from(error: RmpError) -> DeserializerError {
+        DeserializerError::RmpError(error)
+    }
+}
+
+impl From<sled::Error> for DeserializerError {
+    fn from(error: sled::Error) -> DeserializerError {
+        DeserializerError::SledError(error)
+    }
+}
+
 pub struct Deserializer<'a> {
     pub document_id: DocumentId,
     pub index: &'a Index,
@@ -17,7 +55,7 @@ pub struct Deserializer<'a> {
 
 impl<'de, 'a, 'b> de::Deserializer<'de> for &'b mut Deserializer<'a>
 {
-    type Error = RmpError;
+    type Error = DeserializerError;
 
     fn deserialize_any<V>(self, visitor: V) -> Result<V::Value, Self::Error>
     where V: de::Visitor<'de>
@@ -34,33 +72,41 @@ impl<'de, 'a, 'b> de::Deserializer<'de> for &'b mut Deserializer<'a>
     fn deserialize_map<V>(self, visitor: V) -> Result<V::Value, Self::Error>
     where V: de::Visitor<'de>
     {
-        let schema = &self.index.lease_inner().schema;
-        let documents = &self.index.lease_inner().raw.documents;
+        let schema = self.index.schema();
+        let documents = self.index.as_ref().documents_index;
 
-        let document_attributes = documents.document_fields(self.document_id);
-        let document_attributes = document_attributes.filter_map(|result| {
-            match result {
-                Ok(value) => Some(value),
-                Err(e) => {
-                    // TODO: must log the error
-                    // error!("sled iter error; {}", e);
-                    None
-                },
-            }
-        });
+        let mut error = None;
 
-        let iter = document_attributes.filter_map(|(attr, value)| {
-            let is_displayed = schema.props(attr).is_displayed();
-            if is_displayed && self.fields.map_or(true, |f| f.contains(&attr)) {
-                let attribute_name = schema.attribute_name(attr);
-                Some((attribute_name, Value::new(value)))
-            } else {
-                None
-            }
-        });
+        let iter = documents
+            .document_fields(self.document_id)
+            .filter_map(|result| {
+                match result {
+                    Ok((attr, value)) => {
+                        let is_displayed = schema.props(attr).is_displayed();
+                        if is_displayed && self.fields.map_or(true, |f| f.contains(&attr)) {
+                            let attribute_name = schema.attribute_name(attr);
+                            Some((attribute_name, Value::new(value)))
+                        } else {
+                            None
+                        }
+                    },
+                    Err(e) => {
+                        if error.is_none() {
+                            error = Some(e);
+                        }
+                        None
+                    }
+                }
+            });
 
         let map_deserializer = de::value::MapDeserializer::new(iter);
-        visitor.visit_map(map_deserializer)
+        let result = visitor.visit_map(map_deserializer).map_err(DeserializerError::from);
+
+        if let Some(e) = error {
+            return Err(DeserializerError::from(e))
+        }
+
+        result
     }
 }
diff --git a/meilidb-data/src/serde/mod.rs b/meilidb-data/src/serde/mod.rs
index b0433f11f..995f46205 100644
--- a/meilidb-data/src/serde/mod.rs
+++ b/meilidb-data/src/serde/mod.rs
@@ -15,7 +15,7 @@ mod extract_document_id;
 mod indexer;
 mod serializer;
 
-pub use self::deserializer::Deserializer;
+pub use self::deserializer::{Deserializer, DeserializerError};
 pub use self::extract_document_id::{extract_document_id, compute_document_id, value_to_string};
 pub use self::convert_to_string::ConvertToString;
 pub use self::convert_to_number::ConvertToNumber;
@@ -38,8 +38,8 @@ pub enum SerializerError {
     DocumentIdNotFound,
     InvalidDocumentIdType,
     RmpError(RmpError),
+    SledError(sled::Error),
     SerdeJsonError(SerdeJsonError),
-    RocksdbError(rocksdb::Error),
     ParseNumberError(ParseNumberError),
     UnserializableType { type_name: &'static str },
     UnindexableType { type_name: &'static str },
@@ -63,8 +63,8 @@ impl fmt::Display for SerializerError {
                 write!(f, "document identifier can only be of type string or number")
             },
             SerializerError::RmpError(e) => write!(f, "rmp serde related error: {}", e),
+            SerializerError::SledError(e) => write!(f, "Sled related error: {}", e),
             SerializerError::SerdeJsonError(e) => write!(f, "serde json error: {}", e),
-            SerializerError::RocksdbError(e) => write!(f, "RocksDB related error: {}", e),
             SerializerError::ParseNumberError(e) => {
                 write!(f, "error while trying to parse a number: {}", e)
             },
@@ -102,9 +102,9 @@ impl From<RmpError> for SerializerError {
     }
 }
 
-impl From<rocksdb::Error> for SerializerError {
-    fn from(error: rocksdb::Error) -> SerializerError {
-        SerializerError::RocksdbError(error)
+impl From<sled::Error> for SerializerError {
+    fn from(error: sled::Error) -> SerializerError {
+        SerializerError::SledError(error)
     }
 }
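Note: one subtlety in the new deserialize_map: the closure given to filter_map cannot return early out of the enclosing function, so the first storage error is stashed in a local Option and only surfaced once visit_map has run. The same trick in isolation, as a sketch:

    fn sum_parsed(inputs: &[&str]) -> Result<i64, std::num::ParseIntError> {
        let mut error = None;

        let sum: i64 = inputs.iter()
            .filter_map(|s| match s.parse::<i64>() {
                Ok(n) => Some(n),
                Err(e) => {
                    // remember the first failure but keep draining the iterator
                    if error.is_none() { error = Some(e); }
                    None
                }
            })
            .sum();

        // surface the stashed error only after the whole pass is done
        match error {
            Some(e) => Err(e),
            None => Ok(sum),
        }
    }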
diff --git a/meilidb-data/tests/updates.rs b/meilidb-data/tests/updates.rs
index e85da880e..ecdd07003 100644
--- a/meilidb-data/tests/updates.rs
+++ b/meilidb-data/tests/updates.rs
@@ -1,3 +1,6 @@
+use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
+use std::sync::Arc;
+
 use serde_json::json;
 
 use meilidb_data::Database;
 use meilidb_schema::{Schema, SchemaBuilder, DISPLAYED, INDEXED};
@@ -12,16 +15,24 @@ fn simple_schema() -> Schema {
 
 #[test]
 fn insert_delete_document() {
     let tmp_dir = tempfile::tempdir().unwrap();
-    let database = Database::start_default(&tmp_dir).unwrap();
+    let database = Database::open(&tmp_dir).unwrap();
+
+    let as_been_updated = Arc::new(AtomicBool::new(false));
 
     let schema = simple_schema();
     let index = database.create_index("hello", schema).unwrap();
 
+    let as_been_updated_clone = as_been_updated.clone();
+    index.set_update_callback(move |_| as_been_updated_clone.store(true, Relaxed));
+
     let doc1 = json!({ "objectId": 123, "title": "hello" });
 
     let mut addition = index.documents_addition();
-    addition.update_document(&doc1).unwrap();
-    addition.finalize().unwrap();
+    addition.update_document(&doc1);
+    let update_id = addition.finalize().unwrap();
+    let status = index.update_status_blocking(update_id).unwrap();
+    assert!(as_been_updated.swap(false, Relaxed));
+    assert!(status.result.is_ok());
 
     let docs = index.query_builder().query("hello", 0..10).unwrap();
     assert_eq!(docs.len(), 1);
@@ -29,7 +40,10 @@ fn insert_delete_document() {
 
     let mut deletion = index.documents_deletion();
     deletion.delete_document(&doc1).unwrap();
-    deletion.finalize().unwrap();
+    let update_id = deletion.finalize().unwrap();
+    let status = index.update_status_blocking(update_id).unwrap();
+    assert!(as_been_updated.swap(false, Relaxed));
+    assert!(status.result.is_ok());
 
     let docs = index.query_builder().query("hello", 0..10).unwrap();
     assert_eq!(docs.len(), 0);
@@ -38,25 +52,36 @@
 #[test]
 fn replace_document() {
     let tmp_dir = tempfile::tempdir().unwrap();
-    let database = Database::start_default(&tmp_dir).unwrap();
+    let database = Database::open(&tmp_dir).unwrap();
+
+    let as_been_updated = Arc::new(AtomicBool::new(false));
 
     let schema = simple_schema();
    let index = database.create_index("hello", schema).unwrap();
 
+    let as_been_updated_clone = as_been_updated.clone();
+    index.set_update_callback(move |_| as_been_updated_clone.store(true, Relaxed));
+
     let doc1 = json!({ "objectId": 123, "title": "hello" });
     let doc2 = json!({ "objectId": 123, "title": "coucou" });
 
     let mut addition = index.documents_addition();
-    addition.update_document(&doc1).unwrap();
-    addition.finalize().unwrap();
+    addition.update_document(&doc1);
+    let update_id = addition.finalize().unwrap();
+    let status = index.update_status_blocking(update_id).unwrap();
+    assert!(as_been_updated.swap(false, Relaxed));
+    assert!(status.result.is_ok());
 
     let docs = index.query_builder().query("hello", 0..10).unwrap();
     assert_eq!(docs.len(), 1);
     assert_eq!(index.document(None, docs[0].id).unwrap().as_ref(), Some(&doc1));
 
     let mut deletion = index.documents_addition();
-    deletion.update_document(&doc2).unwrap();
-    deletion.finalize().unwrap();
+    deletion.update_document(&doc2);
+    let update_id = deletion.finalize().unwrap();
+    let status = index.update_status_blocking(update_id).unwrap();
+    assert!(as_been_updated.swap(false, Relaxed));
+    assert!(status.result.is_ok());
 
     let docs = index.query_builder().query("hello", 0..10).unwrap();
     assert_eq!(docs.len(), 0);
diff --git a/meilidb-schema/src/lib.rs b/meilidb-schema/src/lib.rs
index 43e82a168..85325a0f2 100644
--- a/meilidb-schema/src/lib.rs
+++ b/meilidb-schema/src/lib.rs
@@ -99,14 +99,14 @@ struct InnerSchema {
 }
 
 impl Schema {
-    pub fn from_toml<R: Read>(mut reader: R) -> Result<Schema, Box<Error>> {
+    pub fn from_toml<R: Read>(mut reader: R) -> Result<Schema, Box<dyn Error>> {
         let mut buffer = Vec::new();
         reader.read_to_end(&mut buffer)?;
        let builder: SchemaBuilder = toml::from_slice(&buffer)?;
         Ok(builder.build())
     }
 
-    pub fn to_toml<W: Write>(&self, mut writer: W) -> Result<(), Box<Error>> {
+    pub fn to_toml<W: Write>(&self, mut writer: W) -> Result<(), Box<dyn Error>> {
         let identifier = self.inner.identifier.clone();
         let attributes = self.attributes_ordered();
         let builder = SchemaBuilder { identifier, attributes };
@@ -117,14 +117,14 @@ impl Schema {
         Ok(())
     }
 
-    pub fn from_json<R: Read>(mut reader: R) -> Result<Schema, Box<Error>> {
+    pub fn from_json<R: Read>(mut reader: R) -> Result<Schema, Box<dyn Error>> {
         let mut buffer = Vec::new();
         reader.read_to_end(&mut buffer)?;
         let builder: SchemaBuilder = serde_json::from_slice(&buffer)?;
         Ok(builder.build())
     }
 
-    pub fn to_json<W: Write>(&self, mut writer: W) -> Result<(), Box<Error>> {
+    pub fn to_json<W: Write>(&self, mut writer: W) -> Result<(), Box<dyn Error>> {
         let identifier = self.inner.identifier.clone();
         let attributes = self.attributes_ordered();
         let builder = SchemaBuilder { identifier, attributes };
@@ -245,7 +245,7 @@ mod tests {
     }
 
     #[test]
-    fn serialize_deserialize_toml() -> Result<(), Box<Error>> {
+    fn serialize_deserialize_toml() -> Result<(), Box<dyn Error>> {
         let mut builder = SchemaBuilder::with_identifier("id");
         builder.new_attribute("alpha", DISPLAYED);
         builder.new_attribute("beta", DISPLAYED | INDEXED);
@@ -278,7 +278,7 @@ mod tests {
     }
 
     #[test]
-    fn serialize_deserialize_json() -> Result<(), Box<Error>> {
+    fn serialize_deserialize_json() -> Result<(), Box<dyn Error>> {
         let mut builder = SchemaBuilder::with_identifier("id");
         builder.new_attribute("alpha", DISPLAYED);
         builder.new_attribute("beta", DISPLAYED | INDEXED);
diff --git a/meilidb/examples/create-database.rs b/meilidb/examples/create-database.rs
index d8e553ed3..d49979f28 100644
--- a/meilidb/examples/create-database.rs
+++ b/meilidb/examples/create-database.rs
@@ -6,7 +6,6 @@ use std::io::{self, BufRead, BufReader};
 use std::path::{Path, PathBuf};
 use std::time::Instant;
 use std::error::Error;
-use std::borrow::Cow;
 use std::fs::File;
 
 use diskus::Walk;
@@ -44,9 +43,8 @@ pub struct Opt {
 }
 
 #[derive(Serialize, Deserialize)]
-struct Document<'a> (
-    #[serde(borrow)]
-    HashMap<Cow<'a, str>, Cow<'a, str>>
+struct Document (
+    HashMap<String, String>
 );
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -85,7 +83,7 @@ fn index(
     synonyms: Vec<Synonym>,
 ) -> Result<Database, Box<Error>>
 {
-    let database = Database::start_default(database_path)?;
+    let database = Database::open(database_path)?;
 
     let mut wtr = csv::Writer::from_path("./stats.csv").unwrap();
     wtr.write_record(&["NumberOfDocuments", "DiskUsed", "MemoryUsed"])?;
@@ -138,7 +136,7 @@ fn index(
             }
         };
 
-        update.update_document(&document)?;
+        update.update_document(document);
 
         print!("\rindexing document {}", i);
         i += 1;
diff --git a/meilidb/examples/query-database.rs b/meilidb/examples/query-database.rs
index d939c0b70..c02dbc5bf 100644
--- a/meilidb/examples/query-database.rs
+++ b/meilidb/examples/query-database.rs
@@ -143,7 +143,7 @@ fn main() -> Result<(), Box<Error>> {
     let opt = Opt::from_args();
 
     let start = Instant::now();
-    let database = Database::start_default(&opt.database_path)?;
+    let database = Database::open(&opt.database_path)?;
     let index = database.open_index("test")?.unwrap();
     let schema = index.schema();
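Note: taken together, the test and example changes spell out the new update contract: Database::open replaces start_default, update_document no longer returns a Result, and finalize merely enqueues the update and returns its id, which callers can block on. A condensed usage sketch built only from the calls exercised above (the index name and path are placeholders):

    use meilidb_data::Database;
    use serde_json::json;

    fn main() {
        let database = Database::open("/tmp/example-db").unwrap();
        let index = database.open_index("hello").unwrap().unwrap();

        let mut addition = index.documents_addition();
        addition.update_document(&json!({ "objectId": 1, "title": "hello" }));

        // finalize only pushes the update into the queue and hands back its id
        let update_id = addition.finalize().unwrap();

        // block until the update has been applied, then inspect its result
        let status = index.update_status_blocking(update_id).unwrap();
        assert!(status.result.is_ok());
    }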