mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-11 22:14:32 +01:00
chore: Split the database structure internal types
This commit is contained in:
parent
169bd4cb39
commit
2e31bb519a
@ -1,719 +0,0 @@
|
|||||||
use std::collections::hash_map::Entry;
|
|
||||||
use std::collections::{BTreeSet, HashSet, HashMap};
|
|
||||||
use std::convert::TryInto;
|
|
||||||
use std::path::Path;
|
|
||||||
use std::sync::{Arc, RwLock};
|
|
||||||
use std::{error, fmt};
|
|
||||||
|
|
||||||
use arc_swap::{ArcSwap, Lease};
|
|
||||||
use meilidb_core::{criterion::Criteria, QueryBuilder, Store, DocumentId, DocIndex};
|
|
||||||
use rmp_serde::decode::{Error as RmpError};
|
|
||||||
use sdset::{Set, SetBuf, SetOperation, duo::{Union, DifferenceByKey}};
|
|
||||||
use serde::de;
|
|
||||||
use sled::IVec;
|
|
||||||
use zerocopy::{AsBytes, LayoutVerified};
|
|
||||||
use fst::{SetBuilder, set::OpBuilder, Streamer};
|
|
||||||
|
|
||||||
use crate::document_attr_key::DocumentAttrKey;
|
|
||||||
use crate::indexer::Indexer;
|
|
||||||
use crate::serde::extract_document_id;
|
|
||||||
use crate::serde::{Serializer, RamDocumentStore, Deserializer, SerializerError};
|
|
||||||
use crate::{Schema, SchemaAttr, RankedMap};
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum Error {
|
|
||||||
SchemaDiffer,
|
|
||||||
SchemaMissing,
|
|
||||||
WordIndexMissing,
|
|
||||||
MissingDocumentId,
|
|
||||||
SledError(sled::Error),
|
|
||||||
FstError(fst::Error),
|
|
||||||
BincodeError(bincode::Error),
|
|
||||||
SerializerError(SerializerError),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<sled::Error> for Error {
|
|
||||||
fn from(error: sled::Error) -> Error {
|
|
||||||
Error::SledError(error)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<fst::Error> for Error {
|
|
||||||
fn from(error: fst::Error) -> Error {
|
|
||||||
Error::FstError(error)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<bincode::Error> for Error {
|
|
||||||
fn from(error: bincode::Error) -> Error {
|
|
||||||
Error::BincodeError(error)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<SerializerError> for Error {
|
|
||||||
fn from(error: SerializerError) -> Error {
|
|
||||||
Error::SerializerError(error)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Display for Error {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
use self::Error::*;
|
|
||||||
match self {
|
|
||||||
SchemaDiffer => write!(f, "schemas differ"),
|
|
||||||
SchemaMissing => write!(f, "this index does not have a schema"),
|
|
||||||
WordIndexMissing => write!(f, "this index does not have a word index"),
|
|
||||||
MissingDocumentId => write!(f, "document id is missing"),
|
|
||||||
SledError(e) => write!(f, "sled error; {}", e),
|
|
||||||
FstError(e) => write!(f, "fst error; {}", e),
|
|
||||||
BincodeError(e) => write!(f, "bincode error; {}", e),
|
|
||||||
SerializerError(e) => write!(f, "serializer error; {}", e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl error::Error for Error { }
|
|
||||||
|
|
||||||
pub struct Database {
|
|
||||||
cache: RwLock<HashMap<String, Arc<Index>>>,
|
|
||||||
inner: sled::Db,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Database {
|
|
||||||
pub fn start_default<P: AsRef<Path>>(path: P) -> Result<Database, Error> {
|
|
||||||
let cache = RwLock::new(HashMap::new());
|
|
||||||
let inner = sled::Db::start_default(path)?;
|
|
||||||
Ok(Database { cache, inner })
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn indexes(&self) -> Result<Option<HashSet<String>>, Error> {
|
|
||||||
let bytes = match self.inner.get("indexes")? {
|
|
||||||
Some(bytes) => bytes,
|
|
||||||
None => return Ok(None),
|
|
||||||
};
|
|
||||||
|
|
||||||
let indexes = bincode::deserialize(&bytes)?;
|
|
||||||
Ok(Some(indexes))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_indexes(&self, value: &HashSet<String>) -> Result<(), Error> {
|
|
||||||
let bytes = bincode::serialize(value)?;
|
|
||||||
self.inner.set("indexes", bytes)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn open_index(&self, name: &str) -> Result<Option<Arc<Index>>, Error> {
|
|
||||||
{
|
|
||||||
let cache = self.cache.read().unwrap();
|
|
||||||
if let Some(index) = cache.get(name).cloned() {
|
|
||||||
return Ok(Some(index))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut cache = self.cache.write().unwrap();
|
|
||||||
let index = match cache.entry(name.to_string()) {
|
|
||||||
Entry::Occupied(occupied) => {
|
|
||||||
occupied.get().clone()
|
|
||||||
},
|
|
||||||
Entry::Vacant(vacant) => {
|
|
||||||
if !self.indexes()?.map_or(false, |x| x.contains(name)) {
|
|
||||||
return Ok(None)
|
|
||||||
}
|
|
||||||
|
|
||||||
let main = {
|
|
||||||
let tree = self.inner.open_tree(name)?;
|
|
||||||
MainIndex(tree)
|
|
||||||
};
|
|
||||||
|
|
||||||
let words = {
|
|
||||||
let tree_name = format!("{}-words", name);
|
|
||||||
let tree = self.inner.open_tree(tree_name)?;
|
|
||||||
WordsIndex(tree)
|
|
||||||
};
|
|
||||||
|
|
||||||
let docs_words = {
|
|
||||||
let tree_name = format!("{}-docs-words", name);
|
|
||||||
let tree = self.inner.open_tree(tree_name)?;
|
|
||||||
DocsWords(tree)
|
|
||||||
};
|
|
||||||
|
|
||||||
let documents = {
|
|
||||||
let tree_name = format!("{}-documents", name);
|
|
||||||
let tree = self.inner.open_tree(tree_name)?;
|
|
||||||
DocumentsIndex(tree)
|
|
||||||
};
|
|
||||||
|
|
||||||
let raw_index = RawIndex { main, words, docs_words, documents };
|
|
||||||
let index = Index::from_raw(raw_index)?;
|
|
||||||
|
|
||||||
vacant.insert(Arc::new(index)).clone()
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(Some(index))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn create_index(&self, name: &str, schema: Schema) -> Result<Arc<Index>, Error> {
|
|
||||||
let mut cache = self.cache.write().unwrap();
|
|
||||||
|
|
||||||
let index = match cache.entry(name.to_string()) {
|
|
||||||
Entry::Occupied(occupied) => {
|
|
||||||
occupied.get().clone()
|
|
||||||
},
|
|
||||||
Entry::Vacant(vacant) => {
|
|
||||||
let main = {
|
|
||||||
let tree = self.inner.open_tree(name)?;
|
|
||||||
MainIndex(tree)
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(prev_schema) = main.schema()? {
|
|
||||||
if prev_schema != schema {
|
|
||||||
return Err(Error::SchemaDiffer)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
main.set_schema(&schema)?;
|
|
||||||
|
|
||||||
let words = {
|
|
||||||
let tree_name = format!("{}-words", name);
|
|
||||||
let tree = self.inner.open_tree(tree_name)?;
|
|
||||||
WordsIndex(tree)
|
|
||||||
};
|
|
||||||
|
|
||||||
let docs_words = {
|
|
||||||
let tree_name = format!("{}-docs-words", name);
|
|
||||||
let tree = self.inner.open_tree(tree_name)?;
|
|
||||||
DocsWords(tree)
|
|
||||||
};
|
|
||||||
|
|
||||||
let documents = {
|
|
||||||
let tree_name = format!("{}-documents", name);
|
|
||||||
let tree = self.inner.open_tree(tree_name)?;
|
|
||||||
DocumentsIndex(tree)
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut indexes = self.indexes()?.unwrap_or_else(HashSet::new);
|
|
||||||
indexes.insert(name.to_string());
|
|
||||||
self.set_indexes(&indexes)?;
|
|
||||||
|
|
||||||
let raw_index = RawIndex { main, words, docs_words, documents };
|
|
||||||
let index = Index::from_raw(raw_index)?;
|
|
||||||
|
|
||||||
vacant.insert(Arc::new(index)).clone()
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(index)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct RawIndex {
|
|
||||||
pub main: MainIndex,
|
|
||||||
pub words: WordsIndex,
|
|
||||||
pub docs_words: DocsWords,
|
|
||||||
pub documents: DocumentsIndex,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct MainIndex(Arc<sled::Tree>);
|
|
||||||
|
|
||||||
impl MainIndex {
|
|
||||||
pub fn schema(&self) -> Result<Option<Schema>, Error> {
|
|
||||||
match self.0.get("schema")? {
|
|
||||||
Some(bytes) => {
|
|
||||||
let schema = Schema::read_from_bin(bytes.as_ref())?;
|
|
||||||
Ok(Some(schema))
|
|
||||||
},
|
|
||||||
None => Ok(None),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_schema(&self, schema: &Schema) -> Result<(), Error> {
|
|
||||||
let mut bytes = Vec::new();
|
|
||||||
schema.write_to_bin(&mut bytes)?;
|
|
||||||
self.0.set("schema", bytes)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn words_set(&self) -> Result<Option<fst::Set>, Error> {
|
|
||||||
match self.0.get("words")? {
|
|
||||||
Some(bytes) => {
|
|
||||||
let len = bytes.len();
|
|
||||||
let value = bytes.into();
|
|
||||||
let fst = fst::raw::Fst::from_shared_bytes(value, 0, len)?;
|
|
||||||
Ok(Some(fst::Set::from(fst)))
|
|
||||||
},
|
|
||||||
None => Ok(None),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_words_set(&self, value: &fst::Set) -> Result<(), Error> {
|
|
||||||
self.0.set("words", value.as_fst().as_bytes())?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn ranked_map(&self) -> Result<Option<RankedMap>, Error> {
|
|
||||||
match self.0.get("ranked-map")? {
|
|
||||||
Some(bytes) => {
|
|
||||||
let ranked_map = RankedMap::read_from_bin(bytes.as_ref())?;
|
|
||||||
Ok(Some(ranked_map))
|
|
||||||
},
|
|
||||||
None => Ok(None),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_ranked_map(&self, value: &RankedMap) -> Result<(), Error> {
|
|
||||||
let mut bytes = Vec::new();
|
|
||||||
value.write_to_bin(&mut bytes)?;
|
|
||||||
self.0.set("ranked_map", bytes)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct WordsIndex(Arc<sled::Tree>);
|
|
||||||
|
|
||||||
impl WordsIndex {
|
|
||||||
pub fn doc_indexes(&self, word: &[u8]) -> sled::Result<Option<SetBuf<DocIndex>>> {
|
|
||||||
match self.0.get(word)? {
|
|
||||||
Some(bytes) => {
|
|
||||||
let layout = LayoutVerified::new_slice(bytes.as_ref()).expect("invalid layout");
|
|
||||||
let slice = layout.into_slice();
|
|
||||||
let setbuf = SetBuf::new_unchecked(slice.to_vec());
|
|
||||||
Ok(Some(setbuf))
|
|
||||||
},
|
|
||||||
None => Ok(None),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_doc_indexes(&self, word: &[u8], set: &Set<DocIndex>) -> sled::Result<()> {
|
|
||||||
self.0.set(word, set.as_bytes())?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn del_doc_indexes(&self, word: &[u8]) -> sled::Result<()> {
|
|
||||||
self.0.del(word)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct DocsWords(Arc<sled::Tree>);
|
|
||||||
|
|
||||||
impl DocsWords {
|
|
||||||
pub fn doc_words(&self, id: DocumentId) -> Result<Option<fst::Set>, Error> {
|
|
||||||
let key = id.0.to_be_bytes();
|
|
||||||
match self.0.get(key)? {
|
|
||||||
Some(bytes) => {
|
|
||||||
let len = bytes.len();
|
|
||||||
let value = bytes.into();
|
|
||||||
let fst = fst::raw::Fst::from_shared_bytes(value, 0, len)?;
|
|
||||||
Ok(Some(fst::Set::from(fst)))
|
|
||||||
},
|
|
||||||
None => Ok(None)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_doc_words(&self, id: DocumentId, words: &fst::Set) -> Result<(), Error> {
|
|
||||||
let key = id.0.to_be_bytes();
|
|
||||||
self.0.set(key, words.as_fst().as_bytes())?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn del_doc_words(&self, id: DocumentId) -> Result<(), Error> {
|
|
||||||
let key = id.0.to_be_bytes();
|
|
||||||
self.0.del(key)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct DocumentsIndex(Arc<sled::Tree>);
|
|
||||||
|
|
||||||
impl DocumentsIndex {
|
|
||||||
pub fn document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result<Option<IVec>> {
|
|
||||||
let key = DocumentAttrKey::new(id, attr).to_be_bytes();
|
|
||||||
self.0.get(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_document_field(&self, id: DocumentId, attr: SchemaAttr, value: Vec<u8>) -> sled::Result<()> {
|
|
||||||
let key = DocumentAttrKey::new(id, attr).to_be_bytes();
|
|
||||||
self.0.set(key, value)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn del_document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result<()> {
|
|
||||||
let key = DocumentAttrKey::new(id, attr).to_be_bytes();
|
|
||||||
self.0.del(key)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn del_all_document_fields(&self, id: DocumentId) -> sled::Result<()> {
|
|
||||||
let start = DocumentAttrKey::new(id, SchemaAttr::min()).to_be_bytes();
|
|
||||||
let end = DocumentAttrKey::new(id, SchemaAttr::max()).to_be_bytes();
|
|
||||||
let document_attrs = self.0.range(start..=end).keys();
|
|
||||||
|
|
||||||
for key in document_attrs {
|
|
||||||
self.0.del(key?)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn document_fields(&self, id: DocumentId) -> DocumentFieldsIter {
|
|
||||||
let start = DocumentAttrKey::new(id, SchemaAttr::min());
|
|
||||||
let start = start.to_be_bytes();
|
|
||||||
|
|
||||||
let end = DocumentAttrKey::new(id, SchemaAttr::max());
|
|
||||||
let end = end.to_be_bytes();
|
|
||||||
|
|
||||||
DocumentFieldsIter(self.0.range(start..=end))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct DocumentFieldsIter<'a>(sled::Iter<'a>);
|
|
||||||
|
|
||||||
impl<'a> Iterator for DocumentFieldsIter<'a> {
|
|
||||||
type Item = sled::Result<(SchemaAttr, IVec)>;
|
|
||||||
|
|
||||||
fn next(&mut self) -> Option<Self::Item> {
|
|
||||||
match self.0.next() {
|
|
||||||
Some(Ok((key, value))) => {
|
|
||||||
let slice: &[u8] = key.as_ref();
|
|
||||||
let array = slice.try_into().unwrap();
|
|
||||||
let key = DocumentAttrKey::from_be_bytes(array);
|
|
||||||
Some(Ok((key.attribute, value)))
|
|
||||||
},
|
|
||||||
Some(Err(e)) => Some(Err(e)),
|
|
||||||
None => None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct Index(ArcSwap<InnerIndex>);
|
|
||||||
|
|
||||||
pub struct InnerIndex {
|
|
||||||
pub words: fst::Set,
|
|
||||||
pub schema: Schema,
|
|
||||||
pub ranked_map: RankedMap,
|
|
||||||
pub raw: RawIndex, // TODO this will be a snapshot in the future
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Index {
|
|
||||||
fn from_raw(raw: RawIndex) -> Result<Index, Error> {
|
|
||||||
let words = match raw.main.words_set()? {
|
|
||||||
Some(words) => words,
|
|
||||||
None => fst::Set::default(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let schema = match raw.main.schema()? {
|
|
||||||
Some(schema) => schema,
|
|
||||||
None => return Err(Error::SchemaMissing),
|
|
||||||
};
|
|
||||||
|
|
||||||
let ranked_map = match raw.main.ranked_map()? {
|
|
||||||
Some(map) => map,
|
|
||||||
None => RankedMap::default(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let inner = InnerIndex { words, schema, ranked_map, raw };
|
|
||||||
let index = Index(ArcSwap::new(Arc::new(inner)));
|
|
||||||
|
|
||||||
Ok(index)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn query_builder(&self) -> QueryBuilder<IndexLease> {
|
|
||||||
let lease = IndexLease(self.0.lease());
|
|
||||||
QueryBuilder::new(lease)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn query_builder_with_criteria<'c>(
|
|
||||||
&self,
|
|
||||||
criteria: Criteria<'c>,
|
|
||||||
) -> QueryBuilder<'c, IndexLease>
|
|
||||||
{
|
|
||||||
let lease = IndexLease(self.0.lease());
|
|
||||||
QueryBuilder::with_criteria(lease, criteria)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn lease_inner(&self) -> Lease<Arc<InnerIndex>> {
|
|
||||||
self.0.lease()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn schema(&self) -> Schema {
|
|
||||||
self.0.lease().schema.clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn documents_addition(&self) -> DocumentsAddition {
|
|
||||||
let ranked_map = self.0.lease().ranked_map.clone();
|
|
||||||
DocumentsAddition::new(self, ranked_map)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn documents_deletion(&self) -> DocumentsDeletion {
|
|
||||||
DocumentsDeletion::new(self)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn document<T>(
|
|
||||||
&self,
|
|
||||||
fields: Option<&HashSet<&str>>,
|
|
||||||
id: DocumentId,
|
|
||||||
) -> Result<Option<T>, RmpError>
|
|
||||||
where T: de::DeserializeOwned,
|
|
||||||
{
|
|
||||||
let schema = &self.lease_inner().schema;
|
|
||||||
let fields = fields
|
|
||||||
.map(|fields| {
|
|
||||||
fields
|
|
||||||
.into_iter()
|
|
||||||
.filter_map(|name| schema.attribute(name))
|
|
||||||
.collect()
|
|
||||||
});
|
|
||||||
|
|
||||||
let mut deserializer = Deserializer {
|
|
||||||
document_id: id,
|
|
||||||
index: &self,
|
|
||||||
fields: fields.as_ref(),
|
|
||||||
};
|
|
||||||
|
|
||||||
// TODO: currently we return an error if all document fields are missing,
|
|
||||||
// returning None would have been better
|
|
||||||
T::deserialize(&mut deserializer).map(Some)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct IndexLease(Lease<Arc<InnerIndex>>);
|
|
||||||
|
|
||||||
impl Store for IndexLease {
|
|
||||||
type Error = Error;
|
|
||||||
|
|
||||||
fn words(&self) -> Result<&fst::Set, Self::Error> {
|
|
||||||
Ok(&self.0.words)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn word_indexes(&self, word: &[u8]) -> Result<Option<SetBuf<DocIndex>>, Self::Error> {
|
|
||||||
Ok(self.0.raw.words.doc_indexes(word)?)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct DocumentsAddition<'a> {
|
|
||||||
inner: &'a Index,
|
|
||||||
document_ids: HashSet<DocumentId>,
|
|
||||||
document_store: RamDocumentStore,
|
|
||||||
indexer: Indexer,
|
|
||||||
ranked_map: RankedMap,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> DocumentsAddition<'a> {
|
|
||||||
fn new(inner: &'a Index, ranked_map: RankedMap) -> DocumentsAddition<'a> {
|
|
||||||
DocumentsAddition {
|
|
||||||
inner,
|
|
||||||
document_ids: HashSet::new(),
|
|
||||||
document_store: RamDocumentStore::new(),
|
|
||||||
indexer: Indexer::new(),
|
|
||||||
ranked_map,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn update_document<D>(&mut self, document: D) -> Result<(), Error>
|
|
||||||
where D: serde::Serialize,
|
|
||||||
{
|
|
||||||
let schema = &self.inner.lease_inner().schema;
|
|
||||||
let identifier = schema.identifier_name();
|
|
||||||
|
|
||||||
let document_id = match extract_document_id(identifier, &document)? {
|
|
||||||
Some(id) => id,
|
|
||||||
None => return Err(Error::MissingDocumentId),
|
|
||||||
};
|
|
||||||
|
|
||||||
// 1. store the document id for future deletion
|
|
||||||
self.document_ids.insert(document_id);
|
|
||||||
|
|
||||||
// 2. index the document fields in ram stores
|
|
||||||
let serializer = Serializer {
|
|
||||||
schema,
|
|
||||||
document_store: &mut self.document_store,
|
|
||||||
indexer: &mut self.indexer,
|
|
||||||
ranked_map: &mut self.ranked_map,
|
|
||||||
document_id,
|
|
||||||
};
|
|
||||||
|
|
||||||
document.serialize(serializer)?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn finalize(self) -> Result<(), Error> {
|
|
||||||
let lease_inner = self.inner.lease_inner();
|
|
||||||
let main = &lease_inner.raw.main;
|
|
||||||
let words = &lease_inner.raw.words;
|
|
||||||
let docs_words = &lease_inner.raw.docs_words;
|
|
||||||
let documents = &lease_inner.raw.documents;
|
|
||||||
|
|
||||||
// 1. remove the previous documents match indexes
|
|
||||||
let mut documents_deletion = DocumentsDeletion::new(self.inner);
|
|
||||||
documents_deletion.extend(self.document_ids);
|
|
||||||
documents_deletion.finalize()?;
|
|
||||||
|
|
||||||
// 2. insert new document attributes in the database
|
|
||||||
for ((id, attr), value) in self.document_store.into_inner() {
|
|
||||||
documents.set_document_field(id, attr, value)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
let indexed = self.indexer.build();
|
|
||||||
let mut delta_words_builder = SetBuilder::memory();
|
|
||||||
|
|
||||||
for (word, delta_set) in indexed.words_doc_indexes {
|
|
||||||
delta_words_builder.insert(&word).unwrap();
|
|
||||||
|
|
||||||
let set = match words.doc_indexes(&word)? {
|
|
||||||
Some(set) => Union::new(&set, &delta_set).into_set_buf(),
|
|
||||||
None => delta_set,
|
|
||||||
};
|
|
||||||
|
|
||||||
words.set_doc_indexes(&word, &set)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (id, words) in indexed.docs_words {
|
|
||||||
docs_words.set_doc_words(id, &words)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
let delta_words = delta_words_builder
|
|
||||||
.into_inner()
|
|
||||||
.and_then(fst::Set::from_bytes)
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let words = match main.words_set()? {
|
|
||||||
Some(words) => {
|
|
||||||
let op = OpBuilder::new()
|
|
||||||
.add(words.stream())
|
|
||||||
.add(delta_words.stream())
|
|
||||||
.r#union();
|
|
||||||
|
|
||||||
let mut words_builder = SetBuilder::memory();
|
|
||||||
words_builder.extend_stream(op).unwrap();
|
|
||||||
words_builder
|
|
||||||
.into_inner()
|
|
||||||
.and_then(fst::Set::from_bytes)
|
|
||||||
.unwrap()
|
|
||||||
},
|
|
||||||
None => delta_words,
|
|
||||||
};
|
|
||||||
|
|
||||||
main.set_words_set(&words)?;
|
|
||||||
main.set_ranked_map(&self.ranked_map)?;
|
|
||||||
|
|
||||||
// update the "consistent" view of the Index
|
|
||||||
let ranked_map = self.ranked_map;
|
|
||||||
let schema = lease_inner.schema.clone();
|
|
||||||
let raw = lease_inner.raw.clone();
|
|
||||||
|
|
||||||
let inner = InnerIndex { words, schema, ranked_map, raw };
|
|
||||||
self.inner.0.store(Arc::new(inner));
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct DocumentsDeletion<'a> {
|
|
||||||
inner: &'a Index,
|
|
||||||
documents: Vec<DocumentId>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> DocumentsDeletion<'a> {
|
|
||||||
fn new(inner: &'a Index) -> DocumentsDeletion {
|
|
||||||
DocumentsDeletion { inner, documents: Vec::new() }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn delete_document(&mut self, id: DocumentId) {
|
|
||||||
self.documents.push(id);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn finalize(mut self) -> Result<(), Error> {
|
|
||||||
let lease_inner = self.inner.lease_inner();
|
|
||||||
let main = &lease_inner.raw.main;
|
|
||||||
let docs_words = &lease_inner.raw.docs_words;
|
|
||||||
let words = &lease_inner.raw.words;
|
|
||||||
let documents = &lease_inner.raw.documents;
|
|
||||||
|
|
||||||
let idset = {
|
|
||||||
self.documents.sort_unstable();
|
|
||||||
self.documents.dedup();
|
|
||||||
SetBuf::new_unchecked(self.documents)
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut words_document_ids = HashMap::new();
|
|
||||||
for id in idset.into_vec() {
|
|
||||||
if let Some(words) = docs_words.doc_words(id)? {
|
|
||||||
let mut stream = words.stream();
|
|
||||||
while let Some(word) = stream.next() {
|
|
||||||
let word = word.to_vec();
|
|
||||||
words_document_ids.entry(word).or_insert_with(Vec::new).push(id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut removed_words = BTreeSet::new();
|
|
||||||
for (word, mut document_ids) in words_document_ids {
|
|
||||||
document_ids.sort_unstable();
|
|
||||||
document_ids.dedup();
|
|
||||||
let document_ids = SetBuf::new_unchecked(document_ids);
|
|
||||||
|
|
||||||
if let Some(doc_indexes) = words.doc_indexes(&word)? {
|
|
||||||
let op = DifferenceByKey::new(&doc_indexes, &document_ids, |d| d.document_id, |id| *id);
|
|
||||||
let doc_indexes = op.into_set_buf();
|
|
||||||
|
|
||||||
if !doc_indexes.is_empty() {
|
|
||||||
words.set_doc_indexes(&word, &doc_indexes)?;
|
|
||||||
} else {
|
|
||||||
words.del_doc_indexes(&word)?;
|
|
||||||
removed_words.insert(word);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for id in document_ids.into_vec() {
|
|
||||||
documents.del_all_document_fields(id)?;
|
|
||||||
docs_words.del_doc_words(id)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let removed_words = fst::Set::from_iter(removed_words).unwrap();
|
|
||||||
let words = match main.words_set()? {
|
|
||||||
Some(words_set) => {
|
|
||||||
let op = fst::set::OpBuilder::new()
|
|
||||||
.add(words_set.stream())
|
|
||||||
.add(removed_words.stream())
|
|
||||||
.difference();
|
|
||||||
|
|
||||||
let mut words_builder = SetBuilder::memory();
|
|
||||||
words_builder.extend_stream(op).unwrap();
|
|
||||||
words_builder
|
|
||||||
.into_inner()
|
|
||||||
.and_then(fst::Set::from_bytes)
|
|
||||||
.unwrap()
|
|
||||||
},
|
|
||||||
None => fst::Set::default(),
|
|
||||||
};
|
|
||||||
|
|
||||||
main.set_words_set(&words)?;
|
|
||||||
|
|
||||||
// TODO must update the ranked_map too!
|
|
||||||
|
|
||||||
// update the "consistent" view of the Index
|
|
||||||
let ranked_map = lease_inner.ranked_map.clone();
|
|
||||||
let schema = lease_inner.schema.clone();
|
|
||||||
let raw = lease_inner.raw.clone();
|
|
||||||
|
|
||||||
let inner = InnerIndex { words, schema, ranked_map, raw };
|
|
||||||
self.inner.0.store(Arc::new(inner));
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> Extend<DocumentId> for DocumentsDeletion<'a> {
|
|
||||||
fn extend<T: IntoIterator<Item=DocumentId>>(&mut self, iter: T) {
|
|
||||||
self.documents.extend(iter)
|
|
||||||
}
|
|
||||||
}
|
|
33
meilidb-data/src/database/docs_words_index.rs
Normal file
33
meilidb-data/src/database/docs_words_index.rs
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use meilidb_core::DocumentId;
|
||||||
|
use super::Error;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct DocsWordsIndex(pub Arc<sled::Tree>);
|
||||||
|
|
||||||
|
impl DocsWordsIndex {
|
||||||
|
pub fn doc_words(&self, id: DocumentId) -> Result<Option<fst::Set>, Error> {
|
||||||
|
let key = id.0.to_be_bytes();
|
||||||
|
match self.0.get(key)? {
|
||||||
|
Some(bytes) => {
|
||||||
|
let len = bytes.len();
|
||||||
|
let value = bytes.into();
|
||||||
|
let fst = fst::raw::Fst::from_shared_bytes(value, 0, len)?;
|
||||||
|
Ok(Some(fst::Set::from(fst)))
|
||||||
|
},
|
||||||
|
None => Ok(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_doc_words(&self, id: DocumentId, words: &fst::Set) -> Result<(), Error> {
|
||||||
|
let key = id.0.to_be_bytes();
|
||||||
|
self.0.set(key, words.as_fst().as_bytes())?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn del_doc_words(&self, id: DocumentId) -> Result<(), Error> {
|
||||||
|
let key = id.0.to_be_bytes();
|
||||||
|
self.0.del(key)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
131
meilidb-data/src/database/documents_addition.rs
Normal file
131
meilidb-data/src/database/documents_addition.rs
Normal file
@ -0,0 +1,131 @@
|
|||||||
|
use std::collections::HashSet;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use meilidb_core::DocumentId;
|
||||||
|
use fst::{SetBuilder, set::OpBuilder};
|
||||||
|
use sdset::{SetOperation, duo::Union};
|
||||||
|
|
||||||
|
use crate::indexer::Indexer;
|
||||||
|
use crate::serde::{extract_document_id, Serializer, RamDocumentStore};
|
||||||
|
use crate::RankedMap;
|
||||||
|
|
||||||
|
use super::{Error, Index, InnerIndex, DocumentsDeletion};
|
||||||
|
|
||||||
|
pub struct DocumentsAddition<'a> {
|
||||||
|
inner: &'a Index,
|
||||||
|
document_ids: HashSet<DocumentId>,
|
||||||
|
document_store: RamDocumentStore,
|
||||||
|
indexer: Indexer,
|
||||||
|
ranked_map: RankedMap,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> DocumentsAddition<'a> {
|
||||||
|
pub(crate) fn new(inner: &'a Index, ranked_map: RankedMap) -> DocumentsAddition<'a> {
|
||||||
|
DocumentsAddition {
|
||||||
|
inner,
|
||||||
|
document_ids: HashSet::new(),
|
||||||
|
document_store: RamDocumentStore::new(),
|
||||||
|
indexer: Indexer::new(),
|
||||||
|
ranked_map,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn update_document<D>(&mut self, document: D) -> Result<(), Error>
|
||||||
|
where D: serde::Serialize,
|
||||||
|
{
|
||||||
|
let schema = &self.inner.lease_inner().schema;
|
||||||
|
let identifier = schema.identifier_name();
|
||||||
|
|
||||||
|
let document_id = match extract_document_id(identifier, &document)? {
|
||||||
|
Some(id) => id,
|
||||||
|
None => return Err(Error::MissingDocumentId),
|
||||||
|
};
|
||||||
|
|
||||||
|
// 1. store the document id for future deletion
|
||||||
|
self.document_ids.insert(document_id);
|
||||||
|
|
||||||
|
// 2. index the document fields in ram stores
|
||||||
|
let serializer = Serializer {
|
||||||
|
schema,
|
||||||
|
document_store: &mut self.document_store,
|
||||||
|
indexer: &mut self.indexer,
|
||||||
|
ranked_map: &mut self.ranked_map,
|
||||||
|
document_id,
|
||||||
|
};
|
||||||
|
|
||||||
|
document.serialize(serializer)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn finalize(self) -> Result<(), Error> {
|
||||||
|
let lease_inner = self.inner.lease_inner();
|
||||||
|
let main = &lease_inner.raw.main;
|
||||||
|
let words = &lease_inner.raw.words;
|
||||||
|
let docs_words = &lease_inner.raw.docs_words;
|
||||||
|
let documents = &lease_inner.raw.documents;
|
||||||
|
|
||||||
|
// 1. remove the previous documents match indexes
|
||||||
|
let mut documents_deletion = DocumentsDeletion::new(self.inner);
|
||||||
|
documents_deletion.extend(self.document_ids);
|
||||||
|
documents_deletion.finalize()?;
|
||||||
|
|
||||||
|
// 2. insert new document attributes in the database
|
||||||
|
for ((id, attr), value) in self.document_store.into_inner() {
|
||||||
|
documents.set_document_field(id, attr, value)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let indexed = self.indexer.build();
|
||||||
|
let mut delta_words_builder = SetBuilder::memory();
|
||||||
|
|
||||||
|
for (word, delta_set) in indexed.words_doc_indexes {
|
||||||
|
delta_words_builder.insert(&word).unwrap();
|
||||||
|
|
||||||
|
let set = match words.doc_indexes(&word)? {
|
||||||
|
Some(set) => Union::new(&set, &delta_set).into_set_buf(),
|
||||||
|
None => delta_set,
|
||||||
|
};
|
||||||
|
|
||||||
|
words.set_doc_indexes(&word, &set)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (id, words) in indexed.docs_words {
|
||||||
|
docs_words.set_doc_words(id, &words)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let delta_words = delta_words_builder
|
||||||
|
.into_inner()
|
||||||
|
.and_then(fst::Set::from_bytes)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let words = match main.words_set()? {
|
||||||
|
Some(words) => {
|
||||||
|
let op = OpBuilder::new()
|
||||||
|
.add(words.stream())
|
||||||
|
.add(delta_words.stream())
|
||||||
|
.r#union();
|
||||||
|
|
||||||
|
let mut words_builder = SetBuilder::memory();
|
||||||
|
words_builder.extend_stream(op).unwrap();
|
||||||
|
words_builder
|
||||||
|
.into_inner()
|
||||||
|
.and_then(fst::Set::from_bytes)
|
||||||
|
.unwrap()
|
||||||
|
},
|
||||||
|
None => delta_words,
|
||||||
|
};
|
||||||
|
|
||||||
|
main.set_words_set(&words)?;
|
||||||
|
main.set_ranked_map(&self.ranked_map)?;
|
||||||
|
|
||||||
|
// update the "consistent" view of the Index
|
||||||
|
let ranked_map = self.ranked_map;
|
||||||
|
let schema = lease_inner.schema.clone();
|
||||||
|
let raw = lease_inner.raw.clone();
|
||||||
|
|
||||||
|
let inner = InnerIndex { words, schema, ranked_map, raw };
|
||||||
|
self.inner.0.store(Arc::new(inner));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
110
meilidb-data/src/database/documents_deletion.rs
Normal file
110
meilidb-data/src/database/documents_deletion.rs
Normal file
@ -0,0 +1,110 @@
|
|||||||
|
use std::collections::{HashMap, BTreeSet};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use sdset::{SetBuf, SetOperation, duo::DifferenceByKey};
|
||||||
|
use fst::{SetBuilder, Streamer};
|
||||||
|
use meilidb_core::DocumentId;
|
||||||
|
|
||||||
|
use super::{Index, Error, InnerIndex};
|
||||||
|
|
||||||
|
pub struct DocumentsDeletion<'a> {
|
||||||
|
inner: &'a Index,
|
||||||
|
documents: Vec<DocumentId>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> DocumentsDeletion<'a> {
|
||||||
|
pub(crate) fn new(inner: &'a Index) -> DocumentsDeletion {
|
||||||
|
DocumentsDeletion { inner, documents: Vec::new() }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn delete_document(&mut self, id: DocumentId) {
|
||||||
|
self.documents.push(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn finalize(mut self) -> Result<(), Error> {
|
||||||
|
let lease_inner = self.inner.lease_inner();
|
||||||
|
let main = &lease_inner.raw.main;
|
||||||
|
let docs_words = &lease_inner.raw.docs_words;
|
||||||
|
let words = &lease_inner.raw.words;
|
||||||
|
let documents = &lease_inner.raw.documents;
|
||||||
|
|
||||||
|
let idset = {
|
||||||
|
self.documents.sort_unstable();
|
||||||
|
self.documents.dedup();
|
||||||
|
SetBuf::new_unchecked(self.documents)
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut words_document_ids = HashMap::new();
|
||||||
|
for id in idset.into_vec() {
|
||||||
|
if let Some(words) = docs_words.doc_words(id)? {
|
||||||
|
let mut stream = words.stream();
|
||||||
|
while let Some(word) = stream.next() {
|
||||||
|
let word = word.to_vec();
|
||||||
|
words_document_ids.entry(word).or_insert_with(Vec::new).push(id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut removed_words = BTreeSet::new();
|
||||||
|
for (word, mut document_ids) in words_document_ids {
|
||||||
|
document_ids.sort_unstable();
|
||||||
|
document_ids.dedup();
|
||||||
|
let document_ids = SetBuf::new_unchecked(document_ids);
|
||||||
|
|
||||||
|
if let Some(doc_indexes) = words.doc_indexes(&word)? {
|
||||||
|
let op = DifferenceByKey::new(&doc_indexes, &document_ids, |d| d.document_id, |id| *id);
|
||||||
|
let doc_indexes = op.into_set_buf();
|
||||||
|
|
||||||
|
if !doc_indexes.is_empty() {
|
||||||
|
words.set_doc_indexes(&word, &doc_indexes)?;
|
||||||
|
} else {
|
||||||
|
words.del_doc_indexes(&word)?;
|
||||||
|
removed_words.insert(word);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for id in document_ids.into_vec() {
|
||||||
|
documents.del_all_document_fields(id)?;
|
||||||
|
docs_words.del_doc_words(id)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let removed_words = fst::Set::from_iter(removed_words).unwrap();
|
||||||
|
let words = match main.words_set()? {
|
||||||
|
Some(words_set) => {
|
||||||
|
let op = fst::set::OpBuilder::new()
|
||||||
|
.add(words_set.stream())
|
||||||
|
.add(removed_words.stream())
|
||||||
|
.difference();
|
||||||
|
|
||||||
|
let mut words_builder = SetBuilder::memory();
|
||||||
|
words_builder.extend_stream(op).unwrap();
|
||||||
|
words_builder
|
||||||
|
.into_inner()
|
||||||
|
.and_then(fst::Set::from_bytes)
|
||||||
|
.unwrap()
|
||||||
|
},
|
||||||
|
None => fst::Set::default(),
|
||||||
|
};
|
||||||
|
|
||||||
|
main.set_words_set(&words)?;
|
||||||
|
|
||||||
|
// TODO must update the ranked_map too!
|
||||||
|
|
||||||
|
// update the "consistent" view of the Index
|
||||||
|
let ranked_map = lease_inner.ranked_map.clone();
|
||||||
|
let schema = lease_inner.schema.clone();
|
||||||
|
let raw = lease_inner.raw.clone();
|
||||||
|
|
||||||
|
let inner = InnerIndex { words, schema, ranked_map, raw };
|
||||||
|
self.inner.0.store(Arc::new(inner));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> Extend<DocumentId> for DocumentsDeletion<'a> {
|
||||||
|
fn extend<T: IntoIterator<Item=DocumentId>>(&mut self, iter: T) {
|
||||||
|
self.documents.extend(iter)
|
||||||
|
}
|
||||||
|
}
|
71
meilidb-data/src/database/documents_index.rs
Normal file
71
meilidb-data/src/database/documents_index.rs
Normal file
@ -0,0 +1,71 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use std::convert::TryInto;
|
||||||
|
|
||||||
|
use meilidb_core::DocumentId;
|
||||||
|
use sled::IVec;
|
||||||
|
|
||||||
|
use crate::document_attr_key::DocumentAttrKey;
|
||||||
|
use crate::schema::SchemaAttr;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct DocumentsIndex(pub(crate) Arc<sled::Tree>);
|
||||||
|
|
||||||
|
impl DocumentsIndex {
|
||||||
|
pub fn document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result<Option<IVec>> {
|
||||||
|
let key = DocumentAttrKey::new(id, attr).to_be_bytes();
|
||||||
|
self.0.get(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_document_field(&self, id: DocumentId, attr: SchemaAttr, value: Vec<u8>) -> sled::Result<()> {
|
||||||
|
let key = DocumentAttrKey::new(id, attr).to_be_bytes();
|
||||||
|
self.0.set(key, value)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn del_document_field(&self, id: DocumentId, attr: SchemaAttr) -> sled::Result<()> {
|
||||||
|
let key = DocumentAttrKey::new(id, attr).to_be_bytes();
|
||||||
|
self.0.del(key)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn del_all_document_fields(&self, id: DocumentId) -> sled::Result<()> {
|
||||||
|
let start = DocumentAttrKey::new(id, SchemaAttr::min()).to_be_bytes();
|
||||||
|
let end = DocumentAttrKey::new(id, SchemaAttr::max()).to_be_bytes();
|
||||||
|
let document_attrs = self.0.range(start..=end).keys();
|
||||||
|
|
||||||
|
for key in document_attrs {
|
||||||
|
self.0.del(key?)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn document_fields(&self, id: DocumentId) -> DocumentFieldsIter {
|
||||||
|
let start = DocumentAttrKey::new(id, SchemaAttr::min());
|
||||||
|
let start = start.to_be_bytes();
|
||||||
|
|
||||||
|
let end = DocumentAttrKey::new(id, SchemaAttr::max());
|
||||||
|
let end = end.to_be_bytes();
|
||||||
|
|
||||||
|
DocumentFieldsIter(self.0.range(start..=end))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct DocumentFieldsIter<'a>(sled::Iter<'a>);
|
||||||
|
|
||||||
|
impl<'a> Iterator for DocumentFieldsIter<'a> {
|
||||||
|
type Item = sled::Result<(SchemaAttr, IVec)>;
|
||||||
|
|
||||||
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
|
match self.0.next() {
|
||||||
|
Some(Ok((key, value))) => {
|
||||||
|
let slice: &[u8] = key.as_ref();
|
||||||
|
let array = slice.try_into().unwrap();
|
||||||
|
let key = DocumentAttrKey::from_be_bytes(array);
|
||||||
|
Some(Ok((key.attribute, value)))
|
||||||
|
},
|
||||||
|
Some(Err(e)) => Some(Err(e)),
|
||||||
|
None => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
57
meilidb-data/src/database/error.rs
Normal file
57
meilidb-data/src/database/error.rs
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
use std::{error, fmt};
|
||||||
|
use crate::serde::SerializerError;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum Error {
|
||||||
|
SchemaDiffer,
|
||||||
|
SchemaMissing,
|
||||||
|
WordIndexMissing,
|
||||||
|
MissingDocumentId,
|
||||||
|
SledError(sled::Error),
|
||||||
|
FstError(fst::Error),
|
||||||
|
BincodeError(bincode::Error),
|
||||||
|
SerializerError(SerializerError),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<sled::Error> for Error {
|
||||||
|
fn from(error: sled::Error) -> Error {
|
||||||
|
Error::SledError(error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<fst::Error> for Error {
|
||||||
|
fn from(error: fst::Error) -> Error {
|
||||||
|
Error::FstError(error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<bincode::Error> for Error {
|
||||||
|
fn from(error: bincode::Error) -> Error {
|
||||||
|
Error::BincodeError(error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<SerializerError> for Error {
|
||||||
|
fn from(error: SerializerError) -> Error {
|
||||||
|
Error::SerializerError(error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for Error {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
use self::Error::*;
|
||||||
|
match self {
|
||||||
|
SchemaDiffer => write!(f, "schemas differ"),
|
||||||
|
SchemaMissing => write!(f, "this index does not have a schema"),
|
||||||
|
WordIndexMissing => write!(f, "this index does not have a word index"),
|
||||||
|
MissingDocumentId => write!(f, "document id is missing"),
|
||||||
|
SledError(e) => write!(f, "sled error; {}", e),
|
||||||
|
FstError(e) => write!(f, "fst error; {}", e),
|
||||||
|
BincodeError(e) => write!(f, "bincode error; {}", e),
|
||||||
|
SerializerError(e) => write!(f, "serializer error; {}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl error::Error for Error { }
|
||||||
|
|
121
meilidb-data/src/database/index.rs
Normal file
121
meilidb-data/src/database/index.rs
Normal file
@ -0,0 +1,121 @@
|
|||||||
|
use sdset::SetBuf;
|
||||||
|
use std::collections::HashSet;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use arc_swap::{ArcSwap, Lease};
|
||||||
|
use meilidb_core::criterion::Criteria;
|
||||||
|
use meilidb_core::{DocIndex, Store, DocumentId, QueryBuilder};
|
||||||
|
use rmp_serde::decode::Error as RmpError;
|
||||||
|
use serde::de;
|
||||||
|
|
||||||
|
use crate::ranked_map::RankedMap;
|
||||||
|
use crate::schema::Schema;
|
||||||
|
use crate::serde::Deserializer;
|
||||||
|
|
||||||
|
use super::{Error, RawIndex, DocumentsAddition, DocumentsDeletion};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Index(pub(crate) ArcSwap<InnerIndex>);
|
||||||
|
|
||||||
|
pub struct InnerIndex {
|
||||||
|
pub words: fst::Set,
|
||||||
|
pub schema: Schema,
|
||||||
|
pub ranked_map: RankedMap,
|
||||||
|
pub raw: RawIndex, // TODO this will be a snapshot in the future
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Index {
|
||||||
|
pub(crate) fn from_raw(raw: RawIndex) -> Result<Index, Error> {
|
||||||
|
let words = match raw.main.words_set()? {
|
||||||
|
Some(words) => words,
|
||||||
|
None => fst::Set::default(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let schema = match raw.main.schema()? {
|
||||||
|
Some(schema) => schema,
|
||||||
|
None => return Err(Error::SchemaMissing),
|
||||||
|
};
|
||||||
|
|
||||||
|
let ranked_map = match raw.main.ranked_map()? {
|
||||||
|
Some(map) => map,
|
||||||
|
None => RankedMap::default(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let inner = InnerIndex { words, schema, ranked_map, raw };
|
||||||
|
let index = Index(ArcSwap::new(Arc::new(inner)));
|
||||||
|
|
||||||
|
Ok(index)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn query_builder(&self) -> QueryBuilder<IndexLease> {
|
||||||
|
let lease = IndexLease(self.0.lease());
|
||||||
|
QueryBuilder::new(lease)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn query_builder_with_criteria<'c>(
|
||||||
|
&self,
|
||||||
|
criteria: Criteria<'c>,
|
||||||
|
) -> QueryBuilder<'c, IndexLease>
|
||||||
|
{
|
||||||
|
let lease = IndexLease(self.0.lease());
|
||||||
|
QueryBuilder::with_criteria(lease, criteria)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn lease_inner(&self) -> Lease<Arc<InnerIndex>> {
|
||||||
|
self.0.lease()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn schema(&self) -> Schema {
|
||||||
|
self.0.lease().schema.clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn documents_addition(&self) -> DocumentsAddition {
|
||||||
|
let ranked_map = self.0.lease().ranked_map.clone();
|
||||||
|
DocumentsAddition::new(self, ranked_map)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn documents_deletion(&self) -> DocumentsDeletion {
|
||||||
|
DocumentsDeletion::new(self)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn document<T>(
|
||||||
|
&self,
|
||||||
|
fields: Option<&HashSet<&str>>,
|
||||||
|
id: DocumentId,
|
||||||
|
) -> Result<Option<T>, RmpError>
|
||||||
|
where T: de::DeserializeOwned,
|
||||||
|
{
|
||||||
|
let schema = &self.lease_inner().schema;
|
||||||
|
let fields = fields
|
||||||
|
.map(|fields| {
|
||||||
|
fields
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(|name| schema.attribute(name))
|
||||||
|
.collect()
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut deserializer = Deserializer {
|
||||||
|
document_id: id,
|
||||||
|
index: &self,
|
||||||
|
fields: fields.as_ref(),
|
||||||
|
};
|
||||||
|
|
||||||
|
// TODO: currently we return an error if all document fields are missing,
|
||||||
|
// returning None would have been better
|
||||||
|
T::deserialize(&mut deserializer).map(Some)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct IndexLease(Lease<Arc<InnerIndex>>);
|
||||||
|
|
||||||
|
impl Store for IndexLease {
|
||||||
|
type Error = Error;
|
||||||
|
|
||||||
|
fn words(&self) -> Result<&fst::Set, Self::Error> {
|
||||||
|
Ok(&self.0.words)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn word_indexes(&self, word: &[u8]) -> Result<Option<SetBuf<DocIndex>>, Self::Error> {
|
||||||
|
Ok(self.0.raw.words.doc_indexes(word)?)
|
||||||
|
}
|
||||||
|
}
|
62
meilidb-data/src/database/main_index.rs
Normal file
62
meilidb-data/src/database/main_index.rs
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use crate::ranked_map::RankedMap;
|
||||||
|
use crate::schema::Schema;
|
||||||
|
|
||||||
|
use super::Error;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct MainIndex(pub(crate) Arc<sled::Tree>);
|
||||||
|
|
||||||
|
impl MainIndex {
|
||||||
|
pub fn schema(&self) -> Result<Option<Schema>, Error> {
|
||||||
|
match self.0.get("schema")? {
|
||||||
|
Some(bytes) => {
|
||||||
|
let schema = Schema::read_from_bin(bytes.as_ref())?;
|
||||||
|
Ok(Some(schema))
|
||||||
|
},
|
||||||
|
None => Ok(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_schema(&self, schema: &Schema) -> Result<(), Error> {
|
||||||
|
let mut bytes = Vec::new();
|
||||||
|
schema.write_to_bin(&mut bytes)?;
|
||||||
|
self.0.set("schema", bytes)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn words_set(&self) -> Result<Option<fst::Set>, Error> {
|
||||||
|
match self.0.get("words")? {
|
||||||
|
Some(bytes) => {
|
||||||
|
let len = bytes.len();
|
||||||
|
let value = bytes.into();
|
||||||
|
let fst = fst::raw::Fst::from_shared_bytes(value, 0, len)?;
|
||||||
|
Ok(Some(fst::Set::from(fst)))
|
||||||
|
},
|
||||||
|
None => Ok(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_words_set(&self, value: &fst::Set) -> Result<(), Error> {
|
||||||
|
self.0.set("words", value.as_fst().as_bytes())?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn ranked_map(&self) -> Result<Option<RankedMap>, Error> {
|
||||||
|
match self.0.get("ranked-map")? {
|
||||||
|
Some(bytes) => {
|
||||||
|
let ranked_map = RankedMap::read_from_bin(bytes.as_ref())?;
|
||||||
|
Ok(Some(ranked_map))
|
||||||
|
},
|
||||||
|
None => Ok(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_ranked_map(&self, value: &RankedMap) -> Result<(), Error> {
|
||||||
|
let mut bytes = Vec::new();
|
||||||
|
value.write_to_bin(&mut bytes)?;
|
||||||
|
self.0.set("ranked_map", bytes)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
161
meilidb-data/src/database/mod.rs
Normal file
161
meilidb-data/src/database/mod.rs
Normal file
@ -0,0 +1,161 @@
|
|||||||
|
use std::collections::hash_map::Entry;
|
||||||
|
use std::collections::{HashSet, HashMap};
|
||||||
|
use std::path::Path;
|
||||||
|
use std::sync::{Arc, RwLock};
|
||||||
|
|
||||||
|
use crate::Schema;
|
||||||
|
|
||||||
|
mod docs_words_index;
|
||||||
|
mod documents_addition;
|
||||||
|
mod documents_deletion;
|
||||||
|
mod documents_index;
|
||||||
|
mod error;
|
||||||
|
mod index;
|
||||||
|
mod main_index;
|
||||||
|
mod raw_index;
|
||||||
|
mod words_index;
|
||||||
|
|
||||||
|
pub use self::error::Error;
|
||||||
|
pub use self::index::Index;
|
||||||
|
|
||||||
|
use self::docs_words_index::DocsWordsIndex;
|
||||||
|
use self::documents_addition::DocumentsAddition;
|
||||||
|
use self::documents_deletion::DocumentsDeletion;
|
||||||
|
use self::documents_index::DocumentsIndex;
|
||||||
|
use self::index::InnerIndex;
|
||||||
|
use self::main_index::MainIndex;
|
||||||
|
use self::raw_index::RawIndex;
|
||||||
|
use self::words_index::WordsIndex;
|
||||||
|
|
||||||
|
pub struct Database {
|
||||||
|
cache: RwLock<HashMap<String, Arc<Index>>>,
|
||||||
|
inner: sled::Db,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Database {
|
||||||
|
pub fn start_default<P: AsRef<Path>>(path: P) -> Result<Database, Error> {
|
||||||
|
let cache = RwLock::new(HashMap::new());
|
||||||
|
let inner = sled::Db::start_default(path)?;
|
||||||
|
Ok(Database { cache, inner })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn indexes(&self) -> Result<Option<HashSet<String>>, Error> {
|
||||||
|
let bytes = match self.inner.get("indexes")? {
|
||||||
|
Some(bytes) => bytes,
|
||||||
|
None => return Ok(None),
|
||||||
|
};
|
||||||
|
|
||||||
|
let indexes = bincode::deserialize(&bytes)?;
|
||||||
|
Ok(Some(indexes))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_indexes(&self, value: &HashSet<String>) -> Result<(), Error> {
|
||||||
|
let bytes = bincode::serialize(value)?;
|
||||||
|
self.inner.set("indexes", bytes)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn open_index(&self, name: &str) -> Result<Option<Arc<Index>>, Error> {
|
||||||
|
{
|
||||||
|
let cache = self.cache.read().unwrap();
|
||||||
|
if let Some(index) = cache.get(name).cloned() {
|
||||||
|
return Ok(Some(index))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut cache = self.cache.write().unwrap();
|
||||||
|
let index = match cache.entry(name.to_string()) {
|
||||||
|
Entry::Occupied(occupied) => {
|
||||||
|
occupied.get().clone()
|
||||||
|
},
|
||||||
|
Entry::Vacant(vacant) => {
|
||||||
|
if !self.indexes()?.map_or(false, |x| x.contains(name)) {
|
||||||
|
return Ok(None)
|
||||||
|
}
|
||||||
|
|
||||||
|
let main = {
|
||||||
|
let tree = self.inner.open_tree(name)?;
|
||||||
|
MainIndex(tree)
|
||||||
|
};
|
||||||
|
|
||||||
|
let words = {
|
||||||
|
let tree_name = format!("{}-words", name);
|
||||||
|
let tree = self.inner.open_tree(tree_name)?;
|
||||||
|
WordsIndex(tree)
|
||||||
|
};
|
||||||
|
|
||||||
|
let docs_words = {
|
||||||
|
let tree_name = format!("{}-docs-words", name);
|
||||||
|
let tree = self.inner.open_tree(tree_name)?;
|
||||||
|
DocsWordsIndex(tree)
|
||||||
|
};
|
||||||
|
|
||||||
|
let documents = {
|
||||||
|
let tree_name = format!("{}-documents", name);
|
||||||
|
let tree = self.inner.open_tree(tree_name)?;
|
||||||
|
DocumentsIndex(tree)
|
||||||
|
};
|
||||||
|
|
||||||
|
let raw_index = RawIndex { main, words, docs_words, documents };
|
||||||
|
let index = Index::from_raw(raw_index)?;
|
||||||
|
|
||||||
|
vacant.insert(Arc::new(index)).clone()
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Some(index))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn create_index(&self, name: &str, schema: Schema) -> Result<Arc<Index>, Error> {
|
||||||
|
let mut cache = self.cache.write().unwrap();
|
||||||
|
|
||||||
|
let index = match cache.entry(name.to_string()) {
|
||||||
|
Entry::Occupied(occupied) => {
|
||||||
|
occupied.get().clone()
|
||||||
|
},
|
||||||
|
Entry::Vacant(vacant) => {
|
||||||
|
let main = {
|
||||||
|
let tree = self.inner.open_tree(name)?;
|
||||||
|
MainIndex(tree)
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(prev_schema) = main.schema()? {
|
||||||
|
if prev_schema != schema {
|
||||||
|
return Err(Error::SchemaDiffer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
main.set_schema(&schema)?;
|
||||||
|
|
||||||
|
let words = {
|
||||||
|
let tree_name = format!("{}-words", name);
|
||||||
|
let tree = self.inner.open_tree(tree_name)?;
|
||||||
|
WordsIndex(tree)
|
||||||
|
};
|
||||||
|
|
||||||
|
let docs_words = {
|
||||||
|
let tree_name = format!("{}-docs-words", name);
|
||||||
|
let tree = self.inner.open_tree(tree_name)?;
|
||||||
|
DocsWordsIndex(tree)
|
||||||
|
};
|
||||||
|
|
||||||
|
let documents = {
|
||||||
|
let tree_name = format!("{}-documents", name);
|
||||||
|
let tree = self.inner.open_tree(tree_name)?;
|
||||||
|
DocumentsIndex(tree)
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut indexes = self.indexes()?.unwrap_or_else(HashSet::new);
|
||||||
|
indexes.insert(name.to_string());
|
||||||
|
self.set_indexes(&indexes)?;
|
||||||
|
|
||||||
|
let raw_index = RawIndex { main, words, docs_words, documents };
|
||||||
|
let index = Index::from_raw(raw_index)?;
|
||||||
|
|
||||||
|
vacant.insert(Arc::new(index)).clone()
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(index)
|
||||||
|
}
|
||||||
|
}
|
9
meilidb-data/src/database/raw_index.rs
Normal file
9
meilidb-data/src/database/raw_index.rs
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
use super::{MainIndex, WordsIndex, DocsWordsIndex, DocumentsIndex};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct RawIndex {
|
||||||
|
pub main: MainIndex,
|
||||||
|
pub words: WordsIndex,
|
||||||
|
pub docs_words: DocsWordsIndex,
|
||||||
|
pub documents: DocumentsIndex,
|
||||||
|
}
|
32
meilidb-data/src/database/words_index.rs
Normal file
32
meilidb-data/src/database/words_index.rs
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use meilidb_core::DocIndex;
|
||||||
|
use sdset::{Set, SetBuf};
|
||||||
|
use zerocopy::{LayoutVerified, AsBytes};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct WordsIndex(pub(crate) Arc<sled::Tree>);
|
||||||
|
|
||||||
|
impl WordsIndex {
|
||||||
|
pub fn doc_indexes(&self, word: &[u8]) -> sled::Result<Option<SetBuf<DocIndex>>> {
|
||||||
|
match self.0.get(word)? {
|
||||||
|
Some(bytes) => {
|
||||||
|
let layout = LayoutVerified::new_slice(bytes.as_ref()).expect("invalid layout");
|
||||||
|
let slice = layout.into_slice();
|
||||||
|
let setbuf = SetBuf::new_unchecked(slice.to_vec());
|
||||||
|
Ok(Some(setbuf))
|
||||||
|
},
|
||||||
|
None => Ok(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_doc_indexes(&self, word: &[u8], set: &Set<DocIndex>) -> sled::Result<()> {
|
||||||
|
self.0.set(word, set.as_bytes())?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn del_doc_indexes(&self, word: &[u8]) -> sled::Result<()> {
|
||||||
|
self.0.del(word)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user