feat: Introduce the UpdatesIndex type

This commit is contained in:
Clément Renault 2019-08-19 18:09:02 +02:00 committed by Clément Renault
parent 50e3c2c3de
commit 5a9e25c315
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
26 changed files with 696 additions and 639 deletions

View File

@ -5,10 +5,11 @@ authors = ["Kerollmops <renault.cle@gmail.com>"]
edition = "2018" edition = "2018"
[dependencies] [dependencies]
arc-swap = "0.3.11" arc-swap = "0.4.2"
bincode = "1.1.4" bincode = "1.1.4"
deunicode = "1.0.0" deunicode = "1.0.0"
hashbrown = { version = "0.6.0", features = ["serde"] } hashbrown = { version = "0.6.0", features = ["serde"] }
log = "0.4.6"
meilidb-core = { path = "../meilidb-core", version = "0.1.0" } meilidb-core = { path = "../meilidb-core", version = "0.1.0" }
meilidb-schema = { path = "../meilidb-schema", version = "0.1.0" } meilidb-schema = { path = "../meilidb-schema", version = "0.1.0" }
meilidb-tokenizer = { path = "../meilidb-tokenizer", version = "0.1.0" } meilidb-tokenizer = { path = "../meilidb-tokenizer", version = "0.1.0" }
@ -17,7 +18,7 @@ sdset = "0.3.2"
serde = { version = "1.0.99", features = ["derive"] } serde = { version = "1.0.99", features = ["derive"] }
serde_json = "1.0.40" serde_json = "1.0.40"
siphasher = "0.3.0" siphasher = "0.3.0"
rocksdb = { version = "0.12.3", default-features = false } sled = "0.25.0"
zerocopy = "0.2.8" zerocopy = "0.2.8"
[dependencies.rmp-serde] [dependencies.rmp-serde]

View File

@ -1,13 +0,0 @@
use std::ops::Deref;
use crate::database::raw_index::InnerRawIndex;
/// Newtype around an [`InnerRawIndex`].
///
/// The wrapper adds no behaviour of its own: it dereferences to the wrapped
/// raw index so callers can use that type's methods directly.
#[derive(Clone)]
pub struct CustomSettings(pub(crate) InnerRawIndex);

impl Deref for CustomSettings {
    type Target = InnerRawIndex;

    /// Borrows the wrapped raw index.
    fn deref(&self) -> &Self::Target {
        let CustomSettings(raw) = self;
        raw
    }
}

View File

@ -9,7 +9,8 @@ use crate::indexer::Indexer;
use crate::serde::{extract_document_id, Serializer, RamDocumentStore}; use crate::serde::{extract_document_id, Serializer, RamDocumentStore};
use crate::RankedMap; use crate::RankedMap;
use super::{Error, Index, InnerIndex, DocumentsDeletion}; use super::{Error, Index, DocumentsDeletion};
use super::index::Cache;
pub struct DocumentsAddition<'a> { pub struct DocumentsAddition<'a> {
inner: &'a Index, inner: &'a Index,
@ -33,7 +34,7 @@ impl<'a> DocumentsAddition<'a> {
pub fn update_document<D>(&mut self, document: D) -> Result<(), Error> pub fn update_document<D>(&mut self, document: D) -> Result<(), Error>
where D: serde::Serialize, where D: serde::Serialize,
{ {
let schema = &self.inner.lease_inner().schema; let schema = &self.inner.schema();
let identifier = schema.identifier_name(); let identifier = schema.identifier_name();
let document_id = match extract_document_id(identifier, &document)? { let document_id = match extract_document_id(identifier, &document)? {
@ -59,11 +60,11 @@ impl<'a> DocumentsAddition<'a> {
} }
pub fn finalize(self) -> Result<(), Error> { pub fn finalize(self) -> Result<(), Error> {
let lease_inner = self.inner.lease_inner(); let ref_index = self.inner.as_ref();
let docs_words = &lease_inner.raw.docs_words; let docs_words = ref_index.docs_words_index;
let documents = &lease_inner.raw.documents; let documents = ref_index.documents_index;
let main = &lease_inner.raw.main; let main = ref_index.main_index;
let words = &lease_inner.raw.words; let words = ref_index.words_index;
// 1. remove the previous documents match indexes // 1. remove the previous documents match indexes
let mut documents_deletion = DocumentsDeletion::new(self.inner, self.ranked_map.clone()); let mut documents_deletion = DocumentsDeletion::new(self.inner, self.ranked_map.clone());
@ -119,15 +120,14 @@ impl<'a> DocumentsAddition<'a> {
main.set_ranked_map(&self.ranked_map)?; main.set_ranked_map(&self.ranked_map)?;
// update the "consistent" view of the Index // update the "consistent" view of the Index
let cache = ref_index.cache;
let words = Arc::new(words); let words = Arc::new(words);
let ranked_map = self.ranked_map; let ranked_map = self.ranked_map;
let synonyms = lease_inner.synonyms.clone(); let synonyms = cache.synonyms.clone();
let schema = lease_inner.schema.clone(); let schema = cache.schema.clone();
let raw = lease_inner.raw.clone();
lease_inner.raw.compact();
let inner = InnerIndex { words, synonyms, schema, ranked_map, raw }; let cache = Cache { words, synonyms, schema, ranked_map };
self.inner.0.store(Arc::new(inner)); self.inner.cache.store(Arc::new(cache));
Ok(()) Ok(())
} }

View File

@ -8,7 +8,8 @@ use sdset::{SetBuf, SetOperation, duo::DifferenceByKey};
use crate::RankedMap; use crate::RankedMap;
use crate::serde::extract_document_id; use crate::serde::extract_document_id;
use super::{Index, Error, InnerIndex}; use super::{Index, Error};
use super::index::Cache;
pub struct DocumentsDeletion<'a> { pub struct DocumentsDeletion<'a> {
inner: &'a Index, inner: &'a Index,
@ -28,7 +29,7 @@ impl<'a> DocumentsDeletion<'a> {
pub fn delete_document<D>(&mut self, document: D) -> Result<(), Error> pub fn delete_document<D>(&mut self, document: D) -> Result<(), Error>
where D: serde::Serialize, where D: serde::Serialize,
{ {
let schema = &self.inner.lease_inner().schema; let schema = &self.inner.schema();
let identifier = schema.identifier_name(); let identifier = schema.identifier_name();
let document_id = match extract_document_id(identifier, &document)? { let document_id = match extract_document_id(identifier, &document)? {
@ -42,12 +43,12 @@ impl<'a> DocumentsDeletion<'a> {
} }
pub fn finalize(mut self) -> Result<(), Error> { pub fn finalize(mut self) -> Result<(), Error> {
let lease_inner = self.inner.lease_inner(); let ref_index = self.inner.as_ref();
let docs_words = &lease_inner.raw.docs_words; let schema = self.inner.schema();
let documents = &lease_inner.raw.documents; let docs_words = ref_index.docs_words_index;
let main = &lease_inner.raw.main; let documents = ref_index.documents_index;
let schema = &lease_inner.schema; let main = ref_index.main_index;
let words = &lease_inner.raw.words; let words = ref_index.words_index;
let idset = SetBuf::from_dirty(self.documents); let idset = SetBuf::from_dirty(self.documents);
@ -118,15 +119,14 @@ impl<'a> DocumentsDeletion<'a> {
main.set_ranked_map(&self.ranked_map)?; main.set_ranked_map(&self.ranked_map)?;
// update the "consistent" view of the Index // update the "consistent" view of the Index
let cache = ref_index.cache;
let words = Arc::new(words); let words = Arc::new(words);
let ranked_map = lease_inner.ranked_map.clone(); let ranked_map = self.ranked_map;
let synonyms = lease_inner.synonyms.clone(); let synonyms = cache.synonyms.clone();
let schema = lease_inner.schema.clone(); let schema = cache.schema.clone();
let raw = lease_inner.raw.clone();
lease_inner.raw.compact();
let inner = InnerIndex { words, synonyms, schema, ranked_map, raw }; let cache = Cache { words, synonyms, schema, ranked_map };
self.inner.0.store(Arc::new(inner)); self.inner.cache.store(Arc::new(cache));
Ok(()) Ok(())
} }

View File

@ -1,90 +0,0 @@
use std::convert::TryInto;
use meilidb_core::DocumentId;
use meilidb_schema::SchemaAttr;
use rocksdb::DBVector;
use crate::database::raw_index::InnerRawIndex;
use crate::document_attr_key::DocumentAttrKey;
/// Storage of the document fields.
///
/// Every entry is keyed by the big-endian bytes of a [`DocumentAttrKey`]
/// (a document id paired with a schema attribute).
#[derive(Clone)]
pub struct DocumentsIndex(pub(crate) InnerRawIndex);

impl DocumentsIndex {
    /// Reads the raw value stored for attribute `attr` of document `id`.
    pub fn document_field(&self, id: DocumentId, attr: SchemaAttr) -> Result<Option<DBVector>, rocksdb::Error> {
        self.0.get(DocumentAttrKey::new(id, attr).to_be_bytes())
    }

    /// Stores `value` as attribute `attr` of document `id`.
    pub fn set_document_field(&self, id: DocumentId, attr: SchemaAttr, value: Vec<u8>) -> Result<(), rocksdb::Error> {
        let raw_key = DocumentAttrKey::new(id, attr).to_be_bytes();
        self.0.set(raw_key, value)?;
        Ok(())
    }

    /// Deletes attribute `attr` of document `id`.
    pub fn del_document_field(&self, id: DocumentId, attr: SchemaAttr) -> Result<(), rocksdb::Error> {
        let raw_key = DocumentAttrKey::new(id, attr).to_be_bytes();
        self.0.delete(raw_key)?;
        Ok(())
    }

    /// Deletes the key range going from the smallest to the largest
    /// attribute key of document `id`.
    pub fn del_all_document_fields(&self, id: DocumentId) -> Result<(), rocksdb::Error> {
        let first = DocumentAttrKey::new(id, SchemaAttr::min()).to_be_bytes();
        let last = DocumentAttrKey::new(id, SchemaAttr::max()).to_be_bytes();
        self.0.delete_range(first, last)?;
        Ok(())
    }

    /// Returns an iterator over the fields of document `id`.
    ///
    /// # Panics
    ///
    /// Panics when the underlying iterator cannot be created.
    pub fn document_fields(&self, id: DocumentId) -> DocumentFieldsIter {
        let lower = DocumentAttrKey::new(id, SchemaAttr::min()).to_be_bytes();
        let upper = DocumentAttrKey::new(id, SchemaAttr::max()).to_be_bytes();

        let mode = rocksdb::IteratorMode::From(&lower[..], rocksdb::Direction::Forward);
        DocumentFieldsIter(self.0.iterator(mode).unwrap(), upper.to_vec())
    }

    /// Counts the documents by walking every key and counting each change of
    /// document id between two consecutive keys.
    ///
    /// # Panics
    ///
    /// Panics when a key does not have the size of an encoded
    /// `DocumentAttrKey`.
    pub fn len(&self) -> Result<usize, rocksdb::Error> {
        let mut previous = None;
        let mut distinct = 0;

        for (key, _) in self.0.iterator(rocksdb::IteratorMode::Start)? {
            let bytes = key.as_ref().try_into().unwrap();
            let current = Some(DocumentAttrKey::from_be_bytes(bytes).document_id);

            if current != previous {
                previous = current;
                distinct += 1;
            }
        }

        Ok(distinct)
    }
}
/// Iterator over the `(attribute, value)` pairs of one document.
///
/// Wraps a raw database iterator together with the inclusive upper bound
/// (an encoded key) after which iteration stops.
pub struct DocumentFieldsIter<'a>(rocksdb::DBIterator<'a>, Vec<u8>);

impl<'a> Iterator for DocumentFieldsIter<'a> {
    type Item = Result<(SchemaAttr, Box<[u8]>), rocksdb::Error>;

    /// Yields the next field, or `None` once the raw iterator is exhausted
    /// or has moved past the upper bound.
    fn next(&mut self) -> Option<Self::Item> {
        let (key, value) = self.0.next()?;

        // keys greater than the upper bound belong to another document
        if key.as_ref() > self.1.as_ref() {
            return None;
        }

        let slice: &[u8] = key.as_ref();
        let array = slice.try_into().unwrap();
        let attribute = DocumentAttrKey::from_be_bytes(array).attribute;
        Some(Ok((attribute, value)))
    }
}

View File

@ -7,15 +7,15 @@ pub enum Error {
SchemaMissing, SchemaMissing,
WordIndexMissing, WordIndexMissing,
MissingDocumentId, MissingDocumentId,
RocksdbError(rocksdb::Error), SledError(sled::Error),
FstError(fst::Error), FstError(fst::Error),
BincodeError(bincode::Error), BincodeError(bincode::Error),
SerializerError(SerializerError), SerializerError(SerializerError),
} }
impl From<rocksdb::Error> for Error { impl From<sled::Error> for Error {
fn from(error: rocksdb::Error) -> Error { fn from(error: sled::Error) -> Error {
Error::RocksdbError(error) Error::SledError(error)
} }
} }
@ -45,7 +45,7 @@ impl fmt::Display for Error {
SchemaMissing => write!(f, "this index does not have a schema"), SchemaMissing => write!(f, "this index does not have a schema"),
WordIndexMissing => write!(f, "this index does not have a word index"), WordIndexMissing => write!(f, "this index does not have a word index"),
MissingDocumentId => write!(f, "document id is missing"), MissingDocumentId => write!(f, "document id is missing"),
RocksdbError(e) => write!(f, "RocksDB error; {}", e), SledError(e) => write!(f, "Sled error; {}", e),
FstError(e) => write!(f, "fst error; {}", e), FstError(e) => write!(f, "fst error; {}", e),
BincodeError(e) => write!(f, "bincode error; {}", e), BincodeError(e) => write!(f, "bincode error; {}", e),
SerializerError(e) => write!(f, "serializer error; {}", e), SerializerError(e) => write!(f, "serializer error; {}", e),

View File

@ -1,170 +0,0 @@
use sdset::SetBuf;
use std::collections::HashSet;
use std::sync::Arc;
use arc_swap::{ArcSwap, Lease};
use meilidb_core::criterion::Criteria;
use meilidb_core::{DocIndex, Store, DocumentId, QueryBuilder};
use meilidb_schema::Schema;
use rmp_serde::decode::Error as RmpError;
use serde::de;
use crate::ranked_map::RankedMap;
use crate::serde::Deserializer;
use super::{Error, CustomSettings};
use super::{
RawIndex,
DocumentsAddition, DocumentsDeletion,
SynonymsAddition, SynonymsDeletion,
};
/// Counters describing the content of an index.
#[derive(Copy, Clone)]
pub struct IndexStats {
/// Number of words.
pub number_of_words: usize,
/// Number of documents.
pub number_of_documents: usize,
/// Number of attributes present in the ranked map.
pub number_attrs_in_ranked_map: usize,
}
/// An index, stored as an atomically swappable pointer to its `InnerIndex`.
#[derive(Clone)]
pub struct Index(pub ArcSwap<InnerIndex>);
/// The data an `Index` points to: in-memory structures plus the raw storage.
pub struct InnerIndex {
/// Set of the words.
pub words: Arc<fst::Set>,
/// Set of the synonyms.
pub synonyms: Arc<fst::Set>,
/// Schema of the documents.
pub schema: Schema,
/// Ranked map.
pub ranked_map: RankedMap,
/// Handle to the raw storage.
pub raw: RawIndex, // TODO this will be a snapshot in the future
}
impl Index {
pub fn from_raw(raw: RawIndex) -> Result<Index, Error> {
let words = match raw.main.words_set()? {
Some(words) => Arc::new(words),
None => Arc::new(fst::Set::default()),
};
let synonyms = match raw.main.synonyms_set()? {
Some(synonyms) => Arc::new(synonyms),
None => Arc::new(fst::Set::default()),
};
let schema = match raw.main.schema()? {
Some(schema) => schema,
None => return Err(Error::SchemaMissing),
};
let ranked_map = match raw.main.ranked_map()? {
Some(map) => map,
None => RankedMap::default(),
};
let inner = InnerIndex { words, synonyms, schema, ranked_map, raw };
let index = Index(ArcSwap::new(Arc::new(inner)));
Ok(index)
}
/// Computes the statistics of the index.
///
/// The number of words and of ranked attributes come from the in-memory
/// structures; the number of documents is computed from the documents store.
pub fn stats(&self) -> Result<IndexStats, rocksdb::Error> {
    let inner = self.0.lease();

    let number_of_words = inner.words.len();
    let number_of_documents = inner.raw.documents.len()?;
    let number_attrs_in_ranked_map = inner.ranked_map.len();

    Ok(IndexStats { number_of_words, number_of_documents, number_attrs_in_ranked_map })
}
/// Creates a `QueryBuilder` over a lease of the current inner index.
pub fn query_builder(&self) -> QueryBuilder<IndexLease> {
let lease = IndexLease(self.0.lease());
QueryBuilder::new(lease)
}
/// Same as `query_builder`, but the builder is created with the given
/// `criteria`.
pub fn query_builder_with_criteria<'c>(
&self,
criteria: Criteria<'c>,
) -> QueryBuilder<'c, IndexLease>
{
let lease = IndexLease(self.0.lease());
QueryBuilder::with_criteria(lease, criteria)
}
/// Leases the current inner index.
pub fn lease_inner(&self) -> Lease<Arc<InnerIndex>> {
self.0.lease()
}
/// Returns a copy of the current schema.
pub fn schema(&self) -> Schema {
self.0.lease().schema.clone()
}
/// Returns a clone of the custom settings handle of the raw storage.
pub fn custom_settings(&self) -> CustomSettings {
self.0.lease().raw.custom.clone()
}
/// Starts a documents addition, seeded with a copy of the current ranked map.
pub fn documents_addition(&self) -> DocumentsAddition {
let ranked_map = self.0.lease().ranked_map.clone();
DocumentsAddition::new(self, ranked_map)
}
/// Starts a documents deletion, seeded with a copy of the current ranked map.
pub fn documents_deletion(&self) -> DocumentsDeletion {
let ranked_map = self.0.lease().ranked_map.clone();
DocumentsDeletion::new(self, ranked_map)
}
/// Starts a synonyms addition on this index.
pub fn synonyms_addition(&self) -> SynonymsAddition {
SynonymsAddition::new(self)
}
/// Starts a synonyms deletion on this index.
pub fn synonyms_deletion(&self) -> SynonymsDeletion {
SynonymsDeletion::new(self)
}
/// Deserializes the document `id` into a `T`.
///
/// When `fields` is given, the names are translated to schema attributes
/// (names unknown to the schema are ignored) and handed to the
/// deserializer; otherwise no attribute selection is passed.
pub fn document<T>(
    &self,
    fields: Option<&HashSet<&str>>,
    id: DocumentId,
) -> Result<Option<T>, RmpError>
where T: de::DeserializeOwned,
{
    let schema = &self.lease_inner().schema;

    let attributes = match fields {
        Some(names) => {
            let known = names.iter().filter_map(|name| schema.attribute(name)).collect();
            Some(known)
        },
        None => None,
    };

    let mut deserializer = Deserializer {
        document_id: id,
        index: &self,
        fields: attributes.as_ref(),
    };

    // TODO: currently we return an error if all document fields are missing,
    //       returning None would have been better
    T::deserialize(&mut deserializer).map(Some)
}
}
/// A lease on the inner index, used as the `Store` of query builders.
pub struct IndexLease(Lease<Arc<InnerIndex>>);
impl Store for IndexLease {
type Error = Error;
/// Returns the in-memory words set of the leased inner index.
fn words(&self) -> Result<&fst::Set, Self::Error> {
Ok(&self.0.words)
}
/// Looks up the document indexes of `word` in the raw words store.
fn word_indexes(&self, word: &[u8]) -> Result<Option<SetBuf<DocIndex>>, Self::Error> {
Ok(self.0.raw.words.doc_indexes(word)?)
}
/// Returns the in-memory synonyms set of the leased inner index.
fn synonyms(&self) -> Result<&fst::Set, Self::Error> {
Ok(&self.0.synonyms)
}
/// Looks up the alternatives of `word` in the raw synonyms store.
fn alternatives_to(&self, word: &[u8]) -> Result<Option<fst::Set>, Self::Error> {
Ok(self.0.raw.synonyms.alternatives_to(word)?)
}
}

View File

@ -0,0 +1,13 @@
use std::sync::Arc;
use std::ops::Deref;
/// Newtype around a shared `sled::Tree`.
///
/// The wrapper adds no behaviour of its own: it dereferences to the tree so
/// callers can use the `sled::Tree` methods directly.
#[derive(Clone)]
pub struct CustomSettingsIndex(pub(crate) Arc<sled::Tree>);

impl Deref for CustomSettingsIndex {
    type Target = sled::Tree;

    /// Borrows the wrapped tree.
    fn deref(&self) -> &Self::Target {
        &*self.0
    }
}

View File

@ -1,17 +1,14 @@
use std::sync::Arc; use std::sync::Arc;
use meilidb_core::DocumentId; use meilidb_core::DocumentId;
use crate::database::raw_index::InnerRawIndex;
use super::Error; use super::Error;
#[derive(Clone)] #[derive(Clone)]
pub struct DocsWordsIndex(pub(crate) InnerRawIndex); pub struct DocsWordsIndex(pub Arc<sled::Tree>);
impl DocsWordsIndex { impl DocsWordsIndex {
pub fn doc_words(&self, id: DocumentId) -> Result<Option<fst::Set>, Error> { pub fn doc_words(&self, id: DocumentId) -> Result<Option<fst::Set>, Error> {
let key = id.0.to_be_bytes(); let key = id.0.to_be_bytes();
match self.0.get_pinned(key)? { match self.0.get(key)? {
Some(bytes) => { Some(bytes) => {
let len = bytes.len(); let len = bytes.len();
let value = Arc::from(bytes.as_ref()); let value = Arc::from(bytes.as_ref());
@ -24,13 +21,13 @@ impl DocsWordsIndex {
pub fn set_doc_words(&self, id: DocumentId, words: &fst::Set) -> Result<(), Error> { pub fn set_doc_words(&self, id: DocumentId, words: &fst::Set) -> Result<(), Error> {
let key = id.0.to_be_bytes(); let key = id.0.to_be_bytes();
self.0.set(key, words.as_fst().as_bytes())?; self.0.insert(key, words.as_fst().as_bytes())?;
Ok(()) Ok(())
} }
pub fn del_doc_words(&self, id: DocumentId) -> Result<(), Error> { pub fn del_doc_words(&self, id: DocumentId) -> Result<(), Error> {
let key = id.0.to_be_bytes(); let key = id.0.to_be_bytes();
self.0.delete(key)?; self.0.remove(key)?;
Ok(()) Ok(())
} }
} }

View File

@ -0,0 +1,91 @@
use std::sync::Arc;
use std::convert::TryInto;
use std::ops::Bound;
use meilidb_core::DocumentId;
use meilidb_schema::SchemaAttr;
use crate::document_attr_key::DocumentAttrKey;
/// Builds the inclusive key range covering every attribute of document `id`,
/// from the smallest to the largest `SchemaAttr`.
fn document_fields_range(id: DocumentId) -> (Bound<[u8; 10]>, Bound<[u8; 10]>) {
    let bound = |attr| Bound::Included(DocumentAttrKey::new(id, attr).to_be_bytes());
    (bound(SchemaAttr::min()), bound(SchemaAttr::max()))
}
/// Storage of the document fields, backed by a `sled::Tree`.
///
/// Every entry is keyed by the big-endian bytes of a [`DocumentAttrKey`]
/// (a document id paired with a schema attribute).
#[derive(Clone)]
pub struct DocumentsIndex(pub(crate) Arc<sled::Tree>);

impl DocumentsIndex {
    /// Reads the raw value stored for attribute `attr` of document `id`.
    pub fn document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result<Option<sled::IVec>> {
        self.0.get(DocumentAttrKey::new(id, attr).to_be_bytes())
    }

    /// Stores `value` as attribute `attr` of document `id`.
    pub fn set_document_field(&self, id: DocumentId, attr: SchemaAttr, value: Vec<u8>) -> sled::Result<()> {
        let raw_key = DocumentAttrKey::new(id, attr).to_be_bytes();
        self.0.insert(raw_key, value).map(drop)
    }

    /// Removes attribute `attr` of document `id`.
    pub fn del_document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result<()> {
        let raw_key = DocumentAttrKey::new(id, attr).to_be_bytes();
        self.0.remove(raw_key).map(drop)
    }

    /// Removes, one by one, every entry found in the key range of
    /// document `id`; stops at the first error.
    pub fn del_all_document_fields(&self, id: DocumentId) -> sled::Result<()> {
        self.0.range(document_fields_range(id)).try_for_each(|entry| {
            let (key, _) = entry?;
            self.0.remove(key).map(drop)
        })
    }

    /// Returns an iterator over the fields of document `id`.
    pub fn document_fields(&self, id: DocumentId) -> DocumentFieldsIter {
        DocumentFieldsIter(self.0.range(document_fields_range(id)))
    }

    /// Counts the documents by walking every key and counting each change of
    /// document id between two consecutive keys.
    ///
    /// # Panics
    ///
    /// Panics when a key does not have the size of an encoded
    /// `DocumentAttrKey`.
    pub fn len(&self) -> sled::Result<usize> {
        let mut previous = None;
        let mut distinct = 0;

        for entry in self.0.iter() {
            let (key, _) = entry?;
            let bytes = key.as_ref().try_into().unwrap();
            let current = Some(DocumentAttrKey::from_be_bytes(bytes).document_id);

            if current != previous {
                previous = current;
                distinct += 1;
            }
        }

        Ok(distinct)
    }
}
/// Iterator over the `(attribute, value)` pairs of one document, wrapping a
/// `sled` range iterator.
pub struct DocumentFieldsIter<'a>(sled::Iter<'a>);

impl Iterator for DocumentFieldsIter<'_> {
    type Item = sled::Result<(SchemaAttr, sled::IVec)>;

    /// Decodes the attribute out of the next key; storage errors are
    /// forwarded untouched.
    ///
    /// # Panics
    ///
    /// Panics when a key does not have the size of an encoded
    /// `DocumentAttrKey`.
    fn next(&mut self) -> Option<Self::Item> {
        self.0.next().map(|entry| {
            entry.map(|(key, value)| {
                let array = key.as_ref().try_into().unwrap();
                let attribute = DocumentAttrKey::from_be_bytes(array).attribute;
                (attribute, value)
            })
        })
    }
}

View File

@ -1,8 +1,5 @@
use std::sync::Arc; use std::sync::Arc;
use meilidb_schema::Schema; use meilidb_schema::Schema;
use crate::database::raw_index::InnerRawIndex;
use crate::ranked_map::RankedMap; use crate::ranked_map::RankedMap;
use super::Error; use super::Error;
@ -13,11 +10,11 @@ const SYNONYMS_KEY: &str = "synonyms";
const RANKED_MAP_KEY: &str = "ranked-map"; const RANKED_MAP_KEY: &str = "ranked-map";
#[derive(Clone)] #[derive(Clone)]
pub struct MainIndex(pub(crate) InnerRawIndex); pub struct MainIndex(pub(crate) Arc<sled::Tree>);
impl MainIndex { impl MainIndex {
pub fn schema(&self) -> Result<Option<Schema>, Error> { pub fn schema(&self) -> Result<Option<Schema>, Error> {
match self.0.get_pinned(SCHEMA_KEY)? { match self.0.get(SCHEMA_KEY)? {
Some(bytes) => { Some(bytes) => {
let schema = Schema::read_from_bin(bytes.as_ref())?; let schema = Schema::read_from_bin(bytes.as_ref())?;
Ok(Some(schema)) Ok(Some(schema))
@ -29,12 +26,12 @@ impl MainIndex {
pub fn set_schema(&self, schema: &Schema) -> Result<(), Error> { pub fn set_schema(&self, schema: &Schema) -> Result<(), Error> {
let mut bytes = Vec::new(); let mut bytes = Vec::new();
schema.write_to_bin(&mut bytes)?; schema.write_to_bin(&mut bytes)?;
self.0.set(SCHEMA_KEY, bytes)?; self.0.insert(SCHEMA_KEY, bytes)?;
Ok(()) Ok(())
} }
pub fn words_set(&self) -> Result<Option<fst::Set>, Error> { pub fn words_set(&self) -> Result<Option<fst::Set>, Error> {
match self.0.get_pinned(WORDS_KEY)? { match self.0.get(WORDS_KEY)? {
Some(bytes) => { Some(bytes) => {
let len = bytes.len(); let len = bytes.len();
let value = Arc::from(bytes.as_ref()); let value = Arc::from(bytes.as_ref());
@ -46,11 +43,11 @@ impl MainIndex {
} }
pub fn set_words_set(&self, value: &fst::Set) -> Result<(), Error> { pub fn set_words_set(&self, value: &fst::Set) -> Result<(), Error> {
self.0.set(WORDS_KEY, value.as_fst().as_bytes()).map_err(Into::into) self.0.insert(WORDS_KEY, value.as_fst().as_bytes()).map(drop).map_err(Into::into)
} }
pub fn synonyms_set(&self) -> Result<Option<fst::Set>, Error> { pub fn synonyms_set(&self) -> Result<Option<fst::Set>, Error> {
match self.0.get_pinned(SYNONYMS_KEY)? { match self.0.get(SYNONYMS_KEY)? {
Some(bytes) => { Some(bytes) => {
let len = bytes.len(); let len = bytes.len();
let value = Arc::from(bytes.as_ref()); let value = Arc::from(bytes.as_ref());
@ -62,11 +59,11 @@ impl MainIndex {
} }
pub fn set_synonyms_set(&self, value: &fst::Set) -> Result<(), Error> { pub fn set_synonyms_set(&self, value: &fst::Set) -> Result<(), Error> {
self.0.set(SYNONYMS_KEY, value.as_fst().as_bytes()).map_err(Into::into) self.0.insert(SYNONYMS_KEY, value.as_fst().as_bytes()).map(drop).map_err(Into::into)
} }
pub fn ranked_map(&self) -> Result<Option<RankedMap>, Error> { pub fn ranked_map(&self) -> Result<Option<RankedMap>, Error> {
match self.0.get_pinned(RANKED_MAP_KEY)? { match self.0.get(RANKED_MAP_KEY)? {
Some(bytes) => { Some(bytes) => {
let ranked_map = RankedMap::read_from_bin(bytes.as_ref())?; let ranked_map = RankedMap::read_from_bin(bytes.as_ref())?;
Ok(Some(ranked_map)) Ok(Some(ranked_map))
@ -78,7 +75,7 @@ impl MainIndex {
pub fn set_ranked_map(&self, value: &RankedMap) -> Result<(), Error> { pub fn set_ranked_map(&self, value: &RankedMap) -> Result<(), Error> {
let mut bytes = Vec::new(); let mut bytes = Vec::new();
value.write_to_bin(&mut bytes)?; value.write_to_bin(&mut bytes)?;
self.0.set(RANKED_MAP_KEY, bytes)?; self.0.insert(RANKED_MAP_KEY, bytes)?;
Ok(()) Ok(())
} }
} }

View File

@ -0,0 +1,276 @@
use std::collections::HashSet;
use std::sync::Arc;
use arc_swap::{ArcSwap, Guard};
use meilidb_core::criterion::Criteria;
use meilidb_core::{DocIndex, Store, DocumentId, QueryBuilder};
use meilidb_schema::Schema;
use sdset::SetBuf;
use serde::de;
use crate::ranked_map::RankedMap;
use crate::serde::{Deserializer, DeserializerError};
use super::Error;
pub use self::custom_settings_index::CustomSettingsIndex;
use self::docs_words_index::DocsWordsIndex;
use self::documents_index::DocumentsIndex;
use self::main_index::MainIndex;
use self::synonyms_index::SynonymsIndex;
use self::updates_index::UpdatesIndex;
use self::words_index::WordsIndex;
use super::{
DocumentsAddition, DocumentsDeletion,
SynonymsAddition, SynonymsDeletion,
};
mod custom_settings_index;
mod docs_words_index;
mod documents_index;
mod main_index;
mod synonyms_index;
mod updates_index;
mod words_index;
/// Counters describing the content of an index.
#[derive(Copy, Clone)]
pub struct IndexStats {
/// Number of words.
pub number_of_words: usize,
/// Number of documents.
pub number_of_documents: usize,
/// Number of attributes present in the ranked map.
pub number_attrs_in_ranked_map: usize,
}
/// An index: an atomically swappable in-memory `Cache` plus one wrapper per
/// underlying storage tree.
#[derive(Clone)]
pub struct Index {
/// In-memory view (words, synonyms, schema, ranked map), swapped as a whole.
pub(crate) cache: ArcSwap<Cache>,
// TODO this will be a snapshot in the future
main_index: MainIndex,
synonyms_index: SynonymsIndex,
words_index: WordsIndex,
docs_words_index: DocsWordsIndex,
documents_index: DocumentsIndex,
custom_settings_index: CustomSettingsIndex,
updates_index: UpdatesIndex,
}
/// In-memory structures of an index, kept behind the `ArcSwap` of `Index`.
pub(crate) struct Cache {
/// Set of the words.
pub words: Arc<fst::Set>,
/// Set of the synonyms.
pub synonyms: Arc<fst::Set>,
/// Schema of the documents.
pub schema: Schema,
/// Ranked map.
pub ranked_map: RankedMap,
}
impl Index {
pub fn new(db: &sled::Db, name: &str) -> Result<Index, Error> {
let main_index = db.open_tree(name).map(MainIndex)?;
let synonyms_index = db.open_tree(format!("{}-synonyms", name)).map(SynonymsIndex)?;
let words_index = db.open_tree(format!("{}-words", name)).map(WordsIndex)?;
let docs_words_index = db.open_tree(format!("{}-docs-words", name)).map(DocsWordsIndex)?;
let documents_index = db.open_tree(format!("{}-documents", name)).map(DocumentsIndex)?;
let custom_settings_index = db.open_tree(format!("{}-custom", name)).map(CustomSettingsIndex)?;
let updates = db.open_tree(format!("{}-updates", name))?;
let updates_results = db.open_tree(format!("{}-updates-results", name))?;
let updates_index = UpdatesIndex::new(db.clone(), updates, updates_results);
let words = match main_index.words_set()? {
Some(words) => Arc::new(words),
None => Arc::new(fst::Set::default()),
};
let synonyms = match main_index.synonyms_set()? {
Some(synonyms) => Arc::new(synonyms),
None => Arc::new(fst::Set::default()),
};
let schema = match main_index.schema()? {
Some(schema) => schema,
None => return Err(Error::SchemaMissing),
};
let ranked_map = match main_index.ranked_map()? {
Some(map) => map,
None => RankedMap::default(),
};
let cache = Cache { words, synonyms, schema, ranked_map };
let cache = ArcSwap::from_pointee(cache);
Ok(Index {
cache,
main_index,
synonyms_index,
words_index,
docs_words_index,
documents_index,
custom_settings_index,
updates_index,
})
}
pub fn with_schema(db: &sled::Db, name: &str, schema: Schema) -> Result<Index, Error> {
let main_index = db.open_tree(name).map(MainIndex)?;
let synonyms_index = db.open_tree(format!("{}-synonyms", name)).map(SynonymsIndex)?;
let words_index = db.open_tree(format!("{}-words", name)).map(WordsIndex)?;
let docs_words_index = db.open_tree(format!("{}-docs-words", name)).map(DocsWordsIndex)?;
let documents_index = db.open_tree(format!("{}-documents", name)).map(DocumentsIndex)?;
let custom_settings_index = db.open_tree(format!("{}-custom", name)).map(CustomSettingsIndex)?;
let updates = db.open_tree(format!("{}-updates", name))?;
let updates_results = db.open_tree(format!("{}-updates-results", name))?;
let updates_index = UpdatesIndex::new(db.clone(), updates, updates_results);
let words = match main_index.words_set()? {
Some(words) => Arc::new(words),
None => Arc::new(fst::Set::default()),
};
let synonyms = match main_index.synonyms_set()? {
Some(synonyms) => Arc::new(synonyms),
None => Arc::new(fst::Set::default()),
};
match main_index.schema()? {
Some(current) => if current != schema {
return Err(Error::SchemaDiffer)
},
None => main_index.set_schema(&schema)?,
}
let ranked_map = match main_index.ranked_map()? {
Some(map) => map,
None => RankedMap::default(),
};
let cache = Cache { words, synonyms, schema, ranked_map };
let cache = ArcSwap::from_pointee(cache);
Ok(Index {
cache,
main_index,
synonyms_index,
words_index,
docs_words_index,
documents_index,
custom_settings_index,
updates_index,
})
}
/// Computes the statistics of the index.
///
/// The number of words and of ranked attributes come from the cache; the
/// number of documents is computed from the documents tree.
pub fn stats(&self) -> sled::Result<IndexStats> {
    let cache = self.cache.load();

    let number_of_words = cache.words.len();
    let number_of_documents = self.documents_index.len()?;
    let number_attrs_in_ranked_map = cache.ranked_map.len();

    Ok(IndexStats { number_of_words, number_of_documents, number_attrs_in_ranked_map })
}
/// Creates a `QueryBuilder` over a `RefIndex` view of this index.
pub fn query_builder(&self) -> QueryBuilder<RefIndex> {
let ref_index = self.as_ref();
QueryBuilder::new(ref_index)
}
/// Same as `query_builder`, but the builder is created with the given
/// `criteria`.
pub fn query_builder_with_criteria<'c>(
&self,
criteria: Criteria<'c>,
) -> QueryBuilder<'c, RefIndex>
{
let ref_index = self.as_ref();
QueryBuilder::with_criteria(ref_index, criteria)
}
/// Creates a `RefIndex`: the cache as loaded at the time of the call,
/// together with borrows of the tree wrappers of this index (the updates
/// index is not part of the view).
pub fn as_ref(&self) -> RefIndex {
RefIndex {
cache: self.cache.load(),
main_index: &self.main_index,
synonyms_index: &self.synonyms_index,
words_index: &self.words_index,
docs_words_index: &self.docs_words_index,
documents_index: &self.documents_index,
custom_settings_index: &self.custom_settings_index,
}
}
/// Returns a copy of the schema held by the current cache.
pub fn schema(&self) -> Schema {
self.cache.load().schema.clone()
}
/// Returns a clone of the custom settings tree handle.
pub fn custom_settings(&self) -> CustomSettingsIndex {
self.custom_settings_index.clone()
}
/// Starts a documents addition, seeded with a copy of the cached ranked map.
pub fn documents_addition(&self) -> DocumentsAddition {
let ranked_map = self.cache.load().ranked_map.clone();
DocumentsAddition::new(self, ranked_map)
}
/// Starts a documents deletion, seeded with a copy of the cached ranked map.
pub fn documents_deletion(&self) -> DocumentsDeletion {
let ranked_map = self.cache.load().ranked_map.clone();
DocumentsDeletion::new(self, ranked_map)
}
/// Starts a synonyms addition on this index.
pub fn synonyms_addition(&self) -> SynonymsAddition {
SynonymsAddition::new(self)
}
/// Starts a synonyms deletion on this index.
pub fn synonyms_deletion(&self) -> SynonymsDeletion {
SynonymsDeletion::new(self)
}
/// Deserializes the document `id` into a `T`.
///
/// When `fields` is given, every name is translated to its schema
/// attribute. If any of the names is unknown to the schema the whole
/// selection collapses to `None`, exactly as if `fields` had not been given.
pub fn document<T>(
    &self,
    fields: Option<&HashSet<&str>>,
    id: DocumentId,
) -> Result<Option<T>, DeserializerError>
where T: de::DeserializeOwned,
{
    let schema = self.schema();

    let attributes = fields.and_then(|names| {
        names.iter().map(|name| schema.attribute(name)).collect()
    });

    let mut deserializer = Deserializer {
        document_id: id,
        index: &self,
        fields: attributes.as_ref(),
    };

    // TODO: currently we return an error if all document fields are missing,
    //       returning None would have been better
    T::deserialize(&mut deserializer).map(Some)
}
}
/// A view of an `Index`: a loaded cache guard plus borrows of the tree
/// wrappers. It is the `Store` handed to query builders.
pub struct RefIndex<'a> {
/// Cache as loaded when the view was created.
pub(crate) cache: Guard<'static, Arc<Cache>>,
pub main_index: &'a MainIndex,
pub synonyms_index: &'a SynonymsIndex,
pub words_index: &'a WordsIndex,
pub docs_words_index: &'a DocsWordsIndex,
pub documents_index: &'a DocumentsIndex,
pub custom_settings_index: &'a CustomSettingsIndex,
}
impl Store for RefIndex<'_> {
type Error = Error;
/// Returns the words set held by the cache.
fn words(&self) -> Result<&fst::Set, Self::Error> {
Ok(&self.cache.words)
}
/// Looks up the document indexes of `word` in the words tree.
fn word_indexes(&self, word: &[u8]) -> Result<Option<SetBuf<DocIndex>>, Self::Error> {
Ok(self.words_index.doc_indexes(word)?)
}
/// Returns the synonyms set held by the cache.
fn synonyms(&self) -> Result<&fst::Set, Self::Error> {
Ok(&self.cache.synonyms)
}
/// Looks up the alternatives of `word` in the synonyms tree.
fn alternatives_to(&self, word: &[u8]) -> Result<Option<fst::Set>, Self::Error> {
Ok(self.synonyms_index.alternatives_to(word)?)
}
}

View File

@ -0,0 +1,21 @@
use std::sync::Arc;
/// Storage of the synonyms, backed by a `sled::Tree` keyed by word.
#[derive(Clone)]
pub struct SynonymsIndex(pub(crate) Arc<sled::Tree>);

impl SynonymsIndex {
    /// Returns the set stored under `word`, if any.
    ///
    /// # Panics
    ///
    /// Panics when the stored bytes are not a valid `fst::Set`.
    pub fn alternatives_to(&self, word: &[u8]) -> sled::Result<Option<fst::Set>> {
        let stored = self.0.get(word)?;
        Ok(stored.map(|vector| fst::Set::from_bytes(vector.to_vec()).unwrap()))
    }

    /// Stores `value` under `word`, replacing any previous entry.
    pub fn set_alternatives_to(&self, word: &[u8], value: Vec<u8>) -> sled::Result<()> {
        self.0.insert(word, value)?;
        Ok(())
    }

    /// Removes the entry stored under `word`.
    pub fn del_alternatives_of(&self, word: &[u8]) -> sled::Result<()> {
        self.0.remove(word)?;
        Ok(())
    }
}

View File

@ -0,0 +1,110 @@
use std::convert::TryInto;
use std::sync::Arc;
use std::thread;
use log::info;
use sled::Event;
use serde::{Serialize, Deserialize};
use super::Error;
use crate::database::{
DocumentsAddition, DocumentsDeletion, SynonymsAddition, SynonymsDeletion
};
/// Tells whether `event` is an `Event::Set`.
fn event_is_set(event: &Event) -> bool {
    if let Event::Set(..) = event {
        true
    } else {
        false
    }
}
/// A queued update, as serialized into the updates tree.
///
/// Every variant currently carries a `()` placeholder; the commented-out
/// type next to it names the payload it stands for.
#[derive(Serialize, Deserialize)]
enum Update {
DocumentsAddition( () /*DocumentsAddition*/),
DocumentsDeletion( () /*DocumentsDeletion*/),
SynonymsAddition( () /*SynonymsAddition*/),
SynonymsDeletion( () /*SynonymsDeletion*/),
}
/// Queue of pending updates stored in sled trees.
#[derive(Clone)]
pub struct UpdatesIndex {
/// Database handle, used to generate the update identifiers.
db: sled::Db,
/// Tree holding the queued updates, keyed by big-endian update id.
updates: Arc<sled::Tree>,
/// Tree of the update results.
results: Arc<sled::Tree>,
}
impl UpdatesIndex {
    /// Builds the updates index and spawns the background thread that
    /// consumes the `updates` tree.
    ///
    /// The thread pops the entries of `updates` in ascending key order,
    /// decodes each one as an `Update` and logs which kind it is processing.
    /// Once the tree is drained it blocks until a new `Set` event is seen on
    /// the tree, then drains again.
    ///
    /// # Panics
    ///
    /// The background thread (not the caller) panics if popping from the tree
    /// fails, if a key is not exactly 8 bytes long, or if a value cannot be
    /// decoded as an `Update`.
    /// NOTE(review): such a panic silently stops all further update
    /// processing, as the thread handle is dropped.
    pub fn new(
        db: sled::Db,
        updates: Arc<sled::Tree>,
        results: Arc<sled::Tree>,
    ) -> UpdatesIndex
    {
        let updates_clone = updates.clone();

        let _handle = thread::spawn(move || {
            loop {
                // The subscription is created before the drain loop,
                // presumably so that an insertion happening while draining
                // is still observed afterwards — TODO confirm.
                let mut subscription = updates_clone.watch_prefix(vec![]);

                while let Some((key, update)) = updates_clone.pop_min().unwrap() {
                    // keys are the big-endian bytes of the update identifier
                    let array = key.as_ref().try_into().unwrap();
                    let id = u64::from_be_bytes(array);

                    match bincode::deserialize(&update).unwrap() {
                        Update::DocumentsAddition(_) => {
                            info!("processing the document addition (update number {})", id);
                            // ...
                        },
                        Update::DocumentsDeletion(_) => {
                            info!("processing the document deletion (update number {})", id);
                            // ...
                        },
                        Update::SynonymsAddition(_) => {
                            info!("processing the synonyms addition (update number {})", id);
                            // ...
                        },
                        Update::SynonymsDeletion(_) => {
                            info!("processing the synonyms deletion (update number {})", id);
                            // ...
                        },
                    }
                }

                // this subscription is just used to block
                // the loop until a new update is inserted
                subscription.filter(event_is_set).next();
            }
        });

        UpdatesIndex { db, updates, results }
    }

    /// Queues a documents addition and returns the identifier of the update.
    ///
    /// The queued value is the bincode encoding of the matching `Update`
    /// variant, which is what the background thread decodes. The `addition`
    /// payload itself is not stored yet (the variant holds a unit placeholder).
    ///
    /// # Errors
    ///
    /// Returns an error if the serialization or the insertion fails.
    pub fn push_documents_addition(&self, addition: DocumentsAddition) -> Result<u64, Error> {
        let update = bincode::serialize(&Update::DocumentsAddition(()))?;
        self.raw_push_update(update)
    }

    /// Queues a documents deletion and returns the identifier of the update.
    ///
    /// The `deletion` payload itself is not stored yet (unit placeholder).
    ///
    /// # Errors
    ///
    /// Returns an error if the serialization or the insertion fails.
    pub fn push_documents_deletion(&self, deletion: DocumentsDeletion) -> Result<u64, Error> {
        let update = bincode::serialize(&Update::DocumentsDeletion(()))?;
        self.raw_push_update(update)
    }

    /// Queues a synonyms addition and returns the identifier of the update.
    ///
    /// The `addition` payload itself is not stored yet (unit placeholder).
    ///
    /// # Errors
    ///
    /// Returns an error if the serialization or the insertion fails.
    pub fn push_synonyms_addition(&self, addition: SynonymsAddition) -> Result<u64, Error> {
        let update = bincode::serialize(&Update::SynonymsAddition(()))?;
        self.raw_push_update(update)
    }

    /// Queues a synonyms deletion and returns the identifier of the update.
    ///
    /// The `deletion` payload itself is not stored yet (unit placeholder).
    ///
    /// # Errors
    ///
    /// Returns an error if the serialization or the insertion fails.
    pub fn push_synonyms_deletion(&self, deletion: SynonymsDeletion) -> Result<u64, Error> {
        let update = bincode::serialize(&Update::SynonymsDeletion(()))?;
        self.raw_push_update(update)
    }

    /// Stores an already serialized update under a freshly generated
    /// identifier and returns that identifier.
    ///
    /// The key is the big-endian encoding of the identifier, so the byte
    /// order of the keys follows the numeric order of the identifiers.
    fn raw_push_update(&self, raw_update: Vec<u8>) -> Result<u64, Error> {
        let update_id = self.db.generate_id()?;
        let update_id_array = update_id.to_be_bytes();

        self.updates.insert(update_id_array, raw_update)?;

        Ok(update_id)
    }
}

View File

@ -1,14 +1,13 @@
use std::sync::Arc;
use meilidb_core::DocIndex; use meilidb_core::DocIndex;
use sdset::{Set, SetBuf}; use sdset::{Set, SetBuf};
use zerocopy::{LayoutVerified, AsBytes}; use zerocopy::{LayoutVerified, AsBytes};
use crate::database::raw_index::InnerRawIndex;
#[derive(Clone)] #[derive(Clone)]
pub struct WordsIndex(pub(crate) InnerRawIndex); pub struct WordsIndex(pub(crate) Arc<sled::Tree>);
impl WordsIndex { impl WordsIndex {
pub fn doc_indexes(&self, word: &[u8]) -> Result<Option<SetBuf<DocIndex>>, rocksdb::Error> { pub fn doc_indexes(&self, word: &[u8]) -> sled::Result<Option<SetBuf<DocIndex>>> {
// we must force an allocation to make the memory aligned // we must force an allocation to make the memory aligned
match self.0.get(word)? { match self.0.get(word)? {
Some(bytes) => { Some(bytes) => {
@ -36,13 +35,11 @@ impl WordsIndex {
} }
} }
pub fn set_doc_indexes(&self, word: &[u8], set: &Set<DocIndex>) -> Result<(), rocksdb::Error> { pub fn set_doc_indexes(&self, word: &[u8], set: &Set<DocIndex>) -> sled::Result<()> {
self.0.set(word, set.as_bytes())?; self.0.insert(word, set.as_bytes()).map(drop)
Ok(())
} }
pub fn del_doc_indexes(&self, word: &[u8]) -> Result<(), rocksdb::Error> { pub fn del_doc_indexes(&self, word: &[u8]) -> sled::Result<()> {
self.0.delete(word)?; self.0.remove(word).map(drop)
Ok(())
} }
} }

View File

@ -1,88 +1,63 @@
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
use std::collections::{HashSet, HashMap}; use std::collections::{HashSet, HashMap};
use std::path::Path; use std::path::Path;
use std::sync::{Arc, RwLock}; use std::sync::RwLock;
use meilidb_schema::Schema; use meilidb_schema::Schema;
mod custom_settings;
mod docs_words_index;
mod documents_addition;
mod documents_deletion;
mod documents_index;
mod error; mod error;
mod index; mod index;
mod main_index;
mod raw_index; mod documents_addition;
mod documents_deletion;
mod synonyms_addition; mod synonyms_addition;
mod synonyms_deletion; mod synonyms_deletion;
mod synonyms_index;
mod words_index;
pub use self::error::Error; pub use self::error::Error;
pub use self::index::Index; pub use self::index::{Index, CustomSettingsIndex};
pub use self::custom_settings::CustomSettings;
use self::docs_words_index::DocsWordsIndex;
use self::documents_addition::DocumentsAddition; use self::documents_addition::DocumentsAddition;
use self::documents_deletion::DocumentsDeletion; use self::documents_deletion::DocumentsDeletion;
use self::synonyms_addition::SynonymsAddition; use self::synonyms_addition::SynonymsAddition;
use self::synonyms_deletion::SynonymsDeletion; use self::synonyms_deletion::SynonymsDeletion;
use self::documents_index::DocumentsIndex;
use self::index::InnerIndex; fn load_indexes(tree: &sled::Tree) -> Result<HashSet<String>, Error> {
use self::main_index::MainIndex; match tree.get("indexes")? {
use self::raw_index::{RawIndex, InnerRawIndex}; Some(bytes) => Ok(bincode::deserialize(&bytes)?),
use self::words_index::WordsIndex; None => Ok(HashSet::new())
use self::synonyms_index::SynonymsIndex; }
}
pub struct Database { pub struct Database {
cache: RwLock<HashMap<String, Arc<Index>>>, cache: RwLock<HashMap<String, Index>>,
inner: Arc<rocksdb::DB>, inner: sled::Db,
} }
impl Database { impl Database {
pub fn start_default<P: AsRef<Path>>(path: P) -> Result<Database, Error> { pub fn open<P: AsRef<Path>>(path: P) -> Result<Database, Error> {
let path = path.as_ref();
let cache = RwLock::new(HashMap::new()); let cache = RwLock::new(HashMap::new());
let inner = sled::Db::open(path)?;
let options = { let indexes = load_indexes(&inner)?;
let mut options = rocksdb::Options::default();
options.create_if_missing(true);
options
};
let cfs = rocksdb::DB::list_cf(&options, path).unwrap_or(Vec::new());
let inner = Arc::new(rocksdb::DB::open_cf(&options, path, &cfs)?);
let database = Database { cache, inner }; let database = Database { cache, inner };
let mut indexes: Vec<_> = cfs.iter()
.filter_map(|c| c.split('-').nth(0).filter(|&c| c != "default"))
.collect();
indexes.sort_unstable();
indexes.dedup();
for index in indexes { for index in indexes {
database.open_index(index)?; database.open_index(&index)?;
} }
Ok(database) Ok(database)
} }
pub fn indexes(&self) -> Result<Option<HashSet<String>>, Error> { pub fn indexes(&self) -> Result<HashSet<String>, Error> {
let bytes = match self.inner.get("indexes")? { load_indexes(&self.inner)
Some(bytes) => bytes,
None => return Ok(None),
};
let indexes = bincode::deserialize(&bytes)?;
Ok(Some(indexes))
} }
fn set_indexes(&self, value: &HashSet<String>) -> Result<(), Error> { fn set_indexes(&self, value: &HashSet<String>) -> Result<(), Error> {
let bytes = bincode::serialize(value)?; let bytes = bincode::serialize(value)?;
self.inner.put("indexes", bytes)?; self.inner.insert("indexes", bytes)?;
Ok(()) Ok(())
} }
pub fn open_index(&self, name: &str) -> Result<Option<Arc<Index>>, Error> { pub fn open_index(&self, name: &str) -> Result<Option<Index>, Error> {
{ {
let cache = self.cache.read().unwrap(); let cache = self.cache.read().unwrap();
if let Some(index) = cache.get(name).cloned() { if let Some(index) = cache.get(name).cloned() {
@ -96,56 +71,19 @@ impl Database {
occupied.get().clone() occupied.get().clone()
}, },
Entry::Vacant(vacant) => { Entry::Vacant(vacant) => {
if !self.indexes()?.map_or(false, |x| x.contains(name)) { if !self.indexes()?.contains(name) {
return Ok(None) return Ok(None)
} }
let main = { let index = Index::new(&self.inner, name)?;
self.inner.cf_handle(name).expect("cf not found"); vacant.insert(index).clone()
MainIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(name)))
};
let synonyms = {
let cf_name = format!("{}-synonyms", name);
self.inner.cf_handle(&cf_name).expect("cf not found");
SynonymsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
};
let words = {
let cf_name = format!("{}-words", name);
self.inner.cf_handle(&cf_name).expect("cf not found");
WordsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
};
let docs_words = {
let cf_name = format!("{}-docs-words", name);
self.inner.cf_handle(&cf_name).expect("cf not found");
DocsWordsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
};
let documents = {
let cf_name = format!("{}-documents", name);
self.inner.cf_handle(&cf_name).expect("cf not found");
DocumentsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
};
let custom = {
let cf_name = format!("{}-custom", name);
self.inner.cf_handle(&cf_name).expect("cf not found");
CustomSettings(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
};
let raw_index = RawIndex { main, synonyms, words, docs_words, documents, custom };
let index = Index::from_raw(raw_index)?;
vacant.insert(Arc::new(index)).clone()
}, },
}; };
Ok(Some(index)) Ok(Some(index))
} }
pub fn create_index(&self, name: &str, schema: Schema) -> Result<Arc<Index>, Error> { pub fn create_index(&self, name: &str, schema: Schema) -> Result<Index, Error> {
let mut cache = self.cache.write().unwrap(); let mut cache = self.cache.write().unwrap();
let index = match cache.entry(name.to_string()) { let index = match cache.entry(name.to_string()) {
@ -153,57 +91,13 @@ impl Database {
occupied.get().clone() occupied.get().clone()
}, },
Entry::Vacant(vacant) => { Entry::Vacant(vacant) => {
let main = { let index = Index::with_schema(&self.inner, name, schema)?;
self.inner.create_cf(name, &rocksdb::Options::default())?;
MainIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(name)))
};
if let Some(prev_schema) = main.schema()? { let mut indexes = self.indexes()?;
if prev_schema != schema {
return Err(Error::SchemaDiffer)
}
}
main.set_schema(&schema)?;
let synonyms = {
let cf_name = format!("{}-synonyms", name);
self.inner.create_cf(&cf_name, &rocksdb::Options::default())?;
SynonymsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
};
let words = {
let cf_name = format!("{}-words", name);
self.inner.create_cf(&cf_name, &rocksdb::Options::default())?;
WordsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
};
let docs_words = {
let cf_name = format!("{}-docs-words", name);
self.inner.create_cf(&cf_name, &rocksdb::Options::default())?;
DocsWordsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
};
let documents = {
let cf_name = format!("{}-documents", name);
self.inner.create_cf(&cf_name, &rocksdb::Options::default())?;
DocumentsIndex(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
};
let custom = {
let cf_name = format!("{}-custom", name);
self.inner.create_cf(&cf_name, &rocksdb::Options::default())?;
CustomSettings(InnerRawIndex::new(self.inner.clone(), Arc::from(cf_name)))
};
let mut indexes = self.indexes()?.unwrap_or_else(HashSet::new);
indexes.insert(name.to_string()); indexes.insert(name.to_string());
self.set_indexes(&indexes)?; self.set_indexes(&indexes)?;
let raw_index = RawIndex { main, synonyms, words, docs_words, documents, custom }; vacant.insert(index).clone()
let index = Index::from_raw(raw_index)?;
vacant.insert(Arc::new(index)).clone()
}, },
}; };

View File

@ -1,88 +0,0 @@
use std::sync::Arc;
use super::{MainIndex, SynonymsIndex, WordsIndex, DocsWordsIndex, DocumentsIndex, CustomSettings};
/// Groups the typed storage wrappers that together make up one index:
/// main data, synonyms, words, docs-words, documents and custom settings.
#[derive(Clone)]
pub struct RawIndex {
pub main: MainIndex,
pub synonyms: SynonymsIndex,
pub words: WordsIndex,
pub docs_words: DocsWordsIndex,
pub documents: DocumentsIndex,
pub custom: CustomSettings,
}
impl RawIndex {
    /// Requests a compaction of the full key range (no lower nor upper bound)
    /// on each of the six underlying raw indexes, in declaration order.
    pub(crate) fn compact(&self) {
        let raw_indexes = [
            &self.main.0,
            &self.synonyms.0,
            &self.words.0,
            &self.docs_words.0,
            &self.documents.0,
            &self.custom.0,
        ];

        for raw in raw_indexes.iter() {
            raw.compact_range(None::<&[u8]>, None::<&[u8]>);
        }
    }
}
/// Handle on a single RocksDB column family: the shared database plus the
/// name of the column family, which is looked up again on every operation.
#[derive(Clone)]
pub struct InnerRawIndex {
database: Arc<rocksdb::DB>,
name: Arc<str>,
}
impl InnerRawIndex {
pub fn new(database: Arc<rocksdb::DB>, name: Arc<str>) -> InnerRawIndex {
InnerRawIndex { database, name }
}
pub fn get<K>(&self, key: K) -> Result<Option<rocksdb::DBVector>, rocksdb::Error>
where K: AsRef<[u8]>,
{
let cf = self.database.cf_handle(&self.name).expect("cf not found");
self.database.get_cf(cf, key)
}
pub fn get_pinned<K>(&self, key: K) -> Result<Option<rocksdb::DBPinnableSlice>, rocksdb::Error>
where K: AsRef<[u8]>,
{
let cf = self.database.cf_handle(&self.name).expect("cf not found");
self.database.get_pinned_cf(cf, key)
}
pub fn iterator(&self, from: rocksdb::IteratorMode) -> Result<rocksdb::DBIterator, rocksdb::Error> {
let cf = self.database.cf_handle(&self.name).expect("cf not found");
self.database.iterator_cf(cf, from)
}
pub fn set<K, V>(&self, key: K, value: V) -> Result<(), rocksdb::Error>
where K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
let cf = self.database.cf_handle(&self.name).expect("cf not found");
self.database.put_cf(cf, key, value)
}
pub fn delete<K>(&self, key: K) -> Result<(), rocksdb::Error>
where K: AsRef<[u8]>
{
let cf = self.database.cf_handle(&self.name).expect("cf not found");
self.database.delete_cf(cf, key)
}
pub fn delete_range<K>(&self, start: K, end: K) -> Result<(), rocksdb::Error>
where K: AsRef<[u8]>,
{
let mut batch = rocksdb::WriteBatch::default();
let cf = self.database.cf_handle(&self.name).expect("cf not found");
batch.delete_range_cf(cf, start, end)?;
self.database.write(batch)
}
pub fn compact_range<S, E>(&self, start: Option<S>, end: Option<E>)
where S: AsRef<[u8]>,
E: AsRef<[u8]>,
{
let cf = self.database.cf_handle(&self.name).expect("cf not found");
self.database.compact_range_cf(cf, start, end)
}
}

View File

@ -5,8 +5,8 @@ use fst::{SetBuilder, set::OpBuilder};
use meilidb_core::normalize_str; use meilidb_core::normalize_str;
use sdset::SetBuf; use sdset::SetBuf;
use crate::database::index::InnerIndex;
use super::{Error, Index}; use super::{Error, Index};
use super::index::Cache;
pub struct SynonymsAddition<'a> { pub struct SynonymsAddition<'a> {
inner: &'a Index, inner: &'a Index,
@ -29,9 +29,9 @@ impl<'a> SynonymsAddition<'a> {
} }
pub fn finalize(self) -> Result<(), Error> { pub fn finalize(self) -> Result<(), Error> {
let lease_inner = self.inner.lease_inner(); let ref_index = self.inner.as_ref();
let synonyms = &lease_inner.raw.synonyms; let synonyms = ref_index.synonyms_index;
let main = &lease_inner.raw.main; let main = ref_index.main_index;
let mut synonyms_builder = SetBuilder::memory(); let mut synonyms_builder = SetBuilder::memory();
@ -72,15 +72,14 @@ impl<'a> SynonymsAddition<'a> {
main.set_synonyms_set(&synonyms)?; main.set_synonyms_set(&synonyms)?;
// update the "consistent" view of the Index // update the "consistent" view of the Index
let cache = ref_index.cache;
let words = Arc::new(main.words_set()?.unwrap_or_default()); let words = Arc::new(main.words_set()?.unwrap_or_default());
let ranked_map = lease_inner.ranked_map.clone(); let ranked_map = cache.ranked_map.clone();
let synonyms = Arc::new(synonyms); let synonyms = Arc::new(synonyms);
let schema = lease_inner.schema.clone(); let schema = cache.schema.clone();
let raw = lease_inner.raw.clone();
lease_inner.raw.compact();
let inner = InnerIndex { words, synonyms, schema, ranked_map, raw }; let cache = Cache { words, synonyms, schema, ranked_map };
self.inner.0.store(Arc::new(inner)); self.inner.cache.store(Arc::new(cache));
Ok(()) Ok(())
} }

View File

@ -6,8 +6,8 @@ use fst::{SetBuilder, set::OpBuilder};
use meilidb_core::normalize_str; use meilidb_core::normalize_str;
use sdset::SetBuf; use sdset::SetBuf;
use crate::database::index::InnerIndex;
use super::{Error, Index}; use super::{Error, Index};
use super::index::Cache;
pub struct SynonymsDeletion<'a> { pub struct SynonymsDeletion<'a> {
inner: &'a Index, inner: &'a Index,
@ -39,9 +39,9 @@ impl<'a> SynonymsDeletion<'a> {
} }
pub fn finalize(self) -> Result<(), Error> { pub fn finalize(self) -> Result<(), Error> {
let lease_inner = self.inner.lease_inner(); let ref_index = self.inner.as_ref();
let synonyms = &lease_inner.raw.synonyms; let synonyms = ref_index.synonyms_index;
let main = &lease_inner.raw.main; let main = ref_index.main_index;
let mut delete_whole_synonym_builder = SetBuilder::memory(); let mut delete_whole_synonym_builder = SetBuilder::memory();
@ -115,15 +115,14 @@ impl<'a> SynonymsDeletion<'a> {
main.set_synonyms_set(&synonyms)?; main.set_synonyms_set(&synonyms)?;
// update the "consistent" view of the Index // update the "consistent" view of the Index
let cache = ref_index.cache;
let words = Arc::new(main.words_set()?.unwrap_or_default()); let words = Arc::new(main.words_set()?.unwrap_or_default());
let ranked_map = lease_inner.ranked_map.clone(); let ranked_map = cache.ranked_map.clone();
let synonyms = Arc::new(synonyms); let synonyms = Arc::new(synonyms);
let schema = lease_inner.schema.clone(); let schema = cache.schema.clone();
let raw = lease_inner.raw.clone();
lease_inner.raw.compact();
let inner = InnerIndex { words, synonyms, schema, ranked_map, raw }; let cache = Cache { words, synonyms, schema, ranked_map };
self.inner.0.store(Arc::new(inner)); self.inner.cache.store(Arc::new(cache));
Ok(()) Ok(())
} }

View File

@ -1,23 +0,0 @@
use crate::database::raw_index::InnerRawIndex;
/// Typed wrapper around the raw index that maps a word to its alternatives
/// (the synonyms), stored as the bytes of an `fst::Set`.
#[derive(Clone)]
pub struct SynonymsIndex(pub(crate) InnerRawIndex);
impl SynonymsIndex {
    /// Returns the set of alternatives stored for `word`, or `None` when
    /// nothing is stored under that word.
    ///
    /// # Panics
    ///
    /// Panics if the stored bytes are not a valid `fst::Set`.
    pub fn alternatives_to(&self, word: &[u8]) -> Result<Option<fst::Set>, rocksdb::Error> {
        let stored = self.0.get(word)?;
        let alternatives = stored.map(|bytes| fst::Set::from_bytes(bytes.to_vec()).unwrap());
        Ok(alternatives)
    }

    /// Stores `value` (the raw bytes of the alternatives set) under `word`.
    pub fn set_alternatives_to(&self, word: &[u8], value: Vec<u8>) -> Result<(), rocksdb::Error> {
        self.0.set(word, value)
    }

    /// Removes the alternatives stored under `word`.
    pub fn del_alternatives_of(&self, word: &[u8]) -> Result<(), rocksdb::Error> {
        self.0.delete(word)
    }
}

View File

@ -5,8 +5,8 @@ mod number;
mod ranked_map; mod ranked_map;
mod serde; mod serde;
pub use rocksdb; pub use sled;
pub use self::database::{Database, Index, CustomSettings}; pub use self::database::{Database, Index, CustomSettingsIndex};
pub use self::number::Number; pub use self::number::Number;
pub use self::ranked_map::RankedMap; pub use self::ranked_map::RankedMap;
pub use self::serde::{compute_document_id, extract_document_id, value_to_string}; pub use self::serde::{compute_document_id, extract_document_id, value_to_string};

View File

@ -1,5 +1,6 @@
use std::collections::HashSet; use std::collections::HashSet;
use std::io::Cursor; use std::io::Cursor;
use std::{fmt, error::Error};
use meilidb_core::DocumentId; use meilidb_core::DocumentId;
use meilidb_schema::SchemaAttr; use meilidb_schema::SchemaAttr;
@ -9,6 +10,43 @@ use serde::{de, forward_to_deserialize_any};
use crate::database::Index; use crate::database::Index;
/// Errors that the document `Deserializer` can report.
///
/// Wraps either an rmp-serde error, a sled error, or a free-form message
/// produced through `serde::de::Error::custom`.
#[derive(Debug)]
pub enum DeserializerError {
RmpError(RmpError),
SledError(sled::Error),
Custom(String),
}
impl de::Error for DeserializerError {
    /// Builds a `Custom` error holding the rendered text of `msg`.
    fn custom<T: fmt::Display>(msg: T) -> Self {
        let message = msg.to_string();
        Self::Custom(message)
    }
}
impl fmt::Display for DeserializerError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
DeserializerError::RmpError(e) => write!(f, "rmp serde related error: {}", e),
DeserializerError::SledError(e) => write!(f, "Sled related error: {}", e),
DeserializerError::Custom(s) => f.write_str(s),
}
}
}
// Marker implementation relying on the default methods of the `Error` trait;
// the wrapped errors are therefore not exposed through `source()`.
impl Error for DeserializerError {}
impl From<RmpError> for DeserializerError {
fn from(error: RmpError) -> DeserializerError {
DeserializerError::RmpError(error)
}
}
impl From<sled::Error> for DeserializerError {
fn from(error: sled::Error) -> DeserializerError {
DeserializerError::SledError(error)
}
}
pub struct Deserializer<'a> { pub struct Deserializer<'a> {
pub document_id: DocumentId, pub document_id: DocumentId,
pub index: &'a Index, pub index: &'a Index,
@ -17,7 +55,7 @@ pub struct Deserializer<'a> {
impl<'de, 'a, 'b> de::Deserializer<'de> for &'b mut Deserializer<'a> impl<'de, 'a, 'b> de::Deserializer<'de> for &'b mut Deserializer<'a>
{ {
type Error = RmpError; type Error = DeserializerError;
fn deserialize_any<V>(self, visitor: V) -> Result<V::Value, Self::Error> fn deserialize_any<V>(self, visitor: V) -> Result<V::Value, Self::Error>
where V: de::Visitor<'de> where V: de::Visitor<'de>
@ -34,22 +72,16 @@ impl<'de, 'a, 'b> de::Deserializer<'de> for &'b mut Deserializer<'a>
fn deserialize_map<V>(self, visitor: V) -> Result<V::Value, Self::Error> fn deserialize_map<V>(self, visitor: V) -> Result<V::Value, Self::Error>
where V: de::Visitor<'de> where V: de::Visitor<'de>
{ {
let schema = &self.index.lease_inner().schema; let schema = self.index.schema();
let documents = &self.index.lease_inner().raw.documents; let documents = self.index.as_ref().documents_index;
let document_attributes = documents.document_fields(self.document_id); let mut error = None;
let document_attributes = document_attributes.filter_map(|result| {
let iter = documents
.document_fields(self.document_id)
.filter_map(|result| {
match result { match result {
Ok(value) => Some(value), Ok((attr, value)) => {
Err(e) => {
// TODO: must log the error
// error!("sled iter error; {}", e);
None
},
}
});
let iter = document_attributes.filter_map(|(attr, value)| {
let is_displayed = schema.props(attr).is_displayed(); let is_displayed = schema.props(attr).is_displayed();
if is_displayed && self.fields.map_or(true, |f| f.contains(&attr)) { if is_displayed && self.fields.map_or(true, |f| f.contains(&attr)) {
let attribute_name = schema.attribute_name(attr); let attribute_name = schema.attribute_name(attr);
@ -57,10 +89,24 @@ impl<'de, 'a, 'b> de::Deserializer<'de> for &'b mut Deserializer<'a>
} else { } else {
None None
} }
},
Err(e) => {
if error.is_none() {
error = Some(e);
}
None
}
}
}); });
let map_deserializer = de::value::MapDeserializer::new(iter); let map_deserializer = de::value::MapDeserializer::new(iter);
visitor.visit_map(map_deserializer) let result = visitor.visit_map(map_deserializer).map_err(DeserializerError::from);
if let Some(e) = error {
return Err(DeserializerError::from(e))
}
result
} }
} }

View File

@ -15,7 +15,7 @@ mod extract_document_id;
mod indexer; mod indexer;
mod serializer; mod serializer;
pub use self::deserializer::Deserializer; pub use self::deserializer::{Deserializer, DeserializerError};
pub use self::extract_document_id::{extract_document_id, compute_document_id, value_to_string}; pub use self::extract_document_id::{extract_document_id, compute_document_id, value_to_string};
pub use self::convert_to_string::ConvertToString; pub use self::convert_to_string::ConvertToString;
pub use self::convert_to_number::ConvertToNumber; pub use self::convert_to_number::ConvertToNumber;
@ -38,8 +38,8 @@ pub enum SerializerError {
DocumentIdNotFound, DocumentIdNotFound,
InvalidDocumentIdType, InvalidDocumentIdType,
RmpError(RmpError), RmpError(RmpError),
SledError(sled::Error),
SerdeJsonError(SerdeJsonError), SerdeJsonError(SerdeJsonError),
RocksdbError(rocksdb::Error),
ParseNumberError(ParseNumberError), ParseNumberError(ParseNumberError),
UnserializableType { type_name: &'static str }, UnserializableType { type_name: &'static str },
UnindexableType { type_name: &'static str }, UnindexableType { type_name: &'static str },
@ -63,8 +63,8 @@ impl fmt::Display for SerializerError {
write!(f, "document identifier can only be of type string or number") write!(f, "document identifier can only be of type string or number")
}, },
SerializerError::RmpError(e) => write!(f, "rmp serde related error: {}", e), SerializerError::RmpError(e) => write!(f, "rmp serde related error: {}", e),
SerializerError::SledError(e) => write!(f, "Sled related error: {}", e),
SerializerError::SerdeJsonError(e) => write!(f, "serde json error: {}", e), SerializerError::SerdeJsonError(e) => write!(f, "serde json error: {}", e),
SerializerError::RocksdbError(e) => write!(f, "RocksDB related error: {}", e),
SerializerError::ParseNumberError(e) => { SerializerError::ParseNumberError(e) => {
write!(f, "error while trying to parse a number: {}", e) write!(f, "error while trying to parse a number: {}", e)
}, },
@ -102,9 +102,9 @@ impl From<SerdeJsonError> for SerializerError {
} }
} }
impl From<rocksdb::Error> for SerializerError { impl From<sled::Error> for SerializerError {
fn from(error: rocksdb::Error) -> SerializerError { fn from(error: sled::Error) -> SerializerError {
SerializerError::RocksdbError(error) SerializerError::SledError(error)
} }
} }

View File

@ -12,7 +12,7 @@ fn simple_schema() -> Schema {
#[test] #[test]
fn insert_delete_document() { fn insert_delete_document() {
let tmp_dir = tempfile::tempdir().unwrap(); let tmp_dir = tempfile::tempdir().unwrap();
let database = Database::start_default(&tmp_dir).unwrap(); let database = Database::open(&tmp_dir).unwrap();
let schema = simple_schema(); let schema = simple_schema();
let index = database.create_index("hello", schema).unwrap(); let index = database.create_index("hello", schema).unwrap();
@ -38,7 +38,7 @@ fn insert_delete_document() {
#[test] #[test]
fn replace_document() { fn replace_document() {
let tmp_dir = tempfile::tempdir().unwrap(); let tmp_dir = tempfile::tempdir().unwrap();
let database = Database::start_default(&tmp_dir).unwrap(); let database = Database::open(&tmp_dir).unwrap();
let schema = simple_schema(); let schema = simple_schema();
let index = database.create_index("hello", schema).unwrap(); let index = database.create_index("hello", schema).unwrap();

View File

@ -85,7 +85,7 @@ fn index(
synonyms: Vec<Synonym>, synonyms: Vec<Synonym>,
) -> Result<Database, Box<dyn Error>> ) -> Result<Database, Box<dyn Error>>
{ {
let database = Database::start_default(database_path)?; let database = Database::open(database_path)?;
let mut wtr = csv::Writer::from_path("./stats.csv").unwrap(); let mut wtr = csv::Writer::from_path("./stats.csv").unwrap();
wtr.write_record(&["NumberOfDocuments", "DiskUsed", "MemoryUsed"])?; wtr.write_record(&["NumberOfDocuments", "DiskUsed", "MemoryUsed"])?;

View File

@ -143,7 +143,7 @@ fn main() -> Result<(), Box<dyn Error>> {
let opt = Opt::from_args(); let opt = Opt::from_args();
let start = Instant::now(); let start = Instant::now();
let database = Database::start_default(&opt.database_path)?; let database = Database::open(&opt.database_path)?;
let index = database.open_index("test")?.unwrap(); let index = database.open_index("test")?.unwrap();
let schema = index.schema(); let schema = index.schema();