From 0615c5c52db4d5191618a1691fdbdb45faa1a5e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Mon, 7 Oct 2019 15:00:28 +0200 Subject: [PATCH] Consume updates in the order of insertion --- meilidb-core/src/store/updates.rs | 21 ++++- meilidb-core/src/update/mod.rs | 132 +++++++++++++++--------------- 2 files changed, 85 insertions(+), 68 deletions(-) diff --git a/meilidb-core/src/store/updates.rs b/meilidb-core/src/store/updates.rs index bd734eb03..28ff2306a 100644 --- a/meilidb-core/src/store/updates.rs +++ b/meilidb-core/src/store/updates.rs @@ -33,6 +33,23 @@ impl Updates { Ok(Some((number, last_data))) } + fn first_update_id<'a>( + &self, + reader: &'a impl rkv::Readable, + ) -> Result>)>, rkv::StoreError> + { + let mut iter = self.updates.iter_start(reader)?; + let (first_key, first_data) = match iter.next() { + Some(result) => result?, + None => return Ok(None), + }; + + let array = first_key.try_into().unwrap(); + let number = u64::from_be_bytes(array); + + Ok(Some((number, first_data))) + } + pub fn contains( &self, reader: &impl rkv::Readable, @@ -60,12 +77,12 @@ impl Updates { Ok(last_update_id) } - pub fn pop_back( + pub fn pop_front( &self, writer: &mut rkv::Writer, ) -> MResult> { - let (last_id, last_data) = match self.last_update_id(writer)? { + let (last_id, last_data) = match self.first_update_id(writer)? { Some(entry) => entry, None => return Ok(None), }; diff --git a/meilidb-core/src/update/mod.rs b/meilidb-core/src/update/mod.rs index a395279ad..9bd929f0d 100644 --- a/meilidb-core/src/update/mod.rs +++ b/meilidb-core/src/update/mod.rs @@ -4,10 +4,7 @@ mod documents_deletion; pub use self::documents_addition::{DocumentsAddition, apply_documents_addition}; pub use self::documents_deletion::{DocumentsDeletion, apply_documents_deletion}; -use std::collections::BTreeMap; -use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; - use serde::{Serialize, Deserialize}; use crate::{store, Error, MResult, DocumentId, RankedMap}; @@ -63,7 +60,7 @@ pub fn update_status( } pub fn push_documents_addition( - mut writer: &mut rkv::Writer, + writer: &mut rkv::Writer, updates_store: store::Updates, addition: Vec, ) -> Result @@ -97,78 +94,81 @@ pub fn update_task( writer: &mut rkv::Writer, index: store::Index, mut callback: Option, -) -> MResult<()> +) -> MResult { - if let Some((update_id, update)) = index.updates.pop_back(writer)? { - let (update_type, result, duration) = match update { - Update::DocumentsAddition(documents) => { - let update_type = UpdateType::DocumentsAddition { number: documents.len() }; + let (update_id, update) = match index.updates.pop_front(writer)? { + Some(value) => value, + None => return Ok(false), + }; - let schema = match index.main.schema(writer)? { - Some(schema) => schema, - None => return Err(Error::SchemaMissing), - }; - let ranked_map = match index.main.ranked_map(writer)? { - Some(ranked_map) => ranked_map, - None => RankedMap::default(), - }; + let (update_type, result, duration) = match update { + Update::DocumentsAddition(documents) => { + let update_type = UpdateType::DocumentsAddition { number: documents.len() }; - let start = Instant::now(); - let result = apply_documents_addition( - writer, - index.main, - index.documents_fields, - index.postings_lists, - index.docs_words, - &schema, - ranked_map, - documents, - ); + let schema = match index.main.schema(writer)? { + Some(schema) => schema, + None => return Err(Error::SchemaMissing), + }; + let ranked_map = match index.main.ranked_map(writer)? { + Some(ranked_map) => ranked_map, + None => RankedMap::default(), + }; - (update_type, result, start.elapsed()) - }, - Update::DocumentsDeletion(documents) => { - let update_type = UpdateType::DocumentsDeletion { number: documents.len() }; + let start = Instant::now(); + let result = apply_documents_addition( + writer, + index.main, + index.documents_fields, + index.postings_lists, + index.docs_words, + &schema, + ranked_map, + documents, + ); - let schema = match index.main.schema(writer)? { - Some(schema) => schema, - None => return Err(Error::SchemaMissing), - }; - let ranked_map = match index.main.ranked_map(writer)? { - Some(ranked_map) => ranked_map, - None => RankedMap::default(), - }; + (update_type, result, start.elapsed()) + }, + Update::DocumentsDeletion(documents) => { + let update_type = UpdateType::DocumentsDeletion { number: documents.len() }; - let start = Instant::now(); - let result = apply_documents_deletion( - writer, - index.main, - index.documents_fields, - index.postings_lists, - index.docs_words, - &schema, - ranked_map, - documents, - ); + let schema = match index.main.schema(writer)? { + Some(schema) => schema, + None => return Err(Error::SchemaMissing), + }; + let ranked_map = match index.main.ranked_map(writer)? { + Some(ranked_map) => ranked_map, + None => RankedMap::default(), + }; - (update_type, result, start.elapsed()) - }, - }; + let start = Instant::now(); + let result = apply_documents_deletion( + writer, + index.main, + index.documents_fields, + index.postings_lists, + index.docs_words, + &schema, + ranked_map, + documents, + ); - let detailed_duration = DetailedDuration { main: duration }; - let status = UpdateResult { - update_id, - update_type, - result: result.map_err(|e| e.to_string()), - detailed_duration, - }; + (update_type, result, start.elapsed()) + }, + }; - index.updates_results.put_update_result(writer, update_id, &status)?; + let detailed_duration = DetailedDuration { main: duration }; + let status = UpdateResult { + update_id, + update_type, + result: result.map_err(|e| e.to_string()), + detailed_duration, + }; - if let Some(callback) = callback.take() { - (callback)(status); - } + index.updates_results.put_update_result(writer, update_id, &status)?; + + if let Some(callback) = callback.take() { + (callback)(status); } - Ok(()) + Ok(true) }