diff --git a/meilidb-data/src/database/index/mod.rs b/meilidb-data/src/database/index/mod.rs index bf5f3575c..9661de629 100644 --- a/meilidb-data/src/database/index/mod.rs +++ b/meilidb-data/src/database/index/mod.rs @@ -1,6 +1,8 @@ use std::collections::{HashSet, BTreeMap}; +use std::convert::TryInto; use std::sync::Arc; use std::thread; +use std::time::{Duration, Instant}; use arc_swap::{ArcSwap, Guard}; use meilidb_core::criterion::Criteria; @@ -58,41 +60,79 @@ enum Update { SynonymsDeletion(BTreeMap>>), } +#[derive(Serialize, Deserialize)] +pub enum UpdateType { + DocumentsAddition { number: usize }, + DocumentsDeletion { number: usize }, + SynonymsAddition { number: usize }, + SynonymsDeletion { number: usize }, +} + +#[derive(Serialize, Deserialize)] +pub struct DetailedDuration { + main: Duration, +} + +#[derive(Serialize, Deserialize)] +pub struct UpdateStatus { + pub update_id: u64, + pub update_type: UpdateType, + pub result: Result<(), String>, + pub detailed_duration: DetailedDuration, +} + fn spawn_update_system(index: Index) -> thread::JoinHandle<()> { thread::spawn(move || { loop { let subscription = index.updates_index.watch_prefix(vec![]); while let Some(result) = index.updates_index.iter().next() { let (key, _) = result.unwrap(); + let update_id = key.as_ref().try_into().map(u64::from_be_bytes).unwrap(); let updates = &index.updates_index; let results = &index.updates_results_index; + (updates, results).transaction(|(updates, results)| { let update = updates.remove(&key)?.unwrap(); - // this is an emulation of the try block (#31436) - let result: Result<(), Error> = (|| { - match rmp_serde::from_read_ref(&update)? { - UpdateOwned::DocumentsAddition(documents) => { - let ranked_map = index.cache.load().ranked_map.clone(); - apply_documents_addition(&index, ranked_map, documents)?; - }, - UpdateOwned::DocumentsDeletion(documents) => { - let ranked_map = index.cache.load().ranked_map.clone(); - apply_documents_deletion(&index, ranked_map, documents)?; - }, - UpdateOwned::SynonymsAddition(synonyms) => { - apply_synonyms_addition(&index, synonyms)?; - }, - UpdateOwned::SynonymsDeletion(synonyms) => { - apply_synonyms_deletion(&index, synonyms)?; - }, - } - Ok(()) - })(); + let (update_type, result, duration) = match rmp_serde::from_read_ref(&update).unwrap() { + UpdateOwned::DocumentsAddition(documents) => { + let update_type = UpdateType::DocumentsAddition { number: documents.len() }; + let ranked_map = index.cache.load().ranked_map.clone(); + let start = Instant::now(); + let result = apply_documents_addition(&index, ranked_map, documents); + (update_type, result, start.elapsed()) + }, + UpdateOwned::DocumentsDeletion(documents) => { + let update_type = UpdateType::DocumentsDeletion { number: documents.len() }; + let ranked_map = index.cache.load().ranked_map.clone(); + let start = Instant::now(); + let result = apply_documents_deletion(&index, ranked_map, documents); + (update_type, result, start.elapsed()) + }, + UpdateOwned::SynonymsAddition(synonyms) => { + let update_type = UpdateType::SynonymsAddition { number: synonyms.len() }; + let start = Instant::now(); + let result = apply_synonyms_addition(&index, synonyms); + (update_type, result, start.elapsed()) + }, + UpdateOwned::SynonymsDeletion(synonyms) => { + let update_type = UpdateType::SynonymsDeletion { number: synonyms.len() }; + let start = Instant::now(); + let result = apply_synonyms_deletion(&index, synonyms); + (update_type, result, start.elapsed()) + }, + }; - let result = result.map_err(|e| e.to_string()); - let value = bincode::serialize(&result).unwrap(); + let detailed_duration = DetailedDuration { main: duration }; + let status = UpdateStatus { + update_id, + update_type, + result: result.map_err(|e| e.to_string()), + detailed_duration, + }; + + let value = bincode::serialize(&status).unwrap(); results.insert(&key, value) }) .unwrap(); @@ -267,7 +307,7 @@ impl Index { pub fn update_status( &self, update_id: u64, - ) -> Result>, Error> + ) -> Result, Error> { let update_id = update_id.to_be_bytes(); match self.updates_results_index.get(update_id)? { @@ -282,7 +322,7 @@ impl Index { pub fn update_status_blocking( &self, update_id: u64, - ) -> Result, Error> + ) -> Result { let update_id_bytes = update_id.to_be_bytes().to_vec(); let mut subscription = self.updates_results_index.watch_prefix(update_id_bytes); diff --git a/meilidb-data/tests/updates.rs b/meilidb-data/tests/updates.rs index 783664410..3a292c082 100644 --- a/meilidb-data/tests/updates.rs +++ b/meilidb-data/tests/updates.rs @@ -23,7 +23,7 @@ fn insert_delete_document() { addition.update_document(&doc1); let update_id = addition.finalize().unwrap(); let status = index.update_status_blocking(update_id).unwrap(); - assert_eq!(status, Ok(())); + assert!(status.result.is_ok()); let docs = index.query_builder().query("hello", 0..10).unwrap(); assert_eq!(docs.len(), 1); @@ -33,7 +33,7 @@ fn insert_delete_document() { deletion.delete_document(&doc1).unwrap(); let update_id = deletion.finalize().unwrap(); let status = index.update_status_blocking(update_id).unwrap(); - assert_eq!(status, Ok(())); + assert!(status.result.is_ok()); let docs = index.query_builder().query("hello", 0..10).unwrap(); assert_eq!(docs.len(), 0); @@ -54,7 +54,7 @@ fn replace_document() { addition.update_document(&doc1); let update_id = addition.finalize().unwrap(); let status = index.update_status_blocking(update_id).unwrap(); - assert_eq!(status, Ok(())); + assert!(status.result.is_ok()); let docs = index.query_builder().query("hello", 0..10).unwrap(); assert_eq!(docs.len(), 1); @@ -64,7 +64,7 @@ fn replace_document() { deletion.update_document(&doc2); let update_id = deletion.finalize().unwrap(); let status = index.update_status_blocking(update_id).unwrap(); - assert_eq!(status, Ok(())); + assert!(status.result.is_ok()); let docs = index.query_builder().query("hello", 0..10).unwrap(); assert_eq!(docs.len(), 0);