mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-23 13:24:27 +01:00
feat: add a method to get the current processed update id & next updates in queue
This commit is contained in:
parent
57dd679026
commit
0b5b7b0bf1
@ -359,6 +359,24 @@ impl Index {
|
|||||||
SynonymsDeletion::new(self)
|
SynonymsDeletion::new(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn current_update_id(&self) -> Result<Option<u64>, Error> {
|
||||||
|
if let Some((key, _)) = self.updates_index.iter()?.next() {
|
||||||
|
return Ok(Some(key.as_ref().try_into().map(u64::from_be_bytes).unwrap()))
|
||||||
|
}
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn enqueued_updates_ids(&self) -> Result<Vec<u64>, Error> {
|
||||||
|
let mut updates = Vec::new();
|
||||||
|
|
||||||
|
for (key, _) in self.updates_index.iter()? {
|
||||||
|
let update_id = key.as_ref().try_into().map(u64::from_be_bytes).unwrap();
|
||||||
|
updates.push(update_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(updates)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn update_status(
|
pub fn update_status(
|
||||||
&self,
|
&self,
|
||||||
update_id: u64,
|
update_id: u64,
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
mod common;
|
mod common;
|
||||||
|
|
||||||
use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
|
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering::Relaxed};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
@ -97,3 +97,52 @@ fn documents_ids() {
|
|||||||
let documents_ids_count = index.documents_ids().unwrap().count();
|
let documents_ids_count = index.documents_ids().unwrap().count();
|
||||||
assert_eq!(documents_ids_count, 3);
|
assert_eq!(documents_ids_count, 3);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn current_update_id() {
|
||||||
|
let index = common::simple_index();
|
||||||
|
let update_id = Arc::new(AtomicU64::new(0));
|
||||||
|
|
||||||
|
let update_id_cloned = update_id.clone();
|
||||||
|
let index_cloned = index.clone();
|
||||||
|
index.set_update_callback(move |_| {
|
||||||
|
let current_update_id = index_cloned.current_update_id().unwrap().unwrap();
|
||||||
|
assert_eq!(current_update_id, update_id_cloned.load(Relaxed));
|
||||||
|
});
|
||||||
|
|
||||||
|
let doc1 = json!({ "objectId": 123, "title": "hello" });
|
||||||
|
let mut addition = index.documents_addition();
|
||||||
|
addition.update_document(&doc1);
|
||||||
|
update_id.store(addition.finalize().unwrap(), Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn nest_updates_in_queue() {
|
||||||
|
let index = common::simple_index();
|
||||||
|
|
||||||
|
index.set_update_callback(move |_| {
|
||||||
|
std::thread::sleep(std::time::Duration::from_secs(15));
|
||||||
|
});
|
||||||
|
|
||||||
|
let doc1 = json!({ "objectId": 123, "title": "hello" });
|
||||||
|
let doc2 = json!({ "objectId": 456, "title": "world" });
|
||||||
|
let doc3 = json!({ "objectId": 789 });
|
||||||
|
|
||||||
|
let mut addition = index.documents_addition();
|
||||||
|
addition.update_document(&doc1);
|
||||||
|
let _ = addition.finalize().unwrap();
|
||||||
|
|
||||||
|
let mut addition = index.documents_addition();
|
||||||
|
addition.update_document(&doc2);
|
||||||
|
let _ = addition.finalize().unwrap();
|
||||||
|
|
||||||
|
let mut addition = index.documents_addition();
|
||||||
|
addition.update_document(&doc3);
|
||||||
|
let _ = addition.finalize().unwrap();
|
||||||
|
|
||||||
|
let should_have_in_queue_updates = vec![1, 2, 3];
|
||||||
|
|
||||||
|
let in_queue_updates = index.enqueued_updates_ids().unwrap();
|
||||||
|
assert_eq!(in_queue_updates, should_have_in_queue_updates);
|
||||||
|
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user