mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-04 20:37:15 +02:00
Small commit to add hybrid search and autoembedding
This commit is contained in:
parent
21bcf32109
commit
13c2c6c16b
42 changed files with 4045 additions and 246 deletions
|
@ -1,9 +1,10 @@
|
|||
use std::cmp::Ordering;
|
||||
use std::convert::TryFrom;
|
||||
use std::convert::{TryFrom, TryInto};
|
||||
use std::fs::File;
|
||||
use std::io::{self, BufReader, BufWriter};
|
||||
use std::mem::size_of;
|
||||
use std::str::from_utf8;
|
||||
use std::sync::Arc;
|
||||
|
||||
use bytemuck::cast_slice;
|
||||
use grenad::Writer;
|
||||
|
@ -13,13 +14,56 @@ use serde_json::{from_slice, Value};
|
|||
|
||||
use super::helpers::{create_writer, writer_into_reader, GrenadParameters};
|
||||
use crate::error::UserError;
|
||||
use crate::prompt::Prompt;
|
||||
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
|
||||
use crate::update::index_documents::helpers::try_split_at;
|
||||
use crate::{DocumentId, FieldId, InternalError, Result, VectorOrArrayOfVectors};
|
||||
use crate::vector::Embedder;
|
||||
use crate::{DocumentId, FieldsIdsMap, InternalError, Result, VectorOrArrayOfVectors};
|
||||
|
||||
/// The length of the elements that are always in the buffer when inserting new values.
|
||||
const TRUNCATE_SIZE: usize = size_of::<DocumentId>();
|
||||
|
||||
pub struct ExtractedVectorPoints {
|
||||
// docid, _index -> KvWriterDelAdd -> Vector
|
||||
pub manual_vectors: grenad::Reader<BufReader<File>>,
|
||||
// docid -> ()
|
||||
pub remove_vectors: grenad::Reader<BufReader<File>>,
|
||||
// docid -> prompt
|
||||
pub prompts: grenad::Reader<BufReader<File>>,
|
||||
}
|
||||
|
||||
enum VectorStateDelta {
|
||||
NoChange,
|
||||
// Remove all vectors, generated or manual, from this document
|
||||
NowRemoved,
|
||||
|
||||
// Add the manually specified vectors, passed in the other grenad
|
||||
// Remove any previously generated vectors
|
||||
// Note: changing the value of the manually specified vector **should not record** this delta
|
||||
WasGeneratedNowManual(Vec<Vec<f32>>),
|
||||
|
||||
ManualDelta(Vec<Vec<f32>>, Vec<Vec<f32>>),
|
||||
|
||||
// Add the vector computed from the specified prompt
|
||||
// Remove any previous vector
|
||||
// Note: changing the value of the prompt **does require** recording this delta
|
||||
NowGenerated(String),
|
||||
}
|
||||
|
||||
impl VectorStateDelta {
|
||||
fn into_values(self) -> (bool, String, (Vec<Vec<f32>>, Vec<Vec<f32>>)) {
|
||||
match self {
|
||||
VectorStateDelta::NoChange => Default::default(),
|
||||
VectorStateDelta::NowRemoved => (true, Default::default(), Default::default()),
|
||||
VectorStateDelta::WasGeneratedNowManual(add) => {
|
||||
(true, Default::default(), (Default::default(), add))
|
||||
}
|
||||
VectorStateDelta::ManualDelta(del, add) => (false, Default::default(), (del, add)),
|
||||
VectorStateDelta::NowGenerated(prompt) => (true, prompt, Default::default()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Extracts the embedding vector contained in each document under the `_vectors` field.
|
||||
///
|
||||
/// Returns the generated grenad reader containing the docid as key associated to the Vec<f32>
|
||||
|
@ -27,16 +71,34 @@ const TRUNCATE_SIZE: usize = size_of::<DocumentId>();
|
|||
pub fn extract_vector_points<R: io::Read + io::Seek>(
|
||||
obkv_documents: grenad::Reader<R>,
|
||||
indexer: GrenadParameters,
|
||||
vectors_fid: FieldId,
|
||||
) -> Result<grenad::Reader<BufReader<File>>> {
|
||||
field_id_map: FieldsIdsMap,
|
||||
prompt: Option<&Prompt>,
|
||||
) -> Result<ExtractedVectorPoints> {
|
||||
puffin::profile_function!();
|
||||
|
||||
let mut writer = create_writer(
|
||||
// (docid, _index) -> KvWriterDelAdd -> Vector
|
||||
let mut manual_vectors_writer = create_writer(
|
||||
indexer.chunk_compression_type,
|
||||
indexer.chunk_compression_level,
|
||||
tempfile::tempfile()?,
|
||||
);
|
||||
|
||||
// (docid) -> (prompt)
|
||||
let mut prompts_writer = create_writer(
|
||||
indexer.chunk_compression_type,
|
||||
indexer.chunk_compression_level,
|
||||
tempfile::tempfile()?,
|
||||
);
|
||||
|
||||
// (docid) -> ()
|
||||
let mut remove_vectors_writer = create_writer(
|
||||
indexer.chunk_compression_type,
|
||||
indexer.chunk_compression_level,
|
||||
tempfile::tempfile()?,
|
||||
);
|
||||
|
||||
let vectors_fid = field_id_map.id("_vectors");
|
||||
|
||||
let mut key_buffer = Vec::new();
|
||||
let mut cursor = obkv_documents.into_cursor()?;
|
||||
while let Some((key, value)) = cursor.move_on_next()? {
|
||||
|
@ -53,43 +115,148 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
|
|||
// lazily get it when needed
|
||||
let document_id = || -> Value { from_utf8(external_id_bytes).unwrap().into() };
|
||||
|
||||
// first we retrieve the _vectors field
|
||||
if let Some(value) = obkv.get(vectors_fid) {
|
||||
let delta = if let Some(value) = vectors_fid.and_then(|vectors_fid| obkv.get(vectors_fid)) {
|
||||
let vectors_obkv = KvReaderDelAdd::new(value);
|
||||
match (vectors_obkv.get(DelAdd::Deletion), vectors_obkv.get(DelAdd::Addition)) {
|
||||
(Some(old), Some(new)) => {
|
||||
// no autogeneration
|
||||
let del_vectors = extract_vectors(old, document_id)?;
|
||||
let add_vectors = extract_vectors(new, document_id)?;
|
||||
|
||||
// then we extract the values
|
||||
let del_vectors = vectors_obkv
|
||||
.get(DelAdd::Deletion)
|
||||
.map(|vectors| extract_vectors(vectors, document_id))
|
||||
.transpose()?
|
||||
.flatten();
|
||||
let add_vectors = vectors_obkv
|
||||
.get(DelAdd::Addition)
|
||||
.map(|vectors| extract_vectors(vectors, document_id))
|
||||
.transpose()?
|
||||
.flatten();
|
||||
VectorStateDelta::ManualDelta(
|
||||
del_vectors.unwrap_or_default(),
|
||||
add_vectors.unwrap_or_default(),
|
||||
)
|
||||
}
|
||||
(None, Some(new)) => {
|
||||
// was possibly autogenerated, remove all vectors for that document
|
||||
let add_vectors = extract_vectors(new, document_id)?;
|
||||
|
||||
// and we finally push the unique vectors into the writer
|
||||
push_vectors_diff(
|
||||
&mut writer,
|
||||
&mut key_buffer,
|
||||
del_vectors.unwrap_or_default(),
|
||||
add_vectors.unwrap_or_default(),
|
||||
)?;
|
||||
}
|
||||
VectorStateDelta::WasGeneratedNowManual(add_vectors.unwrap_or_default())
|
||||
}
|
||||
(Some(_old), None) => {
|
||||
// Do we keep this document?
|
||||
let document_is_kept = obkv
|
||||
.iter()
|
||||
.map(|(_, deladd)| KvReaderDelAdd::new(deladd))
|
||||
.any(|deladd| deladd.get(DelAdd::Addition).is_some());
|
||||
if document_is_kept {
|
||||
// becomes autogenerated
|
||||
match prompt {
|
||||
Some(prompt) => VectorStateDelta::NowGenerated(prompt.render(
|
||||
obkv,
|
||||
DelAdd::Addition,
|
||||
&field_id_map,
|
||||
)?),
|
||||
None => VectorStateDelta::NowRemoved,
|
||||
}
|
||||
} else {
|
||||
VectorStateDelta::NowRemoved
|
||||
}
|
||||
}
|
||||
(None, None) => {
|
||||
// Do we keep this document?
|
||||
let document_is_kept = obkv
|
||||
.iter()
|
||||
.map(|(_, deladd)| KvReaderDelAdd::new(deladd))
|
||||
.any(|deladd| deladd.get(DelAdd::Addition).is_some());
|
||||
|
||||
if document_is_kept {
|
||||
match prompt {
|
||||
Some(prompt) => {
|
||||
// Don't give up if the old prompt was failing
|
||||
let old_prompt = prompt
|
||||
.render(obkv, DelAdd::Deletion, &field_id_map)
|
||||
.unwrap_or_default();
|
||||
let new_prompt =
|
||||
prompt.render(obkv, DelAdd::Addition, &field_id_map)?;
|
||||
if old_prompt != new_prompt {
|
||||
log::trace!(
|
||||
"Changing prompt from\n{old_prompt}\n===\nto\n{new_prompt}"
|
||||
);
|
||||
VectorStateDelta::NowGenerated(new_prompt)
|
||||
} else {
|
||||
VectorStateDelta::NoChange
|
||||
}
|
||||
}
|
||||
// We no longer have a prompt, so we need to remove any existing vector
|
||||
None => VectorStateDelta::NowRemoved,
|
||||
}
|
||||
} else {
|
||||
VectorStateDelta::NowRemoved
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Do we keep this document?
|
||||
let document_is_kept = obkv
|
||||
.iter()
|
||||
.map(|(_, deladd)| KvReaderDelAdd::new(deladd))
|
||||
.any(|deladd| deladd.get(DelAdd::Addition).is_some());
|
||||
|
||||
if document_is_kept {
|
||||
match prompt {
|
||||
Some(prompt) => {
|
||||
// Don't give up if the old prompt was failing
|
||||
let old_prompt = prompt
|
||||
.render(obkv, DelAdd::Deletion, &field_id_map)
|
||||
.unwrap_or_default();
|
||||
let new_prompt = prompt.render(obkv, DelAdd::Addition, &field_id_map)?;
|
||||
if old_prompt != new_prompt {
|
||||
log::trace!(
|
||||
"Changing prompt from\n{old_prompt}\n===\nto\n{new_prompt}"
|
||||
);
|
||||
VectorStateDelta::NowGenerated(new_prompt)
|
||||
} else {
|
||||
VectorStateDelta::NoChange
|
||||
}
|
||||
}
|
||||
None => VectorStateDelta::NowRemoved,
|
||||
}
|
||||
} else {
|
||||
VectorStateDelta::NowRemoved
|
||||
}
|
||||
};
|
||||
|
||||
// and we finally push the unique vectors into the writer
|
||||
push_vectors_diff(
|
||||
&mut remove_vectors_writer,
|
||||
&mut prompts_writer,
|
||||
&mut manual_vectors_writer,
|
||||
&mut key_buffer,
|
||||
delta,
|
||||
)?;
|
||||
}
|
||||
|
||||
writer_into_reader(writer)
|
||||
Ok(ExtractedVectorPoints {
|
||||
// docid, _index -> KvWriterDelAdd -> Vector
|
||||
manual_vectors: writer_into_reader(manual_vectors_writer)?,
|
||||
// docid -> ()
|
||||
remove_vectors: writer_into_reader(remove_vectors_writer)?,
|
||||
// docid -> prompt
|
||||
prompts: writer_into_reader(prompts_writer)?,
|
||||
})
|
||||
}
|
||||
|
||||
/// Computes the diff between both Del and Add numbers and
|
||||
/// only inserts the parts that differ in the sorter.
|
||||
fn push_vectors_diff(
|
||||
writer: &mut Writer<BufWriter<File>>,
|
||||
remove_vectors_writer: &mut Writer<BufWriter<File>>,
|
||||
prompts_writer: &mut Writer<BufWriter<File>>,
|
||||
manual_vectors_writer: &mut Writer<BufWriter<File>>,
|
||||
key_buffer: &mut Vec<u8>,
|
||||
mut del_vectors: Vec<Vec<f32>>,
|
||||
mut add_vectors: Vec<Vec<f32>>,
|
||||
delta: VectorStateDelta,
|
||||
) -> Result<()> {
|
||||
let (must_remove, prompt, (mut del_vectors, mut add_vectors)) = delta.into_values();
|
||||
if must_remove {
|
||||
key_buffer.truncate(TRUNCATE_SIZE);
|
||||
remove_vectors_writer.insert(&key_buffer, [])?;
|
||||
}
|
||||
if !prompt.is_empty() {
|
||||
key_buffer.truncate(TRUNCATE_SIZE);
|
||||
prompts_writer.insert(&key_buffer, prompt.as_bytes())?;
|
||||
}
|
||||
|
||||
// We sort and dedup the vectors
|
||||
del_vectors.sort_unstable_by(|a, b| compare_vectors(a, b));
|
||||
add_vectors.sort_unstable_by(|a, b| compare_vectors(a, b));
|
||||
|
@ -114,7 +281,7 @@ fn push_vectors_diff(
|
|||
let mut obkv = KvWriterDelAdd::memory();
|
||||
obkv.insert(DelAdd::Deletion, cast_slice(&vector))?;
|
||||
let bytes = obkv.into_inner()?;
|
||||
writer.insert(&key_buffer, bytes)?;
|
||||
manual_vectors_writer.insert(&key_buffer, bytes)?;
|
||||
}
|
||||
EitherOrBoth::Right(vector) => {
|
||||
// We insert only the Add part of the Obkv to inform
|
||||
|
@ -122,7 +289,7 @@ fn push_vectors_diff(
|
|||
let mut obkv = KvWriterDelAdd::memory();
|
||||
obkv.insert(DelAdd::Addition, cast_slice(&vector))?;
|
||||
let bytes = obkv.into_inner()?;
|
||||
writer.insert(&key_buffer, bytes)?;
|
||||
manual_vectors_writer.insert(&key_buffer, bytes)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -146,3 +313,102 @@ fn extract_vectors(value: &[u8], document_id: impl Fn() -> Value) -> Result<Opti
|
|||
.into()),
|
||||
}
|
||||
}
|
||||
|
||||
#[logging_timer::time]
|
||||
pub fn extract_embeddings<R: io::Read + io::Seek>(
|
||||
// docid, prompt
|
||||
prompt_reader: grenad::Reader<R>,
|
||||
indexer: GrenadParameters,
|
||||
embedder: Arc<Embedder>,
|
||||
) -> Result<(grenad::Reader<BufReader<File>>, Option<usize>)> {
|
||||
let rt = tokio::runtime::Builder::new_current_thread().enable_io().enable_time().build()?;
|
||||
|
||||
let n_chunks = embedder.chunk_count_hint(); // chunk level parellelism
|
||||
let n_vectors_per_chunk = embedder.prompt_count_in_chunk_hint(); // number of vectors in a single chunk
|
||||
|
||||
// docid, state with embedding
|
||||
let mut state_writer = create_writer(
|
||||
indexer.chunk_compression_type,
|
||||
indexer.chunk_compression_level,
|
||||
tempfile::tempfile()?,
|
||||
);
|
||||
|
||||
let mut chunks = Vec::with_capacity(n_chunks);
|
||||
let mut current_chunk = Vec::with_capacity(n_vectors_per_chunk);
|
||||
let mut current_chunk_ids = Vec::with_capacity(n_vectors_per_chunk);
|
||||
let mut chunks_ids = Vec::with_capacity(n_chunks);
|
||||
let mut cursor = prompt_reader.into_cursor()?;
|
||||
|
||||
let mut expected_dimension = None;
|
||||
|
||||
while let Some((key, value)) = cursor.move_on_next()? {
|
||||
let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap();
|
||||
// SAFETY: precondition, the grenad value was saved from a string
|
||||
let prompt = unsafe { std::str::from_utf8_unchecked(value) };
|
||||
if current_chunk.len() == current_chunk.capacity() {
|
||||
chunks.push(std::mem::replace(
|
||||
&mut current_chunk,
|
||||
Vec::with_capacity(n_vectors_per_chunk),
|
||||
));
|
||||
chunks_ids.push(std::mem::replace(
|
||||
&mut current_chunk_ids,
|
||||
Vec::with_capacity(n_vectors_per_chunk),
|
||||
));
|
||||
};
|
||||
current_chunk.push(prompt.to_owned());
|
||||
current_chunk_ids.push(docid);
|
||||
|
||||
if chunks.len() == chunks.capacity() {
|
||||
let chunked_embeds = rt
|
||||
.block_on(
|
||||
embedder
|
||||
.embed_chunks(std::mem::replace(&mut chunks, Vec::with_capacity(n_chunks))),
|
||||
)
|
||||
.map_err(crate::vector::Error::from)
|
||||
.map_err(crate::UserError::from)
|
||||
.map_err(crate::Error::from)?;
|
||||
|
||||
for (docid, embeddings) in chunks_ids
|
||||
.iter()
|
||||
.flat_map(|docids| docids.iter())
|
||||
.zip(chunked_embeds.iter().flat_map(|embeds| embeds.iter()))
|
||||
{
|
||||
state_writer.insert(docid.to_be_bytes(), cast_slice(embeddings.as_inner()))?;
|
||||
expected_dimension = Some(embeddings.dimension());
|
||||
}
|
||||
chunks_ids.clear();
|
||||
}
|
||||
}
|
||||
|
||||
// send last chunk
|
||||
if !chunks.is_empty() {
|
||||
let chunked_embeds = rt
|
||||
.block_on(embedder.embed_chunks(std::mem::take(&mut chunks)))
|
||||
.map_err(crate::vector::Error::from)
|
||||
.map_err(crate::UserError::from)
|
||||
.map_err(crate::Error::from)?;
|
||||
for (docid, embeddings) in chunks_ids
|
||||
.iter()
|
||||
.flat_map(|docids| docids.iter())
|
||||
.zip(chunked_embeds.iter().flat_map(|embeds| embeds.iter()))
|
||||
{
|
||||
state_writer.insert(docid.to_be_bytes(), cast_slice(embeddings.as_inner()))?;
|
||||
expected_dimension = Some(embeddings.dimension());
|
||||
}
|
||||
}
|
||||
|
||||
if !current_chunk.is_empty() {
|
||||
let embeds = rt
|
||||
.block_on(embedder.embed(std::mem::take(&mut current_chunk)))
|
||||
.map_err(crate::vector::Error::from)
|
||||
.map_err(crate::UserError::from)
|
||||
.map_err(crate::Error::from)?;
|
||||
|
||||
for (docid, embeddings) in current_chunk_ids.iter().zip(embeds.iter()) {
|
||||
state_writer.insert(docid.to_be_bytes(), cast_slice(embeddings.as_inner()))?;
|
||||
expected_dimension = Some(embeddings.dimension());
|
||||
}
|
||||
}
|
||||
|
||||
Ok((writer_into_reader(state_writer)?, expected_dimension))
|
||||
}
|
||||
|
|
|
@ -9,9 +9,10 @@ mod extract_word_docids;
|
|||
mod extract_word_pair_proximity_docids;
|
||||
mod extract_word_position_docids;
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fs::File;
|
||||
use std::io::BufReader;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crossbeam_channel::Sender;
|
||||
use log::debug;
|
||||
|
@ -23,7 +24,9 @@ use self::extract_facet_string_docids::extract_facet_string_docids;
|
|||
use self::extract_fid_docid_facet_values::{extract_fid_docid_facet_values, ExtractedFacetValues};
|
||||
use self::extract_fid_word_count_docids::extract_fid_word_count_docids;
|
||||
use self::extract_geo_points::extract_geo_points;
|
||||
use self::extract_vector_points::extract_vector_points;
|
||||
use self::extract_vector_points::{
|
||||
extract_embeddings, extract_vector_points, ExtractedVectorPoints,
|
||||
};
|
||||
use self::extract_word_docids::extract_word_docids;
|
||||
use self::extract_word_pair_proximity_docids::extract_word_pair_proximity_docids;
|
||||
use self::extract_word_position_docids::extract_word_position_docids;
|
||||
|
@ -32,8 +35,10 @@ use super::helpers::{
|
|||
MergeFn, MergeableReader,
|
||||
};
|
||||
use super::{helpers, TypedChunk};
|
||||
use crate::prompt::Prompt;
|
||||
use crate::proximity::ProximityPrecision;
|
||||
use crate::{FieldId, Result};
|
||||
use crate::vector::Embedder;
|
||||
use crate::{FieldId, FieldsIdsMap, Result};
|
||||
|
||||
/// Extract data for each databases from obkv documents in parallel.
|
||||
/// Send data in grenad file over provided Sender.
|
||||
|
@ -47,13 +52,14 @@ pub(crate) fn data_from_obkv_documents(
|
|||
faceted_fields: HashSet<FieldId>,
|
||||
primary_key_id: FieldId,
|
||||
geo_fields_ids: Option<(FieldId, FieldId)>,
|
||||
vectors_field_id: Option<FieldId>,
|
||||
field_id_map: FieldsIdsMap,
|
||||
stop_words: Option<fst::Set<&[u8]>>,
|
||||
allowed_separators: Option<&[&str]>,
|
||||
dictionary: Option<&[&str]>,
|
||||
max_positions_per_attributes: Option<u32>,
|
||||
exact_attributes: HashSet<FieldId>,
|
||||
proximity_precision: ProximityPrecision,
|
||||
embedders: HashMap<String, (Arc<Embedder>, Arc<Prompt>)>,
|
||||
) -> Result<()> {
|
||||
puffin::profile_function!();
|
||||
|
||||
|
@ -64,7 +70,8 @@ pub(crate) fn data_from_obkv_documents(
|
|||
original_documents_chunk,
|
||||
indexer,
|
||||
lmdb_writer_sx.clone(),
|
||||
vectors_field_id,
|
||||
field_id_map.clone(),
|
||||
embedders.clone(),
|
||||
)
|
||||
})
|
||||
.collect::<Result<()>>()?;
|
||||
|
@ -276,24 +283,42 @@ fn send_original_documents_data(
|
|||
original_documents_chunk: Result<grenad::Reader<BufReader<File>>>,
|
||||
indexer: GrenadParameters,
|
||||
lmdb_writer_sx: Sender<Result<TypedChunk>>,
|
||||
vectors_field_id: Option<FieldId>,
|
||||
field_id_map: FieldsIdsMap,
|
||||
embedders: HashMap<String, (Arc<Embedder>, Arc<Prompt>)>,
|
||||
) -> Result<()> {
|
||||
let original_documents_chunk =
|
||||
original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;
|
||||
|
||||
if let Some(vectors_field_id) = vectors_field_id {
|
||||
let documents_chunk_cloned = original_documents_chunk.clone();
|
||||
let lmdb_writer_sx_cloned = lmdb_writer_sx.clone();
|
||||
rayon::spawn(move || {
|
||||
let result = extract_vector_points(documents_chunk_cloned, indexer, vectors_field_id);
|
||||
let _ = match result {
|
||||
Ok(vector_points) => {
|
||||
lmdb_writer_sx_cloned.send(Ok(TypedChunk::VectorPoints(vector_points)))
|
||||
}
|
||||
Err(error) => lmdb_writer_sx_cloned.send(Err(error)),
|
||||
};
|
||||
});
|
||||
}
|
||||
let documents_chunk_cloned = original_documents_chunk.clone();
|
||||
let lmdb_writer_sx_cloned = lmdb_writer_sx.clone();
|
||||
rayon::spawn(move || {
|
||||
let (embedder, prompt) = embedders.get("default").cloned().unzip();
|
||||
let result =
|
||||
extract_vector_points(documents_chunk_cloned, indexer, field_id_map, prompt.as_deref());
|
||||
let _ = match result {
|
||||
Ok(ExtractedVectorPoints { manual_vectors, remove_vectors, prompts }) => {
|
||||
/// FIXME: support multiple embedders
|
||||
let results = embedder.and_then(|embedder| {
|
||||
match extract_embeddings(prompts, indexer, embedder.clone()) {
|
||||
Ok(results) => Some(results),
|
||||
Err(error) => {
|
||||
let _ = lmdb_writer_sx_cloned.send(Err(error));
|
||||
None
|
||||
}
|
||||
}
|
||||
});
|
||||
let (embeddings, expected_dimension) = results.unzip();
|
||||
let expected_dimension = expected_dimension.flatten();
|
||||
lmdb_writer_sx_cloned.send(Ok(TypedChunk::VectorPoints {
|
||||
remove_vectors,
|
||||
embeddings,
|
||||
expected_dimension,
|
||||
manual_vectors,
|
||||
}))
|
||||
}
|
||||
Err(error) => lmdb_writer_sx_cloned.send(Err(error)),
|
||||
};
|
||||
});
|
||||
|
||||
// TODO: create a custom internal error
|
||||
lmdb_writer_sx.send(Ok(TypedChunk::Documents(original_documents_chunk))).unwrap();
|
||||
|
|
|
@ -4,11 +4,12 @@ mod helpers;
|
|||
mod transform;
|
||||
mod typed_chunk;
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::io::{Cursor, Read, Seek};
|
||||
use std::iter::FromIterator;
|
||||
use std::num::NonZeroU32;
|
||||
use std::result::Result as StdResult;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crossbeam_channel::{Receiver, Sender};
|
||||
use heed::types::Str;
|
||||
|
@ -32,10 +33,12 @@ use self::helpers::{grenad_obkv_into_chunks, GrenadParameters};
|
|||
pub use self::transform::{Transform, TransformOutput};
|
||||
use crate::documents::{obkv_to_object, DocumentsBatchReader};
|
||||
use crate::error::{Error, InternalError, UserError};
|
||||
use crate::prompt::Prompt;
|
||||
pub use crate::update::index_documents::helpers::CursorClonableMmap;
|
||||
use crate::update::{
|
||||
IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst,
|
||||
};
|
||||
use crate::vector::Embedder;
|
||||
use crate::{CboRoaringBitmapCodec, Index, Result};
|
||||
|
||||
static MERGED_DATABASE_COUNT: usize = 7;
|
||||
|
@ -78,6 +81,7 @@ pub struct IndexDocuments<'t, 'i, 'a, FP, FA> {
|
|||
should_abort: FA,
|
||||
added_documents: u64,
|
||||
deleted_documents: u64,
|
||||
embedders: HashMap<String, (Arc<Embedder>, Arc<Prompt>)>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Clone)]
|
||||
|
@ -121,6 +125,7 @@ where
|
|||
index,
|
||||
added_documents: 0,
|
||||
deleted_documents: 0,
|
||||
embedders: Default::default(),
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -167,6 +172,14 @@ where
|
|||
Ok((self, Ok(indexed_documents)))
|
||||
}
|
||||
|
||||
pub fn with_embedders(
|
||||
mut self,
|
||||
embedders: HashMap<String, (Arc<Embedder>, Arc<Prompt>)>,
|
||||
) -> Self {
|
||||
self.embedders = embedders;
|
||||
self
|
||||
}
|
||||
|
||||
/// Remove a batch of documents from the current builder.
|
||||
///
|
||||
/// Returns the number of documents deleted from the builder.
|
||||
|
@ -322,17 +335,18 @@ where
|
|||
// get filterable fields for facet databases
|
||||
let faceted_fields = self.index.faceted_fields_ids(self.wtxn)?;
|
||||
// get the fid of the `_geo.lat` and `_geo.lng` fields.
|
||||
let geo_fields_ids = match self.index.fields_ids_map(self.wtxn)?.id("_geo") {
|
||||
let mut field_id_map = self.index.fields_ids_map(self.wtxn)?;
|
||||
|
||||
// self.index.fields_ids_map($a)? ==>> field_id_map
|
||||
let geo_fields_ids = match field_id_map.id("_geo") {
|
||||
Some(gfid) => {
|
||||
let is_sortable = self.index.sortable_fields_ids(self.wtxn)?.contains(&gfid);
|
||||
let is_filterable = self.index.filterable_fields_ids(self.wtxn)?.contains(&gfid);
|
||||
// if `_geo` is faceted then we get the `lat` and `lng`
|
||||
if is_sortable || is_filterable {
|
||||
let field_ids = self
|
||||
.index
|
||||
.fields_ids_map(self.wtxn)?
|
||||
let field_ids = field_id_map
|
||||
.insert("_geo.lat")
|
||||
.zip(self.index.fields_ids_map(self.wtxn)?.insert("_geo.lng"))
|
||||
.zip(field_id_map.insert("_geo.lng"))
|
||||
.ok_or(UserError::AttributeLimitReached)?;
|
||||
Some(field_ids)
|
||||
} else {
|
||||
|
@ -341,8 +355,6 @@ where
|
|||
}
|
||||
None => None,
|
||||
};
|
||||
// get the fid of the `_vectors` field.
|
||||
let vectors_field_id = self.index.fields_ids_map(self.wtxn)?.id("_vectors");
|
||||
|
||||
let stop_words = self.index.stop_words(self.wtxn)?;
|
||||
let separators = self.index.allowed_separators(self.wtxn)?;
|
||||
|
@ -364,6 +376,8 @@ where
|
|||
self.indexer_config.documents_chunk_size.unwrap_or(1024 * 1024 * 4); // 4MiB
|
||||
let max_positions_per_attributes = self.indexer_config.max_positions_per_attributes;
|
||||
|
||||
let cloned_embedder = self.embedders.clone();
|
||||
|
||||
// Run extraction pipeline in parallel.
|
||||
pool.install(|| {
|
||||
puffin::profile_scope!("extract_and_send_grenad_chunks");
|
||||
|
@ -387,13 +401,14 @@ where
|
|||
faceted_fields,
|
||||
primary_key_id,
|
||||
geo_fields_ids,
|
||||
vectors_field_id,
|
||||
field_id_map,
|
||||
stop_words,
|
||||
separators.as_deref(),
|
||||
dictionary.as_deref(),
|
||||
max_positions_per_attributes,
|
||||
exact_attributes,
|
||||
proximity_precision,
|
||||
cloned_embedder,
|
||||
)
|
||||
});
|
||||
|
||||
|
@ -2505,7 +2520,7 @@ mod tests {
|
|||
.unwrap();
|
||||
|
||||
let rtxn = index.read_txn().unwrap();
|
||||
let res = index.search(&rtxn).vector([0.0, 1.0, 2.0]).execute().unwrap();
|
||||
let res = index.search(&rtxn).vector([0.0, 1.0, 2.0].to_vec()).execute().unwrap();
|
||||
assert_eq!(res.documents_ids.len(), 3);
|
||||
}
|
||||
|
||||
|
|
|
@ -47,7 +47,12 @@ pub(crate) enum TypedChunk {
|
|||
FieldIdFacetIsNullDocids(grenad::Reader<BufReader<File>>),
|
||||
FieldIdFacetIsEmptyDocids(grenad::Reader<BufReader<File>>),
|
||||
GeoPoints(grenad::Reader<BufReader<File>>),
|
||||
VectorPoints(grenad::Reader<BufReader<File>>),
|
||||
VectorPoints {
|
||||
remove_vectors: grenad::Reader<BufReader<File>>,
|
||||
embeddings: Option<grenad::Reader<BufReader<File>>>,
|
||||
expected_dimension: Option<usize>,
|
||||
manual_vectors: grenad::Reader<BufReader<File>>,
|
||||
},
|
||||
ScriptLanguageDocids(HashMap<(Script, Language), (RoaringBitmap, RoaringBitmap)>),
|
||||
}
|
||||
|
||||
|
@ -100,8 +105,8 @@ impl TypedChunk {
|
|||
TypedChunk::GeoPoints(grenad) => {
|
||||
format!("GeoPoints {{ number_of_entries: {} }}", grenad.len())
|
||||
}
|
||||
TypedChunk::VectorPoints(grenad) => {
|
||||
format!("VectorPoints {{ number_of_entries: {} }}", grenad.len())
|
||||
TypedChunk::VectorPoints{ remove_vectors, manual_vectors, embeddings, expected_dimension } => {
|
||||
format!("VectorPoints {{ remove_vectors: {}, manual_vectors: {}, embeddings: {}, dimension: {} }}", remove_vectors.len(), manual_vectors.len(), embeddings.as_ref().map(|e| e.len()).unwrap_or_default(), expected_dimension.unwrap_or_default())
|
||||
}
|
||||
TypedChunk::ScriptLanguageDocids(sl_map) => {
|
||||
format!("ScriptLanguageDocids {{ number_of_entries: {} }}", sl_map.len())
|
||||
|
@ -355,19 +360,64 @@ pub(crate) fn write_typed_chunk_into_index(
|
|||
index.put_geo_rtree(wtxn, &rtree)?;
|
||||
index.put_geo_faceted_documents_ids(wtxn, &geo_faceted_docids)?;
|
||||
}
|
||||
TypedChunk::VectorPoints(vector_points) => {
|
||||
let mut vectors_set = HashSet::new();
|
||||
TypedChunk::VectorPoints {
|
||||
remove_vectors,
|
||||
manual_vectors,
|
||||
embeddings,
|
||||
expected_dimension,
|
||||
} => {
|
||||
if remove_vectors.is_empty()
|
||||
&& manual_vectors.is_empty()
|
||||
&& embeddings.as_ref().map_or(true, |e| e.is_empty())
|
||||
{
|
||||
return Ok((RoaringBitmap::new(), is_merged_database));
|
||||
}
|
||||
|
||||
let mut docid_vectors_map: HashMap<DocumentId, HashSet<Vec<OrderedFloat<f32>>>> =
|
||||
HashMap::new();
|
||||
|
||||
// We extract and store the previous vectors
|
||||
if let Some(hnsw) = index.vector_hnsw(wtxn)? {
|
||||
for (pid, point) in hnsw.iter() {
|
||||
let pid_key = pid.into_inner();
|
||||
let docid = index.vector_id_docid.get(wtxn, &pid_key)?.unwrap();
|
||||
let vector: Vec<_> = point.iter().copied().map(OrderedFloat).collect();
|
||||
vectors_set.insert((docid, vector));
|
||||
docid_vectors_map.entry(docid).or_default().insert(vector);
|
||||
}
|
||||
}
|
||||
|
||||
let mut cursor = vector_points.into_cursor()?;
|
||||
// remove vectors for docids we want them removed
|
||||
let mut cursor = remove_vectors.into_cursor()?;
|
||||
while let Some((key, _)) = cursor.move_on_next()? {
|
||||
let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap();
|
||||
|
||||
docid_vectors_map.remove(&docid);
|
||||
}
|
||||
|
||||
// add generated embeddings
|
||||
if let Some((embeddings, expected_dimension)) = embeddings.zip(expected_dimension) {
|
||||
let mut cursor = embeddings.into_cursor()?;
|
||||
while let Some((key, value)) = cursor.move_on_next()? {
|
||||
let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap();
|
||||
let data: Vec<OrderedFloat<_>> =
|
||||
pod_collect_to_vec(value).into_iter().map(OrderedFloat).collect();
|
||||
// it is a code error to have embeddings and not expected_dimension
|
||||
let embeddings =
|
||||
crate::vector::Embeddings::from_inner(data, expected_dimension)
|
||||
// code error if we somehow got the wrong dimension
|
||||
.unwrap();
|
||||
|
||||
let mut set = HashSet::new();
|
||||
for embedding in embeddings.iter() {
|
||||
set.insert(embedding.to_vec());
|
||||
}
|
||||
|
||||
docid_vectors_map.insert(docid, set);
|
||||
}
|
||||
}
|
||||
|
||||
// perform the manual diff
|
||||
let mut cursor = manual_vectors.into_cursor()?;
|
||||
while let Some((key, value)) = cursor.move_on_next()? {
|
||||
// convert the key back to a u32 (4 bytes)
|
||||
let (left, _index) = try_split_array_at(key).unwrap();
|
||||
|
@ -376,23 +426,30 @@ pub(crate) fn write_typed_chunk_into_index(
|
|||
let vector_deladd_obkv = KvReaderDelAdd::new(value);
|
||||
if let Some(value) = vector_deladd_obkv.get(DelAdd::Deletion) {
|
||||
// convert the vector back to a Vec<f32>
|
||||
let vector = pod_collect_to_vec(value).into_iter().map(OrderedFloat).collect();
|
||||
let key = (docid, vector);
|
||||
if !vectors_set.remove(&key) {
|
||||
error!("Unable to delete the vector: {:?}", key.1);
|
||||
}
|
||||
let vector: Vec<OrderedFloat<_>> =
|
||||
pod_collect_to_vec(value).into_iter().map(OrderedFloat).collect();
|
||||
docid_vectors_map.entry(docid).and_modify(|v| {
|
||||
if !v.remove(&vector) {
|
||||
error!("Unable to delete the vector: {:?}", vector);
|
||||
}
|
||||
});
|
||||
}
|
||||
if let Some(value) = vector_deladd_obkv.get(DelAdd::Addition) {
|
||||
// convert the vector back to a Vec<f32>
|
||||
let vector = pod_collect_to_vec(value).into_iter().map(OrderedFloat).collect();
|
||||
vectors_set.insert((docid, vector));
|
||||
docid_vectors_map.entry(docid).and_modify(|v| {
|
||||
v.insert(vector);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Extract the most common vector dimension
|
||||
let expected_dimension_size = {
|
||||
let mut dims = HashMap::new();
|
||||
vectors_set.iter().for_each(|(_, v)| *dims.entry(v.len()).or_insert(0) += 1);
|
||||
docid_vectors_map
|
||||
.values()
|
||||
.flat_map(|v| v.iter())
|
||||
.for_each(|v| *dims.entry(v.len()).or_insert(0) += 1);
|
||||
dims.into_iter().max_by_key(|(_, count)| *count).map(|(len, _)| len)
|
||||
};
|
||||
|
||||
|
@ -400,7 +457,10 @@ pub(crate) fn write_typed_chunk_into_index(
|
|||
// prepare the vectors before inserting them in the HNSW.
|
||||
let mut points = Vec::new();
|
||||
let mut docids = Vec::new();
|
||||
for (docid, vector) in vectors_set {
|
||||
for (docid, vector) in docid_vectors_map
|
||||
.into_iter()
|
||||
.flat_map(|(docid, vectors)| std::iter::repeat(docid).zip(vectors))
|
||||
{
|
||||
if expected_dimension_size.map_or(false, |expected| expected != vector.len()) {
|
||||
return Err(UserError::InvalidVectorDimensions {
|
||||
expected: expected_dimension_size.unwrap_or(vector.len()),
|
||||
|
|
|
@ -3,7 +3,7 @@ use std::result::Result as StdResult;
|
|||
|
||||
use charabia::{Normalize, Tokenizer, TokenizerBuilder};
|
||||
use deserr::{DeserializeError, Deserr};
|
||||
use itertools::Itertools;
|
||||
use itertools::{EitherOrBoth, Itertools};
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
use time::OffsetDateTime;
|
||||
|
||||
|
@ -15,6 +15,8 @@ use crate::index::{DEFAULT_MIN_WORD_LEN_ONE_TYPO, DEFAULT_MIN_WORD_LEN_TWO_TYPOS
|
|||
use crate::proximity::ProximityPrecision;
|
||||
use crate::update::index_documents::IndexDocumentsMethod;
|
||||
use crate::update::{IndexDocuments, UpdateIndexingStep};
|
||||
use crate::vector::settings::{EmbeddingSettings, PromptSettings};
|
||||
use crate::vector::EmbeddingConfig;
|
||||
use crate::{FieldsIdsMap, Index, OrderBy, Result};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
|
||||
|
@ -73,6 +75,13 @@ impl<T> Setting<T> {
|
|||
otherwise => otherwise,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn apply(&mut self, new: Self) {
|
||||
if let Setting::NotSet = new {
|
||||
return;
|
||||
}
|
||||
*self = new;
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Serialize> Serialize for Setting<T> {
|
||||
|
@ -129,6 +138,7 @@ pub struct Settings<'a, 't, 'i> {
|
|||
sort_facet_values_by: Setting<HashMap<String, OrderBy>>,
|
||||
pagination_max_total_hits: Setting<usize>,
|
||||
proximity_precision: Setting<ProximityPrecision>,
|
||||
embedder_settings: Setting<BTreeMap<String, Setting<EmbeddingSettings>>>,
|
||||
}
|
||||
|
||||
impl<'a, 't, 'i> Settings<'a, 't, 'i> {
|
||||
|
@ -161,6 +171,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
|
|||
sort_facet_values_by: Setting::NotSet,
|
||||
pagination_max_total_hits: Setting::NotSet,
|
||||
proximity_precision: Setting::NotSet,
|
||||
embedder_settings: Setting::NotSet,
|
||||
indexer_config,
|
||||
}
|
||||
}
|
||||
|
@ -343,6 +354,14 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
|
|||
self.proximity_precision = Setting::Reset;
|
||||
}
|
||||
|
||||
pub fn set_embedder_settings(&mut self, value: BTreeMap<String, Setting<EmbeddingSettings>>) {
|
||||
self.embedder_settings = Setting::Set(value);
|
||||
}
|
||||
|
||||
pub fn reset_embedder_settings(&mut self) {
|
||||
self.embedder_settings = Setting::Reset;
|
||||
}
|
||||
|
||||
fn reindex<FP, FA>(
|
||||
&mut self,
|
||||
progress_callback: &FP,
|
||||
|
@ -890,6 +909,60 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
|
|||
Ok(changed)
|
||||
}
|
||||
|
||||
fn update_embedding_configs(&mut self) -> Result<bool> {
|
||||
let update = match std::mem::take(&mut self.embedder_settings) {
|
||||
Setting::Set(configs) => {
|
||||
let mut changed = false;
|
||||
let old_configs = self.index.embedding_configs(self.wtxn)?;
|
||||
let old_configs: BTreeMap<String, Setting<EmbeddingSettings>> =
|
||||
old_configs.into_iter().map(|(k, v)| (k, Setting::Set(v.into()))).collect();
|
||||
|
||||
let mut new_configs = BTreeMap::new();
|
||||
for joined in old_configs
|
||||
.into_iter()
|
||||
.merge_join_by(configs.into_iter(), |(left, _), (right, _)| left.cmp(right))
|
||||
{
|
||||
match joined {
|
||||
EitherOrBoth::Both((name, mut old), (_, new)) => {
|
||||
old.apply(new);
|
||||
let new = validate_prompt(&name, old)?;
|
||||
changed = true;
|
||||
new_configs.insert(name, new);
|
||||
}
|
||||
EitherOrBoth::Left((name, setting)) => {
|
||||
new_configs.insert(name, setting);
|
||||
}
|
||||
EitherOrBoth::Right((name, setting)) => {
|
||||
let setting = validate_prompt(&name, setting)?;
|
||||
changed = true;
|
||||
new_configs.insert(name, setting);
|
||||
}
|
||||
}
|
||||
}
|
||||
let new_configs: Vec<(String, EmbeddingConfig)> = new_configs
|
||||
.into_iter()
|
||||
.filter_map(|(name, setting)| match setting {
|
||||
Setting::Set(value) => Some((name, value.into())),
|
||||
Setting::Reset => None,
|
||||
Setting::NotSet => Some((name, EmbeddingSettings::default().into())),
|
||||
})
|
||||
.collect();
|
||||
if new_configs.is_empty() {
|
||||
self.index.delete_embedding_configs(self.wtxn)?;
|
||||
} else {
|
||||
self.index.put_embedding_configs(self.wtxn, new_configs)?;
|
||||
}
|
||||
changed
|
||||
}
|
||||
Setting::Reset => {
|
||||
self.index.delete_embedding_configs(self.wtxn)?;
|
||||
true
|
||||
}
|
||||
Setting::NotSet => false,
|
||||
};
|
||||
Ok(update)
|
||||
}
|
||||
|
||||
pub fn execute<FP, FA>(mut self, progress_callback: FP, should_abort: FA) -> Result<()>
|
||||
where
|
||||
FP: Fn(UpdateIndexingStep) + Sync,
|
||||
|
@ -927,6 +1000,13 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
|
|||
let searchable_updated = self.update_searchable()?;
|
||||
let exact_attributes_updated = self.update_exact_attributes()?;
|
||||
let proximity_precision = self.update_proximity_precision()?;
|
||||
// TODO: very rough approximation of the needs for reindexing where any change will result in
|
||||
// a full reindexing.
|
||||
// What can be done instead:
|
||||
// 1. Only change the distance on a distance change
|
||||
// 2. Only change the name -> embedder mapping on a name change
|
||||
// 3. Keep the old vectors but reattempt indexing on a prompt change: only actually changed prompt will need embedding + storage
|
||||
let embedding_configs_updated = self.update_embedding_configs()?;
|
||||
|
||||
if stop_words_updated
|
||||
|| non_separator_tokens_updated
|
||||
|
@ -937,6 +1017,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
|
|||
|| searchable_updated
|
||||
|| exact_attributes_updated
|
||||
|| proximity_precision
|
||||
|| embedding_configs_updated
|
||||
{
|
||||
self.reindex(&progress_callback, &should_abort, old_fields_ids_map)?;
|
||||
}
|
||||
|
@ -945,6 +1026,34 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
|
|||
}
|
||||
}
|
||||
|
||||
fn validate_prompt(
|
||||
name: &str,
|
||||
new: Setting<EmbeddingSettings>,
|
||||
) -> Result<Setting<EmbeddingSettings>> {
|
||||
match new {
|
||||
Setting::Set(EmbeddingSettings {
|
||||
embedder_options,
|
||||
prompt:
|
||||
Setting::Set(PromptSettings { template: Setting::Set(template), strategy, fallback }),
|
||||
}) => {
|
||||
// validate
|
||||
let template = crate::prompt::Prompt::new(template, None, None)
|
||||
.map(|prompt| crate::prompt::PromptData::from(prompt).template)
|
||||
.map_err(|inner| UserError::InvalidPromptForEmbeddings(name.to_owned(), inner))?;
|
||||
|
||||
Ok(Setting::Set(EmbeddingSettings {
|
||||
embedder_options,
|
||||
prompt: Setting::Set(PromptSettings {
|
||||
template: Setting::Set(template),
|
||||
strategy,
|
||||
fallback,
|
||||
}),
|
||||
}))
|
||||
}
|
||||
new => Ok(new),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use big_s::S;
|
||||
|
@ -1763,6 +1872,7 @@ mod tests {
|
|||
sort_facet_values_by,
|
||||
pagination_max_total_hits,
|
||||
proximity_precision,
|
||||
embedder_settings,
|
||||
} = settings;
|
||||
assert!(matches!(searchable_fields, Setting::NotSet));
|
||||
assert!(matches!(displayed_fields, Setting::NotSet));
|
||||
|
@ -1785,6 +1895,7 @@ mod tests {
|
|||
assert!(matches!(sort_facet_values_by, Setting::NotSet));
|
||||
assert!(matches!(pagination_max_total_hits, Setting::NotSet));
|
||||
assert!(matches!(proximity_precision, Setting::NotSet));
|
||||
assert!(matches!(embedder_settings, Setting::NotSet));
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue