From e5763e73eb1701a57b5f77736306c68731d10ab0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 5 Sep 2019 13:22:53 +0200 Subject: [PATCH 1/4] chore: Prefer using const names to avoid typos --- meilidb-data/src/database/mod.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/meilidb-data/src/database/mod.rs b/meilidb-data/src/database/mod.rs index 602d9492e..44ba2155e 100644 --- a/meilidb-data/src/database/mod.rs +++ b/meilidb-data/src/database/mod.rs @@ -21,8 +21,10 @@ use self::update::apply_documents_deletion; use self::update::apply_synonyms_addition; use self::update::apply_synonyms_deletion; +const INDEXES_KEY: &str = "indexes"; + fn load_indexes(tree: &sled::Tree) -> Result, Error> { - match tree.get("indexes")? { + match tree.get(INDEXES_KEY)? { Some(bytes) => Ok(bincode::deserialize(&bytes)?), None => Ok(HashSet::new()) } @@ -54,7 +56,7 @@ impl Database { fn set_indexes(&self, value: &HashSet) -> Result<(), Error> { let bytes = bincode::serialize(value)?; - self.inner.insert("indexes", bytes)?; + self.inner.insert(INDEXES_KEY, bytes)?; Ok(()) } From e3fa07077c0a726d42958f1102f9075f4d8cd9e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 5 Sep 2019 14:53:09 +0200 Subject: [PATCH 2/4] feat: Introduce the CfTree and CfIter types --- meilidb-data/Cargo.toml | 2 +- meilidb-data/src/cf_tree.rs | 72 +++++++++++++++++++++++++++++++++++++ meilidb-data/src/lib.rs | 5 ++- 3 files changed, 77 insertions(+), 2 deletions(-) create mode 100644 meilidb-data/src/cf_tree.rs diff --git a/meilidb-data/Cargo.toml b/meilidb-data/Cargo.toml index 1491cba31..afec1b87e 100644 --- a/meilidb-data/Cargo.toml +++ b/meilidb-data/Cargo.toml @@ -14,11 +14,11 @@ meilidb-core = { path = "../meilidb-core", version = "0.1.0" } meilidb-schema = { path = "../meilidb-schema", version = "0.1.0" } meilidb-tokenizer = { path = "../meilidb-tokenizer", version = "0.1.0" } ordered-float = { version = "1.0.2", features = ["serde"] } +rocksdb = "0.12.3" sdset = "0.3.2" serde = { version = "1.0.99", features = ["derive"] } serde_json = "1.0.40" siphasher = "0.3.0" -sled = "0.26.0" zerocopy = "0.2.8" [dependencies.rmp-serde] diff --git a/meilidb-data/src/cf_tree.rs b/meilidb-data/src/cf_tree.rs new file mode 100644 index 000000000..853ca2a53 --- /dev/null +++ b/meilidb-data/src/cf_tree.rs @@ -0,0 +1,72 @@ +use std::sync::Arc; +use rocksdb::{DBVector, IteratorMode, Direction}; +use crate::RocksDbResult; + +#[derive(Clone)] +pub struct CfTree(Arc); + +struct CfTreeInner { + db: rocksdb::DB, + name: String, +} + +impl CfTree { + pub fn insert(&self, key: K, value: V) -> RocksDbResult<()> + where K: AsRef<[u8]>, + V: AsRef<[u8]>, + { + let cf = self.0.db.cf_handle(&self.0.name).unwrap(); + self.0.db.put_cf(cf, key, value) + } + + pub fn get(&self, key: K) -> RocksDbResult> + where K: AsRef<[u8]>, + { + let cf = self.0.db.cf_handle(&self.0.name).unwrap(); + self.0.db.get_cf(cf, key) + } + + pub fn remove(&self, key: K) -> RocksDbResult<()> + where K: AsRef<[u8]> + { + let cf = self.0.db.cf_handle(&self.0.name).unwrap(); + self.0.db.delete_cf(cf, key) + } + + /// Start and end key range is inclusive on both bounds. + pub fn range(&self, start: KS, end: KE) -> RocksDbResult + where KS: AsRef<[u8]>, + KE: AsRef<[u8]>, + { + let cf = self.0.db.cf_handle(&self.0.name).unwrap(); + + let mut iter = self.0.db.iterator_cf(cf, IteratorMode::Start)?; + iter.set_mode(IteratorMode::From(start.as_ref(), Direction::Forward)); + + let end_bound = Box::from(end.as_ref()); + Ok(CfIter { iter, end_bound: Some(end_bound) }) + } + + pub fn iter(&self) -> RocksDbResult { + let cf = self.0.db.cf_handle(&self.0.name).unwrap(); + let iter = self.0.db.iterator_cf(cf, IteratorMode::Start)?; + Ok(CfIter { iter, end_bound: None }) + } +} + +pub struct CfIter<'a> { + iter: rocksdb::DBIterator<'a>, + end_bound: Option>, +} + +impl Iterator for CfIter<'_> { + type Item = (Box<[u8]>, Box<[u8]>); + + fn next(&mut self) -> Option { + match (self.iter.next(), self.end_bound) { + (Some((key, _)), Some(end_bound)) if key > end_bound => None, + (Some(entry), _) => Some(entry), + (None, _) => None, + } + } +} diff --git a/meilidb-data/src/lib.rs b/meilidb-data/src/lib.rs index 05c6041ed..9124dcc77 100644 --- a/meilidb-data/src/lib.rs +++ b/meilidb-data/src/lib.rs @@ -1,3 +1,4 @@ +mod cf_tree; mod database; mod document_attr_key; mod indexer; @@ -5,8 +6,10 @@ mod number; mod ranked_map; mod serde; -pub use sled; +pub use self::cf_tree::{CfTree, CfIter}; pub use self::database::{Database, Index, CustomSettingsIndex}; pub use self::number::Number; pub use self::ranked_map::RankedMap; pub use self::serde::{compute_document_id, extract_document_id, value_to_string}; + +pub type RocksDbResult = Result; From f46868407c641df5307b70653744b53cf2db6ee3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 5 Sep 2019 18:43:10 +0200 Subject: [PATCH 3/4] feat: Make RocksDB works seemlessly like sled --- meilidb-data/Cargo.toml | 1 + meilidb-data/src/cf_tree.rs | 69 +++++-- meilidb-data/src/database/error.rs | 10 +- .../database/index/custom_settings_index.rs | 5 +- .../src/database/index/docs_words_index.rs | 2 +- .../src/database/index/documents_index.rs | 45 +++-- meilidb-data/src/database/index/main_index.rs | 2 +- meilidb-data/src/database/index/mod.rs | 177 ++++++++++-------- .../src/database/index/synonyms_index.rs | 10 +- .../src/database/index/words_index.rs | 11 +- meilidb-data/src/database/mod.rs | 14 +- meilidb-data/src/serde/deserializer.rs | 42 ++--- meilidb-data/src/serde/mod.rs | 10 +- 13 files changed, 218 insertions(+), 180 deletions(-) diff --git a/meilidb-data/Cargo.toml b/meilidb-data/Cargo.toml index afec1b87e..6535b56ae 100644 --- a/meilidb-data/Cargo.toml +++ b/meilidb-data/Cargo.toml @@ -7,6 +7,7 @@ edition = "2018" [dependencies] arc-swap = "0.4.2" bincode = "1.1.4" +crossbeam-channel = "0.3.9" deunicode = "1.0.0" hashbrown = { version = "0.6.0", features = ["serde"] } log = "0.4.6" diff --git a/meilidb-data/src/cf_tree.rs b/meilidb-data/src/cf_tree.rs index 853ca2a53..791f0de81 100644 --- a/meilidb-data/src/cf_tree.rs +++ b/meilidb-data/src/cf_tree.rs @@ -1,36 +1,71 @@ use std::sync::Arc; +use crossbeam_channel::{unbounded, Sender, Receiver}; use rocksdb::{DBVector, IteratorMode, Direction}; use crate::RocksDbResult; #[derive(Clone)] -pub struct CfTree(Arc); +pub struct CfTree { + index: Arc, + sender: Option>, +} struct CfTreeInner { - db: rocksdb::DB, + db: Arc, name: String, } impl CfTree { + pub fn create(db: Arc, name: String) -> RocksDbResult { + let mut options = rocksdb::Options::default(); + options.create_missing_column_families(true); + + let _cf = db.create_cf(&name, &options)?; + let index = Arc::new(CfTreeInner { db, name }); + + Ok(CfTree { index, sender: None }) + } + + pub fn create_with_subcription( + db: Arc, + name: String, + ) -> RocksDbResult<(CfTree, Receiver<()>)> + { + let mut options = rocksdb::Options::default(); + options.create_missing_column_families(true); + + let _cf = db.create_cf(&name, &options)?; + let index = Arc::new(CfTreeInner { db, name }); + let (sender, receiver) = unbounded(); + + Ok((CfTree { index, sender: Some(sender) }, receiver)) + } + pub fn insert(&self, key: K, value: V) -> RocksDbResult<()> where K: AsRef<[u8]>, V: AsRef<[u8]>, { - let cf = self.0.db.cf_handle(&self.0.name).unwrap(); - self.0.db.put_cf(cf, key, value) + let cf = self.index.db.cf_handle(&self.index.name).unwrap(); + let result = self.index.db.put_cf(cf, key, value); + + if let Some(sender) = &self.sender { + let _err = sender.send(()); + } + + result } pub fn get(&self, key: K) -> RocksDbResult> where K: AsRef<[u8]>, { - let cf = self.0.db.cf_handle(&self.0.name).unwrap(); - self.0.db.get_cf(cf, key) + let cf = self.index.db.cf_handle(&self.index.name).unwrap(); + self.index.db.get_cf(cf, key) } pub fn remove(&self, key: K) -> RocksDbResult<()> where K: AsRef<[u8]> { - let cf = self.0.db.cf_handle(&self.0.name).unwrap(); - self.0.db.delete_cf(cf, key) + let cf = self.index.db.cf_handle(&self.index.name).unwrap(); + self.index.db.delete_cf(cf, key) } /// Start and end key range is inclusive on both bounds. @@ -38,9 +73,9 @@ impl CfTree { where KS: AsRef<[u8]>, KE: AsRef<[u8]>, { - let cf = self.0.db.cf_handle(&self.0.name).unwrap(); + let cf = self.index.db.cf_handle(&self.index.name).unwrap(); - let mut iter = self.0.db.iterator_cf(cf, IteratorMode::Start)?; + let mut iter = self.index.db.iterator_cf(cf, IteratorMode::Start)?; iter.set_mode(IteratorMode::From(start.as_ref(), Direction::Forward)); let end_bound = Box::from(end.as_ref()); @@ -48,10 +83,16 @@ impl CfTree { } pub fn iter(&self) -> RocksDbResult { - let cf = self.0.db.cf_handle(&self.0.name).unwrap(); - let iter = self.0.db.iterator_cf(cf, IteratorMode::Start)?; + let cf = self.index.db.cf_handle(&self.index.name).unwrap(); + let iter = self.index.db.iterator_cf(cf, IteratorMode::Start)?; Ok(CfIter { iter, end_bound: None }) } + + pub fn last_key(&self) -> RocksDbResult>> { + let cf = self.index.db.cf_handle(&self.index.name).unwrap(); + let mut iter = self.index.db.iterator_cf(cf, IteratorMode::End)?; + Ok(iter.next().map(|(key, _)| key)) + } } pub struct CfIter<'a> { @@ -63,8 +104,8 @@ impl Iterator for CfIter<'_> { type Item = (Box<[u8]>, Box<[u8]>); fn next(&mut self) -> Option { - match (self.iter.next(), self.end_bound) { - (Some((key, _)), Some(end_bound)) if key > end_bound => None, + match (self.iter.next(), &self.end_bound) { + (Some((ref key, _)), Some(end_bound)) if key > end_bound => None, (Some(entry), _) => Some(entry), (None, _) => None, } diff --git a/meilidb-data/src/database/error.rs b/meilidb-data/src/database/error.rs index 1b5b11b02..6da64c3e0 100644 --- a/meilidb-data/src/database/error.rs +++ b/meilidb-data/src/database/error.rs @@ -7,7 +7,7 @@ pub enum Error { SchemaMissing, WordIndexMissing, MissingDocumentId, - SledError(sled::Error), + RocksDbError(rocksdb::Error), FstError(fst::Error), RmpDecodeError(rmp_serde::decode::Error), RmpEncodeError(rmp_serde::encode::Error), @@ -15,9 +15,9 @@ pub enum Error { SerializerError(SerializerError), } -impl From for Error { - fn from(error: sled::Error) -> Error { - Error::SledError(error) +impl From for Error { + fn from(error: rocksdb::Error) -> Error { + Error::RocksDbError(error) } } @@ -59,7 +59,7 @@ impl fmt::Display for Error { SchemaMissing => write!(f, "this index does not have a schema"), WordIndexMissing => write!(f, "this index does not have a word index"), MissingDocumentId => write!(f, "document id is missing"), - SledError(e) => write!(f, "Sled error; {}", e), + RocksDbError(e) => write!(f, "RocksDB error; {}", e), FstError(e) => write!(f, "fst error; {}", e), RmpDecodeError(e) => write!(f, "rmp decode error; {}", e), RmpEncodeError(e) => write!(f, "rmp encode error; {}", e), diff --git a/meilidb-data/src/database/index/custom_settings_index.rs b/meilidb-data/src/database/index/custom_settings_index.rs index bab62bd2d..0fd0aade9 100644 --- a/meilidb-data/src/database/index/custom_settings_index.rs +++ b/meilidb-data/src/database/index/custom_settings_index.rs @@ -1,11 +1,10 @@ -use std::sync::Arc; use std::ops::Deref; #[derive(Clone)] -pub struct CustomSettingsIndex(pub(crate) Arc); +pub struct CustomSettingsIndex(pub(crate) crate::CfTree); impl Deref for CustomSettingsIndex { - type Target = sled::Tree; + type Target = crate::CfTree; fn deref(&self) -> &Self::Target { &self.0 diff --git a/meilidb-data/src/database/index/docs_words_index.rs b/meilidb-data/src/database/index/docs_words_index.rs index 5407f1cd7..8763dc588 100644 --- a/meilidb-data/src/database/index/docs_words_index.rs +++ b/meilidb-data/src/database/index/docs_words_index.rs @@ -3,7 +3,7 @@ use meilidb_core::DocumentId; use crate::database::Error; #[derive(Clone)] -pub struct DocsWordsIndex(pub Arc); +pub struct DocsWordsIndex(pub crate::CfTree); impl DocsWordsIndex { pub fn doc_words(&self, id: DocumentId) -> Result, Error> { diff --git a/meilidb-data/src/database/index/documents_index.rs b/meilidb-data/src/database/index/documents_index.rs index f9e5d6122..11475e800 100644 --- a/meilidb-data/src/database/index/documents_index.rs +++ b/meilidb-data/src/database/index/documents_index.rs @@ -1,64 +1,62 @@ -use std::sync::Arc; use std::convert::TryInto; -use std::ops::Bound; use meilidb_core::DocumentId; use meilidb_schema::SchemaAttr; +use rocksdb::DBVector; use crate::document_attr_key::DocumentAttrKey; +use crate::RocksDbResult; -fn document_fields_range(id: DocumentId) -> (Bound<[u8; 10]>, Bound<[u8; 10]>) { +fn document_fields_range(id: DocumentId) -> ([u8; 10], [u8; 10]) { let start = DocumentAttrKey::new(id, SchemaAttr::min()).to_be_bytes(); let end = DocumentAttrKey::new(id, SchemaAttr::max()).to_be_bytes(); - (Bound::Included(start), Bound::Included(end)) + (start, end) } #[derive(Clone)] -pub struct DocumentsIndex(pub(crate) Arc); +pub struct DocumentsIndex(pub(crate) crate::CfTree); impl DocumentsIndex { - pub fn document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result> { + pub fn document_field(&self, id: DocumentId, attr: SchemaAttr) -> RocksDbResult> { let key = DocumentAttrKey::new(id, attr).to_be_bytes(); self.0.get(key) } - pub fn set_document_field(&self, id: DocumentId, attr: SchemaAttr, value: Vec) -> sled::Result<()> { + pub fn set_document_field(&self, id: DocumentId, attr: SchemaAttr, value: Vec) -> RocksDbResult<()> { let key = DocumentAttrKey::new(id, attr).to_be_bytes(); self.0.insert(key, value)?; Ok(()) } - pub fn del_document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result<()> { + pub fn del_document_field(&self, id: DocumentId, attr: SchemaAttr) -> RocksDbResult<()> { let key = DocumentAttrKey::new(id, attr).to_be_bytes(); self.0.remove(key)?; Ok(()) } - pub fn del_all_document_fields(&self, id: DocumentId) -> sled::Result<()> { - let range = document_fields_range(id); + pub fn del_all_document_fields(&self, id: DocumentId) -> RocksDbResult<()> { + let (start, end) = document_fields_range(id); - for result in self.0.range(range) { - let (key, _) = result?; + for (key, _) in self.0.range(start, end)? { self.0.remove(key)?; } Ok(()) } - pub fn document_fields(&self, id: DocumentId) -> DocumentFieldsIter { - let range = document_fields_range(id); + pub fn document_fields(&self, id: DocumentId) -> RocksDbResult { + let (start, end) = document_fields_range(id); - let iter = self.0.range(range); - DocumentFieldsIter(iter) + let iter = self.0.range(start, end)?; + Ok(DocumentFieldsIter(iter)) } - pub fn len(&self) -> sled::Result { + pub fn len(&self) -> RocksDbResult { let mut last_document_id = None; let mut count = 0; - for result in self.0.iter() { - let (key, _) = result?; + for (key, _) in self.0.iter()? { let array = key.as_ref().try_into().unwrap(); let document_id = DocumentAttrKey::from_be_bytes(array).document_id; @@ -72,19 +70,18 @@ impl DocumentsIndex { } } -pub struct DocumentFieldsIter<'a>(sled::Iter<'a>); +pub struct DocumentFieldsIter<'a>(crate::CfIter<'a>); impl Iterator for DocumentFieldsIter<'_> { - type Item = sled::Result<(SchemaAttr, sled::IVec)>; + type Item = (SchemaAttr, Box<[u8]>); fn next(&mut self) -> Option { match self.0.next() { - Some(Ok((key, value))) => { + Some((key, value)) => { let array = key.as_ref().try_into().unwrap(); let key = DocumentAttrKey::from_be_bytes(array); - Some(Ok((key.attribute, value))) + Some((key.attribute, value)) }, - Some(Err(e)) => return Some(Err(e)), None => None, } } diff --git a/meilidb-data/src/database/index/main_index.rs b/meilidb-data/src/database/index/main_index.rs index 3ae03f26f..f11637c85 100644 --- a/meilidb-data/src/database/index/main_index.rs +++ b/meilidb-data/src/database/index/main_index.rs @@ -11,7 +11,7 @@ const SYNONYMS_KEY: &str = "synonyms"; const RANKED_MAP_KEY: &str = "ranked-map"; #[derive(Clone)] -pub struct MainIndex(pub(crate) Arc); +pub struct MainIndex(pub(crate) crate::CfTree); impl MainIndex { pub fn schema(&self) -> Result, Error> { diff --git a/meilidb-data/src/database/index/mod.rs b/meilidb-data/src/database/index/mod.rs index 769f0fd17..e9992cb55 100644 --- a/meilidb-data/src/database/index/mod.rs +++ b/meilidb-data/src/database/index/mod.rs @@ -1,17 +1,19 @@ use std::collections::{HashSet, BTreeMap}; use std::convert::TryInto; use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; use std::thread; use std::time::{Duration, Instant}; use arc_swap::{ArcSwap, ArcSwapOption, Guard}; +use crossbeam_channel::Receiver; use meilidb_core::criterion::Criteria; use meilidb_core::{DocIndex, Store, DocumentId, QueryBuilder}; use meilidb_schema::Schema; use sdset::SetBuf; use serde::{de, Serialize, Deserialize}; -use sled::Transactional; +use crate::CfTree; use crate::ranked_map::RankedMap; use crate::serde::{Deserializer, DeserializerError}; @@ -22,6 +24,7 @@ use self::main_index::MainIndex; use self::synonyms_index::SynonymsIndex; use self::words_index::WordsIndex; +use crate::RocksDbResult; use crate::database::{ Error, DocumentsAddition, DocumentsDeletion, @@ -37,13 +40,6 @@ mod main_index; mod synonyms_index; mod words_index; -fn event_is_set(event: &sled::Event) -> bool { - match event { - sled::Event::Set(_, _) => true, - _ => false, - } -} - #[derive(Deserialize)] enum UpdateOwned { DocumentsAddition(Vec), @@ -81,74 +77,90 @@ pub struct UpdateStatus { pub detailed_duration: DetailedDuration, } -fn spawn_update_system(index: Index) -> thread::JoinHandle<()> { +fn spawn_update_system(index: Index, subscription: Receiver<()>) -> thread::JoinHandle<()> { thread::spawn(move || { + let mut subscription = subscription.into_iter(); + loop { - let subscription = index.updates_index.watch_prefix(vec![]); - while let Some(result) = index.updates_index.iter().next() { - let (key, _) = result.unwrap(); + while let Some((key, _)) = index.updates_index.iter().unwrap().next() { let update_id = key.as_ref().try_into().map(u64::from_be_bytes).unwrap(); let updates = &index.updates_index; let results = &index.updates_results_index; - (updates, results).transaction(|(updates, results)| { - let update = updates.remove(&key)?.unwrap(); + let update = updates.get(&key).unwrap().unwrap(); - let (update_type, result, duration) = match rmp_serde::from_read_ref(&update).unwrap() { - UpdateOwned::DocumentsAddition(documents) => { - let update_type = UpdateType::DocumentsAddition { number: documents.len() }; - let ranked_map = index.cache.load().ranked_map.clone(); - let start = Instant::now(); - let result = apply_documents_addition(&index, ranked_map, documents); - (update_type, result, start.elapsed()) - }, - UpdateOwned::DocumentsDeletion(documents) => { - let update_type = UpdateType::DocumentsDeletion { number: documents.len() }; - let ranked_map = index.cache.load().ranked_map.clone(); - let start = Instant::now(); - let result = apply_documents_deletion(&index, ranked_map, documents); - (update_type, result, start.elapsed()) - }, - UpdateOwned::SynonymsAddition(synonyms) => { - let update_type = UpdateType::SynonymsAddition { number: synonyms.len() }; - let start = Instant::now(); - let result = apply_synonyms_addition(&index, synonyms); - (update_type, result, start.elapsed()) - }, - UpdateOwned::SynonymsDeletion(synonyms) => { - let update_type = UpdateType::SynonymsDeletion { number: synonyms.len() }; - let start = Instant::now(); - let result = apply_synonyms_deletion(&index, synonyms); - (update_type, result, start.elapsed()) - }, - }; + let (update_type, result, duration) = match rmp_serde::from_read_ref(&update).unwrap() { + UpdateOwned::DocumentsAddition(documents) => { + let update_type = UpdateType::DocumentsAddition { number: documents.len() }; + let ranked_map = index.cache.load().ranked_map.clone(); + let start = Instant::now(); + let result = apply_documents_addition(&index, ranked_map, documents); + (update_type, result, start.elapsed()) + }, + UpdateOwned::DocumentsDeletion(documents) => { + let update_type = UpdateType::DocumentsDeletion { number: documents.len() }; + let ranked_map = index.cache.load().ranked_map.clone(); + let start = Instant::now(); + let result = apply_documents_deletion(&index, ranked_map, documents); + (update_type, result, start.elapsed()) + }, + UpdateOwned::SynonymsAddition(synonyms) => { + let update_type = UpdateType::SynonymsAddition { number: synonyms.len() }; + let start = Instant::now(); + let result = apply_synonyms_addition(&index, synonyms); + (update_type, result, start.elapsed()) + }, + UpdateOwned::SynonymsDeletion(synonyms) => { + let update_type = UpdateType::SynonymsDeletion { number: synonyms.len() }; + let start = Instant::now(); + let result = apply_synonyms_deletion(&index, synonyms); + (update_type, result, start.elapsed()) + }, + }; - let detailed_duration = DetailedDuration { main: duration }; - let status = UpdateStatus { - update_id, - update_type, - result: result.map_err(|e| e.to_string()), - detailed_duration, - }; + let detailed_duration = DetailedDuration { main: duration }; + let status = UpdateStatus { + update_id, + update_type, + result: result.map_err(|e| e.to_string()), + detailed_duration, + }; - if let Some(callback) = &*index.update_callback.load() { - (callback)(status.clone()); - } + if let Some(callback) = &*index.update_callback.load() { + (callback)(status.clone()); + } - let value = bincode::serialize(&status).unwrap(); - results.insert(&key, value) - }) - .unwrap(); + let value = bincode::serialize(&status).unwrap(); + results.insert(&key, value).unwrap(); + updates.remove(&key).unwrap(); } // this subscription is just used to block // the loop until a new update is inserted - subscription.filter(event_is_set).next(); + subscription.next(); } }) } +fn last_update_id( + update_index: &crate::CfTree, + update_results_index: &crate::CfTree, +) -> RocksDbResult +{ + let uikey = match update_index.last_key()? { + Some(key) => Some(key.as_ref().try_into().map(u64::from_be_bytes).unwrap()), + None => None, + }; + + let urikey = match update_results_index.last_key()? { + Some(key) => Some(key.as_ref().try_into().map(u64::from_be_bytes).unwrap()), + None => None, + }; + + Ok(uikey.max(urikey).unwrap_or(0)) +} + #[derive(Copy, Clone)] pub struct IndexStats { pub number_of_words: usize, @@ -169,9 +181,9 @@ pub struct Index { custom_settings_index: CustomSettingsIndex, // used by the update system - db: sled::Db, - updates_index: Arc, - updates_results_index: Arc, + updates_id: Arc, + updates_index: crate::CfTree, + updates_results_index: crate::CfTree, update_callback: Arc>>, } @@ -183,23 +195,23 @@ pub(crate) struct Cache { } impl Index { - pub fn new(db: sled::Db, name: &str) -> Result { + pub fn new(db: Arc, name: &str) -> Result { Index::new_raw(db, name, None) } - pub fn with_schema(db: sled::Db, name: &str, schema: Schema) -> Result { + pub fn with_schema(db: Arc, name: &str, schema: Schema) -> Result { Index::new_raw(db, name, Some(schema)) } - fn new_raw(db: sled::Db, name: &str, schema: Option) -> Result { - let main_index = db.open_tree(name).map(MainIndex)?; - let synonyms_index = db.open_tree(format!("{}-synonyms", name)).map(SynonymsIndex)?; - let words_index = db.open_tree(format!("{}-words", name)).map(WordsIndex)?; - let docs_words_index = db.open_tree(format!("{}-docs-words", name)).map(DocsWordsIndex)?; - let documents_index = db.open_tree(format!("{}-documents", name)).map(DocumentsIndex)?; - let custom_settings_index = db.open_tree(format!("{}-custom", name)).map(CustomSettingsIndex)?; - let updates_index = db.open_tree(format!("{}-updates", name))?; - let updates_results_index = db.open_tree(format!("{}-updates-results", name))?; + fn new_raw(db: Arc, name: &str, schema: Option) -> Result { + let main_index = CfTree::create(db.clone(), name.to_string()).map(MainIndex)?; + let synonyms_index = CfTree::create(db.clone(), format!("{}-synonyms", name)).map(SynonymsIndex)?; + let words_index = CfTree::create(db.clone(), format!("{}-words", name)).map(WordsIndex)?; + let docs_words_index = CfTree::create(db.clone(), format!("{}-docs-words", name)).map(DocsWordsIndex)?; + let documents_index = CfTree::create(db.clone(), format!("{}-documents", name)).map(DocumentsIndex)?; + let custom_settings_index = CfTree::create(db.clone(), format!("{}-custom", name)).map(CustomSettingsIndex)?; + let (updates_index, subscription) = CfTree::create_with_subcription(db.clone(), format!("{}-updates", name))?; + let updates_results_index = CfTree::create(db.clone(), format!("{}-updates-results", name))?; let words = match main_index.words_set()? { Some(words) => Arc::new(words), @@ -232,6 +244,9 @@ impl Index { let cache = Cache { words, synonyms, schema, ranked_map }; let cache = Arc::new(ArcSwap::from_pointee(cache)); + let last_update_id = last_update_id(&updates_index, &updates_results_index)?; + let updates_id = Arc::new(AtomicU64::new(last_update_id + 1)); + let index = Index { cache, main_index, @@ -240,13 +255,13 @@ impl Index { docs_words_index, documents_index, custom_settings_index, - db, + updates_id, updates_index, updates_results_index, update_callback: Arc::new(ArcSwapOption::empty()), }; - let _handle = spawn_update_system(index.clone()); + let _handle = spawn_update_system(index.clone(), subscription); Ok(index) } @@ -261,7 +276,7 @@ impl Index { self.update_callback.store(None); } - pub fn stats(&self) -> sled::Result { + pub fn stats(&self) -> RocksDbResult { let cache = self.cache.load(); Ok(IndexStats { number_of_words: cache.words.len(), @@ -340,17 +355,15 @@ impl Index { update_id: u64, ) -> Result { - let update_id_bytes = update_id.to_be_bytes().to_vec(); - let mut subscription = self.updates_results_index.watch_prefix(update_id_bytes); - // if we find the update result return it now if let Some(result) = self.update_status(update_id)? { return Ok(result) } - // this subscription is used to block the thread - // until the update_id is inserted in the tree - subscription.next(); + loop { + if self.updates_results_index.get(&update_id.to_be_bytes())?.is_some() { break } + std::thread::sleep(Duration::from_millis(300)); + } // the thread has been unblocked, it means that the update result // has been inserted in the tree, retrieve it @@ -429,11 +442,9 @@ impl Index { } fn raw_push_update(&self, raw_update: Vec) -> Result { - let update_id = self.db.generate_id()?; + let update_id = self.updates_id.fetch_add(1, Ordering::SeqCst); let update_id_array = update_id.to_be_bytes(); - self.updates_index.insert(update_id_array, raw_update)?; - Ok(update_id) } } diff --git a/meilidb-data/src/database/index/synonyms_index.rs b/meilidb-data/src/database/index/synonyms_index.rs index 0d9fd9d7d..ec8901c9e 100644 --- a/meilidb-data/src/database/index/synonyms_index.rs +++ b/meilidb-data/src/database/index/synonyms_index.rs @@ -1,21 +1,21 @@ -use std::sync::Arc; +use crate::RocksDbResult; #[derive(Clone)] -pub struct SynonymsIndex(pub(crate) Arc); +pub struct SynonymsIndex(pub(crate) crate::CfTree); impl SynonymsIndex { - pub fn alternatives_to(&self, word: &[u8]) -> sled::Result> { + pub fn alternatives_to(&self, word: &[u8]) -> RocksDbResult> { match self.0.get(word)? { Some(vector) => Ok(Some(fst::Set::from_bytes(vector.to_vec()).unwrap())), None => Ok(None), } } - pub fn set_alternatives_to(&self, word: &[u8], value: Vec) -> sled::Result<()> { + pub fn set_alternatives_to(&self, word: &[u8], value: Vec) -> RocksDbResult<()> { self.0.insert(word, value).map(drop) } - pub fn del_alternatives_of(&self, word: &[u8]) -> sled::Result<()> { + pub fn del_alternatives_of(&self, word: &[u8]) -> RocksDbResult<()> { self.0.remove(word).map(drop) } } diff --git a/meilidb-data/src/database/index/words_index.rs b/meilidb-data/src/database/index/words_index.rs index 5b538f273..97f9372f5 100644 --- a/meilidb-data/src/database/index/words_index.rs +++ b/meilidb-data/src/database/index/words_index.rs @@ -1,14 +1,13 @@ -use std::sync::Arc; - use meilidb_core::DocIndex; use sdset::{Set, SetBuf}; use zerocopy::{LayoutVerified, AsBytes}; +use crate::RocksDbResult; #[derive(Clone)] -pub struct WordsIndex(pub(crate) Arc); +pub struct WordsIndex(pub(crate) crate::CfTree); impl WordsIndex { - pub fn doc_indexes(&self, word: &[u8]) -> sled::Result>> { + pub fn doc_indexes(&self, word: &[u8]) -> RocksDbResult>> { // we must force an allocation to make the memory aligned match self.0.get(word)? { Some(bytes) => { @@ -36,11 +35,11 @@ impl WordsIndex { } } - pub fn set_doc_indexes(&self, word: &[u8], set: &Set) -> sled::Result<()> { + pub fn set_doc_indexes(&self, word: &[u8], set: &Set) -> RocksDbResult<()> { self.0.insert(word, set.as_bytes()).map(drop) } - pub fn del_doc_indexes(&self, word: &[u8]) -> sled::Result<()> { + pub fn del_doc_indexes(&self, word: &[u8]) -> RocksDbResult<()> { self.0.remove(word).map(drop) } } diff --git a/meilidb-data/src/database/mod.rs b/meilidb-data/src/database/mod.rs index 44ba2155e..c560bbe94 100644 --- a/meilidb-data/src/database/mod.rs +++ b/meilidb-data/src/database/mod.rs @@ -1,6 +1,7 @@ use std::collections::hash_map::Entry; use std::collections::{HashSet, HashMap}; use std::path::Path; +use std::sync::Arc; use std::sync::RwLock; use meilidb_schema::Schema; @@ -23,7 +24,7 @@ use self::update::apply_synonyms_deletion; const INDEXES_KEY: &str = "indexes"; -fn load_indexes(tree: &sled::Tree) -> Result, Error> { +fn load_indexes(tree: &rocksdb::DB) -> Result, Error> { match tree.get(INDEXES_KEY)? { Some(bytes) => Ok(bincode::deserialize(&bytes)?), None => Ok(HashSet::new()) @@ -32,13 +33,18 @@ fn load_indexes(tree: &sled::Tree) -> Result, Error> { pub struct Database { cache: RwLock>, - inner: sled::Db, + inner: Arc, } impl Database { pub fn open>(path: P) -> Result { let cache = RwLock::new(HashMap::new()); - let inner = sled::Db::open(path)?; + + let mut options = rocksdb::Options::default(); + options.create_if_missing(true); + + let cfs = rocksdb::DB::list_cf(&options, &path).unwrap_or_default(); + let inner = Arc::new(rocksdb::DB::open_cf(&options, path, cfs)?); let indexes = load_indexes(&inner)?; let database = Database { cache, inner }; @@ -56,7 +62,7 @@ impl Database { fn set_indexes(&self, value: &HashSet) -> Result<(), Error> { let bytes = bincode::serialize(value)?; - self.inner.insert(INDEXES_KEY, bytes)?; + self.inner.put(INDEXES_KEY, bytes)?; Ok(()) } diff --git a/meilidb-data/src/serde/deserializer.rs b/meilidb-data/src/serde/deserializer.rs index 4f35bdc34..58c09c9d5 100644 --- a/meilidb-data/src/serde/deserializer.rs +++ b/meilidb-data/src/serde/deserializer.rs @@ -13,7 +13,7 @@ use crate::database::Index; #[derive(Debug)] pub enum DeserializerError { RmpError(RmpError), - SledError(sled::Error), + RocksDbError(rocksdb::Error), Custom(String), } @@ -27,7 +27,7 @@ impl fmt::Display for DeserializerError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { DeserializerError::RmpError(e) => write!(f, "rmp serde related error: {}", e), - DeserializerError::SledError(e) => write!(f, "Sled related error: {}", e), + DeserializerError::RocksDbError(e) => write!(f, "RocksDB related error: {}", e), DeserializerError::Custom(s) => f.write_str(s), } } @@ -41,9 +41,9 @@ impl From for DeserializerError { } } -impl From for DeserializerError { - fn from(error: sled::Error) -> DeserializerError { - DeserializerError::SledError(error) +impl From for DeserializerError { + fn from(error: rocksdb::Error) -> DeserializerError { + DeserializerError::RocksDbError(error) } } @@ -75,37 +75,21 @@ impl<'de, 'a, 'b> de::Deserializer<'de> for &'b mut Deserializer<'a> let schema = self.index.schema(); let documents = self.index.as_ref().documents_index; - let mut error = None; - let iter = documents - .document_fields(self.document_id) - .filter_map(|result| { - match result { - Ok((attr, value)) => { - let is_displayed = schema.props(attr).is_displayed(); - if is_displayed && self.fields.map_or(true, |f| f.contains(&attr)) { - let attribute_name = schema.attribute_name(attr); - Some((attribute_name, Value::new(value))) - } else { - None - } - }, - Err(e) => { - if error.is_none() { - error = Some(e); - } - None - } + .document_fields(self.document_id)? + .filter_map(|(attr, value)| { + let is_displayed = schema.props(attr).is_displayed(); + if is_displayed && self.fields.map_or(true, |f| f.contains(&attr)) { + let attribute_name = schema.attribute_name(attr); + Some((attribute_name, Value::new(value))) + } else { + None } }); let map_deserializer = de::value::MapDeserializer::new(iter); let result = visitor.visit_map(map_deserializer).map_err(DeserializerError::from); - if let Some(e) = error { - return Err(DeserializerError::from(e)) - } - result } } diff --git a/meilidb-data/src/serde/mod.rs b/meilidb-data/src/serde/mod.rs index 995f46205..cdf996e9c 100644 --- a/meilidb-data/src/serde/mod.rs +++ b/meilidb-data/src/serde/mod.rs @@ -38,7 +38,7 @@ pub enum SerializerError { DocumentIdNotFound, InvalidDocumentIdType, RmpError(RmpError), - SledError(sled::Error), + RocksDbError(rocksdb::Error), SerdeJsonError(SerdeJsonError), ParseNumberError(ParseNumberError), UnserializableType { type_name: &'static str }, @@ -63,7 +63,7 @@ impl fmt::Display for SerializerError { write!(f, "document identifier can only be of type string or number") }, SerializerError::RmpError(e) => write!(f, "rmp serde related error: {}", e), - SerializerError::SledError(e) => write!(f, "Sled related error: {}", e), + SerializerError::RocksDbError(e) => write!(f, "RocksDB related error: {}", e), SerializerError::SerdeJsonError(e) => write!(f, "serde json error: {}", e), SerializerError::ParseNumberError(e) => { write!(f, "error while trying to parse a number: {}", e) @@ -102,9 +102,9 @@ impl From for SerializerError { } } -impl From for SerializerError { - fn from(error: sled::Error) -> SerializerError { - SerializerError::SledError(error) +impl From for SerializerError { + fn from(error: rocksdb::Error) -> SerializerError { + SerializerError::RocksDbError(error) } } From 400d542fef13c043d2ed44a4c563419c349245aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 12 Sep 2019 16:28:23 +0200 Subject: [PATCH 4/4] feat: Update the README to reflect the kv store update --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 889a743f4..4a7f657f0 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ A _full-text search database_ using a key-value store internally. -It uses [sled](https://github.com/spacejam/sled) as the internal key-value store. The key-value store allows us to handle updates and queries with small memory and CPU overheads. The whole ranking system is [data oriented](https://github.com/meilisearch/MeiliDB/issues/82) and provides great performances. +It uses [RocksDB](https://github.com/facebook/rocksdb) as the internal key-value store. The key-value store allows us to handle updates and queries with small memory and CPU overheads. The whole ranking system is [data oriented](https://github.com/meilisearch/MeiliDB/issues/82) and provides great performances. You can [read the deep dive](deep-dive.md) if you want more information on the engine, it describes the whole process of generating updates and handling queries or you can take a look at the [typos and ranking rules](typos-ranking-rules.md) if you want to know the default rules used to sort the documents.