feat: Introduce documents addition using the update system

This commit is contained in:
Clément Renault 2019-08-21 17:12:52 +02:00 committed by Clément Renault
parent 5a9e25c315
commit f83d6df4ef
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
7 changed files with 186 additions and 192 deletions

View File

@ -18,7 +18,7 @@ sdset = "0.3.2"
serde = { version = "1.0.99", features = ["derive"] } serde = { version = "1.0.99", features = ["derive"] }
serde_json = "1.0.40" serde_json = "1.0.40"
siphasher = "0.3.0" siphasher = "0.3.0"
sled = "0.25.0" sled = "0.26.0"
zerocopy = "0.2.8" zerocopy = "0.2.8"
[dependencies.rmp-serde] [dependencies.rmp-serde]
@ -30,4 +30,4 @@ git = "https://github.com/Kerollmops/fst.git"
branch = "arc-byte-slice" branch = "arc-byte-slice"
[dev-dependencies] [dev-dependencies]
tempfile = "3.0.7" tempfile = "3.1.0"

View File

@ -12,7 +12,28 @@ use crate::RankedMap;
use super::{Error, Index, DocumentsDeletion}; use super::{Error, Index, DocumentsDeletion};
use super::index::Cache; use super::index::Cache;
pub struct DocumentsAddition<'a> { pub struct DocumentsAddition<'a, D> {
index: &'a Index,
documents: Vec<D>,
}
impl<'a, D> DocumentsAddition<'a, D> {
pub fn new(index: &'a Index) -> DocumentsAddition<'a, D> {
DocumentsAddition { index, documents: Vec::new() }
}
pub fn update_document(&mut self, document: D) {
self.documents.push(document);
}
pub fn finalize(self) -> Result<u64, Error>
where D: serde::Serialize
{
self.index.push_documents_addition(self.documents)
}
}
pub struct FinalDocumentsAddition<'a> {
inner: &'a Index, inner: &'a Index,
document_ids: HashSet<DocumentId>, document_ids: HashSet<DocumentId>,
document_store: RamDocumentStore, document_store: RamDocumentStore,
@ -20,9 +41,9 @@ pub struct DocumentsAddition<'a> {
ranked_map: RankedMap, ranked_map: RankedMap,
} }
impl<'a> DocumentsAddition<'a> { impl<'a> FinalDocumentsAddition<'a> {
pub fn new(inner: &'a Index, ranked_map: RankedMap) -> DocumentsAddition<'a> { pub fn new(inner: &'a Index, ranked_map: RankedMap) -> FinalDocumentsAddition<'a> {
DocumentsAddition { FinalDocumentsAddition {
inner, inner,
document_ids: HashSet::new(), document_ids: HashSet::new(),
document_store: RamDocumentStore::new(), document_store: RamDocumentStore::new(),

View File

@ -1,12 +1,15 @@
use std::collections::HashSet; use std::collections::HashSet;
use std::convert::TryInto;
use std::sync::Arc; use std::sync::Arc;
use std::thread;
use arc_swap::{ArcSwap, Guard}; use arc_swap::{ArcSwap, Guard};
use meilidb_core::criterion::Criteria; use meilidb_core::criterion::Criteria;
use meilidb_core::{DocIndex, Store, DocumentId, QueryBuilder}; use meilidb_core::{DocIndex, Store, DocumentId, QueryBuilder};
use meilidb_schema::Schema; use meilidb_schema::Schema;
use sdset::SetBuf; use sdset::SetBuf;
use serde::de; use serde::{de, Serialize, Deserialize};
use sled::Transactional;
use crate::ranked_map::RankedMap; use crate::ranked_map::RankedMap;
use crate::serde::{Deserializer, DeserializerError}; use crate::serde::{Deserializer, DeserializerError};
@ -18,11 +21,11 @@ use self::docs_words_index::DocsWordsIndex;
use self::documents_index::DocumentsIndex; use self::documents_index::DocumentsIndex;
use self::main_index::MainIndex; use self::main_index::MainIndex;
use self::synonyms_index::SynonymsIndex; use self::synonyms_index::SynonymsIndex;
use self::updates_index::UpdatesIndex;
use self::words_index::WordsIndex; use self::words_index::WordsIndex;
use super::{ use super::{
DocumentsAddition, DocumentsDeletion, DocumentsAddition, FinalDocumentsAddition,
DocumentsDeletion,
SynonymsAddition, SynonymsDeletion, SynonymsAddition, SynonymsDeletion,
}; };
@ -31,9 +34,83 @@ mod docs_words_index;
mod documents_index; mod documents_index;
mod main_index; mod main_index;
mod synonyms_index; mod synonyms_index;
mod updates_index;
mod words_index; mod words_index;
fn event_is_set(event: &sled::Event) -> bool {
match event {
sled::Event::Set(_, _) => true,
_ => false,
}
}
#[derive(Deserialize)]
enum UpdateOwned {
DocumentsAddition(Vec<serde_json::Value>),
DocumentsDeletion( () /*DocumentsDeletion*/),
SynonymsAddition( () /*SynonymsAddition*/),
SynonymsDeletion( () /*SynonymsDeletion*/),
}
#[derive(Serialize)]
enum Update<D: serde::Serialize> {
DocumentsAddition(Vec<D>),
DocumentsDeletion( () /*DocumentsDeletion*/),
SynonymsAddition( () /*SynonymsAddition*/),
SynonymsDeletion( () /*SynonymsDeletion*/),
}
fn spawn_update_system(index: Index) -> thread::JoinHandle<()> {
thread::spawn(move || {
loop {
let subscription = index.updates_index.watch_prefix(vec![]);
while let Some(result) = index.updates_index.iter().next() {
let (key, _) = result.unwrap();
let updates = &index.updates_index;
let results = &index.updates_results_index;
(updates, results).transaction(|(updates, results)| {
let update = updates.remove(&key)?.unwrap();
let array_id = key.as_ref().try_into().unwrap();
let id = u64::from_be_bytes(array_id);
// this is an emulation of the try block (#31436)
let result: Result<(), Error> = (|| {
match bincode::deserialize(&update)? {
UpdateOwned::DocumentsAddition(documents) => {
let ranked_map = index.cache.load().ranked_map.clone();
let mut addition = FinalDocumentsAddition::new(&index, ranked_map);
for document in documents {
addition.update_document(document)?;
}
addition.finalize()?;
},
UpdateOwned::DocumentsDeletion(_) => {
// ...
},
UpdateOwned::SynonymsAddition(_) => {
// ...
},
UpdateOwned::SynonymsDeletion(_) => {
// ...
},
}
Ok(())
})();
let result = result.map_err(|e| e.to_string());
let value = bincode::serialize(&result).unwrap();
results.insert(&array_id, value)
})
.unwrap();
}
// this subscription is just used to block
// the loop until a new update is inserted
subscription.filter(event_is_set).next();
}
})
}
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
pub struct IndexStats { pub struct IndexStats {
pub number_of_words: usize, pub number_of_words: usize,
@ -52,7 +129,11 @@ pub struct Index {
docs_words_index: DocsWordsIndex, docs_words_index: DocsWordsIndex,
documents_index: DocumentsIndex, documents_index: DocumentsIndex,
custom_settings_index: CustomSettingsIndex, custom_settings_index: CustomSettingsIndex,
updates_index: UpdatesIndex,
// used by the update system
db: sled::Db,
updates_index: Arc<sled::Tree>,
updates_results_index: Arc<sled::Tree>,
} }
pub(crate) struct Cache { pub(crate) struct Cache {
@ -63,64 +144,23 @@ pub(crate) struct Cache {
} }
impl Index { impl Index {
pub fn new(db: &sled::Db, name: &str) -> Result<Index, Error> { pub fn new(db: sled::Db, name: &str) -> Result<Index, Error> {
let main_index = db.open_tree(name).map(MainIndex)?; Index::new_raw(db, name, None)
let synonyms_index = db.open_tree(format!("{}-synonyms", name)).map(SynonymsIndex)?;
let words_index = db.open_tree(format!("{}-words", name)).map(WordsIndex)?;
let docs_words_index = db.open_tree(format!("{}-docs-words", name)).map(DocsWordsIndex)?;
let documents_index = db.open_tree(format!("{}-documents", name)).map(DocumentsIndex)?;
let custom_settings_index = db.open_tree(format!("{}-custom", name)).map(CustomSettingsIndex)?;
let updates = db.open_tree(format!("{}-updates", name))?;
let updates_results = db.open_tree(format!("{}-updates-results", name))?;
let updates_index = UpdatesIndex::new(db.clone(), updates, updates_results);
let words = match main_index.words_set()? {
Some(words) => Arc::new(words),
None => Arc::new(fst::Set::default()),
};
let synonyms = match main_index.synonyms_set()? {
Some(synonyms) => Arc::new(synonyms),
None => Arc::new(fst::Set::default()),
};
let schema = match main_index.schema()? {
Some(schema) => schema,
None => return Err(Error::SchemaMissing),
};
let ranked_map = match main_index.ranked_map()? {
Some(map) => map,
None => RankedMap::default(),
};
let cache = Cache { words, synonyms, schema, ranked_map };
let cache = ArcSwap::from_pointee(cache);
Ok(Index {
cache,
main_index,
synonyms_index,
words_index,
docs_words_index,
documents_index,
custom_settings_index,
updates_index,
})
} }
pub fn with_schema(db: &sled::Db, name: &str, schema: Schema) -> Result<Index, Error> { pub fn with_schema(db: sled::Db, name: &str, schema: Schema) -> Result<Index, Error> {
Index::new_raw(db, name, Some(schema))
}
fn new_raw(db: sled::Db, name: &str, schema: Option<Schema>) -> Result<Index, Error> {
let main_index = db.open_tree(name).map(MainIndex)?; let main_index = db.open_tree(name).map(MainIndex)?;
let synonyms_index = db.open_tree(format!("{}-synonyms", name)).map(SynonymsIndex)?; let synonyms_index = db.open_tree(format!("{}-synonyms", name)).map(SynonymsIndex)?;
let words_index = db.open_tree(format!("{}-words", name)).map(WordsIndex)?; let words_index = db.open_tree(format!("{}-words", name)).map(WordsIndex)?;
let docs_words_index = db.open_tree(format!("{}-docs-words", name)).map(DocsWordsIndex)?; let docs_words_index = db.open_tree(format!("{}-docs-words", name)).map(DocsWordsIndex)?;
let documents_index = db.open_tree(format!("{}-documents", name)).map(DocumentsIndex)?; let documents_index = db.open_tree(format!("{}-documents", name)).map(DocumentsIndex)?;
let custom_settings_index = db.open_tree(format!("{}-custom", name)).map(CustomSettingsIndex)?; let custom_settings_index = db.open_tree(format!("{}-custom", name)).map(CustomSettingsIndex)?;
let updates_index = db.open_tree(format!("{}-updates", name))?;
let updates = db.open_tree(format!("{}-updates", name))?; let updates_results_index = db.open_tree(format!("{}-updates-results", name))?;
let updates_results = db.open_tree(format!("{}-updates-results", name))?;
let updates_index = UpdatesIndex::new(db.clone(), updates, updates_results);
let words = match main_index.words_set()? { let words = match main_index.words_set()? {
Some(words) => Arc::new(words), Some(words) => Arc::new(words),
@ -132,12 +172,18 @@ impl Index {
None => Arc::new(fst::Set::default()), None => Arc::new(fst::Set::default()),
}; };
match main_index.schema()? { let schema = match (schema, main_index.schema()?) {
Some(current) => if current != schema { (Some(ref expected), Some(ref current)) if current != expected => {
return Err(Error::SchemaDiffer) return Err(Error::SchemaDiffer)
}, },
None => main_index.set_schema(&schema)?, (Some(expected), Some(_)) => expected,
} (Some(expected), None) => {
main_index.set_schema(&expected)?;
expected
},
(None, Some(current)) => current,
(None, None) => return Err(Error::SchemaMissing),
};
let ranked_map = match main_index.ranked_map()? { let ranked_map = match main_index.ranked_map()? {
Some(map) => map, Some(map) => map,
@ -147,7 +193,7 @@ impl Index {
let cache = Cache { words, synonyms, schema, ranked_map }; let cache = Cache { words, synonyms, schema, ranked_map };
let cache = ArcSwap::from_pointee(cache); let cache = ArcSwap::from_pointee(cache);
Ok(Index { let index = Index {
cache, cache,
main_index, main_index,
synonyms_index, synonyms_index,
@ -155,8 +201,14 @@ impl Index {
docs_words_index, docs_words_index,
documents_index, documents_index,
custom_settings_index, custom_settings_index,
db,
updates_index, updates_index,
}) updates_results_index,
};
let _handle = spawn_update_system(index.clone());
Ok(index)
} }
pub fn stats(&self) -> sled::Result<IndexStats> { pub fn stats(&self) -> sled::Result<IndexStats> {
@ -202,9 +254,8 @@ impl Index {
self.custom_settings_index.clone() self.custom_settings_index.clone()
} }
pub fn documents_addition(&self) -> DocumentsAddition { pub fn documents_addition<D>(&self) -> DocumentsAddition<D> {
let ranked_map = self.cache.load().ranked_map.clone(); DocumentsAddition::new(self)
DocumentsAddition::new(self, ranked_map)
} }
pub fn documents_deletion(&self) -> DocumentsDeletion { pub fn documents_deletion(&self) -> DocumentsDeletion {
@ -245,6 +296,40 @@ impl Index {
} }
} }
impl Index {
pub(crate) fn push_documents_addition<D>(&self, addition: Vec<D>) -> Result<u64, Error>
where D: serde::Serialize
{
let addition = Update::DocumentsAddition(addition);
let update = bincode::serialize(&addition)?;
self.raw_push_update(update)
}
pub(crate) fn push_documents_deletion(&self, deletion: DocumentsDeletion) -> Result<u64, Error> {
let update = bincode::serialize(&())?;
self.raw_push_update(update)
}
pub(crate) fn push_synonyms_addition(&self, addition: SynonymsAddition) -> Result<u64, Error> {
let update = bincode::serialize(&())?;
self.raw_push_update(update)
}
pub(crate) fn push_synonyms_deletion(&self, deletion: SynonymsDeletion) -> Result<u64, Error> {
let update = bincode::serialize(&())?;
self.raw_push_update(update)
}
fn raw_push_update(&self, raw_update: Vec<u8>) -> Result<u64, Error> {
let update_id = self.db.generate_id()?;
let update_id_array = update_id.to_be_bytes();
self.updates_index.insert(update_id_array, raw_update)?;
Ok(update_id)
}
}
pub struct RefIndex<'a> { pub struct RefIndex<'a> {
pub(crate) cache: Guard<'static, Arc<Cache>>, pub(crate) cache: Guard<'static, Arc<Cache>>,
pub main_index: &'a MainIndex, pub main_index: &'a MainIndex,

View File

@ -1,110 +0,0 @@
use std::convert::TryInto;
use std::sync::Arc;
use std::thread;
use log::info;
use sled::Event;
use serde::{Serialize, Deserialize};
use super::Error;
use crate::database::{
DocumentsAddition, DocumentsDeletion, SynonymsAddition, SynonymsDeletion
};
fn event_is_set(event: &Event) -> bool {
match event {
Event::Set(_, _) => true,
_ => false,
}
}
#[derive(Serialize, Deserialize)]
enum Update {
DocumentsAddition( () /*DocumentsAddition*/),
DocumentsDeletion( () /*DocumentsDeletion*/),
SynonymsAddition( () /*SynonymsAddition*/),
SynonymsDeletion( () /*SynonymsDeletion*/),
}
#[derive(Clone)]
pub struct UpdatesIndex {
db: sled::Db,
updates: Arc<sled::Tree>,
results: Arc<sled::Tree>,
}
impl UpdatesIndex {
pub fn new(
db: sled::Db,
updates: Arc<sled::Tree>,
results: Arc<sled::Tree>,
) -> UpdatesIndex
{
let updates_clone = updates.clone();
let results_clone = results.clone();
let _handle = thread::spawn(move || {
loop {
let mut subscription = updates_clone.watch_prefix(vec![]);
while let Some((key, update)) = updates_clone.pop_min().unwrap() {
let array = key.as_ref().try_into().unwrap();
let id = u64::from_be_bytes(array);
match bincode::deserialize(&update).unwrap() {
Update::DocumentsAddition(_) => {
info!("processing the document addition (update number {})", id);
// ...
},
Update::DocumentsDeletion(_) => {
info!("processing the document deletion (update number {})", id);
// ...
},
Update::SynonymsAddition(_) => {
info!("processing the synonyms addition (update number {})", id);
// ...
},
Update::SynonymsDeletion(_) => {
info!("processing the synonyms deletion (update number {})", id);
// ...
},
}
}
// this subscription is just used to block
// the loop until a new update is inserted
subscription.filter(event_is_set).next();
}
});
UpdatesIndex { db, updates, results }
}
pub fn push_documents_addition(&self, addition: DocumentsAddition) -> Result<u64, Error> {
let update = bincode::serialize(&())?;
self.raw_push_update(update)
}
pub fn push_documents_deletion(&self, deletion: DocumentsDeletion) -> Result<u64, Error> {
let update = bincode::serialize(&())?;
self.raw_push_update(update)
}
pub fn push_synonyms_addition(&self, addition: SynonymsAddition) -> Result<u64, Error> {
let update = bincode::serialize(&())?;
self.raw_push_update(update)
}
pub fn push_synonyms_deletion(&self, deletion: SynonymsDeletion) -> Result<u64, Error> {
let update = bincode::serialize(&())?;
self.raw_push_update(update)
}
fn raw_push_update(&self, raw_update: Vec<u8>) -> Result<u64, Error> {
let update_id = self.db.generate_id()?;
let update_id_array = update_id.to_be_bytes();
self.updates.insert(update_id_array, raw_update)?;
Ok(update_id)
}
}

View File

@ -15,7 +15,7 @@ mod synonyms_deletion;
pub use self::error::Error; pub use self::error::Error;
pub use self::index::{Index, CustomSettingsIndex}; pub use self::index::{Index, CustomSettingsIndex};
use self::documents_addition::DocumentsAddition; use self::documents_addition::{DocumentsAddition, FinalDocumentsAddition};
use self::documents_deletion::DocumentsDeletion; use self::documents_deletion::DocumentsDeletion;
use self::synonyms_addition::SynonymsAddition; use self::synonyms_addition::SynonymsAddition;
use self::synonyms_deletion::SynonymsDeletion; use self::synonyms_deletion::SynonymsDeletion;
@ -75,7 +75,7 @@ impl Database {
return Ok(None) return Ok(None)
} }
let index = Index::new(&self.inner, name)?; let index = Index::new(self.inner.clone(), name)?;
vacant.insert(index).clone() vacant.insert(index).clone()
}, },
}; };
@ -91,7 +91,7 @@ impl Database {
occupied.get().clone() occupied.get().clone()
}, },
Entry::Vacant(vacant) => { Entry::Vacant(vacant) => {
let index = Index::with_schema(&self.inner, name, schema)?; let index = Index::with_schema(self.inner.clone(), name, schema)?;
let mut indexes = self.indexes()?; let mut indexes = self.indexes()?;
indexes.insert(name.to_string()); indexes.insert(name.to_string());

View File

@ -20,7 +20,7 @@ fn insert_delete_document() {
let doc1 = json!({ "objectId": 123, "title": "hello" }); let doc1 = json!({ "objectId": 123, "title": "hello" });
let mut addition = index.documents_addition(); let mut addition = index.documents_addition();
addition.update_document(&doc1).unwrap(); addition.update_document(&doc1);
addition.finalize().unwrap(); addition.finalize().unwrap();
let docs = index.query_builder().query("hello", 0..10).unwrap(); let docs = index.query_builder().query("hello", 0..10).unwrap();
@ -28,7 +28,7 @@ fn insert_delete_document() {
assert_eq!(index.document(None, docs[0].id).unwrap().as_ref(), Some(&doc1)); assert_eq!(index.document(None, docs[0].id).unwrap().as_ref(), Some(&doc1));
let mut deletion = index.documents_deletion(); let mut deletion = index.documents_deletion();
deletion.delete_document(&doc1).unwrap(); deletion.delete_document(&doc1);
deletion.finalize().unwrap(); deletion.finalize().unwrap();
let docs = index.query_builder().query("hello", 0..10).unwrap(); let docs = index.query_builder().query("hello", 0..10).unwrap();
@ -47,7 +47,7 @@ fn replace_document() {
let doc2 = json!({ "objectId": 123, "title": "coucou" }); let doc2 = json!({ "objectId": 123, "title": "coucou" });
let mut addition = index.documents_addition(); let mut addition = index.documents_addition();
addition.update_document(&doc1).unwrap(); addition.update_document(&doc1);
addition.finalize().unwrap(); addition.finalize().unwrap();
let docs = index.query_builder().query("hello", 0..10).unwrap(); let docs = index.query_builder().query("hello", 0..10).unwrap();
@ -55,7 +55,7 @@ fn replace_document() {
assert_eq!(index.document(None, docs[0].id).unwrap().as_ref(), Some(&doc1)); assert_eq!(index.document(None, docs[0].id).unwrap().as_ref(), Some(&doc1));
let mut deletion = index.documents_addition(); let mut deletion = index.documents_addition();
deletion.update_document(&doc2).unwrap(); deletion.update_document(&doc2);
deletion.finalize().unwrap(); deletion.finalize().unwrap();
let docs = index.query_builder().query("hello", 0..10).unwrap(); let docs = index.query_builder().query("hello", 0..10).unwrap();

View File

@ -6,7 +6,6 @@ use std::io::{self, BufRead, BufReader};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::time::Instant; use std::time::Instant;
use std::error::Error; use std::error::Error;
use std::borrow::Cow;
use std::fs::File; use std::fs::File;
use diskus::Walk; use diskus::Walk;
@ -44,9 +43,8 @@ pub struct Opt {
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct Document<'a> ( struct Document (
#[serde(borrow)] HashMap<String, String>
HashMap<Cow<'a, str>, Cow<'a, str>>
); );
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
@ -138,7 +136,7 @@ fn index(
} }
}; };
update.update_document(&document)?; update.update_document(document);
print!("\rindexing document {}", i); print!("\rindexing document {}", i);
i += 1; i += 1;