feat: Wrap the database index access to improve usability

This commit is contained in:
Clément Renault 2019-05-24 14:26:05 +02:00
parent 6f258f71d5
commit 9fca74443e
No known key found for this signature in database
GPG Key ID: 0151CDAB43460DAE
7 changed files with 102 additions and 63 deletions

View File

@ -1,22 +1,20 @@
use std::sync::Arc;
use rocksdb::DBVector; use rocksdb::DBVector;
use crate::database::raw_index::InnerRawIndex;
#[derive(Clone)] #[derive(Clone)]
pub struct CustomSettings(pub Arc<rocksdb::DB>, pub String); pub struct CustomSettings(pub(crate) InnerRawIndex);
impl CustomSettings { impl CustomSettings {
pub fn set<K, V>(&self, key: K, value: V) -> Result<(), rocksdb::Error> pub fn set<K, V>(&self, key: K, value: V) -> Result<(), rocksdb::Error>
where K: AsRef<[u8]>, where K: AsRef<[u8]>,
V: AsRef<[u8]>, V: AsRef<[u8]>,
{ {
let cf = self.0.cf_handle(&self.1).unwrap(); self.0.set(key, value)
self.0.put_cf(cf, key, value)
} }
pub fn get<K, V>(&self, key: K) -> Result<Option<DBVector>, rocksdb::Error> pub fn get<K, V>(&self, key: K) -> Result<Option<DBVector>, rocksdb::Error>
where K: AsRef<[u8]>, where K: AsRef<[u8]>,
{ {
let cf = self.0.cf_handle(&self.1).unwrap(); self.0.get(key)
self.0.get_cf(cf, key)
} }
} }

View File

@ -1,15 +1,17 @@
use std::sync::Arc; use std::sync::Arc;
use meilidb_core::DocumentId; use meilidb_core::DocumentId;
use crate::database::raw_index::InnerRawIndex;
use super::Error; use super::Error;
#[derive(Clone)] #[derive(Clone)]
pub struct DocsWordsIndex(pub Arc<rocksdb::DB>, pub String); pub struct DocsWordsIndex(pub(crate) InnerRawIndex);
impl DocsWordsIndex { impl DocsWordsIndex {
pub fn doc_words(&self, id: DocumentId) -> Result<Option<fst::Set>, Error> { pub fn doc_words(&self, id: DocumentId) -> Result<Option<fst::Set>, Error> {
let key = id.0.to_be_bytes(); let key = id.0.to_be_bytes();
let cf = self.0.cf_handle(&self.1).unwrap(); match self.0.get_pinned(key)? {
match self.0.get_pinned_cf(cf, key)? {
Some(bytes) => { Some(bytes) => {
let len = bytes.len(); let len = bytes.len();
let value = Arc::from(bytes.as_ref()); let value = Arc::from(bytes.as_ref());
@ -22,15 +24,13 @@ impl DocsWordsIndex {
pub fn set_doc_words(&self, id: DocumentId, words: &fst::Set) -> Result<(), Error> { pub fn set_doc_words(&self, id: DocumentId, words: &fst::Set) -> Result<(), Error> {
let key = id.0.to_be_bytes(); let key = id.0.to_be_bytes();
let cf = self.0.cf_handle(&self.1).unwrap(); self.0.set(key, words.as_fst().as_bytes())?;
self.0.put_cf(cf, key, words.as_fst().as_bytes())?;
Ok(()) Ok(())
} }
pub fn del_doc_words(&self, id: DocumentId) -> Result<(), Error> { pub fn del_doc_words(&self, id: DocumentId) -> Result<(), Error> {
let key = id.0.to_be_bytes(); let key = id.0.to_be_bytes();
let cf = self.0.cf_handle(&self.1).unwrap(); self.0.delete(key)?;
self.0.delete_cf(cf, key)?;
Ok(()) Ok(())
} }
} }

View File

@ -1,45 +1,37 @@
use std::sync::Arc;
use std::convert::TryInto; use std::convert::TryInto;
use meilidb_core::DocumentId; use meilidb_core::DocumentId;
use rocksdb::DBVector; use rocksdb::DBVector;
use crate::database::raw_index::InnerRawIndex;
use crate::document_attr_key::DocumentAttrKey; use crate::document_attr_key::DocumentAttrKey;
use crate::schema::SchemaAttr; use crate::schema::SchemaAttr;
#[derive(Clone)] #[derive(Clone)]
pub struct DocumentsIndex(pub Arc<rocksdb::DB>, pub String); pub struct DocumentsIndex(pub(crate) InnerRawIndex);
impl DocumentsIndex { impl DocumentsIndex {
pub fn document_field(&self, id: DocumentId, attr: SchemaAttr) -> Result<Option<DBVector>, rocksdb::Error> { pub fn document_field(&self, id: DocumentId, attr: SchemaAttr) -> Result<Option<DBVector>, rocksdb::Error> {
let key = DocumentAttrKey::new(id, attr).to_be_bytes(); let key = DocumentAttrKey::new(id, attr).to_be_bytes();
let cf = self.0.cf_handle(&self.1).unwrap(); self.0.get(key)
self.0.get_cf(cf, key)
} }
pub fn set_document_field(&self, id: DocumentId, attr: SchemaAttr, value: Vec<u8>) -> Result<(), rocksdb::Error> { pub fn set_document_field(&self, id: DocumentId, attr: SchemaAttr, value: Vec<u8>) -> Result<(), rocksdb::Error> {
let key = DocumentAttrKey::new(id, attr).to_be_bytes(); let key = DocumentAttrKey::new(id, attr).to_be_bytes();
let cf = self.0.cf_handle(&self.1).unwrap(); self.0.set(key, value)?;
self.0.put_cf(cf, key, value)?;
Ok(()) Ok(())
} }
pub fn del_document_field(&self, id: DocumentId, attr: SchemaAttr) -> Result<(), rocksdb::Error> { pub fn del_document_field(&self, id: DocumentId, attr: SchemaAttr) -> Result<(), rocksdb::Error> {
let key = DocumentAttrKey::new(id, attr).to_be_bytes(); let key = DocumentAttrKey::new(id, attr).to_be_bytes();
let cf = self.0.cf_handle(&self.1).unwrap(); self.0.delete(key)?;
self.0.delete_cf(cf, key)?;
Ok(()) Ok(())
} }
pub fn del_all_document_fields(&self, id: DocumentId) -> Result<(), rocksdb::Error> { pub fn del_all_document_fields(&self, id: DocumentId) -> Result<(), rocksdb::Error> {
let start = DocumentAttrKey::new(id, SchemaAttr::min()).to_be_bytes(); let start = DocumentAttrKey::new(id, SchemaAttr::min()).to_be_bytes();
let end = DocumentAttrKey::new(id, SchemaAttr::max()).to_be_bytes(); let end = DocumentAttrKey::new(id, SchemaAttr::max()).to_be_bytes();
self.0.delete_range(start, end)?;
let cf = self.0.cf_handle(&self.1).unwrap();
let mut batch = rocksdb::WriteBatch::default();
batch.delete_range_cf(cf, start, end)?;
self.0.write(batch)?;
Ok(()) Ok(())
} }
@ -47,9 +39,8 @@ impl DocumentsIndex {
let start = DocumentAttrKey::new(id, SchemaAttr::min()).to_be_bytes(); let start = DocumentAttrKey::new(id, SchemaAttr::min()).to_be_bytes();
let end = DocumentAttrKey::new(id, SchemaAttr::max()).to_be_bytes(); let end = DocumentAttrKey::new(id, SchemaAttr::max()).to_be_bytes();
let cf = self.0.cf_handle(&self.1).unwrap();
let from = rocksdb::IteratorMode::From(&start[..], rocksdb::Direction::Forward); let from = rocksdb::IteratorMode::From(&start[..], rocksdb::Direction::Forward);
let iter = self.0.iterator_cf(cf, from).unwrap(); let iter = self.0.iterator(from).unwrap();
DocumentFieldsIter(iter, end.to_vec()) DocumentFieldsIter(iter, end.to_vec())
} }

View File

@ -1,17 +1,17 @@
use std::sync::Arc; use std::sync::Arc;
use crate::database::raw_index::InnerRawIndex;
use crate::ranked_map::RankedMap; use crate::ranked_map::RankedMap;
use crate::schema::Schema; use crate::schema::Schema;
use super::Error; use super::Error;
#[derive(Clone)] #[derive(Clone)]
pub struct MainIndex(pub Arc<rocksdb::DB>, pub String); pub struct MainIndex(pub(crate) InnerRawIndex);
impl MainIndex { impl MainIndex {
pub fn schema(&self) -> Result<Option<Schema>, Error> { pub fn schema(&self) -> Result<Option<Schema>, Error> {
let cf = self.0.cf_handle(&self.1).unwrap(); match self.0.get_pinned("schema")? {
match self.0.get_cf(cf, "schema")? {
Some(bytes) => { Some(bytes) => {
let schema = Schema::read_from_bin(bytes.as_ref())?; let schema = Schema::read_from_bin(bytes.as_ref())?;
Ok(Some(schema)) Ok(Some(schema))
@ -23,14 +23,12 @@ impl MainIndex {
pub fn set_schema(&self, schema: &Schema) -> Result<(), Error> { pub fn set_schema(&self, schema: &Schema) -> Result<(), Error> {
let mut bytes = Vec::new(); let mut bytes = Vec::new();
schema.write_to_bin(&mut bytes)?; schema.write_to_bin(&mut bytes)?;
let cf = self.0.cf_handle(&self.1).unwrap(); self.0.set("schema", bytes)?;
self.0.put_cf(cf, "schema", bytes)?;
Ok(()) Ok(())
} }
pub fn words_set(&self) -> Result<Option<fst::Set>, Error> { pub fn words_set(&self) -> Result<Option<fst::Set>, Error> {
let cf = self.0.cf_handle(&self.1).unwrap(); match self.0.get_pinned("words")? {
match self.0.get_pinned_cf(cf, "words")? {
Some(bytes) => { Some(bytes) => {
let len = bytes.len(); let len = bytes.len();
let value = Arc::from(bytes.as_ref()); let value = Arc::from(bytes.as_ref());
@ -42,14 +40,11 @@ impl MainIndex {
} }
pub fn set_words_set(&self, value: &fst::Set) -> Result<(), Error> { pub fn set_words_set(&self, value: &fst::Set) -> Result<(), Error> {
let cf = self.0.cf_handle(&self.1).unwrap(); self.0.set("words", value.as_fst().as_bytes()).map_err(Into::into)
self.0.put_cf(cf, "words", value.as_fst().as_bytes())?;
Ok(())
} }
pub fn ranked_map(&self) -> Result<Option<RankedMap>, Error> { pub fn ranked_map(&self) -> Result<Option<RankedMap>, Error> {
let cf = self.0.cf_handle(&self.1).unwrap(); match self.0.get_pinned("ranked-map")? {
match self.0.get_cf(cf, "ranked-map")? {
Some(bytes) => { Some(bytes) => {
let ranked_map = RankedMap::read_from_bin(bytes.as_ref())?; let ranked_map = RankedMap::read_from_bin(bytes.as_ref())?;
Ok(Some(ranked_map)) Ok(Some(ranked_map))
@ -61,8 +56,7 @@ impl MainIndex {
pub fn set_ranked_map(&self, value: &RankedMap) -> Result<(), Error> { pub fn set_ranked_map(&self, value: &RankedMap) -> Result<(), Error> {
let mut bytes = Vec::new(); let mut bytes = Vec::new();
value.write_to_bin(&mut bytes)?; value.write_to_bin(&mut bytes)?;
let cf = self.0.cf_handle(&self.1).unwrap(); self.0.set("ranked_map", bytes)?;
self.0.put_cf(cf, "ranked_map", bytes)?;
Ok(()) Ok(())
} }
} }

View File

@ -26,7 +26,7 @@ use self::documents_deletion::DocumentsDeletion;
use self::documents_index::DocumentsIndex; use self::documents_index::DocumentsIndex;
use self::index::InnerIndex; use self::index::InnerIndex;
use self::main_index::MainIndex; use self::main_index::MainIndex;
use self::raw_index::RawIndex; use self::raw_index::{RawIndex, InnerRawIndex};
use self::words_index::WordsIndex; use self::words_index::WordsIndex;
pub struct Database { pub struct Database {
@ -88,31 +88,31 @@ impl Database {
let main = { let main = {
self.inner.cf_handle(name).expect("cf not found"); self.inner.cf_handle(name).expect("cf not found");
MainIndex(self.inner.clone(), name.to_owned()) MainIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(name)))
}; };
let words = { let words = {
let cf_name = format!("{}-words", name); let cf_name = format!("{}-words", name);
self.inner.cf_handle(&cf_name).expect("cf not found"); self.inner.cf_handle(&cf_name).expect("cf not found");
WordsIndex(self.inner.clone(), cf_name) WordsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
}; };
let docs_words = { let docs_words = {
let cf_name = format!("{}-docs-words", name); let cf_name = format!("{}-docs-words", name);
self.inner.cf_handle(&cf_name).expect("cf not found"); self.inner.cf_handle(&cf_name).expect("cf not found");
DocsWordsIndex(self.inner.clone(), cf_name) DocsWordsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
}; };
let documents = { let documents = {
let cf_name = format!("{}-documents", name); let cf_name = format!("{}-documents", name);
self.inner.cf_handle(&cf_name).expect("cf not found"); self.inner.cf_handle(&cf_name).expect("cf not found");
DocumentsIndex(self.inner.clone(), cf_name) DocumentsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
}; };
let custom = { let custom = {
let cf_name = format!("{}-custom", name); let cf_name = format!("{}-custom", name);
self.inner.cf_handle(&cf_name).expect("cf not found"); self.inner.cf_handle(&cf_name).expect("cf not found");
CustomSettings(self.inner.clone(), cf_name) CustomSettings(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
}; };
let raw_index = RawIndex { main, words, docs_words, documents, custom }; let raw_index = RawIndex { main, words, docs_words, documents, custom };
@ -135,7 +135,7 @@ impl Database {
Entry::Vacant(vacant) => { Entry::Vacant(vacant) => {
let main = { let main = {
self.inner.create_cf(name, &rocksdb::Options::default())?; self.inner.create_cf(name, &rocksdb::Options::default())?;
MainIndex(self.inner.clone(), name.to_owned()) MainIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(name)))
}; };
if let Some(prev_schema) = main.schema()? { if let Some(prev_schema) = main.schema()? {
@ -149,25 +149,25 @@ impl Database {
let words = { let words = {
let cf_name = format!("{}-words", name); let cf_name = format!("{}-words", name);
self.inner.create_cf(&cf_name, &rocksdb::Options::default())?; self.inner.create_cf(&cf_name, &rocksdb::Options::default())?;
WordsIndex(self.inner.clone(), cf_name) WordsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
}; };
let docs_words = { let docs_words = {
let cf_name = format!("{}-docs-words", name); let cf_name = format!("{}-docs-words", name);
self.inner.create_cf(&cf_name, &rocksdb::Options::default())?; self.inner.create_cf(&cf_name, &rocksdb::Options::default())?;
DocsWordsIndex(self.inner.clone(), cf_name) DocsWordsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
}; };
let documents = { let documents = {
let cf_name = format!("{}-documents", name); let cf_name = format!("{}-documents", name);
self.inner.create_cf(&cf_name, &rocksdb::Options::default())?; self.inner.create_cf(&cf_name, &rocksdb::Options::default())?;
DocumentsIndex(self.inner.clone(), cf_name) DocumentsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
}; };
let custom = { let custom = {
let cf_name = format!("{}-custom", name); let cf_name = format!("{}-custom", name);
self.inner.create_cf(&cf_name, &rocksdb::Options::default())?; self.inner.create_cf(&cf_name, &rocksdb::Options::default())?;
CustomSettings(self.inner.clone(), cf_name) CustomSettings(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
}; };
let mut indexes = self.indexes()?.unwrap_or_else(HashSet::new); let mut indexes = self.indexes()?.unwrap_or_else(HashSet::new);

View File

@ -1,3 +1,4 @@
use std::sync::Arc;
use super::{MainIndex, WordsIndex, DocsWordsIndex, DocumentsIndex, CustomSettings}; use super::{MainIndex, WordsIndex, DocsWordsIndex, DocumentsIndex, CustomSettings};
#[derive(Clone)] #[derive(Clone)]
@ -8,3 +9,60 @@ pub struct RawIndex {
pub documents: DocumentsIndex, pub documents: DocumentsIndex,
pub custom: CustomSettings, pub custom: CustomSettings,
} }
#[derive(Clone)]
pub struct InnerRawIndex {
database: Arc<rocksdb::DB>,
name: Arc<str>,
}
impl InnerRawIndex {
pub fn new(database: Arc<rocksdb::DB>, name: Arc<str>) -> InnerRawIndex {
InnerRawIndex { database, name }
}
pub fn get<K>(&self, key: K) -> Result<Option<rocksdb::DBVector>, rocksdb::Error>
where K: AsRef<[u8]>,
{
let cf = self.database.cf_handle(&self.name).expect("cf not found");
self.database.get_cf(cf, key)
}
pub fn get_pinned<K>(&self, key: K) -> Result<Option<rocksdb::DBPinnableSlice>, rocksdb::Error>
where K: AsRef<[u8]>,
{
let cf = self.database.cf_handle(&self.name).expect("cf not found");
self.database.get_pinned_cf(cf, key)
}
pub fn iterator(&self, from: rocksdb::IteratorMode) -> Result<rocksdb::DBIterator, rocksdb::Error> {
let cf = self.database.cf_handle(&self.name).expect("cf not found");
self.database.iterator_cf(cf, from)
}
pub fn set<K, V>(&self, key: K, value: V) -> Result<(), rocksdb::Error>
where K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
let cf = self.database.cf_handle(&self.name).expect("cf not found");
self.database.put_cf(cf, key, value)
}
pub fn delete<K>(&self, key: K) -> Result<(), rocksdb::Error>
where K: AsRef<[u8]>
{
let cf = self.database.cf_handle(&self.name).expect("cf not found");
self.database.delete_cf(cf, key)
}
pub fn delete_range<K>(&self, start: K, end: K) -> Result<(), rocksdb::Error>
where K: AsRef<[u8]>,
{
let mut batch = rocksdb::WriteBatch::default();
let cf = self.database.cf_handle(&self.name).expect("cf not found");
batch.delete_range_cf(cf, start, end)?;
self.database.write(batch)
}
}

View File

@ -1,16 +1,16 @@
use std::sync::Arc;
use meilidb_core::DocIndex; use meilidb_core::DocIndex;
use sdset::{Set, SetBuf}; use sdset::{Set, SetBuf};
use zerocopy::{LayoutVerified, AsBytes}; use zerocopy::{LayoutVerified, AsBytes};
use crate::database::raw_index::InnerRawIndex;
#[derive(Clone)] #[derive(Clone)]
pub struct WordsIndex(pub Arc<rocksdb::DB>, pub String); pub struct WordsIndex(pub(crate) InnerRawIndex);
impl WordsIndex { impl WordsIndex {
pub fn doc_indexes(&self, word: &[u8]) -> Result<Option<SetBuf<DocIndex>>, rocksdb::Error> { pub fn doc_indexes(&self, word: &[u8]) -> Result<Option<SetBuf<DocIndex>>, rocksdb::Error> {
let cf = self.0.cf_handle(&self.1).unwrap(); // we must force an allocation to make the memory aligned
match self.0.get_cf(cf, word)? { match self.0.get(word)? {
Some(bytes) => { Some(bytes) => {
let layout = LayoutVerified::new_slice(bytes.as_ref()).expect("invalid layout"); let layout = LayoutVerified::new_slice(bytes.as_ref()).expect("invalid layout");
let slice = layout.into_slice(); let slice = layout.into_slice();
@ -22,14 +22,12 @@ impl WordsIndex {
} }
pub fn set_doc_indexes(&self, word: &[u8], set: &Set<DocIndex>) -> Result<(), rocksdb::Error> { pub fn set_doc_indexes(&self, word: &[u8], set: &Set<DocIndex>) -> Result<(), rocksdb::Error> {
let cf = self.0.cf_handle(&self.1).unwrap(); self.0.set(word, set.as_bytes())?;
self.0.put_cf(cf, word, set.as_bytes())?;
Ok(()) Ok(())
} }
pub fn del_doc_indexes(&self, word: &[u8]) -> Result<(), rocksdb::Error> { pub fn del_doc_indexes(&self, word: &[u8]) -> Result<(), rocksdb::Error> {
let cf = self.0.cf_handle(&self.1).unwrap(); self.0.delete(word)?;
self.0.delete_cf(cf, word)?;
Ok(()) Ok(())
} }
} }