Let the caller commit/abort the operation

This commit is contained in:
Clément Renault 2019-10-07 10:52:45 +02:00
parent 88d0d3931c
commit 5139dc7f3e
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4

View File

@ -63,7 +63,7 @@ pub fn update_status<T: rkv::Readable>(
} }
pub fn push_documents_addition<D: serde::Serialize>( pub fn push_documents_addition<D: serde::Serialize>(
writer: &mut rkv::Writer, mut writer: &mut rkv::Writer,
updates_store: store::Updates, updates_store: store::Updates,
addition: Vec<D>, addition: Vec<D>,
) -> Result<u64, Error> ) -> Result<u64, Error>
@ -76,7 +76,9 @@ pub fn push_documents_addition<D: serde::Serialize>(
} }
let update = Update::DocumentsAddition(values); let update = Update::DocumentsAddition(values);
Ok(updates_store.push_back(writer, &update)?) let update_id = updates_store.push_back(writer, &update)?;
Ok(update_id)
} }
pub fn push_documents_deletion( pub fn push_documents_deletion(
@ -86,35 +88,34 @@ pub fn push_documents_deletion(
) -> Result<u64, Error> ) -> Result<u64, Error>
{ {
let update = Update::DocumentsDeletion(deletion); let update = Update::DocumentsDeletion(deletion);
Ok(updates_store.push_back(writer, &update)?) let update_id = updates_store.push_back(writer, &update)?;
Ok(update_id)
} }
pub fn update_task( pub fn update_task(
rkv: Arc<RwLock<rkv::Rkv>>, writer: &mut rkv::Writer,
index: store::Index, index: store::Index,
mut callback: Option<impl FnOnce(UpdateResult)>, mut callback: Option<impl FnOnce(UpdateResult)>,
) -> MResult<()> ) -> MResult<()>
{ {
let rkv = rkv.read().unwrap(); if let Some((update_id, update)) = index.updates.pop_back(writer)? {
let mut writer = rkv.write()?;
if let Some((update_id, update)) = index.updates.pop_back(&mut writer)? {
let (update_type, result, duration) = match update { let (update_type, result, duration) = match update {
Update::DocumentsAddition(documents) => { Update::DocumentsAddition(documents) => {
let update_type = UpdateType::DocumentsAddition { number: documents.len() }; let update_type = UpdateType::DocumentsAddition { number: documents.len() };
let schema = match index.main.schema(&writer)? { let schema = match index.main.schema(writer)? {
Some(schema) => schema, Some(schema) => schema,
None => return Err(Error::SchemaMissing), None => return Err(Error::SchemaMissing),
}; };
let ranked_map = match index.main.ranked_map(&writer)? { let ranked_map = match index.main.ranked_map(writer)? {
Some(ranked_map) => ranked_map, Some(ranked_map) => ranked_map,
None => RankedMap::default(), None => RankedMap::default(),
}; };
let start = Instant::now(); let start = Instant::now();
let result = apply_documents_addition( let result = apply_documents_addition(
&mut writer, writer,
index.main, index.main,
index.documents_fields, index.documents_fields,
index.postings_lists, index.postings_lists,
@ -129,18 +130,18 @@ pub fn update_task(
Update::DocumentsDeletion(documents) => { Update::DocumentsDeletion(documents) => {
let update_type = UpdateType::DocumentsDeletion { number: documents.len() }; let update_type = UpdateType::DocumentsDeletion { number: documents.len() };
let schema = match index.main.schema(&writer)? { let schema = match index.main.schema(writer)? {
Some(schema) => schema, Some(schema) => schema,
None => return Err(Error::SchemaMissing), None => return Err(Error::SchemaMissing),
}; };
let ranked_map = match index.main.ranked_map(&writer)? { let ranked_map = match index.main.ranked_map(writer)? {
Some(ranked_map) => ranked_map, Some(ranked_map) => ranked_map,
None => RankedMap::default(), None => RankedMap::default(),
}; };
let start = Instant::now(); let start = Instant::now();
let result = apply_documents_deletion( let result = apply_documents_deletion(
&mut writer, writer,
index.main, index.main,
index.documents_fields, index.documents_fields,
index.postings_lists, index.postings_lists,
@ -162,14 +163,12 @@ pub fn update_task(
detailed_duration, detailed_duration,
}; };
index.updates_results.put_update_result(&mut writer, update_id, &status)?; index.updates_results.put_update_result(writer, update_id, &status)?;
if let Some(callback) = callback.take() { if let Some(callback) = callback.take() {
(callback)(status); (callback)(status);
} }
} }
writer.commit()?;
Ok(()) Ok(())
} }