Merge pull request #181 from meilisearch/diff-schema

Make possible to update an index schema
This commit is contained in:
Clément Renault 2019-10-22 14:23:43 +02:00 committed by GitHub
commit 5dc8465ebd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 403 additions and 130 deletions

View File

@ -12,6 +12,7 @@ pub enum Error {
SchemaMissing,
WordIndexMissing,
MissingDocumentId,
DuplicateDocument,
Zlmdb(heed::Error),
Fst(fst::Error),
SerdeJson(SerdeJsonError),
@ -79,6 +80,7 @@ impl fmt::Display for Error {
SchemaMissing => write!(f, "this index does not have a schema"),
WordIndexMissing => write!(f, "this index does not have a word index"),
MissingDocumentId => write!(f, "document id is missing"),
DuplicateDocument => write!(f, "update contains documents with the same id"),
Zlmdb(e) => write!(f, "heed error; {}", e),
Fst(e) => write!(f, "fst error; {}", e),
SerdeJson(e) => write!(f, "serde json error; {}", e),
@ -95,6 +97,10 @@ impl error::Error for Error {}
#[derive(Debug)]
pub enum UnsupportedOperation {
SchemaAlreadyExists,
CannotUpdateSchemaIdentifier,
CannotReorderSchemaAttribute,
CannotIntroduceNewSchemaAttribute,
CannotRemoveSchemaAttribute,
}
impl fmt::Display for UnsupportedOperation {
@ -102,6 +108,12 @@ impl fmt::Display for UnsupportedOperation {
use self::UnsupportedOperation::*;
match self {
SchemaAlreadyExists => write!(f, "Cannot update index which already have a schema"),
CannotUpdateSchemaIdentifier => write!(f, "Cannot update the identifier of a schema"),
CannotReorderSchemaAttribute => write!(f, "Cannot reorder the attributes of a schema"),
CannotIntroduceNewSchemaAttribute => {
write!(f, "Cannot introduce new attributes in a schema")
}
CannotRemoveSchemaAttribute => write!(f, "Cannot remove attributes from a schema"),
}
}
}

View File

@ -20,16 +20,14 @@ pub use self::convert_to_string::ConvertToString;
pub use self::deserializer::{Deserializer, DeserializerError};
pub use self::extract_document_id::{compute_document_id, extract_document_id, value_to_string};
pub use self::indexer::Indexer;
pub use self::serializer::Serializer;
pub use self::serializer::{serialize_value, Serializer};
use std::collections::BTreeMap;
use std::{error::Error, fmt};
use meilidb_schema::SchemaAttr;
use serde::ser;
use serde_json::Error as SerdeJsonError;
use crate::{DocumentId, ParseNumberError};
use crate::ParseNumberError;
#[derive(Debug)]
pub enum SerializerError {
@ -103,25 +101,3 @@ impl From<ParseNumberError> for SerializerError {
SerializerError::ParseNumber(error)
}
}
pub struct RamDocumentStore(BTreeMap<(DocumentId, SchemaAttr), Vec<u8>>);
impl RamDocumentStore {
pub fn new() -> RamDocumentStore {
RamDocumentStore(BTreeMap::new())
}
pub fn set_document_field(&mut self, id: DocumentId, attr: SchemaAttr, value: Vec<u8>) {
self.0.insert((id, attr), value);
}
pub fn into_inner(self) -> BTreeMap<(DocumentId, SchemaAttr), Vec<u8>> {
self.0
}
}
impl Default for RamDocumentStore {
fn default() -> Self {
Self::new()
}
}

View File

@ -1,17 +1,17 @@
use meilidb_schema::{Schema, SchemaAttr};
use meilidb_schema::{Schema, SchemaAttr, SchemaProps};
use serde::ser;
use std::collections::HashMap;
use crate::raw_indexer::RawIndexer;
use crate::serde::RamDocumentStore;
use crate::store::{DocumentsFields, DocumentsFieldsCounts};
use crate::{DocumentId, RankedMap};
use super::{ConvertToNumber, ConvertToString, Indexer, SerializerError};
pub struct Serializer<'a> {
pub txn: &'a mut heed::RwTxn,
pub schema: &'a Schema,
pub document_store: &'a mut RamDocumentStore,
pub document_fields_counts: &'a mut HashMap<(DocumentId, SchemaAttr), u64>,
pub document_store: DocumentsFields,
pub document_fields_counts: DocumentsFieldsCounts,
pub indexer: &'a mut RawIndexer,
pub ranked_map: &'a mut RankedMap,
pub document_id: DocumentId,
@ -150,6 +150,7 @@ impl<'a> ser::Serializer for Serializer<'a> {
fn serialize_map(self, _len: Option<usize>) -> Result<Self::SerializeMap, Self::Error> {
Ok(MapSerializer {
txn: self.txn,
schema: self.schema,
document_id: self.document_id,
document_store: self.document_store,
@ -166,6 +167,7 @@ impl<'a> ser::Serializer for Serializer<'a> {
_len: usize,
) -> Result<Self::SerializeStruct, Self::Error> {
Ok(StructSerializer {
txn: self.txn,
schema: self.schema,
document_id: self.document_id,
document_store: self.document_store,
@ -189,10 +191,11 @@ impl<'a> ser::Serializer for Serializer<'a> {
}
pub struct MapSerializer<'a> {
txn: &'a mut heed::RwTxn,
schema: &'a Schema,
document_id: DocumentId,
document_store: &'a mut RamDocumentStore,
document_fields_counts: &'a mut HashMap<(DocumentId, SchemaAttr), u64>,
document_store: DocumentsFields,
document_fields_counts: DocumentsFieldsCounts,
indexer: &'a mut RawIndexer,
ranked_map: &'a mut RankedMap,
current_key_name: Option<String>,
@ -229,17 +232,20 @@ impl<'a> ser::SerializeMap for MapSerializer<'a> {
V: ser::Serialize,
{
let key = key.serialize(ConvertToString)?;
serialize_value(
self.schema,
match self.schema.attribute(&key) {
Some(attribute) => serialize_value(
self.txn,
attribute,
self.schema.props(attribute),
self.document_id,
self.document_store,
self.document_fields_counts,
self.indexer,
self.ranked_map,
&key,
value,
)
),
None => Ok(()),
}
}
fn end(self) -> Result<Self::Ok, Self::Error> {
@ -248,10 +254,11 @@ impl<'a> ser::SerializeMap for MapSerializer<'a> {
}
pub struct StructSerializer<'a> {
txn: &'a mut heed::RwTxn,
schema: &'a Schema,
document_id: DocumentId,
document_store: &'a mut RamDocumentStore,
document_fields_counts: &'a mut HashMap<(DocumentId, SchemaAttr), u64>,
document_store: DocumentsFields,
document_fields_counts: DocumentsFieldsCounts,
indexer: &'a mut RawIndexer,
ranked_map: &'a mut RankedMap,
}
@ -268,16 +275,20 @@ impl<'a> ser::SerializeStruct for StructSerializer<'a> {
where
T: ser::Serialize,
{
serialize_value(
self.schema,
match self.schema.attribute(key) {
Some(attribute) => serialize_value(
self.txn,
attribute,
self.schema.props(attribute),
self.document_id,
self.document_store,
self.document_fields_counts,
self.indexer,
self.ranked_map,
key,
value,
)
),
None => Ok(()),
}
}
fn end(self) -> Result<Self::Ok, Self::Error> {
@ -285,24 +296,22 @@ impl<'a> ser::SerializeStruct for StructSerializer<'a> {
}
}
fn serialize_value<T: ?Sized>(
schema: &Schema,
pub fn serialize_value<T: ?Sized>(
txn: &mut heed::RwTxn,
attribute: SchemaAttr,
props: SchemaProps,
document_id: DocumentId,
document_store: &mut RamDocumentStore,
documents_fields_counts: &mut HashMap<(DocumentId, SchemaAttr), u64>,
document_store: DocumentsFields,
documents_fields_counts: DocumentsFieldsCounts,
indexer: &mut RawIndexer,
ranked_map: &mut RankedMap,
key: &str,
value: &T,
) -> Result<(), SerializerError>
where
T: ser::Serialize,
{
if let Some(attribute) = schema.attribute(key) {
let props = schema.props(attribute);
let serialized = serde_json::to_vec(value)?;
document_store.set_document_field(document_id, attribute, serialized);
document_store.put_document_field(txn, document_id, attribute, &serialized)?;
if props.is_indexed() {
let indexer = Indexer {
@ -311,7 +320,12 @@ where
document_id,
};
if let Some(number_of_words) = value.serialize(indexer)? {
documents_fields_counts.insert((document_id, attribute), number_of_words as u64);
documents_fields_counts.put_document_field_count(
txn,
document_id,
attribute,
number_of_words as u64,
)?;
}
}
@ -319,7 +333,6 @@ where
let number = value.serialize(ConvertToNumber)?;
ranked_map.insert(document_id, attribute, number);
}
}
Ok(())
}

View File

@ -121,7 +121,7 @@ pub struct AllDocumentsFieldsCountsIter<'txn> {
iter: heed::RoIter<'txn, OwnedType<DocumentAttrKey>, OwnedType<u64>>,
}
impl<'r> Iterator for AllDocumentsFieldsCountsIter<'r> {
impl Iterator for AllDocumentsFieldsCountsIter<'_> {
type Item = ZResult<(DocumentId, SchemaAttr, u64)>;
fn next(&mut self) -> Option<Self::Item> {

View File

@ -23,6 +23,10 @@ impl PostingsLists {
self.postings_lists.delete(writer, word)
}
pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> {
self.postings_lists.clear(writer)
}
pub fn postings_list<'txn>(
self,
reader: &'txn heed::RoTxn,

View File

@ -5,7 +5,7 @@ use sdset::{duo::Union, SetOperation};
use serde::Serialize;
use crate::raw_indexer::RawIndexer;
use crate::serde::{extract_document_id, RamDocumentStore, Serializer};
use crate::serde::{extract_document_id, serialize_value, Serializer};
use crate::store;
use crate::update::{apply_documents_deletion, next_update_id, Update};
use crate::{Error, MResult, RankedMap};
@ -84,12 +84,9 @@ pub fn apply_documents_addition(
documents_fields_counts_store: store::DocumentsFieldsCounts,
postings_lists_store: store::PostingsLists,
docs_words_store: store::DocsWords,
mut ranked_map: RankedMap,
addition: Vec<serde_json::Value>,
) -> MResult<()> {
let mut document_ids = HashSet::new();
let mut document_store = RamDocumentStore::new();
let mut document_fields_counts = HashMap::new();
let mut documents_ids = HashSet::new();
let mut indexer = RawIndexer::new();
let schema = match main_store.schema(writer)? {
@ -99,20 +96,47 @@ pub fn apply_documents_addition(
let identifier = schema.identifier_name();
// 1. store documents ids for future deletion
for document in addition.iter() {
let document_id = match extract_document_id(identifier, &document)? {
Some(id) => id,
None => return Err(Error::MissingDocumentId),
};
if !documents_ids.insert(document_id) {
return Err(Error::DuplicateDocument);
}
}
// 2. remove the documents posting lists
let number_of_inserted_documents = documents_ids.len();
apply_documents_deletion(
writer,
main_store,
documents_fields_store,
documents_fields_counts_store,
postings_lists_store,
docs_words_store,
documents_ids.into_iter().collect(),
)?;
let mut ranked_map = match main_store.ranked_map(writer)? {
Some(ranked_map) => ranked_map,
None => RankedMap::default(),
};
// 3. index the documents fields in the stores
for document in addition {
let document_id = match extract_document_id(identifier, &document)? {
Some(id) => id,
None => return Err(Error::MissingDocumentId),
};
// 1. store the document id for future deletion
document_ids.insert(document_id);
// 2. index the document fields in ram stores
let serializer = Serializer {
txn: writer,
schema: &schema,
document_store: &mut document_store,
document_fields_counts: &mut document_fields_counts,
document_store: documents_fields_store,
document_fields_counts: documents_fields_counts_store,
indexer: &mut indexer,
ranked_map: &mut ranked_map,
document_id,
@ -121,29 +145,93 @@ pub fn apply_documents_addition(
document.serialize(serializer)?;
}
// 1. remove the previous documents match indexes
let documents_to_insert = document_ids.iter().cloned().collect();
apply_documents_deletion(
write_documents_addition_index(
writer,
main_store,
documents_fields_store,
documents_fields_counts_store,
postings_lists_store,
docs_words_store,
ranked_map.clone(),
documents_to_insert,
ranked_map,
number_of_inserted_documents,
indexer,
)
}
pub fn reindex_all_documents(
writer: &mut heed::RwTxn,
main_store: store::Main,
documents_fields_store: store::DocumentsFields,
documents_fields_counts_store: store::DocumentsFieldsCounts,
postings_lists_store: store::PostingsLists,
docs_words_store: store::DocsWords,
) -> MResult<()> {
let schema = match main_store.schema(writer)? {
Some(schema) => schema,
None => return Err(Error::SchemaMissing),
};
let mut ranked_map = RankedMap::default();
// 1. retrieve all documents ids
let mut documents_ids_to_reindex = Vec::new();
for result in documents_fields_counts_store.documents_ids(writer)? {
let document_id = result?;
documents_ids_to_reindex.push(document_id);
}
// 2. remove the documents posting lists
let number_of_inserted_documents = documents_ids_to_reindex.len();
main_store.put_words_fst(writer, &fst::Set::default())?;
main_store.put_ranked_map(writer, &ranked_map)?;
main_store.put_number_of_documents(writer, |_| 0)?;
postings_lists_store.clear(writer)?;
// 3. re-index one document by one document (otherwise we make the borrow checker unhappy)
let mut indexer = RawIndexer::new();
let mut ram_store = HashMap::new();
for document_id in documents_ids_to_reindex {
for result in documents_fields_store.document_fields(writer, document_id)? {
let (attr, bytes) = result?;
let value: serde_json::Value = serde_json::from_slice(bytes)?;
ram_store.insert((document_id, attr), value);
}
for ((docid, attr), value) in ram_store.drain() {
serialize_value(
writer,
attr,
schema.props(attr),
docid,
documents_fields_store,
documents_fields_counts_store,
&mut indexer,
&mut ranked_map,
&value,
)?;
// 2. insert new document attributes in the database
for ((id, attr), value) in document_store.into_inner() {
documents_fields_store.put_document_field(writer, id, attr, &value)?;
}
}
// 3. insert new document attributes counts
for ((id, attr), count) in document_fields_counts {
documents_fields_counts_store.put_document_field_count(writer, id, attr, count)?;
// 4. write the new index in the main store
write_documents_addition_index(
writer,
main_store,
postings_lists_store,
docs_words_store,
ranked_map,
number_of_inserted_documents,
indexer,
)
}
pub fn write_documents_addition_index(
writer: &mut heed::RwTxn,
main_store: store::Main,
postings_lists_store: store::PostingsLists,
docs_words_store: store::DocsWords,
ranked_map: RankedMap,
number_of_inserted_documents: usize,
indexer: RawIndexer,
) -> MResult<()> {
let indexed = indexer.build();
let mut delta_words_builder = SetBuilder::memory();
@ -186,9 +274,7 @@ pub fn apply_documents_addition(
main_store.put_words_fst(writer, &words)?;
main_store.put_ranked_map(writer, &ranked_map)?;
let inserted_documents_len = document_ids.len() as u64;
main_store.put_number_of_documents(writer, |old| old + inserted_documents_len)?;
main_store.put_number_of_documents(writer, |old| old + number_of_inserted_documents as u64)?;
Ok(())
}

View File

@ -88,7 +88,6 @@ pub fn apply_documents_deletion(
documents_fields_counts_store: store::DocumentsFieldsCounts,
postings_lists_store: store::PostingsLists,
docs_words_store: store::DocsWords,
mut ranked_map: RankedMap,
deletion: Vec<DocumentId>,
) -> MResult<()> {
let idset = SetBuf::from_dirty(deletion);
@ -98,6 +97,11 @@ pub fn apply_documents_deletion(
None => return Err(Error::SchemaMissing),
};
let mut ranked_map = match main_store.ranked_map(writer)? {
Some(ranked_map) => ranked_map,
None => RankedMap::default(),
};
// collect the ranked attributes according to the schema
let ranked_attrs: Vec<_> = schema
.iter()
@ -181,7 +185,6 @@ pub fn apply_documents_deletion(
main_store.put_words_fst(writer, &words)?;
main_store.put_ranked_map(writer, &ranked_map)?;
main_store.put_number_of_documents(writer, |old| old - deleted_documents_len)?;
Ok(())

View File

@ -20,7 +20,7 @@ use heed::Result as ZResult;
use log::debug;
use serde::{Deserialize, Serialize};
use crate::{store, DocumentId, MResult, RankedMap};
use crate::{store, DocumentId, MResult};
use meilidb_schema::Schema;
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -113,7 +113,15 @@ pub fn update_task(writer: &mut heed::RwTxn, index: store::Index) -> MResult<Opt
let update_type = UpdateType::Schema {
schema: schema.clone(),
};
let result = apply_schema_update(writer, index.main, &schema);
let result = apply_schema_update(
writer,
&schema,
index.main,
index.documents_fields,
index.documents_fields_counts,
index.postings_lists,
index.docs_words,
);
(update_type, result, start.elapsed())
}
@ -128,11 +136,6 @@ pub fn update_task(writer: &mut heed::RwTxn, index: store::Index) -> MResult<Opt
Update::DocumentsAddition(documents) => {
let start = Instant::now();
let ranked_map = match index.main.ranked_map(writer)? {
Some(ranked_map) => ranked_map,
None => RankedMap::default(),
};
let update_type = UpdateType::DocumentsAddition {
number: documents.len(),
};
@ -144,7 +147,6 @@ pub fn update_task(writer: &mut heed::RwTxn, index: store::Index) -> MResult<Opt
index.documents_fields_counts,
index.postings_lists,
index.docs_words,
ranked_map,
documents,
);
@ -153,11 +155,6 @@ pub fn update_task(writer: &mut heed::RwTxn, index: store::Index) -> MResult<Opt
Update::DocumentsDeletion(documents) => {
let start = Instant::now();
let ranked_map = match index.main.ranked_map(writer)? {
Some(ranked_map) => ranked_map,
None => RankedMap::default(),
};
let update_type = UpdateType::DocumentsDeletion {
number: documents.len(),
};
@ -169,7 +166,6 @@ pub fn update_task(writer: &mut heed::RwTxn, index: store::Index) -> MResult<Opt
index.documents_fields_counts,
index.postings_lists,
index.docs_words,
ranked_map,
documents,
);

View File

@ -1,19 +1,58 @@
use meilidb_schema::{Diff, Schema};
use crate::update::documents_addition::reindex_all_documents;
use crate::update::{next_update_id, Update};
use crate::{error::UnsupportedOperation, store, MResult};
use meilidb_schema::Schema;
pub fn apply_schema_update(
writer: &mut heed::RwTxn,
main_store: store::Main,
new_schema: &Schema,
main_store: store::Main,
documents_fields_store: store::DocumentsFields,
documents_fields_counts_store: store::DocumentsFieldsCounts,
postings_lists_store: store::PostingsLists,
docs_words_store: store::DocsWords,
) -> MResult<()> {
if main_store.schema(writer)?.is_some() {
return Err(UnsupportedOperation::SchemaAlreadyExists.into());
use UnsupportedOperation::{
CannotIntroduceNewSchemaAttribute, CannotRemoveSchemaAttribute,
CannotReorderSchemaAttribute, CannotUpdateSchemaIdentifier,
};
let mut need_full_reindexing = false;
if let Some(old_schema) = main_store.schema(writer)? {
for diff in meilidb_schema::diff(&old_schema, new_schema) {
match diff {
Diff::IdentChange { .. } => return Err(CannotUpdateSchemaIdentifier.into()),
Diff::AttrMove { .. } => return Err(CannotReorderSchemaAttribute.into()),
Diff::AttrPropsChange { old, new, .. } => {
if new.indexed != old.indexed {
need_full_reindexing = true;
}
if new.ranked != old.ranked {
need_full_reindexing = true;
}
}
Diff::NewAttr { .. } => return Err(CannotIntroduceNewSchemaAttribute.into()),
Diff::RemovedAttr { .. } => return Err(CannotRemoveSchemaAttribute.into()),
}
}
}
main_store
.put_schema(writer, new_schema)
.map_err(Into::into)
main_store.put_schema(writer, new_schema)?;
if need_full_reindexing {
reindex_all_documents(
writer,
main_store,
documents_fields_store,
documents_fields_counts_store,
postings_lists_store,
docs_words_store,
)?
}
Ok(())
}
pub fn push_schema_update(

View File

@ -215,11 +215,155 @@ impl fmt::Display for SchemaAttr {
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Diff {
IdentChange {
old: String,
new: String,
},
AttrMove {
name: String,
old: usize,
new: usize,
},
AttrPropsChange {
name: String,
old: SchemaProps,
new: SchemaProps,
},
NewAttr {
name: String,
pos: usize,
props: SchemaProps,
},
RemovedAttr {
name: String,
},
}
pub fn diff(old: &Schema, new: &Schema) -> Vec<Diff> {
use Diff::{AttrMove, AttrPropsChange, IdentChange, NewAttr, RemovedAttr};
let mut differences = Vec::new();
let old = old.to_builder();
let new = new.to_builder();
// check if the old identifier differs from the new one
if old.identifier != new.identifier {
let old = old.identifier;
let new = new.identifier;
differences.push(IdentChange { old, new });
}
// compare all old attributes positions
// and properties with the new ones
for (pos, (name, props)) in old.attributes.iter().enumerate() {
match new.attributes.get_full(name) {
Some((npos, _, nprops)) => {
if pos != npos {
let name = name.clone();
differences.push(AttrMove {
name,
old: pos,
new: npos,
});
}
if props != nprops {
let name = name.clone();
differences.push(AttrPropsChange {
name,
old: *props,
new: *nprops,
});
}
}
None => differences.push(RemovedAttr { name: name.clone() }),
}
}
// retrieve all attributes that
// were not present in the old schema
for (pos, (name, props)) in new.attributes.iter().enumerate() {
if !old.attributes.contains_key(name) {
let name = name.clone();
differences.push(NewAttr {
name,
pos,
props: *props,
});
}
}
differences
}
#[cfg(test)]
mod tests {
use super::*;
use std::error::Error;
#[test]
fn difference() {
use Diff::{AttrMove, AttrPropsChange, IdentChange, NewAttr, RemovedAttr};
let mut builder = SchemaBuilder::with_identifier("id");
builder.new_attribute("alpha", DISPLAYED);
builder.new_attribute("beta", DISPLAYED | INDEXED);
builder.new_attribute("gamma", INDEXED);
builder.new_attribute("omega", INDEXED);
let old = builder.build();
let mut builder = SchemaBuilder::with_identifier("kiki");
builder.new_attribute("beta", DISPLAYED | INDEXED);
builder.new_attribute("alpha", DISPLAYED | INDEXED);
builder.new_attribute("delta", RANKED);
builder.new_attribute("gamma", DISPLAYED);
let new = builder.build();
let differences = diff(&old, &new);
let expected = &[
IdentChange {
old: format!("id"),
new: format!("kiki"),
},
AttrMove {
name: format!("alpha"),
old: 0,
new: 1,
},
AttrPropsChange {
name: format!("alpha"),
old: DISPLAYED,
new: DISPLAYED | INDEXED,
},
AttrMove {
name: format!("beta"),
old: 1,
new: 0,
},
AttrMove {
name: format!("gamma"),
old: 2,
new: 3,
},
AttrPropsChange {
name: format!("gamma"),
old: INDEXED,
new: DISPLAYED,
},
RemovedAttr {
name: format!("omega"),
},
NewAttr {
name: format!("delta"),
pos: 2,
props: RANKED,
},
];
assert_eq!(&differences, expected)
}
#[test]
fn serialize_deserialize() -> bincode::Result<()> {
let mut builder = SchemaBuilder::with_identifier("id");