From f317a7a3221f2028cc1b1dea1f069f0f47c779a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Mon, 6 May 2019 16:25:49 +0200 Subject: [PATCH] feat: implement open/create_index on the Database type --- meilidb-data/src/database.rs | 306 +++++++++++------------------------ 1 file changed, 94 insertions(+), 212 deletions(-) diff --git a/meilidb-data/src/database.rs b/meilidb-data/src/database.rs index e2fb2db6a..a1608aab6 100644 --- a/meilidb-data/src/database.rs +++ b/meilidb-data/src/database.rs @@ -3,7 +3,7 @@ use std::convert::TryInto; use std::io::{self, Cursor, BufRead}; use std::iter::FromIterator; use std::path::Path; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::{error, fmt}; use arc_swap::{ArcSwap, Lease}; @@ -77,126 +77,122 @@ impl fmt::Display for Error { impl error::Error for Error { } -fn index_name(name: &str) -> Vec { - format!("index-{}", name).into_bytes() -} - -fn word_index_name(name: &str) -> Vec { - format!("word-index-{}", name).into_bytes() -} - -fn document_key(id: DocumentId, attr: SchemaAttr) -> Vec { - let DocumentId(document_id) = id; - let SchemaAttr(schema_attr) = attr; - - let mut bytes = Vec::new(); - bytes.extend_from_slice(b"document-"); - bytes.extend_from_slice(&document_id.to_be_bytes()[..]); - bytes.extend_from_slice(&schema_attr.to_be_bytes()[..]); - bytes -} - -trait CursorExt { - fn consume_if_eq(&mut self, needle: &[u8]) -> bool; -} - -impl> CursorExt for Cursor { - fn consume_if_eq(&mut self, needle: &[u8]) -> bool { - let position = self.position() as usize; - let slice = self.get_ref().as_ref(); - - if slice[position..].starts_with(needle) { - self.consume(needle.len()); - true - } else { - false - } - } -} - -fn extract_document_key(key: Vec) -> io::Result<(DocumentId, SchemaAttr)> { - let mut key = Cursor::new(key); - - if !key.consume_if_eq(b"document-") { - return Err(io::Error::from(io::ErrorKind::InvalidData)) - } - - let document_id = key.read_u64::().map(DocumentId)?; - let schema_attr = key.read_u16::().map(SchemaAttr)?; - - Ok((document_id, schema_attr)) -} - -#[derive(Clone)] pub struct Database { - opened: Arc>>, + cache: RwLock>>, inner: sled::Db, } impl Database { pub fn start_default>(path: P) -> Result { + let cache = RwLock::new(HashMap::new()); let inner = sled::Db::start_default(path)?; - let opened = Arc::new(ArcSwap::new(Arc::new(HashMap::new()))); - Ok(Database { opened, inner }) + Ok(Database { cache, inner }) } - pub fn open_index(&self, name: &str) -> Result, Error> { - // check if the index was already opened - if let Some(raw_index) = self.opened.lease().get(name) { - return Ok(Some(Index(raw_index.clone()))) + pub fn open_index(&self, name: &str) -> Result>, Error> { + { + let cache = self.cache.read().unwrap(); + if let Some(index) = cache.get(name).cloned() { + return Ok(Some(index)) + } } - let raw_name = index_name(name); - if self.inner.tree_names().into_iter().any(|tn| tn == raw_name) { - let tree = self.inner.open_tree(raw_name)?; - let word_index_tree = self.inner.open_tree(word_index_name(name))?; - let raw_index = RawIndex::from_raw(tree, word_index_tree)?; + let indexes: HashSet<&str> = match self.inner.get("indexes")? { + Some(bytes) => bincode::deserialize(&bytes)?, + None => return Ok(None), + }; - self.opened.rcu(|opened| { - let mut opened = HashMap::clone(opened); - opened.insert(name.to_string(), raw_index.clone()); - opened - }); - - return Ok(Some(Index(raw_index))) + if indexes.get(name).is_none() { + return Ok(None); } - Ok(None) + let main = { + let tree = self.inner.open_tree(name)?; + MainIndex(tree) + }; + + let words = { + let tree_name = format!("{}-words", name); + let tree = self.inner.open_tree(tree_name)?; + WordsIndex(tree) + }; + + let documents = { + let tree_name = format!("{}-documents", name); + let tree = self.inner.open_tree(tree_name)?; + DocumentsIndex(tree) + }; + + let raw_index = RawIndex { main, words, documents }; + let index = Arc::new(Index(raw_index)); + + { + let cache = self.cache.write().unwrap(); + cache.insert(name.to_string(), index.clone()); + } + + Ok(Some(index)) } - pub fn create_index(&self, name: String, schema: Schema) -> Result { - match self.open_index(&name)? { - Some(index) => { - if index.schema() != &schema { - return Err(Error::SchemaDiffer); - } - - Ok(index) - }, - None => { - let raw_name = index_name(&name); - let tree = self.inner.open_tree(raw_name)?; - let word_index_tree = self.inner.open_tree(word_index_name(&name))?; - let raw_index = RawIndex::new_from_raw(tree, word_index_tree, schema)?; - - self.opened.rcu(|opened| { - let mut opened = HashMap::clone(opened); - opened.insert(name.clone(), raw_index.clone()); - opened - }); - - Ok(Index(raw_index)) - }, + pub fn create_index(&self, name: &str, schema: Schema) -> Result, Error> { + { + let cache = self.cache.read().unwrap(); + if let Some(index) = cache.get(name).cloned() { + // TODO check if schemas are the same + return Ok(index) + } } + + let mut indexes: HashSet<&str> = match self.inner.get("indexes")? { + Some(bytes) => bincode::deserialize(&bytes)?, + None => HashSet::new(), + }; + + let new_insertion = indexes.insert(name); + + let main = { + let tree = self.inner.open_tree(name)?; + MainIndex(tree) + }; + + if let Some(prev_schema) = main.schema()? { + if prev_schema != schema { + return Err(Error::SchemaDiffer) + } + } + + let words = { + let tree_name = format!("{}-words", name); + let tree = self.inner.open_tree(tree_name)?; + WordsIndex(tree) + }; + + let documents = { + let tree_name = format!("{}-documents", name); + let tree = self.inner.open_tree(tree_name)?; + DocumentsIndex(tree) + }; + + let raw_index = RawIndex { main, words, documents }; + let index = Arc::new(Index(raw_index)); + + { + let cache = self.cache.write().unwrap(); + cache.insert(name.to_string(), index.clone()); + } + + Ok(index) } } -struct RawIndex2 { +#[derive(Clone)] +struct RawIndex { main: MainIndex, words: WordsIndex, documents: DocumentsIndex, } +#[derive(Clone)] struct MainIndex(Arc); impl MainIndex { @@ -233,6 +229,7 @@ impl MainIndex { } } +#[derive(Clone)] struct WordsIndex(Arc); impl WordsIndex { @@ -249,6 +246,7 @@ impl WordsIndex { } } +#[derive(Clone)] struct DocumentsIndex(Arc); impl DocumentsIndex { @@ -287,122 +285,6 @@ impl<'a> Iterator for DocumentFieldsIter<'a> { } } -#[derive(Clone)] -pub struct RawIndex { - schema: Schema, - word_index: Arc>, - ranked_map: Arc>, - inner: Arc, -} - -impl RawIndex { - fn from_raw(inner: Arc, word_index: Arc) -> Result { - let schema = { - let bytes = inner.get("schema")?; - let bytes = bytes.ok_or(Error::SchemaMissing)?; - Schema::read_from_bin(bytes.as_ref())? - }; - - let store = WordIndexTree(word_index); - let word_index = WordIndex::from_store(store)?; - let word_index = Arc::new(ArcSwap::new(Arc::new(word_index))); - - let ranked_map = { - let map = match inner.get("ranked-map")? { - Some(bytes) => RankedMap::read_from_bin(bytes.as_ref())?, - None => RankedMap::default(), - }; - - Arc::new(ArcSwap::new(Arc::new(map))) - }; - - Ok(RawIndex { schema, word_index, ranked_map, inner }) - } - - fn new_from_raw( - inner: Arc, - word_index: Arc, - schema: Schema, - ) -> Result - { - let mut schema_bytes = Vec::new(); - schema.write_to_bin(&mut schema_bytes)?; - inner.set("schema", schema_bytes)?; - - let store = WordIndexTree(word_index); - let word_index = WordIndex::from_store(store)?; - let word_index = Arc::new(ArcSwap::new(Arc::new(word_index))); - - let ranked_map = Arc::new(ArcSwap::new(Arc::new(RankedMap::default()))); - - Ok(RawIndex { schema, word_index, ranked_map, inner }) - } - - pub fn schema(&self) -> &Schema { - &self.schema - } - - pub fn word_index(&self) -> Lease> { - self.word_index.lease() - } - - pub fn ranked_map(&self) -> Lease> { - self.ranked_map.lease() - } - - pub fn update_word_index(&self, word_index: Arc) { - self.word_index.store(word_index) - } - - pub fn update_ranked_map(&self, ranked_map: Arc) -> sled::Result<()> { - let mut bytes = Vec::new(); - ranked_map.as_ref().write_to_bin(&mut bytes).unwrap(); - - self.inner.set("ranked-map", bytes).map(drop)?; - self.ranked_map.store(ranked_map); - - Ok(()) - } - - pub fn set_document_attribute( - &self, - id: DocumentId, - attr: SchemaAttr, - value: V, - ) -> Result, sled::Error> - where IVec: From, - { - let key = document_key(id, attr); - Ok(self.inner.set(key, value)?) - } - - pub fn get_document_attribute( - &self, - id: DocumentId, - attr: SchemaAttr - ) -> Result, sled::Error> - { - let key = document_key(id, attr); - Ok(self.inner.get(key)?) - } - - pub fn get_document_fields(&self, id: DocumentId) -> DocumentFieldsIter { - let start = document_key(id, SchemaAttr::min()); - let end = document_key(id, SchemaAttr::max()); - DocumentFieldsIter(self.inner.range(start..=end)) - } - - pub fn del_document_attribute( - &self, - id: DocumentId, - attr: SchemaAttr - ) -> Result, sled::Error> - { - let key = document_key(id, attr); - Ok(self.inner.del(key)?) - } -} - #[derive(Clone)] pub struct Index(RawIndex);