From 24240934f9f450c79c533f716442ba9d7acb8cea Mon Sep 17 00:00:00 2001
From: Louis Dureuil
Date: Tue, 16 Jul 2024 10:20:20 +0200
Subject: [PATCH] Improve errors when indexing documents with a user provided
 embedder

---
 meilisearch-types/src/error.rs                |   4 +-
 milli/src/error.rs                            |   6 +-
 .../extract/extract_vector_points.rs          | 107 +++++++++++++++---
 .../src/update/index_documents/extract/mod.rs |   9 +-
 milli/src/update/index_documents/mod.rs       |   4 +
 milli/src/vector/error.rs                     |  69 ++++++++++-
 milli/src/vector/parsed_vectors.rs            |   9 ++
 7 files changed, 185 insertions(+), 23 deletions(-)

diff --git a/meilisearch-types/src/error.rs b/meilisearch-types/src/error.rs
index a88ca6059..d27d6cd3d 100644
--- a/meilisearch-types/src/error.rs
+++ b/meilisearch-types/src/error.rs
@@ -415,7 +415,9 @@ impl ErrorCode for milli::Error {
                     Code::InvalidSettingsTypoTolerance
                 }
                 UserError::InvalidEmbedder(_) => Code::InvalidEmbedder,
-                UserError::VectorEmbeddingError(_) => Code::VectorEmbeddingError,
+                UserError::VectorEmbeddingError(_) | UserError::DocumentEmbeddingError(_) => {
+                    Code::VectorEmbeddingError
+                }
                 UserError::DocumentEditionCannotModifyPrimaryKey
                 | UserError::DocumentEditionDocumentMustBeObject
                 | UserError::DocumentEditionRuntimeError(_)
diff --git a/milli/src/error.rs b/milli/src/error.rs
index 0effb7be7..47a497292 100644
--- a/milli/src/error.rs
+++ b/milli/src/error.rs
@@ -268,15 +268,17 @@ only composed of alphanumeric characters (a-z A-Z 0-9), hyphens (-) and undersco
     DocumentEditionRuntimeError(Box<rhai::EvalAltResult>),
     #[error("Document edition runtime error encountered while compiling the function: {0}")]
     DocumentEditionCompilationError(rhai::ParseError),
+    #[error("{0}")]
+    DocumentEmbeddingError(String),
 }
 
 impl From<crate::vector::Error> for Error {
     fn from(value: crate::vector::Error) -> Self {
         match value.fault() {
             FaultSource::User => Error::UserError(value.into()),
-            FaultSource::Runtime => Error::InternalError(value.into()),
+            FaultSource::Runtime => Error::UserError(value.into()),
             FaultSource::Bug => Error::InternalError(value.into()),
-            FaultSource::Undecided => Error::InternalError(value.into()),
+            FaultSource::Undecided => Error::UserError(value.into()),
         }
     }
 }
diff --git a/milli/src/update/index_documents/extract/extract_vector_points.rs b/milli/src/update/index_documents/extract/extract_vector_points.rs
index 36fa346a5..b984c3020 100644
--- a/milli/src/update/index_documents/extract/extract_vector_points.rs
+++ b/milli/src/update/index_documents/extract/extract_vector_points.rs
@@ -13,13 +13,15 @@ use roaring::RoaringBitmap;
 use serde_json::Value;
 
 use super::helpers::{create_writer, writer_into_reader, GrenadParameters};
+use crate::error::FaultSource;
 use crate::index::IndexEmbeddingConfig;
 use crate::prompt::Prompt;
 use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
 use crate::update::settings::InnerIndexSettingsDiff;
+use crate::vector::error::{EmbedErrorKind, PossibleEmbeddingMistakes, UnusedVectorsDistribution};
 use crate::vector::parsed_vectors::{ParsedVectorsDiff, VectorState, RESERVED_VECTORS_FIELD_NAME};
 use crate::vector::settings::{EmbedderAction, ReindexAction};
-use crate::vector::Embedder;
+use crate::vector::{Embedder, Embeddings};
 use crate::{try_split_array_at, DocumentId, FieldId, FieldsIdsMap, Result, ThreadPoolNoAbort};
 
 /// The length of the elements that are always in the buffer when inserting new values.
@@ -102,7 +104,8 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
     indexer: GrenadParameters,
     embedders_configs: &[IndexEmbeddingConfig],
     settings_diff: &InnerIndexSettingsDiff,
-) -> Result<Vec<ExtractedVectorPoints>> {
+) -> Result<(Vec<ExtractedVectorPoints>, UnusedVectorsDistribution)> {
+    let mut unused_vectors_distribution = UnusedVectorsDistribution::new();
     let reindex_vectors = settings_diff.reindex_vectors();
 
     let old_fields_ids_map = &settings_diff.old.fields_ids_map;
@@ -319,6 +322,8 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
                 delta,
             )?;
         }
+
+        unused_vectors_distribution.append(parsed_vectors);
     }
 
     let mut results = Vec::new();
@@ -355,7 +360,7 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
         })
     }
 
-    Ok(results)
+    Ok((results, unused_vectors_distribution))
 }
 
 fn extract_vector_document_diff(
@@ -547,6 +552,9 @@ pub fn extract_embeddings<R: io::Read + io::Seek>(
     prompt_reader: grenad::Reader<R>,
     indexer: GrenadParameters,
     embedder: Arc<Embedder>,
+    embedder_name: &str,
+    possible_embedding_mistakes: &PossibleEmbeddingMistakes,
+    unused_vectors_distribution: &UnusedVectorsDistribution,
     request_threads: &ThreadPoolNoAbort,
 ) -> Result<grenad::Reader<BufReader<File>>> {
     let n_chunks = embedder.chunk_count_hint(); // chunk level parallelism
@@ -583,13 +591,14 @@ pub fn extract_embeddings<R: io::Read + io::Seek>(
         current_chunk_ids.push(docid);
 
         if chunks.len() == chunks.capacity() {
-            let chunked_embeds = embedder
-                .embed_chunks(
-                    std::mem::replace(&mut chunks, Vec::with_capacity(n_chunks)),
-                    request_threads,
-                )
-                .map_err(crate::vector::Error::from)
-                .map_err(crate::Error::from)?;
+            let chunked_embeds = embed_chunks(
+                &embedder,
+                std::mem::replace(&mut chunks, Vec::with_capacity(n_chunks)),
+                embedder_name,
+                possible_embedding_mistakes,
+                unused_vectors_distribution,
+                request_threads,
+            )?;
 
             for (docid, embeddings) in chunks_ids
                 .iter()
@@ -604,10 +613,14 @@ pub fn extract_embeddings<R: io::Read + io::Seek>(
 
     // send last chunk
     if !chunks.is_empty() {
-        let chunked_embeds = embedder
-            .embed_chunks(std::mem::take(&mut chunks), request_threads)
-            .map_err(crate::vector::Error::from)
-            .map_err(crate::Error::from)?;
+        let chunked_embeds = embed_chunks(
+            &embedder,
+            std::mem::take(&mut chunks),
+            embedder_name,
+            possible_embedding_mistakes,
+            unused_vectors_distribution,
+            request_threads,
+        )?;
         for (docid, embeddings) in chunks_ids
             .iter()
             .flat_map(|docids| docids.iter())
@@ -618,10 +631,14 @@ pub fn extract_embeddings<R: io::Read + io::Seek>(
 
     if !current_chunk.is_empty() {
-        let embeds = embedder
-            .embed_chunks(vec![std::mem::take(&mut current_chunk)], request_threads)
-            .map_err(crate::vector::Error::from)
-            .map_err(crate::Error::from)?;
+        let embeds = embed_chunks(
+            &embedder,
+            vec![std::mem::take(&mut current_chunk)],
+            embedder_name,
+            possible_embedding_mistakes,
+            unused_vectors_distribution,
+            request_threads,
+        )?;
 
         if let Some(embeds) = embeds.first() {
             for (docid, embeddings) in current_chunk_ids.iter().zip(embeds.iter()) {
@@ -632,3 +649,57 @@ pub fn extract_embeddings<R: io::Read + io::Seek>(
 
     writer_into_reader(state_writer)
 }
+
+fn embed_chunks(
+    embedder: &Embedder,
+    text_chunks: Vec<Vec<String>>,
+    embedder_name: &str,
+    possible_embedding_mistakes: &PossibleEmbeddingMistakes,
+    unused_vectors_distribution: &UnusedVectorsDistribution,
+    request_threads: &ThreadPoolNoAbort,
+) -> Result<Vec<Vec<Embeddings<f32>>>> {
+    match embedder.embed_chunks(text_chunks, request_threads) {
+        Ok(chunks) => Ok(chunks),
+        Err(error) => {
+            if let FaultSource::Bug = error.fault {
+                Err(crate::Error::InternalError(crate::InternalError::VectorEmbeddingError(
+                    error.into(),
+                )))
+            } else {
+                let mut msg =
+                    format!(r"While embedding documents for embedder `{embedder_name}`: {error}");
+
+                if let EmbedErrorKind::ManualEmbed(_) = &error.kind {
+                    msg += &format!("\n- Note: `{embedder_name}` has `source: userProvided`, so documents must provide embeddings as an array in `_vectors.{embedder_name}`.");
+                }
+
+                let mut hint_count = 0;
+
+                for (vector_misspelling, count) in
+                    possible_embedding_mistakes.vector_mistakes().take(2)
+                {
+                    msg += &format!("\n- Hint: try replacing `{vector_misspelling}` by `_vectors` in {count} document(s).");
+                    hint_count += 1;
+                }
+
+                for (embedder_misspelling, count) in possible_embedding_mistakes
+                    .embedder_mistakes(embedder_name, unused_vectors_distribution)
+                    .take(2)
+                {
+                    msg += &format!("\n- Hint: try replacing `_vectors.{embedder_misspelling}` by `_vectors.{embedder_name}` in {count} document(s).");
+                    hint_count += 1;
+                }
+
+                if hint_count == 0 {
+                    if let EmbedErrorKind::ManualEmbed(_) = &error.kind {
+                        msg += &format!(
+                            "\n- Hint: opt-out for a document with `_vectors.{embedder_name}: null`"
+                        );
+                    }
+                }
+
+                Err(crate::Error::UserError(crate::UserError::DocumentEmbeddingError(msg)))
+            }
+        }
+    }
+}
diff --git a/milli/src/update/index_documents/extract/mod.rs b/milli/src/update/index_documents/extract/mod.rs
index 2feb85414..57d9d5e42 100644
--- a/milli/src/update/index_documents/extract/mod.rs
+++ b/milli/src/update/index_documents/extract/mod.rs
@@ -32,6 +32,7 @@ use super::helpers::{as_cloneable_grenad, CursorClonableMmap, GrenadParameters};
 use super::{helpers, TypedChunk};
 use crate::index::IndexEmbeddingConfig;
 use crate::update::settings::InnerIndexSettingsDiff;
+use crate::vector::error::PossibleEmbeddingMistakes;
 use crate::{FieldId, Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
 
 /// Extract data for each databases from obkv documents in parallel.
@@ -47,6 +48,7 @@ pub(crate) fn data_from_obkv_documents(
     embedders_configs: Arc<Vec<IndexEmbeddingConfig>>,
     settings_diff: Arc<InnerIndexSettingsDiff>,
     max_positions_per_attributes: Option<u32>,
+    possible_embedding_mistakes: Arc<PossibleEmbeddingMistakes>,
 ) -> Result<()> {
     let (original_pipeline_result, flattened_pipeline_result): (Result<_>, Result<_>) = rayon::join(
         || {
@@ -59,6 +61,7 @@ pub(crate) fn data_from_obkv_documents(
                         lmdb_writer_sx.clone(),
                         embedders_configs.clone(),
                         settings_diff.clone(),
+                        possible_embedding_mistakes.clone(),
                     )
                 })
                 .collect::<Result<()>>()
@@ -227,6 +230,7 @@ fn send_original_documents_data(
     lmdb_writer_sx: Sender<Result<TypedChunk>>,
     embedders_configs: Arc<Vec<IndexEmbeddingConfig>>,
     settings_diff: Arc<InnerIndexSettingsDiff>,
+    possible_embedding_mistakes: Arc<PossibleEmbeddingMistakes>,
 ) -> Result<()> {
     let original_documents_chunk =
         original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;
@@ -248,7 +252,7 @@ fn send_original_documents_data(
             &embedders_configs,
             &settings_diff,
         ) {
-            Ok(extracted_vectors) => {
+            Ok((extracted_vectors, unused_vectors_distribution)) => {
                 for ExtractedVectorPoints {
                     manual_vectors,
                     remove_vectors,
@@ -263,6 +267,9 @@ fn send_original_documents_data(
                         prompts,
                         indexer,
                         embedder.clone(),
+                        &embedder_name,
+                        &possible_embedding_mistakes,
+                        &unused_vectors_distribution,
                         request_threads(),
                     ) {
                         Ok(results) => Some(results),
diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs
index 271d4037c..7f07dafed 100644
--- a/milli/src/update/index_documents/mod.rs
+++ b/milli/src/update/index_documents/mod.rs
@@ -427,6 +427,9 @@ where
         let settings_diff = Arc::new(settings_diff);
         let embedders_configs = Arc::new(self.index.embedding_configs(self.wtxn)?);
 
+        let possible_embedding_mistakes =
+            crate::vector::error::PossibleEmbeddingMistakes::new(&field_distribution);
+
         let backup_pool;
         let pool = match self.indexer_config.thread_pool {
             Some(ref pool) => pool,
@@ -542,6 +545,7 @@ where
                     embedders_configs.clone(),
                     settings_diff_cloned,
                     max_positions_per_attributes,
+                    Arc::new(possible_embedding_mistakes)
                 )
             });
 
diff --git a/milli/src/vector/error.rs b/milli/src/vector/error.rs
index 650e1171e..af9718f08 100644
--- a/milli/src/vector/error.rs
+++ b/milli/src/vector/error.rs
@@ -1,9 +1,11 @@
+use std::collections::BTreeMap;
 use std::path::PathBuf;
 
 use hf_hub::api::sync::ApiError;
 
+use super::parsed_vectors::ParsedVectorsDiff;
 use crate::error::FaultSource;
-use crate::PanicCatched;
+use crate::{FieldDistribution, PanicCatched};
 
 #[derive(Debug, thiserror::Error)]
 #[error("Error while generating embeddings: {inner}")]
@@ -310,3 +312,68 @@ pub enum NewEmbedderErrorKind {
     #[error("loading model failed: {0}")]
     LoadModel(candle_core::Error),
 }
+
+pub struct PossibleEmbeddingMistakes {
+    vectors_mistakes: BTreeMap<String, u64>,
+}
+
+impl PossibleEmbeddingMistakes {
+    pub fn new(field_distribution: &FieldDistribution) -> Self {
+        let mut vectors_mistakes = BTreeMap::new();
+        let builder = levenshtein_automata::LevenshteinAutomatonBuilder::new(2, true);
+        let automata = builder.build_dfa("_vectors");
+        for (field, count) in field_distribution {
+            if *count == 0 {
+                continue;
+            }
+            if field.contains('.') {
+                continue;
+            }
+            match automata.eval(field) {
+                levenshtein_automata::Distance::Exact(0) => continue,
+                levenshtein_automata::Distance::Exact(_) => {
+                    vectors_mistakes.insert(field.to_string(), *count);
+                }
+                levenshtein_automata::Distance::AtLeast(_) => continue,
+            }
+        }
+
+        Self { vectors_mistakes }
+    }
+
+    pub fn vector_mistakes(&self) -> impl Iterator<Item = (&str, u64)> {
+        self.vectors_mistakes.iter().map(|(misspelling, count)| (misspelling.as_str(), *count))
+    }
+
+    pub fn embedder_mistakes<'a>(
+        &'a self,
+        embedder_name: &'a str,
+        unused_vectors_distributions: &'a UnusedVectorsDistribution,
+    ) -> impl Iterator<Item = (&'a str, u64)> + 'a {
+        let builder = levenshtein_automata::LevenshteinAutomatonBuilder::new(2, true);
+        let automata = builder.build_dfa(embedder_name);
+
+        unused_vectors_distributions.0.iter().filter_map(move |(field, count)| {
+            match automata.eval(field) {
+                levenshtein_automata::Distance::Exact(0) => None,
+                levenshtein_automata::Distance::Exact(_) => Some((field.as_str(), *count)),
+                levenshtein_automata::Distance::AtLeast(_) => None,
+            }
+        })
+    }
+}
+
+#[derive(Default)]
+pub struct UnusedVectorsDistribution(BTreeMap<String, u64>);
+
+impl UnusedVectorsDistribution {
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    pub fn append(&mut self, parsed_vectors_diff: ParsedVectorsDiff) {
+        for name in parsed_vectors_diff.into_new_vectors_keys_iter() {
+            *self.0.entry(name).or_default() += 1;
+        }
+    }
+}
diff --git a/milli/src/vector/parsed_vectors.rs b/milli/src/vector/parsed_vectors.rs
index 11c59e155..9dbf025e6 100644
--- a/milli/src/vector/parsed_vectors.rs
+++ b/milli/src/vector/parsed_vectors.rs
@@ -179,6 +179,15 @@
 
         (old, new)
     }
+
+    pub fn into_new_vectors_keys_iter(self) -> impl Iterator<Item = String> {
+        let maybe_it = match self.new {
+            VectorsState::NoVectorsFid => None,
+            VectorsState::NoVectorsFieldInDocument => None,
+            VectorsState::Vectors(vectors) => Some(vectors.into_keys()),
+        };
+        maybe_it.into_iter().flatten()
+    }
 }
 
 pub struct ParsedVectors(pub BTreeMap<String, Vectors>);
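
Not part of the patch: the misspelling hints added above come from the `levenshtein_automata` DFA that `PossibleEmbeddingMistakes` builds. The following is a minimal standalone sketch of how that classification behaves; the `main` harness and sample field names are illustrative only, and it assumes only the crate API already used in the diff (`LevenshteinAutomatonBuilder::new`, `build_dfa`, `eval`).

// Standalone sketch of the `_vectors` misspelling detection relied on by
// `PossibleEmbeddingMistakes`. Sample field names are hypothetical.
use levenshtein_automata::{Distance, LevenshteinAutomatonBuilder};

fn main() {
    // Accept up to 2 edits, counting a transposition as a single edit,
    // mirroring `LevenshteinAutomatonBuilder::new(2, true)` in the patch.
    let builder = LevenshteinAutomatonBuilder::new(2, true);
    let dfa = builder.build_dfa("_vectors");

    for field in ["_vectors", "_vector", "vectores", "title"] {
        match dfa.eval(field) {
            // Exact(0): the field is spelled correctly, no hint needed.
            Distance::Exact(0) => println!("{field}: correct"),
            // Exact(1..=2): close enough to report as a probable misspelling.
            Distance::Exact(d) => println!("{field}: probable misspelling ({d} edit(s) away)"),
            // AtLeast(_): too far from `_vectors`, treated as an unrelated field.
            Distance::AtLeast(_) => println!("{field}: unrelated"),
        }
    }
}

The same builder is reused with the embedder name as the query in `embedder_mistakes`, which is what produces the `_vectors.{embedder_misspelling}` hints in the error message.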