diff --git a/meilidb-data/Cargo.toml b/meilidb-data/Cargo.toml
index f659819bc..f523792f0 100644
--- a/meilidb-data/Cargo.toml
+++ b/meilidb-data/Cargo.toml
@@ -5,10 +5,11 @@ authors = ["Kerollmops <renault.cle@gmail.com>"]
 edition = "2018"
 
 [dependencies]
-arc-swap = "0.3.11"
+arc-swap = "0.4.2"
 bincode = "1.1.4"
 deunicode = "1.0.0"
 hashbrown = { version = "0.6.0", features = ["serde"] }
+log = "0.4.6"
 meilidb-core = { path = "../meilidb-core", version = "0.1.0" }
 meilidb-schema = { path = "../meilidb-schema", version = "0.1.0" }
 meilidb-tokenizer = { path = "../meilidb-tokenizer", version = "0.1.0" }
@@ -17,7 +18,7 @@ sdset = "0.3.2"
 serde = { version = "1.0.99", features = ["derive"] }
 serde_json = "1.0.40"
 siphasher = "0.3.0"
-rocksdb = { version = "0.12.3", default-features = false }
+sled = "0.25.0"
 zerocopy = "0.2.8"
 
 [dependencies.rmp-serde]
diff --git a/meilidb-data/src/database/custom_settings.rs b/meilidb-data/src/database/custom_settings.rs
deleted file mode 100644
index 46653bfb0..000000000
--- a/meilidb-data/src/database/custom_settings.rs
+++ /dev/null
@@ -1,13 +0,0 @@
-use std::ops::Deref;
-use crate::database::raw_index::InnerRawIndex;
-
-#[derive(Clone)]
-pub struct CustomSettings(pub(crate) InnerRawIndex);
-
-impl Deref for CustomSettings {
-    type Target = InnerRawIndex;
-
-    fn deref(&self) -> &Self::Target {
-        &self.0
-    }
-}
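Before the file-by-file changes, it helps to know the sled API surface this migration targets: a `sled::Db` is one file-backed database, and named `Tree` handles play the role that RocksDB column families played before. A minimal sketch, assuming sled 0.25 as pinned above (the path and tree name are illustrative, not taken from the PR):

```rust
// Minimal sketch of the sled 0.25 calls this migration builds on.
fn sled_basics() -> sled::Result<()> {
    let db = sled::Db::open("/tmp/example-db")?; // one file-backed database
    let tree = db.open_tree("example-tree")?;    // named keyspace, like a column family
    tree.insert(b"key", b"value".to_vec())?;     // returns the previous value, if any
    assert_eq!(tree.get(b"key")?.map(|v| v.to_vec()), Some(b"value".to_vec()));
    tree.remove(b"key")?;
    Ok(())
}
```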
diff --git a/meilidb-data/src/database/documents_addition.rs b/meilidb-data/src/database/documents_addition.rs
index 6e6c2003b..2ca5af7d3 100644
--- a/meilidb-data/src/database/documents_addition.rs
+++ b/meilidb-data/src/database/documents_addition.rs
@@ -9,7 +9,8 @@ use crate::indexer::Indexer;
 use crate::serde::{extract_document_id, Serializer, RamDocumentStore};
 use crate::RankedMap;
 
-use super::{Error, Index, InnerIndex, DocumentsDeletion};
+use super::{Error, Index, DocumentsDeletion};
+use super::index::Cache;
 
 pub struct DocumentsAddition<'a> {
     inner: &'a Index,
@@ -33,7 +34,7 @@ impl<'a> DocumentsAddition<'a> {
     pub fn update_document<D>(&mut self, document: D) -> Result<(), Error>
     where D: serde::Serialize,
     {
-        let schema = &self.inner.lease_inner().schema;
+        let schema = &self.inner.schema();
         let identifier = schema.identifier_name();
 
         let document_id = match extract_document_id(identifier, &document)? {
@@ -59,11 +60,11 @@ impl<'a> DocumentsAddition<'a> {
     }
 
     pub fn finalize(self) -> Result<(), Error> {
-        let lease_inner = self.inner.lease_inner();
-        let docs_words = &lease_inner.raw.docs_words;
-        let documents = &lease_inner.raw.documents;
-        let main = &lease_inner.raw.main;
-        let words = &lease_inner.raw.words;
+        let ref_index = self.inner.as_ref();
+        let docs_words = ref_index.docs_words_index;
+        let documents = ref_index.documents_index;
+        let main = ref_index.main_index;
+        let words = ref_index.words_index;
 
         // 1. remove the previous documents match indexes
         let mut documents_deletion = DocumentsDeletion::new(self.inner, self.ranked_map.clone());
@@ -119,15 +120,14 @@ impl<'a> DocumentsAddition<'a> {
         main.set_ranked_map(&self.ranked_map)?;
 
         // update the "consistent" view of the Index
+        let cache = ref_index.cache;
         let words = Arc::new(words);
         let ranked_map = self.ranked_map;
-        let synonyms = lease_inner.synonyms.clone();
-        let schema = lease_inner.schema.clone();
-        let raw = lease_inner.raw.clone();
-        lease_inner.raw.compact();
+        let synonyms = cache.synonyms.clone();
+        let schema = cache.schema.clone();
 
-        let inner = InnerIndex { words, synonyms, schema, ranked_map, raw };
-        self.inner.0.store(Arc::new(inner));
+        let cache = Cache { words, synonyms, schema, ranked_map };
+        self.inner.cache.store(Arc::new(cache));
 
         Ok(())
     }
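Note that `finalize()` never mutates shared state in place: it assembles a fresh `Cache` and publishes it in one atomic store, so concurrent readers keep whatever snapshot they already loaded. A sketch of that copy-on-write pattern with arc-swap 0.4 (`Config` is an illustrative stand-in for the PR's `Cache`):

```rust
use std::sync::Arc;
use arc_swap::ArcSwap;

// Readers grab a cheap lock-free snapshot; the writer swaps in a
// fully-built replacement with a single atomic store.
struct Config { generation: u64 }

fn main() {
    let shared = ArcSwap::from_pointee(Config { generation: 0 });

    let snapshot = shared.load();            // reader: lock-free guard
    assert_eq!(snapshot.generation, 0);

    let next = Config { generation: snapshot.generation + 1 };
    shared.store(Arc::new(next));            // writer: atomic swap

    assert_eq!(shared.load().generation, 1); // later readers see the new view
}
```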
diff --git a/meilidb-data/src/database/documents_deletion.rs b/meilidb-data/src/database/documents_deletion.rs
index d1413e353..49f8313b9 100644
--- a/meilidb-data/src/database/documents_deletion.rs
+++ b/meilidb-data/src/database/documents_deletion.rs
@@ -8,7 +8,8 @@ use sdset::{SetBuf, SetOperation, duo::DifferenceByKey};
 use crate::RankedMap;
 use crate::serde::extract_document_id;
 
-use super::{Index, Error, InnerIndex};
+use super::{Index, Error};
+use super::index::Cache;
 
 pub struct DocumentsDeletion<'a> {
     inner: &'a Index,
@@ -28,7 +29,7 @@ impl<'a> DocumentsDeletion<'a> {
     pub fn delete_document<D>(&mut self, document: D) -> Result<(), Error>
     where D: serde::Serialize,
     {
-        let schema = &self.inner.lease_inner().schema;
+        let schema = &self.inner.schema();
         let identifier = schema.identifier_name();
 
         let document_id = match extract_document_id(identifier, &document)? {
@@ -42,12 +43,12 @@ impl<'a> DocumentsDeletion<'a> {
     }
 
     pub fn finalize(mut self) -> Result<(), Error> {
-        let lease_inner = self.inner.lease_inner();
-        let docs_words = &lease_inner.raw.docs_words;
-        let documents = &lease_inner.raw.documents;
-        let main = &lease_inner.raw.main;
-        let schema = &lease_inner.schema;
-        let words = &lease_inner.raw.words;
+        let ref_index = self.inner.as_ref();
+        let schema = self.inner.schema();
+        let docs_words = ref_index.docs_words_index;
+        let documents = ref_index.documents_index;
+        let main = ref_index.main_index;
+        let words = ref_index.words_index;
 
         let idset = SetBuf::from_dirty(self.documents);
@@ -118,15 +119,14 @@ impl<'a> DocumentsDeletion<'a> {
         main.set_ranked_map(&self.ranked_map)?;
 
         // update the "consistent" view of the Index
+        let cache = ref_index.cache;
         let words = Arc::new(words);
-        let ranked_map = lease_inner.ranked_map.clone();
-        let synonyms = lease_inner.synonyms.clone();
-        let schema = lease_inner.schema.clone();
-        let raw = lease_inner.raw.clone();
-        lease_inner.raw.compact();
+        let ranked_map = self.ranked_map;
+        let synonyms = cache.synonyms.clone();
+        let schema = cache.schema.clone();
 
-        let inner = InnerIndex { words, synonyms, schema, ranked_map, raw };
-        self.inner.0.store(Arc::new(inner));
+        let cache = Cache { words, synonyms, schema, ranked_map };
+        self.inner.cache.store(Arc::new(cache));
 
         Ok(())
     }
diff --git a/meilidb-data/src/database/documents_index.rs b/meilidb-data/src/database/documents_index.rs
deleted file mode 100644
index 361f9facb..000000000
--- a/meilidb-data/src/database/documents_index.rs
+++ /dev/null
@@ -1,90 +0,0 @@
-use std::convert::TryInto;
-
-use meilidb_core::DocumentId;
-use meilidb_schema::SchemaAttr;
-use rocksdb::DBVector;
-
-use crate::database::raw_index::InnerRawIndex;
-use crate::document_attr_key::DocumentAttrKey;
-
-#[derive(Clone)]
-pub struct DocumentsIndex(pub(crate) InnerRawIndex);
-
-impl DocumentsIndex {
-    pub fn document_field(&self, id: DocumentId, attr: SchemaAttr) -> Result<Option<DBVector>, rocksdb::Error> {
-        let key = DocumentAttrKey::new(id, attr).to_be_bytes();
-        self.0.get(key)
-    }
-
-    pub fn set_document_field(&self, id: DocumentId, attr: SchemaAttr, value: Vec<u8>) -> Result<(), rocksdb::Error> {
-        let key = DocumentAttrKey::new(id, attr).to_be_bytes();
-        self.0.set(key, value)?;
-        Ok(())
-    }
-
-    pub fn del_document_field(&self, id: DocumentId, attr: SchemaAttr) -> Result<(), rocksdb::Error> {
-        let key = DocumentAttrKey::new(id, attr).to_be_bytes();
-        self.0.delete(key)?;
-        Ok(())
-    }
-
-    pub fn del_all_document_fields(&self, id: DocumentId) -> Result<(), rocksdb::Error> {
-        let start = DocumentAttrKey::new(id, SchemaAttr::min()).to_be_bytes();
-        let end = DocumentAttrKey::new(id, SchemaAttr::max()).to_be_bytes();
-        self.0.delete_range(start, end)?;
-        Ok(())
-    }
-
-    pub fn document_fields(&self, id: DocumentId) -> DocumentFieldsIter {
-        let start = DocumentAttrKey::new(id, SchemaAttr::min()).to_be_bytes();
-        let end = DocumentAttrKey::new(id, SchemaAttr::max()).to_be_bytes();
-
-        let from = rocksdb::IteratorMode::From(&start[..], rocksdb::Direction::Forward);
-        let iter = self.0.iterator(from).unwrap();
-
-        DocumentFieldsIter(iter, end.to_vec())
-    }
-
-    pub fn len(&self) -> Result<usize, rocksdb::Error> {
-        let mut last_document_id = None;
-        let mut count = 0;
-
-        let from = rocksdb::IteratorMode::Start;
-        let iterator = self.0.iterator(from)?;
-
-        for (key, _) in iterator {
-            let slice = key.as_ref().try_into().unwrap();
-            let document_id = DocumentAttrKey::from_be_bytes(slice).document_id;
-
-            if Some(document_id) != last_document_id {
-                last_document_id = Some(document_id);
-                count += 1;
-            }
-        }
-
-        Ok(count)
-    }
-}
-
-pub struct DocumentFieldsIter<'a>(rocksdb::DBIterator<'a>, Vec<u8>);
-
-impl<'a> Iterator for DocumentFieldsIter<'a> {
-    type Item = Result<(SchemaAttr, Box<[u8]>), rocksdb::Error>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        match self.0.next() {
-            Some((key, value)) => {
-
-                if key.as_ref() > self.1.as_ref() {
-                    return None;
-                }
-
-                let slice: &[u8] = key.as_ref();
-                let array = slice.try_into().unwrap();
-                let key = DocumentAttrKey::from_be_bytes(array);
-                Some(Ok((key.attribute, value)))
-            },
-            None => None,
-        }
-    }
-}
diff --git a/meilidb-data/src/database/error.rs b/meilidb-data/src/database/error.rs
index 99b90e056..16a160d93 100644
--- a/meilidb-data/src/database/error.rs
+++ b/meilidb-data/src/database/error.rs
@@ -7,15 +7,15 @@ pub enum Error {
     SchemaMissing,
     WordIndexMissing,
     MissingDocumentId,
-    RocksdbError(rocksdb::Error),
+    SledError(sled::Error),
     FstError(fst::Error),
     BincodeError(bincode::Error),
     SerializerError(SerializerError),
 }
 
-impl From<rocksdb::Error> for Error {
-    fn from(error: rocksdb::Error) -> Error {
-        Error::RocksdbError(error)
+impl From<sled::Error> for Error {
+    fn from(error: sled::Error) -> Error {
+        Error::SledError(error)
     }
 }
@@ -45,7 +45,7 @@ impl fmt::Display for Error {
             SchemaMissing => write!(f, "this index does not have a schema"),
             WordIndexMissing => write!(f, "this index does not have a word index"),
             MissingDocumentId => write!(f, "document id is missing"),
-            RocksdbError(e) => write!(f, "RocksDB error; {}", e),
+            SledError(e) => write!(f, "Sled error; {}", e),
             FstError(e) => write!(f, "fst error; {}", e),
             BincodeError(e) => write!(f, "bincode error; {}", e),
             SerializerError(e) => write!(f, "serializer error; {}", e),
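The swapped `From<sled::Error>` impl above is what lets every storage call in the crate end in a bare `?`. A condensed sketch of the mechanism, with a pared-down `Error` and a hypothetical caller:

```rust
// `Error` here is a stand-in for the crate's enum; only the sled variant is shown.
#[derive(Debug)]
enum Error {
    SledError(sled::Error),
}

impl From<sled::Error> for Error {
    fn from(error: sled::Error) -> Error {
        Error::SledError(error)
    }
}

// Any function returning Result<_, Error> can now use `?` directly on tree calls.
// `read_schema_bytes` is an illustrative helper, not a function from the PR.
fn read_schema_bytes(tree: &sled::Tree) -> Result<Option<sled::IVec>, Error> {
    Ok(tree.get("schema")?) // sled::Error -> Error::SledError via From
}
```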
diff --git a/meilidb-data/src/database/index.rs b/meilidb-data/src/database/index.rs
deleted file mode 100644
index edec6a89b..000000000
--- a/meilidb-data/src/database/index.rs
+++ /dev/null
@@ -1,170 +0,0 @@
-use sdset::SetBuf;
-use std::collections::HashSet;
-use std::sync::Arc;
-
-use arc_swap::{ArcSwap, Lease};
-use meilidb_core::criterion::Criteria;
-use meilidb_core::{DocIndex, Store, DocumentId, QueryBuilder};
-use meilidb_schema::Schema;
-use rmp_serde::decode::Error as RmpError;
-use serde::de;
-
-use crate::ranked_map::RankedMap;
-use crate::serde::Deserializer;
-
-use super::{Error, CustomSettings};
-use super::{
-    RawIndex,
-    DocumentsAddition, DocumentsDeletion,
-    SynonymsAddition, SynonymsDeletion,
-};
-
-#[derive(Copy, Clone)]
-pub struct IndexStats {
-    pub number_of_words: usize,
-    pub number_of_documents: usize,
-    pub number_attrs_in_ranked_map: usize,
-}
-
-#[derive(Clone)]
-pub struct Index(pub ArcSwap<InnerIndex>);
-
-pub struct InnerIndex {
-    pub words: Arc<fst::Set>,
-    pub synonyms: Arc<fst::Set>,
-    pub schema: Schema,
-    pub ranked_map: RankedMap,
-    pub raw: RawIndex, // TODO this will be a snapshot in the future
-}
-
-impl Index {
-    pub fn from_raw(raw: RawIndex) -> Result<Index, Error> {
-        let words = match raw.main.words_set()? {
-            Some(words) => Arc::new(words),
-            None => Arc::new(fst::Set::default()),
-        };
-
-        let synonyms = match raw.main.synonyms_set()? {
-            Some(synonyms) => Arc::new(synonyms),
-            None => Arc::new(fst::Set::default()),
-        };
-
-        let schema = match raw.main.schema()? {
-            Some(schema) => schema,
-            None => return Err(Error::SchemaMissing),
-        };
-
-        let ranked_map = match raw.main.ranked_map()? {
-            Some(map) => map,
-            None => RankedMap::default(),
-        };
-
-        let inner = InnerIndex { words, synonyms, schema, ranked_map, raw };
-        let index = Index(ArcSwap::new(Arc::new(inner)));
-
-        Ok(index)
-    }
-
-    pub fn stats(&self) -> Result<IndexStats, rocksdb::Error> {
-        let lease = self.0.lease();
-
-        Ok(IndexStats {
-            number_of_words: lease.words.len(),
-            number_of_documents: lease.raw.documents.len()?,
-            number_attrs_in_ranked_map: lease.ranked_map.len(),
-        })
-    }
-
-    pub fn query_builder(&self) -> QueryBuilder<IndexLease> {
-        let lease = IndexLease(self.0.lease());
-        QueryBuilder::new(lease)
-    }
-
-    pub fn query_builder_with_criteria<'c>(
-        &self,
-        criteria: Criteria<'c>,
-    ) -> QueryBuilder<'c, IndexLease>
-    {
-        let lease = IndexLease(self.0.lease());
-        QueryBuilder::with_criteria(lease, criteria)
-    }
-
-    pub fn lease_inner(&self) -> Lease<Arc<InnerIndex>> {
-        self.0.lease()
-    }
-
-    pub fn schema(&self) -> Schema {
-        self.0.lease().schema.clone()
-    }
-
-    pub fn custom_settings(&self) -> CustomSettings {
-        self.0.lease().raw.custom.clone()
-    }
-
-    pub fn documents_addition(&self) -> DocumentsAddition {
-        let ranked_map = self.0.lease().ranked_map.clone();
-        DocumentsAddition::new(self, ranked_map)
-    }
-
-    pub fn documents_deletion(&self) -> DocumentsDeletion {
-        let ranked_map = self.0.lease().ranked_map.clone();
-        DocumentsDeletion::new(self, ranked_map)
-    }
-
-    pub fn synonyms_addition(&self) -> SynonymsAddition {
-        SynonymsAddition::new(self)
-    }
-
-    pub fn synonyms_deletion(&self) -> SynonymsDeletion {
-        SynonymsDeletion::new(self)
-    }
-
-    pub fn document<T>(
-        &self,
-        fields: Option<&HashSet<&str>>,
-        id: DocumentId,
-    ) -> Result<Option<T>, RmpError>
-    where T: de::DeserializeOwned,
-    {
-        let schema = &self.lease_inner().schema;
-        let fields = fields
-            .map(|fields| {
-                fields
-                    .iter()
-                    .filter_map(|name| schema.attribute(name))
-                    .collect()
-            });
-
-        let mut deserializer = Deserializer {
-            document_id: id,
-            index: &self,
-            fields: fields.as_ref(),
-        };
-
-        // TODO: currently we return an error if all document fields are missing,
-        //       returning None would have been better
-        T::deserialize(&mut deserializer).map(Some)
-    }
-}
-
-pub struct IndexLease(Lease<Arc<InnerIndex>>);
-
-impl Store for IndexLease {
-    type Error = Error;
-
-    fn words(&self) -> Result<&fst::Set, Self::Error> {
-        Ok(&self.0.words)
-    }
-
-    fn word_indexes(&self, word: &[u8]) -> Result<Option<SetBuf<DocIndex>>, Self::Error> {
-        Ok(self.0.raw.words.doc_indexes(word)?)
-    }
-
-    fn synonyms(&self) -> Result<&fst::Set, Self::Error> {
-        Ok(&self.0.synonyms)
-    }
-
-    fn alternatives_to(&self, word: &[u8]) -> Result<Option<fst::Set>, Self::Error> {
-        Ok(self.0.raw.synonyms.alternatives_to(word)?)
-    }
-}
diff --git a/meilidb-data/src/database/index/custom_settings_index.rs b/meilidb-data/src/database/index/custom_settings_index.rs
new file mode 100644
index 000000000..bab62bd2d
--- /dev/null
+++ b/meilidb-data/src/database/index/custom_settings_index.rs
@@ -0,0 +1,13 @@
+use std::sync::Arc;
+use std::ops::Deref;
+
+#[derive(Clone)]
+pub struct CustomSettingsIndex(pub(crate) Arc<sled::Tree>);
+
+impl Deref for CustomSettingsIndex {
+    type Target = sled::Tree;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
diff --git a/meilidb-data/src/database/docs_words_index.rs b/meilidb-data/src/database/index/docs_words_index.rs
similarity index 78%
rename from meilidb-data/src/database/docs_words_index.rs
rename to meilidb-data/src/database/index/docs_words_index.rs
index f4af69ee8..18c11de77 100644
--- a/meilidb-data/src/database/docs_words_index.rs
+++ b/meilidb-data/src/database/index/docs_words_index.rs
@@ -1,17 +1,14 @@
 use std::sync::Arc;
-
 use meilidb_core::DocumentId;
-
-use crate::database::raw_index::InnerRawIndex;
 use super::Error;
 
 #[derive(Clone)]
-pub struct DocsWordsIndex(pub(crate) InnerRawIndex);
+pub struct DocsWordsIndex(pub Arc<sled::Tree>);
 
 impl DocsWordsIndex {
     pub fn doc_words(&self, id: DocumentId) -> Result<Option<fst::Set>, Error> {
         let key = id.0.to_be_bytes();
-        match self.0.get_pinned(key)? {
+        match self.0.get(key)? {
             Some(bytes) => {
                 let len = bytes.len();
                 let value = Arc::from(bytes.as_ref());
@@ -24,13 +21,13 @@ impl DocsWordsIndex {
 
     pub fn set_doc_words(&self, id: DocumentId, words: &fst::Set) -> Result<(), Error> {
         let key = id.0.to_be_bytes();
-        self.0.set(key, words.as_fst().as_bytes())?;
+        self.0.insert(key, words.as_fst().as_bytes())?;
         Ok(())
     }
 
     pub fn del_doc_words(&self, id: DocumentId) -> Result<(), Error> {
         let key = id.0.to_be_bytes();
-        self.0.delete(key)?;
+        self.0.remove(key)?;
         Ok(())
     }
 }
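`DocsWordsIndex` stores each document's word set as the raw bytes of an `fst::Set` and rebuilds it on read through Arc-shared memory, which is what the `len`/`Arc::from` pair in the hunk above prepares. A sketch of the full roundtrip, assuming the fst 0.3 `from_shared_bytes` constructor that the elided hunk lines call (error handling collapsed into a boxed error for brevity):

```rust
use std::sync::Arc;

// Store an fst::Set as plain bytes in a sled tree, then rebuild it on read.
fn roundtrip(
    tree: &sled::Tree,
    key: &[u8],
    set: &fst::Set,
) -> Result<Option<fst::Set>, Box<dyn std::error::Error>> {
    tree.insert(key, set.as_fst().as_bytes())?;

    match tree.get(key)? {
        Some(bytes) => {
            let len = bytes.len();
            let value: Arc<[u8]> = Arc::from(bytes.as_ref());
            let fst = fst::raw::Fst::from_shared_bytes(value, 0, len)?;
            Ok(Some(fst::Set::from(fst)))
        },
        None => Ok(None),
    }
}
```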
             Some(bytes) => {
                 let len = bytes.len();
                 let value = Arc::from(bytes.as_ref());
diff --git a/meilidb-data/src/database/index/documents_index.rs b/meilidb-data/src/database/index/documents_index.rs
new file mode 100644
index 000000000..f9e5d6122
--- /dev/null
+++ b/meilidb-data/src/database/index/documents_index.rs
@@ -0,0 +1,91 @@
+use std::sync::Arc;
+use std::convert::TryInto;
+use std::ops::Bound;
+
+use meilidb_core::DocumentId;
+use meilidb_schema::SchemaAttr;
+
+use crate::document_attr_key::DocumentAttrKey;
+
+fn document_fields_range(id: DocumentId) -> (Bound<[u8; 10]>, Bound<[u8; 10]>) {
+    let start = DocumentAttrKey::new(id, SchemaAttr::min()).to_be_bytes();
+    let end = DocumentAttrKey::new(id, SchemaAttr::max()).to_be_bytes();
+
+    (Bound::Included(start), Bound::Included(end))
+}
+
+#[derive(Clone)]
+pub struct DocumentsIndex(pub(crate) Arc<sled::Tree>);
+
+impl DocumentsIndex {
+    pub fn document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result<Option<sled::IVec>> {
+        let key = DocumentAttrKey::new(id, attr).to_be_bytes();
+        self.0.get(key)
+    }
+
+    pub fn set_document_field(&self, id: DocumentId, attr: SchemaAttr, value: Vec<u8>) -> sled::Result<()> {
+        let key = DocumentAttrKey::new(id, attr).to_be_bytes();
+        self.0.insert(key, value)?;
+        Ok(())
+    }
+
+    pub fn del_document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result<()> {
+        let key = DocumentAttrKey::new(id, attr).to_be_bytes();
+        self.0.remove(key)?;
+        Ok(())
+    }
+
+    pub fn del_all_document_fields(&self, id: DocumentId) -> sled::Result<()> {
+        let range = document_fields_range(id);
+
+        for result in self.0.range(range) {
+            let (key, _) = result?;
+            self.0.remove(key)?;
+        }
+
+        Ok(())
+    }
+
+    pub fn document_fields(&self, id: DocumentId) -> DocumentFieldsIter {
+        let range = document_fields_range(id);
+
+        let iter = self.0.range(range);
+        DocumentFieldsIter(iter)
+    }
+
+    pub fn len(&self) -> sled::Result<usize> {
+        let mut last_document_id = None;
+        let mut count = 0;
+
+        for result in self.0.iter() {
+            let (key, _) = result?;
+            let array = key.as_ref().try_into().unwrap();
+            let document_id = DocumentAttrKey::from_be_bytes(array).document_id;
+
+            if Some(document_id) != last_document_id {
+                last_document_id = Some(document_id);
+                count += 1;
+            }
+        }
+
+        Ok(count)
+    }
+}
+
+pub struct DocumentFieldsIter<'a>(sled::Iter<'a>);
+
+impl Iterator for DocumentFieldsIter<'_> {
+    type Item = sled::Result<(SchemaAttr, sled::IVec)>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        match self.0.next() {
+            Some(Ok((key, value))) => {
+                let array = key.as_ref().try_into().unwrap();
+                let key = DocumentAttrKey::from_be_bytes(array);
+                Some(Ok((key.attribute, value)))
+            },
+            Some(Err(e)) => return Some(Err(e)),
+            None => None,
+        }
+    }
+}
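`document_fields_range` works because `DocumentAttrKey` serializes to a fixed 10-byte big-endian key: 8 bytes of document id followed by 2 bytes of attribute id, so one document's fields sit contiguously in key order and a lexicographic range scan visits them attribute by attribute. A sketch of that layout and the resulting scan (the helper names are illustrative):

```rust
use std::ops::Bound;

// 8 bytes of big-endian document id, then 2 bytes of big-endian attribute id.
fn attr_key(document_id: u64, attribute: u16) -> [u8; 10] {
    let mut key = [0; 10];
    key[..8].copy_from_slice(&document_id.to_be_bytes());
    key[8..].copy_from_slice(&attribute.to_be_bytes());
    key
}

// Count the stored fields of one document with a bounded range scan.
fn fields_of(tree: &sled::Tree, document_id: u64) -> sled::Result<usize> {
    let range = (
        Bound::Included(attr_key(document_id, u16::min_value())),
        Bound::Included(attr_key(document_id, u16::max_value())),
    );

    let mut count = 0;
    for result in tree.range(range) {
        let _ = result?; // propagate storage errors
        count += 1;
    }
    Ok(count)
}
```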
diff --git a/meilidb-data/src/database/main_index.rs b/meilidb-data/src/database/index/main_index.rs
similarity index 79%
rename from meilidb-data/src/database/main_index.rs
rename to meilidb-data/src/database/index/main_index.rs
index c58b76d47..cee3ec878 100644
--- a/meilidb-data/src/database/main_index.rs
+++ b/meilidb-data/src/database/index/main_index.rs
@@ -1,8 +1,5 @@
 use std::sync::Arc;
-
 use meilidb_schema::Schema;
-
-use crate::database::raw_index::InnerRawIndex;
 use crate::ranked_map::RankedMap;
 use super::Error;
 
@@ -13,11 +10,11 @@ const SYNONYMS_KEY: &str = "synonyms";
 const RANKED_MAP_KEY: &str = "ranked-map";
 
 #[derive(Clone)]
-pub struct MainIndex(pub(crate) InnerRawIndex);
+pub struct MainIndex(pub(crate) Arc<sled::Tree>);
 
 impl MainIndex {
     pub fn schema(&self) -> Result<Option<Schema>, Error> {
-        match self.0.get_pinned(SCHEMA_KEY)? {
+        match self.0.get(SCHEMA_KEY)? {
             Some(bytes) => {
                 let schema = Schema::read_from_bin(bytes.as_ref())?;
                 Ok(Some(schema))
@@ -29,12 +26,12 @@ impl MainIndex {
     pub fn set_schema(&self, schema: &Schema) -> Result<(), Error> {
         let mut bytes = Vec::new();
         schema.write_to_bin(&mut bytes)?;
-        self.0.set(SCHEMA_KEY, bytes)?;
+        self.0.insert(SCHEMA_KEY, bytes)?;
         Ok(())
     }
 
     pub fn words_set(&self) -> Result<Option<fst::Set>, Error> {
-        match self.0.get_pinned(WORDS_KEY)? {
+        match self.0.get(WORDS_KEY)? {
             Some(bytes) => {
                 let len = bytes.len();
                 let value = Arc::from(bytes.as_ref());
@@ -46,11 +43,11 @@ impl MainIndex {
     }
 
     pub fn set_words_set(&self, value: &fst::Set) -> Result<(), Error> {
-        self.0.set(WORDS_KEY, value.as_fst().as_bytes()).map_err(Into::into)
+        self.0.insert(WORDS_KEY, value.as_fst().as_bytes()).map(drop).map_err(Into::into)
     }
 
     pub fn synonyms_set(&self) -> Result<Option<fst::Set>, Error> {
-        match self.0.get_pinned(SYNONYMS_KEY)? {
+        match self.0.get(SYNONYMS_KEY)? {
             Some(bytes) => {
                 let len = bytes.len();
                 let value = Arc::from(bytes.as_ref());
@@ -62,11 +59,11 @@ impl MainIndex {
     }
 
     pub fn set_synonyms_set(&self, value: &fst::Set) -> Result<(), Error> {
-        self.0.set(SYNONYMS_KEY, value.as_fst().as_bytes()).map_err(Into::into)
+        self.0.insert(SYNONYMS_KEY, value.as_fst().as_bytes()).map(drop).map_err(Into::into)
     }
 
     pub fn ranked_map(&self) -> Result<Option<RankedMap>, Error> {
-        match self.0.get_pinned(RANKED_MAP_KEY)? {
+        match self.0.get(RANKED_MAP_KEY)? {
             Some(bytes) => {
                 let ranked_map = RankedMap::read_from_bin(bytes.as_ref())?;
                 Ok(Some(ranked_map))
@@ -78,7 +75,7 @@ impl MainIndex {
     pub fn set_ranked_map(&self, value: &RankedMap) -> Result<(), Error> {
         let mut bytes = Vec::new();
         value.write_to_bin(&mut bytes)?;
-        self.0.set(RANKED_MAP_KEY, bytes)?;
+        self.0.insert(RANKED_MAP_KEY, bytes)?;
         Ok(())
     }
 }
diff --git a/meilidb-data/src/database/index/mod.rs b/meilidb-data/src/database/index/mod.rs
new file mode 100644
index 000000000..2fa6c20e2
--- /dev/null
+++ b/meilidb-data/src/database/index/mod.rs
@@ -0,0 +1,276 @@
+use std::collections::HashSet;
+use std::sync::Arc;
+
+use arc_swap::{ArcSwap, Guard};
+use meilidb_core::criterion::Criteria;
+use meilidb_core::{DocIndex, Store, DocumentId, QueryBuilder};
+use meilidb_schema::Schema;
+use sdset::SetBuf;
+use serde::de;
+
+use crate::ranked_map::RankedMap;
+use crate::serde::{Deserializer, DeserializerError};
+
+use super::Error;
+
+pub use self::custom_settings_index::CustomSettingsIndex;
+use self::docs_words_index::DocsWordsIndex;
+use self::documents_index::DocumentsIndex;
+use self::main_index::MainIndex;
+use self::synonyms_index::SynonymsIndex;
+use self::updates_index::UpdatesIndex;
+use self::words_index::WordsIndex;
+
+use super::{
+    DocumentsAddition, DocumentsDeletion,
+    SynonymsAddition, SynonymsDeletion,
+};
+
+mod custom_settings_index;
+mod docs_words_index;
+mod documents_index;
+mod main_index;
+mod synonyms_index;
+mod updates_index;
+mod words_index;
+
+#[derive(Copy, Clone)]
+pub struct IndexStats {
+    pub number_of_words: usize,
+    pub number_of_documents: usize,
+    pub number_attrs_in_ranked_map: usize,
+}
+
+#[derive(Clone)]
+pub struct Index {
+    pub(crate) cache: ArcSwap<Cache>,
+
+    // TODO this will be a snapshot in the future
+    main_index: MainIndex,
+    synonyms_index: SynonymsIndex,
+    words_index: WordsIndex,
+    docs_words_index: DocsWordsIndex,
+    documents_index: DocumentsIndex,
+    custom_settings_index: CustomSettingsIndex,
+    updates_index: UpdatesIndex,
+}
+
+pub(crate) struct Cache {
+    pub words: Arc<fst::Set>,
+    pub synonyms: Arc<fst::Set>,
+    pub schema: Schema,
+    pub ranked_map: RankedMap,
+}
+
+impl Index {
+    pub fn new(db: &sled::Db, name: &str) -> Result<Index, Error> {
+        let main_index = db.open_tree(name).map(MainIndex)?;
+        let synonyms_index = db.open_tree(format!("{}-synonyms", name)).map(SynonymsIndex)?;
+        let words_index = db.open_tree(format!("{}-words", name)).map(WordsIndex)?;
+        let docs_words_index = db.open_tree(format!("{}-docs-words", name)).map(DocsWordsIndex)?;
+        let documents_index = db.open_tree(format!("{}-documents", name)).map(DocumentsIndex)?;
+        let custom_settings_index = db.open_tree(format!("{}-custom", name)).map(CustomSettingsIndex)?;
+
+        let updates = db.open_tree(format!("{}-updates", name))?;
+        let updates_results = db.open_tree(format!("{}-updates-results", name))?;
+        let updates_index = UpdatesIndex::new(db.clone(), updates, updates_results);
+
+        let words = match main_index.words_set()? {
+            Some(words) => Arc::new(words),
+            None => Arc::new(fst::Set::default()),
+        };
+
+        let synonyms = match main_index.synonyms_set()? {
+            Some(synonyms) => Arc::new(synonyms),
+            None => Arc::new(fst::Set::default()),
+        };
+
+        let schema = match main_index.schema()? {
+            Some(schema) => schema,
+            None => return Err(Error::SchemaMissing),
+        };
+
+        let ranked_map = match main_index.ranked_map()? {
+            Some(map) => map,
+            None => RankedMap::default(),
+        };
+
+        let cache = Cache { words, synonyms, schema, ranked_map };
+        let cache = ArcSwap::from_pointee(cache);
+
+        Ok(Index {
+            cache,
+            main_index,
+            synonyms_index,
+            words_index,
+            docs_words_index,
+            documents_index,
+            custom_settings_index,
+            updates_index,
+        })
+    }
+
+    pub fn with_schema(db: &sled::Db, name: &str, schema: Schema) -> Result<Index, Error> {
+        let main_index = db.open_tree(name).map(MainIndex)?;
+        let synonyms_index = db.open_tree(format!("{}-synonyms", name)).map(SynonymsIndex)?;
+        let words_index = db.open_tree(format!("{}-words", name)).map(WordsIndex)?;
+        let docs_words_index = db.open_tree(format!("{}-docs-words", name)).map(DocsWordsIndex)?;
+        let documents_index = db.open_tree(format!("{}-documents", name)).map(DocumentsIndex)?;
+        let custom_settings_index = db.open_tree(format!("{}-custom", name)).map(CustomSettingsIndex)?;
+
+        let updates = db.open_tree(format!("{}-updates", name))?;
+        let updates_results = db.open_tree(format!("{}-updates-results", name))?;
+        let updates_index = UpdatesIndex::new(db.clone(), updates, updates_results);
+
+        let words = match main_index.words_set()? {
+            Some(words) => Arc::new(words),
+            None => Arc::new(fst::Set::default()),
+        };
+
+        let synonyms = match main_index.synonyms_set()? {
+            Some(synonyms) => Arc::new(synonyms),
+            None => Arc::new(fst::Set::default()),
+        };
+
+        match main_index.schema()? {
+            Some(current) => if current != schema {
+                return Err(Error::SchemaDiffer)
+            },
+            None => main_index.set_schema(&schema)?,
+        }
+
+        let ranked_map = match main_index.ranked_map()? {
+            Some(map) => map,
+            None => RankedMap::default(),
+        };
+
+        let cache = Cache { words, synonyms, schema, ranked_map };
+        let cache = ArcSwap::from_pointee(cache);
+
+        Ok(Index {
+            cache,
+            main_index,
+            synonyms_index,
+            words_index,
+            docs_words_index,
+            documents_index,
+            custom_settings_index,
+            updates_index,
+        })
+    }
+
+    pub fn stats(&self) -> sled::Result<IndexStats> {
+        let cache = self.cache.load();
+        Ok(IndexStats {
+            number_of_words: cache.words.len(),
+            number_of_documents: self.documents_index.len()?,
+            number_attrs_in_ranked_map: cache.ranked_map.len(),
+        })
+    }
+
+    pub fn query_builder(&self) -> QueryBuilder<RefIndex> {
+        let ref_index = self.as_ref();
+        QueryBuilder::new(ref_index)
+    }
+
+    pub fn query_builder_with_criteria<'c>(
+        &self,
+        criteria: Criteria<'c>,
+    ) -> QueryBuilder<'c, RefIndex>
+    {
+        let ref_index = self.as_ref();
+        QueryBuilder::with_criteria(ref_index, criteria)
+    }
+
+    pub fn as_ref(&self) -> RefIndex {
+        RefIndex {
+            cache: self.cache.load(),
+            main_index: &self.main_index,
+            synonyms_index: &self.synonyms_index,
+            words_index: &self.words_index,
+            docs_words_index: &self.docs_words_index,
+            documents_index: &self.documents_index,
+            custom_settings_index: &self.custom_settings_index,
+        }
+    }
+
+    pub fn schema(&self) -> Schema {
+        self.cache.load().schema.clone()
+    }
+
+    pub fn custom_settings(&self) -> CustomSettingsIndex {
+        self.custom_settings_index.clone()
+    }
+
+    pub fn documents_addition(&self) -> DocumentsAddition {
+        let ranked_map = self.cache.load().ranked_map.clone();
+        DocumentsAddition::new(self, ranked_map)
+    }
+
+    pub fn documents_deletion(&self) -> DocumentsDeletion {
+        let ranked_map = self.cache.load().ranked_map.clone();
+        DocumentsDeletion::new(self, ranked_map)
+    }
+
+    pub fn synonyms_addition(&self) -> SynonymsAddition {
+        SynonymsAddition::new(self)
+    }
+
+    pub fn synonyms_deletion(&self) -> SynonymsDeletion {
+        SynonymsDeletion::new(self)
+    }
+
+    pub fn document<T>(
+        &self,
+        fields: Option<&HashSet<&str>>,
+        id: DocumentId,
+    ) -> Result<Option<T>, DeserializerError>
+    where T: de::DeserializeOwned,
+    {
+        let schema = self.schema();
+        let fields = match fields {
+            Some(fields) => fields.into_iter().map(|name| schema.attribute(name)).collect(),
+            None => None,
+        };
+
+        let mut deserializer = Deserializer {
+            document_id: id,
+            index: &self,
+            fields: fields.as_ref(),
+        };
+
+        // TODO: currently we return an error if all document fields are missing,
+        //       returning None would have been better
+        T::deserialize(&mut deserializer).map(Some)
+    }
+}
+
+pub struct RefIndex<'a> {
+    pub(crate) cache: Guard<'static, Arc<Cache>>,
+    pub main_index: &'a MainIndex,
+    pub synonyms_index: &'a SynonymsIndex,
+    pub words_index: &'a WordsIndex,
+    pub docs_words_index: &'a DocsWordsIndex,
+    pub documents_index: &'a DocumentsIndex,
+    pub custom_settings_index: &'a CustomSettingsIndex,
+}
+
+impl Store for RefIndex<'_> {
+    type Error = Error;
+
+    fn words(&self) -> Result<&fst::Set, Self::Error> {
+        Ok(&self.cache.words)
+    }
+
+    fn word_indexes(&self, word: &[u8]) -> Result<Option<SetBuf<DocIndex>>, Self::Error> {
+        Ok(self.words_index.doc_indexes(word)?)
+    }
+
+    fn synonyms(&self) -> Result<&fst::Set, Self::Error> {
+        Ok(&self.cache.synonyms)
+    }
+
+    fn alternatives_to(&self, word: &[u8]) -> Result<Option<fst::Set>, Self::Error> {
+        Ok(self.synonyms_index.alternatives_to(word)?)
+    }
+}
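One logical index is thus spread over a family of sled trees that share a name prefix. For reference, the layout `Index::new` and `with_schema` open (tree names taken from the code above; the helper function itself is illustrative):

```rust
// Trees opened for an index called `name`.
fn tree_names(name: &str) -> Vec<String> {
    vec![
        name.to_string(),                    // main: schema, words fst, synonyms fst, ranked map
        format!("{}-synonyms", name),        // word -> alternatives fst
        format!("{}-words", name),           // word -> DocIndex postings
        format!("{}-docs-words", name),      // document id -> words fst
        format!("{}-documents", name),       // (document id, attribute) -> field bytes
        format!("{}-custom", name),          // user-defined settings
        format!("{}-updates", name),         // pending update queue
        format!("{}-updates-results", name), // processed update results
    ]
}
```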
diff --git a/meilidb-data/src/database/index/synonyms_index.rs b/meilidb-data/src/database/index/synonyms_index.rs
new file mode 100644
index 000000000..0d9fd9d7d
--- /dev/null
+++ b/meilidb-data/src/database/index/synonyms_index.rs
@@ -0,0 +1,21 @@
+use std::sync::Arc;
+
+#[derive(Clone)]
+pub struct SynonymsIndex(pub(crate) Arc<sled::Tree>);
+
+impl SynonymsIndex {
+    pub fn alternatives_to(&self, word: &[u8]) -> sled::Result<Option<fst::Set>> {
+        match self.0.get(word)? {
+            Some(vector) => Ok(Some(fst::Set::from_bytes(vector.to_vec()).unwrap())),
+            None => Ok(None),
+        }
+    }
+
+    pub fn set_alternatives_to(&self, word: &[u8], value: Vec<u8>) -> sled::Result<()> {
+        self.0.insert(word, value).map(drop)
+    }
+
+    pub fn del_alternatives_of(&self, word: &[u8]) -> sled::Result<()> {
+        self.0.remove(word).map(drop)
+    }
+}
diff --git a/meilidb-data/src/database/index/updates_index.rs b/meilidb-data/src/database/index/updates_index.rs
new file mode 100644
index 000000000..7b91eaaa2
--- /dev/null
+++ b/meilidb-data/src/database/index/updates_index.rs
@@ -0,0 +1,110 @@
+use std::convert::TryInto;
+use std::sync::Arc;
+use std::thread;
+
+use log::info;
+use sled::Event;
+use serde::{Serialize, Deserialize};
+
+use super::Error;
+use crate::database::{
+    DocumentsAddition, DocumentsDeletion, SynonymsAddition, SynonymsDeletion
+};
+
+fn event_is_set(event: &Event) -> bool {
+    match event {
+        Event::Set(_, _) => true,
+        _ => false,
+    }
+}
+
+#[derive(Serialize, Deserialize)]
+enum Update {
+    DocumentsAddition( () /*DocumentsAddition*/),
+    DocumentsDeletion( () /*DocumentsDeletion*/),
+    SynonymsAddition( () /*SynonymsAddition*/),
+    SynonymsDeletion( () /*SynonymsDeletion*/),
+}
+
+#[derive(Clone)]
+pub struct UpdatesIndex {
+    db: sled::Db,
+    updates: Arc<sled::Tree>,
+    results: Arc<sled::Tree>,
+}
+
+impl UpdatesIndex {
+    pub fn new(
+        db: sled::Db,
+        updates: Arc<sled::Tree>,
+        results: Arc<sled::Tree>,
+    ) -> UpdatesIndex
+    {
+        let updates_clone = updates.clone();
+        let results_clone = results.clone();
+        let _handle = thread::spawn(move || {
+            loop {
+                let mut subscription = updates_clone.watch_prefix(vec![]);
+
+                while let Some((key, update)) = updates_clone.pop_min().unwrap() {
+                    let array = key.as_ref().try_into().unwrap();
+                    let id = u64::from_be_bytes(array);
+
+                    match bincode::deserialize(&update).unwrap() {
+                        Update::DocumentsAddition(_) => {
+                            info!("processing the document addition (update number {})", id);
+                            // ...
+                        },
+                        Update::DocumentsDeletion(_) => {
+                            info!("processing the document deletion (update number {})", id);
+                            // ...
+                        },
+                        Update::SynonymsAddition(_) => {
+                            info!("processing the synonyms addition (update number {})", id);
+                            // ...
+                        },
+                        Update::SynonymsDeletion(_) => {
+                            info!("processing the synonyms deletion (update number {})", id);
+                            // ...
+                        },
+                    }
+                }
+
+                // this subscription is just used to block
+                // the loop until a new update is inserted
+                subscription.filter(event_is_set).next();
+            }
+        });
+
+        UpdatesIndex { db, updates, results }
+    }
+
+    pub fn push_documents_addition(&self, addition: DocumentsAddition) -> Result<u64, Error> {
+        let update = bincode::serialize(&())?;
+        self.raw_push_update(update)
+    }
+
+    pub fn push_documents_deletion(&self, deletion: DocumentsDeletion) -> Result<u64, Error> {
+        let update = bincode::serialize(&())?;
+        self.raw_push_update(update)
+    }
+
+    pub fn push_synonyms_addition(&self, addition: SynonymsAddition) -> Result<u64, Error> {
+        let update = bincode::serialize(&())?;
+        self.raw_push_update(update)
+    }
+
+    pub fn push_synonyms_deletion(&self, deletion: SynonymsDeletion) -> Result<u64, Error> {
+        let update = bincode::serialize(&())?;
+        self.raw_push_update(update)
+    }
+
+    fn raw_push_update(&self, raw_update: Vec<u8>) -> Result<u64, Error> {
+        let update_id = self.db.generate_id()?;
+        let update_id_array = update_id.to_be_bytes();
+
+        self.updates.insert(update_id_array, raw_update)?;
+
+        Ok(update_id)
+    }
+}
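`UpdatesIndex` sketches a persistent job queue on top of three sled primitives: `generate_id()` hands out monotonically increasing update ids, `pop_min()` drains entries in id order, and `watch_prefix()` parks the worker until the next insert. A condensed sketch of just the queue mechanics (payload decoding and error handling stripped down; the function names are illustrative):

```rust
use std::convert::TryInto;
use std::sync::Arc;
use std::thread;

// Worker side: drain the queue FIFO, then block on the subscription.
fn spawn_worker(updates: Arc<sled::Tree>) {
    thread::spawn(move || loop {
        let subscription = updates.watch_prefix(vec![]);

        while let Some((key, _payload)) = updates.pop_min().unwrap() {
            let id = u64::from_be_bytes(key.as_ref().try_into().unwrap());
            // a real worker would deserialize and apply the update here
            println!("processing update {}", id);
        }

        // block until the next insert wakes us up
        for _event in subscription {
            break;
        }
    });
}

// Producer side: a monotonic id keeps entries ordered under pop_min.
fn enqueue(db: &sled::Db, updates: &sled::Tree, payload: Vec<u8>) -> sled::Result<u64> {
    let id = db.generate_id()?;
    updates.insert(id.to_be_bytes(), payload)?;
    Ok(id)
}
```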
diff --git a/meilidb-data/src/database/words_index.rs b/meilidb-data/src/database/index/words_index.rs
similarity index 75%
rename from meilidb-data/src/database/words_index.rs
rename to meilidb-data/src/database/index/words_index.rs
index 4f2163650..094d159d3 100644
--- a/meilidb-data/src/database/words_index.rs
+++ b/meilidb-data/src/database/index/words_index.rs
@@ -1,14 +1,13 @@
+use std::sync::Arc;
 use meilidb_core::DocIndex;
 use sdset::{Set, SetBuf};
 use zerocopy::{LayoutVerified, AsBytes};
 
-use crate::database::raw_index::InnerRawIndex;
-
 #[derive(Clone)]
-pub struct WordsIndex(pub(crate) InnerRawIndex);
+pub struct WordsIndex(pub(crate) Arc<sled::Tree>);
 
 impl WordsIndex {
-    pub fn doc_indexes(&self, word: &[u8]) -> Result<Option<SetBuf<DocIndex>>, rocksdb::Error> {
+    pub fn doc_indexes(&self, word: &[u8]) -> sled::Result<Option<SetBuf<DocIndex>>> {
         // we must force an allocation to make the memory aligned
         match self.0.get(word)? {
             Some(bytes) => {
@@ -36,13 +35,11 @@ impl WordsIndex {
         }
     }
 
-    pub fn set_doc_indexes(&self, word: &[u8], set: &Set<DocIndex>) -> Result<(), rocksdb::Error> {
-        self.0.set(word, set.as_bytes())?;
-        Ok(())
+    pub fn set_doc_indexes(&self, word: &[u8], set: &Set<DocIndex>) -> sled::Result<()> {
+        self.0.insert(word, set.as_bytes()).map(drop)
    }
 
-    pub fn del_doc_indexes(&self, word: &[u8]) -> Result<(), rocksdb::Error> {
-        self.0.delete(word)?;
-        Ok(())
+    pub fn del_doc_indexes(&self, word: &[u8]) -> sled::Result<()> {
+        self.0.remove(word).map(drop)
     }
 }
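The "force an allocation" comment in `doc_indexes` is about alignment: bytes handed back by the store are not guaranteed to be aligned for `DocIndex`, so they are copied into a fresh buffer before zerocopy verifies and reinterprets them. A sketch of the idea with an illustrative `Record` standing in for `DocIndex` (in practice the global allocator returns blocks aligned well enough for the layout check to pass):

```rust
use zerocopy::{AsBytes, FromBytes, LayoutVerified};

// Stand-in for DocIndex: plain old data, no padding.
#[derive(Debug, Clone, Copy, AsBytes, FromBytes)]
#[repr(C)]
struct Record {
    document: u64,
    offset: u64,
}

fn decode(raw: &[u8]) -> Option<Vec<Record>> {
    // Copy into a fresh heap buffer so the slice is freshly allocated
    // (and, in practice, sufficiently aligned) before verification.
    let vec = raw.to_vec();
    let layout: LayoutVerified<&[u8], [Record]> = LayoutVerified::new_slice(&vec)?;
    Some(layout.to_vec()) // LayoutVerified derefs to [Record]
}
```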
diff --git a/meilidb-data/src/database/mod.rs b/meilidb-data/src/database/mod.rs
index 2edf774e0..4a1bb7a0b 100644
--- a/meilidb-data/src/database/mod.rs
+++ b/meilidb-data/src/database/mod.rs
@@ -1,88 +1,63 @@
 use std::collections::hash_map::Entry;
 use std::collections::{HashSet, HashMap};
 use std::path::Path;
-use std::sync::{Arc, RwLock};
+use std::sync::RwLock;
 
 use meilidb_schema::Schema;
 
-mod custom_settings;
-mod docs_words_index;
-mod documents_addition;
-mod documents_deletion;
-mod documents_index;
 mod error;
 mod index;
-mod main_index;
-mod raw_index;
+
+mod documents_addition;
+mod documents_deletion;
 mod synonyms_addition;
 mod synonyms_deletion;
-mod synonyms_index;
-mod words_index;
 
 pub use self::error::Error;
-pub use self::index::Index;
-pub use self::custom_settings::CustomSettings;
+pub use self::index::{Index, CustomSettingsIndex};
 
-use self::docs_words_index::DocsWordsIndex;
 use self::documents_addition::DocumentsAddition;
 use self::documents_deletion::DocumentsDeletion;
 use self::synonyms_addition::SynonymsAddition;
 use self::synonyms_deletion::SynonymsDeletion;
-use self::documents_index::DocumentsIndex;
-use self::index::InnerIndex;
-use self::main_index::MainIndex;
-use self::raw_index::{RawIndex, InnerRawIndex};
-use self::words_index::WordsIndex;
-use self::synonyms_index::SynonymsIndex;
+
+fn load_indexes(tree: &sled::Tree) -> Result<HashSet<String>, Error> {
+    match tree.get("indexes")? {
+        Some(bytes) => Ok(bincode::deserialize(&bytes)?),
+        None => Ok(HashSet::new())
+    }
+}
 
 pub struct Database {
-    cache: RwLock<HashMap<String, Arc<Index>>>,
-    inner: Arc<rocksdb::DB>,
+    cache: RwLock<HashMap<String, Index>>,
+    inner: sled::Db,
 }
 
 impl Database {
-    pub fn start_default<P: AsRef<Path>>(path: P) -> Result<Database, Error> {
-        let path = path.as_ref();
+    pub fn open<P: AsRef<Path>>(path: P) -> Result<Database, Error> {
         let cache = RwLock::new(HashMap::new());
+        let inner = sled::Db::open(path)?;
 
-        let options = {
-            let mut options = rocksdb::Options::default();
-            options.create_if_missing(true);
-            options
-        };
-        let cfs = rocksdb::DB::list_cf(&options, path).unwrap_or(Vec::new());
-        let inner = Arc::new(rocksdb::DB::open_cf(&options, path, &cfs)?);
+        let indexes = load_indexes(&inner)?;
         let database = Database { cache, inner };
 
-        let mut indexes: Vec<_> = cfs.iter()
-            .filter_map(|c| c.split('-').nth(0).filter(|&c| c != "default"))
-            .collect();
-        indexes.sort_unstable();
-        indexes.dedup();
-
         for index in indexes {
-            database.open_index(index)?;
+            database.open_index(&index)?;
         }
 
         Ok(database)
     }
 
-    pub fn indexes(&self) -> Result<Option<HashSet<String>>, Error> {
-        let bytes = match self.inner.get("indexes")? {
-            Some(bytes) => bytes,
-            None => return Ok(None),
-        };
-
-        let indexes = bincode::deserialize(&bytes)?;
-        Ok(Some(indexes))
+    pub fn indexes(&self) -> Result<HashSet<String>, Error> {
+        load_indexes(&self.inner)
     }
 
     fn set_indexes(&self, value: &HashSet<String>) -> Result<(), Error> {
         let bytes = bincode::serialize(value)?;
-        self.inner.put("indexes", bytes)?;
+        self.inner.insert("indexes", bytes)?;
         Ok(())
     }
 
-    pub fn open_index(&self, name: &str) -> Result<Option<Arc<Index>>, Error> {
+    pub fn open_index(&self, name: &str) -> Result<Option<Index>, Error> {
         {
             let cache = self.cache.read().unwrap();
             if let Some(index) = cache.get(name).cloned() {
@@ -96,56 +71,19 @@ impl Database {
                 occupied.get().clone()
             },
             Entry::Vacant(vacant) => {
-                if !self.indexes()?.map_or(false, |x| x.contains(name)) {
+                if !self.indexes()?.contains(name) {
                     return Ok(None)
                 }
 
-                let main = {
-                    self.inner.cf_handle(name).expect("cf not found");
-                    MainIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(name)))
-                };
-
-                let synonyms = {
-                    let cf_name = format!("{}-synonyms", name);
-                    self.inner.cf_handle(&cf_name).expect("cf not found");
-                    SynonymsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
-                };
-
-                let words = {
-                    let cf_name = format!("{}-words", name);
-                    self.inner.cf_handle(&cf_name).expect("cf not found");
-                    WordsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
-                };
-
-                let docs_words = {
-                    let cf_name = format!("{}-docs-words", name);
-                    self.inner.cf_handle(&cf_name).expect("cf not found");
-                    DocsWordsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
-                };
-
-                let documents = {
-                    let cf_name = format!("{}-documents", name);
-                    self.inner.cf_handle(&cf_name).expect("cf not found");
-                    DocumentsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
-                };
-
-                let custom = {
-                    let cf_name = format!("{}-custom", name);
-                    self.inner.cf_handle(&cf_name).expect("cf not found");
-                    CustomSettings(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
-                };
-
-                let raw_index = RawIndex { main, synonyms, words, docs_words, documents, custom };
-                let index = Index::from_raw(raw_index)?;
-
-                vacant.insert(Arc::new(index)).clone()
+                let index = Index::new(&self.inner, name)?;
+                vacant.insert(index).clone()
             },
         };
 
         Ok(Some(index))
     }
 
-    pub fn create_index(&self, name: &str, schema: Schema) -> Result<Arc<Index>, Error> {
+    pub fn create_index(&self, name: &str, schema: Schema) -> Result<Index, Error> {
         let mut cache = self.cache.write().unwrap();
 
         let index = match cache.entry(name.to_string()) {
@@ -153,57 +91,13 @@ impl Database {
                 occupied.get().clone()
             },
             Entry::Vacant(vacant) => {
-                let main = {
-                    self.inner.create_cf(name, &rocksdb::Options::default())?;
-                    MainIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(name)))
-                };
+                let index = Index::with_schema(&self.inner, name, schema)?;
 
-                if let Some(prev_schema) = main.schema()? {
-                    if prev_schema != schema {
-                        return Err(Error::SchemaDiffer)
-                    }
-                }
-
-                main.set_schema(&schema)?;
-
-                let synonyms = {
-                    let cf_name = format!("{}-synonyms", name);
-                    self.inner.create_cf(&cf_name, &rocksdb::Options::default())?;
-                    SynonymsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
-                };
-
-                let words = {
-                    let cf_name = format!("{}-words", name);
-                    self.inner.create_cf(&cf_name, &rocksdb::Options::default())?;
-                    WordsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
-                };
-
-                let docs_words = {
-                    let cf_name = format!("{}-docs-words", name);
-                    self.inner.create_cf(&cf_name, &rocksdb::Options::default())?;
-                    DocsWordsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
-                };
-
-                let documents = {
-                    let cf_name = format!("{}-documents", name);
-                    self.inner.create_cf(&cf_name, &rocksdb::Options::default())?;
-                    DocumentsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
-                };
-
-                let custom = {
-                    let cf_name = format!("{}-custom", name);
-                    self.inner.create_cf(&cf_name, &rocksdb::Options::default())?;
-                    CustomSettings(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
-                };
-
-                let mut indexes = self.indexes()?.unwrap_or_else(HashSet::new);
+                let mut indexes = self.indexes()?;
                 indexes.insert(name.to_string());
                 self.set_indexes(&indexes)?;
 
-                let raw_index = RawIndex { main, synonyms, words, docs_words, documents, custom };
-                let index = Index::from_raw(raw_index)?;
-
-                vacant.insert(Arc::new(index)).clone()
+                vacant.insert(index).clone()
             },
         };
diff --git a/meilidb-data/src/database/raw_index.rs b/meilidb-data/src/database/raw_index.rs
deleted file mode 100644
index 612fb0df1..000000000
--- a/meilidb-data/src/database/raw_index.rs
+++ /dev/null
@@ -1,88 +0,0 @@
-use std::sync::Arc;
-use super::{MainIndex, SynonymsIndex, WordsIndex, DocsWordsIndex, DocumentsIndex, CustomSettings};
-
-#[derive(Clone)]
-pub struct RawIndex {
-    pub main: MainIndex,
-    pub synonyms: SynonymsIndex,
-    pub words: WordsIndex,
-    pub docs_words: DocsWordsIndex,
-    pub documents: DocumentsIndex,
-    pub custom: CustomSettings,
-}
-
-impl RawIndex {
-    pub(crate) fn compact(&self) {
-        self.main.0.compact_range(None::<&[u8]>, None::<&[u8]>);
-        self.synonyms.0.compact_range(None::<&[u8]>, None::<&[u8]>);
-        self.words.0.compact_range(None::<&[u8]>, None::<&[u8]>);
-        self.docs_words.0.compact_range(None::<&[u8]>, None::<&[u8]>);
-        self.documents.0.compact_range(None::<&[u8]>, None::<&[u8]>);
-        self.custom.0.compact_range(None::<&[u8]>, None::<&[u8]>);
-    }
-}
-
-#[derive(Clone)]
-pub struct InnerRawIndex {
-    database: Arc<rocksdb::DB>,
-    name: Arc<str>,
-}
-
-impl InnerRawIndex {
-    pub fn new(database: Arc<rocksdb::DB>, name: Arc<str>) -> InnerRawIndex {
-        InnerRawIndex { database, name }
-    }
-
-    pub fn get<K>(&self, key: K) -> Result<Option<rocksdb::DBVector>, rocksdb::Error>
-    where K: AsRef<[u8]>,
-    {
-        let cf = self.database.cf_handle(&self.name).expect("cf not found");
-        self.database.get_cf(cf, key)
-    }
-
-    pub fn get_pinned<K>(&self, key: K) -> Result<Option<rocksdb::DBPinnableSlice>, rocksdb::Error>
-    where K: AsRef<[u8]>,
-    {
-        let cf = self.database.cf_handle(&self.name).expect("cf not found");
-        self.database.get_pinned_cf(cf, key)
-    }
-
-    pub fn iterator(&self, from: rocksdb::IteratorMode) -> Result<rocksdb::DBIterator, rocksdb::Error> {
-        let cf = self.database.cf_handle(&self.name).expect("cf not found");
-        self.database.iterator_cf(cf, from)
-    }
-
-    pub fn set<K, V>(&self, key: K, value: V) -> Result<(), rocksdb::Error>
-    where K: AsRef<[u8]>,
-          V: AsRef<[u8]>,
-    {
-        let cf = self.database.cf_handle(&self.name).expect("cf not found");
-        self.database.put_cf(cf, key, value)
-    }
-    pub fn delete<K>(&self, key: K) -> Result<(), rocksdb::Error>
-    where K: AsRef<[u8]>
-    {
-        let cf = self.database.cf_handle(&self.name).expect("cf not found");
-        self.database.delete_cf(cf, key)
-    }
-
-    pub fn delete_range<K>(&self, start: K, end: K) -> Result<(), rocksdb::Error>
-    where K: AsRef<[u8]>,
-    {
-        let mut batch = rocksdb::WriteBatch::default();
-
-        let cf = self.database.cf_handle(&self.name).expect("cf not found");
-        batch.delete_range_cf(cf, start, end)?;
-
-        self.database.write(batch)
-    }
-
-    pub fn compact_range<S, E>(&self, start: Option<S>, end: Option<E>)
-    where S: AsRef<[u8]>,
-          E: AsRef<[u8]>,
-    {
-        let cf = self.database.cf_handle(&self.name).expect("cf not found");
-        self.database.compact_range_cf(cf, start, end)
-    }
-}
diff --git a/meilidb-data/src/database/synonyms_addition.rs b/meilidb-data/src/database/synonyms_addition.rs
index e4c364387..68945af2d 100644
--- a/meilidb-data/src/database/synonyms_addition.rs
+++ b/meilidb-data/src/database/synonyms_addition.rs
@@ -5,8 +5,8 @@ use fst::{SetBuilder, set::OpBuilder};
 use meilidb_core::normalize_str;
 use sdset::SetBuf;
 
-use crate::database::index::InnerIndex;
 use super::{Error, Index};
+use super::index::Cache;
 
 pub struct SynonymsAddition<'a> {
     inner: &'a Index,
@@ -29,9 +29,9 @@ impl<'a> SynonymsAddition<'a> {
     }
 
     pub fn finalize(self) -> Result<(), Error> {
-        let lease_inner = self.inner.lease_inner();
-        let synonyms = &lease_inner.raw.synonyms;
-        let main = &lease_inner.raw.main;
+        let ref_index = self.inner.as_ref();
+        let synonyms = ref_index.synonyms_index;
+        let main = ref_index.main_index;
 
         let mut synonyms_builder = SetBuilder::memory();
 
@@ -72,15 +72,14 @@ impl<'a> SynonymsAddition<'a> {
         main.set_synonyms_set(&synonyms)?;
 
         // update the "consistent" view of the Index
+        let cache = ref_index.cache;
         let words = Arc::new(main.words_set()?.unwrap_or_default());
-        let ranked_map = lease_inner.ranked_map.clone();
+        let ranked_map = cache.ranked_map.clone();
         let synonyms = Arc::new(synonyms);
-        let schema = lease_inner.schema.clone();
-        let raw = lease_inner.raw.clone();
-        lease_inner.raw.compact();
+        let schema = cache.schema.clone();
 
-        let inner = InnerIndex { words, synonyms, schema, ranked_map, raw };
-        self.inner.0.store(Arc::new(inner));
+        let cache = Cache { words, synonyms, schema, ranked_map };
+        self.inner.cache.store(Arc::new(cache));
 
         Ok(())
     }
diff --git a/meilidb-data/src/database/synonyms_deletion.rs b/meilidb-data/src/database/synonyms_deletion.rs
index 6056dba3f..9ce415ead 100644
--- a/meilidb-data/src/database/synonyms_deletion.rs
+++ b/meilidb-data/src/database/synonyms_deletion.rs
@@ -6,8 +6,8 @@ use fst::{SetBuilder, set::OpBuilder};
 use meilidb_core::normalize_str;
 use sdset::SetBuf;
 
-use crate::database::index::InnerIndex;
 use super::{Error, Index};
+use super::index::Cache;
 
 pub struct SynonymsDeletion<'a> {
     inner: &'a Index,
@@ -39,9 +39,9 @@ impl<'a> SynonymsDeletion<'a> {
     }
 
     pub fn finalize(self) -> Result<(), Error> {
-        let lease_inner = self.inner.lease_inner();
-        let synonyms = &lease_inner.raw.synonyms;
-        let main = &lease_inner.raw.main;
+        let ref_index = self.inner.as_ref();
+        let synonyms = ref_index.synonyms_index;
+        let main = ref_index.main_index;
 
         let mut delete_whole_synonym_builder = SetBuilder::memory();
 
@@ -115,15 +115,14 @@ impl<'a> SynonymsDeletion<'a> {
         main.set_synonyms_set(&synonyms)?;
 
         // update the "consistent" view of the Index
+        let cache = ref_index.cache;
         let words = Arc::new(main.words_set()?.unwrap_or_default());
-        let ranked_map = lease_inner.ranked_map.clone();
+        let ranked_map = cache.ranked_map.clone();
         let synonyms = Arc::new(synonyms);
-        let schema = lease_inner.schema.clone();
-        let raw = lease_inner.raw.clone();
-        lease_inner.raw.compact();
+        let schema = cache.schema.clone();
 
-        let inner = InnerIndex { words, synonyms, schema, ranked_map, raw };
-        self.inner.0.store(Arc::new(inner));
+        let cache = Cache { words, synonyms, schema, ranked_map };
+        self.inner.cache.store(Arc::new(cache));
 
         Ok(())
     }
diff --git a/meilidb-data/src/database/synonyms_index.rs b/meilidb-data/src/database/synonyms_index.rs
deleted file mode 100644
index dfc0182e4..000000000
--- a/meilidb-data/src/database/synonyms_index.rs
+++ /dev/null
@@ -1,23 +0,0 @@
-use crate::database::raw_index::InnerRawIndex;
-
-#[derive(Clone)]
-pub struct SynonymsIndex(pub(crate) InnerRawIndex);
-
-impl SynonymsIndex {
-    pub fn alternatives_to(&self, word: &[u8]) -> Result<Option<fst::Set>, rocksdb::Error> {
-        match self.0.get(word)? {
-            Some(vector) => Ok(Some(fst::Set::from_bytes(vector.to_vec()).unwrap())),
-            None => Ok(None),
-        }
-    }
-
-    pub fn set_alternatives_to(&self, word: &[u8], value: Vec<u8>) -> Result<(), rocksdb::Error> {
-        self.0.set(word, value)?;
-        Ok(())
-    }
-
-    pub fn del_alternatives_of(&self, word: &[u8]) -> Result<(), rocksdb::Error> {
-        self.0.delete(word)?;
-        Ok(())
-    }
-}
diff --git a/meilidb-data/src/lib.rs b/meilidb-data/src/lib.rs
index da5c35e28..05c6041ed 100644
--- a/meilidb-data/src/lib.rs
+++ b/meilidb-data/src/lib.rs
@@ -5,8 +5,8 @@ mod number;
 mod ranked_map;
 mod serde;
 
-pub use rocksdb;
-pub use self::database::{Database, Index, CustomSettings};
+pub use sled;
+pub use self::database::{Database, Index, CustomSettingsIndex};
 pub use self::number::Number;
 pub use self::ranked_map::RankedMap;
 pub use self::serde::{compute_document_id, extract_document_id, value_to_string};
diff --git a/meilidb-data/src/serde/deserializer.rs b/meilidb-data/src/serde/deserializer.rs
index 24f00998c..4f35bdc34 100644
--- a/meilidb-data/src/serde/deserializer.rs
+++ b/meilidb-data/src/serde/deserializer.rs
@@ -1,5 +1,6 @@
 use std::collections::HashSet;
 use std::io::Cursor;
+use std::{fmt, error::Error};
 
 use meilidb_core::DocumentId;
 use meilidb_schema::SchemaAttr;
@@ -9,6 +10,43 @@ use serde::{de, forward_to_deserialize_any};
 
 use crate::database::Index;
 
+#[derive(Debug)]
+pub enum DeserializerError {
+    RmpError(RmpError),
+    SledError(sled::Error),
+    Custom(String),
+}
+
+impl de::Error for DeserializerError {
+    fn custom<T: fmt::Display>(msg: T) -> Self {
+        DeserializerError::Custom(msg.to_string())
+    }
+}
+
+impl fmt::Display for DeserializerError {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match self {
+            DeserializerError::RmpError(e) => write!(f, "rmp serde related error: {}", e),
+            DeserializerError::SledError(e) => write!(f, "Sled related error: {}", e),
+            DeserializerError::Custom(s) => f.write_str(s),
+        }
+    }
+}
+
+impl Error for DeserializerError {}
+
+impl From<RmpError> for DeserializerError {
+    fn from(error: RmpError) -> DeserializerError {
+        DeserializerError::RmpError(error)
+    }
+}
+
+impl From<sled::Error> for DeserializerError {
+    fn from(error: sled::Error) -> DeserializerError {
+        DeserializerError::SledError(error)
+    }
+}
+
 pub struct Deserializer<'a> {
     pub document_id: DocumentId,
     pub index: &'a Index,
@@ -17,7 +55,7 @@
 impl<'de, 'a, 'b> de::Deserializer<'de> for &'b mut Deserializer<'a>
 {
-    type Error = RmpError;
+    type Error = DeserializerError;
 
     fn deserialize_any<V>(self, visitor: V) -> Result<V::Value, Self::Error>
     where V: de::Visitor<'de>
@@ -34,33 +72,41 @@ impl<'de, 'a, 'b> de::Deserializer<'de> for &'b mut Deserializer<'a>
     fn deserialize_map<V>(self, visitor: V) -> Result<V::Value, Self::Error>
     where V: de::Visitor<'de>
     {
-        let schema = &self.index.lease_inner().schema;
-        let documents = &self.index.lease_inner().raw.documents;
+        let schema = self.index.schema();
+        let documents = self.index.as_ref().documents_index;
 
-        let document_attributes = documents.document_fields(self.document_id);
-        let document_attributes = document_attributes.filter_map(|result| {
-            match result {
-                Ok(value) => Some(value),
-                Err(e) => {
-                    // TODO: must log the error
-                    // error!("sled iter error; {}", e);
-                    None
-                },
-            }
-        });
+        let mut error = None;
 
-        let iter = document_attributes.filter_map(|(attr, value)| {
-            let is_displayed = schema.props(attr).is_displayed();
-            if is_displayed && self.fields.map_or(true, |f| f.contains(&attr)) {
-                let attribute_name = schema.attribute_name(attr);
-                Some((attribute_name, Value::new(value)))
-            } else {
-                None
-            }
-        });
+        let iter = documents
+            .document_fields(self.document_id)
+            .filter_map(|result| {
+                match result {
+                    Ok((attr, value)) => {
+                        let is_displayed = schema.props(attr).is_displayed();
+                        if is_displayed && self.fields.map_or(true, |f| f.contains(&attr)) {
+                            let attribute_name = schema.attribute_name(attr);
+                            Some((attribute_name, Value::new(value)))
+                        } else {
+                            None
+                        }
+                    },
+                    Err(e) => {
+                        if error.is_none() {
+                            error = Some(e);
+                        }
+                        None
+                    }
+                }
+            });
 
         let map_deserializer = de::value::MapDeserializer::new(iter);
-        visitor.visit_map(map_deserializer)
+        let result = visitor.visit_map(map_deserializer).map_err(DeserializerError::from);
+
+        if let Some(e) = error {
+            return Err(DeserializerError::from(e))
+        }
+
+        result
     }
 }
diff --git a/meilidb-data/src/serde/mod.rs b/meilidb-data/src/serde/mod.rs
index b0433f11f..995f46205 100644
--- a/meilidb-data/src/serde/mod.rs
+++ b/meilidb-data/src/serde/mod.rs
@@ -15,7 +15,7 @@ mod extract_document_id;
 mod indexer;
 mod serializer;
 
-pub use self::deserializer::Deserializer;
+pub use self::deserializer::{Deserializer, DeserializerError};
 pub use self::extract_document_id::{extract_document_id, compute_document_id, value_to_string};
 pub use self::convert_to_string::ConvertToString;
 pub use self::convert_to_number::ConvertToNumber;
@@ -38,8 +38,8 @@ pub enum SerializerError {
     DocumentIdNotFound,
     InvalidDocumentIdType,
     RmpError(RmpError),
+    SledError(sled::Error),
     SerdeJsonError(SerdeJsonError),
-    RocksdbError(rocksdb::Error),
     ParseNumberError(ParseNumberError),
     UnserializableType { type_name: &'static str },
     UnindexableType { type_name: &'static str },
@@ -63,8 +63,8 @@ impl fmt::Display for SerializerError {
             write!(f, "document identifier can only be of type string or number")
         },
         SerializerError::RmpError(e) => write!(f, "rmp serde related error: {}", e),
+        SerializerError::SledError(e) => write!(f, "Sled related error: {}", e),
         SerializerError::SerdeJsonError(e) => write!(f, "serde json error: {}", e),
-        SerializerError::RocksdbError(e) => write!(f, "RocksDB related error: {}", e),
         SerializerError::ParseNumberError(e) => {
             write!(f, "error while trying to parse a number: {}", e)
         },
@@ -102,9 +102,9 @@ impl From<SerdeJsonError> for SerializerError {
     }
 }
 
-impl From<rocksdb::Error> for SerializerError {
-    fn from(error: rocksdb::Error) -> SerializerError {
-        SerializerError::RocksdbError(error)
+impl From<sled::Error> for SerializerError {
+    fn from(error: sled::Error) -> SerializerError {
+        SerializerError::SledError(error)
     }
 }
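The rewritten `deserialize_map` also shows a pattern worth naming: a closure passed to `filter_map` cannot early-return from `deserialize_map` itself, so the first storage error is parked in a local and re-checked once the iterator has been fully consumed by `visit_map`. The same shape in miniature, with plain std types:

```rust
// Smuggle the first error out of an iterator closure, then surface it.
fn sum_values(results: Vec<Result<i32, String>>) -> Result<i32, String> {
    let mut error = None;

    let total: i32 = results
        .into_iter()
        .filter_map(|result| match result {
            Ok(value) => Some(value),
            Err(e) => {
                if error.is_none() {
                    error = Some(e); // remember only the first failure
                }
                None
            }
        })
        .sum();

    if let Some(e) = error {
        return Err(e);
    }

    Ok(total)
}
```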
diff --git a/meilidb-data/tests/updates.rs b/meilidb-data/tests/updates.rs
index e85da880e..2b832236e 100644
--- a/meilidb-data/tests/updates.rs
+++ b/meilidb-data/tests/updates.rs
@@ -12,7 +12,7 @@ fn simple_schema() -> Schema {
 #[test]
 fn insert_delete_document() {
     let tmp_dir = tempfile::tempdir().unwrap();
-    let database = Database::start_default(&tmp_dir).unwrap();
+    let database = Database::open(&tmp_dir).unwrap();
 
     let schema = simple_schema();
     let index = database.create_index("hello", schema).unwrap();
@@ -38,7 +38,7 @@ fn insert_delete_document() {
 #[test]
 fn replace_document() {
     let tmp_dir = tempfile::tempdir().unwrap();
-    let database = Database::start_default(&tmp_dir).unwrap();
+    let database = Database::open(&tmp_dir).unwrap();
 
     let schema = simple_schema();
     let index = database.create_index("hello", schema).unwrap();
diff --git a/meilidb/examples/create-database.rs b/meilidb/examples/create-database.rs
index d8e553ed3..a663bc35c 100644
--- a/meilidb/examples/create-database.rs
+++ b/meilidb/examples/create-database.rs
@@ -85,7 +85,7 @@ fn index(
     synonyms: Vec<Synonym>,
 ) -> Result<Database, Box<dyn Error>>
 {
-    let database = Database::start_default(database_path)?;
+    let database = Database::open(database_path)?;
 
     let mut wtr = csv::Writer::from_path("./stats.csv").unwrap();
     wtr.write_record(&["NumberOfDocuments", "DiskUsed", "MemoryUsed"])?;
diff --git a/meilidb/examples/query-database.rs b/meilidb/examples/query-database.rs
index d939c0b70..c02dbc5bf 100644
--- a/meilidb/examples/query-database.rs
+++ b/meilidb/examples/query-database.rs
@@ -143,7 +143,7 @@ fn main() -> Result<(), Box<dyn Error>> {
     let opt = Opt::from_args();
 
     let start = Instant::now();
-    let database = Database::start_default(&opt.database_path)?;
+    let database = Database::open(&opt.database_path)?;
     let index = database.open_index("test")?.unwrap();
     let schema = index.schema();
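End to end, the migrated public API reads as follows, mirroring tests/updates.rs above (the document value is anything implementing serde::Serialize that matches the schema; schema construction is elided because meilidb-schema's builder is not part of this diff):

```rust
use meilidb_data::Database;
use meilidb_schema::Schema;

// Condensed usage sketch built only from calls visible in this diff.
fn demo(schema: Schema) {
    let tmp_dir = tempfile::tempdir().unwrap();
    let database = Database::open(&tmp_dir).unwrap();
    let index = database.create_index("hello", schema).unwrap();

    let mut addition = index.documents_addition();
    // addition.update_document(&document).unwrap(); // any serde::Serialize value
    addition.finalize().unwrap();
}
```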