diff --git a/milli/src/update/index_documents/extract/extract_vector_points.rs b/milli/src/update/index_documents/extract/extract_vector_points.rs index 48e3e697a..fdf8649f4 100644 --- a/milli/src/update/index_documents/extract/extract_vector_points.rs +++ b/milli/src/update/index_documents/extract/extract_vector_points.rs @@ -17,9 +17,10 @@ use crate::index::IndexEmbeddingConfig; use crate::prompt::Prompt; use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; use crate::update::settings::InnerIndexSettingsDiff; -use crate::vector::parsed_vectors::{ParsedVectorsDiff, RESERVED_VECTORS_FIELD_NAME}; +use crate::vector::parsed_vectors::{ParsedVectorsDiff, VectorState, RESERVED_VECTORS_FIELD_NAME}; +use crate::vector::settings::{EmbedderAction, ReindexAction}; use crate::vector::Embedder; -use crate::{try_split_array_at, DocumentId, Result, ThreadPoolNoAbort}; +use crate::{try_split_array_at, DocumentId, FieldId, FieldsIdsMap, Result, ThreadPoolNoAbort}; /// The length of the elements that are always in the buffer when inserting new values. const TRUNCATE_SIZE: usize = size_of::(); @@ -35,7 +36,7 @@ pub struct ExtractedVectorPoints { // embedder pub embedder_name: String, pub embedder: Arc, - pub user_provided: RoaringBitmap, + pub add_to_user_provided: RoaringBitmap, pub remove_from_user_provided: RoaringBitmap, } @@ -44,12 +45,7 @@ enum VectorStateDelta { // Remove all vectors, generated or manual, from this document NowRemoved, - // Add the manually specified vectors, passed in the other grenad - // Remove any previously generated vectors - // Note: changing the value of the manually specified vector **should not record** this delta - WasGeneratedNowManual(Vec>), - - ManualDelta(Vec>), + NowManual(Vec>), // Add the vector computed from the specified prompt // Remove any previous vector @@ -62,9 +58,8 @@ impl VectorStateDelta { match self { VectorStateDelta::NoChange => Default::default(), VectorStateDelta::NowRemoved => (true, Default::default(), Default::default()), - VectorStateDelta::WasGeneratedNowManual(add) => (true, Default::default(), add), // We always delete the previous vectors - VectorStateDelta::ManualDelta(add) => (true, Default::default(), add), + VectorStateDelta::NowManual(add) => (true, Default::default(), add), VectorStateDelta::NowGenerated(prompt) => (true, prompt, Default::default()), } } @@ -75,19 +70,29 @@ struct EmbedderVectorExtractor { embedder: Arc, prompt: Arc, - // (docid, _index) -> KvWriterDelAdd -> Vector - manual_vectors_writer: Writer>, // (docid) -> (prompt) prompts_writer: Writer>, // (docid) -> () remove_vectors_writer: Writer>, - + // (docid, _index) -> KvWriterDelAdd -> Vector + manual_vectors_writer: Writer>, // The docids of the documents that contains a user defined embedding - user_provided: RoaringBitmap, + add_to_user_provided: RoaringBitmap, + + action: ExtractionAction, +} + +struct DocumentOperation { // The docids of the documents that contains an auto-generated embedding remove_from_user_provided: RoaringBitmap, } +enum ExtractionAction { + SettingsFullReindex, + SettingsRegeneratePrompts { old_prompt: Arc }, + DocumentOperation(DocumentOperation), +} + /// Extracts the embedding vector contained in each document under the `_vectors` field. /// /// Returns the generated grenad reader containing the docid as key associated to the Vec @@ -104,46 +109,109 @@ pub fn extract_vector_points( let new_fields_ids_map = &settings_diff.new.fields_ids_map; // the vector field id may have changed let old_vectors_fid = old_fields_ids_map.id(RESERVED_VECTORS_FIELD_NAME); - // filter the old vector fid if the settings has been changed forcing reindexing. - let old_vectors_fid = old_vectors_fid.filter(|_| !reindex_vectors); let new_vectors_fid = new_fields_ids_map.id(RESERVED_VECTORS_FIELD_NAME); let mut extractors = Vec::new(); - for (embedder_name, (embedder, prompt)) in - settings_diff.new.embedding_configs.clone().into_iter() - { - // (docid, _index) -> KvWriterDelAdd -> Vector - let manual_vectors_writer = create_writer( - indexer.chunk_compression_type, - indexer.chunk_compression_level, - tempfile::tempfile()?, - ); - // (docid) -> (prompt) - let prompts_writer = create_writer( - indexer.chunk_compression_type, - indexer.chunk_compression_level, - tempfile::tempfile()?, - ); + let mut configs = settings_diff.new.embedding_configs.clone().into_inner(); + let old_configs = &settings_diff.old.embedding_configs; - // (docid) -> () - let remove_vectors_writer = create_writer( - indexer.chunk_compression_type, - indexer.chunk_compression_level, - tempfile::tempfile()?, - ); + if reindex_vectors { + for (name, action) in settings_diff.embedding_config_updates.iter() { + match action { + EmbedderAction::WriteBackToDocuments(_) => continue, // already deleted + EmbedderAction::Reindex(action) => { + let Some((embedder_name, (embedder, prompt))) = configs.remove_entry(name) + else { + tracing::error!(embedder = name, "Requested embedder config not found"); + continue; + }; - extractors.push(EmbedderVectorExtractor { - embedder_name, - embedder, - prompt, - manual_vectors_writer, - prompts_writer, - remove_vectors_writer, - user_provided: RoaringBitmap::new(), - remove_from_user_provided: RoaringBitmap::new(), - }); + // (docid, _index) -> KvWriterDelAdd -> Vector + let manual_vectors_writer = create_writer( + indexer.chunk_compression_type, + indexer.chunk_compression_level, + tempfile::tempfile()?, + ); + + // (docid) -> (prompt) + let prompts_writer = create_writer( + indexer.chunk_compression_type, + indexer.chunk_compression_level, + tempfile::tempfile()?, + ); + + // (docid) -> () + let remove_vectors_writer = create_writer( + indexer.chunk_compression_type, + indexer.chunk_compression_level, + tempfile::tempfile()?, + ); + + let action = match action { + ReindexAction::FullReindex => ExtractionAction::SettingsFullReindex, + ReindexAction::RegeneratePrompts => { + let Some((_, old_prompt)) = old_configs.get(name) else { + tracing::error!(embedder = name, "Old embedder config not found"); + continue; + }; + + ExtractionAction::SettingsRegeneratePrompts { old_prompt } + } + }; + + extractors.push(EmbedderVectorExtractor { + embedder_name, + embedder, + prompt, + prompts_writer, + remove_vectors_writer, + manual_vectors_writer, + add_to_user_provided: RoaringBitmap::new(), + action, + }); + } + } + } + } else { + // document operation + + for (embedder_name, (embedder, prompt)) in configs.into_iter() { + // (docid, _index) -> KvWriterDelAdd -> Vector + let manual_vectors_writer = create_writer( + indexer.chunk_compression_type, + indexer.chunk_compression_level, + tempfile::tempfile()?, + ); + + // (docid) -> (prompt) + let prompts_writer = create_writer( + indexer.chunk_compression_type, + indexer.chunk_compression_level, + tempfile::tempfile()?, + ); + + // (docid) -> () + let remove_vectors_writer = create_writer( + indexer.chunk_compression_type, + indexer.chunk_compression_level, + tempfile::tempfile()?, + ); + + extractors.push(EmbedderVectorExtractor { + embedder_name, + embedder, + prompt, + prompts_writer, + remove_vectors_writer, + manual_vectors_writer, + add_to_user_provided: RoaringBitmap::new(), + action: ExtractionAction::DocumentOperation(DocumentOperation { + remove_from_user_provided: RoaringBitmap::new(), + }), + }); + } } let mut key_buffer = Vec::new(); @@ -177,111 +245,66 @@ pub fn extract_vector_points( embedder_name, embedder: _, prompt, - manual_vectors_writer, prompts_writer, remove_vectors_writer, - user_provided, - remove_from_user_provided, + manual_vectors_writer, + add_to_user_provided, + action, } in extractors.iter_mut() { - let delta = match parsed_vectors.remove(embedder_name) { - (Some(old), Some(new)) => { - match (old.map_or(true, |old| old.is_user_provided()), new.is_user_provided()) { - (true, true) | (false, false) => (), - (true, false) => { - remove_from_user_provided.insert(docid); + let (old, new) = parsed_vectors.remove(embedder_name); + let delta = match action { + ExtractionAction::SettingsFullReindex => match old { + // A full reindex can be triggered either by: + // 1. a new embedder + // 2. an existing embedder changed so that it must regenerate all generated embeddings. + // For a new embedder, there can be `_vectors.embedder` embeddings to add to the DB + VectorState::Inline(vectors) => { + if vectors.is_user_provided() { + add_to_user_provided.insert(docid); } - (false, true) => { - user_provided.insert(docid); + let add_vectors = vectors.into_array_of_vectors(); + + if add_vectors.len() > usize::from(u8::MAX) { + return Err(crate::Error::UserError(crate::UserError::TooManyVectors( + document_id().to_string(), + add_vectors.len(), + ))); } + + VectorStateDelta::NowManual(add_vectors) } - - // no autogeneration - let add_vectors = new.into_array_of_vectors(); - - if add_vectors.len() > usize::from(u8::MAX) { - return Err(crate::Error::UserError(crate::UserError::TooManyVectors( - document_id().to_string(), - add_vectors.len(), - ))); - } - - VectorStateDelta::ManualDelta(add_vectors) - } - (Some(old), None) => { - // Do we keep this document? - let document_is_kept = obkv - .iter() - .map(|(_, deladd)| KvReaderDelAdd::new(deladd)) - .any(|deladd| deladd.get(DelAdd::Addition).is_some()); - if document_is_kept && old.is_some() { - remove_from_user_provided.insert(docid); - // becomes autogenerated - VectorStateDelta::NowGenerated(prompt.render( + // this happens only when an existing embedder changed. We cannot regenerate userProvided vectors + VectorState::InDb => VectorStateDelta::NoChange, + // generated vectors must be regenerated + VectorState::Generated => regenerate_prompt(obkv, prompt, new_fields_ids_map)?, + }, + // prompt regeneration is only triggered for existing embedders + ExtractionAction::SettingsRegeneratePrompts { old_prompt } => { + if !old.is_user_provided() { + regenerate_if_prompt_changed( obkv, - DelAdd::Addition, - new_fields_ids_map, - )?) - } else if document_is_kept && old.is_none() { + (old_prompt, prompt), + (&old_fields_ids_map, &new_fields_ids_map), + )? + } else { + // we can simply ignore user provided vectors as they are not regenerated and are + // already in the DB since this is an existing embedder VectorStateDelta::NoChange - } else { - remove_from_user_provided.insert(docid); - VectorStateDelta::NowRemoved - } - } - (None, Some(new)) => { - if new.is_user_provided() { - user_provided.insert(docid); - } else { - remove_from_user_provided.insert(docid); - } - // was possibly autogenerated, remove all vectors for that document - let add_vectors = new.into_array_of_vectors(); - if add_vectors.len() > usize::from(u8::MAX) { - return Err(crate::Error::UserError(crate::UserError::TooManyVectors( - document_id().to_string(), - add_vectors.len(), - ))); - } - - VectorStateDelta::WasGeneratedNowManual(add_vectors) - } - (None, None) => { - // Do we keep this document? - let document_is_kept = obkv - .iter() - .map(|(_, deladd)| KvReaderDelAdd::new(deladd)) - .any(|deladd| deladd.get(DelAdd::Addition).is_some()); - - if document_is_kept { - // Don't give up if the old prompt was failing - let old_prompt = Some(&prompt) - // TODO: this filter works because we erase the vec database when a embedding setting changes. - // When vector pipeline will be optimized, this should be removed. - .filter(|_| !settings_diff.reindex_vectors()) - .map(|p| { - p.render(obkv, DelAdd::Deletion, old_fields_ids_map) - .unwrap_or_default() - }); - let new_prompt = - prompt.render(obkv, DelAdd::Addition, new_fields_ids_map)?; - if old_prompt.as_ref() != Some(&new_prompt) { - let old_prompt = old_prompt.unwrap_or_default(); - tracing::trace!( - "🚀 Changing prompt from\n{old_prompt}\n===to===\n{new_prompt}" - ); - VectorStateDelta::NowGenerated(new_prompt) - } else { - tracing::trace!("⏭️ Prompt unmodified, skipping"); - VectorStateDelta::NoChange - } - } else { - remove_from_user_provided.remove(docid); - VectorStateDelta::NowRemoved } } + ExtractionAction::DocumentOperation(DocumentOperation { + remove_from_user_provided, + }) => extract_vector_document_diff( + docid, + obkv, + prompt, + (add_to_user_provided, remove_from_user_provided), + (old, new), + (&old_fields_ids_map, &new_fields_ids_map), + document_id, + )?, }; - // and we finally push the unique vectors into the writer push_vectors_diff( remove_vectors_writer, @@ -289,7 +312,6 @@ pub fn extract_vector_points( manual_vectors_writer, &mut key_buffer, delta, - reindex_vectors, )?; } } @@ -300,20 +322,30 @@ pub fn extract_vector_points( embedder_name, embedder, prompt: _, - manual_vectors_writer, prompts_writer, remove_vectors_writer, - user_provided, - remove_from_user_provided, + action, + manual_vectors_writer, + add_to_user_provided, } in extractors { + let remove_from_user_provided = + if let ExtractionAction::DocumentOperation(DocumentOperation { + remove_from_user_provided, + }) = action + { + remove_from_user_provided + } else { + Default::default() + }; + results.push(ExtractedVectorPoints { manual_vectors: writer_into_reader(manual_vectors_writer)?, remove_vectors: writer_into_reader(remove_vectors_writer)?, prompts: writer_into_reader(prompts_writer)?, embedder, embedder_name, - user_provided, + add_to_user_provided, remove_from_user_provided, }) } @@ -321,6 +353,136 @@ pub fn extract_vector_points( Ok(results) } +fn extract_vector_document_diff( + docid: DocumentId, + obkv: obkv::KvReader<'_, FieldId>, + prompt: &Prompt, + (add_to_user_provided, remove_from_user_provided): (&mut RoaringBitmap, &mut RoaringBitmap), + (old, new): (VectorState, VectorState), + (old_fields_ids_map, new_fields_ids_map): (&FieldsIdsMap, &FieldsIdsMap), + document_id: impl Fn() -> Value, +) -> Result { + match (old.is_user_provided(), new.is_user_provided()) { + (true, true) | (false, false) => {} + (true, false) => { + remove_from_user_provided.insert(docid); + } + (false, true) => { + add_to_user_provided.insert(docid); + } + } + + let delta = match (old, new) { + // regardless of the previous state, if a document now contains inline _vectors, they must + // be extracted manually + (_old, VectorState::Inline(new)) => { + let add_vectors = new.into_array_of_vectors(); + + if add_vectors.len() > usize::from(u8::MAX) { + return Err(crate::Error::UserError(crate::UserError::TooManyVectors( + document_id().to_string(), + add_vectors.len(), + ))); + } + + VectorStateDelta::NowManual(add_vectors) + } + // no `_vectors` anywhere, we check for document removal and otherwise we regenerate the prompt if the + // document changed + (VectorState::Generated, VectorState::Generated) => { + // Do we keep this document? + let document_is_kept = obkv + .iter() + .map(|(_, deladd)| KvReaderDelAdd::new(deladd)) + .any(|deladd| deladd.get(DelAdd::Addition).is_some()); + + if document_is_kept { + // Don't give up if the old prompt was failing + let old_prompt = Some(&prompt).map(|p| { + p.render(obkv, DelAdd::Deletion, old_fields_ids_map).unwrap_or_default() + }); + let new_prompt = prompt.render(obkv, DelAdd::Addition, new_fields_ids_map)?; + if old_prompt.as_ref() != Some(&new_prompt) { + let old_prompt = old_prompt.unwrap_or_default(); + tracing::trace!( + "🚀 Changing prompt from\n{old_prompt}\n===to===\n{new_prompt}" + ); + VectorStateDelta::NowGenerated(new_prompt) + } else { + tracing::trace!("⏭️ Prompt unmodified, skipping"); + VectorStateDelta::NoChange + } + } else { + VectorStateDelta::NowRemoved + } + } + // when the vectors are no longer user-provided, + // we generate the prompt unconditionally + (_not_generated, VectorState::Generated) => { + // Do we keep this document? + let document_is_kept = obkv + .iter() + .map(|(_, deladd)| KvReaderDelAdd::new(deladd)) + .any(|deladd| deladd.get(DelAdd::Addition).is_some()); + if document_is_kept { + // becomes autogenerated + VectorStateDelta::NowGenerated(prompt.render( + obkv, + DelAdd::Addition, + new_fields_ids_map, + )?) + } else { + // make sure the document is always removed from user provided on removal + remove_from_user_provided.insert(docid); + VectorStateDelta::NowRemoved + } + } + (_old, VectorState::InDb) => { + // Do we keep this document? + let document_is_kept = obkv + .iter() + .map(|(_, deladd)| KvReaderDelAdd::new(deladd)) + .any(|deladd| deladd.get(DelAdd::Addition).is_some()); + if document_is_kept { + // if the new version of documents has the vectors in the DB, + // then they are user-provided and nothing possibly changed + VectorStateDelta::NoChange + } else { + // make sure the document is always removed from user provided on removal + remove_from_user_provided.insert(docid); + VectorStateDelta::NowRemoved + } + } + }; + + Ok(delta) +} + +fn regenerate_if_prompt_changed( + obkv: obkv::KvReader<'_, FieldId>, + (old_prompt, new_prompt): (&Prompt, &Prompt), + (old_fields_ids_map, new_fields_ids_map): (&FieldsIdsMap, &FieldsIdsMap), +) -> Result { + let old_prompt = + old_prompt.render(obkv, DelAdd::Deletion, old_fields_ids_map).unwrap_or(Default::default()); + let new_prompt = new_prompt.render(obkv, DelAdd::Addition, new_fields_ids_map)?; + + if new_prompt == old_prompt { + return Ok(VectorStateDelta::NoChange); + } + Ok(VectorStateDelta::NowGenerated(new_prompt)) +} + +fn regenerate_prompt( + obkv: obkv::KvReader<'_, FieldId>, + prompt: &Prompt, + new_fields_ids_map: &FieldsIdsMap, +) -> Result { + let prompt = prompt.render(obkv, DelAdd::Addition, new_fields_ids_map)?; + + Ok(VectorStateDelta::NowGenerated(prompt)) +} + /// We cannot compute the diff between both Del and Add vectors. /// We'll push every vector and compute the difference later in TypedChunk. fn push_vectors_diff( @@ -329,14 +491,9 @@ fn push_vectors_diff( manual_vectors_writer: &mut Writer>, key_buffer: &mut Vec, delta: VectorStateDelta, - reindex_vectors: bool, ) -> Result<()> { let (must_remove, prompt, mut add_vectors) = delta.into_values(); - if must_remove - // TODO: the below condition works because we erase the vec database when a embedding setting changes. - // When vector pipeline will be optimized, this should be removed. - && !reindex_vectors - { + if must_remove { key_buffer.truncate(TRUNCATE_SIZE); remove_vectors_writer.insert(&key_buffer, [])?; } diff --git a/milli/src/update/index_documents/extract/mod.rs b/milli/src/update/index_documents/extract/mod.rs index 2babe330f..9da3983fc 100644 --- a/milli/src/update/index_documents/extract/mod.rs +++ b/milli/src/update/index_documents/extract/mod.rs @@ -248,7 +248,7 @@ fn send_original_documents_data( prompts, embedder_name, embedder, - user_provided, + add_to_user_provided, remove_from_user_provided, } in extracted_vectors { @@ -274,7 +274,7 @@ fn send_original_documents_data( expected_dimension: embedder.dimensions(), manual_vectors, embedder_name, - user_provided, + add_to_user_provided, remove_from_user_provided, })); } diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index a533f1984..3586c9c6d 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -503,7 +503,7 @@ where embeddings, manual_vectors, embedder_name, - user_provided, + add_to_user_provided, remove_from_user_provided, } => { dimension.insert(embedder_name.clone(), expected_dimension); @@ -513,7 +513,7 @@ where expected_dimension, manual_vectors, embedder_name, - user_provided, + add_to_user_provided, remove_from_user_provided, } } diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index 0cb5e58af..4737c6b42 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -91,7 +91,7 @@ pub(crate) enum TypedChunk { expected_dimension: usize, manual_vectors: grenad::Reader>, embedder_name: String, - user_provided: RoaringBitmap, + add_to_user_provided: RoaringBitmap, remove_from_user_provided: RoaringBitmap, }, ScriptLanguageDocids(HashMap<(Script, Language), (RoaringBitmap, RoaringBitmap)>), @@ -625,7 +625,7 @@ pub(crate) fn write_typed_chunk_into_index( let mut remove_vectors_builder = MergerBuilder::new(keep_first as MergeFn); let mut manual_vectors_builder = MergerBuilder::new(keep_first as MergeFn); let mut embeddings_builder = MergerBuilder::new(keep_first as MergeFn); - let mut user_provided = RoaringBitmap::new(); + let mut add_to_user_provided = RoaringBitmap::new(); let mut remove_from_user_provided = RoaringBitmap::new(); let mut params = None; for typed_chunk in typed_chunks { @@ -635,7 +635,7 @@ pub(crate) fn write_typed_chunk_into_index( embeddings, expected_dimension, embedder_name, - user_provided: ud, + add_to_user_provided: aud, remove_from_user_provided: rud, } = typed_chunk else { @@ -649,7 +649,7 @@ pub(crate) fn write_typed_chunk_into_index( if let Some(embeddings) = embeddings { embeddings_builder.push(embeddings.into_cursor()?); } - user_provided |= ud; + add_to_user_provided |= aud; remove_from_user_provided |= rud; } @@ -662,7 +662,7 @@ pub(crate) fn write_typed_chunk_into_index( .find(|IndexEmbeddingConfig { name, .. }| name == &embedder_name) .unwrap(); index_embedder_config.user_provided -= remove_from_user_provided; - index_embedder_config.user_provided |= user_provided; + index_embedder_config.user_provided |= add_to_user_provided; index.put_embedding_configs(wtxn, embedding_configs)?;