mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-24 12:17:29 +01:00
feat: implement open/create_index on the Database type
This commit is contained in:
parent
8434ecbb43
commit
f317a7a322
@ -3,7 +3,7 @@ use std::convert::TryInto;
|
|||||||
use std::io::{self, Cursor, BufRead};
|
use std::io::{self, Cursor, BufRead};
|
||||||
use std::iter::FromIterator;
|
use std::iter::FromIterator;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::sync::Arc;
|
use std::sync::{Arc, RwLock};
|
||||||
use std::{error, fmt};
|
use std::{error, fmt};
|
||||||
|
|
||||||
use arc_swap::{ArcSwap, Lease};
|
use arc_swap::{ArcSwap, Lease};
|
||||||
@ -77,126 +77,122 @@ impl fmt::Display for Error {
|
|||||||
|
|
||||||
impl error::Error for Error { }
|
impl error::Error for Error { }
|
||||||
|
|
||||||
fn index_name(name: &str) -> Vec<u8> {
|
|
||||||
format!("index-{}", name).into_bytes()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn word_index_name(name: &str) -> Vec<u8> {
|
|
||||||
format!("word-index-{}", name).into_bytes()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn document_key(id: DocumentId, attr: SchemaAttr) -> Vec<u8> {
|
|
||||||
let DocumentId(document_id) = id;
|
|
||||||
let SchemaAttr(schema_attr) = attr;
|
|
||||||
|
|
||||||
let mut bytes = Vec::new();
|
|
||||||
bytes.extend_from_slice(b"document-");
|
|
||||||
bytes.extend_from_slice(&document_id.to_be_bytes()[..]);
|
|
||||||
bytes.extend_from_slice(&schema_attr.to_be_bytes()[..]);
|
|
||||||
bytes
|
|
||||||
}
|
|
||||||
|
|
||||||
trait CursorExt {
|
|
||||||
fn consume_if_eq(&mut self, needle: &[u8]) -> bool;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: AsRef<[u8]>> CursorExt for Cursor<T> {
|
|
||||||
fn consume_if_eq(&mut self, needle: &[u8]) -> bool {
|
|
||||||
let position = self.position() as usize;
|
|
||||||
let slice = self.get_ref().as_ref();
|
|
||||||
|
|
||||||
if slice[position..].starts_with(needle) {
|
|
||||||
self.consume(needle.len());
|
|
||||||
true
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn extract_document_key(key: Vec<u8>) -> io::Result<(DocumentId, SchemaAttr)> {
|
|
||||||
let mut key = Cursor::new(key);
|
|
||||||
|
|
||||||
if !key.consume_if_eq(b"document-") {
|
|
||||||
return Err(io::Error::from(io::ErrorKind::InvalidData))
|
|
||||||
}
|
|
||||||
|
|
||||||
let document_id = key.read_u64::<BigEndian>().map(DocumentId)?;
|
|
||||||
let schema_attr = key.read_u16::<BigEndian>().map(SchemaAttr)?;
|
|
||||||
|
|
||||||
Ok((document_id, schema_attr))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct Database {
|
pub struct Database {
|
||||||
opened: Arc<ArcSwap<HashMap<String, RawIndex>>>,
|
cache: RwLock<HashMap<String, Arc<Index>>>,
|
||||||
inner: sled::Db,
|
inner: sled::Db,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Database {
|
impl Database {
|
||||||
pub fn start_default<P: AsRef<Path>>(path: P) -> Result<Database, Error> {
|
pub fn start_default<P: AsRef<Path>>(path: P) -> Result<Database, Error> {
|
||||||
|
let cache = RwLock::new(HashMap::new());
|
||||||
let inner = sled::Db::start_default(path)?;
|
let inner = sled::Db::start_default(path)?;
|
||||||
let opened = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));
|
Ok(Database { cache, inner })
|
||||||
Ok(Database { opened, inner })
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn open_index(&self, name: &str) -> Result<Option<Index>, Error> {
|
pub fn open_index(&self, name: &str) -> Result<Option<Arc<Index>>, Error> {
|
||||||
// check if the index was already opened
|
{
|
||||||
if let Some(raw_index) = self.opened.lease().get(name) {
|
let cache = self.cache.read().unwrap();
|
||||||
return Ok(Some(Index(raw_index.clone())))
|
if let Some(index) = cache.get(name).cloned() {
|
||||||
|
return Ok(Some(index))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let raw_name = index_name(name);
|
let indexes: HashSet<&str> = match self.inner.get("indexes")? {
|
||||||
if self.inner.tree_names().into_iter().any(|tn| tn == raw_name) {
|
Some(bytes) => bincode::deserialize(&bytes)?,
|
||||||
let tree = self.inner.open_tree(raw_name)?;
|
None => return Ok(None),
|
||||||
let word_index_tree = self.inner.open_tree(word_index_name(name))?;
|
};
|
||||||
let raw_index = RawIndex::from_raw(tree, word_index_tree)?;
|
|
||||||
|
|
||||||
self.opened.rcu(|opened| {
|
if indexes.get(name).is_none() {
|
||||||
let mut opened = HashMap::clone(opened);
|
return Ok(None);
|
||||||
opened.insert(name.to_string(), raw_index.clone());
|
|
||||||
opened
|
|
||||||
});
|
|
||||||
|
|
||||||
return Ok(Some(Index(raw_index)))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(None)
|
let main = {
|
||||||
|
let tree = self.inner.open_tree(name)?;
|
||||||
|
MainIndex(tree)
|
||||||
|
};
|
||||||
|
|
||||||
|
let words = {
|
||||||
|
let tree_name = format!("{}-words", name);
|
||||||
|
let tree = self.inner.open_tree(tree_name)?;
|
||||||
|
WordsIndex(tree)
|
||||||
|
};
|
||||||
|
|
||||||
|
let documents = {
|
||||||
|
let tree_name = format!("{}-documents", name);
|
||||||
|
let tree = self.inner.open_tree(tree_name)?;
|
||||||
|
DocumentsIndex(tree)
|
||||||
|
};
|
||||||
|
|
||||||
|
let raw_index = RawIndex { main, words, documents };
|
||||||
|
let index = Arc::new(Index(raw_index));
|
||||||
|
|
||||||
|
{
|
||||||
|
let cache = self.cache.write().unwrap();
|
||||||
|
cache.insert(name.to_string(), index.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn create_index(&self, name: String, schema: Schema) -> Result<Index, Error> {
|
Ok(Some(index))
|
||||||
match self.open_index(&name)? {
|
}
|
||||||
Some(index) => {
|
|
||||||
if index.schema() != &schema {
|
pub fn create_index(&self, name: &str, schema: Schema) -> Result<Arc<Index>, Error> {
|
||||||
return Err(Error::SchemaDiffer);
|
{
|
||||||
|
let cache = self.cache.read().unwrap();
|
||||||
|
if let Some(index) = cache.get(name).cloned() {
|
||||||
|
// TODO check if schemas are the same
|
||||||
|
return Ok(index)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut indexes: HashSet<&str> = match self.inner.get("indexes")? {
|
||||||
|
Some(bytes) => bincode::deserialize(&bytes)?,
|
||||||
|
None => HashSet::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let new_insertion = indexes.insert(name);
|
||||||
|
|
||||||
|
let main = {
|
||||||
|
let tree = self.inner.open_tree(name)?;
|
||||||
|
MainIndex(tree)
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(prev_schema) = main.schema()? {
|
||||||
|
if prev_schema != schema {
|
||||||
|
return Err(Error::SchemaDiffer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let words = {
|
||||||
|
let tree_name = format!("{}-words", name);
|
||||||
|
let tree = self.inner.open_tree(tree_name)?;
|
||||||
|
WordsIndex(tree)
|
||||||
|
};
|
||||||
|
|
||||||
|
let documents = {
|
||||||
|
let tree_name = format!("{}-documents", name);
|
||||||
|
let tree = self.inner.open_tree(tree_name)?;
|
||||||
|
DocumentsIndex(tree)
|
||||||
|
};
|
||||||
|
|
||||||
|
let raw_index = RawIndex { main, words, documents };
|
||||||
|
let index = Arc::new(Index(raw_index));
|
||||||
|
|
||||||
|
{
|
||||||
|
let cache = self.cache.write().unwrap();
|
||||||
|
cache.insert(name.to_string(), index.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(index)
|
Ok(index)
|
||||||
},
|
|
||||||
None => {
|
|
||||||
let raw_name = index_name(&name);
|
|
||||||
let tree = self.inner.open_tree(raw_name)?;
|
|
||||||
let word_index_tree = self.inner.open_tree(word_index_name(&name))?;
|
|
||||||
let raw_index = RawIndex::new_from_raw(tree, word_index_tree, schema)?;
|
|
||||||
|
|
||||||
self.opened.rcu(|opened| {
|
|
||||||
let mut opened = HashMap::clone(opened);
|
|
||||||
opened.insert(name.clone(), raw_index.clone());
|
|
||||||
opened
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(Index(raw_index))
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct RawIndex2 {
|
#[derive(Clone)]
|
||||||
|
struct RawIndex {
|
||||||
main: MainIndex,
|
main: MainIndex,
|
||||||
words: WordsIndex,
|
words: WordsIndex,
|
||||||
documents: DocumentsIndex,
|
documents: DocumentsIndex,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
struct MainIndex(Arc<sled::Tree>);
|
struct MainIndex(Arc<sled::Tree>);
|
||||||
|
|
||||||
impl MainIndex {
|
impl MainIndex {
|
||||||
@ -233,6 +229,7 @@ impl MainIndex {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
struct WordsIndex(Arc<sled::Tree>);
|
struct WordsIndex(Arc<sled::Tree>);
|
||||||
|
|
||||||
impl WordsIndex {
|
impl WordsIndex {
|
||||||
@ -249,6 +246,7 @@ impl WordsIndex {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
struct DocumentsIndex(Arc<sled::Tree>);
|
struct DocumentsIndex(Arc<sled::Tree>);
|
||||||
|
|
||||||
impl DocumentsIndex {
|
impl DocumentsIndex {
|
||||||
@ -287,122 +285,6 @@ impl<'a> Iterator for DocumentFieldsIter<'a> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct RawIndex {
|
|
||||||
schema: Schema,
|
|
||||||
word_index: Arc<ArcSwap<WordIndex>>,
|
|
||||||
ranked_map: Arc<ArcSwap<RankedMap>>,
|
|
||||||
inner: Arc<sled::Tree>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl RawIndex {
|
|
||||||
fn from_raw(inner: Arc<sled::Tree>, word_index: Arc<sled::Tree>) -> Result<RawIndex, Error> {
|
|
||||||
let schema = {
|
|
||||||
let bytes = inner.get("schema")?;
|
|
||||||
let bytes = bytes.ok_or(Error::SchemaMissing)?;
|
|
||||||
Schema::read_from_bin(bytes.as_ref())?
|
|
||||||
};
|
|
||||||
|
|
||||||
let store = WordIndexTree(word_index);
|
|
||||||
let word_index = WordIndex::from_store(store)?;
|
|
||||||
let word_index = Arc::new(ArcSwap::new(Arc::new(word_index)));
|
|
||||||
|
|
||||||
let ranked_map = {
|
|
||||||
let map = match inner.get("ranked-map")? {
|
|
||||||
Some(bytes) => RankedMap::read_from_bin(bytes.as_ref())?,
|
|
||||||
None => RankedMap::default(),
|
|
||||||
};
|
|
||||||
|
|
||||||
Arc::new(ArcSwap::new(Arc::new(map)))
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(RawIndex { schema, word_index, ranked_map, inner })
|
|
||||||
}
|
|
||||||
|
|
||||||
fn new_from_raw(
|
|
||||||
inner: Arc<sled::Tree>,
|
|
||||||
word_index: Arc<sled::Tree>,
|
|
||||||
schema: Schema,
|
|
||||||
) -> Result<RawIndex, Error>
|
|
||||||
{
|
|
||||||
let mut schema_bytes = Vec::new();
|
|
||||||
schema.write_to_bin(&mut schema_bytes)?;
|
|
||||||
inner.set("schema", schema_bytes)?;
|
|
||||||
|
|
||||||
let store = WordIndexTree(word_index);
|
|
||||||
let word_index = WordIndex::from_store(store)?;
|
|
||||||
let word_index = Arc::new(ArcSwap::new(Arc::new(word_index)));
|
|
||||||
|
|
||||||
let ranked_map = Arc::new(ArcSwap::new(Arc::new(RankedMap::default())));
|
|
||||||
|
|
||||||
Ok(RawIndex { schema, word_index, ranked_map, inner })
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn schema(&self) -> &Schema {
|
|
||||||
&self.schema
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn word_index(&self) -> Lease<Arc<WordIndex>> {
|
|
||||||
self.word_index.lease()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn ranked_map(&self) -> Lease<Arc<RankedMap>> {
|
|
||||||
self.ranked_map.lease()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn update_word_index(&self, word_index: Arc<WordIndex>) {
|
|
||||||
self.word_index.store(word_index)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn update_ranked_map(&self, ranked_map: Arc<RankedMap>) -> sled::Result<()> {
|
|
||||||
let mut bytes = Vec::new();
|
|
||||||
ranked_map.as_ref().write_to_bin(&mut bytes).unwrap();
|
|
||||||
|
|
||||||
self.inner.set("ranked-map", bytes).map(drop)?;
|
|
||||||
self.ranked_map.store(ranked_map);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_document_attribute<V>(
|
|
||||||
&self,
|
|
||||||
id: DocumentId,
|
|
||||||
attr: SchemaAttr,
|
|
||||||
value: V,
|
|
||||||
) -> Result<Option<IVec>, sled::Error>
|
|
||||||
where IVec: From<V>,
|
|
||||||
{
|
|
||||||
let key = document_key(id, attr);
|
|
||||||
Ok(self.inner.set(key, value)?)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_document_attribute(
|
|
||||||
&self,
|
|
||||||
id: DocumentId,
|
|
||||||
attr: SchemaAttr
|
|
||||||
) -> Result<Option<IVec>, sled::Error>
|
|
||||||
{
|
|
||||||
let key = document_key(id, attr);
|
|
||||||
Ok(self.inner.get(key)?)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_document_fields(&self, id: DocumentId) -> DocumentFieldsIter {
|
|
||||||
let start = document_key(id, SchemaAttr::min());
|
|
||||||
let end = document_key(id, SchemaAttr::max());
|
|
||||||
DocumentFieldsIter(self.inner.range(start..=end))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn del_document_attribute(
|
|
||||||
&self,
|
|
||||||
id: DocumentId,
|
|
||||||
attr: SchemaAttr
|
|
||||||
) -> Result<Option<IVec>, sled::Error>
|
|
||||||
{
|
|
||||||
let key = document_key(id, attr);
|
|
||||||
Ok(self.inner.del(key)?)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Index(RawIndex);
|
pub struct Index(RawIndex);
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user