use std::collections::HashMap; use fst::{set::OpBuilder, SetBuilder}; use indexmap::IndexMap; use sdset::{duo::Union, SetOperation}; use serde::{Deserialize, Serialize}; use crate::database::{MainT, UpdateT}; use crate::database::{UpdateEvent, UpdateEventsEmitter}; use crate::facets; use crate::raw_indexer::RawIndexer; use crate::serde::{extract_document_id, serialize_value_with_id, Deserializer, Serializer}; use crate::store; use crate::update::{apply_documents_deletion, compute_short_prefixes, next_update_id, Update}; use crate::{Error, MResult, RankedMap}; pub struct DocumentsAddition { updates_store: store::Updates, updates_results_store: store::UpdatesResults, updates_notifier: UpdateEventsEmitter, documents: Vec, is_partial: bool, } impl DocumentsAddition { pub fn new( updates_store: store::Updates, updates_results_store: store::UpdatesResults, updates_notifier: UpdateEventsEmitter, ) -> DocumentsAddition { DocumentsAddition { updates_store, updates_results_store, updates_notifier, documents: Vec::new(), is_partial: false, } } pub fn new_partial( updates_store: store::Updates, updates_results_store: store::UpdatesResults, updates_notifier: UpdateEventsEmitter, ) -> DocumentsAddition { DocumentsAddition { updates_store, updates_results_store, updates_notifier, documents: Vec::new(), is_partial: true, } } pub fn update_document(&mut self, document: D) { self.documents.push(document); } pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult where D: serde::Serialize, { let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); let update_id = push_documents_addition( writer, self.updates_store, self.updates_results_store, self.documents, self.is_partial, )?; Ok(update_id) } } impl Extend for DocumentsAddition { fn extend>(&mut self, iter: T) { self.documents.extend(iter) } } pub fn push_documents_addition( writer: &mut heed::RwTxn, updates_store: store::Updates, updates_results_store: store::UpdatesResults, addition: Vec, is_partial: bool, ) -> MResult { let mut values = Vec::with_capacity(addition.len()); for add in addition { let vec = serde_json::to_vec(&add)?; let add = serde_json::from_slice(&vec)?; values.push(add); } let last_update_id = next_update_id(writer, updates_store, updates_results_store)?; let update = if is_partial { Update::documents_partial(values) } else { Update::documents_addition(values) }; updates_store.put_update(writer, last_update_id, &update)?; Ok(last_update_id) } pub fn apply_addition<'a, 'b>( writer: &'a mut heed::RwTxn<'b, MainT>, index: &store::Index, addition: Vec>, partial: bool ) -> MResult<()> { let mut documents_additions = HashMap::new(); let mut schema = match index.main.schema(writer)? { Some(schema) => schema, None => return Err(Error::SchemaMissing), }; let primary_key = schema.primary_key().ok_or(Error::MissingPrimaryKey)?; // 1. store documents ids for future deletion for mut document in addition { let document_id = match extract_document_id(&primary_key, &document)? { Some(id) => id, None => return Err(Error::MissingDocumentId), }; if partial { let mut deserializer = Deserializer { document_id, reader: writer, documents_fields: index.documents_fields, schema: &schema, fields: None, }; // retrieve the old document and // update the new one with missing keys found in the old one let result = Option::>::deserialize(&mut deserializer)?; if let Some(old_document) = result { for (key, value) in old_document { document.entry(key).or_insert(value); } } } documents_additions.insert(document_id, document); } // 2. remove the documents posting lists let number_of_inserted_documents = documents_additions.len(); let documents_ids = documents_additions.iter().map(|(id, _)| *id).collect(); apply_documents_deletion(writer, index, documents_ids)?; let mut ranked_map = match index.main.ranked_map(writer)? { Some(ranked_map) => ranked_map, None => RankedMap::default(), }; let stop_words = match index.main.stop_words_fst(writer)? { Some(stop_words) => stop_words, None => fst::Set::default(), }; // 3. index the documents fields in the stores if let Some(attributes_for_facetting) = index.main.attributes_for_faceting(writer)? { let facet_map = facets::facet_map_from_docs(&schema, &documents_additions, attributes_for_facetting.as_ref())?; index.facets.add(writer, facet_map)?; } let mut indexer = RawIndexer::new(stop_words); for (document_id, document) in documents_additions { let serializer = Serializer { txn: writer, schema: &mut schema, document_store: index.documents_fields, document_fields_counts: index.documents_fields_counts, indexer: &mut indexer, ranked_map: &mut ranked_map, document_id, }; document.serialize(serializer)?; } write_documents_addition_index( writer, index, &ranked_map, number_of_inserted_documents, indexer, )?; index.main.put_schema(writer, &schema)?; Ok(()) } pub fn apply_documents_partial_addition<'a, 'b>( writer: &'a mut heed::RwTxn<'b, MainT>, index: &store::Index, addition: Vec>, ) -> MResult<()> { apply_addition(writer, index, addition, true) } pub fn apply_documents_addition<'a, 'b>( writer: &'a mut heed::RwTxn<'b, MainT>, index: &store::Index, addition: Vec>, ) -> MResult<()> { apply_addition(writer, index, addition, false) } pub fn reindex_all_documents(writer: &mut heed::RwTxn, index: &store::Index) -> MResult<()> { let schema = match index.main.schema(writer)? { Some(schema) => schema, None => return Err(Error::SchemaMissing), }; let mut ranked_map = RankedMap::default(); // 1. retrieve all documents ids let mut documents_ids_to_reindex = Vec::new(); for result in index.documents_fields_counts.documents_ids(writer)? { let document_id = result?; documents_ids_to_reindex.push(document_id); } // 2. remove the documents posting lists index.main.put_words_fst(writer, &fst::Set::default())?; index.main.put_ranked_map(writer, &ranked_map)?; index.main.put_number_of_documents(writer, |_| 0)?; index.facets.clear(writer)?; index.postings_lists.clear(writer)?; index.docs_words.clear(writer)?; let stop_words = match index.main.stop_words_fst(writer)? { Some(stop_words) => stop_words, None => fst::Set::default(), }; let number_of_inserted_documents = documents_ids_to_reindex.len(); let mut indexer = RawIndexer::new(stop_words); let mut ram_store = HashMap::new(); if let Some(ref attributes_for_facetting) = index.main.attributes_for_faceting(writer)? { let facet_map = facets::facet_map_from_docids(writer, &index, &documents_ids_to_reindex, &attributes_for_facetting)?; index.facets.add(writer, facet_map)?; } // ^-- https://github.com/meilisearch/MeiliSearch/pull/631#issuecomment-626624470 --v for document_id in documents_ids_to_reindex { for result in index.documents_fields.document_fields(writer, document_id)? { let (field_id, bytes) = result?; let value: serde_json::Value = serde_json::from_slice(bytes)?; ram_store.insert((document_id, field_id), value); } for ((docid, field_id), value) in ram_store.drain() { serialize_value_with_id( writer, field_id, &schema, docid, index.documents_fields, index.documents_fields_counts, &mut indexer, &mut ranked_map, &value )?; } } // 4. write the new index in the main store write_documents_addition_index( writer, index, &ranked_map, number_of_inserted_documents, indexer, )?; index.main.put_schema(writer, &schema)?; Ok(()) } pub fn write_documents_addition_index( writer: &mut heed::RwTxn, index: &store::Index, ranked_map: &RankedMap, number_of_inserted_documents: usize, indexer: RawIndexer, ) -> MResult<()> { let indexed = indexer.build(); let mut delta_words_builder = SetBuilder::memory(); for (word, delta_set) in indexed.words_doc_indexes { delta_words_builder.insert(&word).unwrap(); let set = match index.postings_lists.postings_list(writer, &word)? { Some(postings) => Union::new(&postings.matches, &delta_set).into_set_buf(), None => delta_set, }; index.postings_lists.put_postings_list(writer, &word, &set)?; } for (id, words) in indexed.docs_words { index.docs_words.put_doc_words(writer, id, &words)?; } let delta_words = delta_words_builder .into_inner() .and_then(fst::Set::from_bytes) .unwrap(); let words = match index.main.words_fst(writer)? { Some(words) => { let op = OpBuilder::new() .add(words.stream()) .add(delta_words.stream()) .r#union(); let mut words_builder = SetBuilder::memory(); words_builder.extend_stream(op).unwrap(); words_builder .into_inner() .and_then(fst::Set::from_bytes) .unwrap() } None => delta_words, }; index.main.put_words_fst(writer, &words)?; index.main.put_ranked_map(writer, ranked_map)?; index.main.put_number_of_documents(writer, |old| old + number_of_inserted_documents as u64)?; compute_short_prefixes(writer, index)?; Ok(()) }