diff --git a/meilidb-core/Cargo.toml b/meilidb-core/Cargo.toml index 35f31468d..e0d9a8fd1 100644 --- a/meilidb-core/Cargo.toml +++ b/meilidb-core/Cargo.toml @@ -12,7 +12,7 @@ crossbeam-channel = "0.3.9" deunicode = "1.0.0" env_logger = "0.7.0" hashbrown = { version = "0.6.0", features = ["serde"] } -heed = "0.3.0" +heed = "0.5.0" log = "0.4.8" meilidb-schema = { path = "../meilidb-schema", version = "0.6.0" } meilidb-tokenizer = { path = "../meilidb-tokenizer", version = "0.6.0" } diff --git a/meilidb-core/src/database.rs b/meilidb-core/src/database.rs index fa5b6bfde..25d23d164 100644 --- a/meilidb-core/src/database.rs +++ b/meilidb-core/src/database.rs @@ -7,7 +7,7 @@ use std::{fs, thread}; use crossbeam_channel::Receiver; use heed::types::{Str, Unit}; use heed::{CompactionOption, Result as ZResult}; -use log::{debug, error}; +use log::debug; use crate::{store, update, Index, MResult}; @@ -21,43 +21,62 @@ pub struct Database { indexes: RwLock, thread::JoinHandle<()>)>>, } +macro_rules! r#break_try { + ($expr:expr, $msg:tt) => { + match $expr { + core::result::Result::Ok(val) => val, + core::result::Result::Err(err) => { + log::error!(concat!($msg, ": {}"), err); + break; + } + } + }; +} + fn update_awaiter(receiver: Receiver<()>, env: heed::Env, update_fn: Arc, index: Index) { for () in receiver { // consume all updates in order (oldest first) loop { - let mut writer = match env.write_txn() { - Ok(writer) => writer, - Err(e) => { - error!("LMDB writer transaction begin failed: {}", e); - break; - } - }; + // instantiate a main/parent transaction + let mut writer = break_try!(env.write_txn(), "LMDB write transaction begin failed"); - match update::update_task(&mut writer, index.clone()) { - Ok(Some(status)) => { - match status.result { - Ok(_) => { - if let Err(e) = writer.commit() { - error!("update transaction failed: {}", e) - } - } - Err(_) => writer.abort(), - } - - if let Some(ref callback) = *update_fn.load() { - (callback)(status); - } - } - // no more updates to handle for now - Ok(None) => { + // retrieve the update that needs to be processed + let result = index.updates.pop_front(&mut writer); + let (update_id, update) = match break_try!(result, "pop front update failed") { + Some(value) => value, + None => { debug!("no more updates"); writer.abort(); break; } - Err(e) => { - error!("update task failed: {}", e); - writer.abort() - } + }; + + // instantiate a nested transaction + let result = env.nested_write_txn(&mut writer); + let mut nested_writer = break_try!(result, "LMDB nested write transaction failed"); + + // try to apply the update to the database using the nested transaction + let result = update::update_task(&mut nested_writer, index.clone(), update_id, update); + let status = break_try!(result, "update task failed"); + + // commit the nested transaction if the update was successful, abort it otherwise + if status.result.is_ok() { + break_try!(nested_writer.commit(), "commit nested transaction failed"); + } else { + nested_writer.abort() + } + + // write the result of the update in the updates-results store + let updates_results = index.updates_results; + let result = updates_results.put_update_result(&mut writer, update_id, &status); + + // always commit the main/parent transaction, even if the update was unsuccessful + break_try!(result, "update result store commit failed"); + break_try!(writer.commit(), "update parent transaction failed"); + + // call the user callback when the update and the result are written consistently + if let Some(ref callback) = *update_fn.load() { + (callback)(status); } } } @@ -203,3 +222,141 @@ impl Database { self.common_store } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::update::{ProcessedUpdateResult, UpdateStatus}; + use std::sync::mpsc; + + #[test] + fn valid_updates() { + let dir = tempfile::tempdir().unwrap(); + + let database = Database::open_or_create(dir.path()).unwrap(); + let env = &database.env; + + let (sender, receiver) = mpsc::sync_channel(100); + let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap(); + let index = database.create_index("test").unwrap(); + + let done = database.set_update_callback("test", Box::new(update_fn)); + assert!(done, "could not set the index update function"); + + let schema = { + let data = r#" + identifier = "id" + + [attributes."name"] + displayed = true + indexed = true + + [attributes."description"] + displayed = true + indexed = true + "#; + toml::from_str(data).unwrap() + }; + + let mut writer = env.write_txn().unwrap(); + let _update_id = index.schema_update(&mut writer, schema).unwrap(); + + // don't forget to commit... + writer.commit().unwrap(); + + let mut additions = index.documents_addition(); + + let doc1 = serde_json::json!({ + "id": 123, + "name": "Marvin", + "description": "My name is Marvin", + }); + + let doc2 = serde_json::json!({ + "id": 234, + "name": "Kevin", + "description": "My name is Kevin", + }); + + additions.update_document(doc1); + additions.update_document(doc2); + + let mut writer = env.write_txn().unwrap(); + let update_id = additions.finalize(&mut writer).unwrap(); + + // don't forget to commit... + writer.commit().unwrap(); + + // block until the transaction is processed + let _ = receiver.into_iter().find(|id| *id == update_id); + + let reader = env.read_txn().unwrap(); + let result = index.update_status(&reader, update_id).unwrap(); + assert_matches!(result, UpdateStatus::Processed(status) if status.result.is_ok()); + } + + #[test] + fn invalid_updates() { + let dir = tempfile::tempdir().unwrap(); + + let database = Database::open_or_create(dir.path()).unwrap(); + let env = &database.env; + + let (sender, receiver) = mpsc::sync_channel(100); + let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap(); + let index = database.create_index("test").unwrap(); + + let done = database.set_update_callback("test", Box::new(update_fn)); + assert!(done, "could not set the index update function"); + + let schema = { + let data = r#" + identifier = "id" + + [attributes."name"] + displayed = true + indexed = true + + [attributes."description"] + displayed = true + indexed = true + "#; + toml::from_str(data).unwrap() + }; + + let mut writer = env.write_txn().unwrap(); + let _update_id = index.schema_update(&mut writer, schema).unwrap(); + + // don't forget to commit... + writer.commit().unwrap(); + + let mut additions = index.documents_addition(); + + let doc1 = serde_json::json!({ + "id": 123, + "name": "Marvin", + "description": "My name is Marvin", + }); + + let doc2 = serde_json::json!({ + "name": "Kevin", + "description": "My name is Kevin", + }); + + additions.update_document(doc1); + additions.update_document(doc2); + + let mut writer = env.write_txn().unwrap(); + let update_id = additions.finalize(&mut writer).unwrap(); + + // don't forget to commit... + writer.commit().unwrap(); + + // block until the transaction is processed + let _ = receiver.into_iter().find(|id| *id == update_id); + + let reader = env.read_txn().unwrap(); + let result = index.update_status(&reader, update_id).unwrap(); + assert_matches!(result, UpdateStatus::Processed(status) if status.result.is_err()); + } +} diff --git a/meilidb-core/src/serde/serializer.rs b/meilidb-core/src/serde/serializer.rs index 34f54d655..36c403390 100644 --- a/meilidb-core/src/serde/serializer.rs +++ b/meilidb-core/src/serde/serializer.rs @@ -7,8 +7,8 @@ use crate::{DocumentId, RankedMap}; use super::{ConvertToNumber, ConvertToString, Indexer, SerializerError}; -pub struct Serializer<'a> { - pub txn: &'a mut heed::RwTxn, +pub struct Serializer<'a, 'b> { + pub txn: &'a mut heed::RwTxn<'b>, pub schema: &'a Schema, pub document_store: DocumentsFields, pub document_fields_counts: DocumentsFieldsCounts, @@ -17,15 +17,15 @@ pub struct Serializer<'a> { pub document_id: DocumentId, } -impl<'a> ser::Serializer for Serializer<'a> { +impl<'a, 'b> ser::Serializer for Serializer<'a, 'b> { type Ok = (); type Error = SerializerError; type SerializeSeq = ser::Impossible; type SerializeTuple = ser::Impossible; type SerializeTupleStruct = ser::Impossible; type SerializeTupleVariant = ser::Impossible; - type SerializeMap = MapSerializer<'a>; - type SerializeStruct = StructSerializer<'a>; + type SerializeMap = MapSerializer<'a, 'b>; + type SerializeStruct = StructSerializer<'a, 'b>; type SerializeStructVariant = ser::Impossible; forward_to_unserializable_type! { @@ -190,8 +190,8 @@ impl<'a> ser::Serializer for Serializer<'a> { } } -pub struct MapSerializer<'a> { - txn: &'a mut heed::RwTxn, +pub struct MapSerializer<'a, 'b> { + txn: &'a mut heed::RwTxn<'b>, schema: &'a Schema, document_id: DocumentId, document_store: DocumentsFields, @@ -201,7 +201,7 @@ pub struct MapSerializer<'a> { current_key_name: Option, } -impl<'a> ser::SerializeMap for MapSerializer<'a> { +impl<'a, 'b> ser::SerializeMap for MapSerializer<'a, 'b> { type Ok = (); type Error = SerializerError; @@ -253,8 +253,8 @@ impl<'a> ser::SerializeMap for MapSerializer<'a> { } } -pub struct StructSerializer<'a> { - txn: &'a mut heed::RwTxn, +pub struct StructSerializer<'a, 'b> { + txn: &'a mut heed::RwTxn<'b>, schema: &'a Schema, document_id: DocumentId, document_store: DocumentsFields, @@ -263,7 +263,7 @@ pub struct StructSerializer<'a> { ranked_map: &'a mut RankedMap, } -impl<'a> ser::SerializeStruct for StructSerializer<'a> { +impl<'a, 'b> ser::SerializeStruct for StructSerializer<'a, 'b> { type Ok = (); type Error = SerializerError; diff --git a/meilidb-core/src/update/documents_addition.rs b/meilidb-core/src/update/documents_addition.rs index 1806e3063..8aa5468d7 100644 --- a/meilidb-core/src/update/documents_addition.rs +++ b/meilidb-core/src/update/documents_addition.rs @@ -77,8 +77,8 @@ pub fn push_documents_addition( Ok(last_update_id) } -pub fn apply_documents_addition( - writer: &mut heed::RwTxn, +pub fn apply_documents_addition<'a, 'b>( + writer: &'a mut heed::RwTxn<'b>, main_store: store::Main, documents_fields_store: store::DocumentsFields, documents_fields_counts_store: store::DocumentsFieldsCounts, diff --git a/meilidb-core/src/update/mod.rs b/meilidb-core/src/update/mod.rs index 360539fe2..52d8ab3d5 100644 --- a/meilidb-core/src/update/mod.rs +++ b/meilidb-core/src/update/mod.rs @@ -149,15 +149,12 @@ pub fn next_update_id( Ok(new_update_id) } -pub fn update_task( - writer: &mut heed::RwTxn, +pub fn update_task<'a, 'b>( + writer: &'a mut heed::RwTxn<'b>, index: store::Index, -) -> MResult> { - let (update_id, update) = match index.updates.pop_front(writer)? { - Some(value) => value, - None => return Ok(None), - }; - + update_id: u64, + update: Update, +) -> MResult { debug!("Processing update number {}", update_id); let (update_type, result, duration) = match update { @@ -308,9 +305,5 @@ pub fn update_task( detailed_duration, }; - index - .updates_results - .put_update_result(writer, update_id, &status)?; - - Ok(Some(status)) + Ok(status) } diff --git a/meilidb-http/Cargo.toml b/meilidb-http/Cargo.toml index 12c1a5fc6..3887bd775 100644 --- a/meilidb-http/Cargo.toml +++ b/meilidb-http/Cargo.toml @@ -13,7 +13,7 @@ chrono = { version = "0.4.9", features = ["serde"] } crossbeam-channel = "0.3.9" envconfig = "0.5.1" envconfig_derive = "0.5.1" -heed = "0.3.0" +heed = "0.5.0" http = "0.1.19" indexmap = { version = "1.3.0", features = ["serde-1"] } jemallocator = "0.3.2"