feat: Make RocksDB works seemlessly like sled

This commit is contained in:
Clément Renault 2019-09-05 18:43:10 +02:00
parent e3fa07077c
commit f46868407c
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
13 changed files with 218 additions and 180 deletions

View File

@ -7,6 +7,7 @@ edition = "2018"
[dependencies]
arc-swap = "0.4.2"
bincode = "1.1.4"
crossbeam-channel = "0.3.9"
deunicode = "1.0.0"
hashbrown = { version = "0.6.0", features = ["serde"] }
log = "0.4.6"

View File

@ -1,36 +1,71 @@
use std::sync::Arc;
use crossbeam_channel::{unbounded, Sender, Receiver};
use rocksdb::{DBVector, IteratorMode, Direction};
use crate::RocksDbResult;
#[derive(Clone)]
pub struct CfTree(Arc<CfTreeInner>);
pub struct CfTree {
index: Arc<CfTreeInner>,
sender: Option<Sender<()>>,
}
struct CfTreeInner {
db: rocksdb::DB,
db: Arc<rocksdb::DB>,
name: String,
}
impl CfTree {
pub fn create(db: Arc<rocksdb::DB>, name: String) -> RocksDbResult<CfTree> {
let mut options = rocksdb::Options::default();
options.create_missing_column_families(true);
let _cf = db.create_cf(&name, &options)?;
let index = Arc::new(CfTreeInner { db, name });
Ok(CfTree { index, sender: None })
}
pub fn create_with_subcription(
db: Arc<rocksdb::DB>,
name: String,
) -> RocksDbResult<(CfTree, Receiver<()>)>
{
let mut options = rocksdb::Options::default();
options.create_missing_column_families(true);
let _cf = db.create_cf(&name, &options)?;
let index = Arc::new(CfTreeInner { db, name });
let (sender, receiver) = unbounded();
Ok((CfTree { index, sender: Some(sender) }, receiver))
}
pub fn insert<K, V>(&self, key: K, value: V) -> RocksDbResult<()>
where K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
let cf = self.0.db.cf_handle(&self.0.name).unwrap();
self.0.db.put_cf(cf, key, value)
let cf = self.index.db.cf_handle(&self.index.name).unwrap();
let result = self.index.db.put_cf(cf, key, value);
if let Some(sender) = &self.sender {
let _err = sender.send(());
}
result
}
pub fn get<K>(&self, key: K) -> RocksDbResult<Option<DBVector>>
where K: AsRef<[u8]>,
{
let cf = self.0.db.cf_handle(&self.0.name).unwrap();
self.0.db.get_cf(cf, key)
let cf = self.index.db.cf_handle(&self.index.name).unwrap();
self.index.db.get_cf(cf, key)
}
pub fn remove<K>(&self, key: K) -> RocksDbResult<()>
where K: AsRef<[u8]>
{
let cf = self.0.db.cf_handle(&self.0.name).unwrap();
self.0.db.delete_cf(cf, key)
let cf = self.index.db.cf_handle(&self.index.name).unwrap();
self.index.db.delete_cf(cf, key)
}
/// Start and end key range is inclusive on both bounds.
@ -38,9 +73,9 @@ impl CfTree {
where KS: AsRef<[u8]>,
KE: AsRef<[u8]>,
{
let cf = self.0.db.cf_handle(&self.0.name).unwrap();
let cf = self.index.db.cf_handle(&self.index.name).unwrap();
let mut iter = self.0.db.iterator_cf(cf, IteratorMode::Start)?;
let mut iter = self.index.db.iterator_cf(cf, IteratorMode::Start)?;
iter.set_mode(IteratorMode::From(start.as_ref(), Direction::Forward));
let end_bound = Box::from(end.as_ref());
@ -48,10 +83,16 @@ impl CfTree {
}
pub fn iter(&self) -> RocksDbResult<CfIter> {
let cf = self.0.db.cf_handle(&self.0.name).unwrap();
let iter = self.0.db.iterator_cf(cf, IteratorMode::Start)?;
let cf = self.index.db.cf_handle(&self.index.name).unwrap();
let iter = self.index.db.iterator_cf(cf, IteratorMode::Start)?;
Ok(CfIter { iter, end_bound: None })
}
pub fn last_key(&self) -> RocksDbResult<Option<Box<[u8]>>> {
let cf = self.index.db.cf_handle(&self.index.name).unwrap();
let mut iter = self.index.db.iterator_cf(cf, IteratorMode::End)?;
Ok(iter.next().map(|(key, _)| key))
}
}
pub struct CfIter<'a> {
@ -63,8 +104,8 @@ impl Iterator for CfIter<'_> {
type Item = (Box<[u8]>, Box<[u8]>);
fn next(&mut self) -> Option<Self::Item> {
match (self.iter.next(), self.end_bound) {
(Some((key, _)), Some(end_bound)) if key > end_bound => None,
match (self.iter.next(), &self.end_bound) {
(Some((ref key, _)), Some(end_bound)) if key > end_bound => None,
(Some(entry), _) => Some(entry),
(None, _) => None,
}

View File

@ -7,7 +7,7 @@ pub enum Error {
SchemaMissing,
WordIndexMissing,
MissingDocumentId,
SledError(sled::Error),
RocksDbError(rocksdb::Error),
FstError(fst::Error),
RmpDecodeError(rmp_serde::decode::Error),
RmpEncodeError(rmp_serde::encode::Error),
@ -15,9 +15,9 @@ pub enum Error {
SerializerError(SerializerError),
}
impl From<sled::Error> for Error {
fn from(error: sled::Error) -> Error {
Error::SledError(error)
impl From<rocksdb::Error> for Error {
fn from(error: rocksdb::Error) -> Error {
Error::RocksDbError(error)
}
}
@ -59,7 +59,7 @@ impl fmt::Display for Error {
SchemaMissing => write!(f, "this index does not have a schema"),
WordIndexMissing => write!(f, "this index does not have a word index"),
MissingDocumentId => write!(f, "document id is missing"),
SledError(e) => write!(f, "Sled error; {}", e),
RocksDbError(e) => write!(f, "RocksDB error; {}", e),
FstError(e) => write!(f, "fst error; {}", e),
RmpDecodeError(e) => write!(f, "rmp decode error; {}", e),
RmpEncodeError(e) => write!(f, "rmp encode error; {}", e),

View File

@ -1,11 +1,10 @@
use std::sync::Arc;
use std::ops::Deref;
#[derive(Clone)]
pub struct CustomSettingsIndex(pub(crate) Arc<sled::Tree>);
pub struct CustomSettingsIndex(pub(crate) crate::CfTree);
impl Deref for CustomSettingsIndex {
type Target = sled::Tree;
type Target = crate::CfTree;
fn deref(&self) -> &Self::Target {
&self.0

View File

@ -3,7 +3,7 @@ use meilidb_core::DocumentId;
use crate::database::Error;
#[derive(Clone)]
pub struct DocsWordsIndex(pub Arc<sled::Tree>);
pub struct DocsWordsIndex(pub crate::CfTree);
impl DocsWordsIndex {
pub fn doc_words(&self, id: DocumentId) -> Result<Option<fst::Set>, Error> {

View File

@ -1,64 +1,62 @@
use std::sync::Arc;
use std::convert::TryInto;
use std::ops::Bound;
use meilidb_core::DocumentId;
use meilidb_schema::SchemaAttr;
use rocksdb::DBVector;
use crate::document_attr_key::DocumentAttrKey;
use crate::RocksDbResult;
fn document_fields_range(id: DocumentId) -> (Bound<[u8; 10]>, Bound<[u8; 10]>) {
fn document_fields_range(id: DocumentId) -> ([u8; 10], [u8; 10]) {
let start = DocumentAttrKey::new(id, SchemaAttr::min()).to_be_bytes();
let end = DocumentAttrKey::new(id, SchemaAttr::max()).to_be_bytes();
(Bound::Included(start), Bound::Included(end))
(start, end)
}
#[derive(Clone)]
pub struct DocumentsIndex(pub(crate) Arc<sled::Tree>);
pub struct DocumentsIndex(pub(crate) crate::CfTree);
impl DocumentsIndex {
pub fn document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result<Option<sled::IVec>> {
pub fn document_field(&self, id: DocumentId, attr: SchemaAttr) -> RocksDbResult<Option<DBVector>> {
let key = DocumentAttrKey::new(id, attr).to_be_bytes();
self.0.get(key)
}
pub fn set_document_field(&self, id: DocumentId, attr: SchemaAttr, value: Vec<u8>) -> sled::Result<()> {
pub fn set_document_field(&self, id: DocumentId, attr: SchemaAttr, value: Vec<u8>) -> RocksDbResult<()> {
let key = DocumentAttrKey::new(id, attr).to_be_bytes();
self.0.insert(key, value)?;
Ok(())
}
pub fn del_document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result<()> {
pub fn del_document_field(&self, id: DocumentId, attr: SchemaAttr) -> RocksDbResult<()> {
let key = DocumentAttrKey::new(id, attr).to_be_bytes();
self.0.remove(key)?;
Ok(())
}
pub fn del_all_document_fields(&self, id: DocumentId) -> sled::Result<()> {
let range = document_fields_range(id);
pub fn del_all_document_fields(&self, id: DocumentId) -> RocksDbResult<()> {
let (start, end) = document_fields_range(id);
for result in self.0.range(range) {
let (key, _) = result?;
for (key, _) in self.0.range(start, end)? {
self.0.remove(key)?;
}
Ok(())
}
pub fn document_fields(&self, id: DocumentId) -> DocumentFieldsIter {
let range = document_fields_range(id);
pub fn document_fields(&self, id: DocumentId) -> RocksDbResult<DocumentFieldsIter> {
let (start, end) = document_fields_range(id);
let iter = self.0.range(range);
DocumentFieldsIter(iter)
let iter = self.0.range(start, end)?;
Ok(DocumentFieldsIter(iter))
}
pub fn len(&self) -> sled::Result<usize> {
pub fn len(&self) -> RocksDbResult<usize> {
let mut last_document_id = None;
let mut count = 0;
for result in self.0.iter() {
let (key, _) = result?;
for (key, _) in self.0.iter()? {
let array = key.as_ref().try_into().unwrap();
let document_id = DocumentAttrKey::from_be_bytes(array).document_id;
@ -72,19 +70,18 @@ impl DocumentsIndex {
}
}
pub struct DocumentFieldsIter<'a>(sled::Iter<'a>);
pub struct DocumentFieldsIter<'a>(crate::CfIter<'a>);
impl Iterator for DocumentFieldsIter<'_> {
type Item = sled::Result<(SchemaAttr, sled::IVec)>;
type Item = (SchemaAttr, Box<[u8]>);
fn next(&mut self) -> Option<Self::Item> {
match self.0.next() {
Some(Ok((key, value))) => {
Some((key, value)) => {
let array = key.as_ref().try_into().unwrap();
let key = DocumentAttrKey::from_be_bytes(array);
Some(Ok((key.attribute, value)))
Some((key.attribute, value))
},
Some(Err(e)) => return Some(Err(e)),
None => None,
}
}

View File

@ -11,7 +11,7 @@ const SYNONYMS_KEY: &str = "synonyms";
const RANKED_MAP_KEY: &str = "ranked-map";
#[derive(Clone)]
pub struct MainIndex(pub(crate) Arc<sled::Tree>);
pub struct MainIndex(pub(crate) crate::CfTree);
impl MainIndex {
pub fn schema(&self) -> Result<Option<Schema>, Error> {

View File

@ -1,17 +1,19 @@
use std::collections::{HashSet, BTreeMap};
use std::convert::TryInto;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
use std::time::{Duration, Instant};
use arc_swap::{ArcSwap, ArcSwapOption, Guard};
use crossbeam_channel::Receiver;
use meilidb_core::criterion::Criteria;
use meilidb_core::{DocIndex, Store, DocumentId, QueryBuilder};
use meilidb_schema::Schema;
use sdset::SetBuf;
use serde::{de, Serialize, Deserialize};
use sled::Transactional;
use crate::CfTree;
use crate::ranked_map::RankedMap;
use crate::serde::{Deserializer, DeserializerError};
@ -22,6 +24,7 @@ use self::main_index::MainIndex;
use self::synonyms_index::SynonymsIndex;
use self::words_index::WordsIndex;
use crate::RocksDbResult;
use crate::database::{
Error,
DocumentsAddition, DocumentsDeletion,
@ -37,13 +40,6 @@ mod main_index;
mod synonyms_index;
mod words_index;
fn event_is_set(event: &sled::Event) -> bool {
match event {
sled::Event::Set(_, _) => true,
_ => false,
}
}
#[derive(Deserialize)]
enum UpdateOwned {
DocumentsAddition(Vec<rmpv::Value>),
@ -81,74 +77,90 @@ pub struct UpdateStatus {
pub detailed_duration: DetailedDuration,
}
fn spawn_update_system(index: Index) -> thread::JoinHandle<()> {
fn spawn_update_system(index: Index, subscription: Receiver<()>) -> thread::JoinHandle<()> {
thread::spawn(move || {
let mut subscription = subscription.into_iter();
loop {
let subscription = index.updates_index.watch_prefix(vec![]);
while let Some(result) = index.updates_index.iter().next() {
let (key, _) = result.unwrap();
while let Some((key, _)) = index.updates_index.iter().unwrap().next() {
let update_id = key.as_ref().try_into().map(u64::from_be_bytes).unwrap();
let updates = &index.updates_index;
let results = &index.updates_results_index;
(updates, results).transaction(|(updates, results)| {
let update = updates.remove(&key)?.unwrap();
let update = updates.get(&key).unwrap().unwrap();
let (update_type, result, duration) = match rmp_serde::from_read_ref(&update).unwrap() {
UpdateOwned::DocumentsAddition(documents) => {
let update_type = UpdateType::DocumentsAddition { number: documents.len() };
let ranked_map = index.cache.load().ranked_map.clone();
let start = Instant::now();
let result = apply_documents_addition(&index, ranked_map, documents);
(update_type, result, start.elapsed())
},
UpdateOwned::DocumentsDeletion(documents) => {
let update_type = UpdateType::DocumentsDeletion { number: documents.len() };
let ranked_map = index.cache.load().ranked_map.clone();
let start = Instant::now();
let result = apply_documents_deletion(&index, ranked_map, documents);
(update_type, result, start.elapsed())
},
UpdateOwned::SynonymsAddition(synonyms) => {
let update_type = UpdateType::SynonymsAddition { number: synonyms.len() };
let start = Instant::now();
let result = apply_synonyms_addition(&index, synonyms);
(update_type, result, start.elapsed())
},
UpdateOwned::SynonymsDeletion(synonyms) => {
let update_type = UpdateType::SynonymsDeletion { number: synonyms.len() };
let start = Instant::now();
let result = apply_synonyms_deletion(&index, synonyms);
(update_type, result, start.elapsed())
},
};
let (update_type, result, duration) = match rmp_serde::from_read_ref(&update).unwrap() {
UpdateOwned::DocumentsAddition(documents) => {
let update_type = UpdateType::DocumentsAddition { number: documents.len() };
let ranked_map = index.cache.load().ranked_map.clone();
let start = Instant::now();
let result = apply_documents_addition(&index, ranked_map, documents);
(update_type, result, start.elapsed())
},
UpdateOwned::DocumentsDeletion(documents) => {
let update_type = UpdateType::DocumentsDeletion { number: documents.len() };
let ranked_map = index.cache.load().ranked_map.clone();
let start = Instant::now();
let result = apply_documents_deletion(&index, ranked_map, documents);
(update_type, result, start.elapsed())
},
UpdateOwned::SynonymsAddition(synonyms) => {
let update_type = UpdateType::SynonymsAddition { number: synonyms.len() };
let start = Instant::now();
let result = apply_synonyms_addition(&index, synonyms);
(update_type, result, start.elapsed())
},
UpdateOwned::SynonymsDeletion(synonyms) => {
let update_type = UpdateType::SynonymsDeletion { number: synonyms.len() };
let start = Instant::now();
let result = apply_synonyms_deletion(&index, synonyms);
(update_type, result, start.elapsed())
},
};
let detailed_duration = DetailedDuration { main: duration };
let status = UpdateStatus {
update_id,
update_type,
result: result.map_err(|e| e.to_string()),
detailed_duration,
};
let detailed_duration = DetailedDuration { main: duration };
let status = UpdateStatus {
update_id,
update_type,
result: result.map_err(|e| e.to_string()),
detailed_duration,
};
if let Some(callback) = &*index.update_callback.load() {
(callback)(status.clone());
}
if let Some(callback) = &*index.update_callback.load() {
(callback)(status.clone());
}
let value = bincode::serialize(&status).unwrap();
results.insert(&key, value)
})
.unwrap();
let value = bincode::serialize(&status).unwrap();
results.insert(&key, value).unwrap();
updates.remove(&key).unwrap();
}
// this subscription is just used to block
// the loop until a new update is inserted
subscription.filter(event_is_set).next();
subscription.next();
}
})
}
fn last_update_id(
update_index: &crate::CfTree,
update_results_index: &crate::CfTree,
) -> RocksDbResult<u64>
{
let uikey = match update_index.last_key()? {
Some(key) => Some(key.as_ref().try_into().map(u64::from_be_bytes).unwrap()),
None => None,
};
let urikey = match update_results_index.last_key()? {
Some(key) => Some(key.as_ref().try_into().map(u64::from_be_bytes).unwrap()),
None => None,
};
Ok(uikey.max(urikey).unwrap_or(0))
}
#[derive(Copy, Clone)]
pub struct IndexStats {
pub number_of_words: usize,
@ -169,9 +181,9 @@ pub struct Index {
custom_settings_index: CustomSettingsIndex,
// used by the update system
db: sled::Db,
updates_index: Arc<sled::Tree>,
updates_results_index: Arc<sled::Tree>,
updates_id: Arc<AtomicU64>,
updates_index: crate::CfTree,
updates_results_index: crate::CfTree,
update_callback: Arc<ArcSwapOption<Box<dyn Fn(UpdateStatus) + Send + Sync + 'static>>>,
}
@ -183,23 +195,23 @@ pub(crate) struct Cache {
}
impl Index {
pub fn new(db: sled::Db, name: &str) -> Result<Index, Error> {
pub fn new(db: Arc<rocksdb::DB>, name: &str) -> Result<Index, Error> {
Index::new_raw(db, name, None)
}
pub fn with_schema(db: sled::Db, name: &str, schema: Schema) -> Result<Index, Error> {
pub fn with_schema(db: Arc<rocksdb::DB>, name: &str, schema: Schema) -> Result<Index, Error> {
Index::new_raw(db, name, Some(schema))
}
fn new_raw(db: sled::Db, name: &str, schema: Option<Schema>) -> Result<Index, Error> {
let main_index = db.open_tree(name).map(MainIndex)?;
let synonyms_index = db.open_tree(format!("{}-synonyms", name)).map(SynonymsIndex)?;
let words_index = db.open_tree(format!("{}-words", name)).map(WordsIndex)?;
let docs_words_index = db.open_tree(format!("{}-docs-words", name)).map(DocsWordsIndex)?;
let documents_index = db.open_tree(format!("{}-documents", name)).map(DocumentsIndex)?;
let custom_settings_index = db.open_tree(format!("{}-custom", name)).map(CustomSettingsIndex)?;
let updates_index = db.open_tree(format!("{}-updates", name))?;
let updates_results_index = db.open_tree(format!("{}-updates-results", name))?;
fn new_raw(db: Arc<rocksdb::DB>, name: &str, schema: Option<Schema>) -> Result<Index, Error> {
let main_index = CfTree::create(db.clone(), name.to_string()).map(MainIndex)?;
let synonyms_index = CfTree::create(db.clone(), format!("{}-synonyms", name)).map(SynonymsIndex)?;
let words_index = CfTree::create(db.clone(), format!("{}-words", name)).map(WordsIndex)?;
let docs_words_index = CfTree::create(db.clone(), format!("{}-docs-words", name)).map(DocsWordsIndex)?;
let documents_index = CfTree::create(db.clone(), format!("{}-documents", name)).map(DocumentsIndex)?;
let custom_settings_index = CfTree::create(db.clone(), format!("{}-custom", name)).map(CustomSettingsIndex)?;
let (updates_index, subscription) = CfTree::create_with_subcription(db.clone(), format!("{}-updates", name))?;
let updates_results_index = CfTree::create(db.clone(), format!("{}-updates-results", name))?;
let words = match main_index.words_set()? {
Some(words) => Arc::new(words),
@ -232,6 +244,9 @@ impl Index {
let cache = Cache { words, synonyms, schema, ranked_map };
let cache = Arc::new(ArcSwap::from_pointee(cache));
let last_update_id = last_update_id(&updates_index, &updates_results_index)?;
let updates_id = Arc::new(AtomicU64::new(last_update_id + 1));
let index = Index {
cache,
main_index,
@ -240,13 +255,13 @@ impl Index {
docs_words_index,
documents_index,
custom_settings_index,
db,
updates_id,
updates_index,
updates_results_index,
update_callback: Arc::new(ArcSwapOption::empty()),
};
let _handle = spawn_update_system(index.clone());
let _handle = spawn_update_system(index.clone(), subscription);
Ok(index)
}
@ -261,7 +276,7 @@ impl Index {
self.update_callback.store(None);
}
pub fn stats(&self) -> sled::Result<IndexStats> {
pub fn stats(&self) -> RocksDbResult<IndexStats> {
let cache = self.cache.load();
Ok(IndexStats {
number_of_words: cache.words.len(),
@ -340,17 +355,15 @@ impl Index {
update_id: u64,
) -> Result<UpdateStatus, Error>
{
let update_id_bytes = update_id.to_be_bytes().to_vec();
let mut subscription = self.updates_results_index.watch_prefix(update_id_bytes);
// if we find the update result return it now
if let Some(result) = self.update_status(update_id)? {
return Ok(result)
}
// this subscription is used to block the thread
// until the update_id is inserted in the tree
subscription.next();
loop {
if self.updates_results_index.get(&update_id.to_be_bytes())?.is_some() { break }
std::thread::sleep(Duration::from_millis(300));
}
// the thread has been unblocked, it means that the update result
// has been inserted in the tree, retrieve it
@ -429,11 +442,9 @@ impl Index {
}
fn raw_push_update(&self, raw_update: Vec<u8>) -> Result<u64, Error> {
let update_id = self.db.generate_id()?;
let update_id = self.updates_id.fetch_add(1, Ordering::SeqCst);
let update_id_array = update_id.to_be_bytes();
self.updates_index.insert(update_id_array, raw_update)?;
Ok(update_id)
}
}

View File

@ -1,21 +1,21 @@
use std::sync::Arc;
use crate::RocksDbResult;
#[derive(Clone)]
pub struct SynonymsIndex(pub(crate) Arc<sled::Tree>);
pub struct SynonymsIndex(pub(crate) crate::CfTree);
impl SynonymsIndex {
pub fn alternatives_to(&self, word: &[u8]) -> sled::Result<Option<fst::Set>> {
pub fn alternatives_to(&self, word: &[u8]) -> RocksDbResult<Option<fst::Set>> {
match self.0.get(word)? {
Some(vector) => Ok(Some(fst::Set::from_bytes(vector.to_vec()).unwrap())),
None => Ok(None),
}
}
pub fn set_alternatives_to(&self, word: &[u8], value: Vec<u8>) -> sled::Result<()> {
pub fn set_alternatives_to(&self, word: &[u8], value: Vec<u8>) -> RocksDbResult<()> {
self.0.insert(word, value).map(drop)
}
pub fn del_alternatives_of(&self, word: &[u8]) -> sled::Result<()> {
pub fn del_alternatives_of(&self, word: &[u8]) -> RocksDbResult<()> {
self.0.remove(word).map(drop)
}
}

View File

@ -1,14 +1,13 @@
use std::sync::Arc;
use meilidb_core::DocIndex;
use sdset::{Set, SetBuf};
use zerocopy::{LayoutVerified, AsBytes};
use crate::RocksDbResult;
#[derive(Clone)]
pub struct WordsIndex(pub(crate) Arc<sled::Tree>);
pub struct WordsIndex(pub(crate) crate::CfTree);
impl WordsIndex {
pub fn doc_indexes(&self, word: &[u8]) -> sled::Result<Option<SetBuf<DocIndex>>> {
pub fn doc_indexes(&self, word: &[u8]) -> RocksDbResult<Option<SetBuf<DocIndex>>> {
// we must force an allocation to make the memory aligned
match self.0.get(word)? {
Some(bytes) => {
@ -36,11 +35,11 @@ impl WordsIndex {
}
}
pub fn set_doc_indexes(&self, word: &[u8], set: &Set<DocIndex>) -> sled::Result<()> {
pub fn set_doc_indexes(&self, word: &[u8], set: &Set<DocIndex>) -> RocksDbResult<()> {
self.0.insert(word, set.as_bytes()).map(drop)
}
pub fn del_doc_indexes(&self, word: &[u8]) -> sled::Result<()> {
pub fn del_doc_indexes(&self, word: &[u8]) -> RocksDbResult<()> {
self.0.remove(word).map(drop)
}
}

View File

@ -1,6 +1,7 @@
use std::collections::hash_map::Entry;
use std::collections::{HashSet, HashMap};
use std::path::Path;
use std::sync::Arc;
use std::sync::RwLock;
use meilidb_schema::Schema;
@ -23,7 +24,7 @@ use self::update::apply_synonyms_deletion;
const INDEXES_KEY: &str = "indexes";
fn load_indexes(tree: &sled::Tree) -> Result<HashSet<String>, Error> {
fn load_indexes(tree: &rocksdb::DB) -> Result<HashSet<String>, Error> {
match tree.get(INDEXES_KEY)? {
Some(bytes) => Ok(bincode::deserialize(&bytes)?),
None => Ok(HashSet::new())
@ -32,13 +33,18 @@ fn load_indexes(tree: &sled::Tree) -> Result<HashSet<String>, Error> {
pub struct Database {
cache: RwLock<HashMap<String, Index>>,
inner: sled::Db,
inner: Arc<rocksdb::DB>,
}
impl Database {
pub fn open<P: AsRef<Path>>(path: P) -> Result<Database, Error> {
let cache = RwLock::new(HashMap::new());
let inner = sled::Db::open(path)?;
let mut options = rocksdb::Options::default();
options.create_if_missing(true);
let cfs = rocksdb::DB::list_cf(&options, &path).unwrap_or_default();
let inner = Arc::new(rocksdb::DB::open_cf(&options, path, cfs)?);
let indexes = load_indexes(&inner)?;
let database = Database { cache, inner };
@ -56,7 +62,7 @@ impl Database {
fn set_indexes(&self, value: &HashSet<String>) -> Result<(), Error> {
let bytes = bincode::serialize(value)?;
self.inner.insert(INDEXES_KEY, bytes)?;
self.inner.put(INDEXES_KEY, bytes)?;
Ok(())
}

View File

@ -13,7 +13,7 @@ use crate::database::Index;
#[derive(Debug)]
pub enum DeserializerError {
RmpError(RmpError),
SledError(sled::Error),
RocksDbError(rocksdb::Error),
Custom(String),
}
@ -27,7 +27,7 @@ impl fmt::Display for DeserializerError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
DeserializerError::RmpError(e) => write!(f, "rmp serde related error: {}", e),
DeserializerError::SledError(e) => write!(f, "Sled related error: {}", e),
DeserializerError::RocksDbError(e) => write!(f, "RocksDB related error: {}", e),
DeserializerError::Custom(s) => f.write_str(s),
}
}
@ -41,9 +41,9 @@ impl From<RmpError> for DeserializerError {
}
}
impl From<sled::Error> for DeserializerError {
fn from(error: sled::Error) -> DeserializerError {
DeserializerError::SledError(error)
impl From<rocksdb::Error> for DeserializerError {
fn from(error: rocksdb::Error) -> DeserializerError {
DeserializerError::RocksDbError(error)
}
}
@ -75,37 +75,21 @@ impl<'de, 'a, 'b> de::Deserializer<'de> for &'b mut Deserializer<'a>
let schema = self.index.schema();
let documents = self.index.as_ref().documents_index;
let mut error = None;
let iter = documents
.document_fields(self.document_id)
.filter_map(|result| {
match result {
Ok((attr, value)) => {
let is_displayed = schema.props(attr).is_displayed();
if is_displayed && self.fields.map_or(true, |f| f.contains(&attr)) {
let attribute_name = schema.attribute_name(attr);
Some((attribute_name, Value::new(value)))
} else {
None
}
},
Err(e) => {
if error.is_none() {
error = Some(e);
}
None
}
.document_fields(self.document_id)?
.filter_map(|(attr, value)| {
let is_displayed = schema.props(attr).is_displayed();
if is_displayed && self.fields.map_or(true, |f| f.contains(&attr)) {
let attribute_name = schema.attribute_name(attr);
Some((attribute_name, Value::new(value)))
} else {
None
}
});
let map_deserializer = de::value::MapDeserializer::new(iter);
let result = visitor.visit_map(map_deserializer).map_err(DeserializerError::from);
if let Some(e) = error {
return Err(DeserializerError::from(e))
}
result
}
}

View File

@ -38,7 +38,7 @@ pub enum SerializerError {
DocumentIdNotFound,
InvalidDocumentIdType,
RmpError(RmpError),
SledError(sled::Error),
RocksDbError(rocksdb::Error),
SerdeJsonError(SerdeJsonError),
ParseNumberError(ParseNumberError),
UnserializableType { type_name: &'static str },
@ -63,7 +63,7 @@ impl fmt::Display for SerializerError {
write!(f, "document identifier can only be of type string or number")
},
SerializerError::RmpError(e) => write!(f, "rmp serde related error: {}", e),
SerializerError::SledError(e) => write!(f, "Sled related error: {}", e),
SerializerError::RocksDbError(e) => write!(f, "RocksDB related error: {}", e),
SerializerError::SerdeJsonError(e) => write!(f, "serde json error: {}", e),
SerializerError::ParseNumberError(e) => {
write!(f, "error while trying to parse a number: {}", e)
@ -102,9 +102,9 @@ impl From<SerdeJsonError> for SerializerError {
}
}
impl From<sled::Error> for SerializerError {
fn from(error: sled::Error) -> SerializerError {
SerializerError::SledError(error)
impl From<rocksdb::Error> for SerializerError {
fn from(error: rocksdb::Error) -> SerializerError {
SerializerError::RocksDbError(error)
}
}