mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-27 05:37:31 +01:00
Introduce partial updates to the update system
This commit is contained in:
parent
68c0a36b00
commit
36b74f0efe
@ -263,8 +263,6 @@ mod tests {
|
||||
|
||||
let mut writer = env.write_txn().unwrap();
|
||||
let _update_id = index.schema_update(&mut writer, schema).unwrap();
|
||||
|
||||
// don't forget to commit...
|
||||
writer.commit().unwrap();
|
||||
|
||||
let mut additions = index.documents_addition();
|
||||
@ -286,8 +284,6 @@ mod tests {
|
||||
|
||||
let mut writer = env.write_txn().unwrap();
|
||||
let update_id = additions.finalize(&mut writer).unwrap();
|
||||
|
||||
// don't forget to commit...
|
||||
writer.commit().unwrap();
|
||||
|
||||
// block until the transaction is processed
|
||||
@ -329,8 +325,6 @@ mod tests {
|
||||
|
||||
let mut writer = env.write_txn().unwrap();
|
||||
let _update_id = index.schema_update(&mut writer, schema).unwrap();
|
||||
|
||||
// don't forget to commit...
|
||||
writer.commit().unwrap();
|
||||
|
||||
let mut additions = index.documents_addition();
|
||||
@ -351,8 +345,6 @@ mod tests {
|
||||
|
||||
let mut writer = env.write_txn().unwrap();
|
||||
let update_id = additions.finalize(&mut writer).unwrap();
|
||||
|
||||
// don't forget to commit...
|
||||
writer.commit().unwrap();
|
||||
|
||||
// block until the transaction is processed
|
||||
@ -564,8 +556,6 @@ mod tests {
|
||||
|
||||
let mut writer = env.write_txn().unwrap();
|
||||
let _update_id = index.schema_update(&mut writer, schema).unwrap();
|
||||
|
||||
// don't forget to commit...
|
||||
writer.commit().unwrap();
|
||||
|
||||
let mut additions = index.documents_addition();
|
||||
@ -589,8 +579,6 @@ mod tests {
|
||||
|
||||
let mut writer = env.write_txn().unwrap();
|
||||
let update_id = additions.finalize(&mut writer).unwrap();
|
||||
|
||||
// don't forget to commit...
|
||||
writer.commit().unwrap();
|
||||
|
||||
// block until the transaction is processed
|
||||
@ -613,4 +601,136 @@ mod tests {
|
||||
.unwrap();
|
||||
assert!(document.is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn partial_document_update() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
|
||||
let database = Database::open_or_create(dir.path()).unwrap();
|
||||
let env = &database.env;
|
||||
|
||||
let (sender, receiver) = mpsc::sync_channel(100);
|
||||
let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap();
|
||||
let index = database.create_index("test").unwrap();
|
||||
|
||||
let done = database.set_update_callback("test", Box::new(update_fn));
|
||||
assert!(done, "could not set the index update function");
|
||||
|
||||
let schema = {
|
||||
let data = r#"
|
||||
identifier = "id"
|
||||
|
||||
[attributes."id"]
|
||||
displayed = true
|
||||
|
||||
[attributes."name"]
|
||||
displayed = true
|
||||
indexed = true
|
||||
|
||||
[attributes."description"]
|
||||
displayed = true
|
||||
indexed = true
|
||||
"#;
|
||||
toml::from_str(data).unwrap()
|
||||
};
|
||||
|
||||
let mut writer = env.write_txn().unwrap();
|
||||
let _update_id = index.schema_update(&mut writer, schema).unwrap();
|
||||
writer.commit().unwrap();
|
||||
|
||||
let mut additions = index.documents_addition();
|
||||
|
||||
// DocumentId(7900334843754999545)
|
||||
let doc1 = serde_json::json!({
|
||||
"id": 123,
|
||||
"name": "Marvin",
|
||||
"description": "My name is Marvin",
|
||||
});
|
||||
|
||||
// DocumentId(8367468610878465872)
|
||||
let doc2 = serde_json::json!({
|
||||
"id": 234,
|
||||
"name": "Kevin",
|
||||
"description": "My name is Kevin",
|
||||
});
|
||||
|
||||
additions.update_document(doc1);
|
||||
additions.update_document(doc2);
|
||||
|
||||
let mut writer = env.write_txn().unwrap();
|
||||
let update_id = additions.finalize(&mut writer).unwrap();
|
||||
writer.commit().unwrap();
|
||||
|
||||
// block until the transaction is processed
|
||||
let _ = receiver.iter().find(|id| *id == update_id);
|
||||
|
||||
let reader = env.read_txn().unwrap();
|
||||
let result = index.update_status(&reader, update_id).unwrap();
|
||||
assert_matches!(result, UpdateStatus::Processed(status) if status.result.is_ok());
|
||||
|
||||
let document: Option<IgnoredAny> = index.document(&reader, None, DocumentId(25)).unwrap();
|
||||
assert!(document.is_none());
|
||||
|
||||
let document: Option<IgnoredAny> = index
|
||||
.document(&reader, None, DocumentId(7900334843754999545))
|
||||
.unwrap();
|
||||
assert!(document.is_some());
|
||||
|
||||
let document: Option<IgnoredAny> = index
|
||||
.document(&reader, None, DocumentId(8367468610878465872))
|
||||
.unwrap();
|
||||
assert!(document.is_some());
|
||||
|
||||
reader.abort();
|
||||
|
||||
let mut partial_additions = index.documents_partial_addition();
|
||||
|
||||
// DocumentId(7900334843754999545)
|
||||
let partial_doc1 = serde_json::json!({
|
||||
"id": 123,
|
||||
"description": "I am the new Marvin",
|
||||
});
|
||||
|
||||
// DocumentId(8367468610878465872)
|
||||
let partial_doc2 = serde_json::json!({
|
||||
"id": 234,
|
||||
"description": "I am the new Kevin",
|
||||
});
|
||||
|
||||
partial_additions.update_document(partial_doc1);
|
||||
partial_additions.update_document(partial_doc2);
|
||||
|
||||
let mut writer = env.write_txn().unwrap();
|
||||
let update_id = partial_additions.finalize(&mut writer).unwrap();
|
||||
writer.commit().unwrap();
|
||||
|
||||
// block until the transaction is processed
|
||||
let _ = receiver.iter().find(|id| *id == update_id);
|
||||
|
||||
let reader = env.read_txn().unwrap();
|
||||
let result = index.update_status(&reader, update_id).unwrap();
|
||||
assert_matches!(result, UpdateStatus::Processed(status) if status.result.is_ok());
|
||||
|
||||
let document: Option<serde_json::Value> = index
|
||||
.document(&reader, None, DocumentId(7900334843754999545))
|
||||
.unwrap();
|
||||
|
||||
let new_doc1 = serde_json::json!({
|
||||
"id": 123,
|
||||
"name": "Marvin",
|
||||
"description": "I am the new Marvin",
|
||||
});
|
||||
assert_eq!(document, Some(new_doc1));
|
||||
|
||||
let document: Option<serde_json::Value> = index
|
||||
.document(&reader, None, DocumentId(8367468610878465872))
|
||||
.unwrap();
|
||||
|
||||
let new_doc2 = serde_json::json!({
|
||||
"id": 234,
|
||||
"name": "Kevin",
|
||||
"description": "I am the new Kevin",
|
||||
});
|
||||
assert_eq!(document, Some(new_doc2));
|
||||
}
|
||||
}
|
||||
|
@ -156,6 +156,14 @@ impl Index {
|
||||
)
|
||||
}
|
||||
|
||||
pub fn documents_partial_addition<D>(&self) -> update::DocumentsAddition<D> {
|
||||
update::DocumentsAddition::new_partial(
|
||||
self.updates,
|
||||
self.updates_results,
|
||||
self.updates_notifier.clone(),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn documents_deletion(&self) -> update::DocumentsDeletion {
|
||||
update::DocumentsDeletion::new(
|
||||
self.updates,
|
||||
|
@ -2,10 +2,10 @@ use std::collections::HashMap;
|
||||
|
||||
use fst::{set::OpBuilder, SetBuilder};
|
||||
use sdset::{duo::Union, SetOperation};
|
||||
use serde::Serialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::raw_indexer::RawIndexer;
|
||||
use crate::serde::{extract_document_id, serialize_value, Serializer};
|
||||
use crate::serde::{extract_document_id, serialize_value, Deserializer, Serializer};
|
||||
use crate::store;
|
||||
use crate::update::{apply_documents_deletion, next_update_id, Update};
|
||||
use crate::{Error, MResult, RankedMap};
|
||||
@ -15,6 +15,7 @@ pub struct DocumentsAddition<D> {
|
||||
updates_results_store: store::UpdatesResults,
|
||||
updates_notifier: crossbeam_channel::Sender<()>,
|
||||
documents: Vec<D>,
|
||||
is_partial: bool,
|
||||
}
|
||||
|
||||
impl<D> DocumentsAddition<D> {
|
||||
@ -28,6 +29,21 @@ impl<D> DocumentsAddition<D> {
|
||||
updates_results_store,
|
||||
updates_notifier,
|
||||
documents: Vec::new(),
|
||||
is_partial: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_partial(
|
||||
updates_store: store::Updates,
|
||||
updates_results_store: store::UpdatesResults,
|
||||
updates_notifier: crossbeam_channel::Sender<()>,
|
||||
) -> DocumentsAddition<D> {
|
||||
DocumentsAddition {
|
||||
updates_store,
|
||||
updates_results_store,
|
||||
updates_notifier,
|
||||
documents: Vec::new(),
|
||||
is_partial: true,
|
||||
}
|
||||
}
|
||||
|
||||
@ -45,6 +61,7 @@ impl<D> DocumentsAddition<D> {
|
||||
self.updates_store,
|
||||
self.updates_results_store,
|
||||
self.documents,
|
||||
self.is_partial,
|
||||
)?;
|
||||
Ok(update_id)
|
||||
}
|
||||
@ -61,6 +78,7 @@ pub fn push_documents_addition<D: serde::Serialize>(
|
||||
updates_store: store::Updates,
|
||||
updates_results_store: store::UpdatesResults,
|
||||
addition: Vec<D>,
|
||||
is_partial: bool,
|
||||
) -> MResult<u64> {
|
||||
let mut values = Vec::with_capacity(addition.len());
|
||||
for add in addition {
|
||||
@ -71,7 +89,12 @@ pub fn push_documents_addition<D: serde::Serialize>(
|
||||
|
||||
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
|
||||
|
||||
let update = Update::DocumentsAddition(values);
|
||||
let update = if is_partial {
|
||||
Update::DocumentsPartial(values)
|
||||
} else {
|
||||
Update::DocumentsAddition(values)
|
||||
};
|
||||
|
||||
updates_store.put_update(writer, last_update_id, &update)?;
|
||||
|
||||
Ok(last_update_id)
|
||||
@ -84,7 +107,7 @@ pub fn apply_documents_addition<'a, 'b>(
|
||||
documents_fields_counts_store: store::DocumentsFieldsCounts,
|
||||
postings_lists_store: store::PostingsLists,
|
||||
docs_words_store: store::DocsWords,
|
||||
addition: Vec<serde_json::Value>,
|
||||
addition: Vec<HashMap<String, serde_json::Value>>,
|
||||
) -> MResult<()> {
|
||||
let mut documents_additions = HashMap::new();
|
||||
|
||||
@ -156,6 +179,102 @@ pub fn apply_documents_addition<'a, 'b>(
|
||||
)
|
||||
}
|
||||
|
||||
pub fn apply_documents_partial_addition<'a, 'b>(
|
||||
writer: &'a mut heed::RwTxn<'b>,
|
||||
main_store: store::Main,
|
||||
documents_fields_store: store::DocumentsFields,
|
||||
documents_fields_counts_store: store::DocumentsFieldsCounts,
|
||||
postings_lists_store: store::PostingsLists,
|
||||
docs_words_store: store::DocsWords,
|
||||
addition: Vec<HashMap<String, serde_json::Value>>,
|
||||
) -> MResult<()> {
|
||||
let mut documents_additions = HashMap::new();
|
||||
|
||||
let schema = match main_store.schema(writer)? {
|
||||
Some(schema) => schema,
|
||||
None => return Err(Error::SchemaMissing),
|
||||
};
|
||||
|
||||
let identifier = schema.identifier_name();
|
||||
|
||||
// 1. store documents ids for future deletion
|
||||
for mut document in addition {
|
||||
let document_id = match extract_document_id(identifier, &document)? {
|
||||
Some(id) => id,
|
||||
None => return Err(Error::MissingDocumentId),
|
||||
};
|
||||
|
||||
let mut deserializer = Deserializer {
|
||||
document_id,
|
||||
reader: writer,
|
||||
documents_fields: documents_fields_store,
|
||||
schema: &schema,
|
||||
attributes: None,
|
||||
};
|
||||
|
||||
// retrieve the old document and
|
||||
// update the new one with missing keys found in the old one
|
||||
let result = Option::<HashMap<String, serde_json::Value>>::deserialize(&mut deserializer)?;
|
||||
if let Some(old_document) = result {
|
||||
for (key, value) in old_document {
|
||||
document.entry(key).or_insert(value);
|
||||
}
|
||||
}
|
||||
|
||||
documents_additions.insert(document_id, document);
|
||||
}
|
||||
|
||||
// 2. remove the documents posting lists
|
||||
let number_of_inserted_documents = documents_additions.len();
|
||||
let documents_ids = documents_additions.iter().map(|(id, _)| *id).collect();
|
||||
apply_documents_deletion(
|
||||
writer,
|
||||
main_store,
|
||||
documents_fields_store,
|
||||
documents_fields_counts_store,
|
||||
postings_lists_store,
|
||||
docs_words_store,
|
||||
documents_ids,
|
||||
)?;
|
||||
|
||||
let mut ranked_map = match main_store.ranked_map(writer)? {
|
||||
Some(ranked_map) => ranked_map,
|
||||
None => RankedMap::default(),
|
||||
};
|
||||
|
||||
let stop_words = match main_store.stop_words_fst(writer)? {
|
||||
Some(stop_words) => stop_words,
|
||||
None => fst::Set::default(),
|
||||
};
|
||||
|
||||
// 3. index the documents fields in the stores
|
||||
let mut indexer = RawIndexer::new(stop_words);
|
||||
|
||||
for (document_id, document) in documents_additions {
|
||||
let serializer = Serializer {
|
||||
txn: writer,
|
||||
schema: &schema,
|
||||
document_store: documents_fields_store,
|
||||
document_fields_counts: documents_fields_counts_store,
|
||||
indexer: &mut indexer,
|
||||
ranked_map: &mut ranked_map,
|
||||
document_id,
|
||||
};
|
||||
|
||||
document.serialize(serializer)?;
|
||||
}
|
||||
|
||||
write_documents_addition_index(
|
||||
writer,
|
||||
main_store,
|
||||
postings_lists_store,
|
||||
docs_words_store,
|
||||
&ranked_map,
|
||||
number_of_inserted_documents,
|
||||
indexer,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn reindex_all_documents(
|
||||
writer: &mut heed::RwTxn,
|
||||
main_store: store::Main,
|
||||
|
@ -10,7 +10,9 @@ mod synonyms_deletion;
|
||||
|
||||
pub use self::clear_all::{apply_clear_all, push_clear_all};
|
||||
pub use self::customs_update::{apply_customs_update, push_customs_update};
|
||||
pub use self::documents_addition::{apply_documents_addition, DocumentsAddition};
|
||||
pub use self::documents_addition::{
|
||||
apply_documents_addition, apply_documents_partial_addition, DocumentsAddition,
|
||||
};
|
||||
pub use self::documents_deletion::{apply_documents_deletion, DocumentsDeletion};
|
||||
pub use self::schema_update::{apply_schema_update, push_schema_update};
|
||||
pub use self::stop_words_addition::{apply_stop_words_addition, StopWordsAddition};
|
||||
@ -19,7 +21,7 @@ pub use self::synonyms_addition::{apply_synonyms_addition, SynonymsAddition};
|
||||
pub use self::synonyms_deletion::{apply_synonyms_deletion, SynonymsDeletion};
|
||||
|
||||
use std::cmp;
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::collections::{BTreeMap, BTreeSet, HashMap};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use heed::Result as ZResult;
|
||||
@ -34,7 +36,8 @@ pub enum Update {
|
||||
ClearAll,
|
||||
Schema(Schema),
|
||||
Customs(Vec<u8>),
|
||||
DocumentsAddition(Vec<serde_json::Value>),
|
||||
DocumentsAddition(Vec<HashMap<String, serde_json::Value>>),
|
||||
DocumentsPartial(Vec<HashMap<String, serde_json::Value>>),
|
||||
DocumentsDeletion(Vec<DocumentId>),
|
||||
SynonymsAddition(BTreeMap<String, Vec<String>>),
|
||||
SynonymsDeletion(BTreeMap<String, Option<Vec<String>>>),
|
||||
@ -53,6 +56,9 @@ impl Update {
|
||||
Update::DocumentsAddition(addition) => UpdateType::DocumentsAddition {
|
||||
number: addition.len(),
|
||||
},
|
||||
Update::DocumentsPartial(addition) => UpdateType::DocumentsPartial {
|
||||
number: addition.len(),
|
||||
},
|
||||
Update::DocumentsDeletion(deletion) => UpdateType::DocumentsDeletion {
|
||||
number: deletion.len(),
|
||||
},
|
||||
@ -78,6 +84,7 @@ pub enum UpdateType {
|
||||
Schema { schema: Schema },
|
||||
Customs,
|
||||
DocumentsAddition { number: usize },
|
||||
DocumentsPartial { number: usize },
|
||||
DocumentsDeletion { number: usize },
|
||||
SynonymsAddition { number: usize },
|
||||
SynonymsDeletion { number: usize },
|
||||
@ -218,6 +225,25 @@ pub fn update_task<'a, 'b>(
|
||||
|
||||
(update_type, result, start.elapsed())
|
||||
}
|
||||
Update::DocumentsPartial(documents) => {
|
||||
let start = Instant::now();
|
||||
|
||||
let update_type = UpdateType::DocumentsPartial {
|
||||
number: documents.len(),
|
||||
};
|
||||
|
||||
let result = apply_documents_partial_addition(
|
||||
writer,
|
||||
index.main,
|
||||
index.documents_fields,
|
||||
index.documents_fields_counts,
|
||||
index.postings_lists,
|
||||
index.docs_words,
|
||||
documents,
|
||||
);
|
||||
|
||||
(update_type, result, start.elapsed())
|
||||
}
|
||||
Update::DocumentsDeletion(documents) => {
|
||||
let start = Instant::now();
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user