Improve errors when indexing documents with a user provided embedder

This commit is contained in:
Louis Dureuil 2024-07-16 10:20:20 +02:00
parent f4c94ac57f
commit 24240934f9
No known key found for this signature in database
7 changed files with 185 additions and 23 deletions

View File

@ -415,7 +415,9 @@ impl ErrorCode for milli::Error {
Code::InvalidSettingsTypoTolerance Code::InvalidSettingsTypoTolerance
} }
UserError::InvalidEmbedder(_) => Code::InvalidEmbedder, UserError::InvalidEmbedder(_) => Code::InvalidEmbedder,
UserError::VectorEmbeddingError(_) => Code::VectorEmbeddingError, UserError::VectorEmbeddingError(_) | UserError::DocumentEmbeddingError(_) => {
Code::VectorEmbeddingError
}
UserError::DocumentEditionCannotModifyPrimaryKey UserError::DocumentEditionCannotModifyPrimaryKey
| UserError::DocumentEditionDocumentMustBeObject | UserError::DocumentEditionDocumentMustBeObject
| UserError::DocumentEditionRuntimeError(_) | UserError::DocumentEditionRuntimeError(_)

View File

@ -268,15 +268,17 @@ only composed of alphanumeric characters (a-z A-Z 0-9), hyphens (-) and undersco
DocumentEditionRuntimeError(Box<EvalAltResult>), DocumentEditionRuntimeError(Box<EvalAltResult>),
#[error("Document edition runtime error encountered while compiling the function: {0}")] #[error("Document edition runtime error encountered while compiling the function: {0}")]
DocumentEditionCompilationError(rhai::ParseError), DocumentEditionCompilationError(rhai::ParseError),
#[error("{0}")]
DocumentEmbeddingError(String),
} }
impl From<crate::vector::Error> for Error { impl From<crate::vector::Error> for Error {
fn from(value: crate::vector::Error) -> Self { fn from(value: crate::vector::Error) -> Self {
match value.fault() { match value.fault() {
FaultSource::User => Error::UserError(value.into()), FaultSource::User => Error::UserError(value.into()),
FaultSource::Runtime => Error::InternalError(value.into()), FaultSource::Runtime => Error::UserError(value.into()),
FaultSource::Bug => Error::InternalError(value.into()), FaultSource::Bug => Error::InternalError(value.into()),
FaultSource::Undecided => Error::InternalError(value.into()), FaultSource::Undecided => Error::UserError(value.into()),
} }
} }
} }

View File

@ -13,13 +13,15 @@ use roaring::RoaringBitmap;
use serde_json::Value; use serde_json::Value;
use super::helpers::{create_writer, writer_into_reader, GrenadParameters}; use super::helpers::{create_writer, writer_into_reader, GrenadParameters};
use crate::error::FaultSource;
use crate::index::IndexEmbeddingConfig; use crate::index::IndexEmbeddingConfig;
use crate::prompt::Prompt; use crate::prompt::Prompt;
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::settings::InnerIndexSettingsDiff; use crate::update::settings::InnerIndexSettingsDiff;
use crate::vector::error::{EmbedErrorKind, PossibleEmbeddingMistakes, UnusedVectorsDistribution};
use crate::vector::parsed_vectors::{ParsedVectorsDiff, VectorState, RESERVED_VECTORS_FIELD_NAME}; use crate::vector::parsed_vectors::{ParsedVectorsDiff, VectorState, RESERVED_VECTORS_FIELD_NAME};
use crate::vector::settings::{EmbedderAction, ReindexAction}; use crate::vector::settings::{EmbedderAction, ReindexAction};
use crate::vector::Embedder; use crate::vector::{Embedder, Embeddings};
use crate::{try_split_array_at, DocumentId, FieldId, FieldsIdsMap, Result, ThreadPoolNoAbort}; use crate::{try_split_array_at, DocumentId, FieldId, FieldsIdsMap, Result, ThreadPoolNoAbort};
/// The length of the elements that are always in the buffer when inserting new values. /// The length of the elements that are always in the buffer when inserting new values.
@ -102,7 +104,8 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
indexer: GrenadParameters, indexer: GrenadParameters,
embedders_configs: &[IndexEmbeddingConfig], embedders_configs: &[IndexEmbeddingConfig],
settings_diff: &InnerIndexSettingsDiff, settings_diff: &InnerIndexSettingsDiff,
) -> Result<Vec<ExtractedVectorPoints>> { ) -> Result<(Vec<ExtractedVectorPoints>, UnusedVectorsDistribution)> {
let mut unused_vectors_distribution = UnusedVectorsDistribution::new();
let reindex_vectors = settings_diff.reindex_vectors(); let reindex_vectors = settings_diff.reindex_vectors();
let old_fields_ids_map = &settings_diff.old.fields_ids_map; let old_fields_ids_map = &settings_diff.old.fields_ids_map;
@ -319,6 +322,8 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
delta, delta,
)?; )?;
} }
unused_vectors_distribution.append(parsed_vectors);
} }
let mut results = Vec::new(); let mut results = Vec::new();
@ -355,7 +360,7 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
}) })
} }
Ok(results) Ok((results, unused_vectors_distribution))
} }
fn extract_vector_document_diff( fn extract_vector_document_diff(
@ -547,6 +552,9 @@ pub fn extract_embeddings<R: io::Read + io::Seek>(
prompt_reader: grenad::Reader<R>, prompt_reader: grenad::Reader<R>,
indexer: GrenadParameters, indexer: GrenadParameters,
embedder: Arc<Embedder>, embedder: Arc<Embedder>,
embedder_name: &str,
possible_embedding_mistakes: &PossibleEmbeddingMistakes,
unused_vectors_distribution: &UnusedVectorsDistribution,
request_threads: &ThreadPoolNoAbort, request_threads: &ThreadPoolNoAbort,
) -> Result<grenad::Reader<BufReader<File>>> { ) -> Result<grenad::Reader<BufReader<File>>> {
let n_chunks = embedder.chunk_count_hint(); // chunk level parallelism let n_chunks = embedder.chunk_count_hint(); // chunk level parallelism
@ -583,13 +591,14 @@ pub fn extract_embeddings<R: io::Read + io::Seek>(
current_chunk_ids.push(docid); current_chunk_ids.push(docid);
if chunks.len() == chunks.capacity() { if chunks.len() == chunks.capacity() {
let chunked_embeds = embedder let chunked_embeds = embed_chunks(
.embed_chunks( &embedder,
std::mem::replace(&mut chunks, Vec::with_capacity(n_chunks)), std::mem::replace(&mut chunks, Vec::with_capacity(n_chunks)),
request_threads, embedder_name,
) possible_embedding_mistakes,
.map_err(crate::vector::Error::from) unused_vectors_distribution,
.map_err(crate::Error::from)?; request_threads,
)?;
for (docid, embeddings) in chunks_ids for (docid, embeddings) in chunks_ids
.iter() .iter()
@ -604,10 +613,14 @@ pub fn extract_embeddings<R: io::Read + io::Seek>(
// send last chunk // send last chunk
if !chunks.is_empty() { if !chunks.is_empty() {
let chunked_embeds = embedder let chunked_embeds = embed_chunks(
.embed_chunks(std::mem::take(&mut chunks), request_threads) &embedder,
.map_err(crate::vector::Error::from) std::mem::take(&mut chunks),
.map_err(crate::Error::from)?; embedder_name,
possible_embedding_mistakes,
unused_vectors_distribution,
request_threads,
)?;
for (docid, embeddings) in chunks_ids for (docid, embeddings) in chunks_ids
.iter() .iter()
.flat_map(|docids| docids.iter()) .flat_map(|docids| docids.iter())
@ -618,10 +631,14 @@ pub fn extract_embeddings<R: io::Read + io::Seek>(
} }
if !current_chunk.is_empty() { if !current_chunk.is_empty() {
let embeds = embedder let embeds = embed_chunks(
.embed_chunks(vec![std::mem::take(&mut current_chunk)], request_threads) &embedder,
.map_err(crate::vector::Error::from) vec![std::mem::take(&mut current_chunk)],
.map_err(crate::Error::from)?; embedder_name,
possible_embedding_mistakes,
unused_vectors_distribution,
request_threads,
)?;
if let Some(embeds) = embeds.first() { if let Some(embeds) = embeds.first() {
for (docid, embeddings) in current_chunk_ids.iter().zip(embeds.iter()) { for (docid, embeddings) in current_chunk_ids.iter().zip(embeds.iter()) {
@ -632,3 +649,57 @@ pub fn extract_embeddings<R: io::Read + io::Seek>(
writer_into_reader(state_writer) writer_into_reader(state_writer)
} }
fn embed_chunks(
embedder: &Embedder,
text_chunks: Vec<Vec<String>>,
embedder_name: &str,
possible_embedding_mistakes: &PossibleEmbeddingMistakes,
unused_vectors_distribution: &UnusedVectorsDistribution,
request_threads: &ThreadPoolNoAbort,
) -> Result<Vec<Vec<Embeddings<f32>>>> {
match embedder.embed_chunks(text_chunks, request_threads) {
Ok(chunks) => Ok(chunks),
Err(error) => {
if let FaultSource::Bug = error.fault {
Err(crate::Error::InternalError(crate::InternalError::VectorEmbeddingError(
error.into(),
)))
} else {
let mut msg =
format!(r"While embedding documents for embedder `{embedder_name}`: {error}");
if let EmbedErrorKind::ManualEmbed(_) = &error.kind {
msg += &format!("\n- Note: `{embedder_name}` has `source: userProvided`, so documents must provide embeddings as an array in `_vectors.{embedder_name}`.");
}
let mut hint_count = 0;
for (vector_misspelling, count) in
possible_embedding_mistakes.vector_mistakes().take(2)
{
msg += &format!("\n- Hint: try replacing `{vector_misspelling}` by `_vectors` in {count} document(s).");
hint_count += 1;
}
for (embedder_misspelling, count) in possible_embedding_mistakes
.embedder_mistakes(embedder_name, unused_vectors_distribution)
.take(2)
{
msg += &format!("\n- Hint: try replacing `_vectors.{embedder_misspelling}` by `_vectors.{embedder_name}` in {count} document(s).");
hint_count += 1;
}
if hint_count == 0 {
if let EmbedErrorKind::ManualEmbed(_) = &error.kind {
msg += &format!(
"\n- Hint: opt-out for a document with `_vectors.{embedder_name}: null`"
);
}
}
Err(crate::Error::UserError(crate::UserError::DocumentEmbeddingError(msg)))
}
}
}
}

View File

@ -32,6 +32,7 @@ use super::helpers::{as_cloneable_grenad, CursorClonableMmap, GrenadParameters};
use super::{helpers, TypedChunk}; use super::{helpers, TypedChunk};
use crate::index::IndexEmbeddingConfig; use crate::index::IndexEmbeddingConfig;
use crate::update::settings::InnerIndexSettingsDiff; use crate::update::settings::InnerIndexSettingsDiff;
use crate::vector::error::PossibleEmbeddingMistakes;
use crate::{FieldId, Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder}; use crate::{FieldId, Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
/// Extract data for each databases from obkv documents in parallel. /// Extract data for each databases from obkv documents in parallel.
@ -47,6 +48,7 @@ pub(crate) fn data_from_obkv_documents(
embedders_configs: Arc<Vec<IndexEmbeddingConfig>>, embedders_configs: Arc<Vec<IndexEmbeddingConfig>>,
settings_diff: Arc<InnerIndexSettingsDiff>, settings_diff: Arc<InnerIndexSettingsDiff>,
max_positions_per_attributes: Option<u32>, max_positions_per_attributes: Option<u32>,
possible_embedding_mistakes: Arc<PossibleEmbeddingMistakes>,
) -> Result<()> { ) -> Result<()> {
let (original_pipeline_result, flattened_pipeline_result): (Result<_>, Result<_>) = rayon::join( let (original_pipeline_result, flattened_pipeline_result): (Result<_>, Result<_>) = rayon::join(
|| { || {
@ -59,6 +61,7 @@ pub(crate) fn data_from_obkv_documents(
lmdb_writer_sx.clone(), lmdb_writer_sx.clone(),
embedders_configs.clone(), embedders_configs.clone(),
settings_diff.clone(), settings_diff.clone(),
possible_embedding_mistakes.clone(),
) )
}) })
.collect::<Result<()>>() .collect::<Result<()>>()
@ -227,6 +230,7 @@ fn send_original_documents_data(
lmdb_writer_sx: Sender<Result<TypedChunk>>, lmdb_writer_sx: Sender<Result<TypedChunk>>,
embedders_configs: Arc<Vec<IndexEmbeddingConfig>>, embedders_configs: Arc<Vec<IndexEmbeddingConfig>>,
settings_diff: Arc<InnerIndexSettingsDiff>, settings_diff: Arc<InnerIndexSettingsDiff>,
possible_embedding_mistakes: Arc<PossibleEmbeddingMistakes>,
) -> Result<()> { ) -> Result<()> {
let original_documents_chunk = let original_documents_chunk =
original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?; original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;
@ -248,7 +252,7 @@ fn send_original_documents_data(
&embedders_configs, &embedders_configs,
&settings_diff, &settings_diff,
) { ) {
Ok(extracted_vectors) => { Ok((extracted_vectors, unused_vectors_distribution)) => {
for ExtractedVectorPoints { for ExtractedVectorPoints {
manual_vectors, manual_vectors,
remove_vectors, remove_vectors,
@ -263,6 +267,9 @@ fn send_original_documents_data(
prompts, prompts,
indexer, indexer,
embedder.clone(), embedder.clone(),
&embedder_name,
&possible_embedding_mistakes,
&unused_vectors_distribution,
request_threads(), request_threads(),
) { ) {
Ok(results) => Some(results), Ok(results) => Some(results),

View File

@ -427,6 +427,9 @@ where
let settings_diff = Arc::new(settings_diff); let settings_diff = Arc::new(settings_diff);
let embedders_configs = Arc::new(self.index.embedding_configs(self.wtxn)?); let embedders_configs = Arc::new(self.index.embedding_configs(self.wtxn)?);
let possible_embedding_mistakes =
crate::vector::error::PossibleEmbeddingMistakes::new(&field_distribution);
let backup_pool; let backup_pool;
let pool = match self.indexer_config.thread_pool { let pool = match self.indexer_config.thread_pool {
Some(ref pool) => pool, Some(ref pool) => pool,
@ -542,6 +545,7 @@ where
embedders_configs.clone(), embedders_configs.clone(),
settings_diff_cloned, settings_diff_cloned,
max_positions_per_attributes, max_positions_per_attributes,
Arc::new(possible_embedding_mistakes)
) )
}); });

View File

@ -1,9 +1,11 @@
use std::collections::BTreeMap;
use std::path::PathBuf; use std::path::PathBuf;
use hf_hub::api::sync::ApiError; use hf_hub::api::sync::ApiError;
use super::parsed_vectors::ParsedVectorsDiff;
use crate::error::FaultSource; use crate::error::FaultSource;
use crate::PanicCatched; use crate::{FieldDistribution, PanicCatched};
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
#[error("Error while generating embeddings: {inner}")] #[error("Error while generating embeddings: {inner}")]
@ -310,3 +312,68 @@ pub enum NewEmbedderErrorKind {
#[error("loading model failed: {0}")] #[error("loading model failed: {0}")]
LoadModel(candle_core::Error), LoadModel(candle_core::Error),
} }
pub struct PossibleEmbeddingMistakes {
vectors_mistakes: BTreeMap<String, u64>,
}
impl PossibleEmbeddingMistakes {
pub fn new(field_distribution: &FieldDistribution) -> Self {
let mut vectors_mistakes = BTreeMap::new();
let builder = levenshtein_automata::LevenshteinAutomatonBuilder::new(2, true);
let automata = builder.build_dfa("_vectors");
for (field, count) in field_distribution {
if *count == 0 {
continue;
}
if field.contains('.') {
continue;
}
match automata.eval(field) {
levenshtein_automata::Distance::Exact(0) => continue,
levenshtein_automata::Distance::Exact(_) => {
vectors_mistakes.insert(field.to_string(), *count);
}
levenshtein_automata::Distance::AtLeast(_) => continue,
}
}
Self { vectors_mistakes }
}
pub fn vector_mistakes(&self) -> impl Iterator<Item = (&str, u64)> {
self.vectors_mistakes.iter().map(|(misspelling, count)| (misspelling.as_str(), *count))
}
pub fn embedder_mistakes<'a>(
&'a self,
embedder_name: &'a str,
unused_vectors_distributions: &'a UnusedVectorsDistribution,
) -> impl Iterator<Item = (&'a str, u64)> + 'a {
let builder = levenshtein_automata::LevenshteinAutomatonBuilder::new(2, true);
let automata = builder.build_dfa(embedder_name);
unused_vectors_distributions.0.iter().filter_map(move |(field, count)| {
match automata.eval(field) {
levenshtein_automata::Distance::Exact(0) => None,
levenshtein_automata::Distance::Exact(_) => Some((field.as_str(), *count)),
levenshtein_automata::Distance::AtLeast(_) => None,
}
})
}
}
#[derive(Default)]
pub struct UnusedVectorsDistribution(BTreeMap<String, u64>);
impl UnusedVectorsDistribution {
pub fn new() -> Self {
Self::default()
}
pub fn append(&mut self, parsed_vectors_diff: ParsedVectorsDiff) {
for name in parsed_vectors_diff.into_new_vectors_keys_iter() {
*self.0.entry(name).or_default() += 1;
}
}
}

View File

@ -179,6 +179,15 @@ impl ParsedVectorsDiff {
(old, new) (old, new)
} }
pub fn into_new_vectors_keys_iter(self) -> impl Iterator<Item = String> {
let maybe_it = match self.new {
VectorsState::NoVectorsFid => None,
VectorsState::NoVectorsFieldInDocument => None,
VectorsState::Vectors(vectors) => Some(vectors.into_keys()),
};
maybe_it.into_iter().flatten()
}
} }
pub struct ParsedVectors(pub BTreeMap<String, Vectors>); pub struct ParsedVectors(pub BTreeMap<String, Vectors>);