Merge pull request #182 from meilisearch/replace-sled-by-rocksdb

Replace sled by RocksDB
This commit is contained in:
Clément Renault 2019-09-14 11:32:26 +02:00 committed by GitHub
commit 2658ef0176
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 285 additions and 170 deletions

View File

@ -22,7 +22,7 @@ A _full-text search database_ using a key-value store internally.
It uses [sled](https://github.com/spacejam/sled) as the internal key-value store. The key-value store allows us to handle updates and queries with small memory and CPU overheads. The whole ranking system is [data oriented](https://github.com/meilisearch/MeiliDB/issues/82) and provides great performances. It uses [RocksDB](https://github.com/facebook/rocksdb) as the internal key-value store. The key-value store allows us to handle updates and queries with small memory and CPU overheads. The whole ranking system is [data oriented](https://github.com/meilisearch/MeiliDB/issues/82) and provides great performances.
You can [read the deep dive](deep-dive.md) if you want more information on the engine, it describes the whole process of generating updates and handling queries or you can take a look at the [typos and ranking rules](typos-ranking-rules.md) if you want to know the default rules used to sort the documents. You can [read the deep dive](deep-dive.md) if you want more information on the engine, it describes the whole process of generating updates and handling queries or you can take a look at the [typos and ranking rules](typos-ranking-rules.md) if you want to know the default rules used to sort the documents.

View File

@ -7,6 +7,7 @@ edition = "2018"
[dependencies] [dependencies]
arc-swap = "0.4.2" arc-swap = "0.4.2"
bincode = "1.1.4" bincode = "1.1.4"
crossbeam-channel = "0.3.9"
deunicode = "1.0.0" deunicode = "1.0.0"
hashbrown = { version = "0.6.0", features = ["serde"] } hashbrown = { version = "0.6.0", features = ["serde"] }
log = "0.4.6" log = "0.4.6"
@ -14,11 +15,11 @@ meilidb-core = { path = "../meilidb-core", version = "0.1.0" }
meilidb-schema = { path = "../meilidb-schema", version = "0.1.0" } meilidb-schema = { path = "../meilidb-schema", version = "0.1.0" }
meilidb-tokenizer = { path = "../meilidb-tokenizer", version = "0.1.0" } meilidb-tokenizer = { path = "../meilidb-tokenizer", version = "0.1.0" }
ordered-float = { version = "1.0.2", features = ["serde"] } ordered-float = { version = "1.0.2", features = ["serde"] }
rocksdb = "0.12.3"
sdset = "0.3.2" sdset = "0.3.2"
serde = { version = "1.0.99", features = ["derive"] } serde = { version = "1.0.99", features = ["derive"] }
serde_json = "1.0.40" serde_json = "1.0.40"
siphasher = "0.3.0" siphasher = "0.3.0"
sled = "0.26.0"
zerocopy = "0.2.8" zerocopy = "0.2.8"
[dependencies.rmp-serde] [dependencies.rmp-serde]

113
meilidb-data/src/cf_tree.rs Normal file
View File

@ -0,0 +1,113 @@
use std::sync::Arc;
use crossbeam_channel::{unbounded, Sender, Receiver};
use rocksdb::{DBVector, IteratorMode, Direction};
use crate::RocksDbResult;
/// A handle to one RocksDB column family, used as a drop-in replacement
/// for a `sled::Tree`.
///
/// Cloning is cheap: clones share the same underlying column family via
/// the inner `Arc`, and share the same notification channel when one is
/// attached.
#[derive(Clone)]
pub struct CfTree {
    index: Arc<CfTreeInner>,
    // When `Some`, a unit message is sent on every `insert` so that a
    // subscriber (see `create_with_subcription`) can wake up and process
    // new entries.
    sender: Option<Sender<()>>,
}
/// Shared state behind a `CfTree`: the database handle plus the column
/// family's name, which is used to look the CF handle back up on every
/// operation.
struct CfTreeInner {
    db: Arc<rocksdb::DB>,
    name: String,
}
impl CfTree {
    /// Creates the column family `name` in `db` and returns a handle to it,
    /// with no update subscription attached.
    pub fn create(db: Arc<rocksdb::DB>, name: String) -> RocksDbResult<CfTree> {
        let mut options = rocksdb::Options::default();
        // NOTE(review): `create_missing_column_families` is normally a
        // DB-open option — confirm it has any effect when passed to
        // `create_cf`.
        options.create_missing_column_families(true);
        // NOTE(review): `create_cf` presumably errors if the column family
        // already exists in the DB — confirm callers never call this for a
        // CF that was already opened (e.g. after a process restart).
        let _cf = db.create_cf(&name, &options)?;
        let index = Arc::new(CfTreeInner { db, name });

        Ok(CfTree { index, sender: None })
    }

    /// Same as `create`, but additionally returns a channel `Receiver`
    /// that observes a unit message for every `insert` on this tree —
    /// used to wake the update-processing thread.
    ///
    /// NOTE(review): "subcription" is a typo for "subscription", but this
    /// is a public method name; renaming it would break callers.
    pub fn create_with_subcription(
        db: Arc<rocksdb::DB>,
        name: String,
    ) -> RocksDbResult<(CfTree, Receiver<()>)>
    {
        let mut options = rocksdb::Options::default();
        options.create_missing_column_families(true);
        let _cf = db.create_cf(&name, &options)?;
        let index = Arc::new(CfTreeInner { db, name });
        // Unbounded so that `insert` never blocks on a slow subscriber.
        let (sender, receiver) = unbounded();

        Ok((CfTree { index, sender: Some(sender) }, receiver))
    }

    /// Stores `key -> value` in the column family and notifies the
    /// subscriber, if any.
    pub fn insert<K, V>(&self, key: K, value: V) -> RocksDbResult<()>
    where K: AsRef<[u8]>,
          V: AsRef<[u8]>,
    {
        // The unwrap is relied on to be safe because the CF was created in
        // `create`/`create_with_subcription` and is never dropped —
        // TODO(review): confirm nothing drops CFs at runtime.
        let cf = self.index.db.cf_handle(&self.index.name).unwrap();
        let result = self.index.db.put_cf(cf, key, value);

        if let Some(sender) = &self.sender {
            // A send error only means the receiver was dropped; the write
            // outcome is already captured in `result`, so ignore it. Note
            // that the notification is sent even when the write failed.
            let _err = sender.send(());
        }

        result
    }

    /// Fetches the value stored under `key`, if any.
    pub fn get<K>(&self, key: K) -> RocksDbResult<Option<DBVector>>
    where K: AsRef<[u8]>,
    {
        let cf = self.index.db.cf_handle(&self.index.name).unwrap();
        self.index.db.get_cf(cf, key)
    }

    /// Deletes the entry stored under `key` (a no-op if it is absent).
    pub fn remove<K>(&self, key: K) -> RocksDbResult<()>
    where K: AsRef<[u8]>
    {
        let cf = self.index.db.cf_handle(&self.index.name).unwrap();
        self.index.db.delete_cf(cf, key)
    }

    /// Returns an iterator over the entries whose keys fall within
    /// `start..=end` — the range is inclusive on both bounds.
    pub fn range<KS, KE>(&self, start: KS, end: KE) -> RocksDbResult<CfIter>
    where KS: AsRef<[u8]>,
          KE: AsRef<[u8]>,
    {
        let cf = self.index.db.cf_handle(&self.index.name).unwrap();
        // Create the iterator, then reposition it to the first key >= `start`.
        let mut iter = self.index.db.iterator_cf(cf, IteratorMode::Start)?;
        iter.set_mode(IteratorMode::From(start.as_ref(), Direction::Forward));
        // The upper bound is enforced lazily by `CfIter`, which stops as
        // soon as it sees a key greater than `end_bound`.
        let end_bound = Box::from(end.as_ref());
        Ok(CfIter { iter, end_bound: Some(end_bound) })
    }

    /// Returns an iterator over every entry of the column family.
    pub fn iter(&self) -> RocksDbResult<CfIter> {
        let cf = self.index.db.cf_handle(&self.index.name).unwrap();
        let iter = self.index.db.iterator_cf(cf, IteratorMode::Start)?;
        Ok(CfIter { iter, end_bound: None })
    }

    /// Returns the greatest key in the column family, or `None` when it is
    /// empty.
    pub fn last_key(&self) -> RocksDbResult<Option<Box<[u8]>>> {
        let cf = self.index.db.cf_handle(&self.index.name).unwrap();
        // `IteratorMode::End` iterates in reverse order, so the first item
        // produced is the entry with the largest key.
        let mut iter = self.index.db.iterator_cf(cf, IteratorMode::End)?;
        Ok(iter.next().map(|(key, _)| key))
    }
}
/// Iterator over a column family's entries, optionally limited above by an
/// inclusive `end_bound` key (see `CfTree::range` vs `CfTree::iter`).
pub struct CfIter<'a> {
    iter: rocksdb::DBIterator<'a>,
    // `None` means unbounded; `Some(k)` stops iteration past key `k`.
    end_bound: Option<Box<[u8]>>,
}
impl Iterator for CfIter<'_> {
    type Item = (Box<[u8]>, Box<[u8]>);

    /// Yields the next key/value pair, terminating early once a key past
    /// the (inclusive) end bound is reached.
    fn next(&mut self) -> Option<Self::Item> {
        let entry = self.iter.next()?;
        if let Some(end) = &self.end_bound {
            // The range is inclusive: only a key strictly greater than the
            // end bound stops iteration.
            if &entry.0 > end {
                return None;
            }
        }
        Some(entry)
    }
}

View File

@ -7,7 +7,7 @@ pub enum Error {
SchemaMissing, SchemaMissing,
WordIndexMissing, WordIndexMissing,
MissingDocumentId, MissingDocumentId,
SledError(sled::Error), RocksDbError(rocksdb::Error),
FstError(fst::Error), FstError(fst::Error),
RmpDecodeError(rmp_serde::decode::Error), RmpDecodeError(rmp_serde::decode::Error),
RmpEncodeError(rmp_serde::encode::Error), RmpEncodeError(rmp_serde::encode::Error),
@ -15,9 +15,9 @@ pub enum Error {
SerializerError(SerializerError), SerializerError(SerializerError),
} }
impl From<sled::Error> for Error { impl From<rocksdb::Error> for Error {
fn from(error: sled::Error) -> Error { fn from(error: rocksdb::Error) -> Error {
Error::SledError(error) Error::RocksDbError(error)
} }
} }
@ -59,7 +59,7 @@ impl fmt::Display for Error {
SchemaMissing => write!(f, "this index does not have a schema"), SchemaMissing => write!(f, "this index does not have a schema"),
WordIndexMissing => write!(f, "this index does not have a word index"), WordIndexMissing => write!(f, "this index does not have a word index"),
MissingDocumentId => write!(f, "document id is missing"), MissingDocumentId => write!(f, "document id is missing"),
SledError(e) => write!(f, "Sled error; {}", e), RocksDbError(e) => write!(f, "RocksDB error; {}", e),
FstError(e) => write!(f, "fst error; {}", e), FstError(e) => write!(f, "fst error; {}", e),
RmpDecodeError(e) => write!(f, "rmp decode error; {}", e), RmpDecodeError(e) => write!(f, "rmp decode error; {}", e),
RmpEncodeError(e) => write!(f, "rmp encode error; {}", e), RmpEncodeError(e) => write!(f, "rmp encode error; {}", e),

View File

@ -1,11 +1,10 @@
use std::sync::Arc;
use std::ops::Deref; use std::ops::Deref;
#[derive(Clone)] #[derive(Clone)]
pub struct CustomSettingsIndex(pub(crate) Arc<sled::Tree>); pub struct CustomSettingsIndex(pub(crate) crate::CfTree);
impl Deref for CustomSettingsIndex { impl Deref for CustomSettingsIndex {
type Target = sled::Tree; type Target = crate::CfTree;
fn deref(&self) -> &Self::Target { fn deref(&self) -> &Self::Target {
&self.0 &self.0

View File

@ -3,7 +3,7 @@ use meilidb_core::DocumentId;
use crate::database::Error; use crate::database::Error;
#[derive(Clone)] #[derive(Clone)]
pub struct DocsWordsIndex(pub Arc<sled::Tree>); pub struct DocsWordsIndex(pub crate::CfTree);
impl DocsWordsIndex { impl DocsWordsIndex {
pub fn doc_words(&self, id: DocumentId) -> Result<Option<fst::Set>, Error> { pub fn doc_words(&self, id: DocumentId) -> Result<Option<fst::Set>, Error> {

View File

@ -1,64 +1,62 @@
use std::sync::Arc;
use std::convert::TryInto; use std::convert::TryInto;
use std::ops::Bound;
use meilidb_core::DocumentId; use meilidb_core::DocumentId;
use meilidb_schema::SchemaAttr; use meilidb_schema::SchemaAttr;
use rocksdb::DBVector;
use crate::document_attr_key::DocumentAttrKey; use crate::document_attr_key::DocumentAttrKey;
use crate::RocksDbResult;
fn document_fields_range(id: DocumentId) -> (Bound<[u8; 10]>, Bound<[u8; 10]>) { fn document_fields_range(id: DocumentId) -> ([u8; 10], [u8; 10]) {
let start = DocumentAttrKey::new(id, SchemaAttr::min()).to_be_bytes(); let start = DocumentAttrKey::new(id, SchemaAttr::min()).to_be_bytes();
let end = DocumentAttrKey::new(id, SchemaAttr::max()).to_be_bytes(); let end = DocumentAttrKey::new(id, SchemaAttr::max()).to_be_bytes();
(Bound::Included(start), Bound::Included(end)) (start, end)
} }
#[derive(Clone)] #[derive(Clone)]
pub struct DocumentsIndex(pub(crate) Arc<sled::Tree>); pub struct DocumentsIndex(pub(crate) crate::CfTree);
impl DocumentsIndex { impl DocumentsIndex {
pub fn document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result<Option<sled::IVec>> { pub fn document_field(&self, id: DocumentId, attr: SchemaAttr) -> RocksDbResult<Option<DBVector>> {
let key = DocumentAttrKey::new(id, attr).to_be_bytes(); let key = DocumentAttrKey::new(id, attr).to_be_bytes();
self.0.get(key) self.0.get(key)
} }
pub fn set_document_field(&self, id: DocumentId, attr: SchemaAttr, value: Vec<u8>) -> sled::Result<()> { pub fn set_document_field(&self, id: DocumentId, attr: SchemaAttr, value: Vec<u8>) -> RocksDbResult<()> {
let key = DocumentAttrKey::new(id, attr).to_be_bytes(); let key = DocumentAttrKey::new(id, attr).to_be_bytes();
self.0.insert(key, value)?; self.0.insert(key, value)?;
Ok(()) Ok(())
} }
pub fn del_document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result<()> { pub fn del_document_field(&self, id: DocumentId, attr: SchemaAttr) -> RocksDbResult<()> {
let key = DocumentAttrKey::new(id, attr).to_be_bytes(); let key = DocumentAttrKey::new(id, attr).to_be_bytes();
self.0.remove(key)?; self.0.remove(key)?;
Ok(()) Ok(())
} }
pub fn del_all_document_fields(&self, id: DocumentId) -> sled::Result<()> { pub fn del_all_document_fields(&self, id: DocumentId) -> RocksDbResult<()> {
let range = document_fields_range(id); let (start, end) = document_fields_range(id);
for result in self.0.range(range) { for (key, _) in self.0.range(start, end)? {
let (key, _) = result?;
self.0.remove(key)?; self.0.remove(key)?;
} }
Ok(()) Ok(())
} }
pub fn document_fields(&self, id: DocumentId) -> DocumentFieldsIter { pub fn document_fields(&self, id: DocumentId) -> RocksDbResult<DocumentFieldsIter> {
let range = document_fields_range(id); let (start, end) = document_fields_range(id);
let iter = self.0.range(range); let iter = self.0.range(start, end)?;
DocumentFieldsIter(iter) Ok(DocumentFieldsIter(iter))
} }
pub fn len(&self) -> sled::Result<usize> { pub fn len(&self) -> RocksDbResult<usize> {
let mut last_document_id = None; let mut last_document_id = None;
let mut count = 0; let mut count = 0;
for result in self.0.iter() { for (key, _) in self.0.iter()? {
let (key, _) = result?;
let array = key.as_ref().try_into().unwrap(); let array = key.as_ref().try_into().unwrap();
let document_id = DocumentAttrKey::from_be_bytes(array).document_id; let document_id = DocumentAttrKey::from_be_bytes(array).document_id;
@ -72,19 +70,18 @@ impl DocumentsIndex {
} }
} }
pub struct DocumentFieldsIter<'a>(sled::Iter<'a>); pub struct DocumentFieldsIter<'a>(crate::CfIter<'a>);
impl Iterator for DocumentFieldsIter<'_> { impl Iterator for DocumentFieldsIter<'_> {
type Item = sled::Result<(SchemaAttr, sled::IVec)>; type Item = (SchemaAttr, Box<[u8]>);
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
match self.0.next() { match self.0.next() {
Some(Ok((key, value))) => { Some((key, value)) => {
let array = key.as_ref().try_into().unwrap(); let array = key.as_ref().try_into().unwrap();
let key = DocumentAttrKey::from_be_bytes(array); let key = DocumentAttrKey::from_be_bytes(array);
Some(Ok((key.attribute, value))) Some((key.attribute, value))
}, },
Some(Err(e)) => return Some(Err(e)),
None => None, None => None,
} }
} }

View File

@ -11,7 +11,7 @@ const SYNONYMS_KEY: &str = "synonyms";
const RANKED_MAP_KEY: &str = "ranked-map"; const RANKED_MAP_KEY: &str = "ranked-map";
#[derive(Clone)] #[derive(Clone)]
pub struct MainIndex(pub(crate) Arc<sled::Tree>); pub struct MainIndex(pub(crate) crate::CfTree);
impl MainIndex { impl MainIndex {
pub fn schema(&self) -> Result<Option<Schema>, Error> { pub fn schema(&self) -> Result<Option<Schema>, Error> {

View File

@ -1,17 +1,19 @@
use std::collections::{HashSet, BTreeMap}; use std::collections::{HashSet, BTreeMap};
use std::convert::TryInto; use std::convert::TryInto;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread; use std::thread;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use arc_swap::{ArcSwap, ArcSwapOption, Guard}; use arc_swap::{ArcSwap, ArcSwapOption, Guard};
use crossbeam_channel::Receiver;
use meilidb_core::criterion::Criteria; use meilidb_core::criterion::Criteria;
use meilidb_core::{DocIndex, Store, DocumentId, QueryBuilder}; use meilidb_core::{DocIndex, Store, DocumentId, QueryBuilder};
use meilidb_schema::Schema; use meilidb_schema::Schema;
use sdset::SetBuf; use sdset::SetBuf;
use serde::{de, Serialize, Deserialize}; use serde::{de, Serialize, Deserialize};
use sled::Transactional;
use crate::CfTree;
use crate::ranked_map::RankedMap; use crate::ranked_map::RankedMap;
use crate::serde::{Deserializer, DeserializerError}; use crate::serde::{Deserializer, DeserializerError};
@ -22,6 +24,7 @@ use self::main_index::MainIndex;
use self::synonyms_index::SynonymsIndex; use self::synonyms_index::SynonymsIndex;
use self::words_index::WordsIndex; use self::words_index::WordsIndex;
use crate::RocksDbResult;
use crate::database::{ use crate::database::{
Error, Error,
DocumentsAddition, DocumentsDeletion, DocumentsAddition, DocumentsDeletion,
@ -37,13 +40,6 @@ mod main_index;
mod synonyms_index; mod synonyms_index;
mod words_index; mod words_index;
fn event_is_set(event: &sled::Event) -> bool {
match event {
sled::Event::Set(_, _) => true,
_ => false,
}
}
#[derive(Deserialize)] #[derive(Deserialize)]
enum UpdateOwned { enum UpdateOwned {
DocumentsAddition(Vec<rmpv::Value>), DocumentsAddition(Vec<rmpv::Value>),
@ -81,19 +77,18 @@ pub struct UpdateStatus {
pub detailed_duration: DetailedDuration, pub detailed_duration: DetailedDuration,
} }
fn spawn_update_system(index: Index) -> thread::JoinHandle<()> { fn spawn_update_system(index: Index, subscription: Receiver<()>) -> thread::JoinHandle<()> {
thread::spawn(move || { thread::spawn(move || {
let mut subscription = subscription.into_iter();
loop { loop {
let subscription = index.updates_index.watch_prefix(vec![]); while let Some((key, _)) = index.updates_index.iter().unwrap().next() {
while let Some(result) = index.updates_index.iter().next() {
let (key, _) = result.unwrap();
let update_id = key.as_ref().try_into().map(u64::from_be_bytes).unwrap(); let update_id = key.as_ref().try_into().map(u64::from_be_bytes).unwrap();
let updates = &index.updates_index; let updates = &index.updates_index;
let results = &index.updates_results_index; let results = &index.updates_results_index;
(updates, results).transaction(|(updates, results)| { let update = updates.get(&key).unwrap().unwrap();
let update = updates.remove(&key)?.unwrap();
let (update_type, result, duration) = match rmp_serde::from_read_ref(&update).unwrap() { let (update_type, result, duration) = match rmp_serde::from_read_ref(&update).unwrap() {
UpdateOwned::DocumentsAddition(documents) => { UpdateOwned::DocumentsAddition(documents) => {
@ -137,18 +132,35 @@ fn spawn_update_system(index: Index) -> thread::JoinHandle<()> {
} }
let value = bincode::serialize(&status).unwrap(); let value = bincode::serialize(&status).unwrap();
results.insert(&key, value) results.insert(&key, value).unwrap();
}) updates.remove(&key).unwrap();
.unwrap();
} }
// this subscription is just used to block // this subscription is just used to block
// the loop until a new update is inserted // the loop until a new update is inserted
subscription.filter(event_is_set).next(); subscription.next();
} }
}) })
} }
fn last_update_id(
update_index: &crate::CfTree,
update_results_index: &crate::CfTree,
) -> RocksDbResult<u64>
{
let uikey = match update_index.last_key()? {
Some(key) => Some(key.as_ref().try_into().map(u64::from_be_bytes).unwrap()),
None => None,
};
let urikey = match update_results_index.last_key()? {
Some(key) => Some(key.as_ref().try_into().map(u64::from_be_bytes).unwrap()),
None => None,
};
Ok(uikey.max(urikey).unwrap_or(0))
}
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
pub struct IndexStats { pub struct IndexStats {
pub number_of_words: usize, pub number_of_words: usize,
@ -169,9 +181,9 @@ pub struct Index {
custom_settings_index: CustomSettingsIndex, custom_settings_index: CustomSettingsIndex,
// used by the update system // used by the update system
db: sled::Db, updates_id: Arc<AtomicU64>,
updates_index: Arc<sled::Tree>, updates_index: crate::CfTree,
updates_results_index: Arc<sled::Tree>, updates_results_index: crate::CfTree,
update_callback: Arc<ArcSwapOption<Box<dyn Fn(UpdateStatus) + Send + Sync + 'static>>>, update_callback: Arc<ArcSwapOption<Box<dyn Fn(UpdateStatus) + Send + Sync + 'static>>>,
} }
@ -183,23 +195,23 @@ pub(crate) struct Cache {
} }
impl Index { impl Index {
pub fn new(db: sled::Db, name: &str) -> Result<Index, Error> { pub fn new(db: Arc<rocksdb::DB>, name: &str) -> Result<Index, Error> {
Index::new_raw(db, name, None) Index::new_raw(db, name, None)
} }
pub fn with_schema(db: sled::Db, name: &str, schema: Schema) -> Result<Index, Error> { pub fn with_schema(db: Arc<rocksdb::DB>, name: &str, schema: Schema) -> Result<Index, Error> {
Index::new_raw(db, name, Some(schema)) Index::new_raw(db, name, Some(schema))
} }
fn new_raw(db: sled::Db, name: &str, schema: Option<Schema>) -> Result<Index, Error> { fn new_raw(db: Arc<rocksdb::DB>, name: &str, schema: Option<Schema>) -> Result<Index, Error> {
let main_index = db.open_tree(name).map(MainIndex)?; let main_index = CfTree::create(db.clone(), name.to_string()).map(MainIndex)?;
let synonyms_index = db.open_tree(format!("{}-synonyms", name)).map(SynonymsIndex)?; let synonyms_index = CfTree::create(db.clone(), format!("{}-synonyms", name)).map(SynonymsIndex)?;
let words_index = db.open_tree(format!("{}-words", name)).map(WordsIndex)?; let words_index = CfTree::create(db.clone(), format!("{}-words", name)).map(WordsIndex)?;
let docs_words_index = db.open_tree(format!("{}-docs-words", name)).map(DocsWordsIndex)?; let docs_words_index = CfTree::create(db.clone(), format!("{}-docs-words", name)).map(DocsWordsIndex)?;
let documents_index = db.open_tree(format!("{}-documents", name)).map(DocumentsIndex)?; let documents_index = CfTree::create(db.clone(), format!("{}-documents", name)).map(DocumentsIndex)?;
let custom_settings_index = db.open_tree(format!("{}-custom", name)).map(CustomSettingsIndex)?; let custom_settings_index = CfTree::create(db.clone(), format!("{}-custom", name)).map(CustomSettingsIndex)?;
let updates_index = db.open_tree(format!("{}-updates", name))?; let (updates_index, subscription) = CfTree::create_with_subcription(db.clone(), format!("{}-updates", name))?;
let updates_results_index = db.open_tree(format!("{}-updates-results", name))?; let updates_results_index = CfTree::create(db.clone(), format!("{}-updates-results", name))?;
let words = match main_index.words_set()? { let words = match main_index.words_set()? {
Some(words) => Arc::new(words), Some(words) => Arc::new(words),
@ -232,6 +244,9 @@ impl Index {
let cache = Cache { words, synonyms, schema, ranked_map }; let cache = Cache { words, synonyms, schema, ranked_map };
let cache = Arc::new(ArcSwap::from_pointee(cache)); let cache = Arc::new(ArcSwap::from_pointee(cache));
let last_update_id = last_update_id(&updates_index, &updates_results_index)?;
let updates_id = Arc::new(AtomicU64::new(last_update_id + 1));
let index = Index { let index = Index {
cache, cache,
main_index, main_index,
@ -240,13 +255,13 @@ impl Index {
docs_words_index, docs_words_index,
documents_index, documents_index,
custom_settings_index, custom_settings_index,
db, updates_id,
updates_index, updates_index,
updates_results_index, updates_results_index,
update_callback: Arc::new(ArcSwapOption::empty()), update_callback: Arc::new(ArcSwapOption::empty()),
}; };
let _handle = spawn_update_system(index.clone()); let _handle = spawn_update_system(index.clone(), subscription);
Ok(index) Ok(index)
} }
@ -261,7 +276,7 @@ impl Index {
self.update_callback.store(None); self.update_callback.store(None);
} }
pub fn stats(&self) -> sled::Result<IndexStats> { pub fn stats(&self) -> RocksDbResult<IndexStats> {
let cache = self.cache.load(); let cache = self.cache.load();
Ok(IndexStats { Ok(IndexStats {
number_of_words: cache.words.len(), number_of_words: cache.words.len(),
@ -340,17 +355,15 @@ impl Index {
update_id: u64, update_id: u64,
) -> Result<UpdateStatus, Error> ) -> Result<UpdateStatus, Error>
{ {
let update_id_bytes = update_id.to_be_bytes().to_vec();
let mut subscription = self.updates_results_index.watch_prefix(update_id_bytes);
// if we find the update result return it now // if we find the update result return it now
if let Some(result) = self.update_status(update_id)? { if let Some(result) = self.update_status(update_id)? {
return Ok(result) return Ok(result)
} }
// this subscription is used to block the thread loop {
// until the update_id is inserted in the tree if self.updates_results_index.get(&update_id.to_be_bytes())?.is_some() { break }
subscription.next(); std::thread::sleep(Duration::from_millis(300));
}
// the thread has been unblocked, it means that the update result // the thread has been unblocked, it means that the update result
// has been inserted in the tree, retrieve it // has been inserted in the tree, retrieve it
@ -429,11 +442,9 @@ impl Index {
} }
fn raw_push_update(&self, raw_update: Vec<u8>) -> Result<u64, Error> { fn raw_push_update(&self, raw_update: Vec<u8>) -> Result<u64, Error> {
let update_id = self.db.generate_id()?; let update_id = self.updates_id.fetch_add(1, Ordering::SeqCst);
let update_id_array = update_id.to_be_bytes(); let update_id_array = update_id.to_be_bytes();
self.updates_index.insert(update_id_array, raw_update)?; self.updates_index.insert(update_id_array, raw_update)?;
Ok(update_id) Ok(update_id)
} }
} }

View File

@ -1,21 +1,21 @@
use std::sync::Arc; use crate::RocksDbResult;
#[derive(Clone)] #[derive(Clone)]
pub struct SynonymsIndex(pub(crate) Arc<sled::Tree>); pub struct SynonymsIndex(pub(crate) crate::CfTree);
impl SynonymsIndex { impl SynonymsIndex {
pub fn alternatives_to(&self, word: &[u8]) -> sled::Result<Option<fst::Set>> { pub fn alternatives_to(&self, word: &[u8]) -> RocksDbResult<Option<fst::Set>> {
match self.0.get(word)? { match self.0.get(word)? {
Some(vector) => Ok(Some(fst::Set::from_bytes(vector.to_vec()).unwrap())), Some(vector) => Ok(Some(fst::Set::from_bytes(vector.to_vec()).unwrap())),
None => Ok(None), None => Ok(None),
} }
} }
pub fn set_alternatives_to(&self, word: &[u8], value: Vec<u8>) -> sled::Result<()> { pub fn set_alternatives_to(&self, word: &[u8], value: Vec<u8>) -> RocksDbResult<()> {
self.0.insert(word, value).map(drop) self.0.insert(word, value).map(drop)
} }
pub fn del_alternatives_of(&self, word: &[u8]) -> sled::Result<()> { pub fn del_alternatives_of(&self, word: &[u8]) -> RocksDbResult<()> {
self.0.remove(word).map(drop) self.0.remove(word).map(drop)
} }
} }

View File

@ -1,14 +1,13 @@
use std::sync::Arc;
use meilidb_core::DocIndex; use meilidb_core::DocIndex;
use sdset::{Set, SetBuf}; use sdset::{Set, SetBuf};
use zerocopy::{LayoutVerified, AsBytes}; use zerocopy::{LayoutVerified, AsBytes};
use crate::RocksDbResult;
#[derive(Clone)] #[derive(Clone)]
pub struct WordsIndex(pub(crate) Arc<sled::Tree>); pub struct WordsIndex(pub(crate) crate::CfTree);
impl WordsIndex { impl WordsIndex {
pub fn doc_indexes(&self, word: &[u8]) -> sled::Result<Option<SetBuf<DocIndex>>> { pub fn doc_indexes(&self, word: &[u8]) -> RocksDbResult<Option<SetBuf<DocIndex>>> {
// we must force an allocation to make the memory aligned // we must force an allocation to make the memory aligned
match self.0.get(word)? { match self.0.get(word)? {
Some(bytes) => { Some(bytes) => {
@ -36,11 +35,11 @@ impl WordsIndex {
} }
} }
pub fn set_doc_indexes(&self, word: &[u8], set: &Set<DocIndex>) -> sled::Result<()> { pub fn set_doc_indexes(&self, word: &[u8], set: &Set<DocIndex>) -> RocksDbResult<()> {
self.0.insert(word, set.as_bytes()).map(drop) self.0.insert(word, set.as_bytes()).map(drop)
} }
pub fn del_doc_indexes(&self, word: &[u8]) -> sled::Result<()> { pub fn del_doc_indexes(&self, word: &[u8]) -> RocksDbResult<()> {
self.0.remove(word).map(drop) self.0.remove(word).map(drop)
} }
} }

View File

@ -1,6 +1,7 @@
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
use std::collections::{HashSet, HashMap}; use std::collections::{HashSet, HashMap};
use std::path::Path; use std::path::Path;
use std::sync::Arc;
use std::sync::RwLock; use std::sync::RwLock;
use meilidb_schema::Schema; use meilidb_schema::Schema;
@ -21,8 +22,10 @@ use self::update::apply_documents_deletion;
use self::update::apply_synonyms_addition; use self::update::apply_synonyms_addition;
use self::update::apply_synonyms_deletion; use self::update::apply_synonyms_deletion;
fn load_indexes(tree: &sled::Tree) -> Result<HashSet<String>, Error> { const INDEXES_KEY: &str = "indexes";
match tree.get("indexes")? {
fn load_indexes(tree: &rocksdb::DB) -> Result<HashSet<String>, Error> {
match tree.get(INDEXES_KEY)? {
Some(bytes) => Ok(bincode::deserialize(&bytes)?), Some(bytes) => Ok(bincode::deserialize(&bytes)?),
None => Ok(HashSet::new()) None => Ok(HashSet::new())
} }
@ -30,13 +33,18 @@ fn load_indexes(tree: &sled::Tree) -> Result<HashSet<String>, Error> {
pub struct Database { pub struct Database {
cache: RwLock<HashMap<String, Index>>, cache: RwLock<HashMap<String, Index>>,
inner: sled::Db, inner: Arc<rocksdb::DB>,
} }
impl Database { impl Database {
pub fn open<P: AsRef<Path>>(path: P) -> Result<Database, Error> { pub fn open<P: AsRef<Path>>(path: P) -> Result<Database, Error> {
let cache = RwLock::new(HashMap::new()); let cache = RwLock::new(HashMap::new());
let inner = sled::Db::open(path)?;
let mut options = rocksdb::Options::default();
options.create_if_missing(true);
let cfs = rocksdb::DB::list_cf(&options, &path).unwrap_or_default();
let inner = Arc::new(rocksdb::DB::open_cf(&options, path, cfs)?);
let indexes = load_indexes(&inner)?; let indexes = load_indexes(&inner)?;
let database = Database { cache, inner }; let database = Database { cache, inner };
@ -54,7 +62,7 @@ impl Database {
fn set_indexes(&self, value: &HashSet<String>) -> Result<(), Error> { fn set_indexes(&self, value: &HashSet<String>) -> Result<(), Error> {
let bytes = bincode::serialize(value)?; let bytes = bincode::serialize(value)?;
self.inner.insert("indexes", bytes)?; self.inner.put(INDEXES_KEY, bytes)?;
Ok(()) Ok(())
} }

View File

@ -1,3 +1,4 @@
mod cf_tree;
mod database; mod database;
mod document_attr_key; mod document_attr_key;
mod indexer; mod indexer;
@ -5,8 +6,10 @@ mod number;
mod ranked_map; mod ranked_map;
mod serde; mod serde;
pub use sled; pub use self::cf_tree::{CfTree, CfIter};
pub use self::database::{Database, Index, CustomSettingsIndex}; pub use self::database::{Database, Index, CustomSettingsIndex};
pub use self::number::Number; pub use self::number::Number;
pub use self::ranked_map::RankedMap; pub use self::ranked_map::RankedMap;
pub use self::serde::{compute_document_id, extract_document_id, value_to_string}; pub use self::serde::{compute_document_id, extract_document_id, value_to_string};
pub type RocksDbResult<T> = Result<T, rocksdb::Error>;

View File

@ -13,7 +13,7 @@ use crate::database::Index;
#[derive(Debug)] #[derive(Debug)]
pub enum DeserializerError { pub enum DeserializerError {
RmpError(RmpError), RmpError(RmpError),
SledError(sled::Error), RocksDbError(rocksdb::Error),
Custom(String), Custom(String),
} }
@ -27,7 +27,7 @@ impl fmt::Display for DeserializerError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self { match self {
DeserializerError::RmpError(e) => write!(f, "rmp serde related error: {}", e), DeserializerError::RmpError(e) => write!(f, "rmp serde related error: {}", e),
DeserializerError::SledError(e) => write!(f, "Sled related error: {}", e), DeserializerError::RocksDbError(e) => write!(f, "RocksDB related error: {}", e),
DeserializerError::Custom(s) => f.write_str(s), DeserializerError::Custom(s) => f.write_str(s),
} }
} }
@ -41,9 +41,9 @@ impl From<RmpError> for DeserializerError {
} }
} }
impl From<sled::Error> for DeserializerError { impl From<rocksdb::Error> for DeserializerError {
fn from(error: sled::Error) -> DeserializerError { fn from(error: rocksdb::Error) -> DeserializerError {
DeserializerError::SledError(error) DeserializerError::RocksDbError(error)
} }
} }
@ -75,13 +75,9 @@ impl<'de, 'a, 'b> de::Deserializer<'de> for &'b mut Deserializer<'a>
let schema = self.index.schema(); let schema = self.index.schema();
let documents = self.index.as_ref().documents_index; let documents = self.index.as_ref().documents_index;
let mut error = None;
let iter = documents let iter = documents
.document_fields(self.document_id) .document_fields(self.document_id)?
.filter_map(|result| { .filter_map(|(attr, value)| {
match result {
Ok((attr, value)) => {
let is_displayed = schema.props(attr).is_displayed(); let is_displayed = schema.props(attr).is_displayed();
if is_displayed && self.fields.map_or(true, |f| f.contains(&attr)) { if is_displayed && self.fields.map_or(true, |f| f.contains(&attr)) {
let attribute_name = schema.attribute_name(attr); let attribute_name = schema.attribute_name(attr);
@ -89,23 +85,11 @@ impl<'de, 'a, 'b> de::Deserializer<'de> for &'b mut Deserializer<'a>
} else { } else {
None None
} }
},
Err(e) => {
if error.is_none() {
error = Some(e);
}
None
}
}
}); });
let map_deserializer = de::value::MapDeserializer::new(iter); let map_deserializer = de::value::MapDeserializer::new(iter);
let result = visitor.visit_map(map_deserializer).map_err(DeserializerError::from); let result = visitor.visit_map(map_deserializer).map_err(DeserializerError::from);
if let Some(e) = error {
return Err(DeserializerError::from(e))
}
result result
} }
} }

View File

@ -38,7 +38,7 @@ pub enum SerializerError {
DocumentIdNotFound, DocumentIdNotFound,
InvalidDocumentIdType, InvalidDocumentIdType,
RmpError(RmpError), RmpError(RmpError),
SledError(sled::Error), RocksDbError(rocksdb::Error),
SerdeJsonError(SerdeJsonError), SerdeJsonError(SerdeJsonError),
ParseNumberError(ParseNumberError), ParseNumberError(ParseNumberError),
UnserializableType { type_name: &'static str }, UnserializableType { type_name: &'static str },
@ -63,7 +63,7 @@ impl fmt::Display for SerializerError {
write!(f, "document identifier can only be of type string or number") write!(f, "document identifier can only be of type string or number")
}, },
SerializerError::RmpError(e) => write!(f, "rmp serde related error: {}", e), SerializerError::RmpError(e) => write!(f, "rmp serde related error: {}", e),
SerializerError::SledError(e) => write!(f, "Sled related error: {}", e), SerializerError::RocksDbError(e) => write!(f, "RocksDB related error: {}", e),
SerializerError::SerdeJsonError(e) => write!(f, "serde json error: {}", e), SerializerError::SerdeJsonError(e) => write!(f, "serde json error: {}", e),
SerializerError::ParseNumberError(e) => { SerializerError::ParseNumberError(e) => {
write!(f, "error while trying to parse a number: {}", e) write!(f, "error while trying to parse a number: {}", e)
@ -102,9 +102,9 @@ impl From<SerdeJsonError> for SerializerError {
} }
} }
impl From<sled::Error> for SerializerError { impl From<rocksdb::Error> for SerializerError {
fn from(error: sled::Error) -> SerializerError { fn from(error: rocksdb::Error) -> SerializerError {
SerializerError::SledError(error) SerializerError::RocksDbError(error)
} }
} }