From 6b326a45d75f6131cd7337b73bc4f44fcba85e95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 31 Oct 2019 17:27:58 +0100 Subject: [PATCH 1/4] Fix the update system to always consume updates even if failing --- meilidb-core/Cargo.toml | 2 +- meilidb-core/src/database.rs | 63 +++++++++++++++++++++++++--------- meilidb-core/src/update/mod.rs | 15 +++----- meilidb-http/Cargo.toml | 2 +- 4 files changed, 52 insertions(+), 30 deletions(-) diff --git a/meilidb-core/Cargo.toml b/meilidb-core/Cargo.toml index 35f31468d..751c1144b 100644 --- a/meilidb-core/Cargo.toml +++ b/meilidb-core/Cargo.toml @@ -12,7 +12,7 @@ crossbeam-channel = "0.3.9" deunicode = "1.0.0" env_logger = "0.7.0" hashbrown = { version = "0.6.0", features = ["serde"] } -heed = "0.3.0" +heed = "0.3.1" log = "0.4.8" meilidb-schema = { path = "../meilidb-schema", version = "0.6.0" } meilidb-tokenizer = { path = "../meilidb-tokenizer", version = "0.6.0" } diff --git a/meilidb-core/src/database.rs b/meilidb-core/src/database.rs index fa5b6bfde..d2280dd48 100644 --- a/meilidb-core/src/database.rs +++ b/meilidb-core/src/database.rs @@ -28,34 +28,63 @@ fn update_awaiter(receiver: Receiver<()>, env: heed::Env, update_fn: Arc writer, Err(e) => { - error!("LMDB writer transaction begin failed: {}", e); + error!("LMDB write transaction begin failed: {}", e); break; } }; - match update::update_task(&mut writer, index.clone()) { - Ok(Some(status)) => { - match status.result { - Ok(_) => { - if let Err(e) = writer.commit() { - error!("update transaction failed: {}", e) - } - } - Err(_) => writer.abort(), - } - - if let Some(ref callback) = *update_fn.load() { - (callback)(status); - } - } - // no more updates to handle for now + let (update_id, update) = match index.updates.pop_front(&mut writer) { + Ok(Some(value)) => value, Ok(None) => { debug!("no more updates"); writer.abort(); break; } + Err(e) => { + error!("pop front update failed: {}", e); + break; + } + }; + + let mut nested_writer = match unsafe { env.nested_write_txn(&writer) } { + Ok(writer) => writer, + Err(e) => { + error!("LMDB nested write transaction begin failed: {}", e); + break; + } + }; + + match update::update_task(&mut nested_writer, index.clone(), update_id, update) { + Ok(status) => { + match &status.result { + Ok(_) => { + if let Err(e) = nested_writer.commit() { + error!("update nested transaction failed: {}", e); + } + } + Err(_) => nested_writer.abort(), + } + + let result = + index + .updates_results + .put_update_result(&mut writer, update_id, &status); + + if let Err(e) = result { + error!("update result store commit failed: {}", e); + } + + if let Err(e) = writer.commit() { + error!("update parent transaction failed: {}", e); + } + + if let Some(ref callback) = *update_fn.load() { + (callback)(status); + } + } Err(e) => { error!("update task failed: {}", e); + nested_writer.abort(); writer.abort() } } diff --git a/meilidb-core/src/update/mod.rs b/meilidb-core/src/update/mod.rs index 360539fe2..64d85c088 100644 --- a/meilidb-core/src/update/mod.rs +++ b/meilidb-core/src/update/mod.rs @@ -152,12 +152,9 @@ pub fn next_update_id( pub fn update_task( writer: &mut heed::RwTxn, index: store::Index, -) -> MResult> { - let (update_id, update) = match index.updates.pop_front(writer)? { - Some(value) => value, - None => return Ok(None), - }; - + update_id: u64, + update: Update, +) -> MResult { debug!("Processing update number {}", update_id); let (update_type, result, duration) = match update { @@ -308,9 +305,5 @@ pub fn update_task( detailed_duration, }; - index - .updates_results - .put_update_result(writer, update_id, &status)?; - - Ok(Some(status)) + Ok(status) } diff --git a/meilidb-http/Cargo.toml b/meilidb-http/Cargo.toml index 12c1a5fc6..02c8fdcb4 100644 --- a/meilidb-http/Cargo.toml +++ b/meilidb-http/Cargo.toml @@ -13,7 +13,7 @@ chrono = { version = "0.4.9", features = ["serde"] } crossbeam-channel = "0.3.9" envconfig = "0.5.1" envconfig_derive = "0.5.1" -heed = "0.3.0" +heed = "0.3.1" http = "0.1.19" indexmap = { version = "1.3.0", features = ["serde-1"] } jemallocator = "0.3.2" From 2a50e08bb896a1ef2529439db2709b393227ee30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Mon, 4 Nov 2019 10:49:27 +0100 Subject: [PATCH 2/4] Moving to heed v0.5.0 --- meilidb-core/Cargo.toml | 2 +- meilidb-core/src/database.rs | 2 +- meilidb-core/src/serde/serializer.rs | 22 +++++++++---------- meilidb-core/src/update/documents_addition.rs | 4 ++-- meilidb-core/src/update/mod.rs | 4 ++-- meilidb-http/Cargo.toml | 2 +- 6 files changed, 18 insertions(+), 18 deletions(-) diff --git a/meilidb-core/Cargo.toml b/meilidb-core/Cargo.toml index 751c1144b..e0d9a8fd1 100644 --- a/meilidb-core/Cargo.toml +++ b/meilidb-core/Cargo.toml @@ -12,7 +12,7 @@ crossbeam-channel = "0.3.9" deunicode = "1.0.0" env_logger = "0.7.0" hashbrown = { version = "0.6.0", features = ["serde"] } -heed = "0.3.1" +heed = "0.5.0" log = "0.4.8" meilidb-schema = { path = "../meilidb-schema", version = "0.6.0" } meilidb-tokenizer = { path = "../meilidb-tokenizer", version = "0.6.0" } diff --git a/meilidb-core/src/database.rs b/meilidb-core/src/database.rs index d2280dd48..38f483f01 100644 --- a/meilidb-core/src/database.rs +++ b/meilidb-core/src/database.rs @@ -46,7 +46,7 @@ fn update_awaiter(receiver: Receiver<()>, env: heed::Env, update_fn: Arc writer, Err(e) => { error!("LMDB nested write transaction begin failed: {}", e); diff --git a/meilidb-core/src/serde/serializer.rs b/meilidb-core/src/serde/serializer.rs index 34f54d655..36c403390 100644 --- a/meilidb-core/src/serde/serializer.rs +++ b/meilidb-core/src/serde/serializer.rs @@ -7,8 +7,8 @@ use crate::{DocumentId, RankedMap}; use super::{ConvertToNumber, ConvertToString, Indexer, SerializerError}; -pub struct Serializer<'a> { - pub txn: &'a mut heed::RwTxn, +pub struct Serializer<'a, 'b> { + pub txn: &'a mut heed::RwTxn<'b>, pub schema: &'a Schema, pub document_store: DocumentsFields, pub document_fields_counts: DocumentsFieldsCounts, @@ -17,15 +17,15 @@ pub struct Serializer<'a> { pub document_id: DocumentId, } -impl<'a> ser::Serializer for Serializer<'a> { +impl<'a, 'b> ser::Serializer for Serializer<'a, 'b> { type Ok = (); type Error = SerializerError; type SerializeSeq = ser::Impossible; type SerializeTuple = ser::Impossible; type SerializeTupleStruct = ser::Impossible; type SerializeTupleVariant = ser::Impossible; - type SerializeMap = MapSerializer<'a>; - type SerializeStruct = StructSerializer<'a>; + type SerializeMap = MapSerializer<'a, 'b>; + type SerializeStruct = StructSerializer<'a, 'b>; type SerializeStructVariant = ser::Impossible; forward_to_unserializable_type! { @@ -190,8 +190,8 @@ impl<'a> ser::Serializer for Serializer<'a> { } } -pub struct MapSerializer<'a> { - txn: &'a mut heed::RwTxn, +pub struct MapSerializer<'a, 'b> { + txn: &'a mut heed::RwTxn<'b>, schema: &'a Schema, document_id: DocumentId, document_store: DocumentsFields, @@ -201,7 +201,7 @@ pub struct MapSerializer<'a> { current_key_name: Option, } -impl<'a> ser::SerializeMap for MapSerializer<'a> { +impl<'a, 'b> ser::SerializeMap for MapSerializer<'a, 'b> { type Ok = (); type Error = SerializerError; @@ -253,8 +253,8 @@ impl<'a> ser::SerializeMap for MapSerializer<'a> { } } -pub struct StructSerializer<'a> { - txn: &'a mut heed::RwTxn, +pub struct StructSerializer<'a, 'b> { + txn: &'a mut heed::RwTxn<'b>, schema: &'a Schema, document_id: DocumentId, document_store: DocumentsFields, @@ -263,7 +263,7 @@ pub struct StructSerializer<'a> { ranked_map: &'a mut RankedMap, } -impl<'a> ser::SerializeStruct for StructSerializer<'a> { +impl<'a, 'b> ser::SerializeStruct for StructSerializer<'a, 'b> { type Ok = (); type Error = SerializerError; diff --git a/meilidb-core/src/update/documents_addition.rs b/meilidb-core/src/update/documents_addition.rs index 1806e3063..8aa5468d7 100644 --- a/meilidb-core/src/update/documents_addition.rs +++ b/meilidb-core/src/update/documents_addition.rs @@ -77,8 +77,8 @@ pub fn push_documents_addition( Ok(last_update_id) } -pub fn apply_documents_addition( - writer: &mut heed::RwTxn, +pub fn apply_documents_addition<'a, 'b>( + writer: &'a mut heed::RwTxn<'b>, main_store: store::Main, documents_fields_store: store::DocumentsFields, documents_fields_counts_store: store::DocumentsFieldsCounts, diff --git a/meilidb-core/src/update/mod.rs b/meilidb-core/src/update/mod.rs index 64d85c088..52d8ab3d5 100644 --- a/meilidb-core/src/update/mod.rs +++ b/meilidb-core/src/update/mod.rs @@ -149,8 +149,8 @@ pub fn next_update_id( Ok(new_update_id) } -pub fn update_task( - writer: &mut heed::RwTxn, +pub fn update_task<'a, 'b>( + writer: &'a mut heed::RwTxn<'b>, index: store::Index, update_id: u64, update: Update, diff --git a/meilidb-http/Cargo.toml b/meilidb-http/Cargo.toml index 02c8fdcb4..3887bd775 100644 --- a/meilidb-http/Cargo.toml +++ b/meilidb-http/Cargo.toml @@ -13,7 +13,7 @@ chrono = { version = "0.4.9", features = ["serde"] } crossbeam-channel = "0.3.9" envconfig = "0.5.1" envconfig_derive = "0.5.1" -heed = "0.3.1" +heed = "0.5.0" http = "0.1.19" indexmap = { version = "1.3.0", features = ["serde-1"] } jemallocator = "0.3.2" From c2cc0704d70888924c26640da74e5f2ba0c1b94d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Mon, 4 Nov 2019 11:11:58 +0100 Subject: [PATCH 3/4] Clean up the update_awaiter function --- meilidb-core/src/database.rs | 90 ++++++++++++++++-------------------- 1 file changed, 40 insertions(+), 50 deletions(-) diff --git a/meilidb-core/src/database.rs b/meilidb-core/src/database.rs index 38f483f01..d7afd27b0 100644 --- a/meilidb-core/src/database.rs +++ b/meilidb-core/src/database.rs @@ -21,72 +21,62 @@ pub struct Database { indexes: RwLock, thread::JoinHandle<()>)>>, } +macro_rules! r#break_try { + ($expr:expr, $msg:tt) => { + match $expr { + core::result::Result::Ok(val) => val, + core::result::Result::Err(err) => { + log::error!(concat!($msg, ": {}"), err); + break; + } + } + }; +} + fn update_awaiter(receiver: Receiver<()>, env: heed::Env, update_fn: Arc, index: Index) { for () in receiver { // consume all updates in order (oldest first) loop { - let mut writer = match env.write_txn() { - Ok(writer) => writer, - Err(e) => { - error!("LMDB write transaction begin failed: {}", e); - break; - } - }; + // instantiate a main/parent transaction + let mut writer = break_try!(env.write_txn(), "LMDB write transaction begin failed"); - let (update_id, update) = match index.updates.pop_front(&mut writer) { - Ok(Some(value)) => value, - Ok(None) => { + // retrieve the update that needs to be processed + let result = index.updates.pop_front(&mut writer); + let (update_id, update) = match break_try!(result, "pop front update failed") { + Some(value) => value, + None => { debug!("no more updates"); writer.abort(); break; } - Err(e) => { - error!("pop front update failed: {}", e); - break; - } }; - let mut nested_writer = match env.nested_write_txn(&mut writer) { - Ok(writer) => writer, - Err(e) => { - error!("LMDB nested write transaction begin failed: {}", e); - break; - } - }; + // instantiate a nested transaction + let result = env.nested_write_txn(&mut writer); + let mut nested_writer = break_try!(result, "LMDB nested write transaction failed"); - match update::update_task(&mut nested_writer, index.clone(), update_id, update) { - Ok(status) => { - match &status.result { - Ok(_) => { - if let Err(e) = nested_writer.commit() { - error!("update nested transaction failed: {}", e); - } - } - Err(_) => nested_writer.abort(), - } + // try to apply the update to the database using the nested transaction + let result = update::update_task(&mut nested_writer, index.clone(), update_id, update); + let status = break_try!(result, "update task failed"); - let result = - index - .updates_results - .put_update_result(&mut writer, update_id, &status); + // commit the nested transaction if the update was successful, abort it otherwise + if status.result.is_ok() { + break_try!(nested_writer.commit(), "commit nested transaction failed"); + } else { + nested_writer.abort() + } - if let Err(e) = result { - error!("update result store commit failed: {}", e); - } + // write the result of the update in the updates-results store + let updates_results = index.updates_results; + let result = updates_results.put_update_result(&mut writer, update_id, &status); - if let Err(e) = writer.commit() { - error!("update parent transaction failed: {}", e); - } + // always commit the main/parent transaction, even if the update was unsuccessful + break_try!(result, "update result store commit failed"); + break_try!(writer.commit(), "update parent transaction failed"); - if let Some(ref callback) = *update_fn.load() { - (callback)(status); - } - } - Err(e) => { - error!("update task failed: {}", e); - nested_writer.abort(); - writer.abort() - } + // call the user callback when the update and the result are written consistently + if let Some(ref callback) = *update_fn.load() { + (callback)(status); } } } From 1c3620a7d47e0fbaf7603d8cc69c568e53d31e3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Mon, 4 Nov 2019 13:18:07 +0100 Subject: [PATCH 4/4] Add tests to the update system --- meilidb-core/src/database.rs | 140 ++++++++++++++++++++++++++++++++++- 1 file changed, 139 insertions(+), 1 deletion(-) diff --git a/meilidb-core/src/database.rs b/meilidb-core/src/database.rs index d7afd27b0..25d23d164 100644 --- a/meilidb-core/src/database.rs +++ b/meilidb-core/src/database.rs @@ -7,7 +7,7 @@ use std::{fs, thread}; use crossbeam_channel::Receiver; use heed::types::{Str, Unit}; use heed::{CompactionOption, Result as ZResult}; -use log::{debug, error}; +use log::debug; use crate::{store, update, Index, MResult}; @@ -222,3 +222,141 @@ impl Database { self.common_store } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::update::{ProcessedUpdateResult, UpdateStatus}; + use std::sync::mpsc; + + #[test] + fn valid_updates() { + let dir = tempfile::tempdir().unwrap(); + + let database = Database::open_or_create(dir.path()).unwrap(); + let env = &database.env; + + let (sender, receiver) = mpsc::sync_channel(100); + let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap(); + let index = database.create_index("test").unwrap(); + + let done = database.set_update_callback("test", Box::new(update_fn)); + assert!(done, "could not set the index update function"); + + let schema = { + let data = r#" + identifier = "id" + + [attributes."name"] + displayed = true + indexed = true + + [attributes."description"] + displayed = true + indexed = true + "#; + toml::from_str(data).unwrap() + }; + + let mut writer = env.write_txn().unwrap(); + let _update_id = index.schema_update(&mut writer, schema).unwrap(); + + // don't forget to commit... + writer.commit().unwrap(); + + let mut additions = index.documents_addition(); + + let doc1 = serde_json::json!({ + "id": 123, + "name": "Marvin", + "description": "My name is Marvin", + }); + + let doc2 = serde_json::json!({ + "id": 234, + "name": "Kevin", + "description": "My name is Kevin", + }); + + additions.update_document(doc1); + additions.update_document(doc2); + + let mut writer = env.write_txn().unwrap(); + let update_id = additions.finalize(&mut writer).unwrap(); + + // don't forget to commit... + writer.commit().unwrap(); + + // block until the transaction is processed + let _ = receiver.into_iter().find(|id| *id == update_id); + + let reader = env.read_txn().unwrap(); + let result = index.update_status(&reader, update_id).unwrap(); + assert_matches!(result, UpdateStatus::Processed(status) if status.result.is_ok()); + } + + #[test] + fn invalid_updates() { + let dir = tempfile::tempdir().unwrap(); + + let database = Database::open_or_create(dir.path()).unwrap(); + let env = &database.env; + + let (sender, receiver) = mpsc::sync_channel(100); + let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap(); + let index = database.create_index("test").unwrap(); + + let done = database.set_update_callback("test", Box::new(update_fn)); + assert!(done, "could not set the index update function"); + + let schema = { + let data = r#" + identifier = "id" + + [attributes."name"] + displayed = true + indexed = true + + [attributes."description"] + displayed = true + indexed = true + "#; + toml::from_str(data).unwrap() + }; + + let mut writer = env.write_txn().unwrap(); + let _update_id = index.schema_update(&mut writer, schema).unwrap(); + + // don't forget to commit... + writer.commit().unwrap(); + + let mut additions = index.documents_addition(); + + let doc1 = serde_json::json!({ + "id": 123, + "name": "Marvin", + "description": "My name is Marvin", + }); + + let doc2 = serde_json::json!({ + "name": "Kevin", + "description": "My name is Kevin", + }); + + additions.update_document(doc1); + additions.update_document(doc2); + + let mut writer = env.write_txn().unwrap(); + let update_id = additions.finalize(&mut writer).unwrap(); + + // don't forget to commit... + writer.commit().unwrap(); + + // block until the transaction is processed + let _ = receiver.into_iter().find(|id| *id == update_id); + + let reader = env.read_txn().unwrap(); + let result = index.update_status(&reader, update_id).unwrap(); + assert_matches!(result, UpdateStatus::Processed(status) if status.result.is_err()); + } +}