Rework extraction to use EmbedderAction

This commit is contained in:
Louis Dureuil 2024-06-12 14:11:29 +02:00
parent d1dd7e5d09
commit f5cf01e7d1
No known key found for this signature in database
4 changed files with 318 additions and 161 deletions

View File

@ -17,9 +17,10 @@ use crate::index::IndexEmbeddingConfig;
use crate::prompt::Prompt;
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::settings::InnerIndexSettingsDiff;
use crate::vector::parsed_vectors::{ParsedVectorsDiff, RESERVED_VECTORS_FIELD_NAME};
use crate::vector::parsed_vectors::{ParsedVectorsDiff, VectorState, RESERVED_VECTORS_FIELD_NAME};
use crate::vector::settings::{EmbedderAction, ReindexAction};
use crate::vector::Embedder;
use crate::{try_split_array_at, DocumentId, Result, ThreadPoolNoAbort};
use crate::{try_split_array_at, DocumentId, FieldId, FieldsIdsMap, Result, ThreadPoolNoAbort};
/// The length of the elements that are always in the buffer when inserting new values.
const TRUNCATE_SIZE: usize = size_of::<DocumentId>();
@ -35,7 +36,7 @@ pub struct ExtractedVectorPoints {
// embedder
pub embedder_name: String,
pub embedder: Arc<Embedder>,
pub user_provided: RoaringBitmap,
pub add_to_user_provided: RoaringBitmap,
pub remove_from_user_provided: RoaringBitmap,
}
@ -44,12 +45,7 @@ enum VectorStateDelta {
// Remove all vectors, generated or manual, from this document
NowRemoved,
// Add the manually specified vectors, passed in the other grenad
// Remove any previously generated vectors
// Note: changing the value of the manually specified vector **should not record** this delta
WasGeneratedNowManual(Vec<Vec<f32>>),
ManualDelta(Vec<Vec<f32>>),
NowManual(Vec<Vec<f32>>),
// Add the vector computed from the specified prompt
// Remove any previous vector
@ -62,9 +58,8 @@ impl VectorStateDelta {
match self {
VectorStateDelta::NoChange => Default::default(),
VectorStateDelta::NowRemoved => (true, Default::default(), Default::default()),
VectorStateDelta::WasGeneratedNowManual(add) => (true, Default::default(), add),
// We always delete the previous vectors
VectorStateDelta::ManualDelta(add) => (true, Default::default(), add),
VectorStateDelta::NowManual(add) => (true, Default::default(), add),
VectorStateDelta::NowGenerated(prompt) => (true, prompt, Default::default()),
}
}
@ -75,19 +70,29 @@ struct EmbedderVectorExtractor {
embedder: Arc<Embedder>,
prompt: Arc<Prompt>,
// (docid, _index) -> KvWriterDelAdd -> Vector
manual_vectors_writer: Writer<BufWriter<File>>,
// (docid) -> (prompt)
prompts_writer: Writer<BufWriter<File>>,
// (docid) -> ()
remove_vectors_writer: Writer<BufWriter<File>>,
// (docid, _index) -> KvWriterDelAdd -> Vector
manual_vectors_writer: Writer<BufWriter<File>>,
// The docids of the documents that contains a user defined embedding
user_provided: RoaringBitmap,
add_to_user_provided: RoaringBitmap,
action: ExtractionAction,
}
struct DocumentOperation {
// The docids of the documents that contains an auto-generated embedding
remove_from_user_provided: RoaringBitmap,
}
enum ExtractionAction {
SettingsFullReindex,
SettingsRegeneratePrompts { old_prompt: Arc<Prompt> },
DocumentOperation(DocumentOperation),
}
/// Extracts the embedding vector contained in each document under the `_vectors` field.
///
/// Returns the generated grenad reader containing the docid as key associated to the Vec<f32>
@ -104,46 +109,109 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
let new_fields_ids_map = &settings_diff.new.fields_ids_map;
// the vector field id may have changed
let old_vectors_fid = old_fields_ids_map.id(RESERVED_VECTORS_FIELD_NAME);
// filter the old vector fid if the settings has been changed forcing reindexing.
let old_vectors_fid = old_vectors_fid.filter(|_| !reindex_vectors);
let new_vectors_fid = new_fields_ids_map.id(RESERVED_VECTORS_FIELD_NAME);
let mut extractors = Vec::new();
for (embedder_name, (embedder, prompt)) in
settings_diff.new.embedding_configs.clone().into_iter()
{
// (docid, _index) -> KvWriterDelAdd -> Vector
let manual_vectors_writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
// (docid) -> (prompt)
let prompts_writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
let mut configs = settings_diff.new.embedding_configs.clone().into_inner();
let old_configs = &settings_diff.old.embedding_configs;
// (docid) -> ()
let remove_vectors_writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
if reindex_vectors {
for (name, action) in settings_diff.embedding_config_updates.iter() {
match action {
EmbedderAction::WriteBackToDocuments(_) => continue, // already deleted
EmbedderAction::Reindex(action) => {
let Some((embedder_name, (embedder, prompt))) = configs.remove_entry(name)
else {
tracing::error!(embedder = name, "Requested embedder config not found");
continue;
};
extractors.push(EmbedderVectorExtractor {
embedder_name,
embedder,
prompt,
manual_vectors_writer,
prompts_writer,
remove_vectors_writer,
user_provided: RoaringBitmap::new(),
remove_from_user_provided: RoaringBitmap::new(),
});
// (docid, _index) -> KvWriterDelAdd -> Vector
let manual_vectors_writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
// (docid) -> (prompt)
let prompts_writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
// (docid) -> ()
let remove_vectors_writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
let action = match action {
ReindexAction::FullReindex => ExtractionAction::SettingsFullReindex,
ReindexAction::RegeneratePrompts => {
let Some((_, old_prompt)) = old_configs.get(name) else {
tracing::error!(embedder = name, "Old embedder config not found");
continue;
};
ExtractionAction::SettingsRegeneratePrompts { old_prompt }
}
};
extractors.push(EmbedderVectorExtractor {
embedder_name,
embedder,
prompt,
prompts_writer,
remove_vectors_writer,
manual_vectors_writer,
add_to_user_provided: RoaringBitmap::new(),
action,
});
}
}
}
} else {
// document operation
for (embedder_name, (embedder, prompt)) in configs.into_iter() {
// (docid, _index) -> KvWriterDelAdd -> Vector
let manual_vectors_writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
// (docid) -> (prompt)
let prompts_writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
// (docid) -> ()
let remove_vectors_writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
extractors.push(EmbedderVectorExtractor {
embedder_name,
embedder,
prompt,
prompts_writer,
remove_vectors_writer,
manual_vectors_writer,
add_to_user_provided: RoaringBitmap::new(),
action: ExtractionAction::DocumentOperation(DocumentOperation {
remove_from_user_provided: RoaringBitmap::new(),
}),
});
}
}
let mut key_buffer = Vec::new();
@ -177,111 +245,66 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
embedder_name,
embedder: _,
prompt,
manual_vectors_writer,
prompts_writer,
remove_vectors_writer,
user_provided,
remove_from_user_provided,
manual_vectors_writer,
add_to_user_provided,
action,
} in extractors.iter_mut()
{
let delta = match parsed_vectors.remove(embedder_name) {
(Some(old), Some(new)) => {
match (old.map_or(true, |old| old.is_user_provided()), new.is_user_provided()) {
(true, true) | (false, false) => (),
(true, false) => {
remove_from_user_provided.insert(docid);
let (old, new) = parsed_vectors.remove(embedder_name);
let delta = match action {
ExtractionAction::SettingsFullReindex => match old {
// A full reindex can be triggered either by:
// 1. a new embedder
// 2. an existing embedder changed so that it must regenerate all generated embeddings.
// For a new embedder, there can be `_vectors.embedder` embeddings to add to the DB
VectorState::Inline(vectors) => {
if vectors.is_user_provided() {
add_to_user_provided.insert(docid);
}
(false, true) => {
user_provided.insert(docid);
let add_vectors = vectors.into_array_of_vectors();
if add_vectors.len() > usize::from(u8::MAX) {
return Err(crate::Error::UserError(crate::UserError::TooManyVectors(
document_id().to_string(),
add_vectors.len(),
)));
}
VectorStateDelta::NowManual(add_vectors)
}
// no autogeneration
let add_vectors = new.into_array_of_vectors();
if add_vectors.len() > usize::from(u8::MAX) {
return Err(crate::Error::UserError(crate::UserError::TooManyVectors(
document_id().to_string(),
add_vectors.len(),
)));
}
VectorStateDelta::ManualDelta(add_vectors)
}
(Some(old), None) => {
// Do we keep this document?
let document_is_kept = obkv
.iter()
.map(|(_, deladd)| KvReaderDelAdd::new(deladd))
.any(|deladd| deladd.get(DelAdd::Addition).is_some());
if document_is_kept && old.is_some() {
remove_from_user_provided.insert(docid);
// becomes autogenerated
VectorStateDelta::NowGenerated(prompt.render(
// this happens only when an existing embedder changed. We cannot regenerate userProvided vectors
VectorState::InDb => VectorStateDelta::NoChange,
// generated vectors must be regenerated
VectorState::Generated => regenerate_prompt(obkv, prompt, new_fields_ids_map)?,
},
// prompt regeneration is only triggered for existing embedders
ExtractionAction::SettingsRegeneratePrompts { old_prompt } => {
if !old.is_user_provided() {
regenerate_if_prompt_changed(
obkv,
DelAdd::Addition,
new_fields_ids_map,
)?)
} else if document_is_kept && old.is_none() {
(old_prompt, prompt),
(&old_fields_ids_map, &new_fields_ids_map),
)?
} else {
// we can simply ignore user provided vectors as they are not regenerated and are
// already in the DB since this is an existing embedder
VectorStateDelta::NoChange
} else {
remove_from_user_provided.insert(docid);
VectorStateDelta::NowRemoved
}
}
(None, Some(new)) => {
if new.is_user_provided() {
user_provided.insert(docid);
} else {
remove_from_user_provided.insert(docid);
}
// was possibly autogenerated, remove all vectors for that document
let add_vectors = new.into_array_of_vectors();
if add_vectors.len() > usize::from(u8::MAX) {
return Err(crate::Error::UserError(crate::UserError::TooManyVectors(
document_id().to_string(),
add_vectors.len(),
)));
}
VectorStateDelta::WasGeneratedNowManual(add_vectors)
}
(None, None) => {
// Do we keep this document?
let document_is_kept = obkv
.iter()
.map(|(_, deladd)| KvReaderDelAdd::new(deladd))
.any(|deladd| deladd.get(DelAdd::Addition).is_some());
if document_is_kept {
// Don't give up if the old prompt was failing
let old_prompt = Some(&prompt)
// TODO: this filter works because we erase the vec database when a embedding setting changes.
// When vector pipeline will be optimized, this should be removed.
.filter(|_| !settings_diff.reindex_vectors())
.map(|p| {
p.render(obkv, DelAdd::Deletion, old_fields_ids_map)
.unwrap_or_default()
});
let new_prompt =
prompt.render(obkv, DelAdd::Addition, new_fields_ids_map)?;
if old_prompt.as_ref() != Some(&new_prompt) {
let old_prompt = old_prompt.unwrap_or_default();
tracing::trace!(
"🚀 Changing prompt from\n{old_prompt}\n===to===\n{new_prompt}"
);
VectorStateDelta::NowGenerated(new_prompt)
} else {
tracing::trace!("⏭️ Prompt unmodified, skipping");
VectorStateDelta::NoChange
}
} else {
remove_from_user_provided.remove(docid);
VectorStateDelta::NowRemoved
}
}
ExtractionAction::DocumentOperation(DocumentOperation {
remove_from_user_provided,
}) => extract_vector_document_diff(
docid,
obkv,
prompt,
(add_to_user_provided, remove_from_user_provided),
(old, new),
(&old_fields_ids_map, &new_fields_ids_map),
document_id,
)?,
};
// and we finally push the unique vectors into the writer
push_vectors_diff(
remove_vectors_writer,
@ -289,7 +312,6 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
manual_vectors_writer,
&mut key_buffer,
delta,
reindex_vectors,
)?;
}
}
@ -300,20 +322,30 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
embedder_name,
embedder,
prompt: _,
manual_vectors_writer,
prompts_writer,
remove_vectors_writer,
user_provided,
remove_from_user_provided,
action,
manual_vectors_writer,
add_to_user_provided,
} in extractors
{
let remove_from_user_provided =
if let ExtractionAction::DocumentOperation(DocumentOperation {
remove_from_user_provided,
}) = action
{
remove_from_user_provided
} else {
Default::default()
};
results.push(ExtractedVectorPoints {
manual_vectors: writer_into_reader(manual_vectors_writer)?,
remove_vectors: writer_into_reader(remove_vectors_writer)?,
prompts: writer_into_reader(prompts_writer)?,
embedder,
embedder_name,
user_provided,
add_to_user_provided,
remove_from_user_provided,
})
}
@ -321,6 +353,136 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
Ok(results)
}
fn extract_vector_document_diff(
docid: DocumentId,
obkv: obkv::KvReader<'_, FieldId>,
prompt: &Prompt,
(add_to_user_provided, remove_from_user_provided): (&mut RoaringBitmap, &mut RoaringBitmap),
(old, new): (VectorState, VectorState),
(old_fields_ids_map, new_fields_ids_map): (&FieldsIdsMap, &FieldsIdsMap),
document_id: impl Fn() -> Value,
) -> Result<VectorStateDelta> {
match (old.is_user_provided(), new.is_user_provided()) {
(true, true) | (false, false) => {}
(true, false) => {
remove_from_user_provided.insert(docid);
}
(false, true) => {
add_to_user_provided.insert(docid);
}
}
let delta = match (old, new) {
// regardless of the previous state, if a document now contains inline _vectors, they must
// be extracted manually
(_old, VectorState::Inline(new)) => {
let add_vectors = new.into_array_of_vectors();
if add_vectors.len() > usize::from(u8::MAX) {
return Err(crate::Error::UserError(crate::UserError::TooManyVectors(
document_id().to_string(),
add_vectors.len(),
)));
}
VectorStateDelta::NowManual(add_vectors)
}
// no `_vectors` anywhere, we check for document removal and otherwise we regenerate the prompt if the
// document changed
(VectorState::Generated, VectorState::Generated) => {
// Do we keep this document?
let document_is_kept = obkv
.iter()
.map(|(_, deladd)| KvReaderDelAdd::new(deladd))
.any(|deladd| deladd.get(DelAdd::Addition).is_some());
if document_is_kept {
// Don't give up if the old prompt was failing
let old_prompt = Some(&prompt).map(|p| {
p.render(obkv, DelAdd::Deletion, old_fields_ids_map).unwrap_or_default()
});
let new_prompt = prompt.render(obkv, DelAdd::Addition, new_fields_ids_map)?;
if old_prompt.as_ref() != Some(&new_prompt) {
let old_prompt = old_prompt.unwrap_or_default();
tracing::trace!(
"🚀 Changing prompt from\n{old_prompt}\n===to===\n{new_prompt}"
);
VectorStateDelta::NowGenerated(new_prompt)
} else {
tracing::trace!("⏭️ Prompt unmodified, skipping");
VectorStateDelta::NoChange
}
} else {
VectorStateDelta::NowRemoved
}
}
// when the vectors are no longer user-provided,
// we generate the prompt unconditionally
(_not_generated, VectorState::Generated) => {
// Do we keep this document?
let document_is_kept = obkv
.iter()
.map(|(_, deladd)| KvReaderDelAdd::new(deladd))
.any(|deladd| deladd.get(DelAdd::Addition).is_some());
if document_is_kept {
// becomes autogenerated
VectorStateDelta::NowGenerated(prompt.render(
obkv,
DelAdd::Addition,
new_fields_ids_map,
)?)
} else {
// make sure the document is always removed from user provided on removal
remove_from_user_provided.insert(docid);
VectorStateDelta::NowRemoved
}
}
(_old, VectorState::InDb) => {
// Do we keep this document?
let document_is_kept = obkv
.iter()
.map(|(_, deladd)| KvReaderDelAdd::new(deladd))
.any(|deladd| deladd.get(DelAdd::Addition).is_some());
if document_is_kept {
// if the new version of documents has the vectors in the DB,
// then they are user-provided and nothing possibly changed
VectorStateDelta::NoChange
} else {
// make sure the document is always removed from user provided on removal
remove_from_user_provided.insert(docid);
VectorStateDelta::NowRemoved
}
}
};
Ok(delta)
}
fn regenerate_if_prompt_changed(
obkv: obkv::KvReader<'_, FieldId>,
(old_prompt, new_prompt): (&Prompt, &Prompt),
(old_fields_ids_map, new_fields_ids_map): (&FieldsIdsMap, &FieldsIdsMap),
) -> Result<VectorStateDelta> {
let old_prompt =
old_prompt.render(obkv, DelAdd::Deletion, old_fields_ids_map).unwrap_or(Default::default());
let new_prompt = new_prompt.render(obkv, DelAdd::Addition, new_fields_ids_map)?;
if new_prompt == old_prompt {
return Ok(VectorStateDelta::NoChange);
}
Ok(VectorStateDelta::NowGenerated(new_prompt))
}
fn regenerate_prompt(
obkv: obkv::KvReader<'_, FieldId>,
prompt: &Prompt,
new_fields_ids_map: &FieldsIdsMap,
) -> Result<VectorStateDelta> {
let prompt = prompt.render(obkv, DelAdd::Addition, new_fields_ids_map)?;
Ok(VectorStateDelta::NowGenerated(prompt))
}
/// We cannot compute the diff between both Del and Add vectors.
/// We'll push every vector and compute the difference later in TypedChunk.
fn push_vectors_diff(
@ -329,14 +491,9 @@ fn push_vectors_diff(
manual_vectors_writer: &mut Writer<BufWriter<File>>,
key_buffer: &mut Vec<u8>,
delta: VectorStateDelta,
reindex_vectors: bool,
) -> Result<()> {
let (must_remove, prompt, mut add_vectors) = delta.into_values();
if must_remove
// TODO: the below condition works because we erase the vec database when a embedding setting changes.
// When vector pipeline will be optimized, this should be removed.
&& !reindex_vectors
{
if must_remove {
key_buffer.truncate(TRUNCATE_SIZE);
remove_vectors_writer.insert(&key_buffer, [])?;
}

View File

@ -248,7 +248,7 @@ fn send_original_documents_data(
prompts,
embedder_name,
embedder,
user_provided,
add_to_user_provided,
remove_from_user_provided,
} in extracted_vectors
{
@ -274,7 +274,7 @@ fn send_original_documents_data(
expected_dimension: embedder.dimensions(),
manual_vectors,
embedder_name,
user_provided,
add_to_user_provided,
remove_from_user_provided,
}));
}

View File

@ -503,7 +503,7 @@ where
embeddings,
manual_vectors,
embedder_name,
user_provided,
add_to_user_provided,
remove_from_user_provided,
} => {
dimension.insert(embedder_name.clone(), expected_dimension);
@ -513,7 +513,7 @@ where
expected_dimension,
manual_vectors,
embedder_name,
user_provided,
add_to_user_provided,
remove_from_user_provided,
}
}

View File

@ -91,7 +91,7 @@ pub(crate) enum TypedChunk {
expected_dimension: usize,
manual_vectors: grenad::Reader<BufReader<File>>,
embedder_name: String,
user_provided: RoaringBitmap,
add_to_user_provided: RoaringBitmap,
remove_from_user_provided: RoaringBitmap,
},
ScriptLanguageDocids(HashMap<(Script, Language), (RoaringBitmap, RoaringBitmap)>),
@ -625,7 +625,7 @@ pub(crate) fn write_typed_chunk_into_index(
let mut remove_vectors_builder = MergerBuilder::new(keep_first as MergeFn);
let mut manual_vectors_builder = MergerBuilder::new(keep_first as MergeFn);
let mut embeddings_builder = MergerBuilder::new(keep_first as MergeFn);
let mut user_provided = RoaringBitmap::new();
let mut add_to_user_provided = RoaringBitmap::new();
let mut remove_from_user_provided = RoaringBitmap::new();
let mut params = None;
for typed_chunk in typed_chunks {
@ -635,7 +635,7 @@ pub(crate) fn write_typed_chunk_into_index(
embeddings,
expected_dimension,
embedder_name,
user_provided: ud,
add_to_user_provided: aud,
remove_from_user_provided: rud,
} = typed_chunk
else {
@ -649,7 +649,7 @@ pub(crate) fn write_typed_chunk_into_index(
if let Some(embeddings) = embeddings {
embeddings_builder.push(embeddings.into_cursor()?);
}
user_provided |= ud;
add_to_user_provided |= aud;
remove_from_user_provided |= rud;
}
@ -662,7 +662,7 @@ pub(crate) fn write_typed_chunk_into_index(
.find(|IndexEmbeddingConfig { name, .. }| name == &embedder_name)
.unwrap();
index_embedder_config.user_provided -= remove_from_user_provided;
index_embedder_config.user_provided |= user_provided;
index_embedder_config.user_provided |= add_to_user_provided;
index.put_embedding_configs(wtxn, embedding_configs)?;