Consume updates in the order of insertion

This commit is contained in:
Clément Renault 2019-10-07 15:00:28 +02:00
parent 487411340a
commit 0615c5c52d
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
2 changed files with 85 additions and 68 deletions

View File

@ -33,6 +33,23 @@ impl Updates {
Ok(Some((number, last_data))) Ok(Some((number, last_data)))
} }
fn first_update_id<'a>(
&self,
reader: &'a impl rkv::Readable,
) -> Result<Option<(u64, Option<Value<'a>>)>, rkv::StoreError>
{
let mut iter = self.updates.iter_start(reader)?;
let (first_key, first_data) = match iter.next() {
Some(result) => result?,
None => return Ok(None),
};
let array = first_key.try_into().unwrap();
let number = u64::from_be_bytes(array);
Ok(Some((number, first_data)))
}
pub fn contains( pub fn contains(
&self, &self,
reader: &impl rkv::Readable, reader: &impl rkv::Readable,
@ -60,12 +77,12 @@ impl Updates {
Ok(last_update_id) Ok(last_update_id)
} }
pub fn pop_back( pub fn pop_front(
&self, &self,
writer: &mut rkv::Writer, writer: &mut rkv::Writer,
) -> MResult<Option<(u64, Update)>> ) -> MResult<Option<(u64, Update)>>
{ {
let (last_id, last_data) = match self.last_update_id(writer)? { let (last_id, last_data) = match self.first_update_id(writer)? {
Some(entry) => entry, Some(entry) => entry,
None => return Ok(None), None => return Ok(None),
}; };

View File

@ -4,10 +4,7 @@ mod documents_deletion;
pub use self::documents_addition::{DocumentsAddition, apply_documents_addition}; pub use self::documents_addition::{DocumentsAddition, apply_documents_addition};
pub use self::documents_deletion::{DocumentsDeletion, apply_documents_deletion}; pub use self::documents_deletion::{DocumentsDeletion, apply_documents_deletion};
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
use crate::{store, Error, MResult, DocumentId, RankedMap}; use crate::{store, Error, MResult, DocumentId, RankedMap};
@ -63,7 +60,7 @@ pub fn update_status<T: rkv::Readable>(
} }
pub fn push_documents_addition<D: serde::Serialize>( pub fn push_documents_addition<D: serde::Serialize>(
mut writer: &mut rkv::Writer, writer: &mut rkv::Writer,
updates_store: store::Updates, updates_store: store::Updates,
addition: Vec<D>, addition: Vec<D>,
) -> Result<u64, Error> ) -> Result<u64, Error>
@ -97,78 +94,81 @@ pub fn update_task(
writer: &mut rkv::Writer, writer: &mut rkv::Writer,
index: store::Index, index: store::Index,
mut callback: Option<impl FnOnce(UpdateResult)>, mut callback: Option<impl FnOnce(UpdateResult)>,
) -> MResult<()> ) -> MResult<bool>
{ {
if let Some((update_id, update)) = index.updates.pop_back(writer)? { let (update_id, update) = match index.updates.pop_front(writer)? {
let (update_type, result, duration) = match update { Some(value) => value,
Update::DocumentsAddition(documents) => { None => return Ok(false),
let update_type = UpdateType::DocumentsAddition { number: documents.len() }; };
let schema = match index.main.schema(writer)? { let (update_type, result, duration) = match update {
Some(schema) => schema, Update::DocumentsAddition(documents) => {
None => return Err(Error::SchemaMissing), let update_type = UpdateType::DocumentsAddition { number: documents.len() };
};
let ranked_map = match index.main.ranked_map(writer)? {
Some(ranked_map) => ranked_map,
None => RankedMap::default(),
};
let start = Instant::now(); let schema = match index.main.schema(writer)? {
let result = apply_documents_addition( Some(schema) => schema,
writer, None => return Err(Error::SchemaMissing),
index.main, };
index.documents_fields, let ranked_map = match index.main.ranked_map(writer)? {
index.postings_lists, Some(ranked_map) => ranked_map,
index.docs_words, None => RankedMap::default(),
&schema, };
ranked_map,
documents,
);
(update_type, result, start.elapsed()) let start = Instant::now();
}, let result = apply_documents_addition(
Update::DocumentsDeletion(documents) => { writer,
let update_type = UpdateType::DocumentsDeletion { number: documents.len() }; index.main,
index.documents_fields,
index.postings_lists,
index.docs_words,
&schema,
ranked_map,
documents,
);
let schema = match index.main.schema(writer)? { (update_type, result, start.elapsed())
Some(schema) => schema, },
None => return Err(Error::SchemaMissing), Update::DocumentsDeletion(documents) => {
}; let update_type = UpdateType::DocumentsDeletion { number: documents.len() };
let ranked_map = match index.main.ranked_map(writer)? {
Some(ranked_map) => ranked_map,
None => RankedMap::default(),
};
let start = Instant::now(); let schema = match index.main.schema(writer)? {
let result = apply_documents_deletion( Some(schema) => schema,
writer, None => return Err(Error::SchemaMissing),
index.main, };
index.documents_fields, let ranked_map = match index.main.ranked_map(writer)? {
index.postings_lists, Some(ranked_map) => ranked_map,
index.docs_words, None => RankedMap::default(),
&schema, };
ranked_map,
documents,
);
(update_type, result, start.elapsed()) let start = Instant::now();
}, let result = apply_documents_deletion(
}; writer,
index.main,
index.documents_fields,
index.postings_lists,
index.docs_words,
&schema,
ranked_map,
documents,
);
let detailed_duration = DetailedDuration { main: duration }; (update_type, result, start.elapsed())
let status = UpdateResult { },
update_id, };
update_type,
result: result.map_err(|e| e.to_string()),
detailed_duration,
};
index.updates_results.put_update_result(writer, update_id, &status)?; let detailed_duration = DetailedDuration { main: duration };
let status = UpdateResult {
update_id,
update_type,
result: result.map_err(|e| e.to_string()),
detailed_duration,
};
if let Some(callback) = callback.take() { index.updates_results.put_update_result(writer, update_id, &status)?;
(callback)(status);
} if let Some(callback) = callback.take() {
(callback)(status);
} }
Ok(()) Ok(true)
} }