From 0f30a221fa4e2953c05e0ecfa965b3ff857dfe97 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?=
Date: Mon, 21 Oct 2019 17:33:52 +0200
Subject: [PATCH] Introduce the reindex_all_documents indexing function

---
 meilidb-core/src/error.rs                     |  12 ++
 meilidb-core/src/serde/mod.rs                 |  28 +---
 meilidb-core/src/serde/serializer.rs          | 117 ++++++++-------
 .../src/store/documents_fields_counts.rs      |   2 +-
 meilidb-core/src/store/postings_lists.rs      |   4 +
 meilidb-core/src/update/documents_addition.rs | 142 ++++++++++++++----
 meilidb-core/src/update/documents_deletion.rs |   7 +-
 meilidb-core/src/update/mod.rs                |  24 ++-
 meilidb-core/src/update/schema_update.rs      |  53 ++++++-
 9 files changed, 259 insertions(+), 130 deletions(-)

diff --git a/meilidb-core/src/error.rs b/meilidb-core/src/error.rs
index 986f1d08b..7dca7c994 100644
--- a/meilidb-core/src/error.rs
+++ b/meilidb-core/src/error.rs
@@ -12,6 +12,7 @@ pub enum Error {
     SchemaMissing,
     WordIndexMissing,
     MissingDocumentId,
+    DuplicateDocument,
     Zlmdb(heed::Error),
     Fst(fst::Error),
     SerdeJson(SerdeJsonError),
@@ -79,6 +80,7 @@ impl fmt::Display for Error {
             SchemaMissing => write!(f, "this index does not have a schema"),
             WordIndexMissing => write!(f, "this index does not have a word index"),
             MissingDocumentId => write!(f, "document id is missing"),
+            DuplicateDocument => write!(f, "update contains documents with the same id"),
             Zlmdb(e) => write!(f, "heed error; {}", e),
             Fst(e) => write!(f, "fst error; {}", e),
             SerdeJson(e) => write!(f, "serde json error; {}", e),
@@ -95,6 +97,10 @@ impl error::Error for Error {}
 #[derive(Debug)]
 pub enum UnsupportedOperation {
     SchemaAlreadyExists,
+    CannotUpdateSchemaIdentifier,
+    CannotReorderSchemaAttribute,
+    CannotIntroduceNewSchemaAttribute,
+    CannotRemoveSchemaAttribute,
 }
 
 impl fmt::Display for UnsupportedOperation {
@@ -102,6 +108,12 @@ impl fmt::Display for UnsupportedOperation {
         use self::UnsupportedOperation::*;
         match self {
             SchemaAlreadyExists => write!(f, "Cannot update index which already have a schema"),
+            CannotUpdateSchemaIdentifier => write!(f, "Cannot update the identifier of a schema"),
+            CannotReorderSchemaAttribute => write!(f, "Cannot reorder the attributes of a schema"),
+            CannotIntroduceNewSchemaAttribute => {
+                write!(f, "Cannot introduce new attributes in a schema")
+            }
+            CannotRemoveSchemaAttribute => write!(f, "Cannot remove attributes from a schema"),
         }
     }
 }
diff --git a/meilidb-core/src/serde/mod.rs b/meilidb-core/src/serde/mod.rs
index 12ba1286a..c25c4b158 100644
--- a/meilidb-core/src/serde/mod.rs
+++ b/meilidb-core/src/serde/mod.rs
@@ -20,16 +20,14 @@ pub use self::convert_to_string::ConvertToString;
 pub use self::deserializer::{Deserializer, DeserializerError};
 pub use self::extract_document_id::{compute_document_id, extract_document_id, value_to_string};
 pub use self::indexer::Indexer;
-pub use self::serializer::Serializer;
+pub use self::serializer::{serialize_value, Serializer};
 
-use std::collections::BTreeMap;
 use std::{error::Error, fmt};
 
-use meilidb_schema::SchemaAttr;
 use serde::ser;
 use serde_json::Error as SerdeJsonError;
 
-use crate::{DocumentId, ParseNumberError};
+use crate::ParseNumberError;
 
 #[derive(Debug)]
 pub enum SerializerError {
@@ -103,25 +101,3 @@ impl From<ParseNumberError> for SerializerError {
         SerializerError::ParseNumber(error)
     }
 }
-
-pub struct RamDocumentStore(BTreeMap<(DocumentId, SchemaAttr), Vec<u8>>);
-
-impl RamDocumentStore {
-    pub fn new() -> RamDocumentStore {
-        RamDocumentStore(BTreeMap::new())
-    }
-
-    pub fn set_document_field(&mut self, id: DocumentId, attr: SchemaAttr, value: Vec<u8>) {
-        self.0.insert((id, attr), value);
-    }
-
-    pub fn into_inner(self) -> BTreeMap<(DocumentId, SchemaAttr), Vec<u8>> {
-        self.0
-    }
-}
-
-impl Default for RamDocumentStore {
-    fn default() -> Self {
-        Self::new()
-    }
-}
diff --git a/meilidb-core/src/serde/serializer.rs b/meilidb-core/src/serde/serializer.rs
index fa197f620..34f54d655 100644
--- a/meilidb-core/src/serde/serializer.rs
+++ b/meilidb-core/src/serde/serializer.rs
@@ -1,17 +1,17 @@
-use meilidb_schema::{Schema, SchemaAttr};
+use meilidb_schema::{Schema, SchemaAttr, SchemaProps};
 use serde::ser;
-use std::collections::HashMap;
 
 use crate::raw_indexer::RawIndexer;
-use crate::serde::RamDocumentStore;
+use crate::store::{DocumentsFields, DocumentsFieldsCounts};
 use crate::{DocumentId, RankedMap};
 
 use super::{ConvertToNumber, ConvertToString, Indexer, SerializerError};
 
 pub struct Serializer<'a> {
+    pub txn: &'a mut heed::RwTxn,
     pub schema: &'a Schema,
-    pub document_store: &'a mut RamDocumentStore,
-    pub document_fields_counts: &'a mut HashMap<(DocumentId, SchemaAttr), u64>,
+    pub document_store: DocumentsFields,
+    pub document_fields_counts: DocumentsFieldsCounts,
     pub indexer: &'a mut RawIndexer,
     pub ranked_map: &'a mut RankedMap,
     pub document_id: DocumentId,
@@ -150,6 +150,7 @@ impl<'a> ser::Serializer for Serializer<'a> {
 
     fn serialize_map(self, _len: Option<usize>) -> Result<Self::SerializeMap, Self::Error> {
         Ok(MapSerializer {
+            txn: self.txn,
             schema: self.schema,
             document_id: self.document_id,
             document_store: self.document_store,
@@ -166,6 +167,7 @@ impl<'a> ser::Serializer for Serializer<'a> {
         _len: usize,
     ) -> Result<Self::SerializeStruct, Self::Error> {
         Ok(StructSerializer {
+            txn: self.txn,
             schema: self.schema,
             document_id: self.document_id,
             document_store: self.document_store,
@@ -189,10 +191,11 @@ impl<'a> ser::Serializer for Serializer<'a> {
 }
 
 pub struct MapSerializer<'a> {
+    txn: &'a mut heed::RwTxn,
     schema: &'a Schema,
     document_id: DocumentId,
-    document_store: &'a mut RamDocumentStore,
-    document_fields_counts: &'a mut HashMap<(DocumentId, SchemaAttr), u64>,
+    document_store: DocumentsFields,
+    document_fields_counts: DocumentsFieldsCounts,
     indexer: &'a mut RawIndexer,
     ranked_map: &'a mut RankedMap,
     current_key_name: Option<String>,
@@ -229,17 +232,20 @@ impl<'a> ser::SerializeMap for MapSerializer<'a> {
         V: ser::Serialize,
     {
         let key = key.serialize(ConvertToString)?;
-
-        serialize_value(
-            self.schema,
-            self.document_id,
-            self.document_store,
-            self.document_fields_counts,
-            self.indexer,
-            self.ranked_map,
-            &key,
-            value,
-        )
+        match self.schema.attribute(&key) {
+            Some(attribute) => serialize_value(
+                self.txn,
+                attribute,
+                self.schema.props(attribute),
+                self.document_id,
+                self.document_store,
+                self.document_fields_counts,
+                self.indexer,
+                self.ranked_map,
+                value,
+            ),
+            None => Ok(()),
+        }
     }
 
     fn end(self) -> Result<Self::Ok, Self::Error> {
@@ -248,10 +254,11 @@ impl<'a> ser::SerializeMap for MapSerializer<'a> {
 }
 
 pub struct StructSerializer<'a> {
+    txn: &'a mut heed::RwTxn,
     schema: &'a Schema,
     document_id: DocumentId,
-    document_store: &'a mut RamDocumentStore,
-    document_fields_counts: &'a mut HashMap<(DocumentId, SchemaAttr), u64>,
+    document_store: DocumentsFields,
+    document_fields_counts: DocumentsFieldsCounts,
     indexer: &'a mut RawIndexer,
     ranked_map: &'a mut RankedMap,
 }
@@ -268,16 +275,20 @@ impl<'a> ser::SerializeStruct for StructSerializer<'a> {
     where
         T: ser::Serialize,
     {
-        serialize_value(
-            self.schema,
-            self.document_id,
-            self.document_store,
-            self.document_fields_counts,
-            self.indexer,
-            self.ranked_map,
-            key,
-            value,
-        )
+        match self.schema.attribute(key) {
+            Some(attribute) => serialize_value(
+                self.txn,
+                attribute,
+                self.schema.props(attribute),
+                self.document_id,
+                self.document_store,
+                self.document_fields_counts,
+                self.indexer,
+                self.ranked_map,
+                value,
+            ),
+            None => Ok(()),
+        }
     }
 
     fn end(self) -> Result<Self::Ok, Self::Error> {
@@ -285,40 +296,42 @@ impl<'a> ser::SerializeStruct for StructSerializer<'a> {
     }
 }
 
-fn serialize_value<T: ?Sized>(
-    schema: &Schema,
+pub fn serialize_value<T: ?Sized>(
+    txn: &mut heed::RwTxn,
+    attribute: SchemaAttr,
+    props: SchemaProps,
     document_id: DocumentId,
-    document_store: &mut RamDocumentStore,
-    documents_fields_counts: &mut HashMap<(DocumentId, SchemaAttr), u64>,
+    document_store: DocumentsFields,
+    documents_fields_counts: DocumentsFieldsCounts,
     indexer: &mut RawIndexer,
     ranked_map: &mut RankedMap,
-    key: &str,
     value: &T,
 ) -> Result<(), SerializerError>
 where
     T: ser::Serialize,
 {
-    if let Some(attribute) = schema.attribute(key) {
-        let props = schema.props(attribute);
+    let serialized = serde_json::to_vec(value)?;
+    document_store.put_document_field(txn, document_id, attribute, &serialized)?;
 
-        let serialized = serde_json::to_vec(value)?;
-        document_store.set_document_field(document_id, attribute, serialized);
-
-        if props.is_indexed() {
-            let indexer = Indexer {
-                attribute,
-                indexer,
+    if props.is_indexed() {
+        let indexer = Indexer {
+            attribute,
+            indexer,
+            document_id,
+        };
+        if let Some(number_of_words) = value.serialize(indexer)? {
+            documents_fields_counts.put_document_field_count(
+                txn,
                 document_id,
-            };
-            if let Some(number_of_words) = value.serialize(indexer)? {
-                documents_fields_counts.insert((document_id, attribute), number_of_words as u64);
-            }
+                attribute,
+                number_of_words as u64,
+            )?;
         }
+    }
 
-        if props.is_ranked() {
-            let number = value.serialize(ConvertToNumber)?;
-            ranked_map.insert(document_id, attribute, number);
-        }
+    if props.is_ranked() {
+        let number = value.serialize(ConvertToNumber)?;
+        ranked_map.insert(document_id, attribute, number);
     }
 
     Ok(())
 }
diff --git a/meilidb-core/src/store/documents_fields_counts.rs b/meilidb-core/src/store/documents_fields_counts.rs
index 5b7853ed6..9ba9c4478 100644
--- a/meilidb-core/src/store/documents_fields_counts.rs
+++ b/meilidb-core/src/store/documents_fields_counts.rs
@@ -121,7 +121,7 @@ pub struct AllDocumentsFieldsCountsIter<'txn> {
     iter: heed::RoIter<'txn, OwnedType<DocumentAttrKey>, OwnedType<u64>>,
 }
 
-impl<'r> Iterator for AllDocumentsFieldsCountsIter<'r> {
+impl Iterator for AllDocumentsFieldsCountsIter<'_> {
     type Item = ZResult<(DocumentId, SchemaAttr, u64)>;
 
     fn next(&mut self) -> Option<Self::Item> {
diff --git a/meilidb-core/src/store/postings_lists.rs b/meilidb-core/src/store/postings_lists.rs
index 8f8bc919e..7f886b491 100644
--- a/meilidb-core/src/store/postings_lists.rs
+++ b/meilidb-core/src/store/postings_lists.rs
@@ -23,6 +23,10 @@ impl PostingsLists {
         self.postings_lists.delete(writer, word)
     }
 
+    pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> {
+        self.postings_lists.clear(writer)
+    }
+
     pub fn postings_list<'txn>(
         self,
         reader: &'txn heed::RoTxn,
diff --git a/meilidb-core/src/update/documents_addition.rs b/meilidb-core/src/update/documents_addition.rs
index 656dcba5b..1b02b5598 100644
--- a/meilidb-core/src/update/documents_addition.rs
+++ b/meilidb-core/src/update/documents_addition.rs
@@ -5,7 +5,7 @@ use sdset::{duo::Union, SetOperation};
 use serde::Serialize;
 
 use crate::raw_indexer::RawIndexer;
-use crate::serde::{extract_document_id, RamDocumentStore, Serializer};
+use crate::serde::{extract_document_id, serialize_value, Serializer};
 use crate::store;
 use crate::update::{apply_documents_deletion, next_update_id, Update};
 use crate::{Error, MResult, RankedMap};
@@ -84,12 +84,9 @@ pub fn apply_documents_addition(
     documents_fields_counts_store: store::DocumentsFieldsCounts,
     postings_lists_store: store::PostingsLists,
     docs_words_store: store::DocsWords,
-    mut ranked_map: RankedMap,
     addition: Vec<serde_json::Value>,
 ) -> MResult<()> {
-    let mut document_ids = HashSet::new();
-    let mut document_store = RamDocumentStore::new();
-    let mut document_fields_counts = HashMap::new();
+    let mut documents_ids = HashSet::new();
     let mut indexer = RawIndexer::new();
 
     let schema = match main_store.schema(writer)? {
@@ -99,20 +96,47 @@ pub fn apply_documents_addition(
 
     let identifier = schema.identifier_name();
 
+    // 1. store documents ids for future deletion
+    for document in addition.iter() {
+        let document_id = match extract_document_id(identifier, &document)? {
+            Some(id) => id,
+            None => return Err(Error::MissingDocumentId),
+        };
+
+        if !documents_ids.insert(document_id) {
+            return Err(Error::DuplicateDocument);
+        }
+    }
+
+    // 2. remove the documents posting lists
+    let number_of_inserted_documents = documents_ids.len();
+    apply_documents_deletion(
+        writer,
+        main_store,
+        documents_fields_store,
+        documents_fields_counts_store,
+        postings_lists_store,
+        docs_words_store,
+        documents_ids.into_iter().collect(),
+    )?;
+
+    let mut ranked_map = match main_store.ranked_map(writer)? {
+        Some(ranked_map) => ranked_map,
+        None => RankedMap::default(),
+    };
+
+    // 3. index the documents fields in the stores
     for document in addition {
         let document_id = match extract_document_id(identifier, &document)? {
             Some(id) => id,
             None => return Err(Error::MissingDocumentId),
         };
 
-        // 1. store the document id for future deletion
-        document_ids.insert(document_id);
-
-        // 2. index the document fields in ram stores
         let serializer = Serializer {
+            txn: writer,
             schema: &schema,
-            document_store: &mut document_store,
-            document_fields_counts: &mut document_fields_counts,
+            document_store: documents_fields_store,
+            document_fields_counts: documents_fields_counts_store,
             indexer: &mut indexer,
             ranked_map: &mut ranked_map,
             document_id,
@@ -121,29 +145,93 @@ pub fn apply_documents_addition(
         document.serialize(serializer)?;
     }
 
-    // 1. remove the previous documents match indexes
-    let documents_to_insert = document_ids.iter().cloned().collect();
-    apply_documents_deletion(
+    write_documents_addition_index(
         writer,
         main_store,
-        documents_fields_store,
-        documents_fields_counts_store,
         postings_lists_store,
         docs_words_store,
-        ranked_map.clone(),
-        documents_to_insert,
-    )?;
+        ranked_map,
+        number_of_inserted_documents,
+        indexer,
+    )
+}
 
-    // 2. insert new document attributes in the database
-    for ((id, attr), value) in document_store.into_inner() {
-        documents_fields_store.put_document_field(writer, id, attr, &value)?;
+pub fn reindex_all_documents(
+    writer: &mut heed::RwTxn,
+    main_store: store::Main,
+    documents_fields_store: store::DocumentsFields,
+    documents_fields_counts_store: store::DocumentsFieldsCounts,
+    postings_lists_store: store::PostingsLists,
+    docs_words_store: store::DocsWords,
+) -> MResult<()> {
+    let schema = match main_store.schema(writer)? {
+        Some(schema) => schema,
+        None => return Err(Error::SchemaMissing),
+    };
+
+    let mut ranked_map = RankedMap::default();
+
+    // 1. retrieve all documents ids
+    let mut documents_ids_to_reindex = Vec::new();
+    for result in documents_fields_counts_store.documents_ids(writer)? {
+        let document_id = result?;
+        documents_ids_to_reindex.push(document_id);
     }
 
-    // 3. insert new document attributes counts
-    for ((id, attr), count) in document_fields_counts {
-        documents_fields_counts_store.put_document_field_count(writer, id, attr, count)?;
+    // 2. remove the documents posting lists
+    let number_of_inserted_documents = documents_ids_to_reindex.len();
+    main_store.put_words_fst(writer, &fst::Set::default())?;
+    main_store.put_ranked_map(writer, &ranked_map)?;
+    main_store.put_number_of_documents(writer, |_| 0)?;
+    postings_lists_store.clear(writer)?;
+
+    // 3. re-index one document by one document (otherwise we make the borrow checker unhappy)
+    let mut indexer = RawIndexer::new();
+    let mut ram_store = HashMap::new();
+
+    for document_id in documents_ids_to_reindex {
+        for result in documents_fields_store.document_fields(writer, document_id)? {
+            let (attr, bytes) = result?;
+            let value: serde_json::Value = serde_json::from_slice(bytes)?;
+            ram_store.insert((document_id, attr), value);
+        }
+
+        for ((docid, attr), value) in ram_store.drain() {
+            serialize_value(
+                writer,
+                attr,
+                schema.props(attr),
+                docid,
+                documents_fields_store,
+                documents_fields_counts_store,
+                &mut indexer,
+                &mut ranked_map,
+                &value,
+            )?;
+        }
     }
 
+    // 4. write the new index in the main store
+    write_documents_addition_index(
+        writer,
+        main_store,
+        postings_lists_store,
+        docs_words_store,
+        ranked_map,
+        number_of_inserted_documents,
+        indexer,
+    )
+}
+
+pub fn write_documents_addition_index(
+    writer: &mut heed::RwTxn,
+    main_store: store::Main,
+    postings_lists_store: store::PostingsLists,
+    docs_words_store: store::DocsWords,
+    ranked_map: RankedMap,
+    number_of_inserted_documents: usize,
+    indexer: RawIndexer,
+) -> MResult<()> {
     let indexed = indexer.build();
     let mut delta_words_builder = SetBuilder::memory();
@@ -186,9 +274,7 @@ pub fn apply_documents_addition(
 
     main_store.put_words_fst(writer, &words)?;
     main_store.put_ranked_map(writer, &ranked_map)?;
-
-    let inserted_documents_len = document_ids.len() as u64;
-    main_store.put_number_of_documents(writer, |old| old + inserted_documents_len)?;
+    main_store.put_number_of_documents(writer, |old| old + number_of_inserted_documents as u64)?;
 
     Ok(())
 }
diff --git a/meilidb-core/src/update/documents_deletion.rs b/meilidb-core/src/update/documents_deletion.rs
index e1ec71ddc..31a9f4927 100644
--- a/meilidb-core/src/update/documents_deletion.rs
+++ b/meilidb-core/src/update/documents_deletion.rs
@@ -88,7 +88,6 @@ pub fn apply_documents_deletion(
     documents_fields_counts_store: store::DocumentsFieldsCounts,
     postings_lists_store: store::PostingsLists,
     docs_words_store: store::DocsWords,
-    mut ranked_map: RankedMap,
     deletion: Vec<DocumentId>,
 ) -> MResult<()> {
     let idset = SetBuf::from_dirty(deletion);
@@ -98,6 +97,11 @@ pub fn apply_documents_deletion(
         None => return Err(Error::SchemaMissing),
     };
 
+    let mut ranked_map = match main_store.ranked_map(writer)? {
+        Some(ranked_map) => ranked_map,
+        None => RankedMap::default(),
+    };
+
     // collect the ranked attributes according to the schema
     let ranked_attrs: Vec<_> = schema
         .iter()
@@ -181,7 +185,6 @@ pub fn apply_documents_deletion(
 
     main_store.put_words_fst(writer, &words)?;
     main_store.put_ranked_map(writer, &ranked_map)?;
-
     main_store.put_number_of_documents(writer, |old| old - deleted_documents_len)?;
 
     Ok(())
diff --git a/meilidb-core/src/update/mod.rs b/meilidb-core/src/update/mod.rs
index 8958074ec..1e8a95621 100644
--- a/meilidb-core/src/update/mod.rs
+++ b/meilidb-core/src/update/mod.rs
@@ -20,7 +20,7 @@ use heed::Result as ZResult;
 use log::debug;
 use serde::{Deserialize, Serialize};
 
-use crate::{store, DocumentId, MResult, RankedMap};
+use crate::{store, DocumentId, MResult};
 use meilidb_schema::Schema;
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -113,7 +113,15 @@ pub fn update_task(writer: &mut heed::RwTxn, index: store::Index) -> MResult<Option<UpdateResult>>
             let update_type = UpdateType::Schema {
                 schema: schema.clone(),
             };
-            let result = apply_schema_update(writer, index.main, &schema);
+            let result = apply_schema_update(
+                writer,
+                &schema,
+                index.main,
+                index.documents_fields,
+                index.documents_fields_counts,
+                index.postings_lists,
+                index.docs_words,
+            );
 
             (update_type, result, start.elapsed())
@@ -128,11 +136,6 @@ pub fn update_task(writer: &mut heed::RwTxn, index: store::Index) -> MResult<Option<UpdateResult>>
         Update::DocumentsAddition(documents) => {
             let start = Instant::now();
 
-            let ranked_map = match index.main.ranked_map(writer)? {
-                Some(ranked_map) => ranked_map,
-                None => RankedMap::default(),
-            };
-
             let update_type = UpdateType::DocumentsAddition {
                 number: documents.len(),
             };
@@ -144,7 +147,6 @@ pub fn update_task(writer: &mut heed::RwTxn, index: store::Index) -> MResult<Option<UpdateResult>>
                 index.documents_fields,
                 index.documents_fields_counts,
                 index.postings_lists,
                 index.docs_words,
-                ranked_map,
                 documents,
             );
@@ -155,11 +157,6 @@ pub fn update_task(writer: &mut heed::RwTxn, index: store::Index) -> MResult<Option<UpdateResult>>
         Update::DocumentsDeletion(documents) => {
             let start = Instant::now();
 
-            let ranked_map = match index.main.ranked_map(writer)? {
-                Some(ranked_map) => ranked_map,
-                None => RankedMap::default(),
-            };
-
             let update_type = UpdateType::DocumentsDeletion {
                 number: documents.len(),
             };
@@ -169,7 +166,6 @@ pub fn update_task(writer: &mut heed::RwTxn, index: store::Index) -> MResult<Option<UpdateResult>>
                 index.documents_fields,
                 index.documents_fields_counts,
                 index.postings_lists,
                 index.docs_words,
-                ranked_map,
                 documents,
             );
diff --git a/meilidb-core/src/update/schema_update.rs b/meilidb-core/src/update/schema_update.rs
--- a/meilidb-core/src/update/schema_update.rs
+++ b/meilidb-core/src/update/schema_update.rs
@@ -6,15 +8,52 @@
 pub fn apply_schema_update(
     writer: &mut heed::RwTxn,
-    main_store: store::Main,
     new_schema: &Schema,
+    main_store: store::Main,
+    documents_fields_store: store::DocumentsFields,
+    documents_fields_counts_store: store::DocumentsFieldsCounts,
+    postings_lists_store: store::PostingsLists,
+    docs_words_store: store::DocsWords,
 ) -> MResult<()> {
-    if main_store.schema(writer)?.is_some() {
-        return Err(UnsupportedOperation::SchemaAlreadyExists.into());
+    use UnsupportedOperation::{
+        CannotIntroduceNewSchemaAttribute, CannotRemoveSchemaAttribute,
+        CannotReorderSchemaAttribute, CannotUpdateSchemaIdentifier,
+    };
+
+    let mut need_full_reindexing = false;
+
+    if let Some(old_schema) = main_store.schema(writer)? {
+        for diff in meilidb_schema::diff(&old_schema, new_schema) {
+            match diff {
+                Diff::IdentChange { .. } => return Err(CannotUpdateSchemaIdentifier.into()),
+                Diff::AttrMove { .. } => return Err(CannotReorderSchemaAttribute.into()),
+                Diff::AttrPropsChange { old, new, .. } => {
+                    if new.indexed != old.indexed {
+                        need_full_reindexing = true;
+                    }
+                    if new.ranked != old.ranked {
+                        need_full_reindexing = true;
+                    }
+                }
+                Diff::NewAttr { .. } => return Err(CannotIntroduceNewSchemaAttribute.into()),
+                Diff::RemovedAttr { .. } => return Err(CannotRemoveSchemaAttribute.into()),
+            }
+        }
     }
 
-    main_store
-        .put_schema(writer, new_schema)
-        .map_err(Into::into)
+    main_store.put_schema(writer, new_schema)?;
+
+    if need_full_reindexing {
+        reindex_all_documents(
+            writer,
+            main_store,
+            documents_fields_store,
+            documents_fields_counts_store,
+            postings_lists_store,
+            docs_words_store,
+        )?
+    }
+
+    Ok(())
 }
 
 pub fn push_schema_update(
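
--
A rough sketch of how the new code path is meant to be driven end to end, for
review context only; it is not part of the patch. The `change_schema` helper,
the `env`/`index` bindings, and the import paths are illustrative assumptions.
Only `apply_schema_update`, the store handles, and the argument order come from
the diff above, and the `?` conversions assume the crate's usual
From<heed::Error> impl on its Error type.

use meilidb_core::update::apply_schema_update; // path assumed; re-export from update/mod.rs
use meilidb_core::{store, MResult};
use meilidb_schema::Schema;

// Illustrative driver: when the diff between the stored schema and
// `new_schema` only toggles an attribute's `indexed` or `ranked` flag,
// apply_schema_update sets need_full_reindexing and calls
// reindex_all_documents inside this same write transaction.
fn change_schema(env: &heed::Env, index: &store::Index, new_schema: &Schema) -> MResult<()> {
    let mut writer = env.write_txn()?;

    apply_schema_update(
        &mut writer,
        new_schema,
        index.main,
        index.documents_fields,
        index.documents_fields_counts,
        index.postings_lists,
        index.docs_words,
    )?;

    // A single commit: the new schema and the reindexed posting lists land
    // together, or not at all if the transaction is aborted.
    writer.commit()?;
    Ok(())
}

Because reindex_all_documents first clears the words fst, the ranked map, and
the posting lists, then re-serializes every stored document through
serialize_value, the whole reindex happens inside one write transaction: heavy
on large indexes, but an aborted transaction leaves the previous index intact.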