feat: Introduce the update_status/_blocking functions

This commit is contained in:
Clément Renault 2019-08-23 14:53:31 +02:00 committed by Clément Renault
parent f07b99fe97
commit cd8535d410
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
2 changed files with 45 additions and 16 deletions

View File

@ -264,6 +264,43 @@ impl Index {
SynonymsDeletion::new(self) SynonymsDeletion::new(self)
} }
pub fn update_status(
&self,
update_id: u64,
) -> Result<Option<Result<(), String>>, Error>
{
let update_id = update_id.to_be_bytes();
match self.updates_results_index.get(update_id)? {
Some(value) => {
let value = bincode::deserialize(&value)?;
Ok(Some(value))
},
None => Ok(None),
}
}
pub fn update_status_blocking(
&self,
update_id: u64,
) -> Result<Result<(), String>, Error>
{
let update_id_bytes = update_id.to_be_bytes().to_vec();
let mut subscription = self.updates_results_index.watch_prefix(update_id_bytes);
// if we find the update result return it now
if let Some(result) = self.update_status(update_id)? {
return Ok(result)
}
// this subscription is used to block the thread
// until the update_id is inserted in the tree
subscription.next();
// the thread has been unblocked, it means that the update result
// has been inserted in the tree, retrieve it
Ok(self.update_status(update_id)?.unwrap())
}
pub fn document<T>( pub fn document<T>(
&self, &self,
fields: Option<&HashSet<&str>>, fields: Option<&HashSet<&str>>,

View File

@ -22,10 +22,8 @@ fn insert_delete_document() {
let mut addition = index.documents_addition(); let mut addition = index.documents_addition();
addition.update_document(&doc1); addition.update_document(&doc1);
let update_id = addition.finalize().unwrap(); let update_id = addition.finalize().unwrap();
println!("addition update id: {}", update_id); let status = index.update_status_blocking(update_id).unwrap();
assert_eq!(status, Ok(()));
// TODO remove this and create a waitable function
std::thread::sleep(std::time::Duration::from_millis(100));
let docs = index.query_builder().query("hello", 0..10).unwrap(); let docs = index.query_builder().query("hello", 0..10).unwrap();
assert_eq!(docs.len(), 1); assert_eq!(docs.len(), 1);
@ -34,10 +32,8 @@ fn insert_delete_document() {
let mut deletion = index.documents_deletion(); let mut deletion = index.documents_deletion();
deletion.delete_document(&doc1).unwrap(); deletion.delete_document(&doc1).unwrap();
let update_id = deletion.finalize().unwrap(); let update_id = deletion.finalize().unwrap();
println!("deletion update id: {}", update_id); let status = index.update_status_blocking(update_id).unwrap();
assert_eq!(status, Ok(()));
// TODO remove this and create a waitable function
std::thread::sleep(std::time::Duration::from_millis(100));
let docs = index.query_builder().query("hello", 0..10).unwrap(); let docs = index.query_builder().query("hello", 0..10).unwrap();
assert_eq!(docs.len(), 0); assert_eq!(docs.len(), 0);
@ -57,10 +53,8 @@ fn replace_document() {
let mut addition = index.documents_addition(); let mut addition = index.documents_addition();
addition.update_document(&doc1); addition.update_document(&doc1);
let update_id = addition.finalize().unwrap(); let update_id = addition.finalize().unwrap();
println!("addition update id: {}", update_id); let status = index.update_status_blocking(update_id).unwrap();
assert_eq!(status, Ok(()));
// TODO remove this and create a waitable function
std::thread::sleep(std::time::Duration::from_millis(100));
let docs = index.query_builder().query("hello", 0..10).unwrap(); let docs = index.query_builder().query("hello", 0..10).unwrap();
assert_eq!(docs.len(), 1); assert_eq!(docs.len(), 1);
@ -69,10 +63,8 @@ fn replace_document() {
let mut deletion = index.documents_addition(); let mut deletion = index.documents_addition();
deletion.update_document(&doc2); deletion.update_document(&doc2);
let update_id = deletion.finalize().unwrap(); let update_id = deletion.finalize().unwrap();
println!("deletion update id: {}", update_id); let status = index.update_status_blocking(update_id).unwrap();
assert_eq!(status, Ok(()));
// TODO remove this and create a waitable function
std::thread::sleep(std::time::Duration::from_millis(100));
let docs = index.query_builder().query("hello", 0..10).unwrap(); let docs = index.query_builder().query("hello", 0..10).unwrap();
assert_eq!(docs.len(), 0); assert_eq!(docs.len(), 0);