From e33cc89846ec4bc76c18574b83a51479e7909da6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Mon, 26 Aug 2019 19:10:20 +0200 Subject: [PATCH] feat: Introduce update callbacks --- meilidb-data/src/database/index/mod.rs | 24 ++++++++++++++++++++---- meilidb-data/tests/updates.rs | 17 +++++++++++++++++ 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/meilidb-data/src/database/index/mod.rs b/meilidb-data/src/database/index/mod.rs index 9661de629..769f0fd17 100644 --- a/meilidb-data/src/database/index/mod.rs +++ b/meilidb-data/src/database/index/mod.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use std::thread; use std::time::{Duration, Instant}; -use arc_swap::{ArcSwap, Guard}; +use arc_swap::{ArcSwap, ArcSwapOption, Guard}; use meilidb_core::criterion::Criteria; use meilidb_core::{DocIndex, Store, DocumentId, QueryBuilder}; use meilidb_schema::Schema; @@ -60,7 +60,7 @@ enum Update { SynonymsDeletion(BTreeMap>>), } -#[derive(Serialize, Deserialize)] +#[derive(Clone, Serialize, Deserialize)] pub enum UpdateType { DocumentsAddition { number: usize }, DocumentsDeletion { number: usize }, @@ -68,12 +68,12 @@ pub enum UpdateType { SynonymsDeletion { number: usize }, } -#[derive(Serialize, Deserialize)] +#[derive(Clone, Serialize, Deserialize)] pub struct DetailedDuration { main: Duration, } -#[derive(Serialize, Deserialize)] +#[derive(Clone, Serialize, Deserialize)] pub struct UpdateStatus { pub update_id: u64, pub update_type: UpdateType, @@ -132,6 +132,10 @@ fn spawn_update_system(index: Index) -> thread::JoinHandle<()> { detailed_duration, }; + if let Some(callback) = &*index.update_callback.load() { + (callback)(status.clone()); + } + let value = bincode::serialize(&status).unwrap(); results.insert(&key, value) }) @@ -168,6 +172,7 @@ pub struct Index { db: sled::Db, updates_index: Arc, updates_results_index: Arc, + update_callback: Arc>>, } pub(crate) struct Cache { @@ -238,6 +243,7 @@ impl Index { db, updates_index, updates_results_index, + update_callback: Arc::new(ArcSwapOption::empty()), }; let _handle = spawn_update_system(index.clone()); @@ -245,6 +251,16 @@ impl Index { Ok(index) } + pub fn set_update_callback(&self, callback: F) + where F: Fn(UpdateStatus) + Send + Sync + 'static + { + self.update_callback.store(Some(Arc::new(Box::new(callback)))); + } + + pub fn unset_update_callback(&self) { + self.update_callback.store(None); + } + pub fn stats(&self) -> sled::Result { let cache = self.cache.load(); Ok(IndexStats { diff --git a/meilidb-data/tests/updates.rs b/meilidb-data/tests/updates.rs index 3a292c082..ecdd07003 100644 --- a/meilidb-data/tests/updates.rs +++ b/meilidb-data/tests/updates.rs @@ -1,3 +1,6 @@ +use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; +use std::sync::Arc; + use serde_json::json; use meilidb_data::Database; use meilidb_schema::{Schema, SchemaBuilder, DISPLAYED, INDEXED}; @@ -14,15 +17,21 @@ fn insert_delete_document() { let tmp_dir = tempfile::tempdir().unwrap(); let database = Database::open(&tmp_dir).unwrap(); + let as_been_updated = Arc::new(AtomicBool::new(false)); + let schema = simple_schema(); let index = database.create_index("hello", schema).unwrap(); + let as_been_updated_clone = as_been_updated.clone(); + index.set_update_callback(move |_| as_been_updated_clone.store(true, Relaxed)); + let doc1 = json!({ "objectId": 123, "title": "hello" }); let mut addition = index.documents_addition(); addition.update_document(&doc1); let update_id = addition.finalize().unwrap(); let status = index.update_status_blocking(update_id).unwrap(); + assert!(as_been_updated.swap(false, Relaxed)); assert!(status.result.is_ok()); let docs = index.query_builder().query("hello", 0..10).unwrap(); @@ -33,6 +42,7 @@ fn insert_delete_document() { deletion.delete_document(&doc1).unwrap(); let update_id = deletion.finalize().unwrap(); let status = index.update_status_blocking(update_id).unwrap(); + assert!(as_been_updated.swap(false, Relaxed)); assert!(status.result.is_ok()); let docs = index.query_builder().query("hello", 0..10).unwrap(); @@ -44,9 +54,14 @@ fn replace_document() { let tmp_dir = tempfile::tempdir().unwrap(); let database = Database::open(&tmp_dir).unwrap(); + let as_been_updated = Arc::new(AtomicBool::new(false)); + let schema = simple_schema(); let index = database.create_index("hello", schema).unwrap(); + let as_been_updated_clone = as_been_updated.clone(); + index.set_update_callback(move |_| as_been_updated_clone.store(true, Relaxed)); + let doc1 = json!({ "objectId": 123, "title": "hello" }); let doc2 = json!({ "objectId": 123, "title": "coucou" }); @@ -54,6 +69,7 @@ fn replace_document() { addition.update_document(&doc1); let update_id = addition.finalize().unwrap(); let status = index.update_status_blocking(update_id).unwrap(); + assert!(as_been_updated.swap(false, Relaxed)); assert!(status.result.is_ok()); let docs = index.query_builder().query("hello", 0..10).unwrap(); @@ -64,6 +80,7 @@ fn replace_document() { deletion.update_document(&doc2); let update_id = deletion.finalize().unwrap(); let status = index.update_status_blocking(update_id).unwrap(); + assert!(as_been_updated.swap(false, Relaxed)); assert!(status.result.is_ok()); let docs = index.query_builder().query("hello", 0..10).unwrap();