From cab5e35ff7b133b0743a852a76fdeac92c4b3f3f Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Mon, 30 Jun 2025 00:01:05 +0200 Subject: [PATCH] Implement in old settings indexer and old dump import indexer --- .../extract/extract_vector_points.rs | 771 ++++++++++++++---- .../src/update/index_documents/extract/mod.rs | 53 +- .../milli/src/update/index_documents/mod.rs | 100 ++- .../src/update/index_documents/transform.rs | 41 +- .../src/update/index_documents/typed_chunk.rs | 93 ++- crates/milli/src/vector/parsed_vectors.rs | 12 +- 6 files changed, 824 insertions(+), 246 deletions(-) diff --git a/crates/milli/src/update/index_documents/extract/extract_vector_points.rs b/crates/milli/src/update/index_documents/extract/extract_vector_points.rs index e1981a615..0a179cfa5 100644 --- a/crates/milli/src/update/index_documents/extract/extract_vector_points.rs +++ b/crates/milli/src/update/index_documents/extract/extract_vector_points.rs @@ -1,4 +1,5 @@ use std::cmp::Ordering; +use std::collections::{BTreeMap, VecDeque}; use std::convert::{TryFrom, TryInto}; use std::fs::File; use std::io::{self, BufReader, BufWriter}; @@ -6,25 +7,29 @@ use std::mem::size_of; use std::str::from_utf8; use std::sync::Arc; +use bumpalo::Bump; use bytemuck::cast_slice; +use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use grenad::Writer; +use obkv::KvReaderU16; use ordered_float::OrderedFloat; -use roaring::RoaringBitmap; use serde_json::Value; use super::helpers::{create_writer, writer_into_reader, GrenadParameters}; use crate::constants::RESERVED_VECTORS_FIELD_NAME; use crate::error::FaultSource; use crate::fields_ids_map::metadata::FieldIdMapWithMetadata; -use crate::index::IndexEmbeddingConfig; use crate::progress::EmbedderStats; use crate::prompt::Prompt; use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; use crate::update::settings::InnerIndexSettingsDiff; +use crate::vector::db::{EmbedderInfo, EmbeddingStatus, EmbeddingStatusDelta}; use crate::vector::error::{EmbedErrorKind, PossibleEmbeddingMistakes, UnusedVectorsDistribution}; +use crate::vector::extractor::{Extractor, ExtractorDiff, RequestFragmentExtractor}; use crate::vector::parsed_vectors::{ParsedVectorsDiff, VectorState}; +use crate::vector::session::{EmbedSession, Metadata, OnEmbed}; use crate::vector::settings::ReindexAction; -use crate::vector::{Embedder, Embedding}; +use crate::vector::{Embedder, Embedding, RuntimeEmbedder, RuntimeFragment}; use crate::{try_split_array_at, DocumentId, FieldId, Result, ThreadPoolNoAbort}; /// The length of the elements that are always in the buffer when inserting new values. @@ -37,12 +42,13 @@ pub struct ExtractedVectorPoints { pub remove_vectors: grenad::Reader>, // docid -> prompt pub prompts: grenad::Reader>, + // docid, extractor_id -> Option + pub inputs: grenad::Reader>, // embedder pub embedder_name: String, - pub embedder: Arc, - pub add_to_user_provided: RoaringBitmap, - pub remove_from_user_provided: RoaringBitmap, + pub runtime: Arc, + pub embedding_status_delta: EmbeddingStatusDelta, } enum VectorStateDelta { @@ -56,46 +62,74 @@ enum VectorStateDelta { // Remove any previous vector // Note: changing the value of the prompt **does require** recording this delta NowGenerated(String), + + // Add and remove the vectors computed from the fragments. + UpdateGeneratedFromFragments(Vec<(String, ExtractorDiff)>), + + /// Wasn't generated from fragments, but now is. + /// Delete any previous vectors and add the new vectors + NowGeneratedFromFragments(Vec<(String, Value)>), } impl VectorStateDelta { - fn into_values(self) -> (bool, String, Vec>) { + fn into_values(self) -> (bool, String, BTreeMap>, Vec>) { match self { VectorStateDelta::NoChange => Default::default(), - VectorStateDelta::NowRemoved => (true, Default::default(), Default::default()), - // We always delete the previous vectors - VectorStateDelta::NowManual(add) => (true, Default::default(), add), - VectorStateDelta::NowGenerated(prompt) => (true, prompt, Default::default()), + VectorStateDelta::NowRemoved => { + (true, Default::default(), Default::default(), Default::default()) + } + VectorStateDelta::NowManual(add) => (true, Default::default(), Default::default(), add), + VectorStateDelta::NowGenerated(prompt) => { + (true, prompt, Default::default(), Default::default()) + } + VectorStateDelta::UpdateGeneratedFromFragments(fragments) => ( + false, + Default::default(), + ExtractorDiff::into_list_of_changes(fragments), + Default::default(), + ), + VectorStateDelta::NowGeneratedFromFragments(items) => ( + true, + Default::default(), + ExtractorDiff::into_list_of_changes( + items.into_iter().map(|(name, value)| (name, ExtractorDiff::Added(value))), + ), + Default::default(), + ), } } } -struct EmbedderVectorExtractor { +struct EmbedderVectorExtractor<'a> { embedder_name: String, - embedder: Arc, - prompt: Arc, + embedder_info: &'a EmbedderInfo, + runtime: Arc, // (docid) -> (prompt) prompts_writer: Writer>, + // (docid, extractor_id) -> (Option) + inputs_writer: Writer>, // (docid) -> () remove_vectors_writer: Writer>, // (docid, _index) -> KvWriterDelAdd -> Vector manual_vectors_writer: Writer>, - // The docids of the documents that contains a user defined embedding - add_to_user_provided: RoaringBitmap, + embedding_status_delta: EmbeddingStatusDelta, action: ExtractionAction, } -struct DocumentOperation { - // The docids of the documents that contains an auto-generated embedding - remove_from_user_provided: RoaringBitmap, -} - enum ExtractionAction { SettingsFullReindex, - SettingsRegeneratePrompts { old_prompt: Arc }, - DocumentOperation(DocumentOperation), + SettingsRegeneratePrompts { + old_runtime: Arc, + }, + /// List of fragments to update/add + SettingsRegenerateFragments { + // name and indices, respectively in old and new runtime, of the fragments to examine. + must_regenerate_fragments: BTreeMap, usize)>, + old_runtime: Arc, + }, + DocumentOperation, } struct ManualEmbedderErrors { @@ -183,8 +217,8 @@ impl ManualEmbedderErrors { pub fn extract_vector_points( obkv_documents: grenad::Reader, indexer: GrenadParameters, - embedders_configs: &[IndexEmbeddingConfig], settings_diff: &InnerIndexSettingsDiff, + embedder_info: &[(String, EmbedderInfo)], possible_embedding_mistakes: &PossibleEmbeddingMistakes, ) -> Result<(Vec, UnusedVectorsDistribution)> { let mut unused_vectors_distribution = UnusedVectorsDistribution::new(); @@ -204,13 +238,13 @@ pub fn extract_vector_points( let mut configs = settings_diff.new.embedding_configs.clone().into_inner(); let old_configs = &settings_diff.old.embedding_configs; - if reindex_vectors { for (name, action) in settings_diff.embedding_config_updates.iter() { if let Some(action) = action.reindex() { - let Some((embedder_name, (embedder, prompt, _quantized))) = - configs.remove_entry(name) - else { + let (_, embedder_info) = + embedder_info.iter().find(|(embedder_name, _)| embedder_name == name).unwrap(); + + let Some((embedder_name, runtime)) = configs.remove_entry(name) else { tracing::error!(embedder = name, "Requested embedder config not found"); continue; }; @@ -229,6 +263,12 @@ pub fn extract_vector_points( tempfile::tempfile()?, ); + let inputs_writer = create_writer( + indexer.chunk_compression_type, + indexer.chunk_compression_level, + tempfile::tempfile()?, + ); + // (docid) -> () let remove_vectors_writer = create_writer( indexer.chunk_compression_type, @@ -238,24 +278,66 @@ pub fn extract_vector_points( let action = match action { ReindexAction::FullReindex => ExtractionAction::SettingsFullReindex, - ReindexAction::RegeneratePrompts => { - let Some((_, old_prompt, _quantized)) = old_configs.get(name) else { + ReindexAction::RegenerateFragments(regenerate_fragments) => { + let Some(old_runtime) = old_configs.get(name) else { tracing::error!(embedder = name, "Old embedder config not found"); continue; }; - ExtractionAction::SettingsRegeneratePrompts { old_prompt } + let fragments = regenerate_fragments + .iter() + .filter_map(|(name, fragment)| match fragment { + crate::vector::settings::RegenerateFragment::Update => { + let old_value = old_runtime + .fragments + .binary_search_by_key(&name, |fragment| &fragment.name) + .ok(); + let Ok(new_value) = runtime + .fragments + .binary_search_by_key(&name, |fragment| &fragment.name) + else { + return None; + }; + Some((name.clone(), (old_value, new_value))) + } + // was already handled in transform + crate::vector::settings::RegenerateFragment::Remove => None, + crate::vector::settings::RegenerateFragment::Add => { + let Ok(new_value) = runtime + .fragments + .binary_search_by_key(&name, |fragment| &fragment.name) + else { + return None; + }; + Some((name.clone(), (None, new_value))) + } + }) + .collect(); + ExtractionAction::SettingsRegenerateFragments { + old_runtime, + must_regenerate_fragments: fragments, + } + } + + ReindexAction::RegeneratePrompts => { + let Some(old_runtime) = old_configs.get(name) else { + tracing::error!(embedder = name, "Old embedder config not found"); + continue; + }; + + ExtractionAction::SettingsRegeneratePrompts { old_runtime } } }; extractors.push(EmbedderVectorExtractor { embedder_name, - embedder, - prompt, + runtime, + embedder_info, prompts_writer, + inputs_writer, remove_vectors_writer, manual_vectors_writer, - add_to_user_provided: RoaringBitmap::new(), + embedding_status_delta: Default::default(), action, }); } else { @@ -264,8 +346,12 @@ pub fn extract_vector_points( } } else { // document operation + for (embedder_name, runtime) in configs.into_iter() { + let (_, embedder_info) = embedder_info + .iter() + .find(|(name, _)| embedder_name.as_str() == name.as_str()) + .unwrap(); - for (embedder_name, (embedder, prompt, _quantized)) in configs.into_iter() { // (docid, _index) -> KvWriterDelAdd -> Vector let manual_vectors_writer = create_writer( indexer.chunk_compression_type, @@ -280,6 +366,12 @@ pub fn extract_vector_points( tempfile::tempfile()?, ); + let inputs_writer = create_writer( + indexer.chunk_compression_type, + indexer.chunk_compression_level, + tempfile::tempfile()?, + ); + // (docid) -> () let remove_vectors_writer = create_writer( indexer.chunk_compression_type, @@ -289,22 +381,23 @@ pub fn extract_vector_points( extractors.push(EmbedderVectorExtractor { embedder_name, - embedder, - prompt, + runtime, + embedder_info, prompts_writer, + inputs_writer, remove_vectors_writer, manual_vectors_writer, - add_to_user_provided: RoaringBitmap::new(), - action: ExtractionAction::DocumentOperation(DocumentOperation { - remove_from_user_provided: RoaringBitmap::new(), - }), + embedding_status_delta: Default::default(), + action: ExtractionAction::DocumentOperation, }); } } let mut key_buffer = Vec::new(); let mut cursor = obkv_documents.into_cursor()?; + let mut doc_alloc = Bump::new(); while let Some((key, value)) = cursor.move_on_next()? { + doc_alloc.reset(); // this must always be serialized as (docid, external_docid); const SIZE_OF_DOCUMENTID: usize = std::mem::size_of::(); let (docid_bytes, external_id_bytes) = @@ -320,9 +413,12 @@ pub fn extract_vector_points( // lazily get it when needed let document_id = || -> Value { from_utf8(external_id_bytes).unwrap().into() }; + let regenerate_for_embedders = embedder_info + .iter() + .filter(|&(_, infos)| infos.embedding_status.must_regenerate(docid)) + .map(|(name, _)| name.clone()); let mut parsed_vectors = ParsedVectorsDiff::new( - docid, - embedders_configs, + regenerate_for_embedders, obkv, old_vectors_fid, new_vectors_fid, @@ -331,44 +427,40 @@ pub fn extract_vector_points( for EmbedderVectorExtractor { embedder_name, - embedder, - prompt, + runtime, + embedder_info, prompts_writer, + inputs_writer, remove_vectors_writer, manual_vectors_writer, - add_to_user_provided, + embedding_status_delta, action, } in extractors.iter_mut() { - let embedder_is_manual = matches!(**embedder, Embedder::UserProvided(_)); + let embedder_is_manual = matches!(*runtime.embedder, Embedder::UserProvided(_)); let (old, new) = parsed_vectors.remove(embedder_name); + let new_must_regenerate = new.must_regenerate(); let delta = match action { ExtractionAction::SettingsFullReindex => match old { // A full reindex can be triggered either by: // 1. a new embedder // 2. an existing embedder changed so that it must regenerate all generated embeddings. // For a new embedder, there can be `_vectors.embedder` embeddings to add to the DB - VectorState::Inline(vectors) => { - if !vectors.must_regenerate() { - add_to_user_provided.insert(docid); - } - - match vectors.into_array_of_vectors() { - Some(add_vectors) => { - if add_vectors.len() > usize::from(u8::MAX) { - return Err(crate::Error::UserError( - crate::UserError::TooManyVectors( - document_id().to_string(), - add_vectors.len(), - ), - )); - } - VectorStateDelta::NowManual(add_vectors) + VectorState::Inline(vectors) => match vectors.into_array_of_vectors() { + Some(add_vectors) => { + if add_vectors.len() > usize::from(u8::MAX) { + return Err(crate::Error::UserError( + crate::UserError::TooManyVectors( + document_id().to_string(), + add_vectors.len(), + ), + )); } - None => VectorStateDelta::NoChange, + VectorStateDelta::NowManual(add_vectors) } - } + None => VectorStateDelta::NoChange, + }, // this happens only when an existing embedder changed. We cannot regenerate userProvided vectors VectorState::Manual => VectorStateDelta::NoChange, // generated vectors must be regenerated @@ -381,11 +473,79 @@ pub fn extract_vector_points( ); continue; } - regenerate_prompt(obkv, prompt, new_fields_ids_map)? + let has_fragments = !runtime.fragments.is_empty(); + + if has_fragments { + regenerate_all_fragments( + &runtime.fragments, + &doc_alloc, + new_fields_ids_map, + obkv, + ) + } else { + regenerate_prompt(obkv, &runtime.document_template, new_fields_ids_map)? + } } }, + ExtractionAction::SettingsRegenerateFragments { + must_regenerate_fragments, + old_runtime, + } => { + if old.must_regenerate() { + let has_fragments = !runtime.fragments.is_empty(); + let old_has_fragments = !old_runtime.fragments.is_empty(); + + let is_adding_fragments = has_fragments && !old_has_fragments; + + if is_adding_fragments { + regenerate_all_fragments( + &runtime.fragments, + &doc_alloc, + new_fields_ids_map, + obkv, + ) + } else if !has_fragments { + // removing fragments + regenerate_prompt(obkv, &runtime.document_template, new_fields_ids_map)? + } else { + let mut fragment_diff = Vec::new(); + let new_fields_ids_map = new_fields_ids_map.as_fields_ids_map(); + + let obkv_document = crate::update::new::document::KvDelAddDocument::new( + obkv, + DelAdd::Addition, + new_fields_ids_map, + ); + for (name, (old_index, new_index)) in must_regenerate_fragments { + let Some(new) = runtime.fragments.get(*new_index) else { continue }; + + let new = + RequestFragmentExtractor::new(new, &doc_alloc).ignore_errors(); + + let diff = { + let old = old_index.as_ref().and_then(|old| { + let old = old_runtime.fragments.get(*old)?; + Some( + RequestFragmentExtractor::new(old, &doc_alloc) + .ignore_errors(), + ) + }); + let old = old.as_ref(); + Extractor::diff_settings(&new, &obkv_document, &(), old) + } + .expect("ignoring errors so this cannot fail"); + fragment_diff.push((name.clone(), diff)); + } + VectorStateDelta::UpdateGeneratedFromFragments(fragment_diff) + } + } else { + // we can simply ignore user provided vectors as they are not regenerated and are + // already in the DB since this is an existing embedder + VectorStateDelta::NoChange + } + } // prompt regeneration is only triggered for existing embedders - ExtractionAction::SettingsRegeneratePrompts { old_prompt } => { + ExtractionAction::SettingsRegeneratePrompts { old_runtime } => { if old.must_regenerate() { if embedder_is_manual { ManualEmbedderErrors::push_error( @@ -395,24 +555,32 @@ pub fn extract_vector_points( ); continue; } - regenerate_if_prompt_changed( - obkv, - (old_prompt, prompt), - (old_fields_ids_map, new_fields_ids_map), - )? + let has_fragments = !runtime.fragments.is_empty(); + + if has_fragments { + regenerate_all_fragments( + &runtime.fragments, + &doc_alloc, + new_fields_ids_map, + obkv, + ) + } else { + regenerate_if_prompt_changed( + obkv, + (&old_runtime.document_template, &runtime.document_template), + (old_fields_ids_map, new_fields_ids_map), + )? + } } else { // we can simply ignore user provided vectors as they are not regenerated and are // already in the DB since this is an existing embedder VectorStateDelta::NoChange } } - ExtractionAction::DocumentOperation(DocumentOperation { - remove_from_user_provided, - }) => extract_vector_document_diff( - docid, + ExtractionAction::DocumentOperation => extract_vector_document_diff( obkv, - prompt, - (add_to_user_provided, remove_from_user_provided), + runtime, + &doc_alloc, (old, new), (old_fields_ids_map, new_fields_ids_map), document_id, @@ -421,13 +589,25 @@ pub fn extract_vector_points( &mut manual_errors, )?, }; + + // update the embedding status + push_embedding_status_delta( + embedding_status_delta, + docid, + &delta, + new_must_regenerate, + &embedder_info.embedding_status, + ); + // and we finally push the unique vectors into the writer push_vectors_diff( remove_vectors_writer, prompts_writer, + inputs_writer, manual_vectors_writer, &mut key_buffer, delta, + &runtime.fragments, )?; } @@ -444,45 +624,65 @@ pub fn extract_vector_points( for EmbedderVectorExtractor { embedder_name, - embedder, - prompt: _, + runtime, + embedder_info: _, prompts_writer, + inputs_writer, remove_vectors_writer, - action, + action: _, manual_vectors_writer, - add_to_user_provided, + embedding_status_delta, } in extractors { - let remove_from_user_provided = - if let ExtractionAction::DocumentOperation(DocumentOperation { - remove_from_user_provided, - }) = action - { - remove_from_user_provided - } else { - Default::default() - }; - results.push(ExtractedVectorPoints { manual_vectors: writer_into_reader(manual_vectors_writer)?, remove_vectors: writer_into_reader(remove_vectors_writer)?, prompts: writer_into_reader(prompts_writer)?, - embedder, + inputs: writer_into_reader(inputs_writer)?, + runtime, embedder_name, - add_to_user_provided, - remove_from_user_provided, + embedding_status_delta, }) } Ok((results, unused_vectors_distribution)) } +fn push_embedding_status_delta( + embedding_status_delta: &mut EmbeddingStatusDelta, + docid: DocumentId, + delta: &VectorStateDelta, + new_must_regenerate: bool, + embedding_status: &EmbeddingStatus, +) { + let (old_is_user_provided, old_must_regenerate) = + embedding_status.is_user_provided_must_regenerate(docid); + let new_is_user_provided = match delta { + VectorStateDelta::NoChange => old_is_user_provided, + VectorStateDelta::NowRemoved => { + embedding_status_delta.clear_docid(docid, old_is_user_provided, old_must_regenerate); + return; + } + VectorStateDelta::NowManual(_) => true, + VectorStateDelta::NowGenerated(_) + | VectorStateDelta::UpdateGeneratedFromFragments(_) + | VectorStateDelta::NowGeneratedFromFragments(_) => false, + }; + + embedding_status_delta.push_delta( + docid, + old_is_user_provided, + old_must_regenerate, + new_is_user_provided, + new_must_regenerate, + ); +} + #[allow(clippy::too_many_arguments)] // feel free to find efficient way to factor arguments fn extract_vector_document_diff( - docid: DocumentId, obkv: &obkv::KvReader, - prompt: &Prompt, - (add_to_user_provided, remove_from_user_provided): (&mut RoaringBitmap, &mut RoaringBitmap), + runtime: &RuntimeEmbedder, + doc_alloc: &Bump, (old, new): (VectorState, VectorState), (old_fields_ids_map, new_fields_ids_map): (&FieldIdMapWithMetadata, &FieldIdMapWithMetadata), document_id: impl Fn() -> Value, @@ -490,16 +690,6 @@ fn extract_vector_document_diff( embedder_is_manual: bool, manual_errors: &mut Option, ) -> Result { - match (old.must_regenerate(), new.must_regenerate()) { - (true, true) | (false, false) => {} - (true, false) => { - add_to_user_provided.insert(docid); - } - (false, true) => { - remove_from_user_provided.insert(docid); - } - } - let delta = match (old, new) { // regardless of the previous state, if a document now contains inline _vectors, they must // be extracted manually @@ -530,19 +720,52 @@ fn extract_vector_document_diff( ManualEmbedderErrors::push_error(manual_errors, embedder_name, document_id); return Ok(VectorStateDelta::NoChange); } - // Don't give up if the old prompt was failing - let old_prompt = Some(&prompt).map(|p| { - p.render_kvdeladd(obkv, DelAdd::Deletion, old_fields_ids_map) - .unwrap_or_default() - }); - let new_prompt = - prompt.render_kvdeladd(obkv, DelAdd::Addition, new_fields_ids_map)?; - if old_prompt.as_ref() != Some(&new_prompt) { - let old_prompt = old_prompt.unwrap_or_default(); - tracing::trace!( - "🚀 Changing prompt from\n{old_prompt}\n===to===\n{new_prompt}" - ); - VectorStateDelta::NowGenerated(new_prompt) + let has_fragments = !runtime.fragments.is_empty(); + if has_fragments { + let prompt = &runtime.document_template; + // Don't give up if the old prompt was failing + let old_prompt = Some(&prompt).map(|p| { + p.render_kvdeladd(obkv, DelAdd::Deletion, old_fields_ids_map) + .unwrap_or_default() + }); + let new_prompt = + prompt.render_kvdeladd(obkv, DelAdd::Addition, new_fields_ids_map)?; + if old_prompt.as_ref() != Some(&new_prompt) { + let old_prompt = old_prompt.unwrap_or_default(); + tracing::trace!( + "🚀 Changing prompt from\n{old_prompt}\n===to===\n{new_prompt}" + ); + VectorStateDelta::NowGenerated(new_prompt) + } else { + let mut fragment_diff = Vec::new(); + let old_fields_ids_map = old_fields_ids_map.as_fields_ids_map(); + let new_fields_ids_map = new_fields_ids_map.as_fields_ids_map(); + + let old_document = crate::update::new::document::KvDelAddDocument::new( + obkv, + DelAdd::Deletion, + old_fields_ids_map, + ); + + let new_document = crate::update::new::document::KvDelAddDocument::new( + obkv, + DelAdd::Addition, + new_fields_ids_map, + ); + + for new in &runtime.fragments { + let name = &new.name; + let fragment = + RequestFragmentExtractor::new(new, doc_alloc).ignore_errors(); + + let diff = fragment + .diff_documents(&old_document, &new_document, &()) + .expect("ignoring errors so this cannot fail"); + + fragment_diff.push((name.clone(), diff)); + } + VectorStateDelta::UpdateGeneratedFromFragments(fragment_diff) + } } else { tracing::trace!("⏭️ Prompt unmodified, skipping"); VectorStateDelta::NoChange @@ -567,15 +790,25 @@ fn extract_vector_document_diff( ManualEmbedderErrors::push_error(manual_errors, embedder_name, document_id); return Ok(VectorStateDelta::NoChange); } - // becomes autogenerated - VectorStateDelta::NowGenerated(prompt.render_kvdeladd( - obkv, - DelAdd::Addition, - new_fields_ids_map, - )?) + + let has_fragments = !runtime.fragments.is_empty(); + + if has_fragments { + regenerate_all_fragments( + &runtime.fragments, + doc_alloc, + new_fields_ids_map, + obkv, + ) + } else { + // becomes autogenerated + VectorStateDelta::NowGenerated(runtime.document_template.render_kvdeladd( + obkv, + DelAdd::Addition, + new_fields_ids_map, + )?) + } } else { - // make sure the document is always removed from user provided on removal - remove_from_user_provided.insert(docid); VectorStateDelta::NowRemoved } } @@ -593,8 +826,6 @@ fn extract_vector_document_diff( // then they are user-provided and nothing possibly changed VectorStateDelta::NoChange } else { - // make sure the document is always removed from user provided on removal - remove_from_user_provided.insert(docid); VectorStateDelta::NowRemoved } } @@ -629,16 +860,45 @@ fn regenerate_prompt( Ok(VectorStateDelta::NowGenerated(prompt)) } +fn regenerate_all_fragments<'a>( + fragments: impl IntoIterator, + doc_alloc: &Bump, + new_fields_ids_map: &FieldIdMapWithMetadata, + obkv: &KvReaderU16, +) -> VectorStateDelta { + let mut fragment_diff = Vec::new(); + let new_fields_ids_map = new_fields_ids_map.as_fields_ids_map(); + + let obkv_document = crate::update::new::document::KvDelAddDocument::new( + obkv, + DelAdd::Addition, + new_fields_ids_map, + ); + for new in fragments { + let name = &new.name; + let new = RequestFragmentExtractor::new(new, doc_alloc).ignore_errors(); + + let diff = + { new.extract(&obkv_document, &()) }.expect("ignoring errors so this cannot fail"); + if let Some(value) = diff { + fragment_diff.push((name.clone(), value)); + } + } + VectorStateDelta::NowGeneratedFromFragments(fragment_diff) +} + /// We cannot compute the diff between both Del and Add vectors. /// We'll push every vector and compute the difference later in TypedChunk. fn push_vectors_diff( remove_vectors_writer: &mut Writer>, prompts_writer: &mut Writer>, + inputs_writer: &mut Writer>, manual_vectors_writer: &mut Writer>, key_buffer: &mut Vec, delta: VectorStateDelta, + fragments: &[RuntimeFragment], ) -> Result<()> { - let (must_remove, prompt, mut add_vectors) = delta.into_values(); + let (must_remove, prompt, mut fragment_delta, mut add_vectors) = delta.into_values(); if must_remove { key_buffer.truncate(TRUNCATE_SIZE); remove_vectors_writer.insert(&key_buffer, [])?; @@ -648,23 +908,49 @@ fn push_vectors_diff( prompts_writer.insert(&key_buffer, prompt.as_bytes())?; } - // We sort and dedup the vectors - add_vectors.sort_unstable_by(|a, b| compare_vectors(a, b)); - add_vectors.dedup_by(|a, b| compare_vectors(a, b).is_eq()); + if !fragment_delta.is_empty() { + let mut scratch = Vec::new(); + let mut fragment_delta: Vec<_> = fragments + .iter() + .filter_map(|fragment| { + let delta = fragment_delta.remove(&fragment.name)?; + Some((fragment.id, delta)) + }) + .collect(); - // insert vectors into the writer - for (i, vector) in add_vectors.into_iter().enumerate().take(u16::MAX as usize) { - // Generate the key by extending the unique index to it. - key_buffer.truncate(TRUNCATE_SIZE); - let index = u16::try_from(i).unwrap(); - key_buffer.extend_from_slice(&index.to_be_bytes()); + fragment_delta.sort_unstable_by_key(|(id, _)| *id); + for (id, value) in fragment_delta { + key_buffer.truncate(TRUNCATE_SIZE); + key_buffer.push(id); + if let Some(value) = value { + scratch.clear(); + serde_json::to_writer(&mut scratch, &value).unwrap(); + inputs_writer.insert(&key_buffer, &scratch)?; + } else { + inputs_writer.insert(&key_buffer, [])?; + } + } + } - // We insert only the Add part of the Obkv to inform - // that we only want to remove all those vectors. - let mut obkv = KvWriterDelAdd::memory(); - obkv.insert(DelAdd::Addition, cast_slice(&vector))?; - let bytes = obkv.into_inner()?; - manual_vectors_writer.insert(&key_buffer, bytes)?; + if !add_vectors.is_empty() { + // We sort and dedup the vectors + add_vectors.sort_unstable_by(|a, b| compare_vectors(a, b)); + add_vectors.dedup_by(|a, b| compare_vectors(a, b).is_eq()); + + // insert vectors into the writer + for (i, vector) in add_vectors.into_iter().enumerate().take(u16::MAX as usize) { + // Generate the key by extending the unique index to it. + key_buffer.truncate(TRUNCATE_SIZE); + let index = u16::try_from(i).unwrap(); + key_buffer.extend_from_slice(&index.to_be_bytes()); + + // We insert only the Add part of the Obkv to inform + // that we only want to remove all those vectors. + let mut obkv = KvWriterDelAdd::memory(); + obkv.insert(DelAdd::Addition, cast_slice(&vector))?; + let bytes = obkv.into_inner()?; + manual_vectors_writer.insert(&key_buffer, bytes)?; + } } Ok(()) @@ -677,17 +963,18 @@ fn compare_vectors(a: &[f32], b: &[f32]) -> Ordering { #[allow(clippy::too_many_arguments)] #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")] -pub fn extract_embeddings( +pub fn extract_embeddings_from_prompts( // docid, prompt prompt_reader: grenad::Reader, indexer: GrenadParameters, - embedder: Arc, + runtime: Arc, embedder_name: &str, possible_embedding_mistakes: &PossibleEmbeddingMistakes, embedder_stats: &EmbedderStats, unused_vectors_distribution: &UnusedVectorsDistribution, request_threads: &ThreadPoolNoAbort, ) -> Result>> { + let embedder = &runtime.embedder; let n_chunks = embedder.chunk_count_hint(); // chunk level parallelism let n_vectors_per_chunk = embedder.prompt_count_in_chunk_hint(); // number of vectors in a single chunk @@ -723,7 +1010,7 @@ pub fn extract_embeddings( if chunks.len() == chunks.capacity() { let chunked_embeds = embed_chunks( - &embedder, + embedder, std::mem::replace(&mut chunks, Vec::with_capacity(n_chunks)), embedder_name, possible_embedding_mistakes, @@ -746,7 +1033,7 @@ pub fn extract_embeddings( // send last chunk if !chunks.is_empty() { let chunked_embeds = embed_chunks( - &embedder, + embedder, std::mem::take(&mut chunks), embedder_name, possible_embedding_mistakes, @@ -765,7 +1052,7 @@ pub fn extract_embeddings( if !current_chunk.is_empty() { let embeds = embed_chunks( - &embedder, + embedder, vec![std::mem::take(&mut current_chunk)], embedder_name, possible_embedding_mistakes, @@ -838,3 +1125,183 @@ fn embed_chunks( } } } + +#[allow(clippy::too_many_arguments)] +#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")] +pub fn extract_embeddings_from_fragments( + // (docid, extractor_id) -> (Option) + inputs_reader: grenad::Reader, + indexer: GrenadParameters, + runtime: Arc, + embedder_name: &str, + possible_embedding_mistakes: &PossibleEmbeddingMistakes, + embedder_stats: &EmbedderStats, + unused_vectors_distribution: &UnusedVectorsDistribution, + request_threads: &ThreadPoolNoAbort, +) -> Result>> { + let doc_alloc = Bump::new(); + + // (docid, extractor_id) -> (Option) + let vector_writer = create_writer( + indexer.chunk_compression_type, + indexer.chunk_compression_level, + tempfile::tempfile()?, + ); + + if inputs_reader.is_empty() { + return writer_into_reader(vector_writer); + } + + let on_embed = WriteGrenadOnEmbed { + waiting_responses: Default::default(), + vector_writer, + scratch: Default::default(), + possible_embedding_mistakes, + }; + + let mut session = EmbedSession::new( + &runtime.embedder, + embedder_name, + request_threads, + &doc_alloc, + embedder_stats, + on_embed, + ); + + let mut cursor = inputs_reader.into_cursor()?; + + while let Some((mut key, value)) = cursor.move_on_next()? { + let docid = key.read_u32::().unwrap(); + let extractor_id = key.read_u8().unwrap(); + + if value.is_empty() { + // no value => removed fragment + session.on_embed_mut().push_response(docid, extractor_id); + } else { + // unwrap: the grenad value was saved as a serde_json::Value + let value: Value = serde_json::from_slice(value).unwrap(); + session.request_embedding( + Metadata { docid, external_docid: "", extractor_id }, + value, + unused_vectors_distribution, + )?; + } + } + + // send last chunk + let on_embed = session.drain(unused_vectors_distribution)?; + on_embed.finish() +} + +struct WriteGrenadOnEmbed<'a> { + // list of (document_id, extractor_id) for which vectors should be removed. + // these are written whenever a response arrives that has a larger (docid, extractor_id). + waiting_responses: VecDeque<(DocumentId, u8)>, + + // grenad of (docid, extractor_id) -> (Option) + vector_writer: Writer>, + + possible_embedding_mistakes: &'a PossibleEmbeddingMistakes, + + // scratch buffer used to write keys + scratch: Vec, +} + +impl WriteGrenadOnEmbed<'_> { + pub fn push_response(&mut self, docid: DocumentId, extractor_id: u8) { + self.waiting_responses.push_back((docid, extractor_id)); + } + + pub fn finish(mut self) -> Result>> { + for (docid, extractor_id) in self.waiting_responses { + self.scratch.clear(); + self.scratch.write_u32::(docid).unwrap(); + self.scratch.write_u8(extractor_id).unwrap(); + self.vector_writer.insert(&self.scratch, []).unwrap(); + } + writer_into_reader(self.vector_writer) + } +} + +impl<'doc> OnEmbed<'doc> for WriteGrenadOnEmbed<'_> { + type ErrorMetadata = UnusedVectorsDistribution; + fn process_embedding_response( + &mut self, + response: crate::vector::session::EmbeddingResponse<'doc>, + ) { + let (docid, extractor_id) = (response.metadata.docid, response.metadata.extractor_id); + while let Some(waiting_response) = self.waiting_responses.pop_front() { + if (docid, extractor_id) > waiting_response { + self.scratch.clear(); + self.scratch.write_u32::(docid).unwrap(); + self.scratch.write_u8(extractor_id).unwrap(); + self.vector_writer.insert(&self.scratch, []).unwrap(); + } else { + self.waiting_responses.push_front(waiting_response); + break; + } + } + + if let Some(embedding) = response.embedding { + self.scratch.clear(); + self.scratch.write_u32::(docid).unwrap(); + self.scratch.write_u8(extractor_id).unwrap(); + self.vector_writer.insert(&self.scratch, cast_slice(embedding.as_slice())).unwrap(); + } + } + + fn process_embedding_error( + &mut self, + error: crate::vector::error::EmbedError, + embedder_name: &'doc str, + unused_vectors_distribution: &crate::vector::error::UnusedVectorsDistribution, + _metadata: &[crate::vector::session::Metadata<'doc>], + ) -> crate::Error { + if let FaultSource::Bug = error.fault { + crate::Error::InternalError(crate::InternalError::VectorEmbeddingError(error.into())) + } else { + let mut msg = + format!(r"While embedding documents for embedder `{embedder_name}`: {error}"); + + if let EmbedErrorKind::ManualEmbed(_) = &error.kind { + msg += &format!("\n- Note: `{embedder_name}` has `source: userProvided`, so documents must provide embeddings as an array in `_vectors.{embedder_name}`."); + } + + let mut hint_count = 0; + + for (vector_misspelling, count) in + self.possible_embedding_mistakes.vector_mistakes().take(2) + { + msg += &format!("\n- Hint: try replacing `{vector_misspelling}` by `_vectors` in {count} document(s)."); + hint_count += 1; + } + + for (embedder_misspelling, count) in self + .possible_embedding_mistakes + .embedder_mistakes(embedder_name, unused_vectors_distribution) + .take(2) + { + msg += &format!("\n- Hint: try replacing `_vectors.{embedder_misspelling}` by `_vectors.{embedder_name}` in {count} document(s)."); + hint_count += 1; + } + + if hint_count == 0 { + if let EmbedErrorKind::ManualEmbed(_) = &error.kind { + msg += &format!( + "\n- Hint: opt-out for a document with `_vectors.{embedder_name}: null`" + ); + } + } + + crate::Error::UserError(crate::UserError::DocumentEmbeddingError(msg)) + } + } + + fn process_embeddings( + &mut self, + _metadata: crate::vector::session::Metadata<'doc>, + _embeddings: Vec, + ) { + unimplemented!("unused") + } +} diff --git a/crates/milli/src/update/index_documents/extract/mod.rs b/crates/milli/src/update/index_documents/extract/mod.rs index d640bc075..cbf4ceba2 100644 --- a/crates/milli/src/update/index_documents/extract/mod.rs +++ b/crates/milli/src/update/index_documents/extract/mod.rs @@ -23,16 +23,17 @@ use self::extract_fid_docid_facet_values::{extract_fid_docid_facet_values, Extra use self::extract_fid_word_count_docids::extract_fid_word_count_docids; use self::extract_geo_points::extract_geo_points; use self::extract_vector_points::{ - extract_embeddings, extract_vector_points, ExtractedVectorPoints, + extract_embeddings_from_prompts, extract_vector_points, ExtractedVectorPoints, }; use self::extract_word_docids::extract_word_docids; use self::extract_word_pair_proximity_docids::extract_word_pair_proximity_docids; use self::extract_word_position_docids::extract_word_position_docids; use super::helpers::{as_cloneable_grenad, CursorClonableMmap, GrenadParameters}; use super::{helpers, TypedChunk}; -use crate::index::IndexEmbeddingConfig; use crate::progress::EmbedderStats; +use crate::update::index_documents::extract::extract_vector_points::extract_embeddings_from_fragments; use crate::update::settings::InnerIndexSettingsDiff; +use crate::vector::db::EmbedderInfo; use crate::vector::error::PossibleEmbeddingMistakes; use crate::{FieldId, Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder}; @@ -46,9 +47,9 @@ pub(crate) fn data_from_obkv_documents( indexer: GrenadParameters, lmdb_writer_sx: Sender>, primary_key_id: FieldId, - embedders_configs: Arc>, settings_diff: Arc, max_positions_per_attributes: Option, + embedder_info: Arc>, possible_embedding_mistakes: Arc, embedder_stats: &Arc, ) -> Result<()> { @@ -61,8 +62,8 @@ pub(crate) fn data_from_obkv_documents( original_documents_chunk, indexer, lmdb_writer_sx.clone(), - embedders_configs.clone(), settings_diff.clone(), + embedder_info.clone(), possible_embedding_mistakes.clone(), embedder_stats.clone(), ) @@ -231,8 +232,8 @@ fn send_original_documents_data( original_documents_chunk: Result>>, indexer: GrenadParameters, lmdb_writer_sx: Sender>, - embedders_configs: Arc>, settings_diff: Arc, + embedder_info: Arc>, possible_embedding_mistakes: Arc, embedder_stats: Arc, ) -> Result<()> { @@ -245,7 +246,6 @@ fn send_original_documents_data( if index_vectors { let settings_diff = settings_diff.clone(); - let embedders_configs = embedders_configs.clone(); let original_documents_chunk = original_documents_chunk.clone(); let lmdb_writer_sx = lmdb_writer_sx.clone(); @@ -253,8 +253,8 @@ fn send_original_documents_data( match extract_vector_points( original_documents_chunk.clone(), indexer, - &embedders_configs, &settings_diff, + embedder_info.as_slice(), &possible_embedding_mistakes, ) { Ok((extracted_vectors, unused_vectors_distribution)) => { @@ -262,16 +262,16 @@ fn send_original_documents_data( manual_vectors, remove_vectors, prompts, + inputs, embedder_name, - embedder, - add_to_user_provided, - remove_from_user_provided, + runtime, + embedding_status_delta, } in extracted_vectors { - let embeddings = match extract_embeddings( + let embeddings_from_prompts = match extract_embeddings_from_prompts( prompts, indexer, - embedder.clone(), + runtime.clone(), &embedder_name, &possible_embedding_mistakes, &embedder_stats, @@ -284,18 +284,37 @@ fn send_original_documents_data( None } }; + + let embeddings_from_fragments = match extract_embeddings_from_fragments( + inputs, + indexer, + runtime.clone(), + &embedder_name, + &possible_embedding_mistakes, + &embedder_stats, + &unused_vectors_distribution, + request_threads(), + ) { + Ok(results) => Some(results), + Err(error) => { + let _ = lmdb_writer_sx.send(Err(error)); + None + } + }; + if !(remove_vectors.is_empty() && manual_vectors.is_empty() - && embeddings.as_ref().is_none_or(|e| e.is_empty())) + && embeddings_from_prompts.as_ref().is_none_or(|e| e.is_empty()) + && embeddings_from_fragments.as_ref().is_none_or(|e| e.is_empty())) { let _ = lmdb_writer_sx.send(Ok(TypedChunk::VectorPoints { remove_vectors, - embeddings, - expected_dimension: embedder.dimensions(), + embeddings_from_prompts, + embeddings_from_fragments, + expected_dimension: runtime.embedder.dimensions(), manual_vectors, embedder_name, - add_to_user_provided, - remove_from_user_provided, + embedding_status_delta, })); } } diff --git a/crates/milli/src/update/index_documents/mod.rs b/crates/milli/src/update/index_documents/mod.rs index 5ec6910f7..055b8bbad 100644 --- a/crates/milli/src/update/index_documents/mod.rs +++ b/crates/milli/src/update/index_documents/mod.rs @@ -38,7 +38,8 @@ pub use crate::update::index_documents::helpers::CursorClonableMmap; use crate::update::{ IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst, }; -use crate::vector::{ArroyWrapper, EmbeddingConfigs}; +use crate::vector::db::EmbedderInfo; +use crate::vector::{ArroyWrapper, RuntimeEmbedders}; use crate::{CboRoaringBitmapCodec, Index, Result, UserError}; static MERGED_DATABASE_COUNT: usize = 7; @@ -81,7 +82,7 @@ pub struct IndexDocuments<'t, 'i, 'a, FP, FA> { should_abort: FA, added_documents: u64, deleted_documents: u64, - embedders: EmbeddingConfigs, + embedders: RuntimeEmbedders, embedder_stats: &'t Arc, } @@ -172,7 +173,7 @@ where Ok((self, Ok(indexed_documents))) } - pub fn with_embedders(mut self, embedders: EmbeddingConfigs) -> Self { + pub fn with_embedders(mut self, embedders: RuntimeEmbedders) -> Self { self.embedders = embedders; self } @@ -226,7 +227,13 @@ where settings_diff.new.recompute_searchables(self.wtxn, self.index)?; let settings_diff = Arc::new(settings_diff); - let embedders_configs = Arc::new(self.index.embedding_configs(self.wtxn)?); + let embedder_infos: heed::Result> = self + .index + .embedding_configs() + .iter_embedder_info(self.wtxn)? + .map(|res| res.map(|(name, info)| (name.to_owned(), info))) + .collect(); + let embedder_infos = Arc::new(embedder_infos?); let possible_embedding_mistakes = crate::vector::error::PossibleEmbeddingMistakes::new(&field_distribution); @@ -328,9 +335,9 @@ where pool_params, lmdb_writer_sx.clone(), primary_key_id, - embedders_configs.clone(), settings_diff_cloned, max_positions_per_attributes, + embedder_infos, Arc::new(possible_embedding_mistakes), &embedder_stats ) @@ -430,21 +437,21 @@ where TypedChunk::VectorPoints { expected_dimension, remove_vectors, - embeddings, + embeddings_from_prompts, + embeddings_from_fragments, manual_vectors, embedder_name, - add_to_user_provided, - remove_from_user_provided, + embedding_status_delta, } => { dimension.insert(embedder_name.clone(), expected_dimension); TypedChunk::VectorPoints { remove_vectors, - embeddings, + embeddings_from_prompts, + embeddings_from_fragments, expected_dimension, manual_vectors, embedder_name, - add_to_user_provided, - remove_from_user_provided, + embedding_status_delta, } } otherwise => otherwise, @@ -480,7 +487,7 @@ where // we should insert it in `dimension` for (name, action) in settings_diff.embedding_config_updates.iter() { if action.is_being_quantized && !dimension.contains_key(name.as_str()) { - let index = self.index.embedder_category_id.get(self.wtxn, name)?.ok_or( + let index = self.index.embedding_configs().embedder_id(self.wtxn, name)?.ok_or( InternalError::DatabaseMissingEntry { db_name: "embedder_category_id", key: None, @@ -488,7 +495,9 @@ where )?; let reader = ArroyWrapper::new(self.index.vector_arroy, index, action.was_quantized); - let dim = reader.dimensions(self.wtxn)?; + let Some(dim) = reader.dimensions(self.wtxn)? else { + continue; + }; dimension.insert(name.to_string(), dim); } } @@ -498,12 +507,19 @@ where let vector_arroy = self.index.vector_arroy; let cancel = &self.should_abort; - let embedder_index = self.index.embedder_category_id.get(wtxn, &embedder_name)?.ok_or( - InternalError::DatabaseMissingEntry { db_name: "embedder_category_id", key: None }, - )?; + let embedder_index = + self.index.embedding_configs().embedder_id(wtxn, &embedder_name)?.ok_or( + InternalError::DatabaseMissingEntry { + db_name: "embedder_category_id", + key: None, + }, + )?; let embedder_config = settings_diff.embedding_config_updates.get(&embedder_name); - let was_quantized = - settings_diff.old.embedding_configs.get(&embedder_name).is_some_and(|conf| conf.2); + let was_quantized = settings_diff + .old + .embedding_configs + .get(&embedder_name) + .is_some_and(|conf| conf.is_quantized); let is_quantizing = embedder_config.is_some_and(|action| action.is_being_quantized); pool.install(|| { @@ -773,11 +789,11 @@ mod tests { use crate::constants::RESERVED_GEO_FIELD_NAME; use crate::documents::mmap_from_objects; use crate::index::tests::TempIndex; - use crate::index::IndexEmbeddingConfig; use crate::progress::Progress; use crate::search::TermsMatchingStrategy; use crate::update::new::indexer; use crate::update::Setting; + use crate::vector::db::IndexEmbeddingConfig; use crate::{all_obkv_to_json, db_snap, Filter, FilterableAttributesRule, Search, UserError}; #[test] @@ -2028,7 +2044,7 @@ mod tests { new_fields_ids_map, primary_key, &document_changes, - EmbeddingConfigs::default(), + RuntimeEmbedders::default(), &|| false, &Progress::default(), &Default::default(), @@ -2116,7 +2132,7 @@ mod tests { new_fields_ids_map, primary_key, &document_changes, - EmbeddingConfigs::default(), + RuntimeEmbedders::default(), &|| false, &Progress::default(), &Default::default(), @@ -2277,7 +2293,7 @@ mod tests { ]); let indexer_alloc = Bump::new(); - let embedders = EmbeddingConfigs::default(); + let embedders = RuntimeEmbedders::default(); let mut indexer = indexer::DocumentOperation::new(); indexer.replace_documents(&documents).unwrap(); indexer.delete_documents(&["2"]); @@ -2343,7 +2359,7 @@ mod tests { indexer.delete_documents(&["1", "2"]); let indexer_alloc = Bump::new(); - let embedders = EmbeddingConfigs::default(); + let embedders = RuntimeEmbedders::default(); let (document_changes, _operation_stats, primary_key) = indexer .into_changes( &indexer_alloc, @@ -2394,7 +2410,7 @@ mod tests { { "id": 3, "name": "jean", "age": 25 }, ]); let indexer_alloc = Bump::new(); - let embedders = EmbeddingConfigs::default(); + let embedders = RuntimeEmbedders::default(); let mut indexer = indexer::DocumentOperation::new(); indexer.update_documents(&documents).unwrap(); @@ -2446,7 +2462,7 @@ mod tests { { "id": 3, "legs": 4 }, ]); let indexer_alloc = Bump::new(); - let embedders = EmbeddingConfigs::default(); + let embedders = RuntimeEmbedders::default(); let mut indexer = indexer::DocumentOperation::new(); indexer.update_documents(&documents).unwrap(); indexer.delete_documents(&["1", "2"]); @@ -2496,7 +2512,7 @@ mod tests { let mut new_fields_ids_map = db_fields_ids_map.clone(); let indexer_alloc = Bump::new(); - let embedders = EmbeddingConfigs::default(); + let embedders = RuntimeEmbedders::default(); let mut indexer = indexer::DocumentOperation::new(); indexer.delete_documents(&["1", "2"]); @@ -2552,7 +2568,7 @@ mod tests { let mut new_fields_ids_map = db_fields_ids_map.clone(); let indexer_alloc = Bump::new(); - let embedders = EmbeddingConfigs::default(); + let embedders = RuntimeEmbedders::default(); let mut indexer = indexer::DocumentOperation::new(); indexer.delete_documents(&["1", "2", "1", "2"]); @@ -2611,7 +2627,7 @@ mod tests { let mut new_fields_ids_map = db_fields_ids_map.clone(); let indexer_alloc = Bump::new(); - let embedders = EmbeddingConfigs::default(); + let embedders = RuntimeEmbedders::default(); let mut indexer = indexer::DocumentOperation::new(); let documents = documents!([ @@ -2661,7 +2677,7 @@ mod tests { let mut new_fields_ids_map = db_fields_ids_map.clone(); let indexer_alloc = Bump::new(); - let embedders = EmbeddingConfigs::default(); + let embedders = RuntimeEmbedders::default(); let mut indexer = indexer::DocumentOperation::new(); indexer.delete_documents(&["1"]); @@ -2775,6 +2791,8 @@ mod tests { document_template: Setting::NotSet, document_template_max_bytes: Setting::NotSet, url: Setting::NotSet, + indexing_fragments: Setting::NotSet, + search_fragments: Setting::NotSet, request: Setting::NotSet, response: Setting::NotSet, distribution: Setting::NotSet, @@ -2801,17 +2819,27 @@ mod tests { .unwrap(); let rtxn = index.read_txn().unwrap(); - let mut embedding_configs = index.embedding_configs(&rtxn).unwrap(); - let IndexEmbeddingConfig { name: embedder_name, config: embedder, user_provided } = + let embedders = index.embedding_configs(); + let mut embedding_configs = embedders.embedding_configs(&rtxn).unwrap(); + let IndexEmbeddingConfig { name: embedder_name, config: embedder, fragments } = embedding_configs.pop().unwrap(); + let info = embedders.embedder_info(&rtxn, &embedder_name).unwrap().unwrap(); + insta::assert_snapshot!(info.embedder_id, @"0"); + insta::assert_debug_snapshot!(info.embedding_status.user_provided_docids(), @"RoaringBitmap<[0, 1, 2]>"); + insta::assert_debug_snapshot!(info.embedding_status.skip_regenerate_docids(), @"RoaringBitmap<[0, 1, 2]>"); insta::assert_snapshot!(embedder_name, @"manual"); - insta::assert_debug_snapshot!(user_provided, @"RoaringBitmap<[0, 1, 2]>"); + insta::assert_debug_snapshot!(fragments, @r###" + FragmentConfigs( + [], + ) + "###); + let embedder = std::sync::Arc::new( crate::vector::Embedder::new(embedder.embedder_options, 0).unwrap(), ); let res = index .search(&rtxn) - .semantic(embedder_name, embedder, false, Some([0.0, 1.0, 2.0].to_vec())) + .semantic(embedder_name, embedder, false, Some([0.0, 1.0, 2.0].to_vec()), None) .execute() .unwrap(); assert_eq!(res.documents_ids.len(), 3); @@ -2860,7 +2888,7 @@ mod tests { let mut new_fields_ids_map = db_fields_ids_map.clone(); let indexer_alloc = Bump::new(); - let embedders = EmbeddingConfigs::default(); + let embedders = RuntimeEmbedders::default(); let mut indexer = indexer::DocumentOperation::new(); // OP @@ -2921,7 +2949,7 @@ mod tests { let mut new_fields_ids_map = db_fields_ids_map.clone(); let indexer_alloc = Bump::new(); - let embedders = EmbeddingConfigs::default(); + let embedders = RuntimeEmbedders::default(); let mut indexer = indexer::DocumentOperation::new(); indexer.delete_documents(&["1"]); @@ -2980,7 +3008,7 @@ mod tests { let mut new_fields_ids_map = db_fields_ids_map.clone(); let indexer_alloc = Bump::new(); - let embedders = EmbeddingConfigs::default(); + let embedders = RuntimeEmbedders::default(); let mut indexer = indexer::DocumentOperation::new(); let documents = documents!([ diff --git a/crates/milli/src/update/index_documents/transform.rs b/crates/milli/src/update/index_documents/transform.rs index e17625ad4..e07483aff 100644 --- a/crates/milli/src/update/index_documents/transform.rs +++ b/crates/milli/src/update/index_documents/transform.rs @@ -31,7 +31,7 @@ use crate::update::index_documents::GrenadParameters; use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff}; use crate::update::{AvailableIds, UpdateIndexingStep}; use crate::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors}; -use crate::vector::settings::WriteBackToDocuments; +use crate::vector::settings::{RemoveFragments, WriteBackToDocuments}; use crate::vector::ArroyWrapper; use crate::{FieldDistribution, FieldId, FieldIdMapMissingEntry, Index, Result}; @@ -933,10 +933,47 @@ impl<'a, 'i> Transform<'a, 'i> { // delete all vectors from the embedders that need removal for (_, (reader, _)) in readers { - let dimensions = reader.dimensions(wtxn)?; + let Some(dimensions) = reader.dimensions(wtxn)? else { + continue; + }; reader.clear(wtxn, dimensions)?; } + // remove all vectors for the specified fragments + for (embedder_name, RemoveFragments { fragment_ids }, was_quantized) in + settings_diff.embedding_config_updates.iter().filter_map(|(name, action)| { + action.remove_fragments().map(|fragments| (name, fragments, action.was_quantized)) + }) + { + let Some(infos) = self.index.embedding_configs().embedder_info(wtxn, embedder_name)? + else { + continue; + }; + let arroy = + ArroyWrapper::new(self.index.vector_arroy, infos.embedder_id, was_quantized); + let Some(dimensions) = arroy.dimensions(wtxn)? else { + continue; + }; + for fragment_id in fragment_ids { + // we must keep the user provided embeddings that ended up in this store + + if infos.embedding_status.user_provided_docids().is_empty() { + // no user provided: clear store + arroy.clear_store(wtxn, *fragment_id, dimensions)?; + continue; + } + + // some user provided, remove only the ids that are not user provided + let to_delete = arroy.items_in_store(wtxn, *fragment_id, |items| { + items - infos.embedding_status.user_provided_docids() + })?; + + for to_delete in to_delete { + arroy.del_item_in_store(wtxn, to_delete, *fragment_id, dimensions)?; + } + } + } + let grenad_params = GrenadParameters { chunk_compression_type: self.indexer_settings.chunk_compression_type, chunk_compression_level: self.indexer_settings.chunk_compression_level, diff --git a/crates/milli/src/update/index_documents/typed_chunk.rs b/crates/milli/src/update/index_documents/typed_chunk.rs index 6d575a98b..370579a6c 100644 --- a/crates/milli/src/update/index_documents/typed_chunk.rs +++ b/crates/milli/src/update/index_documents/typed_chunk.rs @@ -4,6 +4,7 @@ use std::fs::File; use std::io::{self, BufReader}; use bytemuck::allocation::pod_collect_to_vec; +use byteorder::{BigEndian, ReadBytesExt as _}; use grenad::{MergeFunction, Merger, MergerBuilder}; use heed::types::Bytes; use heed::{BytesDecode, RwTxn}; @@ -18,7 +19,6 @@ use super::helpers::{ use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind}; use crate::facet::FacetType; use crate::index::db_name::DOCUMENTS; -use crate::index::IndexEmbeddingConfig; use crate::proximity::MAX_DISTANCE; use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvReaderDelAdd}; use crate::update::facet::FacetsUpdate; @@ -26,6 +26,7 @@ use crate::update::index_documents::helpers::{ as_cloneable_grenad, try_split_array_at, KeepLatestObkv, }; use crate::update::settings::InnerIndexSettingsDiff; +use crate::vector::db::{EmbeddingStatusDelta, IndexEmbeddingConfig}; use crate::vector::ArroyWrapper; use crate::{ lat_lng_to_xyz, CboRoaringBitmapCodec, DocumentId, FieldId, GeoPoint, Index, InternalError, @@ -86,12 +87,14 @@ pub(crate) enum TypedChunk { GeoPoints(grenad::Reader>), VectorPoints { remove_vectors: grenad::Reader>, - embeddings: Option>>, + // docid -> vector + embeddings_from_prompts: Option>>, + // docid, extractor_id -> Option, + embeddings_from_fragments: Option>>, expected_dimension: usize, manual_vectors: grenad::Reader>, embedder_name: String, - add_to_user_provided: RoaringBitmap, - remove_from_user_provided: RoaringBitmap, + embedding_status_delta: EmbeddingStatusDelta, }, } @@ -155,6 +158,7 @@ pub(crate) fn write_typed_chunk_into_index( let mut iter = merger.into_stream_merger_iter()?; let embedders: BTreeSet<_> = index + .embedding_configs() .embedding_configs(wtxn)? .into_iter() .map(|IndexEmbeddingConfig { name, .. }| name) @@ -614,57 +618,66 @@ pub(crate) fn write_typed_chunk_into_index( let span = tracing::trace_span!(target: "indexing::write_db", "vector_points"); let _entered = span.enter(); + let embedders = index.embedding_configs(); + let mut remove_vectors_builder = MergerBuilder::new(KeepFirst); let mut manual_vectors_builder = MergerBuilder::new(KeepFirst); - let mut embeddings_builder = MergerBuilder::new(KeepFirst); - let mut add_to_user_provided = RoaringBitmap::new(); - let mut remove_from_user_provided = RoaringBitmap::new(); + let mut embeddings_from_prompts_builder = MergerBuilder::new(KeepFirst); + let mut embeddings_from_fragments_builder = MergerBuilder::new(KeepFirst); let mut params = None; + let mut infos = None; for typed_chunk in typed_chunks { let TypedChunk::VectorPoints { remove_vectors, manual_vectors, - embeddings, + embeddings_from_prompts, + embeddings_from_fragments, expected_dimension, embedder_name, - add_to_user_provided: aud, - remove_from_user_provided: rud, + embedding_status_delta, } = typed_chunk else { unreachable!(); }; + if infos.is_none() { + infos = Some(embedders.embedder_info(wtxn, &embedder_name)?.ok_or( + InternalError::DatabaseMissingEntry { + db_name: "embedder_category_id", + key: None, + }, + )?); + } + params = Some((expected_dimension, embedder_name)); remove_vectors_builder.push(remove_vectors.into_cursor()?); manual_vectors_builder.push(manual_vectors.into_cursor()?); - if let Some(embeddings) = embeddings { - embeddings_builder.push(embeddings.into_cursor()?); + if let Some(embeddings) = embeddings_from_prompts { + embeddings_from_prompts_builder.push(embeddings.into_cursor()?); + } + if let Some(embeddings) = embeddings_from_fragments { + embeddings_from_fragments_builder.push(embeddings.into_cursor()?); + } + + if let Some(infos) = &mut infos { + embedding_status_delta.apply_to(&mut infos.embedding_status); } - add_to_user_provided |= aud; - remove_from_user_provided |= rud; } // typed chunks has always at least 1 chunk. let Some((expected_dimension, embedder_name)) = params else { unreachable!() }; + let Some(infos) = infos else { unreachable!() }; - let mut embedding_configs = index.embedding_configs(wtxn)?; - let index_embedder_config = embedding_configs - .iter_mut() - .find(|IndexEmbeddingConfig { name, .. }| name == &embedder_name) - .unwrap(); - index_embedder_config.user_provided -= remove_from_user_provided; - index_embedder_config.user_provided |= add_to_user_provided; + embedders.put_embedder_info(wtxn, &embedder_name, &infos)?; - index.put_embedding_configs(wtxn, embedding_configs)?; - - let embedder_index = index.embedder_category_id.get(wtxn, &embedder_name)?.ok_or( - InternalError::DatabaseMissingEntry { db_name: "embedder_category_id", key: None }, - )?; - let binary_quantized = - settings_diff.old.embedding_configs.get(&embedder_name).is_some_and(|conf| conf.2); + let binary_quantized = settings_diff + .old + .embedding_configs + .get(&embedder_name) + .is_some_and(|conf| conf.is_quantized); // FIXME: allow customizing distance - let writer = ArroyWrapper::new(index.vector_arroy, embedder_index, binary_quantized); + let writer = ArroyWrapper::new(index.vector_arroy, infos.embedder_id, binary_quantized); // remove vectors for docids we want them removed let merger = remove_vectors_builder.build(); @@ -674,8 +687,8 @@ pub(crate) fn write_typed_chunk_into_index( writer.del_items(wtxn, expected_dimension, docid)?; } - // add generated embeddings - let merger = embeddings_builder.build(); + // add generated embeddings -- from prompts + let merger = embeddings_from_prompts_builder.build(); let mut iter = merger.into_stream_merger_iter()?; while let Some((key, value)) = iter.next()? { let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap(); @@ -702,6 +715,24 @@ pub(crate) fn write_typed_chunk_into_index( writer.add_items(wtxn, docid, &embeddings)?; } + // add generated embeddings -- from fragments + let merger = embeddings_from_fragments_builder.build(); + let mut iter = merger.into_stream_merger_iter()?; + while let Some((mut key, value)) = iter.next()? { + let docid = key.read_u32::().unwrap(); + let extractor_id = key.read_u8().unwrap(); + if value.is_empty() { + writer.del_item_in_store(wtxn, docid, extractor_id, expected_dimension)?; + } else { + let data = pod_collect_to_vec(value); + // it is a code error to have embeddings and not expected_dimension + if data.len() != expected_dimension { + panic!("wrong dimensions") + } + writer.add_item_in_store(wtxn, docid, extractor_id, &data)?; + } + } + // perform the manual diff let merger = manual_vectors_builder.build(); let mut iter = merger.into_stream_merger_iter()?; diff --git a/crates/milli/src/vector/parsed_vectors.rs b/crates/milli/src/vector/parsed_vectors.rs index 5fcb2912b..36e80677a 100644 --- a/crates/milli/src/vector/parsed_vectors.rs +++ b/crates/milli/src/vector/parsed_vectors.rs @@ -6,9 +6,8 @@ use serde_json::value::RawValue; use serde_json::{from_slice, Value}; use super::Embedding; -use crate::index::IndexEmbeddingConfig; use crate::update::del_add::{DelAdd, KvReaderDelAdd}; -use crate::{DocumentId, FieldId, InternalError, UserError}; +use crate::{FieldId, InternalError, UserError}; #[derive(serde::Serialize, Debug)] #[serde(untagged)] @@ -374,8 +373,7 @@ pub struct ParsedVectorsDiff { impl ParsedVectorsDiff { pub fn new( - docid: DocumentId, - embedders_configs: &[IndexEmbeddingConfig], + regenerate_for_embedders: impl Iterator, documents_diff: &KvReader, old_vectors_fid: Option, new_vectors_fid: Option, @@ -396,10 +394,8 @@ impl ParsedVectorsDiff { } } .flatten().map_or(BTreeMap::default(), |del| del.into_iter().map(|(name, vec)| (name, VectorState::Inline(vec))).collect()); - for embedding_config in embedders_configs { - if embedding_config.user_provided.contains(docid) { - old.entry(embedding_config.name.to_string()).or_insert(VectorState::Manual); - } + for name in regenerate_for_embedders { + old.entry(name).or_insert(VectorState::Generated); } let new = 'new: {