Fix many indexing and searching related bugs

This commit is contained in:
Clément Renault 2019-10-08 14:53:35 +02:00
parent 2236ebbd42
commit d8d0442d63
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
9 changed files with 99 additions and 28 deletions

View File

@ -36,6 +36,7 @@ impl AutomatonProducer {
}
}
#[derive(Debug)]
pub struct Automaton {
pub index: usize,
pub ngram: usize,
@ -108,6 +109,7 @@ fn generate_automatons(
let query_words: Vec<_> = split_query_string(query).map(str::to_lowercase).collect();
let synonyms = synonym_store.synonyms_fst(reader)?;
let mut automaton_index = 0;
let mut automatons = Vec::new();
let mut enhancer_builder = QueryEnhancerBuilder::new(&query_words);
@ -121,10 +123,11 @@ fn generate_automatons(
let not_prefix_dfa = has_following_word || has_end_whitespace || word.chars().all(is_cjk);
let automaton = if not_prefix_dfa {
Automaton::exact(automatons.len(), 1, word)
Automaton::exact(automaton_index, 1, word)
} else {
Automaton::prefix_exact(automatons.len(), 1, word)
Automaton::prefix_exact(automaton_index, 1, word)
};
automaton_index += 1;
original_automatons.push(automaton);
}
@ -162,15 +165,16 @@ fn generate_automatons(
let synonyms_words: Vec<_> = split_query_string(synonyms).collect();
let nb_synonym_words = synonyms_words.len();
let real_query_index = automatons.len();
let real_query_index = automaton_index;
enhancer_builder.declare(query_range.clone(), real_query_index, &synonyms_words);
for synonym in synonyms_words {
let automaton = if nb_synonym_words == 1 {
Automaton::exact(automatons.len(), n, synonym)
Automaton::exact(automaton_index, n, synonym)
} else {
Automaton::non_exact(automatons.len(), n, synonym)
Automaton::non_exact(automaton_index, n, synonym)
};
automaton_index += 1;
automatons.push(vec![automaton]);
}
}
@ -182,10 +186,11 @@ fn generate_automatons(
let concat = ngram_slice.concat();
let normalized = normalize_str(&concat);
let real_query_index = automatons.len();
let real_query_index = automaton_index;
enhancer_builder.declare(query_range.clone(), real_query_index, &[&normalized]);
let automaton = Automaton::exact(automatons.len(), n, &normalized);
let automaton = Automaton::exact(automaton_index, n, &normalized);
automaton_index += 1;
automatons.push(vec![automaton]);
}
}

View File

@ -4,7 +4,7 @@ use std::sync::{Arc, RwLock};
use std::{fs, thread};
use crossbeam_channel::Receiver;
use log::error;
use log::{debug, error};
use crate::{store, update, Index, MResult};
@ -23,6 +23,7 @@ fn update_awaiter(receiver: Receiver<()>, rkv: Arc<RwLock<rkv::Rkv>>, index: Ind
Ok(rkv) => rkv,
Err(e) => { error!("rkv RwLock read failed: {}", e); break }
};
let mut writer = match rkv.write() {
Ok(writer) => writer,
Err(e) => { error!("LMDB writer transaction begin failed: {}", e); break }
@ -31,7 +32,7 @@ fn update_awaiter(receiver: Receiver<()>, rkv: Arc<RwLock<rkv::Rkv>>, index: Ind
match update::update_task(&mut writer, index.clone(), None as Option::<fn(_)>) {
Ok(true) => if let Err(e) = writer.commit() { error!("update transaction failed: {}", e) },
// no more updates to handle for now
Ok(false) => { writer.abort(); break },
Ok(false) => { debug!("no more updates"); writer.abort(); break },
Err(e) => { error!("update task failed: {}", e); writer.abort() },
}
}

View File

@ -1,5 +1,5 @@
use std::{error, fmt, io};
use crate::serde::SerializerError;
use crate::serde::{SerializerError, DeserializerError};
pub type MResult<T> = Result<T, Error>;
@ -16,6 +16,7 @@ pub enum Error {
RmpEncode(rmp_serde::encode::Error),
Bincode(bincode::Error),
Serializer(SerializerError),
Deserializer(DeserializerError),
UnsupportedOperation(UnsupportedOperation),
}
@ -61,6 +62,12 @@ impl From<SerializerError> for Error {
}
}
impl From<DeserializerError> for Error {
fn from(error: DeserializerError) -> Error {
Error::Deserializer(error)
}
}
impl From<UnsupportedOperation> for Error {
fn from(op: UnsupportedOperation) -> Error {
Error::UnsupportedOperation(op)
@ -82,6 +89,7 @@ impl fmt::Display for Error {
RmpEncode(e) => write!(f, "rmp encode error; {}", e),
Bincode(e) => write!(f, "bincode error; {}", e),
Serializer(e) => write!(f, "serializer error; {}", e),
Deserializer(e) => write!(f, "deserializer error; {}", e),
UnsupportedOperation(op) => write!(f, "unsupported operation; {}", op),
}
}

View File

@ -1,5 +1,5 @@
use std::sync::Arc;
use rkv::Value;
use rkv::{Value, StoreError};
use crate::{DocumentId, MResult};
#[derive(Copy, Clone)]
@ -24,10 +24,14 @@ impl DocsWords {
&self,
writer: &mut rkv::Writer,
document_id: DocumentId,
) -> Result<(), rkv::StoreError>
) -> Result<bool, rkv::StoreError>
{
let document_id_bytes = document_id.0.to_be_bytes();
self.docs_words.delete(writer, document_id_bytes)
match self.docs_words.delete(writer, document_id_bytes) {
Ok(()) => Ok(true),
Err(StoreError::LmdbError(lmdb::Error::NotFound)) => Ok(false),
Err(e) => Err(e),
}
}
pub fn doc_words<T: rkv::Readable>(

View File

@ -18,6 +18,20 @@ fn document_attribute_into_key(document_id: DocumentId, attribute: SchemaAttr) -
key
}
fn document_attribute_from_key(key: [u8; 10]) -> (DocumentId, SchemaAttr) {
let document_id = {
let array = TryFrom::try_from(&key[0..8]).unwrap();
DocumentId(u64::from_be_bytes(array))
};
let schema_attr = {
let array = TryFrom::try_from(&key[8..8+2]).unwrap();
SchemaAttr(u16::from_be_bytes(array))
};
(document_id, schema_attr)
}
impl DocumentsFields {
pub fn put_document_field(
&self,
@ -45,13 +59,10 @@ impl DocumentsFields {
let iter = self.documents_fields.iter_from(writer, document_id_bytes)?;
for result in iter {
let (key, _) = result?;
let current_document_id = {
let bytes = key.get(0..8).unwrap();
let array = TryFrom::try_from(bytes).unwrap();
DocumentId(u64::from_be_bytes(array))
};
let array = TryFrom::try_from(key).unwrap();
let (current_document_id, _) = document_attribute_from_key(array);
if current_document_id != document_id { break }
keys_to_delete.push(key.to_owned());
}
@ -103,10 +114,10 @@ impl<'r, T: rkv::Readable + 'r> Iterator for DocumentFieldsIter<'r, T> {
fn next(&mut self) -> Option<Self::Item> {
match self.iter.next() {
Some(Ok((key, Some(rkv::Value::Blob(bytes))))) => {
let key_bytes = key.get(8..8+2).unwrap();
let array = TryFrom::try_from(key_bytes).unwrap();
let attr = u16::from_be_bytes(array);
let attr = SchemaAttr::new(attr);
let array = TryFrom::try_from(key).unwrap();
let (current_document_id, attr) = document_attribute_from_key(array);
if current_document_id != self.document_id { return None; }
Some(Ok((attr, bytes)))
},
Some(Ok((key, data))) => panic!("{:?}, {:?}", key, data),

View File

@ -14,8 +14,11 @@ pub use self::synonyms::Synonyms;
pub use self::updates::Updates;
pub use self::updates_results::UpdatesResults;
use std::collections::HashSet;
use meilidb_schema::Schema;
use crate::{update, query_builder::QueryBuilder, MResult};
use serde::de;
use crate::{update, query_builder::QueryBuilder, DocumentId, MResult, Error};
use crate::serde::Deserializer;
fn aligned_to(bytes: &[u8], align: usize) -> bool {
(bytes as *const _ as *const () as usize) % align == 0
@ -63,6 +66,34 @@ pub struct Index {
}
impl Index {
pub fn document<T: de::DeserializeOwned, R: rkv::Readable>(
&self,
reader: &R,
fields: Option<&HashSet<&str>>,
document_id: DocumentId,
) -> MResult<Option<T>>
{
let schema = self.main.schema(reader)?;
let schema = schema.ok_or(Error::SchemaMissing)?;
let fields = match fields {
Some(fields) => fields.into_iter().map(|name| schema.attribute(name)).collect(),
None => None,
};
let mut deserializer = Deserializer {
document_id,
reader,
documents_fields: self.documents_fields,
schema: &schema,
fields: fields.as_ref(),
};
// TODO: currently we return an error if all document fields are missing,
// returning None would have been better
Ok(T::deserialize(&mut deserializer).map(Some)?)
}
pub fn schema_update(&self, mut writer: rkv::Writer, schema: Schema) -> MResult<()> {
update::push_schema_update(&mut writer, self.updates, self.updates_results, schema)?;
writer.commit()?;

View File

@ -1,6 +1,8 @@
use std::borrow::Cow;
use std::{mem, ptr};
use zerocopy::{AsBytes, LayoutVerified};
use rkv::StoreError;
use crate::DocIndex;
use crate::store::aligned_to;
@ -26,9 +28,13 @@ impl PostingsLists {
&self,
writer: &mut rkv::Writer,
word: &[u8],
) -> Result<(), rkv::StoreError>
) -> Result<bool, rkv::StoreError>
{
self.postings_lists.delete(writer, word)
match self.postings_lists.delete(writer, word) {
Ok(()) => Ok(true),
Err(StoreError::LmdbError(lmdb::Error::NotFound)) => Ok(false),
Err(e) => Err(e),
}
}
pub fn postings_list<'a>(

View File

@ -130,10 +130,14 @@ pub fn apply_documents_deletion(
if documents_fields_store.del_all_document_fields(writer, id)? != 0 {
deleted_documents.insert(id);
}
docs_words_store.del_doc_words(writer, id)?;
}
}
let deleted_documents_len = deleted_documents.len() as u64;
for id in deleted_documents {
docs_words_store.del_doc_words(writer, id)?;
}
let removed_words = fst::Set::from_iter(removed_words).unwrap();
let words = match main_store.words_fst(writer)? {
Some(words_set) => {
@ -155,7 +159,6 @@ pub fn apply_documents_deletion(
main_store.put_words_fst(writer, &words)?;
main_store.put_ranked_map(writer, &ranked_map)?;
let deleted_documents_len = deleted_documents.len() as u64;
main_store.put_number_of_documents(writer, |old| old - deleted_documents_len)?;
Ok(())

View File

@ -151,6 +151,8 @@ pub fn update_task(
None => return Ok(false),
};
debug!("Processing update number {}", update_id);
let (update_type, result, duration) = match update {
Update::SchemaUpdate(schema) => {
let start = Instant::now();