feat: Introduce update callbacks

This commit is contained in:
Clément Renault 2019-08-26 19:10:20 +02:00
parent f40b373f9f
commit e33cc89846
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
2 changed files with 37 additions and 4 deletions

View File

@ -4,7 +4,7 @@ use std::sync::Arc;
use std::thread; use std::thread;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use arc_swap::{ArcSwap, Guard}; use arc_swap::{ArcSwap, ArcSwapOption, Guard};
use meilidb_core::criterion::Criteria; use meilidb_core::criterion::Criteria;
use meilidb_core::{DocIndex, Store, DocumentId, QueryBuilder}; use meilidb_core::{DocIndex, Store, DocumentId, QueryBuilder};
use meilidb_schema::Schema; use meilidb_schema::Schema;
@ -60,7 +60,7 @@ enum Update {
SynonymsDeletion(BTreeMap<String, Option<Vec<String>>>), SynonymsDeletion(BTreeMap<String, Option<Vec<String>>>),
} }
#[derive(Serialize, Deserialize)] #[derive(Clone, Serialize, Deserialize)]
pub enum UpdateType { pub enum UpdateType {
DocumentsAddition { number: usize }, DocumentsAddition { number: usize },
DocumentsDeletion { number: usize }, DocumentsDeletion { number: usize },
@ -68,12 +68,12 @@ pub enum UpdateType {
SynonymsDeletion { number: usize }, SynonymsDeletion { number: usize },
} }
#[derive(Serialize, Deserialize)] #[derive(Clone, Serialize, Deserialize)]
pub struct DetailedDuration { pub struct DetailedDuration {
main: Duration, main: Duration,
} }
#[derive(Serialize, Deserialize)] #[derive(Clone, Serialize, Deserialize)]
pub struct UpdateStatus { pub struct UpdateStatus {
pub update_id: u64, pub update_id: u64,
pub update_type: UpdateType, pub update_type: UpdateType,
@ -132,6 +132,10 @@ fn spawn_update_system(index: Index) -> thread::JoinHandle<()> {
detailed_duration, detailed_duration,
}; };
if let Some(callback) = &*index.update_callback.load() {
(callback)(status.clone());
}
let value = bincode::serialize(&status).unwrap(); let value = bincode::serialize(&status).unwrap();
results.insert(&key, value) results.insert(&key, value)
}) })
@ -168,6 +172,7 @@ pub struct Index {
db: sled::Db, db: sled::Db,
updates_index: Arc<sled::Tree>, updates_index: Arc<sled::Tree>,
updates_results_index: Arc<sled::Tree>, updates_results_index: Arc<sled::Tree>,
update_callback: Arc<ArcSwapOption<Box<dyn Fn(UpdateStatus) + Send + Sync + 'static>>>,
} }
pub(crate) struct Cache { pub(crate) struct Cache {
@ -238,6 +243,7 @@ impl Index {
db, db,
updates_index, updates_index,
updates_results_index, updates_results_index,
update_callback: Arc::new(ArcSwapOption::empty()),
}; };
let _handle = spawn_update_system(index.clone()); let _handle = spawn_update_system(index.clone());
@ -245,6 +251,16 @@ impl Index {
Ok(index) Ok(index)
} }
pub fn set_update_callback<F>(&self, callback: F)
where F: Fn(UpdateStatus) + Send + Sync + 'static
{
self.update_callback.store(Some(Arc::new(Box::new(callback))));
}
pub fn unset_update_callback(&self) {
self.update_callback.store(None);
}
pub fn stats(&self) -> sled::Result<IndexStats> { pub fn stats(&self) -> sled::Result<IndexStats> {
let cache = self.cache.load(); let cache = self.cache.load();
Ok(IndexStats { Ok(IndexStats {

View File

@ -1,3 +1,6 @@
use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
use std::sync::Arc;
use serde_json::json; use serde_json::json;
use meilidb_data::Database; use meilidb_data::Database;
use meilidb_schema::{Schema, SchemaBuilder, DISPLAYED, INDEXED}; use meilidb_schema::{Schema, SchemaBuilder, DISPLAYED, INDEXED};
@ -14,15 +17,21 @@ fn insert_delete_document() {
let tmp_dir = tempfile::tempdir().unwrap(); let tmp_dir = tempfile::tempdir().unwrap();
let database = Database::open(&tmp_dir).unwrap(); let database = Database::open(&tmp_dir).unwrap();
let as_been_updated = Arc::new(AtomicBool::new(false));
let schema = simple_schema(); let schema = simple_schema();
let index = database.create_index("hello", schema).unwrap(); let index = database.create_index("hello", schema).unwrap();
let as_been_updated_clone = as_been_updated.clone();
index.set_update_callback(move |_| as_been_updated_clone.store(true, Relaxed));
let doc1 = json!({ "objectId": 123, "title": "hello" }); let doc1 = json!({ "objectId": 123, "title": "hello" });
let mut addition = index.documents_addition(); let mut addition = index.documents_addition();
addition.update_document(&doc1); addition.update_document(&doc1);
let update_id = addition.finalize().unwrap(); let update_id = addition.finalize().unwrap();
let status = index.update_status_blocking(update_id).unwrap(); let status = index.update_status_blocking(update_id).unwrap();
assert!(as_been_updated.swap(false, Relaxed));
assert!(status.result.is_ok()); assert!(status.result.is_ok());
let docs = index.query_builder().query("hello", 0..10).unwrap(); let docs = index.query_builder().query("hello", 0..10).unwrap();
@ -33,6 +42,7 @@ fn insert_delete_document() {
deletion.delete_document(&doc1).unwrap(); deletion.delete_document(&doc1).unwrap();
let update_id = deletion.finalize().unwrap(); let update_id = deletion.finalize().unwrap();
let status = index.update_status_blocking(update_id).unwrap(); let status = index.update_status_blocking(update_id).unwrap();
assert!(as_been_updated.swap(false, Relaxed));
assert!(status.result.is_ok()); assert!(status.result.is_ok());
let docs = index.query_builder().query("hello", 0..10).unwrap(); let docs = index.query_builder().query("hello", 0..10).unwrap();
@ -44,9 +54,14 @@ fn replace_document() {
let tmp_dir = tempfile::tempdir().unwrap(); let tmp_dir = tempfile::tempdir().unwrap();
let database = Database::open(&tmp_dir).unwrap(); let database = Database::open(&tmp_dir).unwrap();
let as_been_updated = Arc::new(AtomicBool::new(false));
let schema = simple_schema(); let schema = simple_schema();
let index = database.create_index("hello", schema).unwrap(); let index = database.create_index("hello", schema).unwrap();
let as_been_updated_clone = as_been_updated.clone();
index.set_update_callback(move |_| as_been_updated_clone.store(true, Relaxed));
let doc1 = json!({ "objectId": 123, "title": "hello" }); let doc1 = json!({ "objectId": 123, "title": "hello" });
let doc2 = json!({ "objectId": 123, "title": "coucou" }); let doc2 = json!({ "objectId": 123, "title": "coucou" });
@ -54,6 +69,7 @@ fn replace_document() {
addition.update_document(&doc1); addition.update_document(&doc1);
let update_id = addition.finalize().unwrap(); let update_id = addition.finalize().unwrap();
let status = index.update_status_blocking(update_id).unwrap(); let status = index.update_status_blocking(update_id).unwrap();
assert!(as_been_updated.swap(false, Relaxed));
assert!(status.result.is_ok()); assert!(status.result.is_ok());
let docs = index.query_builder().query("hello", 0..10).unwrap(); let docs = index.query_builder().query("hello", 0..10).unwrap();
@ -64,6 +80,7 @@ fn replace_document() {
deletion.update_document(&doc2); deletion.update_document(&doc2);
let update_id = deletion.finalize().unwrap(); let update_id = deletion.finalize().unwrap();
let status = index.update_status_blocking(update_id).unwrap(); let status = index.update_status_blocking(update_id).unwrap();
assert!(as_been_updated.swap(false, Relaxed));
assert!(status.result.is_ok()); assert!(status.result.is_ok());
let docs = index.query_builder().query("hello", 0..10).unwrap(); let docs = index.query_builder().query("hello", 0..10).unwrap();