Implement binary quantization in Meilisearch

Tamo 2024-09-18 18:13:37 +02:00
parent 5f474a640d
commit cc45e264ca
20 changed files with 559 additions and 223 deletions
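
Binary quantization, for context: each f32 component of an embedding is reduced to a single sign bit, which shrinks vector storage roughly 32x and turns distance computations into cheap bitwise operations, at the cost of some relevancy. The snippet below is a minimal, self-contained sketch of that general idea in plain Rust; it is not Meilisearch or arroy code, and the helper names are made up for illustration.

    /// Pack the sign bits of an f32 embedding into u64 words.
    fn binary_quantize(embedding: &[f32]) -> Vec<u64> {
        let mut words = vec![0u64; (embedding.len() + 63) / 64];
        for (i, &value) in embedding.iter().enumerate() {
            if value > 0.0 {
                words[i / 64] |= 1u64 << (i % 64);
            }
        }
        words
    }

    /// Hamming distance between two quantized vectors: a cheap stand-in for the
    /// angular distance computed on the original f32 vectors.
    fn hamming(a: &[u64], b: &[u64]) -> u32 {
        a.iter().zip(b).map(|(x, y)| (x ^ y).count_ones()).sum()
    }

    fn main() {
        let a = binary_quantize(&[0.3, -1.2, 0.7, 0.0]);
        let b = binary_quantize(&[0.1, 0.4, -0.2, 0.9]);
        println!("hamming distance = {}", hamming(&a, &b));
    }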

View file

@@ -20,7 +20,7 @@ use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::settings::InnerIndexSettingsDiff;
use crate::vector::error::{EmbedErrorKind, PossibleEmbeddingMistakes, UnusedVectorsDistribution};
use crate::vector::parsed_vectors::{ParsedVectorsDiff, VectorState, RESERVED_VECTORS_FIELD_NAME};
use crate::vector::settings::{EmbedderAction, ReindexAction};
use crate::vector::settings::ReindexAction;
use crate::vector::{Embedder, Embeddings};
use crate::{try_split_array_at, DocumentId, FieldId, Result, ThreadPoolNoAbort};
@@ -208,65 +208,65 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
if reindex_vectors {
for (name, action) in settings_diff.embedding_config_updates.iter() {
match action {
EmbedderAction::WriteBackToDocuments(_) => continue, // already deleted
EmbedderAction::Reindex(action) => {
let Some((embedder_name, (embedder, prompt))) = configs.remove_entry(name)
else {
tracing::error!(embedder = name, "Requested embedder config not found");
continue;
};
if let Some(action) = action.reindex() {
let Some((embedder_name, (embedder, prompt, _quantized))) =
configs.remove_entry(name)
else {
tracing::error!(embedder = name, "Requested embedder config not found");
continue;
};
// (docid, _index) -> KvWriterDelAdd -> Vector
let manual_vectors_writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
// (docid, _index) -> KvWriterDelAdd -> Vector
let manual_vectors_writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
// (docid) -> (prompt)
let prompts_writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
// (docid) -> (prompt)
let prompts_writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
// (docid) -> ()
let remove_vectors_writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
// (docid) -> ()
let remove_vectors_writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
let action = match action {
ReindexAction::FullReindex => ExtractionAction::SettingsFullReindex,
ReindexAction::RegeneratePrompts => {
let Some((_, old_prompt)) = old_configs.get(name) else {
tracing::error!(embedder = name, "Old embedder config not found");
continue;
};
let action = match action {
ReindexAction::FullReindex => ExtractionAction::SettingsFullReindex,
ReindexAction::RegeneratePrompts => {
let Some((_, old_prompt, _quantized)) = old_configs.get(name) else {
tracing::error!(embedder = name, "Old embedder config not found");
continue;
};
ExtractionAction::SettingsRegeneratePrompts { old_prompt }
}
};
ExtractionAction::SettingsRegeneratePrompts { old_prompt }
}
};
extractors.push(EmbedderVectorExtractor {
embedder_name,
embedder,
prompt,
prompts_writer,
remove_vectors_writer,
manual_vectors_writer,
add_to_user_provided: RoaringBitmap::new(),
action,
});
}
extractors.push(EmbedderVectorExtractor {
embedder_name,
embedder,
prompt,
prompts_writer,
remove_vectors_writer,
manual_vectors_writer,
add_to_user_provided: RoaringBitmap::new(),
action,
});
} else {
continue;
}
}
} else {
// document operation
for (embedder_name, (embedder, prompt)) in configs.into_iter() {
for (embedder_name, (embedder, prompt, _quantized)) in configs.into_iter() {
// (docid, _index) -> KvWriterDelAdd -> Vector
let manual_vectors_writer = create_writer(
indexer.chunk_compression_type,

View file

@@ -43,7 +43,7 @@ use crate::update::index_documents::parallel::ImmutableObkvs;
use crate::update::{
IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst,
};
use crate::vector::EmbeddingConfigs;
use crate::vector::{ArroyReader, EmbeddingConfigs};
use crate::{CboRoaringBitmapCodec, Index, Object, Result};
static MERGED_DATABASE_COUNT: usize = 7;
@@ -679,6 +679,24 @@ where
let number_of_documents = self.index.number_of_documents(self.wtxn)?;
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
// If an embedder wasn't used in the typed chunk but must be binary quantized,
// we should insert it in `dimension`
for (name, action) in settings_diff.embedding_config_updates.iter() {
if action.is_being_quantized && !dimension.contains_key(name.as_str()) {
let index = self.index.embedder_category_id.get(self.wtxn, name)?.ok_or(
InternalError::DatabaseMissingEntry {
db_name: "embedder_category_id",
key: None,
},
)?;
let first_id = crate::vector::arroy_db_range_for_embedder(index).next().unwrap();
let reader =
ArroyReader::new(self.index.vector_arroy, first_id, action.was_quantized);
let dim = reader.dimensions(self.wtxn)?;
dimension.insert(name.to_string(), dim);
}
}
for (embedder_name, dimension) in dimension {
let wtxn = &mut *self.wtxn;
let vector_arroy = self.index.vector_arroy;
@@ -686,13 +704,19 @@ where
let embedder_index = self.index.embedder_category_id.get(wtxn, &embedder_name)?.ok_or(
InternalError::DatabaseMissingEntry { db_name: "embedder_category_id", key: None },
)?;
let embedder_config = settings_diff.embedding_config_updates.get(&embedder_name);
let was_quantized = embedder_config.map_or(false, |action| action.was_quantized);
let is_quantizing = embedder_config.map_or(false, |action| action.is_being_quantized);
pool.install(|| {
for k in crate::vector::arroy_db_range_for_embedder(embedder_index) {
let writer = arroy::Writer::new(vector_arroy, k, dimension);
if writer.need_build(wtxn)? {
writer.build(wtxn, &mut rng, None)?;
} else if writer.is_empty(wtxn)? {
let mut writer = ArroyReader::new(vector_arroy, k, was_quantized);
if is_quantizing {
writer.quantize(wtxn, k, dimension)?;
}
if writer.need_build(wtxn, dimension)? {
writer.build(wtxn, &mut rng, dimension)?;
} else if writer.is_empty(wtxn, dimension)? {
break;
}
}
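
Both hunks above rely on crate::vector::arroy_db_range_for_embedder(embedder_index): every embedder owns a fixed, contiguous range of arroy sub-index ids, so the first id of the range is enough to open a reader and recover the dimensions of an embedder that received no new vectors, and the build loop can stop at the first empty sub-index. A minimal sketch of such a mapping, assuming a fixed width of 256 sub-indexes per embedder (an assumption for illustration; the real helper lives in milli's vector module and may differ):

    /// Hypothetical mapping from an embedder id to its range of arroy sub-index ids,
    /// assuming 256 sub-indexes per embedder.
    fn arroy_db_range_for_embedder(embedder_index: u8) -> impl Iterator<Item = u16> {
        (0..=u8::MAX as u16).map(move |k| ((embedder_index as u16) << 8) | k)
    }

    fn main() {
        // First id of embedder 2's range, as used above to look up dimensions for an
        // embedder that must be quantized but was absent from the typed chunks.
        let first_id = arroy_db_range_for_embedder(2).next().unwrap();
        assert_eq!(first_id, 512);
    }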
@@ -2746,6 +2770,7 @@ mod tests {
response: Setting::NotSet,
distribution: Setting::NotSet,
headers: Setting::NotSet,
binary_quantized: Setting::NotSet,
}),
);
settings.set_embedder_settings(embedders);
@@ -2774,7 +2799,7 @@
std::sync::Arc::new(crate::vector::Embedder::new(embedder.embedder_options).unwrap());
let res = index
.search(&rtxn)
.semantic(embedder_name, embedder, Some([0.0, 1.0, 2.0].to_vec()))
.semantic(embedder_name, embedder, false, Some([0.0, 1.0, 2.0].to_vec()))
.execute()
.unwrap();
assert_eq!(res.documents_ids.len(), 3);

View file

@@ -28,7 +28,8 @@ use crate::update::index_documents::GrenadParameters;
use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff};
use crate::update::{AvailableDocumentsIds, UpdateIndexingStep};
use crate::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors};
use crate::vector::settings::{EmbedderAction, WriteBackToDocuments};
use crate::vector::settings::WriteBackToDocuments;
use crate::vector::ArroyReader;
use crate::{
is_faceted_by, FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, Result,
};
@@ -989,23 +990,16 @@ impl<'a, 'i> Transform<'a, 'i> {
None
};
let readers: Result<
BTreeMap<&str, (Vec<arroy::Reader<'_, arroy::distances::Angular>>, &RoaringBitmap)>,
> = settings_diff
let readers: Result<BTreeMap<&str, (Vec<ArroyReader>, &RoaringBitmap)>> = settings_diff
.embedding_config_updates
.iter()
.filter_map(|(name, action)| {
if let EmbedderAction::WriteBackToDocuments(WriteBackToDocuments {
embedder_id,
user_provided,
}) = action
if let Some(WriteBackToDocuments { embedder_id, user_provided }) =
action.write_back()
{
let readers: Result<Vec<_>> =
self.index.arroy_readers(wtxn, *embedder_id).collect();
match readers {
Ok(readers) => Some(Ok((name.as_str(), (readers, user_provided)))),
Err(error) => Some(Err(error)),
}
let readers: Vec<_> =
self.index.arroy_readers(*embedder_id, action.was_quantized).collect();
Some(Ok((name.as_str(), (readers, user_provided))))
} else {
None
}
@@ -1104,23 +1098,14 @@ impl<'a, 'i> Transform<'a, 'i> {
}
}
let mut writers = Vec::new();
// delete all vectors from the embedders that need removal
for (_, (readers, _)) in readers {
for reader in readers {
let dimensions = reader.dimensions();
let arroy_index = reader.index();
drop(reader);
let writer = arroy::Writer::new(self.index.vector_arroy, arroy_index, dimensions);
writers.push(writer);
let dimensions = reader.dimensions(wtxn)?;
reader.clear(wtxn, dimensions)?;
}
}
for writer in writers {
writer.clear(wtxn)?;
}
let grenad_params = GrenadParameters {
chunk_compression_type: self.indexer_settings.chunk_compression_type,
chunk_compression_level: self.indexer_settings.chunk_compression_level,

View file

@@ -27,6 +27,7 @@ use crate::update::index_documents::helpers::{
as_cloneable_grenad, keep_latest_obkv, try_split_array_at,
};
use crate::update::settings::InnerIndexSettingsDiff;
use crate::vector::ArroyReader;
use crate::{
lat_lng_to_xyz, CboRoaringBitmapCodec, DocumentId, FieldId, GeoPoint, Index, InternalError,
Result, SerializationError, U8StrStrCodec,
@@ -666,9 +667,13 @@ pub(crate) fn write_typed_chunk_into_index(
let embedder_index = index.embedder_category_id.get(wtxn, &embedder_name)?.ok_or(
InternalError::DatabaseMissingEntry { db_name: "embedder_category_id", key: None },
)?;
let binary_quantized = settings_diff
.embedding_config_updates
.get(&embedder_name)
.map_or(false, |conf| conf.was_quantized);
// FIXME: allow customizing distance
let writers: Vec<_> = crate::vector::arroy_db_range_for_embedder(embedder_index)
.map(|k| arroy::Writer::new(index.vector_arroy, k, expected_dimension))
.map(|k| ArroyReader::new(index.vector_arroy, k, binary_quantized))
.collect();
// remove vectors for the docids we want removed
@@ -679,7 +684,7 @@ pub(crate) fn write_typed_chunk_into_index(
for writer in &writers {
// Uses invariant: vectors are packed in the first writers.
if !writer.del_item(wtxn, docid)? {
if !writer.del_item(wtxn, expected_dimension, docid)? {
break;
}
}
@@ -711,7 +716,7 @@ pub(crate) fn write_typed_chunk_into_index(
)));
}
for (embedding, writer) in embeddings.iter().zip(&writers) {
writer.add_item(wtxn, docid, embedding)?;
writer.add_item(wtxn, expected_dimension, docid, embedding)?;
}
}
@@ -734,7 +739,7 @@ pub(crate) fn write_typed_chunk_into_index(
break;
};
if candidate == vector {
writer.del_item(wtxn, docid)?;
writer.del_item(wtxn, expected_dimension, docid)?;
deleted_index = Some(index);
}
}
@@ -751,8 +756,13 @@ pub(crate) fn write_typed_chunk_into_index(
if let Some((last_index, vector)) = last_index_with_a_vector {
// unwrap: computed the index from the list of writers
let writer = writers.get(last_index).unwrap();
writer.del_item(wtxn, docid)?;
writers.get(deleted_index).unwrap().add_item(wtxn, docid, &vector)?;
writer.del_item(wtxn, expected_dimension, docid)?;
writers.get(deleted_index).unwrap().add_item(
wtxn,
expected_dimension,
docid,
&vector,
)?;
}
}
}
@@ -762,8 +772,8 @@ pub(crate) fn write_typed_chunk_into_index(
// overflow was detected during vector extraction.
for writer in &writers {
if !writer.contains_item(wtxn, docid)? {
writer.add_item(wtxn, docid, &vector)?;
if !writer.contains_item(wtxn, expected_dimension, docid)? {
writer.add_item(wtxn, expected_dimension, docid, &vector)?;
break;
}
}
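
Throughout the hunks above, arroy::Writer::new(index.vector_arroy, k, expected_dimension) is replaced by ArroyReader::new(index.vector_arroy, k, binary_quantized), and every add_item/del_item/contains_item call now receives the dimension explicitly, presumably so the wrapper can open the concrete arroy writer (float or binary-quantized) on demand. The in-memory toy below only sketches that dispatch shape under those assumptions; the real ArroyReader wraps arroy's LMDB-backed writers and has different signatures.

    use std::collections::HashMap;

    /// Toy stand-in for a writer that stores either raw f32 vectors or their
    /// sign-bit-packed form, depending on a `quantized` flag chosen at creation.
    enum VectorStore {
        Float(HashMap<u32, Vec<f32>>),
        Binary(HashMap<u32, Vec<u64>>),
    }

    struct ToyWriter {
        store: VectorStore,
    }

    impl ToyWriter {
        fn new(quantized: bool) -> Self {
            let store = if quantized {
                VectorStore::Binary(HashMap::new())
            } else {
                VectorStore::Float(HashMap::new())
            };
            Self { store }
        }

        fn add_item(&mut self, _dimension: usize, docid: u32, embedding: &[f32]) {
            match &mut self.store {
                VectorStore::Float(map) => {
                    map.insert(docid, embedding.to_vec());
                }
                VectorStore::Binary(map) => {
                    // Same sign-bit packing as in the earlier sketch.
                    let mut words = vec![0u64; (embedding.len() + 63) / 64];
                    for (i, &value) in embedding.iter().enumerate() {
                        if value > 0.0 {
                            words[i / 64] |= 1u64 << (i % 64);
                        }
                    }
                    map.insert(docid, words);
                }
            }
        }

        fn del_item(&mut self, _dimension: usize, docid: u32) -> bool {
            match &mut self.store {
                VectorStore::Float(map) => map.remove(&docid).is_some(),
                VectorStore::Binary(map) => map.remove(&docid).is_some(),
            }
        }

        fn contains_item(&self, _dimension: usize, docid: u32) -> bool {
            match &self.store {
                VectorStore::Float(map) => map.contains_key(&docid),
                VectorStore::Binary(map) => map.contains_key(&docid),
            }
        }
    }

    fn main() {
        let mut writer = ToyWriter::new(true);
        writer.add_item(4, 0, &[0.5, -0.1, 0.9, -0.3]);
        assert!(writer.contains_item(4, 0));
        assert!(writer.del_item(4, 0));
    }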

View file

@@ -425,11 +425,13 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync,
{
println!("inside reindex");
// if the settings are set before any document update, we don't need to do anything, and
// will set the primary key during the first document addition.
if self.index.number_of_documents(self.wtxn)? == 0 {
return Ok(());
}
println!("didnt early exit");
let transform = Transform::new(
self.wtxn,
@@ -954,7 +956,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
let old_configs = self.index.embedding_configs(self.wtxn)?;
let remove_all: Result<BTreeMap<String, EmbedderAction>> = old_configs
.into_iter()
.map(|IndexEmbeddingConfig { name, config: _, user_provided }| -> Result<_> {
.map(|IndexEmbeddingConfig { name, config, user_provided }| -> Result<_> {
let embedder_id =
self.index.embedder_category_id.get(self.wtxn, &name)?.ok_or(
crate::InternalError::DatabaseMissingEntry {
@@ -964,10 +966,10 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
)?;
Ok((
name,
EmbedderAction::WriteBackToDocuments(WriteBackToDocuments {
embedder_id,
user_provided,
}),
EmbedderAction::with_write_back(
WriteBackToDocuments { embedder_id, user_provided },
config.quantized(),
),
))
})
.collect();
@@ -1004,7 +1006,8 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
match joined {
// updated config
EitherOrBoth::Both((name, (old, user_provided)), (_, new)) => {
let settings_diff = SettingsDiff::from_settings(old, new);
let was_quantized = old.binary_quantized.set().unwrap_or_default();
let settings_diff = SettingsDiff::from_settings(old, new)?;
match settings_diff {
SettingsDiff::Remove => {
tracing::debug!(
@@ -1023,25 +1026,29 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
self.index.embedder_category_id.delete(self.wtxn, &name)?;
embedder_actions.insert(
name,
EmbedderAction::WriteBackToDocuments(WriteBackToDocuments {
embedder_id,
user_provided,
}),
EmbedderAction::with_write_back(
WriteBackToDocuments { embedder_id, user_provided },
was_quantized,
),
);
}
SettingsDiff::Reindex { action, updated_settings } => {
SettingsDiff::Reindex { action, updated_settings, quantize } => {
tracing::debug!(
embedder = name,
user_provided = user_provided.len(),
?action,
"reindex embedder"
);
embedder_actions.insert(name.clone(), EmbedderAction::Reindex(action));
embedder_actions.insert(
name.clone(),
EmbedderAction::with_reindex(action, was_quantized)
.with_is_being_quantized(quantize),
);
let new =
validate_embedding_settings(Setting::Set(updated_settings), &name)?;
updated_configs.insert(name, (new, user_provided));
}
SettingsDiff::UpdateWithoutReindex { updated_settings } => {
SettingsDiff::UpdateWithoutReindex { updated_settings, quantize } => {
tracing::debug!(
embedder = name,
user_provided = user_provided.len(),
@@ -1049,6 +1056,12 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
);
let new =
validate_embedding_settings(Setting::Set(updated_settings), &name)?;
if quantize {
embedder_actions.insert(
name.clone(),
EmbedderAction::default().with_is_being_quantized(true),
);
}
updated_configs.insert(name, (new, user_provided));
}
}
@@ -1067,8 +1080,10 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
&mut setting,
);
let setting = validate_embedding_settings(setting, &name)?;
embedder_actions
.insert(name.clone(), EmbedderAction::Reindex(ReindexAction::FullReindex));
embedder_actions.insert(
name.clone(),
EmbedderAction::with_reindex(ReindexAction::FullReindex, false),
);
updated_configs.insert(name, (setting, RoaringBitmap::new()));
}
}
@@ -1082,19 +1097,13 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
let mut find_free_index =
move || free_indices.find(|(_, free)| **free).map(|(index, _)| index as u8);
for (name, action) in embedder_actions.iter() {
match action {
EmbedderAction::Reindex(ReindexAction::RegeneratePrompts) => {
/* cannot be a new embedder, so has to have an id already */
}
EmbedderAction::Reindex(ReindexAction::FullReindex) => {
if self.index.embedder_category_id.get(self.wtxn, name)?.is_none() {
let id = find_free_index()
.ok_or(UserError::TooManyEmbedders(updated_configs.len()))?;
tracing::debug!(embedder = name, id, "assigning free id to new embedder");
self.index.embedder_category_id.put(self.wtxn, name, &id)?;
}
}
EmbedderAction::WriteBackToDocuments(_) => { /* already removed */ }
if matches!(action.reindex(), Some(ReindexAction::FullReindex))
&& self.index.embedder_category_id.get(self.wtxn, name)?.is_none()
{
let id =
find_free_index().ok_or(UserError::TooManyEmbedders(updated_configs.len()))?;
tracing::debug!(embedder = name, id, "assigning free id to new embedder");
self.index.embedder_category_id.put(self.wtxn, name, &id)?;
}
}
let updated_configs: Vec<IndexEmbeddingConfig> = updated_configs
@@ -1277,7 +1286,11 @@ impl InnerIndexSettingsDiff {
// if the user-defined searchables changed, then we need to reindex prompts.
if cache_user_defined_searchables {
for (embedder_name, (config, _)) in new_settings.embedding_configs.inner_as_ref() {
for (embedder_name, (config, _, _quantized)) in
new_settings.embedding_configs.inner_as_ref()
{
let was_quantized =
old_settings.embedding_configs.get(&embedder_name).map_or(false, |conf| conf.2);
// skip embedders that don't use document templates
if !config.uses_document_template() {
continue;
@@ -1287,16 +1300,19 @@ impl InnerIndexSettingsDiff {
// this always makes the code clearer by explicitly handling the cases
match embedding_config_updates.entry(embedder_name.clone()) {
std::collections::btree_map::Entry::Vacant(entry) => {
entry.insert(EmbedderAction::Reindex(ReindexAction::RegeneratePrompts));
entry.insert(EmbedderAction::with_reindex(
ReindexAction::RegeneratePrompts,
was_quantized,
));
}
std::collections::btree_map::Entry::Occupied(entry) => {
let EmbedderAction {
was_quantized: _,
is_being_quantized: _,
write_back: _, // We are deleting this embedder, so no point in regeneration
reindex: _, // We are already fully reindexing or regenerating prompts
} = entry.get();
}
std::collections::btree_map::Entry::Occupied(entry) => match entry.get() {
EmbedderAction::WriteBackToDocuments(_) => { /* we are deleting this embedder, so no point in regeneration */
}
EmbedderAction::Reindex(ReindexAction::FullReindex) => { /* we are already fully reindexing */
}
EmbedderAction::Reindex(ReindexAction::RegeneratePrompts) => { /* we are already regenerating prompts */
}
},
};
}
}
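
Across these settings hunks, EmbedderAction stops being an enum (WriteBackToDocuments(..) / Reindex(..)) and becomes a struct able to carry the quantization state alongside the write-back or reindex decision. The destructuring above and the constructors used in this commit (with_write_back, with_reindex, with_is_being_quantized, Default, plus the reindex() and write_back() accessors) imply roughly the following shape; this is inferred from the call sites, not copied from the actual definition, and the ReindexAction/WriteBackToDocuments stubs exist only so the sketch compiles.

    // Stubs standing in for the real milli types.
    #[derive(Debug, Clone, Copy)]
    enum ReindexAction {
        FullReindex,
        RegeneratePrompts,
    }

    #[derive(Debug, Clone)]
    struct WriteBackToDocuments {
        embedder_id: u8,
        // The real struct also carries the `user_provided` RoaringBitmap.
    }

    /// Plausible shape of the new EmbedderAction, inferred from this diff.
    #[derive(Debug, Default)]
    struct EmbedderAction {
        was_quantized: bool,
        is_being_quantized: bool,
        write_back: Option<WriteBackToDocuments>,
        reindex: Option<ReindexAction>,
    }

    impl EmbedderAction {
        fn with_write_back(write_back: WriteBackToDocuments, was_quantized: bool) -> Self {
            Self { was_quantized, write_back: Some(write_back), ..Default::default() }
        }

        fn with_reindex(reindex: ReindexAction, was_quantized: bool) -> Self {
            Self { was_quantized, reindex: Some(reindex), ..Default::default() }
        }

        fn with_is_being_quantized(mut self, quantize: bool) -> Self {
            self.is_being_quantized = quantize;
            self
        }

        fn write_back(&self) -> Option<&WriteBackToDocuments> {
            self.write_back.as_ref()
        }

        fn reindex(&self) -> Option<&ReindexAction> {
            self.reindex.as_ref()
        }
    }

    fn main() {
        let action = EmbedderAction::with_reindex(ReindexAction::FullReindex, false)
            .with_is_being_quantized(true);
        assert!(matches!(action.reindex(), Some(ReindexAction::FullReindex)));
        assert!(action.write_back().is_none());
    }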
@@ -1546,7 +1562,7 @@ fn embedders(embedding_configs: Vec<IndexEmbeddingConfig>) -> Result<EmbeddingConfigs>
.map(
|IndexEmbeddingConfig {
name,
config: EmbeddingConfig { embedder_options, prompt },
config: EmbeddingConfig { embedder_options, prompt, quantized },
..
}| {
let prompt = Arc::new(prompt.try_into().map_err(crate::Error::from)?);
@@ -1556,7 +1572,7 @@ fn embedders(embedding_configs: Vec<IndexEmbeddingConfig>) -> Result<EmbeddingConfigs>
.map_err(crate::vector::Error::from)
.map_err(crate::Error::from)?,
);
Ok((name, (embedder, prompt)))
Ok((name, (embedder, prompt, quantized.unwrap_or_default())))
},
)
.collect();
@@ -1581,6 +1597,7 @@ fn validate_prompt(
response,
distribution,
headers,
binary_quantized: binary_quantize,
}) => {
let max_bytes = match document_template_max_bytes.set() {
Some(max_bytes) => NonZeroUsize::new(max_bytes).ok_or_else(|| {
@@ -1613,6 +1630,7 @@ fn validate_prompt(
response,
distribution,
headers,
binary_quantized: binary_quantize,
}))
}
new => Ok(new),
@@ -1638,6 +1656,7 @@ pub fn validate_embedding_settings(
response,
distribution,
headers,
binary_quantized: binary_quantize,
} = settings;
if let Some(0) = dimensions.set() {
@@ -1678,6 +1697,7 @@ pub fn validate_embedding_settings(
response,
distribution,
headers,
binary_quantized: binary_quantize,
}));
};
match inferred_source {
@@ -1779,6 +1799,7 @@ pub fn validate_embedding_settings(
response,
distribution,
headers,
binary_quantized: binary_quantize,
}))
}