Merge pull request #158 from meilisearch/moving-back-to-rocksdb

Moving back to RocksDB
Commit ab2ca15c5c by Clément Renault, 2019-05-29 14:56:55 +02:00, committed by GitHub.
17 changed files with 250 additions and 110 deletions

==== Cargo.toml ====

@@ -16,7 +16,7 @@ ordered-float = { version = "1.0.2", features = ["serde"] }
 sdset = "0.3.2"
 serde = { version = "1.0.91", features = ["derive"] }
 serde_json = { version = "1.0.39", features = ["preserve_order"] }
-sled = "0.23.0"
+rocksdb = { version = "0.12.2", default-features = false }
 toml = { version = "0.5.0", features = ["preserve_order"] }
 zerocopy = "0.2.2"
@@ -28,9 +28,5 @@ rev = "40b3d48"
 git = "https://github.com/Kerollmops/fst.git"
 branch = "arc-byte-slice"
 
-[features]
-default = []
-compression = ["sled/compression"]
-
 [dev-dependencies]
 tempfile = "3.0.7"

==== CustomSettings ====

@@ -1,13 +1,13 @@
-use std::sync::Arc;
 use std::ops::Deref;
+use crate::database::raw_index::InnerRawIndex;
 
 #[derive(Clone)]
-pub struct CustomSettings(pub Arc<sled::Tree>);
+pub struct CustomSettings(pub(crate) InnerRawIndex);
 
 impl Deref for CustomSettings {
-    type Target = sled::Tree;
+    type Target = InnerRawIndex;
 
-    fn deref(&self) -> &sled::Tree {
+    fn deref(&self) -> &Self::Target {
         &self.0
     }
 }

==== DocsWordsIndex ====

@@ -1,17 +1,20 @@
 use std::sync::Arc;
 
 use meilidb_core::DocumentId;
 
+use crate::database::raw_index::InnerRawIndex;
 use super::Error;
 
 #[derive(Clone)]
-pub struct DocsWordsIndex(pub Arc<sled::Tree>);
+pub struct DocsWordsIndex(pub(crate) InnerRawIndex);
 
 impl DocsWordsIndex {
     pub fn doc_words(&self, id: DocumentId) -> Result<Option<fst::Set>, Error> {
         let key = id.0.to_be_bytes();
-        match self.0.get(key)? {
+        match self.0.get_pinned(key)? {
             Some(bytes) => {
                 let len = bytes.len();
-                let value = bytes.into();
+                let value = Arc::from(bytes.as_ref());
                 let fst = fst::raw::Fst::from_shared_bytes(value, 0, len)?;
                 Ok(Some(fst::Set::from(fst)))
             },
@@ -27,7 +30,7 @@ impl DocsWordsIndex {
     pub fn del_doc_words(&self, id: DocumentId) -> Result<(), Error> {
         let key = id.0.to_be_bytes();
-        self.0.del(key)?;
+        self.0.delete(key)?;
         Ok(())
     }
 }
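Why the extra copy here: a pinned RocksDB read only borrows memory owned by the database, while `fst::raw::Fst::from_shared_bytes` (from the `arc-byte-slice` fst fork pinned in Cargo.toml) needs bytes it can keep alive for the lifetime of the set. A minimal sketch of the pattern, detached from the crate (names are ours):

    use std::sync::Arc;

    // Stand-in for the bytes a pinned RocksDB read hands back: they only
    // live as long as the read, so they are copied once into an owned,
    // reference-counted buffer that the fst can hold on to.
    fn to_shared(pinned: &[u8]) -> Arc<[u8]> {
        Arc::from(pinned)
    }

    fn main() {
        let pinned: &[u8] = b"fst bytes";
        let shared = to_shared(pinned);
        assert_eq!(&shared[..], pinned);
    }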

==== DocumentsAddition ====

@@ -122,6 +122,7 @@ impl<'a> DocumentsAddition<'a> {
         let ranked_map = self.ranked_map;
         let schema = lease_inner.schema.clone();
         let raw = lease_inner.raw.clone();
+        lease_inner.raw.compact();
 
         let inner = InnerIndex { words, schema, ranked_map, raw };
         self.inner.0.store(Arc::new(inner));

==== DocumentsDeletion ====

@@ -121,6 +121,7 @@ impl<'a> DocumentsDeletion<'a> {
         let ranked_map = lease_inner.ranked_map.clone();
         let schema = lease_inner.schema.clone();
         let raw = lease_inner.raw.clone();
+        lease_inner.raw.compact();
 
         let inner = InnerIndex { words, schema, ranked_map, raw };
         self.inner.0.store(Arc::new(inner));
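Both update paths now end with `lease_inner.raw.compact()`, defined below in raw_index: it runs a full-range compaction over every column family of the index, reclaiming overwritten values and range tombstones right after each batch. A sketch of what this amounts to for a single column family, mirroring the exact calls the diff uses (a `db` already opened with that CF is assumed):

    // Full-range compaction of one column family, as in RawIndex::compact;
    // passing None for both bounds means "the whole key range".
    fn compact_cf(db: &rocksdb::DB, cf_name: &str) {
        let cf = db.cf_handle(cf_name).expect("cf not found");
        db.compact_range_cf(cf, None::<&[u8]>, None::<&[u8]>);
    }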

==== DocumentsIndex ====

@@ -1,70 +1,89 @@
-use std::sync::Arc;
 use std::convert::TryInto;
 
 use meilidb_core::DocumentId;
-use sled::IVec;
+use rocksdb::DBVector;
 
+use crate::database::raw_index::InnerRawIndex;
 use crate::document_attr_key::DocumentAttrKey;
 use crate::schema::SchemaAttr;
 
 #[derive(Clone)]
-pub struct DocumentsIndex(pub Arc<sled::Tree>);
+pub struct DocumentsIndex(pub(crate) InnerRawIndex);
 
 impl DocumentsIndex {
-    pub fn document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result<Option<IVec>> {
+    pub fn document_field(&self, id: DocumentId, attr: SchemaAttr) -> Result<Option<DBVector>, rocksdb::Error> {
         let key = DocumentAttrKey::new(id, attr).to_be_bytes();
         self.0.get(key)
     }
 
-    pub fn set_document_field(&self, id: DocumentId, attr: SchemaAttr, value: Vec<u8>) -> sled::Result<()> {
+    pub fn set_document_field(&self, id: DocumentId, attr: SchemaAttr, value: Vec<u8>) -> Result<(), rocksdb::Error> {
         let key = DocumentAttrKey::new(id, attr).to_be_bytes();
         self.0.set(key, value)?;
         Ok(())
     }
 
-    pub fn del_document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result<()> {
+    pub fn del_document_field(&self, id: DocumentId, attr: SchemaAttr) -> Result<(), rocksdb::Error> {
         let key = DocumentAttrKey::new(id, attr).to_be_bytes();
-        self.0.del(key)?;
+        self.0.delete(key)?;
         Ok(())
     }
 
-    pub fn del_all_document_fields(&self, id: DocumentId) -> sled::Result<()> {
+    pub fn del_all_document_fields(&self, id: DocumentId) -> Result<(), rocksdb::Error> {
         let start = DocumentAttrKey::new(id, SchemaAttr::min()).to_be_bytes();
         let end = DocumentAttrKey::new(id, SchemaAttr::max()).to_be_bytes();
-        let document_attrs = self.0.range(start..=end).keys();
-        for key in document_attrs {
-            self.0.del(key?)?;
-        }
+        self.0.delete_range(start, end)?;
         Ok(())
     }
 
     pub fn document_fields(&self, id: DocumentId) -> DocumentFieldsIter {
-        let start = DocumentAttrKey::new(id, SchemaAttr::min());
-        let start = start.to_be_bytes();
-        let end = DocumentAttrKey::new(id, SchemaAttr::max());
-        let end = end.to_be_bytes();
-        DocumentFieldsIter(self.0.range(start..=end))
+        let start = DocumentAttrKey::new(id, SchemaAttr::min()).to_be_bytes();
+        let end = DocumentAttrKey::new(id, SchemaAttr::max()).to_be_bytes();
+        let from = rocksdb::IteratorMode::From(&start[..], rocksdb::Direction::Forward);
+        let iter = self.0.iterator(from).unwrap();
+        DocumentFieldsIter(iter, end.to_vec())
+    }
+
+    pub fn len(&self) -> Result<usize, rocksdb::Error> {
+        let mut last_document_id = None;
+        let mut count = 0;
+
+        let from = rocksdb::IteratorMode::Start;
+        let iterator = self.0.iterator(from)?;
+        for (key, value) in iterator {
+            let slice = key.as_ref().try_into().unwrap();
+            let document_id = DocumentAttrKey::from_be_bytes(slice).document_id;
+
+            if Some(document_id) != last_document_id {
+                last_document_id = Some(document_id);
+                count += 1;
+            }
+        }
+
+        Ok(count)
     }
 }
 
-pub struct DocumentFieldsIter<'a>(sled::Iter<'a>);
+pub struct DocumentFieldsIter<'a>(rocksdb::DBIterator<'a>, Vec<u8>);
 
 impl<'a> Iterator for DocumentFieldsIter<'a> {
-    type Item = sled::Result<(SchemaAttr, IVec)>;
+    type Item = Result<(SchemaAttr, Box<[u8]>), rocksdb::Error>;
 
     fn next(&mut self) -> Option<Self::Item> {
         match self.0.next() {
-            Some(Ok((key, value))) => {
+            Some((key, value)) => {
+                if key.as_ref() > self.1.as_ref() {
+                    return None;
+                }
+
                 let slice: &[u8] = key.as_ref();
                 let array = slice.try_into().unwrap();
                 let key = DocumentAttrKey::from_be_bytes(array);
                 Some(Ok((key.attribute, value)))
             },
-            Some(Err(e)) => Some(Err(e)),
             None => None,
         }
     }
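Both `delete_range` and the new `len` lean on the `DocumentAttrKey` encoding: serialized big-endian, the (document id, attribute) keys sort first by document, so one document's fields form a single contiguous key range and distinct documents can be counted in one scan. A toy illustration of that ordering (the field widths are assumptions, the names are ours):

    // Composite key: 8-byte big-endian document id, then 2-byte attribute.
    // Big-endian is what makes byte-wise ordering match numeric ordering.
    fn key(document_id: u64, attr: u16) -> [u8; 10] {
        let mut out = [0u8; 10];
        out[..8].copy_from_slice(&document_id.to_be_bytes());
        out[8..].copy_from_slice(&attr.to_be_bytes());
        out
    }

    fn main() {
        // Every key of document 1 sorts before any key of document 2 ...
        assert!(key(1, u16::max_value()) < key(2, 0));
        // ... and within one document, keys sort by attribute.
        assert!(key(1, 1) < key(1, 2));
    }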

==== database Error ====

@@ -7,15 +7,15 @@ pub enum Error {
     SchemaMissing,
     WordIndexMissing,
     MissingDocumentId,
-    SledError(sled::Error),
+    RocksdbError(rocksdb::Error),
     FstError(fst::Error),
     BincodeError(bincode::Error),
     SerializerError(SerializerError),
 }
 
-impl From<sled::Error> for Error {
-    fn from(error: sled::Error) -> Error {
-        Error::SledError(error)
+impl From<rocksdb::Error> for Error {
+    fn from(error: rocksdb::Error) -> Error {
+        Error::RocksdbError(error)
     }
 }
@@ -45,7 +45,7 @@ impl fmt::Display for Error {
             SchemaMissing => write!(f, "this index does not have a schema"),
             WordIndexMissing => write!(f, "this index does not have a word index"),
             MissingDocumentId => write!(f, "document id is missing"),
-            SledError(e) => write!(f, "sled error; {}", e),
+            RocksdbError(e) => write!(f, "RocksDB error; {}", e),
             FstError(e) => write!(f, "fst error; {}", e),
             BincodeError(e) => write!(f, "bincode error; {}", e),
             SerializerError(e) => write!(f, "serializer error; {}", e),

==== Index ====

@@ -15,6 +15,13 @@ use crate::serde::Deserializer;
 use super::{Error, CustomSettings};
 use super::{RawIndex, DocumentsAddition, DocumentsDeletion};
 
+#[derive(Copy, Clone)]
+pub struct IndexStats {
+    pub number_of_words: usize,
+    pub number_of_documents: usize,
+    pub number_attrs_in_ranked_map: usize,
+}
+
 #[derive(Clone)]
 pub struct Index(pub ArcSwap<InnerIndex>);
@@ -48,6 +55,16 @@ impl Index {
         Ok(index)
     }
 
+    pub fn stats(&self) -> Result<IndexStats, rocksdb::Error> {
+        let lease = self.0.lease();
+
+        Ok(IndexStats {
+            number_of_words: lease.words.len(),
+            number_of_documents: lease.raw.documents.len()?,
+            number_attrs_in_ranked_map: lease.ranked_map.len(),
+        })
+    }
+
     pub fn query_builder(&self) -> QueryBuilder<IndexLease> {
         let lease = IndexLease(self.0.lease());
         QueryBuilder::new(lease)
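A hedged usage sketch for the new stats entry point (the crate name is an assumption; `stats()` and the `IndexStats` fields are exactly the ones introduced above):

    use meilidb_data::Index; // assumed crate name

    // Print the counters exposed by the new Index::stats method.
    fn print_stats(index: &Index) -> Result<(), rocksdb::Error> {
        let stats = index.stats()?;
        println!("number of words: {}", stats.number_of_words);
        println!("number of documents: {}", stats.number_of_documents);
        println!("attrs in ranked map: {}", stats.number_attrs_in_ranked_map);
        Ok(())
    }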

==== MainIndex ====

@@ -1,16 +1,17 @@
 use std::sync::Arc;
 
+use crate::database::raw_index::InnerRawIndex;
 use crate::ranked_map::RankedMap;
 use crate::schema::Schema;
 use super::Error;
 
 #[derive(Clone)]
-pub struct MainIndex(pub Arc<sled::Tree>);
+pub struct MainIndex(pub(crate) InnerRawIndex);
 
 impl MainIndex {
     pub fn schema(&self) -> Result<Option<Schema>, Error> {
-        match self.0.get("schema")? {
+        match self.0.get_pinned("schema")? {
             Some(bytes) => {
                 let schema = Schema::read_from_bin(bytes.as_ref())?;
                 Ok(Some(schema))
@@ -27,10 +28,10 @@ impl MainIndex {
     }
 
     pub fn words_set(&self) -> Result<Option<fst::Set>, Error> {
-        match self.0.get("words")? {
+        match self.0.get_pinned("words")? {
             Some(bytes) => {
                 let len = bytes.len();
-                let value = bytes.into();
+                let value = Arc::from(bytes.as_ref());
                 let fst = fst::raw::Fst::from_shared_bytes(value, 0, len)?;
                 Ok(Some(fst::Set::from(fst)))
             },
@@ -39,12 +40,11 @@ impl MainIndex {
     }
 
     pub fn set_words_set(&self, value: &fst::Set) -> Result<(), Error> {
-        self.0.set("words", value.as_fst().as_bytes())?;
-        Ok(())
+        self.0.set("words", value.as_fst().as_bytes()).map_err(Into::into)
     }
 
     pub fn ranked_map(&self) -> Result<Option<RankedMap>, Error> {
-        match self.0.get("ranked-map")? {
+        match self.0.get_pinned("ranked-map")? {
             Some(bytes) => {
                 let ranked_map = RankedMap::read_from_bin(bytes.as_ref())?;
                 Ok(Some(ranked_map))

==== database module (Database) ====

@@ -26,32 +26,39 @@ use self::documents_deletion::DocumentsDeletion;
 use self::documents_index::DocumentsIndex;
 use self::index::InnerIndex;
 use self::main_index::MainIndex;
-use self::raw_index::RawIndex;
+use self::raw_index::{RawIndex, InnerRawIndex};
 use self::words_index::WordsIndex;
 
 pub struct Database {
     cache: RwLock<HashMap<String, Arc<Index>>>,
-    inner: sled::Db,
+    inner: Arc<rocksdb::DB>,
 }
 
 impl Database {
     pub fn start_default<P: AsRef<Path>>(path: P) -> Result<Database, Error> {
+        let path = path.as_ref();
         let cache = RwLock::new(HashMap::new());
-        let config = sled::ConfigBuilder::new().path(path).print_profile_on_drop(true).build();
-        let inner = sled::Db::start(config)?;
-        Ok(Database { cache, inner })
-    }
-
-    pub fn start_with_compression<P: AsRef<Path>>(path: P, factor: i32) -> Result<Database, Error> {
-        let config = sled::ConfigBuilder::default()
-            .use_compression(true)
-            .compression_factor(factor)
-            .path(path)
-            .build();
-
-        let cache = RwLock::new(HashMap::new());
-        let inner = sled::Db::start(config)?;
-        Ok(Database { cache, inner })
+
+        let options = {
+            let mut options = rocksdb::Options::default();
+            options.create_if_missing(true);
+            options
+        };
+        let cfs = rocksdb::DB::list_cf(&options, path).unwrap_or(Vec::new());
+        let inner = Arc::new(rocksdb::DB::open_cf(&options, path, &cfs)?);
+        let database = Database { cache, inner };
+
+        let mut indexes: Vec<_> = cfs.iter()
+            .filter_map(|c| c.split('-').nth(0).filter(|&c| c != "default"))
+            .collect();
+        indexes.sort_unstable();
+        indexes.dedup();
+
+        for index in indexes {
+            database.open_index(index)?;
+        }
+
+        Ok(database)
     }
 
     pub fn indexes(&self) -> Result<Option<HashSet<String>>, Error> {
@@ -66,7 +73,7 @@ impl Database {
 
     fn set_indexes(&self, value: &HashSet<String>) -> Result<(), Error> {
         let bytes = bincode::serialize(value)?;
-        self.inner.set("indexes", bytes)?;
+        self.inner.put("indexes", bytes)?;
         Ok(())
     }
@@ -89,32 +96,32 @@ impl Database {
         }
 
         let main = {
-            let tree = self.inner.open_tree(name)?;
-            MainIndex(tree)
+            self.inner.cf_handle(name).expect("cf not found");
+            MainIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(name)))
         };
 
         let words = {
-            let tree_name = format!("{}-words", name);
-            let tree = self.inner.open_tree(tree_name)?;
-            WordsIndex(tree)
+            let cf_name = format!("{}-words", name);
+            self.inner.cf_handle(&cf_name).expect("cf not found");
+            WordsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
         };
 
         let docs_words = {
-            let tree_name = format!("{}-docs-words", name);
-            let tree = self.inner.open_tree(tree_name)?;
-            DocsWordsIndex(tree)
+            let cf_name = format!("{}-docs-words", name);
+            self.inner.cf_handle(&cf_name).expect("cf not found");
+            DocsWordsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
        };
 
         let documents = {
-            let tree_name = format!("{}-documents", name);
-            let tree = self.inner.open_tree(tree_name)?;
-            DocumentsIndex(tree)
+            let cf_name = format!("{}-documents", name);
+            self.inner.cf_handle(&cf_name).expect("cf not found");
+            DocumentsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
         };
 
         let custom = {
-            let tree_name = format!("{}-custom", name);
-            let tree = self.inner.open_tree(tree_name)?;
-            CustomSettings(tree)
+            let cf_name = format!("{}-custom", name);
+            self.inner.cf_handle(&cf_name).expect("cf not found");
+            CustomSettings(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
         };
 
         let raw_index = RawIndex { main, words, docs_words, documents, custom };
@@ -136,8 +143,8 @@ impl Database {
             },
             Entry::Vacant(vacant) => {
                 let main = {
-                    let tree = self.inner.open_tree(name)?;
-                    MainIndex(tree)
+                    self.inner.create_cf(name, &rocksdb::Options::default())?;
+                    MainIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(name)))
                 };
 
                 if let Some(prev_schema) = main.schema()? {
@@ -149,27 +156,27 @@ impl Database {
                 main.set_schema(&schema)?;
 
                 let words = {
-                    let tree_name = format!("{}-words", name);
-                    let tree = self.inner.open_tree(tree_name)?;
-                    WordsIndex(tree)
+                    let cf_name = format!("{}-words", name);
+                    self.inner.create_cf(&cf_name, &rocksdb::Options::default())?;
+                    WordsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
                 };
 
                 let docs_words = {
-                    let tree_name = format!("{}-docs-words", name);
-                    let tree = self.inner.open_tree(tree_name)?;
-                    DocsWordsIndex(tree)
+                    let cf_name = format!("{}-docs-words", name);
+                    self.inner.create_cf(&cf_name, &rocksdb::Options::default())?;
+                    DocsWordsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
                 };
 
                 let documents = {
-                    let tree_name = format!("{}-documents", name);
-                    let tree = self.inner.open_tree(tree_name)?;
-                    DocumentsIndex(tree)
+                    let cf_name = format!("{}-documents", name);
+                    self.inner.create_cf(&cf_name, &rocksdb::Options::default())?;
+                    DocumentsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
                 };
 
                 let custom = {
-                    let tree_name = format!("{}-custom", name);
-                    let tree = self.inner.open_tree(tree_name)?;
-                    CustomSettings(tree)
+                    let cf_name = format!("{}-custom", name);
+                    self.inner.create_cf(&cf_name, &rocksdb::Options::default())?;
+                    CustomSettings(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
                 };
 
                 let mut indexes = self.indexes()?.unwrap_or_else(HashSet::new);
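An index named `test` now maps to five column families: `test`, `test-words`, `test-docs-words`, `test-documents`, and `test-custom`. On startup, `start_default` recovers index names by listing the column families, keeping the prefix before the first `'-'`, and skipping RocksDB's reserved `default` CF; note that this quietly assumes index names themselves never contain `'-'`. A standalone sketch of that recovery:

    // Recover index names from column-family names the way start_default
    // does: take the prefix before the first '-', drop the reserved
    // "default" CF, then sort and dedup.
    fn index_names(cfs: &[&str]) -> Vec<String> {
        let mut names: Vec<String> = cfs
            .iter()
            .filter_map(|c| c.split('-').next().filter(|&c| c != "default"))
            .map(str::to_owned)
            .collect();
        names.sort_unstable();
        names.dedup();
        names
    }

    fn main() {
        let cfs = ["default", "test", "test-words", "test-docs-words",
                   "test-documents", "test-custom"];
        assert_eq!(index_names(&cfs), vec!["test".to_string()]);
    }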

==== raw_index (RawIndex, InnerRawIndex) ====

@@ -1,3 +1,4 @@
+use std::sync::Arc;
 use super::{MainIndex, WordsIndex, DocsWordsIndex, DocumentsIndex, CustomSettings};
 
 #[derive(Clone)]
@@ -8,3 +9,78 @@ pub struct RawIndex {
     pub documents: DocumentsIndex,
     pub custom: CustomSettings,
 }
+
+impl RawIndex {
+    pub(crate) fn compact(&self) {
+        self.main.0.compact_range(None::<&[u8]>, None::<&[u8]>);
+        self.words.0.compact_range(None::<&[u8]>, None::<&[u8]>);
+        self.docs_words.0.compact_range(None::<&[u8]>, None::<&[u8]>);
+        self.documents.0.compact_range(None::<&[u8]>, None::<&[u8]>);
+        self.custom.0.compact_range(None::<&[u8]>, None::<&[u8]>);
+    }
+}
+
+#[derive(Clone)]
+pub struct InnerRawIndex {
+    database: Arc<rocksdb::DB>,
+    name: Arc<str>,
+}
+
+impl InnerRawIndex {
+    pub fn new(database: Arc<rocksdb::DB>, name: Arc<str>) -> InnerRawIndex {
+        InnerRawIndex { database, name }
+    }
+
+    pub fn get<K>(&self, key: K) -> Result<Option<rocksdb::DBVector>, rocksdb::Error>
+    where K: AsRef<[u8]>,
+    {
+        let cf = self.database.cf_handle(&self.name).expect("cf not found");
+        self.database.get_cf(cf, key)
+    }
+
+    pub fn get_pinned<K>(&self, key: K) -> Result<Option<rocksdb::DBPinnableSlice>, rocksdb::Error>
+    where K: AsRef<[u8]>,
+    {
+        let cf = self.database.cf_handle(&self.name).expect("cf not found");
+        self.database.get_pinned_cf(cf, key)
+    }
+
+    pub fn iterator(&self, from: rocksdb::IteratorMode) -> Result<rocksdb::DBIterator, rocksdb::Error> {
+        let cf = self.database.cf_handle(&self.name).expect("cf not found");
+        self.database.iterator_cf(cf, from)
+    }
+
+    pub fn set<K, V>(&self, key: K, value: V) -> Result<(), rocksdb::Error>
+    where K: AsRef<[u8]>,
+          V: AsRef<[u8]>,
+    {
+        let cf = self.database.cf_handle(&self.name).expect("cf not found");
+        self.database.put_cf(cf, key, value)
+    }
+
+    pub fn delete<K>(&self, key: K) -> Result<(), rocksdb::Error>
+    where K: AsRef<[u8]>
+    {
+        let cf = self.database.cf_handle(&self.name).expect("cf not found");
+        self.database.delete_cf(cf, key)
+    }
+
+    pub fn delete_range<K>(&self, start: K, end: K) -> Result<(), rocksdb::Error>
+    where K: AsRef<[u8]>,
+    {
+        let mut batch = rocksdb::WriteBatch::default();
+        let cf = self.database.cf_handle(&self.name).expect("cf not found");
+        batch.delete_range_cf(cf, start, end)?;
+        self.database.write(batch)
+    }
+
+    pub fn compact_range<S, E>(&self, start: Option<S>, end: Option<E>)
+    where S: AsRef<[u8]>,
+          E: AsRef<[u8]>,
+    {
+        let cf = self.database.cf_handle(&self.name).expect("cf not found");
+        self.database.compact_range_cf(cf, start, end)
+    }
+}
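Two things worth noting about `InnerRawIndex`, both readings of the code above: every call re-resolves its column family with `cf_handle(...).expect("cf not found")`, so it panics rather than errors if the CF disappears; and RocksDB's DeleteRange covers the half-open range [start, end), while the sled code it replaces deleted the inclusive `start..=end`, so `del_all_document_fields` now relies on no real attribute ever equaling `SchemaAttr::max()`. A sketch mirroring the batch-based range delete (a `db` opened with the CF is assumed):

    // Range delete through a WriteBatch, as in InnerRawIndex::delete_range.
    fn delete_range(
        db: &rocksdb::DB,
        cf_name: &str,
        start: &[u8],
        end: &[u8],
    ) -> Result<(), rocksdb::Error> {
        let cf = db.cf_handle(cf_name).expect("cf not found");
        let mut batch = rocksdb::WriteBatch::default();
        // DeleteRange removes keys in [start, end): `end` itself survives.
        batch.delete_range_cf(cf, start, end)?;
        db.write(batch)
    }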

==== WordsIndex ====

@@ -1,32 +1,48 @@
-use std::sync::Arc;
-
 use meilidb_core::DocIndex;
 use sdset::{Set, SetBuf};
 use zerocopy::{LayoutVerified, AsBytes};
 
+use crate::database::raw_index::InnerRawIndex;
+
 #[derive(Clone)]
-pub struct WordsIndex(pub Arc<sled::Tree>);
+pub struct WordsIndex(pub(crate) InnerRawIndex);
 
 impl WordsIndex {
-    pub fn doc_indexes(&self, word: &[u8]) -> sled::Result<Option<SetBuf<DocIndex>>> {
+    pub fn doc_indexes(&self, word: &[u8]) -> Result<Option<SetBuf<DocIndex>>, rocksdb::Error> {
+        // we must force an allocation to make the memory aligned
         match self.0.get(word)? {
             Some(bytes) => {
-                let layout = LayoutVerified::new_slice(bytes.as_ref()).expect("invalid layout");
-                let slice = layout.into_slice();
-                let setbuf = SetBuf::new_unchecked(slice.to_vec());
+                let vec = match LayoutVerified::new_slice(bytes.as_ref()) {
+                    Some(layout) => layout.into_slice().to_vec(),
+                    None => {
+                        let len = bytes.as_ref().len();
+                        let count = len / std::mem::size_of::<DocIndex>();
+                        let mut buf: Vec<DocIndex> = Vec::with_capacity(count);
+
+                        unsafe {
+                            let src = bytes.as_ref().as_ptr();
+                            let dst = buf.as_mut_ptr() as *mut u8;
+                            std::ptr::copy_nonoverlapping(src, dst, len);
+                            buf.set_len(count);
+                        }
+
+                        buf
+                    }
+                };
+
+                let setbuf = SetBuf::new_unchecked(vec);
+
                 Ok(Some(setbuf))
             },
             None => Ok(None),
         }
     }
 
-    pub fn set_doc_indexes(&self, word: &[u8], set: &Set<DocIndex>) -> sled::Result<()> {
+    pub fn set_doc_indexes(&self, word: &[u8], set: &Set<DocIndex>) -> Result<(), rocksdb::Error> {
         self.0.set(word, set.as_bytes())?;
         Ok(())
     }
 
-    pub fn del_doc_indexes(&self, word: &[u8]) -> sled::Result<()> {
-        self.0.del(word)?;
+    pub fn del_doc_indexes(&self, word: &[u8]) -> Result<(), rocksdb::Error> {
+        self.0.delete(word)?;
         Ok(())
     }
 }
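The rewritten `doc_indexes` is about alignment: zerocopy's `LayoutVerified::new_slice` returns `None` when the buffer is not aligned for `DocIndex`; the old sled path simply `.expect`ed that never to happen, while the bytes RocksDB returns offer no such guarantee, hence the copying fallback into a `Vec<DocIndex>`, whose allocation is properly aligned. A self-contained sketch of the check that picks between the two paths (using `u32` in place of `DocIndex`):

    use std::mem;

    // The two conditions LayoutVerified::new_slice enforces: the pointer is
    // aligned for T and the length is a whole number of Ts.
    fn valid_layout<T>(bytes: &[u8]) -> bool {
        bytes.as_ptr() as usize % mem::align_of::<T>() == 0
            && bytes.len() % mem::size_of::<T>() == 0
    }

    fn main() {
        // A Vec<u32> allocation is aligned for u32, so a byte view of it
        // passes, while a view shifted by one byte fails.
        let values: Vec<u32> = vec![1, 2, 3];
        let len = values.len() * mem::size_of::<u32>();
        let bytes: &[u8] =
            unsafe { std::slice::from_raw_parts(values.as_ptr() as *const u8, len) };
        assert!(valid_layout::<u32>(bytes));
        assert!(!valid_layout::<u32>(&bytes[1..5]));
    }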

==== lib.rs ====

@@ -6,7 +6,7 @@ mod ranked_map;
 mod serde;
 pub mod schema;
 
-pub use sled;
+pub use rocksdb;
 pub use self::database::{Database, Index, CustomSettings};
 pub use self::number::Number;
 pub use self::ranked_map::RankedMap;

==== RankedMap ====

@@ -9,6 +9,10 @@ use crate::{SchemaAttr, Number};
 pub struct RankedMap(HashMap<(DocumentId, SchemaAttr), Number>);
 
 impl RankedMap {
+    pub fn len(&self) -> usize {
+        self.0.len()
+    }
+
     pub fn insert(&mut self, document: DocumentId, attribute: SchemaAttr, number: Number) {
         self.0.insert((document, attribute), number);
     }

==== SerializerError ====

@@ -36,7 +36,7 @@ use crate::schema::SchemaAttr;
 pub enum SerializerError {
     DocumentIdNotFound,
     RmpError(RmpError),
-    SledError(sled::Error),
+    RocksdbError(rocksdb::Error),
     ParseNumberError(ParseNumberError),
     UnserializableType { type_name: &'static str },
     UnindexableType { type_name: &'static str },
@@ -57,7 +57,7 @@ impl fmt::Display for SerializerError {
                 write!(f, "serialized document does not have an id according to the schema")
             }
             SerializerError::RmpError(e) => write!(f, "rmp serde related error: {}", e),
-            SerializerError::SledError(e) => write!(f, "sled related error: {}", e),
+            SerializerError::RocksdbError(e) => write!(f, "RocksDB related error: {}", e),
             SerializerError::ParseNumberError(e) => {
                 write!(f, "error while trying to parse a number: {}", e)
             },
@@ -89,9 +89,9 @@ impl From<RmpError> for SerializerError {
     }
 }
 
-impl From<sled::Error> for SerializerError {
-    fn from(error: sled::Error) -> SerializerError {
-        SerializerError::SledError(error)
+impl From<rocksdb::Error> for SerializerError {
+    fn from(error: rocksdb::Error) -> SerializerError {
+        SerializerError::RocksdbError(error)
     }
 }

==== example: index creation ====

@@ -59,7 +59,7 @@ fn index(
     let mut system = sysinfo::System::new();
 
-    let index = database.create_index("default", schema.clone())?;
+    let index = database.create_index("test", schema.clone())?;
 
     let mut rdr = csv::Reader::from_path(csv_data_path)?;
     let mut raw_record = csv::StringRecord::new();

==== example: query ====

@@ -143,7 +143,7 @@ fn main() -> Result<(), Box<Error>> {
     let mut buffer = String::new();
     let input = io::stdin();
 
-    let index = database.open_index("default")?.unwrap();
+    let index = database.open_index("test")?.unwrap();
     let schema = index.schema();
 
     println!("database prepared for you in {:.2?}", start.elapsed());
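Both examples switch their index name from "default" to "test": RocksDB reserves the column family name `default` for its own default CF, and `start_default` above filters that name out when recovering indexes. An end-to-end sketch of the resulting flow (crate name and path are assumptions, schema handling elided):

    use meilidb_data::Database; // assumed crate name

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Opens (or creates) the RocksDB directory and reopens every index
        // found among the existing column families.
        let database = Database::start_default("test.mdb")?;

        // "test" rather than "default": the latter would collide with
        // RocksDB's reserved default column family.
        let index = database.open_index("test")?.expect("index not found");
        let _schema = index.schema();

        Ok(())
    }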