Implement in new document indexer

This commit is contained in:
Louis Dureuil 2025-06-30 00:00:22 +02:00
parent 22d363c05a
commit f8232976ed
No known key found for this signature in database
10 changed files with 886 additions and 391 deletions

View file

@ -138,6 +138,7 @@ pub enum ReceiverAction {
WakeUp,
LargeEntry(LargeEntry),
LargeVectors(LargeVectors),
LargeVector(LargeVector),
}
/// An entry that cannot fit in the BBQueue buffers has been
@ -174,6 +175,24 @@ impl LargeVectors {
}
}
#[derive(Debug)]
/// A single embedding for one `(document, embedder, extractor)` triple that
/// was too large to fit in the BBQueue buffers, spilled to a memory-mapped
/// temporary file instead (see `set_vector_for_extractor`).
pub struct LargeVector {
    /// The document id associated to the large embedding.
    pub docid: DocumentId,
    /// The embedder id in which to insert the large embedding.
    pub embedder_id: u8,
    /// The extractor id in which to insert the large embedding.
    pub extractor_id: u8,
    /// The large embedding that must be written, as the raw bytes of its `f32`s.
    pub embedding: Mmap,
}
impl LargeVector {
pub fn read_embedding(&self, dimensions: usize) -> &[f32] {
self.embedding.chunks_exact(dimensions).map(bytemuck::cast_slice).next().unwrap()
}
}
impl<'a> WriterBbqueueReceiver<'a> {
/// Tries to receive an action to do until the timeout occurs
/// and if it does, consider it as a spurious wake up.
@ -238,6 +257,7 @@ pub enum EntryHeader {
DbOperation(DbOperation),
ArroyDeleteVector(ArroyDeleteVector),
ArroySetVectors(ArroySetVectors),
ArroySetVector(ArroySetVector),
}
impl EntryHeader {
@ -250,6 +270,7 @@ impl EntryHeader {
EntryHeader::DbOperation(_) => 0,
EntryHeader::ArroyDeleteVector(_) => 1,
EntryHeader::ArroySetVectors(_) => 2,
EntryHeader::ArroySetVector(_) => 3,
}
}
@ -274,11 +295,17 @@ impl EntryHeader {
Self::variant_size() + mem::size_of::<ArroySetVectors>() + embedding_size * count
}
/// Total number of bytes to reserve in the queue for an `ArroySetVector`
/// entry carrying one embedding of `dimensions` `f32`s:
/// variant tag + fixed-size header + raw embedding bytes.
fn total_set_vector_size(dimensions: usize) -> usize {
    let embedding_size = dimensions * mem::size_of::<f32>();
    Self::variant_size() + mem::size_of::<ArroySetVector>() + embedding_size
}
fn header_size(&self) -> usize {
let payload_size = match self {
EntryHeader::DbOperation(op) => mem::size_of_val(op),
EntryHeader::ArroyDeleteVector(adv) => mem::size_of_val(adv),
EntryHeader::ArroySetVectors(asvs) => mem::size_of_val(asvs),
EntryHeader::ArroySetVector(asv) => mem::size_of_val(asv),
};
Self::variant_size() + payload_size
}
@ -301,6 +328,11 @@ impl EntryHeader {
let header = checked::pod_read_unaligned(header_bytes);
EntryHeader::ArroySetVectors(header)
}
3 => {
let header_bytes = &remaining[..mem::size_of::<ArroySetVector>()];
let header = checked::pod_read_unaligned(header_bytes);
EntryHeader::ArroySetVector(header)
}
id => panic!("invalid variant id: {id}"),
}
}
@ -311,6 +343,7 @@ impl EntryHeader {
EntryHeader::DbOperation(op) => bytemuck::bytes_of(op),
EntryHeader::ArroyDeleteVector(adv) => bytemuck::bytes_of(adv),
EntryHeader::ArroySetVectors(asvs) => bytemuck::bytes_of(asvs),
EntryHeader::ArroySetVector(asv) => bytemuck::bytes_of(asv),
};
*first = self.variant_id();
remaining.copy_from_slice(payload_bytes);
@ -379,6 +412,37 @@ impl ArroySetVectors {
}
}
#[derive(Debug, Clone, Copy, NoUninit, CheckedBitPattern)]
#[repr(C)]
/// Fixed-size header of a "set a single vector" entry.
///
/// The embedding itself lives in the remaining space of the frame, after this
/// header, as the raw (possibly unaligned) bytes of its `f32` components.
pub struct ArroySetVector {
    /// The document id associated to the embedding.
    pub docid: DocumentId,
    /// The embedder id in which to insert the embedding.
    pub embedder_id: u8,
    /// The extractor id in which to insert the embedding.
    pub extractor_id: u8,
    /// Explicit padding so the `repr(C)` layout has no uninitialized bytes,
    /// as required by the `NoUninit` derive.
    _padding: [u8; 2],
}
impl ArroySetVector {
    /// Returns the raw embedding bytes of the frame: everything after the
    /// variant tag and this fixed-size header.
    fn embeddings_bytes<'a>(frame: &'a FrameGrantR<'_>) -> &'a [u8] {
        let skip = EntryHeader::variant_size() + mem::size_of::<Self>();
        &frame[skip..]
    }

    /// Read the embedding and write it into an aligned `f32` Vec.
    ///
    /// The bytes inside the frame are not guaranteed to be `f32`-aligned, so
    /// they are copied into `vec` (resized to the exact float count) rather
    /// than cast in place.
    // NOTE(review): assumes the payload length is a multiple of
    // `size_of::<f32>()` — otherwise the final `copy_from_slice` panics on a
    // length mismatch. The writer side should guarantee this; to confirm.
    pub fn read_all_embeddings_into_vec<'v>(
        &self,
        frame: &FrameGrantR<'_>,
        vec: &'v mut Vec<f32>,
    ) -> &'v [f32] {
        let embeddings_bytes = Self::embeddings_bytes(frame);
        let embeddings_count = embeddings_bytes.len() / mem::size_of::<f32>();
        vec.resize(embeddings_count, 0.0);
        bytemuck::cast_slice_mut(vec.as_mut()).copy_from_slice(embeddings_bytes);
        &vec[..]
    }
}
#[derive(Debug, Clone, Copy, NoUninit, CheckedBitPattern)]
#[repr(u16)]
pub enum Database {
@ -398,6 +462,7 @@ pub enum Database {
FacetIdStringDocids,
FieldIdDocidFacetStrings,
FieldIdDocidFacetF64s,
VectorEmbedderCategoryId,
}
impl Database {
@ -419,6 +484,7 @@ impl Database {
Database::FacetIdStringDocids => index.facet_id_string_docids.remap_types(),
Database::FieldIdDocidFacetStrings => index.field_id_docid_facet_strings.remap_types(),
Database::FieldIdDocidFacetF64s => index.field_id_docid_facet_f64s.remap_types(),
Database::VectorEmbedderCategoryId => index.embedder_category_id.remap_types(),
}
}
@ -440,6 +506,7 @@ impl Database {
Database::FacetIdStringDocids => db_name::FACET_ID_STRING_DOCIDS,
Database::FieldIdDocidFacetStrings => db_name::FIELD_ID_DOCID_FACET_STRINGS,
Database::FieldIdDocidFacetF64s => db_name::FIELD_ID_DOCID_FACET_F64S,
Database::VectorEmbedderCategoryId => db_name::VECTOR_EMBEDDER_CATEGORY_ID,
}
}
}
@ -568,6 +635,82 @@ impl<'b> ExtractorBbqueueSender<'b> {
Ok(())
}
/// Sends one embedding for a `(docid, embedder_id, extractor_id)` triple
/// through the BBQueue, spilling to a memory-mapped temporary file when the
/// entry does not fit in a single grant.
///
/// `embedding: None` is encoded as a zero-dimension entry (header only),
/// letting the receiver know there is no embedding payload to read.
fn set_vector_for_extractor(
    &self,
    docid: u32,
    embedder_id: u8,
    extractor_id: u8,
    embedding: Option<Embedding>,
) -> crate::Result<()> {
    let max_grant = self.max_grant;
    let refcell = self.producers.get().unwrap();
    let mut producer = refcell.0.borrow_mut_or_yield();

    // If there are no vectors we specify the dimensions
    // to zero to allocate no extra space at all
    let dimensions = embedding.as_ref().map_or(0, |emb| emb.len());

    let arroy_set_vector =
        ArroySetVector { docid, embedder_id, extractor_id, _padding: [0; 2] };
    let payload_header = EntryHeader::ArroySetVector(arroy_set_vector);
    let total_length = EntryHeader::total_set_vector_size(dimensions);
    if total_length > max_grant {
        // Too big for a BBQueue frame: write the raw bytes to a temp file and
        // ship a memory map of it through the channel instead.
        let mut value_file = tempfile::tempfile().map(BufWriter::new)?;
        // `None` implies `dimensions == 0`, so `total_length` is only the
        // header size and always fits in a grant; reaching this branch means
        // the embedding is necessarily present.
        let embedding = embedding.expect("set_vector without a vector does not fit in RAM");

        let mut embedding_bytes = bytemuck::cast_slice(&embedding);
        io::copy(&mut embedding_bytes, &mut value_file)?;

        let value_file = value_file.into_inner().map_err(|ie| ie.into_error())?;
        // SAFETY: maps a private temporary file that we just wrote and own.
        let embedding = unsafe { Mmap::map(&value_file)? };

        let large_vectors = LargeVector { docid, embedder_id, extractor_id, embedding };
        self.sender.send(ReceiverAction::LargeVector(large_vectors)).unwrap();

        return Ok(());
    }

    // Spin loop to have a frame the size we requested.
    reserve_and_write_grant(
        &mut producer,
        total_length,
        &self.sender,
        &self.sent_messages_attempts,
        &self.blocking_sent_messages_attempts,
        |grant| {
            // Serialize the tagged header first, then the raw embedding bytes.
            let header_size = payload_header.header_size();
            let (header_bytes, remaining) = grant.split_at_mut(header_size);
            payload_header.serialize_into(header_bytes);

            if dimensions != 0 {
                let output_iter =
                    remaining.chunks_exact_mut(dimensions * mem::size_of::<f32>());

                // `Option::iter` yields the embedding at most once.
                for (embedding, output) in embedding.iter().zip(output_iter) {
                    output.copy_from_slice(bytemuck::cast_slice(embedding));
                }
            }

            Ok(())
        },
    )?;

    Ok(())
}
/// Persists the serialized `EmbedderInfo` of the embedder named `name` into
/// the `VectorEmbedderCategoryId` database.
fn embedding_status(
    &self,
    name: &str,
    infos: crate::vector::db::EmbedderInfo,
) -> crate::Result<()> {
    let database = Database::VectorEmbedderCategoryId;

    // Surface serialization failures as an encoding error tagged with the
    // name of the database we were about to write to.
    let bytes = match infos.to_bytes() {
        Ok(bytes) => bytes,
        Err(_) => {
            return Err(InternalError::Serialization(crate::SerializationError::Encoding {
                db_name: Some(database.database_name()),
            })
            .into())
        }
    };

    self.write_key_value(database, name.as_bytes(), &bytes)
}
fn write_key_value(&self, database: Database, key: &[u8], value: &[u8]) -> crate::Result<()> {
let key_length = key.len().try_into().ok().and_then(NonZeroU16::new).ok_or_else(|| {
InternalError::StorePut {
@ -942,9 +1085,18 @@ impl EmbeddingSender<'_, '_> {
&self,
docid: DocumentId,
embedder_id: u8,
embedding: Embedding,
extractor_id: u8,
embedding: Option<Embedding>,
) -> crate::Result<()> {
self.0.set_vectors(docid, embedder_id, &[embedding])
self.0.set_vector_for_extractor(docid, embedder_id, extractor_id, embedding)
}
pub(crate) fn embedding_status(
&self,
name: &str,
infos: crate::vector::db::EmbedderInfo,
) -> crate::Result<()> {
self.0.embedding_status(name, infos)
}
}

View file

@ -12,6 +12,7 @@ use super::vector_document::VectorDocument;
use super::{KvReaderFieldId, KvWriterFieldId};
use crate::constants::{RESERVED_GEO_FIELD_NAME, RESERVED_VECTORS_FIELD_NAME};
use crate::documents::FieldIdMapper;
use crate::update::del_add::KvReaderDelAdd;
use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
use crate::update::new::vector_document::VectorDocumentFromDb;
use crate::vector::settings::EmbedderAction;
@ -469,6 +470,110 @@ impl<'doc> Versions<'doc> {
}
}
#[derive(Debug)]
/// A `Document` view over an obkv whose values are del/add sub-obkvs (read
/// via `KvReaderDelAdd`), exposing only one chosen `DelAdd` side of each
/// field.
pub struct KvDelAddDocument<'a, Mapper: FieldIdMapper> {
    // Raw obkv keyed by field id; each value is a del/add keyed sub-obkv.
    document: &'a obkv::KvReaderU16,
    // Which side (deletion or addition) this view exposes.
    side: crate::update::del_add::DelAdd,
    // Resolves field ids from the obkv into field names.
    fields_ids_map: &'a Mapper,
}
impl<'a, Mapper: FieldIdMapper> KvDelAddDocument<'a, Mapper> {
    /// Builds a view over `document` that only exposes values from `side`.
    pub fn new(
        document: &'a obkv::KvReaderU16,
        side: crate::update::del_add::DelAdd,
        fields_ids_map: &'a Mapper,
    ) -> Self {
        Self { document, side, fields_ids_map }
    }

    /// Looks up field `k` and returns its JSON value on the selected del/add
    /// side, or `None` when the field or that side is absent.
    fn get(&self, k: &str) -> Result<Option<&'a RawValue>> {
        let id = match self.fields_ids_map.id(k) {
            Some(id) => id,
            None => return Ok(None),
        };
        let raw = match self.document.get(id) {
            Some(raw) => raw,
            None => return Ok(None),
        };
        match KvReaderDelAdd::from_slice(raw).get(self.side) {
            None => Ok(None),
            Some(bytes) => {
                let value =
                    serde_json::from_slice(bytes).map_err(crate::InternalError::SerdeJson)?;
                Ok(Some(value))
            }
        }
    }
}
impl<'a, Mapper: FieldIdMapper> Document<'a> for KvDelAddDocument<'a, Mapper> {
    /// Iterates over the top-level fields present on the selected del/add
    /// side, skipping the reserved `_vectors` and `_geo` fields.
    fn iter_top_level_fields(&self) -> impl Iterator<Item = Result<(&'a str, &'a RawValue)>> {
        let mut it = self.document.iter();

        std::iter::from_fn(move || loop {
            let (fid, value) = it.next()?;
            // Skip fields that carry no value on this side (e.g. a field only
            // present on the addition side while we expose the deletion side).
            let Some(value) = KvReaderDelAdd::from_slice(value).get(self.side) else {
                continue;
            };
            let name = match self.fields_ids_map.name(fid).ok_or(
                InternalError::FieldIdMapMissingEntry(crate::FieldIdMapMissingEntry::FieldId {
                    field_id: fid,
                    process: "getting current document",
                }),
            ) {
                Ok(name) => name,
                Err(error) => return Some(Err(error.into())),
            };

            if name == RESERVED_VECTORS_FIELD_NAME || name == RESERVED_GEO_FIELD_NAME {
                continue;
            }

            // Parse lazily per field so a single bad value surfaces as an
            // `Err` item instead of aborting the whole iteration.
            let res = (|| {
                let value =
                    serde_json::from_slice(value).map_err(crate::InternalError::SerdeJson)?;

                Ok((name, value))
            })();

            return Some(res);
        })
    }

    /// Counts the fields that `iter_top_level_fields` would yield.
    // NOTE(review): unlike the iterator above, a field id missing from the
    // fields-ids map is still counted here (`Err(_) => return Some(())`)
    // rather than surfacing an error — confirm this asymmetry is intended.
    fn top_level_fields_count(&self) -> usize {
        let mut it = self.document.iter();

        std::iter::from_fn(move || loop {
            let (fid, value) = it.next()?;
            let Some(_) = KvReaderDelAdd::from_slice(value).get(self.side) else {
                continue;
            };
            let name = match self.fields_ids_map.name(fid).ok_or(
                InternalError::FieldIdMapMissingEntry(crate::FieldIdMapMissingEntry::FieldId {
                    field_id: fid,
                    process: "getting current document",
                }),
            ) {
                Ok(name) => name,
                Err(_) => return Some(()),
            };

            if name == RESERVED_VECTORS_FIELD_NAME || name == RESERVED_GEO_FIELD_NAME {
                continue;
            }

            return Some(());
        })
        .count()
    }

    /// Returns the value of top-level field `k`, hiding the reserved
    /// `_vectors` and `_geo` fields (use the dedicated accessors instead).
    fn top_level_field(&self, k: &str) -> Result<Option<&'a RawValue>> {
        if k == RESERVED_VECTORS_FIELD_NAME || k == RESERVED_GEO_FIELD_NAME {
            return Ok(None);
        }
        self.get(k)
    }

    /// Returns the raw `_vectors` field on the selected side, if any.
    fn vectors_field(&self) -> Result<Option<&'a RawValue>> {
        self.get(RESERVED_VECTORS_FIELD_NAME)
    }

    /// Returns the raw `_geo` field on the selected side, if any.
    fn geo_field(&self) -> Result<Option<&'a RawValue>> {
        self.get(RESERVED_GEO_FIELD_NAME)
    }
}
pub struct DocumentIdentifiers<'doc> {
docid: DocumentId,
external_document_id: &'doc str,

View file

@ -11,7 +11,7 @@ use super::vector_document::{
use crate::attribute_patterns::PatternMatch;
use crate::documents::FieldIdMapper;
use crate::update::new::document::DocumentIdentifiers;
use crate::vector::EmbeddingConfigs;
use crate::vector::RuntimeEmbedders;
use crate::{DocumentId, Index, InternalError, Result};
pub enum DocumentChange<'doc> {
@ -70,7 +70,7 @@ impl<'doc> Insertion<'doc> {
pub fn inserted_vectors(
&self,
doc_alloc: &'doc Bump,
embedders: &'doc EmbeddingConfigs,
embedders: &'doc RuntimeEmbedders,
) -> Result<Option<VectorDocumentFromVersions<'doc>>> {
VectorDocumentFromVersions::new(self.external_document_id, &self.new, doc_alloc, embedders)
}
@ -241,7 +241,7 @@ impl<'doc> Update<'doc> {
pub fn only_changed_vectors(
&self,
doc_alloc: &'doc Bump,
embedders: &'doc EmbeddingConfigs,
embedders: &'doc RuntimeEmbedders,
) -> Result<Option<VectorDocumentFromVersions<'doc>>> {
VectorDocumentFromVersions::new(self.external_document_id, &self.new, doc_alloc, embedders)
}
@ -252,7 +252,7 @@ impl<'doc> Update<'doc> {
index: &'doc Index,
mapper: &'doc Mapper,
doc_alloc: &'doc Bump,
embedders: &'doc EmbeddingConfigs,
embedders: &'doc RuntimeEmbedders,
) -> Result<Option<MergedVectorDocument<'doc>>> {
if self.from_scratch {
MergedVectorDocument::without_db(

View file

@ -7,8 +7,7 @@ use hashbrown::HashMap;
use super::DelAddRoaringBitmap;
use crate::constants::RESERVED_GEO_FIELD_NAME;
use crate::update::new::channel::{DocumentsSender, ExtractorBbqueueSender};
use crate::update::new::document::{write_to_obkv, Document};
use crate::update::new::document::{DocumentContext, DocumentIdentifiers};
use crate::update::new::document::{write_to_obkv, Document, DocumentContext, DocumentIdentifiers};
use crate::update::new::indexer::document_changes::{Extractor, IndexingContext};
use crate::update::new::indexer::settings_changes::{
settings_change_extract, DocumentsIndentifiers, SettingsChangeExtractor,
@ -19,16 +18,16 @@ use crate::update::new::vector_document::VectorDocument;
use crate::update::new::DocumentChange;
use crate::update::settings::SettingsDelta;
use crate::vector::settings::EmbedderAction;
use crate::vector::EmbeddingConfigs;
use crate::vector::RuntimeEmbedders;
use crate::Result;
pub struct DocumentsExtractor<'a, 'b> {
document_sender: DocumentsSender<'a, 'b>,
embedders: &'a EmbeddingConfigs,
embedders: &'a RuntimeEmbedders,
}
impl<'a, 'b> DocumentsExtractor<'a, 'b> {
pub fn new(document_sender: DocumentsSender<'a, 'b>, embedders: &'a EmbeddingConfigs) -> Self {
pub fn new(document_sender: DocumentsSender<'a, 'b>, embedders: &'a RuntimeEmbedders) -> Self {
Self { document_sender, embedders }
}
}

View file

@ -1,30 +1,35 @@
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::fmt::Debug;
use bumpalo::collections::Vec as BVec;
use bumpalo::Bump;
use hashbrown::{DefaultHashBuilder, HashMap};
use super::cache::DelAddRoaringBitmap;
use crate::error::FaultSource;
use crate::progress::EmbedderStats;
use crate::prompt::Prompt;
use crate::update::new::channel::EmbeddingSender;
use crate::update::new::document::{DocumentContext, DocumentIdentifiers};
use crate::update::new::document::{Document, DocumentContext, DocumentIdentifiers};
use crate::update::new::indexer::document_changes::Extractor;
use crate::update::new::indexer::settings_changes::SettingsChangeExtractor;
use crate::update::new::thread_local::MostlySend;
use crate::update::new::vector_document::VectorDocument;
use crate::update::new::DocumentChange;
use crate::vector::db::{EmbedderInfo, EmbeddingStatus, EmbeddingStatusDelta};
use crate::vector::error::{
EmbedErrorKind, PossibleEmbeddingMistakes, UnusedVectorsDistributionBump,
};
use crate::vector::extractor::{
DocumentTemplateExtractor, Extractor as VectorExtractor, RequestFragmentExtractor,
};
use crate::vector::session::{EmbedSession, Input, Metadata, OnEmbed};
use crate::vector::settings::{EmbedderAction, ReindexAction};
use crate::vector::{Embedder, Embedding, EmbeddingConfigs};
use crate::vector::{Embedding, RuntimeEmbedder, RuntimeEmbedders, RuntimeFragment};
use crate::{DocumentId, FieldDistribution, InternalError, Result, ThreadPoolNoAbort, UserError};
pub struct EmbeddingExtractor<'a, 'b> {
embedders: &'a EmbeddingConfigs,
embedders: &'a RuntimeEmbedders,
sender: EmbeddingSender<'a, 'b>,
possible_embedding_mistakes: PossibleEmbeddingMistakes,
embedder_stats: &'a EmbedderStats,
@ -33,7 +38,7 @@ pub struct EmbeddingExtractor<'a, 'b> {
impl<'a, 'b> EmbeddingExtractor<'a, 'b> {
pub fn new(
embedders: &'a EmbeddingConfigs,
embedders: &'a RuntimeEmbedders,
sender: EmbeddingSender<'a, 'b>,
field_distribution: &'a FieldDistribution,
embedder_stats: &'a EmbedderStats,
@ -45,7 +50,7 @@ impl<'a, 'b> EmbeddingExtractor<'a, 'b> {
}
pub struct EmbeddingExtractorData<'extractor>(
pub HashMap<String, DelAddRoaringBitmap, DefaultHashBuilder, &'extractor Bump>,
pub HashMap<String, EmbeddingStatusDelta, DefaultHashBuilder, &'extractor Bump>,
);
unsafe impl MostlySend for EmbeddingExtractorData<'_> {}
@ -67,19 +72,18 @@ impl<'extractor> Extractor<'extractor> for EmbeddingExtractor<'_, '_> {
UnusedVectorsDistributionBump::new_in(&context.doc_alloc);
let mut all_chunks = BVec::with_capacity_in(embedders.len(), &context.doc_alloc);
for (embedder_name, (embedder, prompt, _is_quantized)) in embedders {
let embedder_id =
context.index.embedder_category_id.get(&context.rtxn, embedder_name)?.ok_or_else(
|| InternalError::DatabaseMissingEntry {
let embedder_db = context.index.embedding_configs();
for (embedder_name, runtime) in embedders {
let embedder_info = embedder_db
.embedder_info(&context.rtxn, embedder_name)?
.ok_or_else(|| InternalError::DatabaseMissingEntry {
db_name: "embedder_category_id",
key: None,
},
)?;
})?;
all_chunks.push(Chunks::new(
embedder,
embedder_id,
runtime,
embedder_info,
embedder_name,
prompt,
context.data,
&self.possible_embedding_mistakes,
self.embedder_stats,
@ -94,19 +98,14 @@ impl<'extractor> Extractor<'extractor> for EmbeddingExtractor<'_, '_> {
match change {
DocumentChange::Deletion(deletion) => {
// vector deletion is handled by document sender,
// we still need to accommodate deletion from user_provided
// we still need to accommodate deletion from embedding_status
for chunks in &mut all_chunks {
// regenerate: true means we delete from user_provided
chunks.set_regenerate(deletion.docid(), true);
let (is_user_provided, must_regenerate) =
chunks.is_user_provided_must_regenerate(deletion.docid());
chunks.clear_status(deletion.docid(), is_user_provided, must_regenerate);
}
}
DocumentChange::Update(update) => {
let old_vectors = update.current_vectors(
&context.rtxn,
context.index,
context.db_fields_ids_map,
&context.doc_alloc,
)?;
let new_vectors =
update.only_changed_vectors(&context.doc_alloc, self.embedders)?;
@ -115,19 +114,16 @@ impl<'extractor> Extractor<'extractor> for EmbeddingExtractor<'_, '_> {
}
for chunks in &mut all_chunks {
let embedder_name = chunks.embedder_name();
let prompt = chunks.prompt();
let (old_is_user_provided, old_must_regenerate) =
chunks.is_user_provided_must_regenerate(update.docid());
let old_vectors = old_vectors.vectors_for_key(embedder_name)?.unwrap();
let embedder_name = chunks.embedder_name();
// case where we have a `_vectors` field in the updated document
if let Some(new_vectors) = new_vectors.as_ref().and_then(|new_vectors| {
new_vectors.vectors_for_key(embedder_name).transpose()
}) {
let new_vectors = new_vectors?;
if old_vectors.regenerate != new_vectors.regenerate {
chunks.set_regenerate(update.docid(), new_vectors.regenerate);
}
// do we have set embeddings?
if let Some(embeddings) = new_vectors.embeddings {
chunks.set_vectors(
@ -139,97 +135,62 @@ impl<'extractor> Extractor<'extractor> for EmbeddingExtractor<'_, '_> {
document_id: update.external_document_id().to_string(),
error: error.to_string(),
})?,
old_is_user_provided,
old_must_regenerate,
new_vectors.regenerate,
)?;
// regenerate if the new `_vectors` fields is set to.
} else if new_vectors.regenerate {
let new_rendered = prompt.render_document(
update.external_document_id(),
update.merged(
let new_document = update.merged(
&context.rtxn,
context.index,
context.db_fields_ids_map,
)?,
context.new_fields_ids_map,
&context.doc_alloc,
)?;
let must_regenerate = if !old_vectors.regenerate {
// we just enabled `regenerate`
true
} else {
let old_rendered = prompt.render_document(
update.external_document_id(),
update.current(
let old_document = update.current(
&context.rtxn,
context.index,
context.db_fields_ids_map,
)?,
context.new_fields_ids_map,
&context.doc_alloc,
);
if let Ok(old_rendered) = old_rendered {
// must regenerate if the rendered changed
new_rendered != old_rendered
} else {
// cannot check previous rendered, better regenerate
true
}
};
if must_regenerate {
chunks.set_autogenerated(
)?;
chunks.update_autogenerated(
update.docid(),
update.external_document_id(),
new_rendered,
old_document,
new_document,
context.new_fields_ids_map,
&unused_vectors_distribution,
old_is_user_provided,
old_must_regenerate,
true,
)?;
}
}
// no `_vectors` field, so only regenerate if the document is already set to in the DB.
} else if old_vectors.regenerate {
let new_rendered = prompt.render_document(
update.external_document_id(),
update.merged(
} else if old_must_regenerate {
let new_document = update.merged(
&context.rtxn,
context.index,
context.db_fields_ids_map,
)?,
context.new_fields_ids_map,
&context.doc_alloc,
)?;
let must_regenerate = {
let old_rendered = prompt.render_document(
update.external_document_id(),
update.current(
let old_document = update.current(
&context.rtxn,
context.index,
context.db_fields_ids_map,
)?,
context.new_fields_ids_map,
&context.doc_alloc,
);
if let Ok(old_rendered) = old_rendered {
// regenerate if the rendered version changed
new_rendered != old_rendered
} else {
// if we cannot render the previous version of the documents, let's regenerate
true
}
};
if must_regenerate {
chunks.set_autogenerated(
)?;
chunks.update_autogenerated(
update.docid(),
update.external_document_id(),
new_rendered,
old_document,
new_document,
context.new_fields_ids_map,
&unused_vectors_distribution,
old_is_user_provided,
old_must_regenerate,
true,
)?;
}
}
}
}
DocumentChange::Insertion(insertion) => {
let (default_is_user_provided, default_must_regenerate) = (false, true);
let new_vectors =
insertion.inserted_vectors(&context.doc_alloc, self.embedders)?;
if let Some(new_vectors) = &new_vectors {
@ -238,13 +199,11 @@ impl<'extractor> Extractor<'extractor> for EmbeddingExtractor<'_, '_> {
for chunks in &mut all_chunks {
let embedder_name = chunks.embedder_name();
let prompt = chunks.prompt();
// if no inserted vectors, then regenerate: true + no embeddings => autogenerate
if let Some(new_vectors) = new_vectors.as_ref().and_then(|new_vectors| {
new_vectors.vectors_for_key(embedder_name).transpose()
}) {
let new_vectors = new_vectors?;
chunks.set_regenerate(insertion.docid(), new_vectors.regenerate);
if let Some(embeddings) = new_vectors.embeddings {
chunks.set_vectors(
insertion.external_document_id(),
@ -257,33 +216,36 @@ impl<'extractor> Extractor<'extractor> for EmbeddingExtractor<'_, '_> {
.to_string(),
error: error.to_string(),
})?,
default_is_user_provided,
default_must_regenerate,
new_vectors.regenerate,
)?;
} else if new_vectors.regenerate {
let rendered = prompt.render_document(
chunks.insert_autogenerated(
insertion.docid(),
insertion.external_document_id(),
insertion.inserted(),
context.new_fields_ids_map,
&context.doc_alloc,
)?;
chunks.set_autogenerated(
insertion.docid(),
insertion.external_document_id(),
rendered,
&unused_vectors_distribution,
true,
)?;
} else {
chunks.set_status(
insertion.docid(),
default_is_user_provided,
default_must_regenerate,
false,
false,
);
}
} else {
let rendered = prompt.render_document(
chunks.insert_autogenerated(
insertion.docid(),
insertion.external_document_id(),
insertion.inserted(),
context.new_fields_ids_map,
&context.doc_alloc,
)?;
chunks.set_autogenerated(
insertion.docid(),
insertion.external_document_id(),
rendered,
&unused_vectors_distribution,
true,
)?;
}
}
@ -501,265 +463,489 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeEmbedding
}
}
// **Warning**: the destructor of this struct is not normally run, make sure that all its fields:
// 1. don't have side effects tied to they destructors
// 2. if allocated, are allocated inside of the bumpalo
//
// Currently this is the case as:
// 1. BVec are inside of the bumpalo
// 2. All other fields are either trivial (u8) or references.
struct Chunks<'a, 'b, 'extractor> {
texts: BVec<'a, &'a str>,
ids: BVec<'a, DocumentId>,
embedder: &'a Embedder,
pub struct OnEmbeddingDocumentUpdates<'doc, 'b> {
embedder_id: u8,
embedder_name: &'a str,
sender: EmbeddingSender<'doc, 'b>,
possible_embedding_mistakes: &'doc PossibleEmbeddingMistakes,
}
impl OnEmbeddingDocumentUpdates<'_, '_> {
    /// Clears every stored vector for `docid` on this embedder by sending an
    /// empty vector list to the writer.
    fn clear_vectors(&self, docid: DocumentId) {
        self.sender.set_vectors(docid, self.embedder_id, vec![]).unwrap();
    }
}
impl<'doc> OnEmbed<'doc> for OnEmbeddingDocumentUpdates<'doc, '_> {
    type ErrorMetadata = UnusedVectorsDistributionBump<'doc>;

    /// Forwards one computed embedding to the writer, routed by the
    /// `(docid, extractor_id)` carried in the response metadata.
    fn process_embedding_response(
        &mut self,
        response: crate::vector::session::EmbeddingResponse<'doc>,
    ) {
        self.sender
            .set_vector(
                response.metadata.docid,
                self.embedder_id,
                response.metadata.extractor_id,
                response.embedding,
            )
            .unwrap();
    }

    /// Forwards a whole batch of embeddings for a single document.
    fn process_embeddings(&mut self, metadata: Metadata<'doc>, embeddings: Vec<Embedding>) {
        self.sender.set_vectors(metadata.docid, self.embedder_id, embeddings).unwrap();
    }

    /// Turns an embedding failure into a user-facing or internal error.
    ///
    /// Bugs are wrapped as internal errors; everything else becomes a
    /// `DocumentEmbeddingError` message enriched with hints about probable
    /// `_vectors` / embedder-name misspellings found in the documents.
    fn process_embedding_error(
        &mut self,
        error: crate::vector::hf::EmbedError,
        embedder_name: &'doc str,
        unused_vectors_distribution: &UnusedVectorsDistributionBump,
        metadata: &[Metadata<'doc>],
    ) -> crate::Error {
        if let FaultSource::Bug = error.fault {
            crate::Error::InternalError(crate::InternalError::VectorEmbeddingError(error.into()))
        } else {
            // Manual (userProvided) embedders get a dedicated message naming
            // the first affected document.
            let mut msg = if let EmbedErrorKind::ManualEmbed(_) = &error.kind {
                format!(
                    r"While embedding documents for embedder `{embedder_name}`: no vectors provided for document `{}`{}
- Note: `{embedder_name}` has `source: userProvided`, so documents must provide embeddings as an array in `_vectors.{embedder_name}`.",
                    if let Some(first) = metadata.first() { first.external_docid } else { "???" },
                    if metadata.len() > 1 {
                        format!(" and at least {} other document(s)", metadata.len() - 1)
                    } else {
                        "".to_string()
                    }
                )
            } else {
                format!(r"While embedding documents for embedder `{embedder_name}`: {error}")
            };

            // Suggest at most two likely `_vectors` field misspellings…
            let mut hint_count = 0;

            for (vector_misspelling, count) in
                self.possible_embedding_mistakes.vector_mistakes().take(2)
            {
                msg += &format!("\n- Hint: try replacing `{vector_misspelling}` by `_vectors` in {count} document(s).");
                hint_count += 1;
            }

            // …and at most two likely embedder-name misspellings.
            for (embedder_misspelling, count) in self
                .possible_embedding_mistakes
                .embedder_mistakes_bump(embedder_name, unused_vectors_distribution)
                .take(2)
            {
                msg += &format!("\n- Hint: try replacing `_vectors.{embedder_misspelling}` by `_vectors.{embedder_name}` in {count} document(s).");
                hint_count += 1;
            }

            // No misspelling found: for manual embedders, explain how to opt out.
            if hint_count == 0 {
                if let EmbedErrorKind::ManualEmbed(_) = &error.kind {
                    msg += &format!(
                        "\n- Hint: opt-out for a document with `_vectors.{embedder_name}: null`"
                    );
                }
            }

            crate::Error::UserError(crate::UserError::DocumentEmbeddingError(msg))
        }
    }
}
struct Chunks<'a, 'b, 'extractor> {
dimensions: usize,
prompt: &'a Prompt,
possible_embedding_mistakes: &'a PossibleEmbeddingMistakes,
embedder_stats: &'a EmbedderStats,
user_provided: &'a RefCell<EmbeddingExtractorData<'extractor>>,
threads: &'a ThreadPoolNoAbort,
sender: EmbeddingSender<'a, 'b>,
has_manual_generation: Option<&'a str>,
status_delta: &'a RefCell<EmbeddingExtractorData<'extractor>>,
status: EmbeddingStatus,
kind: ChunkType<'a, 'b>,
}
/// How inputs are built and embedded for one embedder: either a single
/// document template rendered to a string, or a set of request fragments
/// producing JSON values.
enum ChunkType<'a, 'b> {
    DocumentTemplate {
        // Template used to render each document into the embed input string.
        document_template: &'a Prompt,
        session: EmbedSession<'a, OnEmbeddingDocumentUpdates<'a, 'b>, &'a str>,
    },
    Fragments {
        // One extractor input per runtime fragment.
        fragments: &'a [RuntimeFragment],
        session: EmbedSession<'a, OnEmbeddingDocumentUpdates<'a, 'b>, serde_json::Value>,
    },
}
impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
#[allow(clippy::too_many_arguments)]
pub fn new(
embedder: &'a Embedder,
embedder_id: u8,
runtime: &'a RuntimeEmbedder,
embedder_info: EmbedderInfo,
embedder_name: &'a str,
prompt: &'a Prompt,
user_provided: &'a RefCell<EmbeddingExtractorData<'extractor>>,
status_delta: &'a RefCell<EmbeddingExtractorData<'extractor>>,
possible_embedding_mistakes: &'a PossibleEmbeddingMistakes,
embedder_stats: &'a EmbedderStats,
threads: &'a ThreadPoolNoAbort,
sender: EmbeddingSender<'a, 'b>,
doc_alloc: &'a Bump,
) -> Self {
let capacity = embedder.prompt_count_in_chunk_hint() * embedder.chunk_count_hint();
let texts = BVec::with_capacity_in(capacity, doc_alloc);
let ids = BVec::with_capacity_in(capacity, doc_alloc);
let embedder = &runtime.embedder;
let dimensions = embedder.dimensions();
Self {
texts,
ids,
embedder,
prompt,
possible_embedding_mistakes,
embedder_stats,
threads,
sender,
embedder_id,
let fragments = runtime.fragments.as_slice();
let kind = if fragments.is_empty() {
ChunkType::DocumentTemplate {
document_template: &runtime.document_template,
session: EmbedSession::new(
&runtime.embedder,
embedder_name,
user_provided,
has_manual_generation: None,
dimensions,
threads,
doc_alloc,
embedder_stats,
OnEmbeddingDocumentUpdates {
embedder_id: embedder_info.embedder_id,
sender,
possible_embedding_mistakes,
},
),
}
} else {
ChunkType::Fragments {
fragments,
session: EmbedSession::new(
&runtime.embedder,
embedder_name,
threads,
doc_alloc,
embedder_stats,
OnEmbeddingDocumentUpdates {
embedder_id: embedder_info.embedder_id,
sender,
possible_embedding_mistakes,
},
),
}
};
Self { dimensions, status: embedder_info.embedding_status, status_delta, kind }
}
pub fn set_autogenerated(
&mut self,
docid: DocumentId,
external_docid: &'a str,
rendered: &'a str,
unused_vectors_distribution: &UnusedVectorsDistributionBump,
) -> Result<()> {
let is_manual = matches!(&self.embedder, &Embedder::UserProvided(_));
if is_manual {
self.has_manual_generation.get_or_insert(external_docid);
}
if self.texts.len() < self.texts.capacity() {
self.texts.push(rendered);
self.ids.push(docid);
return Ok(());
}
Self::embed_chunks(
&mut self.texts,
&mut self.ids,
self.embedder,
self.embedder_id,
self.embedder_name,
self.possible_embedding_mistakes,
self.embedder_stats,
unused_vectors_distribution,
self.threads,
self.sender,
self.has_manual_generation.take(),
)
}
pub fn drain(
mut self,
unused_vectors_distribution: &UnusedVectorsDistributionBump,
) -> Result<()> {
let res = Self::embed_chunks(
&mut self.texts,
&mut self.ids,
self.embedder,
self.embedder_id,
self.embedder_name,
self.possible_embedding_mistakes,
self.embedder_stats,
unused_vectors_distribution,
self.threads,
self.sender,
self.has_manual_generation,
);
// optimization: don't run bvec dtors as they only contain bumpalo allocated stuff
std::mem::forget(self);
res
pub fn is_user_provided_must_regenerate(&self, docid: DocumentId) -> (bool, bool) {
self.status.is_user_provided_must_regenerate(docid)
}
#[allow(clippy::too_many_arguments)]
pub fn embed_chunks(
texts: &mut BVec<'a, &'a str>,
ids: &mut BVec<'a, DocumentId>,
embedder: &Embedder,
embedder_id: u8,
embedder_name: &str,
possible_embedding_mistakes: &PossibleEmbeddingMistakes,
embedder_stats: &EmbedderStats,
unused_vectors_distribution: &UnusedVectorsDistributionBump,
threads: &ThreadPoolNoAbort,
sender: EmbeddingSender<'a, 'b>,
has_manual_generation: Option<&'a str>,
) -> Result<()> {
if let Some(external_docid) = has_manual_generation {
let mut msg = format!(
r"While embedding documents for embedder `{embedder_name}`: no vectors provided for document `{}`{}",
pub fn update_autogenerated<'doc, OD: Document<'doc> + Debug, ND: Document<'doc> + Debug>(
&mut self,
docid: DocumentId,
external_docid: &'a str,
old_document: OD,
new_document: ND,
new_fields_ids_map: &'a RefCell<crate::GlobalFieldsIdsMap>,
unused_vectors_distribution: &UnusedVectorsDistributionBump<'a>,
old_is_user_provided: bool,
old_must_regenerate: bool,
new_must_regenerate: bool,
) -> Result<()>
where
'a: 'doc,
{
let extracted = match &mut self.kind {
ChunkType::DocumentTemplate { document_template, session } => {
let doc_alloc = session.doc_alloc();
let ex = DocumentTemplateExtractor::new(
document_template,
doc_alloc,
new_fields_ids_map,
);
if old_is_user_provided {
session.on_embed_mut().clear_vectors(docid);
}
update_autogenerated(
docid,
external_docid,
if ids.len() > 1 {
format!(" and at least {} other document(s)", ids.len() - 1)
} else {
"".to_string()
[ex],
old_document,
new_document,
&external_docid,
old_must_regenerate,
session,
unused_vectors_distribution,
)?
}
ChunkType::Fragments { fragments, session } => {
let doc_alloc = session.doc_alloc();
let extractors = fragments.iter().map(|fragment| {
RequestFragmentExtractor::new(fragment, doc_alloc).ignore_errors()
});
if old_is_user_provided {
session.on_embed_mut().clear_vectors(docid);
}
update_autogenerated(
docid,
external_docid,
extractors,
old_document,
new_document,
&(),
old_must_regenerate,
session,
unused_vectors_distribution,
)?
}
};
self.set_status(
docid,
old_is_user_provided,
old_must_regenerate,
old_is_user_provided && !extracted,
new_must_regenerate,
);
msg += &format!("\n- Note: `{embedder_name}` has `source: userProvided`, so documents must provide embeddings as an array in `_vectors.{embedder_name}`.");
let mut hint_count = 0;
for (vector_misspelling, count) in possible_embedding_mistakes.vector_mistakes().take(2)
{
msg += &format!("\n- Hint: try replacing `{vector_misspelling}` by `_vectors` in {count} document(s).");
hint_count += 1;
Ok(())
}
for (embedder_misspelling, count) in possible_embedding_mistakes
.embedder_mistakes_bump(embedder_name, unused_vectors_distribution)
.take(2)
{
msg += &format!("\n- Hint: try replacing `_vectors.{embedder_misspelling}` by `_vectors.{embedder_name}` in {count} document(s).");
hint_count += 1;
}
if hint_count == 0 {
msg += &format!(
"\n- Hint: opt-out for a document with `_vectors.{embedder_name}: null`"
#[allow(clippy::too_many_arguments)]
pub fn insert_autogenerated<D: Document<'a> + Debug>(
&mut self,
docid: DocumentId,
external_docid: &'a str,
new_document: D,
new_fields_ids_map: &'a RefCell<crate::GlobalFieldsIdsMap>,
unused_vectors_distribution: &UnusedVectorsDistributionBump<'a>,
new_must_regenerate: bool,
) -> Result<()> {
let (default_is_user_provided, default_must_regenerate) = (false, true);
self.set_status(
docid,
default_is_user_provided,
default_must_regenerate,
false,
new_must_regenerate,
);
}
return Err(crate::Error::UserError(crate::UserError::DocumentEmbeddingError(msg)));
}
match &mut self.kind {
ChunkType::DocumentTemplate { document_template, session } => {
let doc_alloc = session.doc_alloc();
let ex = DocumentTemplateExtractor::new(
document_template,
doc_alloc,
new_fields_ids_map,
);
let res = match embedder.embed_index_ref(texts.as_slice(), threads, embedder_stats) {
Ok(embeddings) => {
for (docid, embedding) in ids.into_iter().zip(embeddings) {
sender.set_vector(*docid, embedder_id, embedding).unwrap();
insert_autogenerated(
docid,
external_docid,
[ex],
new_document,
&external_docid,
session,
unused_vectors_distribution,
)?;
}
ChunkType::Fragments { fragments, session } => {
let doc_alloc = session.doc_alloc();
let extractors = fragments.iter().map(|fragment| {
RequestFragmentExtractor::new(fragment, doc_alloc).ignore_errors()
});
insert_autogenerated(
docid,
external_docid,
extractors,
new_document,
&(),
session,
unused_vectors_distribution,
)?;
}
}
Ok(())
}
Err(error) => {
if let FaultSource::Bug = error.fault {
Err(crate::Error::InternalError(crate::InternalError::VectorEmbeddingError(
error.into(),
)))
} else {
let mut msg = format!(
r"While embedding documents for embedder `{embedder_name}`: {error}"
);
if let EmbedErrorKind::ManualEmbed(_) = &error.kind {
msg += &format!("\n- Note: `{embedder_name}` has `source: userProvided`, so documents must provide embeddings as an array in `_vectors.{embedder_name}`.");
pub fn drain(self, unused_vectors_distribution: &UnusedVectorsDistributionBump) -> Result<()> {
match self.kind {
ChunkType::DocumentTemplate { document_template: _, session } => {
session.drain(unused_vectors_distribution)?;
}
let mut hint_count = 0;
for (vector_misspelling, count) in
possible_embedding_mistakes.vector_mistakes().take(2)
{
msg += &format!("\n- Hint: try replacing `{vector_misspelling}` by `_vectors` in {count} document(s).");
hint_count += 1;
}
for (embedder_misspelling, count) in possible_embedding_mistakes
.embedder_mistakes_bump(embedder_name, unused_vectors_distribution)
.take(2)
{
msg += &format!("\n- Hint: try replacing `_vectors.{embedder_misspelling}` by `_vectors.{embedder_name}` in {count} document(s).");
hint_count += 1;
}
if hint_count == 0 {
if let EmbedErrorKind::ManualEmbed(_) = &error.kind {
msg += &format!(
"\n- Hint: opt-out for a document with `_vectors.{embedder_name}: null`"
);
ChunkType::Fragments { fragments: _, session } => {
session.drain(unused_vectors_distribution)?;
}
}
Err(crate::Error::UserError(crate::UserError::DocumentEmbeddingError(msg)))
}
}
};
texts.clear();
ids.clear();
res
}
pub fn prompt(&self) -> &'a Prompt {
self.prompt
Ok(())
}
pub fn embedder_name(&self) -> &'a str {
self.embedder_name
match &self.kind {
ChunkType::DocumentTemplate { document_template: _, session } => {
session.embedder_name()
}
fn set_regenerate(&self, docid: DocumentId, regenerate: bool) {
let mut user_provided = self.user_provided.borrow_mut();
let user_provided = user_provided.0.entry_ref(self.embedder_name).or_default();
if regenerate {
// regenerate == !user_provided
user_provided.insert_del_u32(docid);
} else {
user_provided.insert_add_u32(docid);
ChunkType::Fragments { fragments: _, session } => session.embedder_name(),
}
}
fn set_vectors(
fn set_status(
&self,
docid: DocumentId,
old_is_user_provided: bool,
old_must_regenerate: bool,
new_is_user_provided: bool,
new_must_regenerate: bool,
) {
if EmbeddingStatusDelta::needs_change(
old_is_user_provided,
old_must_regenerate,
new_is_user_provided,
new_must_regenerate,
) {
let mut status_delta = self.status_delta.borrow_mut();
let status_delta = status_delta.0.entry_ref(self.embedder_name()).or_default();
status_delta.push_delta(
docid,
old_is_user_provided,
old_must_regenerate,
new_is_user_provided,
new_must_regenerate,
);
}
}
pub fn clear_status(&self, docid: DocumentId, is_user_provided: bool, must_regenerate: bool) {
// these value ensure both roaring are at 0.
if EmbeddingStatusDelta::needs_clear(is_user_provided, must_regenerate) {
let mut status_delta = self.status_delta.borrow_mut();
let status_delta = status_delta.0.entry_ref(self.embedder_name()).or_default();
status_delta.clear_docid(docid, is_user_provided, must_regenerate);
}
}
pub fn set_vectors(
&mut self,
external_docid: &'a str,
docid: DocumentId,
embeddings: Vec<Embedding>,
old_is_user_provided: bool,
old_must_regenerate: bool,
new_must_regenerate: bool,
) -> Result<()> {
self.set_status(
docid,
old_is_user_provided,
old_must_regenerate,
true,
new_must_regenerate,
);
for (embedding_index, embedding) in embeddings.iter().enumerate() {
if embedding.len() != self.dimensions {
return Err(UserError::InvalidIndexingVectorDimensions {
expected: self.dimensions,
found: embedding.len(),
embedder_name: self.embedder_name.to_string(),
embedder_name: self.embedder_name().to_string(),
document_id: external_docid.to_string(),
embedding_index,
}
.into());
}
}
self.sender.set_vectors(docid, self.embedder_id, embeddings).unwrap();
match &mut self.kind {
ChunkType::DocumentTemplate { document_template: _, session } => {
session.on_embed_mut().process_embeddings(
Metadata { docid, external_docid, extractor_id: 0 },
embeddings,
);
}
ChunkType::Fragments { fragments: _, session } => {
session.on_embed_mut().process_embeddings(
Metadata { docid, external_docid, extractor_id: 0 },
embeddings,
);
}
}
Ok(())
}
}
/// Re-render an updated document through every extractor and request fresh
/// embeddings for the renders that changed.
///
/// An extractor's embedding is regenerated when regeneration was just
/// (re-)enabled, when the old document cannot be rendered, or when the new
/// render differs from the old one. When the new render is empty, any stored
/// embedding for that extractor is dropped instead.
///
/// Returns `true` when at least one extractor triggered a regeneration.
#[allow(clippy::too_many_arguments)]
fn update_autogenerated<'doc, 'a: 'doc, 'b, E, OD, ND>(
    docid: DocumentId,
    external_docid: &'a str,
    extractors: impl IntoIterator<Item = E>,
    old_document: OD,
    new_document: ND,
    meta: &E::DocumentMetadata,
    old_must_regenerate: bool,
    session: &mut EmbedSession<'a, OnEmbeddingDocumentUpdates<'a, 'b>, E::Input>,
    unused_vectors_distribution: &UnusedVectorsDistributionBump<'a>,
) -> Result<bool>
where
    OD: Document<'doc> + Debug,
    ND: Document<'doc> + Debug,
    E: VectorExtractor<'a>,
    E::Input: Input,
    crate::Error: From<E::Error>,
{
    let mut any_regenerated = false;
    for extractor in extractors {
        let new_rendered = extractor.extract(&new_document, meta)?;
        // Regenerate when `regenerate` was just switched on, when the previous
        // render is unrecoverable, or when the rendered input actually changed.
        // `||` keeps the old-document extraction lazy, as in the original.
        let must_regenerate = !old_must_regenerate
            || match extractor.extract(&old_document, meta) {
                Ok(old_rendered) => new_rendered != old_rendered,
                Err(_) => true,
            };
        if !must_regenerate {
            continue;
        }
        any_regenerated = true;
        let metadata =
            Metadata { docid, external_docid, extractor_id: extractor.extractor_id() };
        match new_rendered {
            Some(new_rendered) => {
                session.request_embedding(metadata, new_rendered, unused_vectors_distribution)?
            }
            None => {
                // The extractor no longer renders anything for this document:
                // report an empty embedding so any existing one is removed.
                OnEmbed::process_embedding_response(
                    session.on_embed_mut(),
                    crate::vector::session::EmbeddingResponse { metadata, embedding: None },
                );
            }
        }
    }
    Ok(any_regenerated)
}
/// Render a freshly inserted document through every extractor and queue an
/// embedding request for each non-empty render.
///
/// Extractors that render nothing for this document are skipped silently;
/// extraction and request errors are propagated to the caller.
fn insert_autogenerated<'a, 'b, E, D: Document<'a> + Debug>(
    docid: DocumentId,
    external_docid: &'a str,
    extractors: impl IntoIterator<Item = E>,
    new_document: D,
    meta: &E::DocumentMetadata,
    session: &mut EmbedSession<'a, OnEmbeddingDocumentUpdates<'a, 'b>, E::Input>,
    unused_vectors_distribution: &UnusedVectorsDistributionBump<'a>,
) -> Result<()>
where
    E: VectorExtractor<'a>,
    E::Input: Input,
    crate::Error: From<E::Error>,
{
    for extractor in extractors {
        // Skip extractors that produce no render for this document.
        let Some(rendered) = extractor.extract(&new_document, meta)? else {
            continue;
        };
        let metadata =
            Metadata { docid, external_docid, extractor_id: extractor.extractor_id() };
        session.request_embedding(metadata, rendered, unused_vectors_distribution)?;
    }
    Ok(())
}

View file

@ -13,21 +13,17 @@ use super::super::thread_local::{FullySend, ThreadLocal};
use super::super::FacetFieldIdsDelta;
use super::document_changes::{extract, DocumentChanges, IndexingContext};
use super::settings_changes::settings_change_extract;
use crate::documents::FieldIdMapper;
use crate::documents::PrimaryKey;
use crate::index::IndexEmbeddingConfig;
use crate::progress::EmbedderStats;
use crate::progress::MergingWordCache;
use crate::documents::{FieldIdMapper, PrimaryKey};
use crate::progress::{EmbedderStats, MergingWordCache};
use crate::proximity::ProximityPrecision;
use crate::update::new::extract::EmbeddingExtractor;
use crate::update::new::indexer::settings_changes::DocumentsIndentifiers;
use crate::update::new::merger::merge_and_send_rtree;
use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
use crate::update::settings::SettingsDelta;
use crate::vector::EmbeddingConfigs;
use crate::Index;
use crate::InternalError;
use crate::{Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
use crate::vector::db::IndexEmbeddingConfig;
use crate::vector::RuntimeEmbedders;
use crate::{Index, InternalError, Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
#[allow(clippy::too_many_arguments)]
pub(super) fn extract_all<'pl, 'extractor, DC, MSP>(
@ -35,7 +31,7 @@ pub(super) fn extract_all<'pl, 'extractor, DC, MSP>(
indexing_context: IndexingContext<MSP>,
indexer_span: Span,
extractor_sender: ExtractorBbqueueSender,
embedders: &EmbeddingConfigs,
embedders: &RuntimeEmbedders,
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
finished_extraction: &AtomicBool,
field_distribution: &mut BTreeMap<String, u64>,
@ -275,14 +271,19 @@ where
let span = tracing::debug_span!(target: "indexing::documents::merge", "vectors");
let _entered = span.enter();
let embedder_configs = index.embedding_configs();
for config in &mut index_embeddings {
let mut infos = embedder_configs.embedder_info(&rtxn, &config.name)?.unwrap();
'data: for data in datastore.iter_mut() {
let data = &mut data.get_mut().0;
let Some(deladd) = data.remove(&config.name) else {
let Some(delta) = data.remove(&config.name) else {
continue 'data;
};
deladd.apply_to(&mut config.user_provided, modified_docids);
delta.apply_to(&mut infos.embedding_status);
}
extractor_sender.embeddings().embedding_status(&config.name, infos).unwrap();
}
}
}

View file

@ -24,7 +24,7 @@ use crate::progress::{EmbedderStats, Progress};
use crate::update::settings::SettingsDelta;
use crate::update::GrenadParameters;
use crate::vector::settings::{EmbedderAction, WriteBackToDocuments};
use crate::vector::{ArroyWrapper, Embedder, EmbeddingConfigs};
use crate::vector::{ArroyWrapper, Embedder, RuntimeEmbedders};
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort};
pub(crate) mod de;
@ -54,7 +54,7 @@ pub fn index<'pl, 'indexer, 'index, DC, MSP>(
new_fields_ids_map: FieldsIdsMap,
new_primary_key: Option<PrimaryKey<'pl>>,
document_changes: &DC,
embedders: EmbeddingConfigs,
embedders: RuntimeEmbedders,
must_stop_processing: &'indexer MSP,
progress: &'indexer Progress,
embedder_stats: &'indexer EmbedderStats,
@ -93,7 +93,7 @@ where
grenad_parameters: &grenad_parameters,
};
let index_embeddings = index.embedding_configs(wtxn)?;
let index_embeddings = index.embedding_configs().embedding_configs(wtxn)?;
let mut field_distribution = index.field_distribution(wtxn)?;
let mut document_ids = index.documents_ids(wtxn)?;
let mut modified_docids = roaring::RoaringBitmap::new();
@ -133,20 +133,21 @@ where
let arroy_writers: Result<HashMap<_, _>> = embedders
.inner_as_ref()
.iter()
.map(|(embedder_name, (embedder, _, was_quantized))| {
let embedder_index = index.embedder_category_id.get(wtxn, embedder_name)?.ok_or(
InternalError::DatabaseMissingEntry {
.map(|(embedder_name, runtime)| {
let embedder_index = index
.embedding_configs()
.embedder_id(wtxn, embedder_name)?
.ok_or(InternalError::DatabaseMissingEntry {
db_name: "embedder_category_id",
key: None,
},
)?;
})?;
let dimensions = embedder.dimensions();
let writer = ArroyWrapper::new(vector_arroy, embedder_index, *was_quantized);
let dimensions = runtime.embedder.dimensions();
let writer = ArroyWrapper::new(vector_arroy, embedder_index, runtime.is_quantized);
Ok((
embedder_index,
(embedder_name.as_str(), embedder.as_ref(), writer, dimensions),
(embedder_name.as_str(), &*runtime.embedder, writer, dimensions),
))
})
.collect();

View file

@ -11,11 +11,11 @@ use super::super::channel::*;
use crate::database_stats::DatabaseStats;
use crate::documents::PrimaryKey;
use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
use crate::index::IndexEmbeddingConfig;
use crate::progress::Progress;
use crate::update::settings::InnerIndexSettings;
use crate::vector::db::IndexEmbeddingConfig;
use crate::vector::settings::EmbedderAction;
use crate::vector::{ArroyWrapper, Embedder, EmbeddingConfigs, Embeddings};
use crate::vector::{ArroyWrapper, Embedder, Embeddings, RuntimeEmbedders};
use crate::{Error, Index, InternalError, Result, UserError};
pub fn write_to_db(
@ -64,6 +64,14 @@ pub fn write_to_db(
writer.del_items(wtxn, *dimensions, docid)?;
writer.add_items(wtxn, docid, &embeddings)?;
}
ReceiverAction::LargeVector(
large_vector @ LargeVector { docid, embedder_id, extractor_id, .. },
) => {
let (_, _, writer, dimensions) =
arroy_writers.get(&embedder_id).expect("requested a missing embedder");
let embedding = large_vector.read_embedding(*dimensions);
writer.add_item_in_store(wtxn, docid, extractor_id, embedding)?;
}
}
// Every time the is a message in the channel we search
@ -137,7 +145,7 @@ where
)?;
}
index.put_embedding_configs(wtxn, index_embeddings)?;
index.embedding_configs().put_embedding_configs(wtxn, index_embeddings)?;
Ok(())
}
@ -147,7 +155,7 @@ pub(super) fn update_index(
wtxn: &mut RwTxn<'_>,
new_fields_ids_map: FieldIdMapWithMetadata,
new_primary_key: Option<PrimaryKey<'_>>,
embedders: EmbeddingConfigs,
embedders: RuntimeEmbedders,
field_distribution: std::collections::BTreeMap<String, u64>,
document_ids: roaring::RoaringBitmap,
) -> Result<()> {
@ -226,16 +234,38 @@ pub fn write_from_bbqueue(
arroy_writers.get(&embedder_id).expect("requested a missing embedder");
let mut embeddings = Embeddings::new(*dimensions);
let all_embeddings = asvs.read_all_embeddings_into_vec(frame, aligned_embedding);
writer.del_items(wtxn, *dimensions, docid)?;
if !all_embeddings.is_empty() {
if embeddings.append(all_embeddings.to_vec()).is_err() {
return Err(Error::UserError(UserError::InvalidVectorDimensions {
expected: *dimensions,
found: all_embeddings.len(),
}));
}
writer.del_items(wtxn, *dimensions, docid)?;
writer.add_items(wtxn, docid, &embeddings)?;
}
}
EntryHeader::ArroySetVector(
asv @ ArroySetVector { docid, embedder_id, extractor_id, .. },
) => {
let frame = frame_with_header.frame();
let (_, _, writer, dimensions) =
arroy_writers.get(&embedder_id).expect("requested a missing embedder");
let embedding = asv.read_all_embeddings_into_vec(frame, aligned_embedding);
if embedding.is_empty() {
writer.del_item_in_store(wtxn, docid, extractor_id, *dimensions)?;
} else {
if embedding.len() != *dimensions {
return Err(Error::UserError(UserError::InvalidVectorDimensions {
expected: *dimensions,
found: embedding.len(),
}));
}
writer.add_item_in_store(wtxn, docid, extractor_id, embedding)?;
}
}
}
}
Ok(())

View file

@ -12,9 +12,9 @@ use super::document::{Document, DocumentFromDb, DocumentFromVersions, Versions};
use super::indexer::de::DeserrRawValue;
use crate::constants::RESERVED_VECTORS_FIELD_NAME;
use crate::documents::FieldIdMapper;
use crate::index::IndexEmbeddingConfig;
use crate::vector::db::{EmbeddingStatus, IndexEmbeddingConfig};
use crate::vector::parsed_vectors::{RawVectors, RawVectorsError, VectorOrArrayOfVectors};
use crate::vector::{ArroyWrapper, Embedding, EmbeddingConfigs};
use crate::vector::{ArroyWrapper, Embedding, RuntimeEmbedders};
use crate::{DocumentId, Index, InternalError, Result, UserError};
#[derive(Serialize)]
@ -109,7 +109,7 @@ impl<'t> VectorDocumentFromDb<'t> {
None => None,
};
let embedding_config = index.embedding_configs(rtxn)?;
let embedding_config = index.embedding_configs().embedding_configs(rtxn)?;
Ok(Some(Self { docid, embedding_config, index, vectors_field, rtxn, doc_alloc }))
}
@ -118,6 +118,7 @@ impl<'t> VectorDocumentFromDb<'t> {
&self,
embedder_id: u8,
config: &IndexEmbeddingConfig,
status: &EmbeddingStatus,
) -> Result<VectorEntry<'t>> {
let reader =
ArroyWrapper::new(self.index.vector_arroy, embedder_id, config.config.quantized());
@ -126,7 +127,7 @@ impl<'t> VectorDocumentFromDb<'t> {
Ok(VectorEntry {
has_configured_embedder: true,
embeddings: Some(Embeddings::FromDb(vectors)),
regenerate: !config.user_provided.contains(self.docid),
regenerate: status.must_regenerate(self.docid),
implicit: false,
})
}
@ -137,9 +138,9 @@ impl<'t> VectorDocument<'t> for VectorDocumentFromDb<'t> {
self.embedding_config
.iter()
.map(|config| {
let embedder_id =
self.index.embedder_category_id.get(self.rtxn, &config.name)?.unwrap();
let entry = self.entry_from_db(embedder_id, config)?;
let info =
self.index.embedding_configs().embedder_info(self.rtxn, &config.name)?.unwrap();
let entry = self.entry_from_db(info.embedder_id, config, &info.embedding_status)?;
let config_name = self.doc_alloc.alloc_str(config.name.as_str());
Ok((&*config_name, entry))
})
@ -156,11 +157,11 @@ impl<'t> VectorDocument<'t> for VectorDocumentFromDb<'t> {
}
fn vectors_for_key(&self, key: &str) -> Result<Option<VectorEntry<'t>>> {
Ok(match self.index.embedder_category_id.get(self.rtxn, key)? {
Some(embedder_id) => {
Ok(match self.index.embedding_configs().embedder_info(self.rtxn, key)? {
Some(info) => {
let config =
self.embedding_config.iter().find(|config| config.name == key).unwrap();
Some(self.entry_from_db(embedder_id, config)?)
Some(self.entry_from_db(info.embedder_id, config, &info.embedding_status)?)
}
None => match self.vectors_field.as_ref().and_then(|obkv| obkv.get(key)) {
Some(embedding_from_doc) => {
@ -222,7 +223,7 @@ fn entry_from_raw_value(
pub struct VectorDocumentFromVersions<'doc> {
external_document_id: &'doc str,
vectors: RawMap<'doc, FxBuildHasher>,
embedders: &'doc EmbeddingConfigs,
embedders: &'doc RuntimeEmbedders,
}
impl<'doc> VectorDocumentFromVersions<'doc> {
@ -230,7 +231,7 @@ impl<'doc> VectorDocumentFromVersions<'doc> {
external_document_id: &'doc str,
versions: &Versions<'doc>,
bump: &'doc Bump,
embedders: &'doc EmbeddingConfigs,
embedders: &'doc RuntimeEmbedders,
) -> Result<Option<Self>> {
let document = DocumentFromVersions::new(versions);
if let Some(vectors_field) = document.vectors_field()? {
@ -283,7 +284,7 @@ impl<'doc> MergedVectorDocument<'doc> {
db_fields_ids_map: &'doc Mapper,
versions: &Versions<'doc>,
doc_alloc: &'doc Bump,
embedders: &'doc EmbeddingConfigs,
embedders: &'doc RuntimeEmbedders,
) -> Result<Option<Self>> {
let db = VectorDocumentFromDb::new(docid, index, rtxn, db_fields_ids_map, doc_alloc)?;
let new_doc =
@ -295,7 +296,7 @@ impl<'doc> MergedVectorDocument<'doc> {
external_document_id: &'doc str,
versions: &Versions<'doc>,
doc_alloc: &'doc Bump,
embedders: &'doc EmbeddingConfigs,
embedders: &'doc RuntimeEmbedders,
) -> Result<Option<Self>> {
let Some(new_doc) =
VectorDocumentFromVersions::new(external_document_id, versions, doc_alloc, embedders)?

View file

@ -3,6 +3,7 @@ use bumpalo::Bump;
use serde_json::Value;
use super::{EmbedError, Embedder, Embedding};
use crate::progress::EmbedderStats;
use crate::{DocumentId, Result, ThreadPoolNoAbort};
type ExtractorId = u8;
@ -43,6 +44,8 @@ pub struct EmbedSession<'doc, C, I> {
embedder_name: &'doc str,
embedder_stats: &'doc EmbedderStats,
on_embed: C,
}
@ -51,6 +54,7 @@ pub trait Input: Sized {
inputs: &[Self],
embedder: &Embedder,
threads: &ThreadPoolNoAbort,
embedder_stats: &EmbedderStats,
) -> std::result::Result<Vec<Embedding>, EmbedError>;
}
@ -59,8 +63,9 @@ impl Input for &'_ str {
inputs: &[Self],
embedder: &Embedder,
threads: &ThreadPoolNoAbort,
embedder_stats: &EmbedderStats,
) -> std::result::Result<Vec<Embedding>, EmbedError> {
embedder.embed_index_ref(inputs, threads)
embedder.embed_index_ref(inputs, threads, embedder_stats)
}
}
@ -69,8 +74,9 @@ impl Input for Value {
inputs: &[Value],
embedder: &Embedder,
threads: &ThreadPoolNoAbort,
embedder_stats: &EmbedderStats,
) -> std::result::Result<Vec<Embedding>, EmbedError> {
embedder.embed_index_ref_fragments(inputs, threads)
embedder.embed_index_ref_fragments(inputs, threads, embedder_stats)
}
}
@ -81,12 +87,21 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
embedder_name: &'doc str,
threads: &'doc ThreadPoolNoAbort,
doc_alloc: &'doc Bump,
embedder_stats: &'doc EmbedderStats,
on_embed: C,
) -> Self {
let capacity = embedder.prompt_count_in_chunk_hint() * embedder.chunk_count_hint();
let texts = BVec::with_capacity_in(capacity, doc_alloc);
let ids = BVec::with_capacity_in(capacity, doc_alloc);
Self { inputs: texts, metadata: ids, embedder, threads, embedder_name, on_embed }
Self {
inputs: texts,
metadata: ids,
embedder,
threads,
embedder_name,
embedder_stats,
on_embed,
}
}
pub fn request_embedding(
@ -114,7 +129,12 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
if self.inputs.is_empty() {
return Ok(());
}
let res = match I::embed_ref(self.inputs.as_slice(), self.embedder, self.threads) {
let res = match I::embed_ref(
self.inputs.as_slice(),
self.embedder,
self.threads,
self.embedder_stats,
) {
Ok(embeddings) => {
for (metadata, embedding) in self.metadata.iter().copied().zip(embeddings) {
self.on_embed.process_embedding_response(EmbeddingResponse {