diff --git a/crates/milli/src/vector/settings.rs b/crates/milli/src/vector/settings.rs index 712c1faa5..93de37290 100644 --- a/crates/milli/src/vector/settings.rs +++ b/crates/milli/src/vector/settings.rs @@ -2,6 +2,8 @@ use std::collections::BTreeMap; use std::num::NonZeroUsize; use deserr::Deserr; +use either::Either; +use itertools::Itertools; use roaring::RoaringBitmap; use serde::{Deserialize, Serialize}; use utoipa::ToSchema; @@ -229,6 +231,35 @@ pub struct EmbeddingSettings { /// - 🏗️ When modified for sources `ollama` and `rest`, embeddings are always regenerated pub url: Setting, + /// Template fragments that will be reassembled and sent to the remote embedder at indexing time. + /// + /// # Availability + /// + /// - This parameter is available for sources `rest`. + /// + /// # 🔄 Reindexing + /// + /// - 🏗️ When a fragment is deleted by passing `null` to its name, the corresponding embeddings are removed from documents. + /// - 🏗️ When a fragment is modified, the corresponding embeddings are regenerated if their rendered version changes. + #[serde(default, skip_serializing_if = "Setting::is_not_set")] + #[deserr(default)] + #[schema(value_type = Option>)] + pub indexing_fragments: Setting>>, + + /// Template fragments that will be reassembled and sent to the remote embedder at search time. + /// + /// # Availability + /// + /// - This parameter is available for sources `rest`. + /// + /// # 🔄 Reindexing + /// + /// - 🌱 Changing the value of this parameter never regenerates embeddings + #[serde(default, skip_serializing_if = "Setting::is_not_set")] + #[deserr(default)] + #[schema(value_type = Option>)] + pub search_fragments: Setting>>, + #[serde(default, skip_serializing_if = "Setting::is_not_set")] #[deserr(default)] #[schema(value_type = Option)] @@ -483,6 +514,36 @@ pub struct SubEmbeddingSettings { /// - 🌱 When modified for source `openAi`, embeddings are never regenerated /// - 🏗️ When modified for sources `ollama` and `rest`, embeddings are always regenerated pub url: Setting, + + /// Template fragments that will be reassembled and sent to the remote embedder at indexing time. + /// + /// # Availability + /// + /// - This parameter is available for sources `rest`. + /// + /// # 🔄 Reindexing + /// + /// - 🏗️ When a fragment is deleted by passing `null` to its name, the corresponding embeddings are removed from documents. + /// - 🏗️ When a fragment is modified, the corresponding embeddings are regenerated if their rendered version changes. + #[serde(default, skip_serializing_if = "Setting::is_not_set")] + #[deserr(default)] + #[schema(value_type = Option>)] + pub indexing_fragments: Setting>>, + + /// Template fragments that will be reassembled and sent to the remote embedder at search time. + /// + /// # Availability + /// + /// - This parameter is available for sources `rest`. + /// + /// # 🔄 Reindexing + /// + /// - 🌱 Changing the value of this parameter never regenerates embeddings + #[serde(default, skip_serializing_if = "Setting::is_not_set")] + #[deserr(default)] + #[schema(value_type = Option>)] + pub search_fragments: Setting>>, + #[serde(default, skip_serializing_if = "Setting::is_not_set")] #[deserr(default)] #[schema(value_type = Option)] @@ -555,16 +616,24 @@ pub struct SubEmbeddingSettings { } /// Indicates what action should take place during a reindexing operation for an embedder -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum ReindexAction { /// An indexing operation should take place for this embedder, keeping existing vectors /// and checking whether the document template changed or not RegeneratePrompts, + RegenerateFragments(Vec<(String, RegenerateFragment)>), /// An indexing operation should take place for all documents for this embedder, removing existing vectors /// (except userProvided ones) FullReindex, } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum RegenerateFragment { + Update, + Remove, + Add, +} + pub enum SettingsDiff { Remove, Reindex { action: ReindexAction, updated_settings: EmbeddingSettings, quantize: bool }, @@ -577,6 +646,12 @@ pub struct EmbedderAction { pub is_being_quantized: bool, pub write_back: Option, pub reindex: Option, + pub remove_fragments: Option, +} + +#[derive(Debug)] +pub struct RemoveFragments { + pub fragment_ids: Vec, } impl EmbedderAction { @@ -592,6 +667,10 @@ impl EmbedderAction { self.reindex.as_ref() } + pub fn remove_fragments(&self) -> Option<&RemoveFragments> { + self.remove_fragments.as_ref() + } + pub fn with_is_being_quantized(mut self, quantize: bool) -> Self { self.is_being_quantized = quantize; self @@ -603,11 +682,23 @@ impl EmbedderAction { is_being_quantized: false, write_back: Some(write_back), reindex: None, + remove_fragments: None, } } pub fn with_reindex(reindex: ReindexAction, was_quantized: bool) -> Self { - Self { was_quantized, is_being_quantized: false, write_back: None, reindex: Some(reindex) } + Self { + was_quantized, + is_being_quantized: false, + write_back: None, + reindex: Some(reindex), + remove_fragments: None, + } + } + + pub fn with_remove_fragments(mut self, remove_fragments: RemoveFragments) -> Self { + self.remove_fragments = Some(remove_fragments); + self } } @@ -634,6 +725,8 @@ impl SettingsDiff { mut dimensions, mut document_template, mut url, + mut indexing_fragments, + mut search_fragments, mut request, mut response, mut search_embedder, @@ -653,6 +746,8 @@ impl SettingsDiff { dimensions: new_dimensions, document_template: new_document_template, url: new_url, + indexing_fragments: new_indexing_fragments, + search_fragments: new_search_fragments, request: new_request, response: new_response, search_embedder: new_search_embedder, @@ -684,6 +779,8 @@ impl SettingsDiff { &mut document_template, &mut document_template_max_bytes, &mut url, + &mut indexing_fragments, + &mut search_fragments, &mut request, &mut response, &mut headers, @@ -696,6 +793,8 @@ impl SettingsDiff { new_document_template, new_document_template_max_bytes, new_url, + new_indexing_fragments, + new_search_fragments, new_request, new_response, new_headers, @@ -722,6 +821,8 @@ impl SettingsDiff { dimensions, document_template, url, + indexing_fragments, + search_fragments, request, response, search_embedder, @@ -769,6 +870,8 @@ impl SettingsDiff { mut document_template, mut document_template_max_bytes, mut url, + mut indexing_fragments, + mut search_fragments, mut request, mut response, mut headers, @@ -794,6 +897,8 @@ impl SettingsDiff { document_template: new_document_template, document_template_max_bytes: new_document_template_max_bytes, url: new_url, + indexing_fragments: new_indexing_fragments, + search_fragments: new_search_fragments, request: new_request, response: new_response, headers: new_headers, @@ -814,6 +919,8 @@ impl SettingsDiff { &mut document_template, &mut document_template_max_bytes, &mut url, + &mut indexing_fragments, + &mut search_fragments, &mut request, &mut response, &mut headers, @@ -826,6 +933,8 @@ impl SettingsDiff { new_document_template, new_document_template_max_bytes, new_url, + new_indexing_fragments, + new_search_fragments, new_request, new_response, new_headers, @@ -846,6 +955,8 @@ impl SettingsDiff { dimensions, document_template, url, + indexing_fragments, + search_fragments, request, response, headers, @@ -875,6 +986,8 @@ impl SettingsDiff { document_template: &mut Setting, document_template_max_bytes: &mut Setting, url: &mut Setting, + indexing_fragments: &mut Setting>>, + search_fragments: &mut Setting>>, request: &mut Setting, response: &mut Setting, headers: &mut Setting>, @@ -887,6 +1000,8 @@ impl SettingsDiff { new_document_template: Setting, new_document_template_max_bytes: Setting, new_url: Setting, + new_indexing_fragments: Setting>>, + new_search_fragments: Setting>>, new_request: Setting, new_response: Setting, new_headers: Setting>, @@ -902,6 +1017,8 @@ impl SettingsDiff { pooling, dimensions, url, + indexing_fragments, + search_fragments, request, response, document_template, @@ -941,6 +1058,104 @@ impl SettingsDiff { } } } + + *search_fragments = match (std::mem::take(search_fragments), new_search_fragments) { + (Setting::Set(search_fragments), Setting::Set(new_search_fragments)) => { + Setting::Set( + search_fragments + .into_iter() + .merge_join_by(new_search_fragments, |(left, _), (right, _)| { + left.cmp(right) + }) + .map(|eob| { + match eob { + // merge fragments + itertools::EitherOrBoth::Both((name, _), (_, right)) => { + (name, right) + } + // unchanged fragment + itertools::EitherOrBoth::Left(left) => left, + // new fragment + itertools::EitherOrBoth::Right(right) => right, + } + }) + .collect(), + ) + } + (_, Setting::Reset) => Setting::Reset, + (left, Setting::NotSet) => left, + (Setting::NotSet | Setting::Reset, Setting::Set(new_search_fragments)) => { + Setting::Set(new_search_fragments) + } + }; + + let mut regenerate_fragments = Vec::new(); + *indexing_fragments = match (std::mem::take(indexing_fragments), new_indexing_fragments) { + (Setting::Set(fragments), Setting::Set(new_fragments)) => { + Setting::Set( + fragments + .into_iter() + .merge_join_by(new_fragments, |(left, _), (right, _)| left.cmp(right)) + .map(|eob| { + match eob { + // merge fragments + itertools::EitherOrBoth::Both( + (name, left), + (other_name, right), + ) => { + if left == right { + (name, left) + } else { + match right { + Some(right) => { + regenerate_fragments + .push((other_name, RegenerateFragment::Update)); + (name, Some(right)) + } + None => { + regenerate_fragments + .push((other_name, RegenerateFragment::Remove)); + (name, None) + } + } + } + } + // unchanged fragment + itertools::EitherOrBoth::Left(left) => left, + // new fragment + itertools::EitherOrBoth::Right((name, right)) => { + if right.is_some() { + regenerate_fragments + .push((name.clone(), RegenerateFragment::Add)); + } + (name, right) + } + } + }) + .collect(), + ) + } + // remove all fragments => move to document template + (_, Setting::Reset) => { + ReindexAction::push_action(reindex_action, ReindexAction::FullReindex); + Setting::Reset + } + // add all fragments + (Setting::NotSet | Setting::Reset, Setting::Set(new_fragments)) => { + ReindexAction::push_action(reindex_action, ReindexAction::FullReindex); + + Setting::Set(new_fragments) + } + // no change + (left, Setting::NotSet) => left, + }; + if !regenerate_fragments.is_empty() { + ReindexAction::push_action( + reindex_action, + ReindexAction::RegenerateFragments(regenerate_fragments), + ); + } + if request.apply(new_request) { ReindexAction::push_action(reindex_action, ReindexAction::FullReindex); } @@ -972,10 +1187,16 @@ impl SettingsDiff { impl ReindexAction { fn push_action(this: &mut Option, other: Self) { - *this = match (*this, other) { - (_, ReindexAction::FullReindex) => Some(ReindexAction::FullReindex), - (Some(ReindexAction::FullReindex), _) => Some(ReindexAction::FullReindex), - (_, ReindexAction::RegeneratePrompts) => Some(ReindexAction::RegeneratePrompts), + use ReindexAction::*; + *this = match (this.take(), other) { + (_, FullReindex) => Some(FullReindex), + (Some(FullReindex), _) => Some(FullReindex), + (_, RegenerateFragments(fragments)) => Some(RegenerateFragments(fragments)), + (Some(RegenerateFragments(fragments)), RegeneratePrompts) => { + Some(RegenerateFragments(fragments)) + } + (Some(RegeneratePrompts), RegeneratePrompts) => Some(RegeneratePrompts), + (None, RegeneratePrompts) => Some(RegeneratePrompts), } } } @@ -988,6 +1209,8 @@ fn apply_default_for_source( pooling: &mut Setting, dimensions: &mut Setting, url: &mut Setting, + indexing_fragments: &mut Setting>>, + search_fragments: &mut Setting>>, request: &mut Setting, response: &mut Setting, document_template: &mut Setting, @@ -1003,6 +1226,8 @@ fn apply_default_for_source( *pooling = Setting::Reset; *dimensions = Setting::NotSet; *url = Setting::NotSet; + *indexing_fragments = Setting::NotSet; + *search_fragments = Setting::NotSet; *request = Setting::NotSet; *response = Setting::NotSet; *headers = Setting::NotSet; @@ -1015,6 +1240,8 @@ fn apply_default_for_source( *pooling = Setting::NotSet; *dimensions = Setting::Reset; *url = Setting::NotSet; + *indexing_fragments = Setting::NotSet; + *search_fragments = Setting::NotSet; *request = Setting::NotSet; *response = Setting::NotSet; *headers = Setting::NotSet; @@ -1027,6 +1254,8 @@ fn apply_default_for_source( *pooling = Setting::NotSet; *dimensions = Setting::NotSet; *url = Setting::Reset; + *indexing_fragments = Setting::NotSet; + *search_fragments = Setting::NotSet; *request = Setting::NotSet; *response = Setting::NotSet; *headers = Setting::NotSet; @@ -1039,6 +1268,8 @@ fn apply_default_for_source( *pooling = Setting::NotSet; *dimensions = Setting::Reset; *url = Setting::Reset; + *indexing_fragments = Setting::Reset; + *search_fragments = Setting::Reset; *request = Setting::Reset; *response = Setting::Reset; *headers = Setting::Reset; @@ -1051,6 +1282,8 @@ fn apply_default_for_source( *pooling = Setting::NotSet; *dimensions = Setting::Reset; *url = Setting::NotSet; + *indexing_fragments = Setting::NotSet; + *search_fragments = Setting::NotSet; *request = Setting::NotSet; *response = Setting::NotSet; *document_template = Setting::NotSet; @@ -1065,6 +1298,8 @@ fn apply_default_for_source( *pooling = Setting::NotSet; *dimensions = Setting::NotSet; *url = Setting::NotSet; + *indexing_fragments = Setting::NotSet; + *search_fragments = Setting::NotSet; *request = Setting::NotSet; *response = Setting::NotSet; *document_template = Setting::NotSet; @@ -1131,6 +1366,8 @@ pub enum MetaEmbeddingSetting { DocumentTemplate, DocumentTemplateMaxBytes, Url, + IndexingFragments, + SearchFragments, Request, Response, Headers, @@ -1153,6 +1390,8 @@ impl MetaEmbeddingSetting { DocumentTemplate => "documentTemplate", DocumentTemplateMaxBytes => "documentTemplateMaxBytes", Url => "url", + IndexingFragments => "indexingFragments", + SearchFragments => "searchFragments", Request => "request", Response => "response", Headers => "headers", @@ -1176,6 +1415,8 @@ impl EmbeddingSettings { dimensions: &Setting, api_key: &Setting, url: &Setting, + indexing_fragments: &Setting>>, + search_fragments: &Setting>>, request: &Setting, response: &Setting, document_template: &Setting, @@ -1210,6 +1451,20 @@ impl EmbeddingSettings { )?; Self::check_setting(embedder_name, source, MetaEmbeddingSetting::ApiKey, context, api_key)?; Self::check_setting(embedder_name, source, MetaEmbeddingSetting::Url, context, url)?; + Self::check_setting( + embedder_name, + source, + MetaEmbeddingSetting::IndexingFragments, + context, + indexing_fragments, + )?; + Self::check_setting( + embedder_name, + source, + MetaEmbeddingSetting::SearchFragments, + context, + search_fragments, + )?; Self::check_setting( embedder_name, source, @@ -1348,8 +1603,8 @@ impl EmbeddingSettings { ) => FieldStatus::Allowed, ( OpenAi, - Revision | Pooling | Request | Response | Headers | SearchEmbedder - | IndexingEmbedder, + Revision | Pooling | IndexingFragments | SearchFragments | Request | Response + | Headers | SearchEmbedder | IndexingEmbedder, _, ) => FieldStatus::Disallowed, ( @@ -1359,8 +1614,8 @@ impl EmbeddingSettings { ) => FieldStatus::Allowed, ( HuggingFace, - ApiKey | Dimensions | Url | Request | Response | Headers | SearchEmbedder - | IndexingEmbedder, + ApiKey | Dimensions | Url | IndexingFragments | SearchFragments | Request + | Response | Headers | SearchEmbedder | IndexingEmbedder, _, ) => FieldStatus::Disallowed, (Ollama, Model, _) => FieldStatus::Mandatory, @@ -1371,8 +1626,8 @@ impl EmbeddingSettings { ) => FieldStatus::Allowed, ( Ollama, - Revision | Pooling | Request | Response | Headers | SearchEmbedder - | IndexingEmbedder, + Revision | Pooling | IndexingFragments | SearchFragments | Request | Response + | Headers | SearchEmbedder | IndexingEmbedder, _, ) => FieldStatus::Disallowed, (UserProvided, Dimensions, _) => FieldStatus::Mandatory, @@ -1386,6 +1641,8 @@ impl EmbeddingSettings { | DocumentTemplate | DocumentTemplateMaxBytes | Url + | IndexingFragments + | SearchFragments | Request | Response | Headers @@ -1404,6 +1661,10 @@ impl EmbeddingSettings { | Headers, _, ) => FieldStatus::Allowed, + (Rest, IndexingFragments, NotNested | Indexing) => FieldStatus::Allowed, + (Rest, IndexingFragments, Search) => FieldStatus::Disallowed, + (Rest, SearchFragments, NotNested | Search) => FieldStatus::Allowed, + (Rest, SearchFragments, Indexing) => FieldStatus::Disallowed, (Rest, Model | Revision | Pooling | SearchEmbedder | IndexingEmbedder, _) => { FieldStatus::Disallowed } @@ -1419,6 +1680,8 @@ impl EmbeddingSettings { | DocumentTemplate | DocumentTemplateMaxBytes | Url + | IndexingFragments + | SearchFragments | Request | Response | Headers, @@ -1512,6 +1775,11 @@ impl std::fmt::Display for EmbedderSource { } } +#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq, Deserr, ToSchema)] +pub struct Fragment { + pub value: serde_json::Value, +} + impl EmbeddingSettings { fn from_hugging_face( super::hf::EmbedderOptions { @@ -1534,6 +1802,8 @@ impl EmbeddingSettings { document_template, document_template_max_bytes, url: Setting::NotSet, + indexing_fragments: Setting::NotSet, + search_fragments: Setting::NotSet, request: Setting::NotSet, response: Setting::NotSet, headers: Setting::NotSet, @@ -1566,6 +1836,8 @@ impl EmbeddingSettings { document_template, document_template_max_bytes, url: Setting::some_or_not_set(url), + indexing_fragments: Setting::NotSet, + search_fragments: Setting::NotSet, request: Setting::NotSet, response: Setting::NotSet, headers: Setting::NotSet, @@ -1598,6 +1870,8 @@ impl EmbeddingSettings { document_template, document_template_max_bytes, url: Setting::some_or_not_set(url), + indexing_fragments: Setting::NotSet, + search_fragments: Setting::NotSet, request: Setting::NotSet, response: Setting::NotSet, headers: Setting::NotSet, @@ -1622,6 +1896,8 @@ impl EmbeddingSettings { document_template: Setting::NotSet, document_template_max_bytes: Setting::NotSet, url: Setting::NotSet, + indexing_fragments: Setting::NotSet, + search_fragments: Setting::NotSet, request: Setting::NotSet, response: Setting::NotSet, headers: Setting::NotSet, @@ -1638,6 +1914,8 @@ impl EmbeddingSettings { dimensions, url, request, + indexing_fragments, + search_fragments, response, distribution, headers, @@ -1656,6 +1934,26 @@ impl EmbeddingSettings { document_template, document_template_max_bytes, url: Setting::Set(url), + indexing_fragments: if indexing_fragments.is_empty() { + Setting::NotSet + } else { + Setting::Set( + indexing_fragments + .into_iter() + .map(|(name, fragment)| (name, Some(Fragment { value: fragment }))) + .collect(), + ) + }, + search_fragments: if search_fragments.is_empty() { + Setting::NotSet + } else { + Setting::Set( + search_fragments + .into_iter() + .map(|(name, fragment)| (name, Some(Fragment { value: fragment }))) + .collect(), + ) + }, request: Setting::Set(request), response: Setting::Set(response), distribution: Setting::some_or_not_set(distribution), @@ -1714,6 +2012,8 @@ impl From for EmbeddingSettings { document_template: Setting::NotSet, document_template_max_bytes: Setting::NotSet, url: Setting::NotSet, + indexing_fragments: Setting::NotSet, + search_fragments: Setting::NotSet, request: Setting::NotSet, response: Setting::NotSet, headers: Setting::NotSet, @@ -1786,6 +2086,8 @@ impl From for SubEmbeddingSettings { document_template, document_template_max_bytes, url, + indexing_fragments, + search_fragments, request, response, headers, @@ -1804,6 +2106,8 @@ impl From for SubEmbeddingSettings { document_template, document_template_max_bytes, url, + indexing_fragments, + search_fragments, request, response, headers, @@ -1828,6 +2132,8 @@ impl From for EmbeddingConfig { document_template, document_template_max_bytes, url, + indexing_fragments, + search_fragments, request, response, distribution, @@ -1879,6 +2185,8 @@ impl From for EmbeddingConfig { EmbedderSource::Rest => SubEmbedderOptions::rest( url.set().unwrap(), api_key, + indexing_fragments, + search_fragments, request.set().unwrap(), response.set().unwrap(), headers, @@ -1922,6 +2230,8 @@ impl SubEmbedderOptions { document_template: _, document_template_max_bytes: _, url, + indexing_fragments, + search_fragments, request, response, headers, @@ -1944,6 +2254,8 @@ impl SubEmbedderOptions { EmbedderSource::Rest => Self::rest( url.set().unwrap(), api_key, + indexing_fragments, + search_fragments, request.set().unwrap(), response.set().unwrap(), headers, @@ -2010,9 +2322,13 @@ impl SubEmbedderOptions { distribution: distribution.set(), }) } + + #[allow(clippy::too_many_arguments)] fn rest( url: String, api_key: Setting, + indexing_fragments: Setting>>, + search_fragments: Setting>>, request: serde_json::Value, response: serde_json::Value, headers: Setting>, @@ -2027,6 +2343,22 @@ impl SubEmbedderOptions { response, distribution: distribution.set(), headers: headers.set().unwrap_or_default(), + search_fragments: search_fragments + .set() + .unwrap_or_default() + .into_iter() + .filter_map(|(name, fragment)| { + Some((name, fragment.map(|fragment| fragment.value)?)) + }) + .collect(), + indexing_fragments: indexing_fragments + .set() + .unwrap_or_default() + .into_iter() + .filter_map(|(name, fragment)| { + Some((name, fragment.map(|fragment| fragment.value)?)) + }) + .collect(), }) } fn ollama( @@ -2066,3 +2398,20 @@ impl From for EmbedderOptions { } } } + +pub(crate) fn fragments_from_settings( + setting: &Setting, +) -> impl Iterator + '_ { + let Some(setting) = setting.as_ref().set() else { return Either::Left(None.into_iter()) }; + if let Some(setting) = setting.indexing_fragments.as_ref().set() { + Either::Right(setting.keys().cloned()) + } else { + let Some(setting) = setting.indexing_embedder.as_ref().set() else { + return Either::Left(None.into_iter()); + }; + let Some(setting) = setting.indexing_fragments.as_ref().set() else { + return Either::Left(None.into_iter()); + }; + Either::Right(setting.keys().cloned()) + } +}