introduce a new schemaless way

This commit is contained in:
qdequele 2020-01-13 19:10:58 +01:00
parent bbe1845f66
commit 130fb74928
No known key found for this signature in database
GPG key ID: B3F0A000EBF11745
22 changed files with 365 additions and 418 deletions

View file

@ -1,14 +1,13 @@
use std::collections::{HashMap, BTreeSet};
use std::collections::HashMap;
use fst::{set::OpBuilder, SetBuilder};
use sdset::{duo::Union, SetOperation};
use serde::{Deserialize, Serialize};
use meilisearch_schema::{Schema, DISPLAYED, INDEXED};
use crate::database::{MainT, UpdateT};
use crate::database::{UpdateEvent, UpdateEventsEmitter};
use crate::raw_indexer::RawIndexer;
use crate::serde::{extract_document_id, serialize_value, Deserializer, Serializer};
use crate::serde::{extract_document_id, serialize_value_with_id, Deserializer, Serializer};
use crate::store;
use crate::update::{apply_documents_deletion, compute_short_prefixes, next_update_id, Update};
use crate::{Error, MResult, RankedMap};
@ -115,16 +114,11 @@ pub fn apply_documents_addition<'a, 'b>(
None => return Err(Error::SchemaMissing),
};
if let Some(new_schema) = lazy_new_schema(&schema, &addition) {
main_store.put_schema(writer, &new_schema)?;
schema = new_schema;
}
let identifier = schema.identifier_name();
let identifier = schema.identifier();
// 1. store documents ids for future deletion
for document in addition {
let document_id = match extract_document_id(identifier, &document)? {
let document_id = match extract_document_id(&identifier, &document)? {
Some(id) => id,
None => return Err(Error::MissingDocumentId),
};
@ -147,8 +141,6 @@ pub fn apply_documents_addition<'a, 'b>(
None => fst::Set::default(),
};
let mut fields_map = main_store.fields_map(writer)?.unwrap_or_default();
// 3. index the documents fields in the stores
let mut indexer = RawIndexer::new(stop_words);
@ -160,7 +152,6 @@ pub fn apply_documents_addition<'a, 'b>(
document_fields_counts: index.documents_fields_counts,
indexer: &mut indexer,
ranked_map: &mut ranked_map,
fields_map: &mut fields_map,
document_id,
};
@ -192,16 +183,11 @@ pub fn apply_documents_partial_addition<'a, 'b>(
None => return Err(Error::SchemaMissing),
};
if let Some(new_schema) = lazy_new_schema(&schema, &addition) {
main_store.put_schema(writer, &new_schema)?;
schema = new_schema;
}
let identifier = schema.identifier_name();
let identifier = schema.identifier();
// 1. store documents ids for future deletion
for mut document in addition {
let document_id = match extract_document_id(identifier, &document)? {
let document_id = match extract_document_id(&identifier, &document)? {
Some(id) => id,
None => return Err(Error::MissingDocumentId),
};
@ -241,8 +227,6 @@ pub fn apply_documents_partial_addition<'a, 'b>(
None => fst::Set::default(),
};
let mut fields_map = main_store.fields_map(writer)?.unwrap_or_default();
// 3. index the documents fields in the stores
let mut indexer = RawIndexer::new(stop_words);
@ -254,7 +238,6 @@ pub fn apply_documents_partial_addition<'a, 'b>(
document_fields_counts: index.documents_fields_counts,
indexer: &mut indexer,
ranked_map: &mut ranked_map,
fields_map: &mut fields_map,
document_id,
};
@ -281,7 +264,6 @@ pub fn reindex_all_documents(writer: &mut heed::RwTxn<MainT>, index: &store::Ind
};
let mut ranked_map = RankedMap::default();
let mut fields_map = main_store.fields_map(writer)?.unwrap_or_default();
// 1. retrieve all documents ids
let mut documents_ids_to_reindex = Vec::new();
@ -312,21 +294,20 @@ pub fn reindex_all_documents(writer: &mut heed::RwTxn<MainT>, index: &store::Ind
for result in index.documents_fields.document_fields(writer, *document_id)? {
let (attr, bytes) = result?;
let value: serde_json::Value = serde_json::from_slice(bytes)?;
ram_store.insert((document_id, attr), value);
ram_store.insert((document_id, field_id), value);
}
for ((docid, attr), value) in ram_store.drain() {
serialize_value(
for ((docid, field_id), value) in ram_store.drain() {
serialize_value_with_id(
writer,
attr,
schema.props(attr),
field_id,
&schema,
*docid,
index.documents_fields,
index.documents_fields_counts,
&mut indexer,
&mut ranked_map,
&mut fields_map,
&value,
&value
)?;
}
}
@ -401,30 +382,3 @@ pub fn write_documents_addition_index(
Ok(())
}
pub fn lazy_new_schema(
schema: &Schema,
documents: &[HashMap<String, serde_json::Value>],
) -> Option<Schema> {
let mut attributes_to_add = BTreeSet::new();
for document in documents {
for (key, _) in document {
if schema.attribute(key).is_none() {
attributes_to_add.insert(key);
}
}
}
if attributes_to_add.is_empty() {
return None
}
let mut schema_builder = schema.to_builder();
for attribute in attributes_to_add {
schema_builder.new_attribute(attribute, DISPLAYED | INDEXED);
}
let schema = schema_builder.build();
Some(schema)
}

View file

@ -40,8 +40,8 @@ impl DocumentsDeletion {
where
D: serde::Serialize,
{
let identifier = schema.identifier_name();
let document_id = match extract_document_id(identifier, &document)? {
let identifier = schema.identifier();
let document_id = match extract_document_id(&identifier, &document)? {
Some(id) => id,
None => return Err(Error::MissingDocumentId),
};
@ -101,18 +101,7 @@ pub fn apply_documents_deletion(
};
// collect the ranked attributes according to the schema
let ranked_attrs: Vec<_> = schema
.iter()
.filter_map(
|(_, attr, prop)| {
if prop.is_ranked() {
Some(attr)
} else {
None
}
},
)
.collect();
let ranked_attrs = schema.get_ranked();
let mut words_document_ids = HashMap::new();
for id in idset {

View file

@ -1,16 +1,15 @@
use std::collections::{HashMap, BTreeMap, BTreeSet};
use std::collections::{BTreeMap, BTreeSet};
use heed::Result as ZResult;
use fst::{set::OpBuilder, SetBuilder};
use sdset::SetBuf;
use meilisearch_schema::{Schema, SchemaAttr, diff_transposition, generate_schema};
use meilisearch_schema::Schema;
use crate::database::{MainT, UpdateT};
use crate::settings::{UpdateState, SettingsUpdate};
use crate::update::documents_addition::reindex_all_documents;
use crate::update::{next_update_id, Update};
use crate::{store, MResult};
use crate::{store, MResult, Error};
pub fn push_settings_update(
writer: &mut heed::RwTxn<UpdateT>,
@ -35,7 +34,17 @@ pub fn apply_settings_update(
let mut must_reindex = false;
let old_schema = index.main.schema(writer)?;
let mut schema = match index.main.schema(writer)? {
Some(schema) => schema,
None => {
match settings.attribute_identifier.clone() {
UpdateState::Update(id) => Schema::with_identifier(id),
_ => return Err(Error::MissingSchemaIdentifier)
}
}
};
println!("settings: {:?}", settings);
match settings.ranking_rules {
UpdateState::Update(v) => {
@ -55,157 +64,69 @@ pub fn apply_settings_update(
},
_ => (),
}
let identifier = match settings.attribute_identifier.clone() {
UpdateState::Update(v) => v,
_ => {
old_schema.clone().unwrap().identifier_name().to_owned()
},
if let UpdateState::Update(id) = settings.attribute_identifier {
schema.set_identifier(id)?;
};
let attributes_searchable: Vec<String> = match settings.attributes_searchable.clone() {
UpdateState::Update(v) => v,
UpdateState::Clear => Vec::new(),
UpdateState::Nothing => {
match old_schema.clone() {
Some(schema) => {
schema.into_iter()
.filter(|(_, props)| props.is_indexed())
.map(|(name, _)| name)
.collect()
},
None => Vec::new(),
}
match settings.attributes_searchable.clone() {
UpdateState::Update(v) => schema.update_indexed(v)?,
UpdateState::Clear => {
let clear: Vec<String> = Vec::new();
schema.update_indexed(clear)?;
},
UpdateState::Nothing => (),
UpdateState::Add(attrs) => {
let mut old_attrs = match old_schema.clone() {
Some(schema) => {
schema.into_iter()
.filter(|(_, props)| props.is_indexed())
.map(|(name, _)| name)
.collect()
},
None => Vec::new(),
};
for attr in attrs {
if !old_attrs.contains(&attr) {
old_attrs.push(attr);
}
schema.set_indexed(attr)?;
}
old_attrs
},
UpdateState::Delete(attrs) => {
let mut old_attrs = match old_schema.clone() {
Some(schema) => {
schema.into_iter()
.filter(|(_, props)| props.is_indexed())
.map(|(name, _)| name)
.collect()
},
None => Vec::new(),
};
for attr in attrs {
old_attrs.retain(|x| *x == attr)
schema.remove_indexed(attr);
}
old_attrs
}
};
let attributes_displayed: Vec<String> = match settings.attributes_displayed.clone() {
UpdateState::Update(v) => v,
UpdateState::Clear => Vec::new(),
UpdateState::Nothing => {
match old_schema.clone() {
Some(schema) => {
schema.into_iter()
.filter(|(_, props)| props.is_displayed())
.map(|(name, _)| name)
.collect()
},
None => Vec::new(),
}
match settings.attributes_displayed.clone() {
UpdateState::Update(v) => schema.update_displayed(v)?,
UpdateState::Clear => {
let clear: Vec<String> = Vec::new();
schema.update_displayed(clear)?;
},
UpdateState::Nothing => (),
UpdateState::Add(attrs) => {
let mut old_attrs = match old_schema.clone() {
Some(schema) => {
schema.into_iter()
.filter(|(_, props)| props.is_displayed())
.map(|(name, _)| name)
.collect()
},
None => Vec::new(),
};
for attr in attrs {
if !old_attrs.contains(&attr) {
old_attrs.push(attr);
}
schema.set_displayed(attr)?;
}
old_attrs
},
UpdateState::Delete(attrs) => {
let mut old_attrs = match old_schema.clone() {
Some(schema) => {
schema.into_iter()
.filter(|(_, props)| props.is_displayed())
.map(|(name, _)| name)
.collect()
},
None => Vec::new(),
};
for attr in attrs {
old_attrs.retain(|x| *x == attr)
schema.remove_displayed(attr);
}
old_attrs
}
};
let attributes_ranked: Vec<String> = match settings.attributes_ranked.clone() {
UpdateState::Update(v) => v,
UpdateState::Clear => Vec::new(),
UpdateState::Nothing => {
match old_schema.clone() {
Some(schema) => {
schema.into_iter()
.filter(|(_, props)| props.is_ranked())
.map(|(name, _)| name)
.collect()
},
None => Vec::new(),
}
match settings.attributes_ranked.clone() {
UpdateState::Update(v) => schema.update_ranked(v)?,
UpdateState::Clear => {
let clear: Vec<String> = Vec::new();
schema.update_ranked(clear)?;
},
UpdateState::Nothing => (),
UpdateState::Add(attrs) => {
let mut old_attrs = match old_schema.clone() {
Some(schema) => {
schema.into_iter()
.filter(|(_, props)| props.is_ranked())
.map(|(name, _)| name)
.collect()
},
None => Vec::new(),
};
for attr in attrs {
if !old_attrs.contains(&attr) {
old_attrs.push(attr);
}
schema.set_ranked(attr)?;
}
old_attrs
},
UpdateState::Delete(attrs) => {
let mut old_attrs = match old_schema.clone() {
Some(schema) => {
schema.into_iter()
.filter(|(_, props)| props.is_ranked())
.map(|(name, _)| name)
.collect()
},
None => Vec::new(),
};
for attr in attrs {
old_attrs.retain(|x| *x == attr)
schema.remove_ranked(attr);
}
old_attrs
}
};
let new_schema = generate_schema(identifier, attributes_searchable, attributes_displayed, attributes_ranked);
index.main.put_schema(writer, &schema)?;
index.main.put_schema(writer, &new_schema)?;
println!("schema: {:?}", schema);
match settings.stop_words {
UpdateState::Update(stop_words) => {
@ -233,16 +154,6 @@ pub fn apply_settings_update(
let postings_lists_store = index.postings_lists;
let docs_words_store = index.docs_words;
if settings.attribute_identifier.is_changed() ||
settings.attributes_ranked.is_changed() ||
settings.attributes_searchable.is_changed() ||
settings.attributes_displayed.is_changed()
{
if let Some(old_schema) = old_schema {
rewrite_all_documents(writer, index, &old_schema, &new_schema)?;
must_reindex = true;
}
}
if must_reindex {
reindex_all_documents(
writer,
@ -438,46 +349,3 @@ pub fn apply_synonyms_update(
Ok(())
}
pub fn rewrite_all_documents(
writer: &mut heed::RwTxn<MainT>,
index: &store::Index,
old_schema: &Schema,
new_schema: &Schema,
) -> MResult<()> {
let mut documents_ids_to_reindex = Vec::new();
// Retrieve all documents present on the database
for result in index.documents_fields_counts.documents_ids(writer)? {
let document_id = result?;
documents_ids_to_reindex.push(document_id);
}
let transpotition = diff_transposition(old_schema, new_schema);
// Rewrite all documents one by one
for id in documents_ids_to_reindex {
let mut document: HashMap<SchemaAttr, Vec<u8>> = HashMap::new();
// Retrieve the old document
for item in index.documents_fields.document_fields(writer, id)? {
if let Ok(item) = item {
if let Some(pos) = transpotition[(item.0).0 as usize] {
// Save the current document with the new SchemaAttr
document.insert(SchemaAttr::new(pos), item.1.to_vec());
}
}
}
// Remove the current document
index.documents_fields.del_all_document_fields(writer, id)?;
// Rewrite the new document
// TODO: use cursor to not do memory jump at each call
for (key, value) in document {
index.documents_fields.put_document_field(writer, id, key, &value)?;
}
}
Ok(())
}