diff --git a/meilidb-core/src/database.rs b/meilidb-core/src/database.rs index 3a614f708..0203e2e26 100644 --- a/meilidb-core/src/database.rs +++ b/meilidb-core/src/database.rs @@ -263,8 +263,6 @@ mod tests { let mut writer = env.write_txn().unwrap(); let _update_id = index.schema_update(&mut writer, schema).unwrap(); - - // don't forget to commit... writer.commit().unwrap(); let mut additions = index.documents_addition(); @@ -286,8 +284,6 @@ mod tests { let mut writer = env.write_txn().unwrap(); let update_id = additions.finalize(&mut writer).unwrap(); - - // don't forget to commit... writer.commit().unwrap(); // block until the transaction is processed @@ -329,8 +325,6 @@ mod tests { let mut writer = env.write_txn().unwrap(); let _update_id = index.schema_update(&mut writer, schema).unwrap(); - - // don't forget to commit... writer.commit().unwrap(); let mut additions = index.documents_addition(); @@ -351,8 +345,6 @@ mod tests { let mut writer = env.write_txn().unwrap(); let update_id = additions.finalize(&mut writer).unwrap(); - - // don't forget to commit... writer.commit().unwrap(); // block until the transaction is processed @@ -564,8 +556,6 @@ mod tests { let mut writer = env.write_txn().unwrap(); let _update_id = index.schema_update(&mut writer, schema).unwrap(); - - // don't forget to commit... writer.commit().unwrap(); let mut additions = index.documents_addition(); @@ -589,8 +579,6 @@ mod tests { let mut writer = env.write_txn().unwrap(); let update_id = additions.finalize(&mut writer).unwrap(); - - // don't forget to commit... writer.commit().unwrap(); // block until the transaction is processed @@ -613,4 +601,136 @@ mod tests { .unwrap(); assert!(document.is_some()); } + + #[test] + fn partial_document_update() { + let dir = tempfile::tempdir().unwrap(); + + let database = Database::open_or_create(dir.path()).unwrap(); + let env = &database.env; + + let (sender, receiver) = mpsc::sync_channel(100); + let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap(); + let index = database.create_index("test").unwrap(); + + let done = database.set_update_callback("test", Box::new(update_fn)); + assert!(done, "could not set the index update function"); + + let schema = { + let data = r#" + identifier = "id" + + [attributes."id"] + displayed = true + + [attributes."name"] + displayed = true + indexed = true + + [attributes."description"] + displayed = true + indexed = true + "#; + toml::from_str(data).unwrap() + }; + + let mut writer = env.write_txn().unwrap(); + let _update_id = index.schema_update(&mut writer, schema).unwrap(); + writer.commit().unwrap(); + + let mut additions = index.documents_addition(); + + // DocumentId(7900334843754999545) + let doc1 = serde_json::json!({ + "id": 123, + "name": "Marvin", + "description": "My name is Marvin", + }); + + // DocumentId(8367468610878465872) + let doc2 = serde_json::json!({ + "id": 234, + "name": "Kevin", + "description": "My name is Kevin", + }); + + additions.update_document(doc1); + additions.update_document(doc2); + + let mut writer = env.write_txn().unwrap(); + let update_id = additions.finalize(&mut writer).unwrap(); + writer.commit().unwrap(); + + // block until the transaction is processed + let _ = receiver.iter().find(|id| *id == update_id); + + let reader = env.read_txn().unwrap(); + let result = index.update_status(&reader, update_id).unwrap(); + assert_matches!(result, UpdateStatus::Processed(status) if status.result.is_ok()); + + let document: Option = index.document(&reader, None, DocumentId(25)).unwrap(); + assert!(document.is_none()); + + let document: Option = index + .document(&reader, None, DocumentId(7900334843754999545)) + .unwrap(); + assert!(document.is_some()); + + let document: Option = index + .document(&reader, None, DocumentId(8367468610878465872)) + .unwrap(); + assert!(document.is_some()); + + reader.abort(); + + let mut partial_additions = index.documents_partial_addition(); + + // DocumentId(7900334843754999545) + let partial_doc1 = serde_json::json!({ + "id": 123, + "description": "I am the new Marvin", + }); + + // DocumentId(8367468610878465872) + let partial_doc2 = serde_json::json!({ + "id": 234, + "description": "I am the new Kevin", + }); + + partial_additions.update_document(partial_doc1); + partial_additions.update_document(partial_doc2); + + let mut writer = env.write_txn().unwrap(); + let update_id = partial_additions.finalize(&mut writer).unwrap(); + writer.commit().unwrap(); + + // block until the transaction is processed + let _ = receiver.iter().find(|id| *id == update_id); + + let reader = env.read_txn().unwrap(); + let result = index.update_status(&reader, update_id).unwrap(); + assert_matches!(result, UpdateStatus::Processed(status) if status.result.is_ok()); + + let document: Option = index + .document(&reader, None, DocumentId(7900334843754999545)) + .unwrap(); + + let new_doc1 = serde_json::json!({ + "id": 123, + "name": "Marvin", + "description": "I am the new Marvin", + }); + assert_eq!(document, Some(new_doc1)); + + let document: Option = index + .document(&reader, None, DocumentId(8367468610878465872)) + .unwrap(); + + let new_doc2 = serde_json::json!({ + "id": 234, + "name": "Kevin", + "description": "I am the new Kevin", + }); + assert_eq!(document, Some(new_doc2)); + } } diff --git a/meilidb-core/src/store/mod.rs b/meilidb-core/src/store/mod.rs index ed2a5e328..f53900a79 100644 --- a/meilidb-core/src/store/mod.rs +++ b/meilidb-core/src/store/mod.rs @@ -156,6 +156,14 @@ impl Index { ) } + pub fn documents_partial_addition(&self) -> update::DocumentsAddition { + update::DocumentsAddition::new_partial( + self.updates, + self.updates_results, + self.updates_notifier.clone(), + ) + } + pub fn documents_deletion(&self) -> update::DocumentsDeletion { update::DocumentsDeletion::new( self.updates, diff --git a/meilidb-core/src/update/documents_addition.rs b/meilidb-core/src/update/documents_addition.rs index 8aa5468d7..7a7f03362 100644 --- a/meilidb-core/src/update/documents_addition.rs +++ b/meilidb-core/src/update/documents_addition.rs @@ -2,10 +2,10 @@ use std::collections::HashMap; use fst::{set::OpBuilder, SetBuilder}; use sdset::{duo::Union, SetOperation}; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use crate::raw_indexer::RawIndexer; -use crate::serde::{extract_document_id, serialize_value, Serializer}; +use crate::serde::{extract_document_id, serialize_value, Deserializer, Serializer}; use crate::store; use crate::update::{apply_documents_deletion, next_update_id, Update}; use crate::{Error, MResult, RankedMap}; @@ -15,6 +15,7 @@ pub struct DocumentsAddition { updates_results_store: store::UpdatesResults, updates_notifier: crossbeam_channel::Sender<()>, documents: Vec, + is_partial: bool, } impl DocumentsAddition { @@ -28,6 +29,21 @@ impl DocumentsAddition { updates_results_store, updates_notifier, documents: Vec::new(), + is_partial: false, + } + } + + pub fn new_partial( + updates_store: store::Updates, + updates_results_store: store::UpdatesResults, + updates_notifier: crossbeam_channel::Sender<()>, + ) -> DocumentsAddition { + DocumentsAddition { + updates_store, + updates_results_store, + updates_notifier, + documents: Vec::new(), + is_partial: true, } } @@ -45,6 +61,7 @@ impl DocumentsAddition { self.updates_store, self.updates_results_store, self.documents, + self.is_partial, )?; Ok(update_id) } @@ -61,6 +78,7 @@ pub fn push_documents_addition( updates_store: store::Updates, updates_results_store: store::UpdatesResults, addition: Vec, + is_partial: bool, ) -> MResult { let mut values = Vec::with_capacity(addition.len()); for add in addition { @@ -71,7 +89,12 @@ pub fn push_documents_addition( let last_update_id = next_update_id(writer, updates_store, updates_results_store)?; - let update = Update::DocumentsAddition(values); + let update = if is_partial { + Update::DocumentsPartial(values) + } else { + Update::DocumentsAddition(values) + }; + updates_store.put_update(writer, last_update_id, &update)?; Ok(last_update_id) @@ -84,7 +107,7 @@ pub fn apply_documents_addition<'a, 'b>( documents_fields_counts_store: store::DocumentsFieldsCounts, postings_lists_store: store::PostingsLists, docs_words_store: store::DocsWords, - addition: Vec, + addition: Vec>, ) -> MResult<()> { let mut documents_additions = HashMap::new(); @@ -156,6 +179,102 @@ pub fn apply_documents_addition<'a, 'b>( ) } +pub fn apply_documents_partial_addition<'a, 'b>( + writer: &'a mut heed::RwTxn<'b>, + main_store: store::Main, + documents_fields_store: store::DocumentsFields, + documents_fields_counts_store: store::DocumentsFieldsCounts, + postings_lists_store: store::PostingsLists, + docs_words_store: store::DocsWords, + addition: Vec>, +) -> MResult<()> { + let mut documents_additions = HashMap::new(); + + let schema = match main_store.schema(writer)? { + Some(schema) => schema, + None => return Err(Error::SchemaMissing), + }; + + let identifier = schema.identifier_name(); + + // 1. store documents ids for future deletion + for mut document in addition { + let document_id = match extract_document_id(identifier, &document)? { + Some(id) => id, + None => return Err(Error::MissingDocumentId), + }; + + let mut deserializer = Deserializer { + document_id, + reader: writer, + documents_fields: documents_fields_store, + schema: &schema, + attributes: None, + }; + + // retrieve the old document and + // update the new one with missing keys found in the old one + let result = Option::>::deserialize(&mut deserializer)?; + if let Some(old_document) = result { + for (key, value) in old_document { + document.entry(key).or_insert(value); + } + } + + documents_additions.insert(document_id, document); + } + + // 2. remove the documents posting lists + let number_of_inserted_documents = documents_additions.len(); + let documents_ids = documents_additions.iter().map(|(id, _)| *id).collect(); + apply_documents_deletion( + writer, + main_store, + documents_fields_store, + documents_fields_counts_store, + postings_lists_store, + docs_words_store, + documents_ids, + )?; + + let mut ranked_map = match main_store.ranked_map(writer)? { + Some(ranked_map) => ranked_map, + None => RankedMap::default(), + }; + + let stop_words = match main_store.stop_words_fst(writer)? { + Some(stop_words) => stop_words, + None => fst::Set::default(), + }; + + // 3. index the documents fields in the stores + let mut indexer = RawIndexer::new(stop_words); + + for (document_id, document) in documents_additions { + let serializer = Serializer { + txn: writer, + schema: &schema, + document_store: documents_fields_store, + document_fields_counts: documents_fields_counts_store, + indexer: &mut indexer, + ranked_map: &mut ranked_map, + document_id, + }; + + document.serialize(serializer)?; + } + + write_documents_addition_index( + writer, + main_store, + postings_lists_store, + docs_words_store, + &ranked_map, + number_of_inserted_documents, + indexer, + ) +} + pub fn reindex_all_documents( writer: &mut heed::RwTxn, main_store: store::Main, diff --git a/meilidb-core/src/update/mod.rs b/meilidb-core/src/update/mod.rs index 52d8ab3d5..8d00498f0 100644 --- a/meilidb-core/src/update/mod.rs +++ b/meilidb-core/src/update/mod.rs @@ -10,7 +10,9 @@ mod synonyms_deletion; pub use self::clear_all::{apply_clear_all, push_clear_all}; pub use self::customs_update::{apply_customs_update, push_customs_update}; -pub use self::documents_addition::{apply_documents_addition, DocumentsAddition}; +pub use self::documents_addition::{ + apply_documents_addition, apply_documents_partial_addition, DocumentsAddition, +}; pub use self::documents_deletion::{apply_documents_deletion, DocumentsDeletion}; pub use self::schema_update::{apply_schema_update, push_schema_update}; pub use self::stop_words_addition::{apply_stop_words_addition, StopWordsAddition}; @@ -19,7 +21,7 @@ pub use self::synonyms_addition::{apply_synonyms_addition, SynonymsAddition}; pub use self::synonyms_deletion::{apply_synonyms_deletion, SynonymsDeletion}; use std::cmp; -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::time::{Duration, Instant}; use heed::Result as ZResult; @@ -34,7 +36,8 @@ pub enum Update { ClearAll, Schema(Schema), Customs(Vec), - DocumentsAddition(Vec), + DocumentsAddition(Vec>), + DocumentsPartial(Vec>), DocumentsDeletion(Vec), SynonymsAddition(BTreeMap>), SynonymsDeletion(BTreeMap>>), @@ -53,6 +56,9 @@ impl Update { Update::DocumentsAddition(addition) => UpdateType::DocumentsAddition { number: addition.len(), }, + Update::DocumentsPartial(addition) => UpdateType::DocumentsPartial { + number: addition.len(), + }, Update::DocumentsDeletion(deletion) => UpdateType::DocumentsDeletion { number: deletion.len(), }, @@ -78,6 +84,7 @@ pub enum UpdateType { Schema { schema: Schema }, Customs, DocumentsAddition { number: usize }, + DocumentsPartial { number: usize }, DocumentsDeletion { number: usize }, SynonymsAddition { number: usize }, SynonymsDeletion { number: usize }, @@ -218,6 +225,25 @@ pub fn update_task<'a, 'b>( (update_type, result, start.elapsed()) } + Update::DocumentsPartial(documents) => { + let start = Instant::now(); + + let update_type = UpdateType::DocumentsPartial { + number: documents.len(), + }; + + let result = apply_documents_partial_addition( + writer, + index.main, + index.documents_fields, + index.documents_fields_counts, + index.postings_lists, + index.docs_words, + documents, + ); + + (update_type, result, start.elapsed()) + } Update::DocumentsDeletion(documents) => { let start = Instant::now();