mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-03 11:57:07 +02:00
Merge #4649
4649: Don't store the vectors in the documents database r=dureuill a=irevoire # Pull Request ## Related issue Fixes https://github.com/meilisearch/meilisearch/issues/4607 ## What does this PR do? - Ensure that anything falling under `_vectors` is NOT searchable, filterable or sortable - [x] per embedder, add a roaring bitmap of documents that provide "userProvided" embeddings - [x] in the indexing process in extract_vector_points, set the bit corresponding to the document depending on the "userProvided" subfield in the _vectors field. - [x] in the document DB in typed chunks, when writing the _vectors field, remove all keys corresponding to an embedder Co-authored-by: Tamo <tamo@meilisearch.com> Co-authored-by: Louis Dureuil <louis@meilisearch.com>
This commit is contained in:
commit
e9bf4c43a4
60 changed files with 3920 additions and 1126 deletions
|
@ -4,6 +4,7 @@ use std::collections::HashMap;
|
|||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::vector::parsed_vectors::RESERVED_VECTORS_FIELD_NAME;
|
||||
use crate::{FieldId, FieldsIdsMap, Weight};
|
||||
|
||||
#[derive(Debug, Default, Serialize, Deserialize)]
|
||||
|
@ -23,7 +24,13 @@ impl FieldidsWeightsMap {
|
|||
/// Should only be called in the case there are NO searchable attributes.
|
||||
/// All the fields will be inserted in the order of the fields ids map with a weight of 0.
|
||||
pub fn from_field_id_map_without_searchable(fid_map: &FieldsIdsMap) -> Self {
|
||||
FieldidsWeightsMap { map: fid_map.ids().map(|fid| (fid, 0)).collect() }
|
||||
FieldidsWeightsMap {
|
||||
map: fid_map
|
||||
.iter()
|
||||
.filter(|(_fid, name)| !crate::is_faceted_by(name, RESERVED_VECTORS_FIELD_NAME))
|
||||
.map(|(fid, _name)| (fid, 0))
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Removes a field id from the map, returning the associated weight previously in the map.
|
||||
|
|
|
@ -41,6 +41,16 @@ impl FieldsIdsMap {
|
|||
}
|
||||
}
|
||||
|
||||
/// Get the ids of a field and all its nested fields based on its name.
///
/// A field counts as nested under `name` according to `crate::is_faceted_by`; merely sharing
/// the prefix is not enough (e.g. `doggolution` is not returned for `doggo`).
/// The ids are returned in the iteration order of `names_ids`, i.e. ordered by field name
/// rather than by id, and the result is empty when nothing matches.
|
||||
pub fn nested_ids(&self, name: &str) -> Vec<FieldId> {
|
||||
self.names_ids
|
||||
// Seek to the first key that is not smaller than `name`
// (presumably `names_ids` is a `BTreeMap` keyed by field name; confirm).
.range(name.to_string()..)
|
||||
// Keys sharing the `name` prefix are contiguous in key order: stop at the first one that doesn't.
.take_while(|(key, _)| key.starts_with(name))
|
||||
// Drop the keys that only share the prefix without being `name` or nested under it.
.filter(|(key, _)| crate::is_faceted_by(key, name))
|
||||
.map(|(_name, id)| *id)
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Get the id of a field based on its name.
|
||||
pub fn id(&self, name: &str) -> Option<FieldId> {
|
||||
self.names_ids.get(name).copied()
|
||||
|
@ -126,4 +136,32 @@ mod tests {
|
|||
assert_eq!(iter.next(), Some((3, "title")));
|
||||
assert_eq!(iter.next(), None);
|
||||
}
|
||||
|
||||
// Checks that `nested_ids` returns a field together with its dotted children only.
#[test]
|
||||
fn nested_fields() {
|
||||
let mut map = FieldsIdsMap::new();
|
||||
|
||||
// `doggolution` shares the `doggo` prefix without being nested under it, and
// `doggo.breed.name` is inserted without any `doggo.breed` entry.
assert_eq!(map.insert("id"), Some(0));
|
||||
assert_eq!(map.insert("doggo"), Some(1));
|
||||
assert_eq!(map.insert("doggo.name"), Some(2));
|
||||
assert_eq!(map.insert("doggolution"), Some(3));
|
||||
assert_eq!(map.insert("doggo.breed.name"), Some(4));
|
||||
assert_eq!(map.insert("description"), Some(5));
|
||||
|
||||
// Ids come back in field-name order (`doggo`, `doggo.breed.name`, `doggo.name`), not id order.
insta::assert_debug_snapshot!(map.nested_ids("doggo"), @r###"
|
||||
[
|
||||
1,
|
||||
4,
|
||||
2,
|
||||
]
|
||||
"###);
|
||||
|
||||
// `doggo.breed` itself was never inserted, but its nested field still matches.
insta::assert_debug_snapshot!(map.nested_ids("doggo.breed"), @r###"
|
||||
[
|
||||
4,
|
||||
]
|
||||
"###);
|
||||
|
||||
// A name that matches no field yields an empty list.
insta::assert_debug_snapshot!(map.nested_ids("_vector"), @"[]");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ use heed::types::*;
|
|||
use heed::{CompactionOption, Database, RoTxn, RwTxn, Unspecified};
|
||||
use roaring::RoaringBitmap;
|
||||
use rstar::RTree;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use time::OffsetDateTime;
|
||||
|
||||
use crate::documents::PrimaryKey;
|
||||
|
@ -23,6 +24,7 @@ use crate::heed_codec::{
|
|||
};
|
||||
use crate::order_by_map::OrderByMap;
|
||||
use crate::proximity::ProximityPrecision;
|
||||
use crate::vector::parsed_vectors::RESERVED_VECTORS_FIELD_NAME;
|
||||
use crate::vector::{Embedding, EmbeddingConfig};
|
||||
use crate::{
|
||||
default_criteria, CboRoaringBitmapCodec, Criterion, DocumentId, ExternalDocumentsIds,
|
||||
|
@ -644,6 +646,7 @@ impl Index {
|
|||
&self,
|
||||
wtxn: &mut RwTxn,
|
||||
user_fields: &[&str],
|
||||
non_searchable_fields_ids: &[FieldId],
|
||||
fields_ids_map: &FieldsIdsMap,
|
||||
) -> Result<()> {
|
||||
// We can write the user defined searchable fields as-is.
|
||||
|
@ -662,6 +665,7 @@ impl Index {
|
|||
for (weight, user_field) in user_fields.iter().enumerate() {
|
||||
if crate::is_faceted_by(field_from_map, user_field)
|
||||
&& !real_fields.contains(&field_from_map)
|
||||
&& !non_searchable_fields_ids.contains(&id)
|
||||
{
|
||||
real_fields.push(field_from_map);
|
||||
|
||||
|
@ -708,6 +712,7 @@ impl Index {
|
|||
Ok(self
|
||||
.fields_ids_map(rtxn)?
|
||||
.names()
|
||||
.filter(|name| !crate::is_faceted_by(name, RESERVED_VECTORS_FIELD_NAME))
|
||||
.map(|field| Cow::Owned(field.to_string()))
|
||||
.collect())
|
||||
})
|
||||
|
@ -1568,12 +1573,16 @@ impl Index {
|
|||
Ok(script_language)
|
||||
}
|
||||
|
||||
/// Put the embedding configs:
|
||||
/// 1. The name of the embedder
|
||||
/// 2. The configuration option for this embedder
|
||||
/// 3. The list of documents with a user provided embedding
|
||||
pub(crate) fn put_embedding_configs(
|
||||
&self,
|
||||
wtxn: &mut RwTxn<'_>,
|
||||
configs: Vec<(String, EmbeddingConfig)>,
|
||||
configs: Vec<IndexEmbeddingConfig>,
|
||||
) -> heed::Result<()> {
|
||||
self.main.remap_types::<Str, SerdeJson<Vec<(String, EmbeddingConfig)>>>().put(
|
||||
self.main.remap_types::<Str, SerdeJson<Vec<IndexEmbeddingConfig>>>().put(
|
||||
wtxn,
|
||||
main_key::EMBEDDING_CONFIGS,
|
||||
&configs,
|
||||
|
@ -1584,13 +1593,10 @@ impl Index {
|
|||
self.main.remap_key_type::<Str>().delete(wtxn, main_key::EMBEDDING_CONFIGS)
|
||||
}
|
||||
|
||||
pub fn embedding_configs(
|
||||
&self,
|
||||
rtxn: &RoTxn<'_>,
|
||||
) -> Result<Vec<(String, crate::vector::EmbeddingConfig)>> {
|
||||
pub fn embedding_configs(&self, rtxn: &RoTxn<'_>) -> Result<Vec<IndexEmbeddingConfig>> {
|
||||
Ok(self
|
||||
.main
|
||||
.remap_types::<Str, SerdeJson<Vec<(String, EmbeddingConfig)>>>()
|
||||
.remap_types::<Str, SerdeJson<Vec<IndexEmbeddingConfig>>>()
|
||||
.get(rtxn, main_key::EMBEDDING_CONFIGS)?
|
||||
.unwrap_or_default())
|
||||
}
|
||||
|
@ -1662,6 +1668,13 @@ impl Index {
|
|||
}
|
||||
}
|
||||
|
||||
/// One embedder entry as persisted in the index by `put_embedding_configs` and read back by
/// `embedding_configs` (serialized as JSON under `main_key::EMBEDDING_CONFIGS`).
#[derive(Debug, Deserialize, Serialize)]
|
||||
pub struct IndexEmbeddingConfig {
|
||||
/// The name of the embedder.
pub name: String,
|
||||
/// The configuration options for this embedder.
pub config: EmbeddingConfig,
|
||||
/// The internal ids of the documents with a user-provided embedding for this embedder.
pub user_provided: RoaringBitmap,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) mod tests {
|
||||
use std::collections::HashSet;
|
||||
|
@ -1669,15 +1682,17 @@ pub(crate) mod tests {
|
|||
|
||||
use big_s::S;
|
||||
use heed::{EnvOpenOptions, RwTxn};
|
||||
use maplit::hashset;
|
||||
use maplit::{btreemap, hashset};
|
||||
use tempfile::TempDir;
|
||||
|
||||
use crate::documents::DocumentsBatchReader;
|
||||
use crate::error::{Error, InternalError};
|
||||
use crate::index::{DEFAULT_MIN_WORD_LEN_ONE_TYPO, DEFAULT_MIN_WORD_LEN_TWO_TYPOS};
|
||||
use crate::update::{
|
||||
self, IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings,
|
||||
self, IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Setting,
|
||||
Settings,
|
||||
};
|
||||
use crate::vector::settings::{EmbedderSource, EmbeddingSettings};
|
||||
use crate::{db_snap, obkv_to_json, Filter, Index, Search, SearchResult};
|
||||
|
||||
pub(crate) struct TempIndex {
|
||||
|
@ -2783,4 +2798,95 @@ pub(crate) mod tests {
|
|||
]
|
||||
"###);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn vectors_are_never_indexed_as_searchable_or_filterable() {
|
||||
let index = TempIndex::new();
|
||||
|
||||
index
|
||||
.add_documents(documents!([
|
||||
{ "id": 0, "_vectors": { "doggo": [2345] } },
|
||||
{ "id": 1, "_vectors": { "doggo": [6789] } },
|
||||
]))
|
||||
.unwrap();
|
||||
|
||||
db_snap!(index, fields_ids_map, @r###"
|
||||
0 id |
|
||||
1 _vectors |
|
||||
2 _vectors.doggo |
|
||||
"###);
|
||||
db_snap!(index, searchable_fields, @r###"["id"]"###);
|
||||
db_snap!(index, fieldids_weights_map, @r###"
|
||||
fid weight
|
||||
0 0 |
|
||||
"###);
|
||||
|
||||
let rtxn = index.read_txn().unwrap();
|
||||
let mut search = index.search(&rtxn);
|
||||
let results = search.query("2345").execute().unwrap();
|
||||
assert!(results.candidates.is_empty());
|
||||
drop(rtxn);
|
||||
|
||||
index
|
||||
.update_settings(|settings| {
|
||||
settings.set_searchable_fields(vec![S("_vectors"), S("_vectors.doggo")]);
|
||||
settings.set_filterable_fields(hashset![S("_vectors"), S("_vectors.doggo")]);
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
db_snap!(index, fields_ids_map, @r###"
|
||||
0 id |
|
||||
1 _vectors |
|
||||
2 _vectors.doggo |
|
||||
"###);
|
||||
db_snap!(index, searchable_fields, @"[]");
|
||||
db_snap!(index, fieldids_weights_map, @r###"
|
||||
fid weight
|
||||
"###);
|
||||
|
||||
let rtxn = index.read_txn().unwrap();
|
||||
let mut search = index.search(&rtxn);
|
||||
let results = search.query("2345").execute().unwrap();
|
||||
assert!(results.candidates.is_empty());
|
||||
|
||||
let mut search = index.search(&rtxn);
|
||||
let results = search
|
||||
.filter(Filter::from_str("_vectors.doggo = 6789").unwrap().unwrap())
|
||||
.execute()
|
||||
.unwrap();
|
||||
assert!(results.candidates.is_empty());
|
||||
|
||||
index
|
||||
.update_settings(|settings| {
|
||||
settings.set_embedder_settings(btreemap! {
|
||||
S("doggo") => Setting::Set(EmbeddingSettings {
|
||||
dimensions: Setting::Set(1),
|
||||
source: Setting::Set(EmbedderSource::UserProvided),
|
||||
..EmbeddingSettings::default()}),
|
||||
});
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
db_snap!(index, fields_ids_map, @r###"
|
||||
0 id |
|
||||
1 _vectors |
|
||||
2 _vectors.doggo |
|
||||
"###);
|
||||
db_snap!(index, searchable_fields, @"[]");
|
||||
db_snap!(index, fieldids_weights_map, @r###"
|
||||
fid weight
|
||||
"###);
|
||||
|
||||
let rtxn = index.read_txn().unwrap();
|
||||
let mut search = index.search(&rtxn);
|
||||
let results = search.query("2345").execute().unwrap();
|
||||
assert!(results.candidates.is_empty());
|
||||
|
||||
let mut search = index.search(&rtxn);
|
||||
let results = search
|
||||
.filter(Filter::from_str("_vectors.doggo = 6789").unwrap().unwrap())
|
||||
.execute()
|
||||
.unwrap();
|
||||
assert!(results.candidates.is_empty());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ pub enum SearchEvents {
|
|||
RankingRuleStartIteration { ranking_rule_idx: usize, universe_len: u64 },
|
||||
RankingRuleNextBucket { ranking_rule_idx: usize, universe_len: u64, bucket_len: u64 },
|
||||
RankingRuleSkipBucket { ranking_rule_idx: usize, bucket_len: u64 },
|
||||
RankingRuleEndIteration { ranking_rule_idx: usize, universe_len: u64 },
|
||||
RankingRuleEndIteration { ranking_rule_idx: usize },
|
||||
ExtendResults { new: Vec<u32> },
|
||||
ProximityGraph { graph: RankingRuleGraph<ProximityGraph> },
|
||||
ProximityPaths { paths: Vec<Vec<Interned<ProximityCondition>>> },
|
||||
|
@ -123,12 +123,9 @@ impl SearchLogger<QueryGraph> for VisualSearchLogger {
|
|||
&mut self,
|
||||
ranking_rule_idx: usize,
|
||||
_ranking_rule: &dyn RankingRule<QueryGraph>,
|
||||
universe: &RoaringBitmap,
|
||||
_universe: &RoaringBitmap,
|
||||
) {
|
||||
self.events.push(SearchEvents::RankingRuleEndIteration {
|
||||
ranking_rule_idx,
|
||||
universe_len: universe.len(),
|
||||
});
|
||||
self.events.push(SearchEvents::RankingRuleEndIteration { ranking_rule_idx });
|
||||
self.location.pop();
|
||||
}
|
||||
fn add_to_results(&mut self, docids: &[u32]) {
|
||||
|
@ -326,7 +323,7 @@ impl<'ctx> DetailedLoggerFinish<'ctx> {
|
|||
assert!(ranking_rule_idx == self.rr_action_counter.len() - 1);
|
||||
self.write_skip_bucket(bucket_len)?;
|
||||
}
|
||||
SearchEvents::RankingRuleEndIteration { ranking_rule_idx, universe_len: _ } => {
|
||||
SearchEvents::RankingRuleEndIteration { ranking_rule_idx } => {
|
||||
assert!(ranking_rule_idx == self.rr_action_counter.len() - 1);
|
||||
self.write_end_iteration()?;
|
||||
}
|
||||
|
|
|
@ -1,244 +0,0 @@
|
|||
---
|
||||
source: milli/src/search/new/tests/attribute_fid.rs
|
||||
expression: "format!(\"{document_ids_scores:#?}\")"
|
||||
---
|
||||
[
|
||||
(
|
||||
2,
|
||||
[
|
||||
Fid(
|
||||
Rank {
|
||||
rank: 19,
|
||||
max_rank: 19,
|
||||
},
|
||||
),
|
||||
Position(
|
||||
Rank {
|
||||
rank: 91,
|
||||
max_rank: 91,
|
||||
},
|
||||
),
|
||||
],
|
||||
),
|
||||
(
|
||||
6,
|
||||
[
|
||||
Fid(
|
||||
Rank {
|
||||
rank: 15,
|
||||
max_rank: 19,
|
||||
},
|
||||
),
|
||||
Position(
|
||||
Rank {
|
||||
rank: 81,
|
||||
max_rank: 91,
|
||||
},
|
||||
),
|
||||
],
|
||||
),
|
||||
(
|
||||
5,
|
||||
[
|
||||
Fid(
|
||||
Rank {
|
||||
rank: 14,
|
||||
max_rank: 19,
|
||||
},
|
||||
),
|
||||
Position(
|
||||
Rank {
|
||||
rank: 79,
|
||||
max_rank: 91,
|
||||
},
|
||||
),
|
||||
],
|
||||
),
|
||||
(
|
||||
4,
|
||||
[
|
||||
Fid(
|
||||
Rank {
|
||||
rank: 13,
|
||||
max_rank: 19,
|
||||
},
|
||||
),
|
||||
Position(
|
||||
Rank {
|
||||
rank: 77,
|
||||
max_rank: 91,
|
||||
},
|
||||
),
|
||||
],
|
||||
),
|
||||
(
|
||||
3,
|
||||
[
|
||||
Fid(
|
||||
Rank {
|
||||
rank: 12,
|
||||
max_rank: 19,
|
||||
},
|
||||
),
|
||||
Position(
|
||||
Rank {
|
||||
rank: 83,
|
||||
max_rank: 91,
|
||||
},
|
||||
),
|
||||
],
|
||||
),
|
||||
(
|
||||
9,
|
||||
[
|
||||
Fid(
|
||||
Rank {
|
||||
rank: 11,
|
||||
max_rank: 19,
|
||||
},
|
||||
),
|
||||
Position(
|
||||
Rank {
|
||||
rank: 75,
|
||||
max_rank: 91,
|
||||
},
|
||||
),
|
||||
],
|
||||
),
|
||||
(
|
||||
8,
|
||||
[
|
||||
Fid(
|
||||
Rank {
|
||||
rank: 10,
|
||||
max_rank: 19,
|
||||
},
|
||||
),
|
||||
Position(
|
||||
Rank {
|
||||
rank: 79,
|
||||
max_rank: 91,
|
||||
},
|
||||
),
|
||||
],
|
||||
),
|
||||
(
|
||||
7,
|
||||
[
|
||||
Fid(
|
||||
Rank {
|
||||
rank: 10,
|
||||
max_rank: 19,
|
||||
},
|
||||
),
|
||||
Position(
|
||||
Rank {
|
||||
rank: 73,
|
||||
max_rank: 91,
|
||||
},
|
||||
),
|
||||
],
|
||||
),
|
||||
(
|
||||
11,
|
||||
[
|
||||
Fid(
|
||||
Rank {
|
||||
rank: 7,
|
||||
max_rank: 19,
|
||||
},
|
||||
),
|
||||
Position(
|
||||
Rank {
|
||||
rank: 77,
|
||||
max_rank: 91,
|
||||
},
|
||||
),
|
||||
],
|
||||
),
|
||||
(
|
||||
10,
|
||||
[
|
||||
Fid(
|
||||
Rank {
|
||||
rank: 6,
|
||||
max_rank: 19,
|
||||
},
|
||||
),
|
||||
Position(
|
||||
Rank {
|
||||
rank: 81,
|
||||
max_rank: 91,
|
||||
},
|
||||
),
|
||||
],
|
||||
),
|
||||
(
|
||||
13,
|
||||
[
|
||||
Fid(
|
||||
Rank {
|
||||
rank: 6,
|
||||
max_rank: 19,
|
||||
},
|
||||
),
|
||||
Position(
|
||||
Rank {
|
||||
rank: 81,
|
||||
max_rank: 91,
|
||||
},
|
||||
),
|
||||
],
|
||||
),
|
||||
(
|
||||
12,
|
||||
[
|
||||
Fid(
|
||||
Rank {
|
||||
rank: 6,
|
||||
max_rank: 19,
|
||||
},
|
||||
),
|
||||
Position(
|
||||
Rank {
|
||||
rank: 78,
|
||||
max_rank: 91,
|
||||
},
|
||||
),
|
||||
],
|
||||
),
|
||||
(
|
||||
14,
|
||||
[
|
||||
Fid(
|
||||
Rank {
|
||||
rank: 5,
|
||||
max_rank: 19,
|
||||
},
|
||||
),
|
||||
Position(
|
||||
Rank {
|
||||
rank: 75,
|
||||
max_rank: 91,
|
||||
},
|
||||
),
|
||||
],
|
||||
),
|
||||
(
|
||||
0,
|
||||
[
|
||||
Fid(
|
||||
Rank {
|
||||
rank: 1,
|
||||
max_rank: 19,
|
||||
},
|
||||
),
|
||||
Position(
|
||||
Rank {
|
||||
rank: 91,
|
||||
max_rank: 91,
|
||||
},
|
||||
),
|
||||
],
|
||||
),
|
||||
]
|
|
@ -1,7 +0,0 @@
|
|||
---
|
||||
source: milli/src/index.rs
|
||||
---
|
||||
age 1 |
|
||||
id 2 |
|
||||
name 2 |
|
||||
|
|
@ -1,7 +0,0 @@
|
|||
---
|
||||
source: milli/src/index.rs
|
||||
---
|
||||
age 1 |
|
||||
id 2 |
|
||||
name 2 |
|
||||
|
|
@ -64,6 +64,13 @@ impl<'t, 'i> ClearDocuments<'t, 'i> {
|
|||
self.index.delete_geo_rtree(self.wtxn)?;
|
||||
self.index.delete_geo_faceted_documents_ids(self.wtxn)?;
|
||||
|
||||
// Remove all user-provided bits from the configs
|
||||
let mut configs = self.index.embedding_configs(self.wtxn)?;
|
||||
for config in configs.iter_mut() {
|
||||
config.user_provided.clear();
|
||||
}
|
||||
self.index.put_embedding_configs(self.wtxn, configs)?;
|
||||
|
||||
// Clear the other databases.
|
||||
external_documents_ids.clear(self.wtxn)?;
|
||||
word_docids.clear(self.wtxn)?;
|
||||
|
|
|
@ -8,18 +8,19 @@ use std::sync::Arc;
|
|||
|
||||
use bytemuck::cast_slice;
|
||||
use grenad::Writer;
|
||||
use itertools::EitherOrBoth;
|
||||
use ordered_float::OrderedFloat;
|
||||
use roaring::RoaringBitmap;
|
||||
use serde_json::Value;
|
||||
|
||||
use super::helpers::{create_writer, writer_into_reader, GrenadParameters};
|
||||
use crate::index::IndexEmbeddingConfig;
|
||||
use crate::prompt::Prompt;
|
||||
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
|
||||
use crate::update::index_documents::helpers::try_split_at;
|
||||
use crate::update::settings::InnerIndexSettingsDiff;
|
||||
use crate::vector::parsed_vectors::{ParsedVectorsDiff, RESERVED_VECTORS_FIELD_NAME};
|
||||
use crate::vector::parsed_vectors::{ParsedVectorsDiff, VectorState, RESERVED_VECTORS_FIELD_NAME};
|
||||
use crate::vector::settings::{EmbedderAction, ReindexAction};
|
||||
use crate::vector::Embedder;
|
||||
use crate::{DocumentId, Result, ThreadPoolNoAbort};
|
||||
use crate::{try_split_array_at, DocumentId, FieldId, FieldsIdsMap, Result, ThreadPoolNoAbort};
|
||||
|
||||
/// The length of the elements that are always in the buffer when inserting new values.
|
||||
const TRUNCATE_SIZE: usize = size_of::<DocumentId>();
|
||||
|
@ -35,6 +36,8 @@ pub struct ExtractedVectorPoints {
|
|||
// embedder
|
||||
pub embedder_name: String,
|
||||
pub embedder: Arc<Embedder>,
|
||||
pub add_to_user_provided: RoaringBitmap,
|
||||
pub remove_from_user_provided: RoaringBitmap,
|
||||
}
|
||||
|
||||
enum VectorStateDelta {
|
||||
|
@ -42,12 +45,7 @@ enum VectorStateDelta {
|
|||
// Remove all vectors, generated or manual, from this document
|
||||
NowRemoved,
|
||||
|
||||
// Add the manually specified vectors, passed in the other grenad
|
||||
// Remove any previously generated vectors
|
||||
// Note: changing the value of the manually specified vector **should not record** this delta
|
||||
WasGeneratedNowManual(Vec<Vec<f32>>),
|
||||
|
||||
ManualDelta(Vec<Vec<f32>>, Vec<Vec<f32>>),
|
||||
NowManual(Vec<Vec<f32>>),
|
||||
|
||||
// Add the vector computed from the specified prompt
|
||||
// Remove any previous vector
|
||||
|
@ -56,14 +54,12 @@ enum VectorStateDelta {
|
|||
}
|
||||
|
||||
impl VectorStateDelta {
|
||||
fn into_values(self) -> (bool, String, (Vec<Vec<f32>>, Vec<Vec<f32>>)) {
|
||||
fn into_values(self) -> (bool, String, Vec<Vec<f32>>) {
|
||||
match self {
|
||||
VectorStateDelta::NoChange => Default::default(),
|
||||
VectorStateDelta::NowRemoved => (true, Default::default(), Default::default()),
|
||||
VectorStateDelta::WasGeneratedNowManual(add) => {
|
||||
(true, Default::default(), (Default::default(), add))
|
||||
}
|
||||
VectorStateDelta::ManualDelta(del, add) => (false, Default::default(), (del, add)),
|
||||
// We always delete the previous vectors
|
||||
VectorStateDelta::NowManual(add) => (true, Default::default(), add),
|
||||
VectorStateDelta::NowGenerated(prompt) => (true, prompt, Default::default()),
|
||||
}
|
||||
}
|
||||
|
@ -74,12 +70,27 @@ struct EmbedderVectorExtractor {
|
|||
embedder: Arc<Embedder>,
|
||||
prompt: Arc<Prompt>,
|
||||
|
||||
// (docid, _index) -> KvWriterDelAdd -> Vector
|
||||
manual_vectors_writer: Writer<BufWriter<File>>,
|
||||
// (docid) -> (prompt)
|
||||
prompts_writer: Writer<BufWriter<File>>,
|
||||
// (docid) -> ()
|
||||
remove_vectors_writer: Writer<BufWriter<File>>,
|
||||
// (docid, _index) -> KvWriterDelAdd -> Vector
|
||||
manual_vectors_writer: Writer<BufWriter<File>>,
|
||||
// The docids of the documents that contain a user-defined embedding
|
||||
add_to_user_provided: RoaringBitmap,
|
||||
|
||||
action: ExtractionAction,
|
||||
}
|
||||
|
||||
/// Extra state carried by `ExtractionAction::DocumentOperation`, i.e. when the extraction is
/// driven by a document operation rather than by a settings change.
struct DocumentOperation {
|
||||
// The docids of the documents that contain an auto-generated embedding
// (these are reported as `remove_from_user_provided` in the extraction result).
remove_from_user_provided: RoaringBitmap,
|
||||
}
|
||||
|
||||
/// What triggered the vector extraction for a given embedder; it decides how the vector
/// delta of each document is computed.
enum ExtractionAction {
|
||||
/// Settings change mapped from `ReindexAction::FullReindex`.
SettingsFullReindex,
|
||||
/// Settings change mapped from `ReindexAction::RegeneratePrompts`; `old_prompt` is the
/// prompt from the previous embedder configuration, used to compare against the new one.
SettingsRegeneratePrompts { old_prompt: Arc<Prompt> },
|
||||
/// Regular document operation (no vector reindexing requested by the settings).
DocumentOperation(DocumentOperation),
|
||||
}
|
||||
|
||||
/// Extracts the embedding vector contained in each document under the `_vectors` field.
|
||||
|
@ -89,6 +100,7 @@ struct EmbedderVectorExtractor {
|
|||
pub fn extract_vector_points<R: io::Read + io::Seek>(
|
||||
obkv_documents: grenad::Reader<R>,
|
||||
indexer: GrenadParameters,
|
||||
embedders_configs: &[IndexEmbeddingConfig],
|
||||
settings_diff: &InnerIndexSettingsDiff,
|
||||
) -> Result<Vec<ExtractedVectorPoints>> {
|
||||
let reindex_vectors = settings_diff.reindex_vectors();
|
||||
|
@ -97,153 +109,207 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
|
|||
let new_fields_ids_map = &settings_diff.new.fields_ids_map;
|
||||
// the vector field id may have changed
|
||||
let old_vectors_fid = old_fields_ids_map.id(RESERVED_VECTORS_FIELD_NAME);
|
||||
// filter the old vector fid if the settings has been changed forcing reindexing.
|
||||
let old_vectors_fid = old_vectors_fid.filter(|_| !reindex_vectors);
|
||||
|
||||
let new_vectors_fid = new_fields_ids_map.id(RESERVED_VECTORS_FIELD_NAME);
|
||||
|
||||
let mut extractors = Vec::new();
|
||||
for (embedder_name, (embedder, prompt)) in
|
||||
settings_diff.new.embedding_configs.clone().into_iter()
|
||||
{
|
||||
// (docid, _index) -> KvWriterDelAdd -> Vector
|
||||
let manual_vectors_writer = create_writer(
|
||||
indexer.chunk_compression_type,
|
||||
indexer.chunk_compression_level,
|
||||
tempfile::tempfile()?,
|
||||
);
|
||||
|
||||
// (docid) -> (prompt)
|
||||
let prompts_writer = create_writer(
|
||||
indexer.chunk_compression_type,
|
||||
indexer.chunk_compression_level,
|
||||
tempfile::tempfile()?,
|
||||
);
|
||||
let mut configs = settings_diff.new.embedding_configs.clone().into_inner();
|
||||
let old_configs = &settings_diff.old.embedding_configs;
|
||||
|
||||
// (docid) -> ()
|
||||
let remove_vectors_writer = create_writer(
|
||||
indexer.chunk_compression_type,
|
||||
indexer.chunk_compression_level,
|
||||
tempfile::tempfile()?,
|
||||
);
|
||||
if reindex_vectors {
|
||||
for (name, action) in settings_diff.embedding_config_updates.iter() {
|
||||
match action {
|
||||
EmbedderAction::WriteBackToDocuments(_) => continue, // already deleted
|
||||
EmbedderAction::Reindex(action) => {
|
||||
let Some((embedder_name, (embedder, prompt))) = configs.remove_entry(name)
|
||||
else {
|
||||
tracing::error!(embedder = name, "Requested embedder config not found");
|
||||
continue;
|
||||
};
|
||||
|
||||
extractors.push(EmbedderVectorExtractor {
|
||||
embedder_name,
|
||||
embedder,
|
||||
prompt,
|
||||
manual_vectors_writer,
|
||||
prompts_writer,
|
||||
remove_vectors_writer,
|
||||
});
|
||||
// (docid, _index) -> KvWriterDelAdd -> Vector
|
||||
let manual_vectors_writer = create_writer(
|
||||
indexer.chunk_compression_type,
|
||||
indexer.chunk_compression_level,
|
||||
tempfile::tempfile()?,
|
||||
);
|
||||
|
||||
// (docid) -> (prompt)
|
||||
let prompts_writer = create_writer(
|
||||
indexer.chunk_compression_type,
|
||||
indexer.chunk_compression_level,
|
||||
tempfile::tempfile()?,
|
||||
);
|
||||
|
||||
// (docid) -> ()
|
||||
let remove_vectors_writer = create_writer(
|
||||
indexer.chunk_compression_type,
|
||||
indexer.chunk_compression_level,
|
||||
tempfile::tempfile()?,
|
||||
);
|
||||
|
||||
let action = match action {
|
||||
ReindexAction::FullReindex => ExtractionAction::SettingsFullReindex,
|
||||
ReindexAction::RegeneratePrompts => {
|
||||
let Some((_, old_prompt)) = old_configs.get(name) else {
|
||||
tracing::error!(embedder = name, "Old embedder config not found");
|
||||
continue;
|
||||
};
|
||||
|
||||
ExtractionAction::SettingsRegeneratePrompts { old_prompt }
|
||||
}
|
||||
};
|
||||
|
||||
extractors.push(EmbedderVectorExtractor {
|
||||
embedder_name,
|
||||
embedder,
|
||||
prompt,
|
||||
prompts_writer,
|
||||
remove_vectors_writer,
|
||||
manual_vectors_writer,
|
||||
add_to_user_provided: RoaringBitmap::new(),
|
||||
action,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// document operation
|
||||
|
||||
for (embedder_name, (embedder, prompt)) in configs.into_iter() {
|
||||
// (docid, _index) -> KvWriterDelAdd -> Vector
|
||||
let manual_vectors_writer = create_writer(
|
||||
indexer.chunk_compression_type,
|
||||
indexer.chunk_compression_level,
|
||||
tempfile::tempfile()?,
|
||||
);
|
||||
|
||||
// (docid) -> (prompt)
|
||||
let prompts_writer = create_writer(
|
||||
indexer.chunk_compression_type,
|
||||
indexer.chunk_compression_level,
|
||||
tempfile::tempfile()?,
|
||||
);
|
||||
|
||||
// (docid) -> ()
|
||||
let remove_vectors_writer = create_writer(
|
||||
indexer.chunk_compression_type,
|
||||
indexer.chunk_compression_level,
|
||||
tempfile::tempfile()?,
|
||||
);
|
||||
|
||||
extractors.push(EmbedderVectorExtractor {
|
||||
embedder_name,
|
||||
embedder,
|
||||
prompt,
|
||||
prompts_writer,
|
||||
remove_vectors_writer,
|
||||
manual_vectors_writer,
|
||||
add_to_user_provided: RoaringBitmap::new(),
|
||||
action: ExtractionAction::DocumentOperation(DocumentOperation {
|
||||
remove_from_user_provided: RoaringBitmap::new(),
|
||||
}),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let mut key_buffer = Vec::new();
|
||||
let mut cursor = obkv_documents.into_cursor()?;
|
||||
while let Some((key, value)) = cursor.move_on_next()? {
|
||||
// this must always be serialized as (docid, external_docid);
|
||||
const SIZE_OF_DOCUMENTID: usize = std::mem::size_of::<DocumentId>();
|
||||
let (docid_bytes, external_id_bytes) =
|
||||
try_split_at(key, std::mem::size_of::<DocumentId>()).unwrap();
|
||||
try_split_array_at::<u8, SIZE_OF_DOCUMENTID>(key).unwrap();
|
||||
debug_assert!(from_utf8(external_id_bytes).is_ok());
|
||||
let docid = DocumentId::from_be_bytes(docid_bytes);
|
||||
|
||||
let obkv = obkv::KvReader::new(value);
|
||||
key_buffer.clear();
|
||||
key_buffer.extend_from_slice(docid_bytes);
|
||||
key_buffer.extend_from_slice(docid_bytes.as_slice());
|
||||
|
||||
// since we only need the primary key when we throw an error we create this getter to
|
||||
// lazily get it when needed
|
||||
let document_id = || -> Value { from_utf8(external_id_bytes).unwrap().into() };
|
||||
|
||||
let mut parsed_vectors = ParsedVectorsDiff::new(obkv, old_vectors_fid, new_vectors_fid)
|
||||
.map_err(|error| error.to_crate_error(document_id().to_string()))?;
|
||||
let mut parsed_vectors = ParsedVectorsDiff::new(
|
||||
docid,
|
||||
embedders_configs,
|
||||
obkv,
|
||||
old_vectors_fid,
|
||||
new_vectors_fid,
|
||||
)
|
||||
.map_err(|error| error.to_crate_error(document_id().to_string()))?;
|
||||
|
||||
for EmbedderVectorExtractor {
|
||||
embedder_name,
|
||||
embedder: _,
|
||||
prompt,
|
||||
manual_vectors_writer,
|
||||
prompts_writer,
|
||||
remove_vectors_writer,
|
||||
manual_vectors_writer,
|
||||
add_to_user_provided,
|
||||
action,
|
||||
} in extractors.iter_mut()
|
||||
{
|
||||
let delta = match parsed_vectors.remove(embedder_name) {
|
||||
(Some(old), Some(new)) => {
|
||||
// no autogeneration
|
||||
let del_vectors = old.into_array_of_vectors();
|
||||
let add_vectors = new.into_array_of_vectors();
|
||||
|
||||
if add_vectors.len() > usize::from(u8::MAX) {
|
||||
return Err(crate::Error::UserError(crate::UserError::TooManyVectors(
|
||||
document_id().to_string(),
|
||||
add_vectors.len(),
|
||||
)));
|
||||
}
|
||||
|
||||
VectorStateDelta::ManualDelta(del_vectors, add_vectors)
|
||||
}
|
||||
(Some(_old), None) => {
|
||||
// Do we keep this document?
|
||||
let document_is_kept = obkv
|
||||
.iter()
|
||||
.map(|(_, deladd)| KvReaderDelAdd::new(deladd))
|
||||
.any(|deladd| deladd.get(DelAdd::Addition).is_some());
|
||||
if document_is_kept {
|
||||
// becomes autogenerated
|
||||
VectorStateDelta::NowGenerated(prompt.render(
|
||||
obkv,
|
||||
DelAdd::Addition,
|
||||
new_fields_ids_map,
|
||||
)?)
|
||||
} else {
|
||||
VectorStateDelta::NowRemoved
|
||||
}
|
||||
}
|
||||
(None, Some(new)) => {
|
||||
// was possibly autogenerated, remove all vectors for that document
|
||||
let add_vectors = new.into_array_of_vectors();
|
||||
if add_vectors.len() > usize::from(u8::MAX) {
|
||||
return Err(crate::Error::UserError(crate::UserError::TooManyVectors(
|
||||
document_id().to_string(),
|
||||
add_vectors.len(),
|
||||
)));
|
||||
}
|
||||
|
||||
VectorStateDelta::WasGeneratedNowManual(add_vectors)
|
||||
}
|
||||
(None, None) => {
|
||||
// Do we keep this document?
|
||||
let document_is_kept = obkv
|
||||
.iter()
|
||||
.map(|(_, deladd)| KvReaderDelAdd::new(deladd))
|
||||
.any(|deladd| deladd.get(DelAdd::Addition).is_some());
|
||||
|
||||
if document_is_kept {
|
||||
// Don't give up if the old prompt was failing
|
||||
let old_prompt = Some(&prompt)
|
||||
// TODO: this filter works because we erase the vec database when a embedding setting changes.
|
||||
// When vector pipeline will be optimized, this should be removed.
|
||||
.filter(|_| !settings_diff.reindex_vectors())
|
||||
.map(|p| {
|
||||
p.render(obkv, DelAdd::Deletion, old_fields_ids_map)
|
||||
.unwrap_or_default()
|
||||
});
|
||||
let new_prompt =
|
||||
prompt.render(obkv, DelAdd::Addition, new_fields_ids_map)?;
|
||||
if old_prompt.as_ref() != Some(&new_prompt) {
|
||||
let old_prompt = old_prompt.unwrap_or_default();
|
||||
tracing::trace!(
|
||||
"🚀 Changing prompt from\n{old_prompt}\n===to===\n{new_prompt}"
|
||||
);
|
||||
VectorStateDelta::NowGenerated(new_prompt)
|
||||
} else {
|
||||
tracing::trace!("⏭️ Prompt unmodified, skipping");
|
||||
VectorStateDelta::NoChange
|
||||
let (old, new) = parsed_vectors.remove(embedder_name);
|
||||
let delta = match action {
|
||||
ExtractionAction::SettingsFullReindex => match old {
|
||||
// A full reindex can be triggered either by:
|
||||
// 1. a new embedder
|
||||
// 2. an existing embedder changed so that it must regenerate all generated embeddings.
|
||||
// For a new embedder, there can be `_vectors.embedder` embeddings to add to the DB
|
||||
VectorState::Inline(vectors) => {
|
||||
if !vectors.must_regenerate() {
|
||||
add_to_user_provided.insert(docid);
|
||||
}
|
||||
|
||||
match vectors.into_array_of_vectors() {
|
||||
Some(add_vectors) => {
|
||||
if add_vectors.len() > usize::from(u8::MAX) {
|
||||
return Err(crate::Error::UserError(
|
||||
crate::UserError::TooManyVectors(
|
||||
document_id().to_string(),
|
||||
add_vectors.len(),
|
||||
),
|
||||
));
|
||||
}
|
||||
VectorStateDelta::NowManual(add_vectors)
|
||||
}
|
||||
None => VectorStateDelta::NoChange,
|
||||
}
|
||||
}
|
||||
// this happens only when an existing embedder changed. We cannot regenerate userProvided vectors
|
||||
VectorState::Manual => VectorStateDelta::NoChange,
|
||||
// generated vectors must be regenerated
|
||||
VectorState::Generated => regenerate_prompt(obkv, prompt, new_fields_ids_map)?,
|
||||
},
|
||||
// prompt regeneration is only triggered for existing embedders
|
||||
ExtractionAction::SettingsRegeneratePrompts { old_prompt } => {
|
||||
if old.must_regenerate() {
|
||||
regenerate_if_prompt_changed(
|
||||
obkv,
|
||||
(old_prompt, prompt),
|
||||
(&old_fields_ids_map, &new_fields_ids_map),
|
||||
)?
|
||||
} else {
|
||||
VectorStateDelta::NowRemoved
|
||||
// we can simply ignore user provided vectors as they are not regenerated and are
|
||||
// already in the DB since this is an existing embedder
|
||||
VectorStateDelta::NoChange
|
||||
}
|
||||
}
|
||||
ExtractionAction::DocumentOperation(DocumentOperation {
|
||||
remove_from_user_provided,
|
||||
}) => extract_vector_document_diff(
|
||||
docid,
|
||||
obkv,
|
||||
prompt,
|
||||
(add_to_user_provided, remove_from_user_provided),
|
||||
(old, new),
|
||||
(&old_fields_ids_map, &new_fields_ids_map),
|
||||
document_id,
|
||||
)?,
|
||||
};
|
||||
|
||||
// and we finally push the unique vectors into the writer
|
||||
push_vectors_diff(
|
||||
remove_vectors_writer,
|
||||
|
@ -251,7 +317,6 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
|
|||
manual_vectors_writer,
|
||||
&mut key_buffer,
|
||||
delta,
|
||||
reindex_vectors,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
|
@ -262,43 +327,185 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
|
|||
embedder_name,
|
||||
embedder,
|
||||
prompt: _,
|
||||
manual_vectors_writer,
|
||||
prompts_writer,
|
||||
remove_vectors_writer,
|
||||
action,
|
||||
manual_vectors_writer,
|
||||
add_to_user_provided,
|
||||
} in extractors
|
||||
{
|
||||
results.push(ExtractedVectorPoints {
|
||||
// docid, _index -> KvWriterDelAdd -> Vector
|
||||
manual_vectors: writer_into_reader(manual_vectors_writer)?,
|
||||
// docid -> ()
|
||||
remove_vectors: writer_into_reader(remove_vectors_writer)?,
|
||||
// docid -> prompt
|
||||
prompts: writer_into_reader(prompts_writer)?,
|
||||
let remove_from_user_provided =
|
||||
if let ExtractionAction::DocumentOperation(DocumentOperation {
|
||||
remove_from_user_provided,
|
||||
}) = action
|
||||
{
|
||||
remove_from_user_provided
|
||||
} else {
|
||||
Default::default()
|
||||
};
|
||||
|
||||
results.push(ExtractedVectorPoints {
|
||||
manual_vectors: writer_into_reader(manual_vectors_writer)?,
|
||||
remove_vectors: writer_into_reader(remove_vectors_writer)?,
|
||||
prompts: writer_into_reader(prompts_writer)?,
|
||||
embedder,
|
||||
embedder_name,
|
||||
add_to_user_provided,
|
||||
remove_from_user_provided,
|
||||
})
|
||||
}
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
/// Computes the diff between both Del and Add numbers and
|
||||
/// only inserts the parts that differ in the sorter.
|
||||
fn extract_vector_document_diff(
|
||||
docid: DocumentId,
|
||||
obkv: obkv::KvReader<'_, FieldId>,
|
||||
prompt: &Prompt,
|
||||
(add_to_user_provided, remove_from_user_provided): (&mut RoaringBitmap, &mut RoaringBitmap),
|
||||
(old, new): (VectorState, VectorState),
|
||||
(old_fields_ids_map, new_fields_ids_map): (&FieldsIdsMap, &FieldsIdsMap),
|
||||
document_id: impl Fn() -> Value,
|
||||
) -> Result<VectorStateDelta> {
|
||||
match (old.must_regenerate(), new.must_regenerate()) {
|
||||
(true, true) | (false, false) => {}
|
||||
(true, false) => {
|
||||
add_to_user_provided.insert(docid);
|
||||
}
|
||||
(false, true) => {
|
||||
remove_from_user_provided.insert(docid);
|
||||
}
|
||||
}
|
||||
|
||||
let delta = match (old, new) {
|
||||
// regardless of the previous state, if a document now contains inline _vectors, they must
|
||||
// be extracted manually
|
||||
(_old, VectorState::Inline(new)) => match new.into_array_of_vectors() {
|
||||
Some(add_vectors) => {
|
||||
if add_vectors.len() > usize::from(u8::MAX) {
|
||||
return Err(crate::Error::UserError(crate::UserError::TooManyVectors(
|
||||
document_id().to_string(),
|
||||
add_vectors.len(),
|
||||
)));
|
||||
}
|
||||
|
||||
VectorStateDelta::NowManual(add_vectors)
|
||||
}
|
||||
None => VectorStateDelta::NoChange,
|
||||
},
|
||||
// no `_vectors` anywhere, we check for document removal and otherwise we regenerate the prompt if the
|
||||
// document changed
|
||||
(VectorState::Generated, VectorState::Generated) => {
|
||||
// Do we keep this document?
|
||||
let document_is_kept = obkv
|
||||
.iter()
|
||||
.map(|(_, deladd)| KvReaderDelAdd::new(deladd))
|
||||
.any(|deladd| deladd.get(DelAdd::Addition).is_some());
|
||||
|
||||
if document_is_kept {
|
||||
// Don't give up if the old prompt was failing
|
||||
let old_prompt = Some(&prompt).map(|p| {
|
||||
p.render(obkv, DelAdd::Deletion, old_fields_ids_map).unwrap_or_default()
|
||||
});
|
||||
let new_prompt = prompt.render(obkv, DelAdd::Addition, new_fields_ids_map)?;
|
||||
if old_prompt.as_ref() != Some(&new_prompt) {
|
||||
let old_prompt = old_prompt.unwrap_or_default();
|
||||
tracing::trace!(
|
||||
"🚀 Changing prompt from\n{old_prompt}\n===to===\n{new_prompt}"
|
||||
);
|
||||
VectorStateDelta::NowGenerated(new_prompt)
|
||||
} else {
|
||||
tracing::trace!("⏭️ Prompt unmodified, skipping");
|
||||
VectorStateDelta::NoChange
|
||||
}
|
||||
} else {
|
||||
VectorStateDelta::NowRemoved
|
||||
}
|
||||
}
|
||||
// inline to the left is not supposed to be possible because the embedder is not new, so `_vectors` was removed from
|
||||
// the previous version of the document.
|
||||
// Manual -> Generated is also not possible without an Inline to the right (which is handled above)
|
||||
// Generated -> Generated is handled above, so not possible
|
||||
// As a result, this code is unreachable
|
||||
(_not_generated, VectorState::Generated) => {
|
||||
// Do we keep this document?
|
||||
let document_is_kept = obkv
|
||||
.iter()
|
||||
.map(|(_, deladd)| KvReaderDelAdd::new(deladd))
|
||||
.any(|deladd| deladd.get(DelAdd::Addition).is_some());
|
||||
if document_is_kept {
|
||||
// becomes autogenerated
|
||||
VectorStateDelta::NowGenerated(prompt.render(
|
||||
obkv,
|
||||
DelAdd::Addition,
|
||||
new_fields_ids_map,
|
||||
)?)
|
||||
} else {
|
||||
// make sure the document is always removed from user provided on removal
|
||||
remove_from_user_provided.insert(docid);
|
||||
VectorStateDelta::NowRemoved
|
||||
}
|
||||
}
|
||||
// inline to the left is not possible because the embedder is not new, and so `_vectors` was removed from the previous
|
||||
// version of the document.
|
||||
// however the Rust type system cannot know that.
|
||||
(_manual, VectorState::Manual) => {
|
||||
// Do we keep this document?
|
||||
let document_is_kept = obkv
|
||||
.iter()
|
||||
.map(|(_, deladd)| KvReaderDelAdd::new(deladd))
|
||||
.any(|deladd| deladd.get(DelAdd::Addition).is_some());
|
||||
if document_is_kept {
|
||||
// if the new version of documents has the vectors in the DB,
|
||||
// then they are user-provided and nothing possibly changed
|
||||
VectorStateDelta::NoChange
|
||||
} else {
|
||||
// make sure the document is always removed from user provided on removal
|
||||
remove_from_user_provided.insert(docid);
|
||||
VectorStateDelta::NowRemoved
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Ok(delta)
|
||||
}
|
||||
|
||||
fn regenerate_if_prompt_changed(
|
||||
obkv: obkv::KvReader<'_, FieldId>,
|
||||
(old_prompt, new_prompt): (&Prompt, &Prompt),
|
||||
(old_fields_ids_map, new_fields_ids_map): (&FieldsIdsMap, &FieldsIdsMap),
|
||||
) -> Result<VectorStateDelta> {
|
||||
let old_prompt =
|
||||
old_prompt.render(obkv, DelAdd::Deletion, old_fields_ids_map).unwrap_or(Default::default());
|
||||
let new_prompt = new_prompt.render(obkv, DelAdd::Addition, new_fields_ids_map)?;
|
||||
|
||||
if new_prompt == old_prompt {
|
||||
return Ok(VectorStateDelta::NoChange);
|
||||
}
|
||||
Ok(VectorStateDelta::NowGenerated(new_prompt))
|
||||
}
|
||||
|
||||
fn regenerate_prompt(
|
||||
obkv: obkv::KvReader<'_, FieldId>,
|
||||
prompt: &Prompt,
|
||||
new_fields_ids_map: &FieldsIdsMap,
|
||||
) -> Result<VectorStateDelta> {
|
||||
let prompt = prompt.render(obkv, DelAdd::Addition, new_fields_ids_map)?;
|
||||
|
||||
Ok(VectorStateDelta::NowGenerated(prompt))
|
||||
}
|
||||
|
||||
/// We cannot compute the diff between both Del and Add vectors.
|
||||
/// We'll push every vector and compute the difference later in TypedChunk.
|
||||
fn push_vectors_diff(
|
||||
remove_vectors_writer: &mut Writer<BufWriter<File>>,
|
||||
prompts_writer: &mut Writer<BufWriter<File>>,
|
||||
manual_vectors_writer: &mut Writer<BufWriter<File>>,
|
||||
key_buffer: &mut Vec<u8>,
|
||||
delta: VectorStateDelta,
|
||||
reindex_vectors: bool,
|
||||
) -> Result<()> {
|
||||
let (must_remove, prompt, (mut del_vectors, mut add_vectors)) = delta.into_values();
|
||||
if must_remove
|
||||
// TODO: the below condition works because we erase the vec database when an embedding setting changes.
|
||||
// When vector pipeline will be optimized, this should be removed.
|
||||
&& !reindex_vectors
|
||||
{
|
||||
let (must_remove, prompt, mut add_vectors) = delta.into_values();
|
||||
if must_remove {
|
||||
key_buffer.truncate(TRUNCATE_SIZE);
|
||||
remove_vectors_writer.insert(&key_buffer, [])?;
|
||||
}
|
||||
|
@ -308,44 +515,22 @@ fn push_vectors_diff(
|
|||
}
|
||||
|
||||
// We sort and dedup the vectors
|
||||
del_vectors.sort_unstable_by(|a, b| compare_vectors(a, b));
|
||||
add_vectors.sort_unstable_by(|a, b| compare_vectors(a, b));
|
||||
del_vectors.dedup_by(|a, b| compare_vectors(a, b).is_eq());
|
||||
add_vectors.dedup_by(|a, b| compare_vectors(a, b).is_eq());
|
||||
|
||||
let merged_vectors_iter =
|
||||
itertools::merge_join_by(del_vectors, add_vectors, |del, add| compare_vectors(del, add));
|
||||
|
||||
// insert vectors into the writer
|
||||
for (i, eob) in merged_vectors_iter.into_iter().enumerate().take(u16::MAX as usize) {
|
||||
for (i, vector) in add_vectors.into_iter().enumerate().take(u16::MAX as usize) {
|
||||
// Generate the key by extending the unique index to it.
|
||||
key_buffer.truncate(TRUNCATE_SIZE);
|
||||
let index = u16::try_from(i).unwrap();
|
||||
key_buffer.extend_from_slice(&index.to_be_bytes());
|
||||
|
||||
match eob {
|
||||
EitherOrBoth::Both(_, _) => (), // no need to touch anything
|
||||
EitherOrBoth::Left(vector) => {
|
||||
// TODO: the below condition works because we erase the vec database when an embedding setting changes.
|
||||
// When vector pipeline will be optimized, this should be removed.
|
||||
if !reindex_vectors {
|
||||
// We insert only the Del part of the Obkv to inform
|
||||
// that we only want to remove all those vectors.
|
||||
let mut obkv = KvWriterDelAdd::memory();
|
||||
obkv.insert(DelAdd::Deletion, cast_slice(&vector))?;
|
||||
let bytes = obkv.into_inner()?;
|
||||
manual_vectors_writer.insert(&key_buffer, bytes)?;
|
||||
}
|
||||
}
|
||||
EitherOrBoth::Right(vector) => {
|
||||
// We insert only the Add part of the Obkv to inform
|
||||
// that we only want to add all those vectors.
|
||||
let mut obkv = KvWriterDelAdd::memory();
|
||||
obkv.insert(DelAdd::Addition, cast_slice(&vector))?;
|
||||
let bytes = obkv.into_inner()?;
|
||||
manual_vectors_writer.insert(&key_buffer, bytes)?;
|
||||
}
|
||||
}
|
||||
// We insert only the Add part of the Obkv to inform
|
||||
// that we only want to add all those vectors.
|
||||
let mut obkv = KvWriterDelAdd::memory();
|
||||
obkv.insert(DelAdd::Addition, cast_slice(&vector))?;
|
||||
let bytes = obkv.into_inner()?;
|
||||
manual_vectors_writer.insert(&key_buffer, bytes)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
|
|
@ -30,6 +30,7 @@ use self::extract_word_pair_proximity_docids::extract_word_pair_proximity_docids
|
|||
use self::extract_word_position_docids::extract_word_position_docids;
|
||||
use super::helpers::{as_cloneable_grenad, CursorClonableMmap, GrenadParameters};
|
||||
use super::{helpers, TypedChunk};
|
||||
use crate::index::IndexEmbeddingConfig;
|
||||
use crate::update::settings::InnerIndexSettingsDiff;
|
||||
use crate::{FieldId, Result, ThreadPoolNoAbortBuilder};
|
||||
|
||||
|
@ -43,6 +44,7 @@ pub(crate) fn data_from_obkv_documents(
|
|||
indexer: GrenadParameters,
|
||||
lmdb_writer_sx: Sender<Result<TypedChunk>>,
|
||||
primary_key_id: FieldId,
|
||||
embedders_configs: Arc<Vec<IndexEmbeddingConfig>>,
|
||||
settings_diff: Arc<InnerIndexSettingsDiff>,
|
||||
max_positions_per_attributes: Option<u32>,
|
||||
) -> Result<()> {
|
||||
|
@ -55,6 +57,7 @@ pub(crate) fn data_from_obkv_documents(
|
|||
original_documents_chunk,
|
||||
indexer,
|
||||
lmdb_writer_sx.clone(),
|
||||
embedders_configs.clone(),
|
||||
settings_diff.clone(),
|
||||
)
|
||||
})
|
||||
|
@ -210,6 +213,7 @@ fn send_original_documents_data(
|
|||
original_documents_chunk: Result<grenad::Reader<BufReader<File>>>,
|
||||
indexer: GrenadParameters,
|
||||
lmdb_writer_sx: Sender<Result<TypedChunk>>,
|
||||
embedders_configs: Arc<Vec<IndexEmbeddingConfig>>,
|
||||
settings_diff: Arc<InnerIndexSettingsDiff>,
|
||||
) -> Result<()> {
|
||||
let original_documents_chunk =
|
||||
|
@ -226,11 +230,17 @@ fn send_original_documents_data(
|
|||
|
||||
if index_vectors {
|
||||
let settings_diff = settings_diff.clone();
|
||||
let embedders_configs = embedders_configs.clone();
|
||||
|
||||
let original_documents_chunk = original_documents_chunk.clone();
|
||||
let lmdb_writer_sx = lmdb_writer_sx.clone();
|
||||
rayon::spawn(move || {
|
||||
match extract_vector_points(original_documents_chunk.clone(), indexer, &settings_diff) {
|
||||
match extract_vector_points(
|
||||
original_documents_chunk.clone(),
|
||||
indexer,
|
||||
&embedders_configs,
|
||||
&settings_diff,
|
||||
) {
|
||||
Ok(extracted_vectors) => {
|
||||
for ExtractedVectorPoints {
|
||||
manual_vectors,
|
||||
|
@ -238,6 +248,8 @@ fn send_original_documents_data(
|
|||
prompts,
|
||||
embedder_name,
|
||||
embedder,
|
||||
add_to_user_provided,
|
||||
remove_from_user_provided,
|
||||
} in extracted_vectors
|
||||
{
|
||||
let embeddings = match extract_embeddings(
|
||||
|
@ -262,6 +274,8 @@ fn send_original_documents_data(
|
|||
expected_dimension: embedder.dimensions(),
|
||||
manual_vectors,
|
||||
embedder_name,
|
||||
add_to_user_provided,
|
||||
remove_from_user_provided,
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -286,6 +286,7 @@ where
|
|||
settings_diff.new.recompute_searchables(self.wtxn, self.index)?;
|
||||
|
||||
let settings_diff = Arc::new(settings_diff);
|
||||
let embedders_configs = Arc::new(self.index.embedding_configs(self.wtxn)?);
|
||||
|
||||
let backup_pool;
|
||||
let pool = match self.indexer_config.thread_pool {
|
||||
|
@ -399,6 +400,7 @@ where
|
|||
pool_params,
|
||||
lmdb_writer_sx.clone(),
|
||||
primary_key_id,
|
||||
embedders_configs.clone(),
|
||||
settings_diff_cloned,
|
||||
max_positions_per_attributes,
|
||||
)
|
||||
|
@ -501,6 +503,8 @@ where
|
|||
embeddings,
|
||||
manual_vectors,
|
||||
embedder_name,
|
||||
add_to_user_provided,
|
||||
remove_from_user_provided,
|
||||
} => {
|
||||
dimension.insert(embedder_name.clone(), expected_dimension);
|
||||
TypedChunk::VectorPoints {
|
||||
|
@ -509,6 +513,8 @@ where
|
|||
expected_dimension,
|
||||
manual_vectors,
|
||||
embedder_name,
|
||||
add_to_user_provided,
|
||||
remove_from_user_provided,
|
||||
}
|
||||
}
|
||||
otherwise => otherwise,
|
||||
|
@ -781,6 +787,7 @@ mod tests {
|
|||
use super::*;
|
||||
use crate::documents::documents_batch_reader_from_objects;
|
||||
use crate::index::tests::TempIndex;
|
||||
use crate::index::IndexEmbeddingConfig;
|
||||
use crate::search::TermsMatchingStrategy;
|
||||
use crate::update::Setting;
|
||||
use crate::{db_snap, Filter, Search};
|
||||
|
@ -2616,10 +2623,12 @@ mod tests {
|
|||
|
||||
let rtxn = index.read_txn().unwrap();
|
||||
let mut embedding_configs = index.embedding_configs(&rtxn).unwrap();
|
||||
let (embedder_name, embedder) = embedding_configs.pop().unwrap();
|
||||
let IndexEmbeddingConfig { name: embedder_name, config: embedder, user_provided } =
|
||||
embedding_configs.pop().unwrap();
|
||||
insta::assert_snapshot!(embedder_name, @"manual");
|
||||
insta::assert_debug_snapshot!(user_provided, @"RoaringBitmap<[0, 1, 2]>");
|
||||
let embedder =
|
||||
std::sync::Arc::new(crate::vector::Embedder::new(embedder.embedder_options).unwrap());
|
||||
assert_eq!("manual", embedder_name);
|
||||
let res = index
|
||||
.search(&rtxn)
|
||||
.semantic(embedder_name, embedder, Some([0.0, 1.0, 2.0].to_vec()))
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use std::borrow::Cow;
|
||||
use std::collections::btree_map::Entry as BEntry;
|
||||
use std::collections::hash_map::Entry as HEntry;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||
use std::fs::File;
|
||||
use std::io::{Read, Seek};
|
||||
|
||||
|
@ -27,6 +27,8 @@ use crate::update::del_add::{
|
|||
use crate::update::index_documents::GrenadParameters;
|
||||
use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff};
|
||||
use crate::update::{AvailableDocumentsIds, UpdateIndexingStep};
|
||||
use crate::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors};
|
||||
use crate::vector::settings::{EmbedderAction, WriteBackToDocuments};
|
||||
use crate::{
|
||||
is_faceted_by, FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, Result,
|
||||
};
|
||||
|
@ -51,7 +53,6 @@ pub struct Transform<'a, 'i> {
|
|||
fields_ids_map: FieldsIdsMap,
|
||||
|
||||
indexer_settings: &'a IndexerConfig,
|
||||
pub autogenerate_docids: bool,
|
||||
pub index_documents_method: IndexDocumentsMethod,
|
||||
available_documents_ids: AvailableDocumentsIds,
|
||||
|
||||
|
@ -105,7 +106,7 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||
index: &'i Index,
|
||||
indexer_settings: &'a IndexerConfig,
|
||||
index_documents_method: IndexDocumentsMethod,
|
||||
autogenerate_docids: bool,
|
||||
_autogenerate_docids: bool,
|
||||
) -> Result<Self> {
|
||||
// We must choose the appropriate merge function for when two or more documents
|
||||
// with the same user id must be merged or fully replaced in the same batch.
|
||||
|
@ -139,7 +140,6 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||
index,
|
||||
fields_ids_map: index.fields_ids_map(wtxn)?,
|
||||
indexer_settings,
|
||||
autogenerate_docids,
|
||||
available_documents_ids: AvailableDocumentsIds::from_documents_ids(&documents_ids),
|
||||
original_sorter,
|
||||
flattened_sorter,
|
||||
|
@ -808,13 +808,13 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||
let mut new_inner_settings = old_inner_settings.clone();
|
||||
new_inner_settings.fields_ids_map = fields_ids_map;
|
||||
|
||||
let embedding_configs_updated = false;
|
||||
let embedding_config_updates = Default::default();
|
||||
let settings_update_only = false;
|
||||
let settings_diff = InnerIndexSettingsDiff::new(
|
||||
old_inner_settings,
|
||||
new_inner_settings,
|
||||
primary_key_id,
|
||||
embedding_configs_updated,
|
||||
embedding_config_updates,
|
||||
settings_update_only,
|
||||
);
|
||||
|
||||
|
@ -835,10 +835,13 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||
/// Rebind the field_ids of the provided document to their values
|
||||
/// based on the field_ids_maps difference between the old and the new settings,
|
||||
/// then fill the provided buffers with delta documents using KvWriterDelAdd.
|
||||
#[allow(clippy::too_many_arguments)] // need the vectors + fid, feel free to create a struct xo xo
|
||||
fn rebind_existing_document(
|
||||
old_obkv: KvReader<FieldId>,
|
||||
settings_diff: &InnerIndexSettingsDiff,
|
||||
modified_faceted_fields: &HashSet<String>,
|
||||
mut injected_vectors: serde_json::Map<String, serde_json::Value>,
|
||||
old_vectors_fid: Option<FieldId>,
|
||||
original_obkv_buffer: Option<&mut Vec<u8>>,
|
||||
flattened_obkv_buffer: Option<&mut Vec<u8>>,
|
||||
) -> Result<()> {
|
||||
|
@ -861,9 +864,49 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||
|
||||
// The operations that we must perform on the different fields.
|
||||
let mut operations = HashMap::new();
|
||||
let mut error_seen = false;
|
||||
|
||||
let mut obkv_writer = KvWriter::<_, FieldId>::memory();
|
||||
for (id, val) in old_obkv.iter() {
|
||||
'write_fid: for (id, val) in old_obkv.iter() {
|
||||
if !injected_vectors.is_empty() {
|
||||
'inject_vectors: {
|
||||
let Some(vectors_fid) = old_vectors_fid else { break 'inject_vectors };
|
||||
|
||||
if id < vectors_fid {
|
||||
break 'inject_vectors;
|
||||
}
|
||||
|
||||
let mut existing_vectors = if id == vectors_fid {
|
||||
let existing_vectors: std::result::Result<
|
||||
serde_json::Map<String, serde_json::Value>,
|
||||
serde_json::Error,
|
||||
> = serde_json::from_slice(val);
|
||||
|
||||
match existing_vectors {
|
||||
Ok(existing_vectors) => existing_vectors,
|
||||
Err(error) => {
|
||||
if !error_seen {
|
||||
tracing::error!(%error, "Unexpected `_vectors` field that is not a map. Treating as an empty map");
|
||||
error_seen = true;
|
||||
}
|
||||
Default::default()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Default::default()
|
||||
};
|
||||
|
||||
existing_vectors.append(&mut injected_vectors);
|
||||
|
||||
operations.insert(vectors_fid, DelAddOperation::DeletionAndAddition);
|
||||
obkv_writer
|
||||
.insert(vectors_fid, serde_json::to_vec(&existing_vectors).unwrap())?;
|
||||
if id == vectors_fid {
|
||||
continue 'write_fid;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if is_primary_key(id) || necessary_faceted_field(id) || reindex_vectors {
|
||||
operations.insert(id, DelAddOperation::DeletionAndAddition);
|
||||
obkv_writer.insert(id, val)?;
|
||||
|
@ -872,6 +915,15 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||
obkv_writer.insert(id, val)?;
|
||||
}
|
||||
}
|
||||
if !injected_vectors.is_empty() {
|
||||
'inject_vectors: {
|
||||
let Some(vectors_fid) = old_vectors_fid else { break 'inject_vectors };
|
||||
|
||||
operations.insert(vectors_fid, DelAddOperation::DeletionAndAddition);
|
||||
obkv_writer.insert(vectors_fid, serde_json::to_vec(&injected_vectors).unwrap())?;
|
||||
}
|
||||
}
|
||||
|
||||
let data = obkv_writer.into_inner()?;
|
||||
let obkv = KvReader::<FieldId>::new(&data);
|
||||
|
||||
|
@ -937,6 +989,35 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||
None
|
||||
};
|
||||
|
||||
let readers: Result<
|
||||
BTreeMap<&str, (Vec<arroy::Reader<arroy::distances::Angular>>, &RoaringBitmap)>,
|
||||
> = settings_diff
|
||||
.embedding_config_updates
|
||||
.iter()
|
||||
.filter_map(|(name, action)| {
|
||||
if let EmbedderAction::WriteBackToDocuments(WriteBackToDocuments {
|
||||
embedder_id,
|
||||
user_provided,
|
||||
}) = action
|
||||
{
|
||||
let readers: Result<Vec<_>> =
|
||||
self.index.arroy_readers(wtxn, *embedder_id).collect();
|
||||
match readers {
|
||||
Ok(readers) => Some(Ok((name.as_str(), (readers, user_provided)))),
|
||||
Err(error) => Some(Err(error)),
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
let readers = readers?;
|
||||
|
||||
let old_vectors_fid = settings_diff
|
||||
.old
|
||||
.fields_ids_map
|
||||
.id(crate::vector::parsed_vectors::RESERVED_VECTORS_FIELD_NAME);
|
||||
|
||||
// We initialize the sorter with the user indexing settings.
|
||||
let mut flattened_sorter =
|
||||
if settings_diff.reindex_searchable() || settings_diff.reindex_facets() {
|
||||
|
@ -963,10 +1044,50 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||
InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
|
||||
)?;
|
||||
|
||||
let injected_vectors: std::result::Result<
|
||||
serde_json::Map<String, serde_json::Value>,
|
||||
arroy::Error,
|
||||
> = readers
|
||||
.iter()
|
||||
.filter_map(|(name, (readers, user_provided))| {
|
||||
if !user_provided.contains(docid) {
|
||||
return None;
|
||||
}
|
||||
let mut vectors = Vec::new();
|
||||
for reader in readers {
|
||||
let Some(vector) = reader.item_vector(wtxn, docid).transpose() else {
|
||||
break;
|
||||
};
|
||||
|
||||
match vector {
|
||||
Ok(vector) => vectors.push(vector),
|
||||
Err(error) => return Some(Err(error)),
|
||||
}
|
||||
}
|
||||
if vectors.is_empty() {
|
||||
return None;
|
||||
}
|
||||
Some(Ok((
|
||||
name.to_string(),
|
||||
serde_json::to_value(ExplicitVectors {
|
||||
embeddings: Some(VectorOrArrayOfVectors::from_array_of_vectors(
|
||||
vectors,
|
||||
)),
|
||||
regenerate: false,
|
||||
})
|
||||
.unwrap(),
|
||||
)))
|
||||
})
|
||||
.collect();
|
||||
|
||||
let injected_vectors = injected_vectors?;
|
||||
|
||||
Self::rebind_existing_document(
|
||||
old_obkv,
|
||||
&settings_diff,
|
||||
&modified_faceted_fields,
|
||||
injected_vectors,
|
||||
old_vectors_fid,
|
||||
Some(&mut original_obkv_buffer).filter(|_| original_sorter.is_some()),
|
||||
Some(&mut flattened_obkv_buffer).filter(|_| flattened_sorter.is_some()),
|
||||
)?;
|
||||
|
@ -983,6 +1104,23 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||
}
|
||||
}
|
||||
|
||||
let mut writers = Vec::new();
|
||||
|
||||
// delete all vectors from the embedders that need removal
|
||||
for (_, (readers, _)) in readers {
|
||||
for reader in readers {
|
||||
let dimensions = reader.dimensions();
|
||||
let arroy_index = reader.index();
|
||||
drop(reader);
|
||||
let writer = arroy::Writer::new(self.index.vector_arroy, arroy_index, dimensions);
|
||||
writers.push(writer);
|
||||
}
|
||||
}
|
||||
|
||||
for writer in writers {
|
||||
writer.clear(wtxn)?;
|
||||
}
|
||||
|
||||
let grenad_params = GrenadParameters {
|
||||
chunk_compression_type: self.indexer_settings.chunk_compression_type,
|
||||
chunk_compression_level: self.indexer_settings.chunk_compression_level,
|
||||
|
|
|
@ -20,6 +20,7 @@ use super::MergeFn;
|
|||
use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind};
|
||||
use crate::facet::FacetType;
|
||||
use crate::index::db_name::DOCUMENTS;
|
||||
use crate::index::IndexEmbeddingConfig;
|
||||
use crate::proximity::MAX_DISTANCE;
|
||||
use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvReaderDelAdd};
|
||||
use crate::update::facet::FacetsUpdate;
|
||||
|
@ -90,6 +91,8 @@ pub(crate) enum TypedChunk {
|
|||
expected_dimension: usize,
|
||||
manual_vectors: grenad::Reader<BufReader<File>>,
|
||||
embedder_name: String,
|
||||
add_to_user_provided: RoaringBitmap,
|
||||
remove_from_user_provided: RoaringBitmap,
|
||||
},
|
||||
ScriptLanguageDocids(HashMap<(Script, Language), (RoaringBitmap, RoaringBitmap)>),
|
||||
}
|
||||
|
@ -154,8 +157,11 @@ pub(crate) fn write_typed_chunk_into_index(
|
|||
let mut docids = index.documents_ids(wtxn)?;
|
||||
let mut iter = merger.into_stream_merger_iter()?;
|
||||
|
||||
let embedders: BTreeSet<_> =
|
||||
index.embedding_configs(wtxn)?.into_iter().map(|(k, _v)| k).collect();
|
||||
let embedders: BTreeSet<_> = index
|
||||
.embedding_configs(wtxn)?
|
||||
.into_iter()
|
||||
.map(|IndexEmbeddingConfig { name, .. }| name)
|
||||
.collect();
|
||||
let mut vectors_buffer = Vec::new();
|
||||
while let Some((key, reader)) = iter.next()? {
|
||||
let mut writer: KvWriter<_, FieldId> = KvWriter::memory();
|
||||
|
@ -181,7 +187,7 @@ pub(crate) fn write_typed_chunk_into_index(
|
|||
// if the `_vectors` field cannot be parsed as map of vectors, just write it as-is
|
||||
break 'vectors Some(addition);
|
||||
};
|
||||
vectors.retain_user_provided_vectors(&embedders);
|
||||
vectors.retain_not_embedded_vectors(&embedders);
|
||||
let crate::vector::parsed_vectors::ParsedVectors(vectors) = vectors;
|
||||
if vectors.is_empty() {
|
||||
// skip writing empty `_vectors` map
|
||||
|
@ -619,6 +625,8 @@ pub(crate) fn write_typed_chunk_into_index(
|
|||
let mut remove_vectors_builder = MergerBuilder::new(keep_first as MergeFn);
|
||||
let mut manual_vectors_builder = MergerBuilder::new(keep_first as MergeFn);
|
||||
let mut embeddings_builder = MergerBuilder::new(keep_first as MergeFn);
|
||||
let mut add_to_user_provided = RoaringBitmap::new();
|
||||
let mut remove_from_user_provided = RoaringBitmap::new();
|
||||
let mut params = None;
|
||||
for typed_chunk in typed_chunks {
|
||||
let TypedChunk::VectorPoints {
|
||||
|
@ -627,6 +635,8 @@ pub(crate) fn write_typed_chunk_into_index(
|
|||
embeddings,
|
||||
expected_dimension,
|
||||
embedder_name,
|
||||
add_to_user_provided: aud,
|
||||
remove_from_user_provided: rud,
|
||||
} = typed_chunk
|
||||
else {
|
||||
unreachable!();
|
||||
|
@ -639,11 +649,23 @@ pub(crate) fn write_typed_chunk_into_index(
|
|||
if let Some(embeddings) = embeddings {
|
||||
embeddings_builder.push(embeddings.into_cursor()?);
|
||||
}
|
||||
add_to_user_provided |= aud;
|
||||
remove_from_user_provided |= rud;
|
||||
}
|
||||
|
||||
// typed chunks always has at least 1 chunk.
|
||||
let Some((expected_dimension, embedder_name)) = params else { unreachable!() };
|
||||
|
||||
let mut embedding_configs = index.embedding_configs(wtxn)?;
|
||||
let index_embedder_config = embedding_configs
|
||||
.iter_mut()
|
||||
.find(|IndexEmbeddingConfig { name, .. }| name == &embedder_name)
|
||||
.unwrap();
|
||||
index_embedder_config.user_provided -= remove_from_user_provided;
|
||||
index_embedder_config.user_provided |= add_to_user_provided;
|
||||
|
||||
index.put_embedding_configs(wtxn, embedding_configs)?;
|
||||
|
||||
let embedder_index = index.embedder_category_id.get(wtxn, &embedder_name)?.ok_or(
|
||||
InternalError::DatabaseMissingEntry { db_name: "embedder_category_id", key: None },
|
||||
)?;
|
||||
|
|
|
@ -6,6 +6,7 @@ use std::sync::Arc;
|
|||
use charabia::{Normalize, Tokenizer, TokenizerBuilder};
|
||||
use deserr::{DeserializeError, Deserr};
|
||||
use itertools::{EitherOrBoth, Itertools};
|
||||
use roaring::RoaringBitmap;
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
use time::OffsetDateTime;
|
||||
|
||||
|
@ -14,12 +15,18 @@ use super::index_documents::{IndexDocumentsConfig, Transform};
|
|||
use super::IndexerConfig;
|
||||
use crate::criterion::Criterion;
|
||||
use crate::error::UserError;
|
||||
use crate::index::{DEFAULT_MIN_WORD_LEN_ONE_TYPO, DEFAULT_MIN_WORD_LEN_TWO_TYPOS};
|
||||
use crate::index::{
|
||||
IndexEmbeddingConfig, DEFAULT_MIN_WORD_LEN_ONE_TYPO, DEFAULT_MIN_WORD_LEN_TWO_TYPOS,
|
||||
};
|
||||
use crate::order_by_map::OrderByMap;
|
||||
use crate::proximity::ProximityPrecision;
|
||||
use crate::update::index_documents::IndexDocumentsMethod;
|
||||
use crate::update::{IndexDocuments, UpdateIndexingStep};
|
||||
use crate::vector::settings::{check_set, check_unset, EmbedderSource, EmbeddingSettings};
|
||||
use crate::vector::parsed_vectors::RESERVED_VECTORS_FIELD_NAME;
|
||||
use crate::vector::settings::{
|
||||
check_set, check_unset, EmbedderAction, EmbedderSource, EmbeddingSettings, ReindexAction,
|
||||
WriteBackToDocuments,
|
||||
};
|
||||
use crate::vector::{Embedder, EmbeddingConfig, EmbeddingConfigs};
|
||||
use crate::{FieldId, FieldsIdsMap, Index, Result};
|
||||
|
||||
|
@ -490,6 +497,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
|
|||
self.index.put_all_searchable_fields_from_fields_ids_map(
|
||||
self.wtxn,
|
||||
&names,
|
||||
&fields_ids_map.nested_ids(RESERVED_VECTORS_FIELD_NAME),
|
||||
&fields_ids_map,
|
||||
)?;
|
||||
self.index.put_fields_ids_map(self.wtxn, &fields_ids_map)?;
|
||||
|
@ -919,92 +927,177 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
|
|||
Ok(changed)
|
||||
}
|
||||
|
||||
fn update_embedding_configs(&mut self) -> Result<bool> {
|
||||
let update = match std::mem::take(&mut self.embedder_settings) {
|
||||
Setting::Set(configs) => {
|
||||
let mut changed = false;
|
||||
fn update_embedding_configs(&mut self) -> Result<BTreeMap<String, EmbedderAction>> {
|
||||
match std::mem::take(&mut self.embedder_settings) {
|
||||
Setting::Set(configs) => self.update_embedding_configs_set(configs),
|
||||
Setting::Reset => {
|
||||
// all vectors should be written back to documents
|
||||
let old_configs = self.index.embedding_configs(self.wtxn)?;
|
||||
let old_configs: BTreeMap<String, Setting<EmbeddingSettings>> =
|
||||
old_configs.into_iter().map(|(k, v)| (k, Setting::Set(v.into()))).collect();
|
||||
|
||||
let mut new_configs = BTreeMap::new();
|
||||
for joined in old_configs
|
||||
let remove_all: Result<BTreeMap<String, EmbedderAction>> = old_configs
|
||||
.into_iter()
|
||||
.merge_join_by(configs.into_iter(), |(left, _), (right, _)| left.cmp(right))
|
||||
{
|
||||
match joined {
|
||||
// updated config
|
||||
EitherOrBoth::Both((name, mut old), (_, new)) => {
|
||||
changed |= EmbeddingSettings::apply_and_need_reindex(&mut old, new);
|
||||
if changed {
|
||||
tracing::debug!(embedder = name, "need reindex");
|
||||
} else {
|
||||
tracing::debug!(embedder = name, "skip reindex");
|
||||
}
|
||||
let new = validate_embedding_settings(old, &name)?;
|
||||
new_configs.insert(name, new);
|
||||
}
|
||||
// unchanged config
|
||||
EitherOrBoth::Left((name, setting)) => {
|
||||
new_configs.insert(name, setting);
|
||||
}
|
||||
// new config
|
||||
EitherOrBoth::Right((name, mut setting)) => {
|
||||
// apply the default source in case the source was not set so that it gets validated
|
||||
crate::vector::settings::EmbeddingSettings::apply_default_source(
|
||||
&mut setting,
|
||||
);
|
||||
crate::vector::settings::EmbeddingSettings::apply_default_openai_model(
|
||||
&mut setting,
|
||||
);
|
||||
let setting = validate_embedding_settings(setting, &name)?;
|
||||
changed = true;
|
||||
new_configs.insert(name, setting);
|
||||
}
|
||||
}
|
||||
}
|
||||
let new_configs: Vec<(String, EmbeddingConfig)> = new_configs
|
||||
.into_iter()
|
||||
.filter_map(|(name, setting)| match setting {
|
||||
Setting::Set(value) => Some((name, value.into())),
|
||||
Setting::Reset => None,
|
||||
Setting::NotSet => Some((name, EmbeddingSettings::default().into())),
|
||||
.map(|IndexEmbeddingConfig { name, config: _, user_provided }| -> Result<_> {
|
||||
let embedder_id =
|
||||
self.index.embedder_category_id.get(self.wtxn, &name)?.ok_or(
|
||||
crate::InternalError::DatabaseMissingEntry {
|
||||
db_name: crate::index::db_name::VECTOR_EMBEDDER_CATEGORY_ID,
|
||||
key: None,
|
||||
},
|
||||
)?;
|
||||
Ok((
|
||||
name,
|
||||
EmbedderAction::WriteBackToDocuments(WriteBackToDocuments {
|
||||
embedder_id,
|
||||
user_provided,
|
||||
}),
|
||||
))
|
||||
})
|
||||
.collect();
|
||||
|
||||
let remove_all = remove_all?;
|
||||
|
||||
self.index.embedder_category_id.clear(self.wtxn)?;
|
||||
for (index, (embedder_name, _)) in new_configs.iter().enumerate() {
|
||||
self.index.embedder_category_id.put_with_flags(
|
||||
self.wtxn,
|
||||
heed::PutFlags::APPEND,
|
||||
embedder_name,
|
||||
&index
|
||||
.try_into()
|
||||
.map_err(|_| UserError::TooManyEmbedders(new_configs.len()))?,
|
||||
)?;
|
||||
}
|
||||
|
||||
if new_configs.is_empty() {
|
||||
self.index.delete_embedding_configs(self.wtxn)?;
|
||||
} else {
|
||||
self.index.put_embedding_configs(self.wtxn, new_configs)?;
|
||||
}
|
||||
changed
|
||||
}
|
||||
Setting::Reset => {
|
||||
self.index.delete_embedding_configs(self.wtxn)?;
|
||||
true
|
||||
Ok(remove_all)
|
||||
}
|
||||
Setting::NotSet => false,
|
||||
};
|
||||
|
||||
// if any changes force a reindexing
|
||||
// clear the vector database.
|
||||
if update {
|
||||
self.index.vector_arroy.clear(self.wtxn)?;
|
||||
Setting::NotSet => Ok(Default::default()),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(update)
|
||||
fn update_embedding_configs_set(
|
||||
&mut self,
|
||||
configs: BTreeMap<String, Setting<EmbeddingSettings>>,
|
||||
) -> Result<BTreeMap<String, EmbedderAction>> {
|
||||
use crate::vector::settings::SettingsDiff;
|
||||
|
||||
let old_configs = self.index.embedding_configs(self.wtxn)?;
|
||||
let old_configs: BTreeMap<String, (EmbeddingSettings, RoaringBitmap)> = old_configs
|
||||
.into_iter()
|
||||
.map(|IndexEmbeddingConfig { name, config, user_provided }| {
|
||||
(name, (config.into(), user_provided))
|
||||
})
|
||||
.collect();
|
||||
let mut updated_configs = BTreeMap::new();
|
||||
let mut embedder_actions = BTreeMap::new();
|
||||
for joined in old_configs
|
||||
.into_iter()
|
||||
.merge_join_by(configs.into_iter(), |(left, _), (right, _)| left.cmp(right))
|
||||
{
|
||||
match joined {
|
||||
// updated config
|
||||
EitherOrBoth::Both((name, (old, user_provided)), (_, new)) => {
|
||||
let settings_diff = SettingsDiff::from_settings(old, new);
|
||||
match settings_diff {
|
||||
SettingsDiff::Remove => {
|
||||
tracing::debug!(
|
||||
embedder = name,
|
||||
user_provided = user_provided.len(),
|
||||
"removing embedder"
|
||||
);
|
||||
let embedder_id =
|
||||
self.index.embedder_category_id.get(self.wtxn, &name)?.ok_or(
|
||||
crate::InternalError::DatabaseMissingEntry {
|
||||
db_name: crate::index::db_name::VECTOR_EMBEDDER_CATEGORY_ID,
|
||||
key: None,
|
||||
},
|
||||
)?;
|
||||
// free id immediately
|
||||
self.index.embedder_category_id.delete(self.wtxn, &name)?;
|
||||
embedder_actions.insert(
|
||||
name,
|
||||
EmbedderAction::WriteBackToDocuments(WriteBackToDocuments {
|
||||
embedder_id,
|
||||
user_provided,
|
||||
}),
|
||||
);
|
||||
}
|
||||
SettingsDiff::Reindex { action, updated_settings } => {
|
||||
tracing::debug!(
|
||||
embedder = name,
|
||||
user_provided = user_provided.len(),
|
||||
?action,
|
||||
"reindex embedder"
|
||||
);
|
||||
embedder_actions.insert(name.clone(), EmbedderAction::Reindex(action));
|
||||
let new =
|
||||
validate_embedding_settings(Setting::Set(updated_settings), &name)?;
|
||||
updated_configs.insert(name, (new, user_provided));
|
||||
}
|
||||
SettingsDiff::UpdateWithoutReindex { updated_settings } => {
|
||||
tracing::debug!(
|
||||
embedder = name,
|
||||
user_provided = user_provided.len(),
|
||||
"update without reindex embedder"
|
||||
);
|
||||
let new =
|
||||
validate_embedding_settings(Setting::Set(updated_settings), &name)?;
|
||||
updated_configs.insert(name, (new, user_provided));
|
||||
}
|
||||
}
|
||||
}
|
||||
// unchanged config
|
||||
EitherOrBoth::Left((name, (setting, user_provided))) => {
|
||||
tracing::debug!(embedder = name, "unchanged embedder");
|
||||
updated_configs.insert(name, (Setting::Set(setting), user_provided));
|
||||
}
|
||||
// new config
|
||||
EitherOrBoth::Right((name, mut setting)) => {
|
||||
tracing::debug!(embedder = name, "new embedder");
|
||||
// apply the default source in case the source was not set so that it gets validated
|
||||
crate::vector::settings::EmbeddingSettings::apply_default_source(&mut setting);
|
||||
crate::vector::settings::EmbeddingSettings::apply_default_openai_model(
|
||||
&mut setting,
|
||||
);
|
||||
let setting = validate_embedding_settings(setting, &name)?;
|
||||
embedder_actions
|
||||
.insert(name.clone(), EmbedderAction::Reindex(ReindexAction::FullReindex));
|
||||
updated_configs.insert(name, (setting, RoaringBitmap::new()));
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut free_indices: [bool; u8::MAX as usize] = [true; u8::MAX as usize];
|
||||
for res in self.index.embedder_category_id.iter(self.wtxn)? {
|
||||
let (_name, id) = res?;
|
||||
free_indices[id as usize] = false;
|
||||
}
|
||||
let mut free_indices = free_indices.iter_mut().enumerate();
|
||||
let mut find_free_index =
|
||||
move || free_indices.find(|(_, free)| **free).map(|(index, _)| index as u8);
|
||||
for (name, action) in embedder_actions.iter() {
|
||||
match action {
|
||||
EmbedderAction::Reindex(ReindexAction::RegeneratePrompts) => {
|
||||
/* cannot be a new embedder, so has to have an id already */
|
||||
}
|
||||
EmbedderAction::Reindex(ReindexAction::FullReindex) => {
|
||||
if self.index.embedder_category_id.get(self.wtxn, name)?.is_none() {
|
||||
let id = find_free_index()
|
||||
.ok_or(UserError::TooManyEmbedders(updated_configs.len()))?;
|
||||
tracing::debug!(embedder = name, id, "assigning free id to new embedder");
|
||||
self.index.embedder_category_id.put(self.wtxn, name, &id)?;
|
||||
}
|
||||
}
|
||||
EmbedderAction::WriteBackToDocuments(_) => { /* already removed */ }
|
||||
}
|
||||
}
|
||||
let updated_configs: Vec<IndexEmbeddingConfig> = updated_configs
|
||||
.into_iter()
|
||||
.filter_map(|(name, (config, user_provided))| match config {
|
||||
Setting::Set(config) => {
|
||||
Some(IndexEmbeddingConfig { name, config: config.into(), user_provided })
|
||||
}
|
||||
Setting::Reset => None,
|
||||
Setting::NotSet => Some(IndexEmbeddingConfig {
|
||||
name,
|
||||
config: EmbeddingSettings::default().into(),
|
||||
user_provided,
|
||||
}),
|
||||
})
|
||||
.collect();
|
||||
if updated_configs.is_empty() {
|
||||
self.index.delete_embedding_configs(self.wtxn)?;
|
||||
} else {
|
||||
self.index.put_embedding_configs(self.wtxn, updated_configs)?;
|
||||
}
|
||||
Ok(embedder_actions)
|
||||
}
|
||||
|
||||
fn update_search_cutoff(&mut self) -> Result<bool> {
|
||||
|
@ -1058,13 +1151,8 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
|
|||
self.update_searchable()?;
|
||||
self.update_exact_attributes()?;
|
||||
self.update_proximity_precision()?;
|
||||
// TODO: very rough approximation of the needs for reindexing where any change will result in
|
||||
// a full reindexing.
|
||||
// What can be done instead:
|
||||
// 1. Only change the distance on a distance change
|
||||
// 2. Only change the name -> embedder mapping on a name change
|
||||
// 3. Keep the old vectors but reattempt indexing on a prompt change: only actually changed prompt will need embedding + storage
|
||||
let embedding_configs_updated = self.update_embedding_configs()?;
|
||||
|
||||
let embedding_config_updates = self.update_embedding_configs()?;
|
||||
|
||||
let mut new_inner_settings = InnerIndexSettings::from_index(self.index, self.wtxn)?;
|
||||
new_inner_settings.recompute_facets(self.wtxn, self.index)?;
|
||||
|
@ -1078,7 +1166,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
|
|||
old_inner_settings,
|
||||
new_inner_settings,
|
||||
primary_key_id,
|
||||
embedding_configs_updated,
|
||||
embedding_config_updates,
|
||||
settings_update_only,
|
||||
);
|
||||
|
||||
|
@ -1094,8 +1182,7 @@ pub struct InnerIndexSettingsDiff {
|
|||
pub(crate) old: InnerIndexSettings,
|
||||
pub(crate) new: InnerIndexSettings,
|
||||
pub(crate) primary_key_id: Option<FieldId>,
|
||||
// TODO: compare directly the embedders.
|
||||
pub(crate) embedding_configs_updated: bool,
|
||||
pub(crate) embedding_config_updates: BTreeMap<String, EmbedderAction>,
|
||||
pub(crate) settings_update_only: bool,
|
||||
/// The set of only the additional searchable fields.
|
||||
/// If any other searchable field has been modified, is set to None.
|
||||
|
@ -1116,7 +1203,7 @@ impl InnerIndexSettingsDiff {
|
|||
old_settings: InnerIndexSettings,
|
||||
new_settings: InnerIndexSettings,
|
||||
primary_key_id: Option<FieldId>,
|
||||
embedding_configs_updated: bool,
|
||||
embedding_config_updates: BTreeMap<String, EmbedderAction>,
|
||||
settings_update_only: bool,
|
||||
) -> Self {
|
||||
let only_additional_fields = match (
|
||||
|
@ -1153,7 +1240,7 @@ impl InnerIndexSettingsDiff {
|
|||
old: old_settings,
|
||||
new: new_settings,
|
||||
primary_key_id,
|
||||
embedding_configs_updated,
|
||||
embedding_config_updates,
|
||||
settings_update_only,
|
||||
only_additional_fields,
|
||||
cache_reindex_searchable_without_user_defined,
|
||||
|
@ -1220,7 +1307,7 @@ impl InnerIndexSettingsDiff {
|
|||
}
|
||||
|
||||
pub fn reindex_vectors(&self) -> bool {
|
||||
self.embedding_configs_updated
|
||||
!self.embedding_config_updates.is_empty()
|
||||
}
|
||||
|
||||
pub fn settings_update_only(&self) -> bool {
|
||||
|
@ -1252,6 +1339,8 @@ pub(crate) struct InnerIndexSettings {
|
|||
pub embedding_configs: EmbeddingConfigs,
|
||||
pub existing_fields: HashSet<String>,
|
||||
pub geo_fields_ids: Option<(FieldId, FieldId)>,
|
||||
pub non_searchable_fields_ids: Vec<FieldId>,
|
||||
pub non_faceted_fields_ids: Vec<FieldId>,
|
||||
}
|
||||
|
||||
impl InnerIndexSettings {
|
||||
|
@ -1265,8 +1354,8 @@ impl InnerIndexSettings {
|
|||
let user_defined_searchable_fields =
|
||||
user_defined_searchable_fields.map(|sf| sf.into_iter().map(String::from).collect());
|
||||
let user_defined_faceted_fields = index.user_defined_faceted_fields(rtxn)?;
|
||||
let searchable_fields_ids = index.searchable_fields_ids(rtxn)?;
|
||||
let faceted_fields_ids = index.faceted_fields_ids(rtxn)?;
|
||||
let mut searchable_fields_ids = index.searchable_fields_ids(rtxn)?;
|
||||
let mut faceted_fields_ids = index.faceted_fields_ids(rtxn)?;
|
||||
let exact_attributes = index.exact_attributes_ids(rtxn)?;
|
||||
let proximity_precision = index.proximity_precision(rtxn)?.unwrap_or_default();
|
||||
let embedding_configs = embedders(index.embedding_configs(rtxn)?)?;
|
||||
|
@ -1294,6 +1383,10 @@ impl InnerIndexSettings {
|
|||
None => None,
|
||||
};
|
||||
|
||||
let vectors_fids = fields_ids_map.nested_ids(RESERVED_VECTORS_FIELD_NAME);
|
||||
searchable_fields_ids.retain(|id| !vectors_fids.contains(id));
|
||||
faceted_fields_ids.retain(|id| !vectors_fids.contains(id));
|
||||
|
||||
Ok(Self {
|
||||
stop_words,
|
||||
allowed_separators,
|
||||
|
@ -1308,6 +1401,8 @@ impl InnerIndexSettings {
|
|||
embedding_configs,
|
||||
existing_fields,
|
||||
geo_fields_ids,
|
||||
non_searchable_fields_ids: vectors_fids.clone(),
|
||||
non_faceted_fields_ids: vectors_fids.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -1315,9 +1410,10 @@ impl InnerIndexSettings {
|
|||
pub fn recompute_facets(&mut self, wtxn: &mut heed::RwTxn, index: &Index) -> Result<()> {
|
||||
let new_facets = self
|
||||
.fields_ids_map
|
||||
.names()
|
||||
.filter(|&field| crate::is_faceted(field, &self.user_defined_faceted_fields))
|
||||
.map(|field| field.to_string())
|
||||
.iter()
|
||||
.filter(|(fid, _field)| !self.non_faceted_fields_ids.contains(fid))
|
||||
.filter(|(_fid, field)| crate::is_faceted(field, &self.user_defined_faceted_fields))
|
||||
.map(|(_fid, field)| field.to_string())
|
||||
.collect();
|
||||
index.put_faceted_fields(wtxn, &new_facets)?;
|
||||
|
||||
|
@ -1337,6 +1433,7 @@ impl InnerIndexSettings {
|
|||
index.put_all_searchable_fields_from_fields_ids_map(
|
||||
wtxn,
|
||||
&searchable_fields,
|
||||
&self.non_searchable_fields_ids,
|
||||
&self.fields_ids_map,
|
||||
)?;
|
||||
}
|
||||
|
@ -1347,19 +1444,25 @@ impl InnerIndexSettings {
|
|||
}
|
||||
}
|
||||
|
||||
fn embedders(embedding_configs: Vec<(String, EmbeddingConfig)>) -> Result<EmbeddingConfigs> {
|
||||
fn embedders(embedding_configs: Vec<IndexEmbeddingConfig>) -> Result<EmbeddingConfigs> {
|
||||
let res: Result<_> = embedding_configs
|
||||
.into_iter()
|
||||
.map(|(name, EmbeddingConfig { embedder_options, prompt })| {
|
||||
let prompt = Arc::new(prompt.try_into().map_err(crate::Error::from)?);
|
||||
.map(
|
||||
|IndexEmbeddingConfig {
|
||||
name,
|
||||
config: EmbeddingConfig { embedder_options, prompt },
|
||||
..
|
||||
}| {
|
||||
let prompt = Arc::new(prompt.try_into().map_err(crate::Error::from)?);
|
||||
|
||||
let embedder = Arc::new(
|
||||
Embedder::new(embedder_options.clone())
|
||||
.map_err(crate::vector::Error::from)
|
||||
.map_err(crate::Error::from)?,
|
||||
);
|
||||
Ok((name, (embedder, prompt)))
|
||||
})
|
||||
let embedder = Arc::new(
|
||||
Embedder::new(embedder_options.clone())
|
||||
.map_err(crate::vector::Error::from)
|
||||
.map_err(crate::Error::from)?,
|
||||
);
|
||||
Ok((name, (embedder, prompt)))
|
||||
},
|
||||
)
|
||||
.collect();
|
||||
res.map(EmbeddingConfigs::new)
|
||||
}
|
||||
|
|
|
@ -152,6 +152,10 @@ impl EmbeddingConfigs {
|
|||
&self.0
|
||||
}
|
||||
|
||||
pub fn into_inner(self) -> HashMap<String, (Arc<Embedder>, Arc<Prompt>)> {
|
||||
self.0
|
||||
}
|
||||
|
||||
/// Get the name of the default embedder configuration.
|
||||
///
|
||||
/// The default embedder is determined as follows:
|
||||
|
|
|
@ -4,8 +4,9 @@ use obkv::KvReader;
|
|||
use serde_json::{from_slice, Value};
|
||||
|
||||
use super::Embedding;
|
||||
use crate::index::IndexEmbeddingConfig;
|
||||
use crate::update::del_add::{DelAdd, KvReaderDelAdd};
|
||||
use crate::{FieldId, InternalError, UserError};
|
||||
use crate::{DocumentId, FieldId, InternalError, UserError};
|
||||
|
||||
pub const RESERVED_VECTORS_FIELD_NAME: &str = "_vectors";
|
||||
|
||||
|
@ -17,11 +18,20 @@ pub enum Vectors {
|
|||
}
|
||||
|
||||
impl Vectors {
|
||||
pub fn into_array_of_vectors(self) -> Vec<Embedding> {
|
||||
pub fn must_regenerate(&self) -> bool {
|
||||
match self {
|
||||
Vectors::ImplicitlyUserProvided(embeddings)
|
||||
| Vectors::Explicit(ExplicitVectors { embeddings, user_provided: _ }) => {
|
||||
embeddings.into_array_of_vectors().unwrap_or_default()
|
||||
Vectors::ImplicitlyUserProvided(_) => false,
|
||||
Vectors::Explicit(ExplicitVectors { regenerate, .. }) => *regenerate,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn into_array_of_vectors(self) -> Option<Vec<Embedding>> {
|
||||
match self {
|
||||
Vectors::ImplicitlyUserProvided(embeddings) => {
|
||||
Some(embeddings.into_array_of_vectors().unwrap_or_default())
|
||||
}
|
||||
Vectors::Explicit(ExplicitVectors { embeddings, regenerate: _ }) => {
|
||||
embeddings.map(|embeddings| embeddings.into_array_of_vectors().unwrap_or_default())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -30,22 +40,46 @@ impl Vectors {
|
|||
#[derive(serde::Serialize, serde::Deserialize, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ExplicitVectors {
|
||||
pub embeddings: VectorOrArrayOfVectors,
|
||||
pub user_provided: bool,
|
||||
pub embeddings: Option<VectorOrArrayOfVectors>,
|
||||
pub regenerate: bool,
|
||||
}
|
||||
|
||||
pub enum VectorState {
|
||||
Inline(Vectors),
|
||||
Manual,
|
||||
Generated,
|
||||
}
|
||||
|
||||
impl VectorState {
|
||||
pub fn must_regenerate(&self) -> bool {
|
||||
match self {
|
||||
VectorState::Inline(vectors) => vectors.must_regenerate(),
|
||||
VectorState::Manual => false,
|
||||
VectorState::Generated => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub enum VectorsState {
|
||||
NoVectorsFid,
|
||||
NoVectorsFieldInDocument,
|
||||
Vectors(BTreeMap<String, Vectors>),
|
||||
}
|
||||
|
||||
pub struct ParsedVectorsDiff {
|
||||
pub old: Option<BTreeMap<String, Vectors>>,
|
||||
pub new: Option<BTreeMap<String, Vectors>>,
|
||||
old: BTreeMap<String, VectorState>,
|
||||
new: VectorsState,
|
||||
}
|
||||
|
||||
impl ParsedVectorsDiff {
|
||||
pub fn new(
|
||||
docid: DocumentId,
|
||||
embedders_configs: &[IndexEmbeddingConfig],
|
||||
documents_diff: KvReader<'_, FieldId>,
|
||||
old_vectors_fid: Option<FieldId>,
|
||||
new_vectors_fid: Option<FieldId>,
|
||||
) -> Result<Self, Error> {
|
||||
let old = match old_vectors_fid
|
||||
let mut old = match old_vectors_fid
|
||||
.and_then(|vectors_fid| documents_diff.get(vectors_fid))
|
||||
.map(KvReaderDelAdd::new)
|
||||
.map(|obkv| to_vector_map(obkv, DelAdd::Deletion))
|
||||
|
@ -61,19 +95,54 @@ impl ParsedVectorsDiff {
|
|||
return Err(error);
|
||||
}
|
||||
}
|
||||
.flatten();
|
||||
let new = new_vectors_fid
|
||||
.and_then(|vectors_fid| documents_diff.get(vectors_fid))
|
||||
.map(KvReaderDelAdd::new)
|
||||
.map(|obkv| to_vector_map(obkv, DelAdd::Addition))
|
||||
.transpose()?
|
||||
.flatten();
|
||||
.flatten().map_or(BTreeMap::default(), |del| del.into_iter().map(|(name, vec)| (name, VectorState::Inline(vec))).collect());
|
||||
for embedding_config in embedders_configs {
|
||||
if embedding_config.user_provided.contains(docid) {
|
||||
old.entry(embedding_config.name.to_string()).or_insert(VectorState::Manual);
|
||||
}
|
||||
}
|
||||
|
||||
let new = 'new: {
|
||||
let Some(new_vectors_fid) = new_vectors_fid else {
|
||||
break 'new VectorsState::NoVectorsFid;
|
||||
};
|
||||
let Some(bytes) = documents_diff.get(new_vectors_fid) else {
|
||||
break 'new VectorsState::NoVectorsFieldInDocument;
|
||||
};
|
||||
let obkv = KvReaderDelAdd::new(bytes);
|
||||
match to_vector_map(obkv, DelAdd::Addition)? {
|
||||
Some(new) => VectorsState::Vectors(new),
|
||||
None => VectorsState::NoVectorsFieldInDocument,
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Self { old, new })
|
||||
}
|
||||
|
||||
pub fn remove(&mut self, embedder_name: &str) -> (Option<Vectors>, Option<Vectors>) {
|
||||
let old = self.old.as_mut().and_then(|old| old.remove(embedder_name));
|
||||
let new = self.new.as_mut().and_then(|new| new.remove(embedder_name));
|
||||
pub fn remove(&mut self, embedder_name: &str) -> (VectorState, VectorState) {
|
||||
let old = self.old.remove(embedder_name).unwrap_or(VectorState::Generated);
|
||||
let state_from_old = match old {
|
||||
// assume a userProvided is still userProvided
|
||||
VectorState::Manual => VectorState::Manual,
|
||||
// generated is still generated
|
||||
VectorState::Generated => VectorState::Generated,
|
||||
// weird case that shouldn't happen were the previous docs version is inline,
|
||||
// but it was removed in the new version
|
||||
// Since it is not in the new version, we switch to generated
|
||||
VectorState::Inline(_) => VectorState::Generated,
|
||||
};
|
||||
let new = match &mut self.new {
|
||||
VectorsState::Vectors(new) => {
|
||||
new.remove(embedder_name).map(VectorState::Inline).unwrap_or(state_from_old)
|
||||
}
|
||||
_ =>
|
||||
// if no `_vectors` field is present in the new document,
|
||||
// the state depends on the previous version of the document
|
||||
{
|
||||
state_from_old
|
||||
}
|
||||
};
|
||||
|
||||
(old, new)
|
||||
}
|
||||
}
|
||||
|
@ -89,15 +158,8 @@ impl ParsedVectors {
|
|||
Ok(ParsedVectors(value))
|
||||
}
|
||||
|
||||
pub fn retain_user_provided_vectors(&mut self, embedders: &BTreeSet<String>) {
|
||||
self.0.retain(|k, v| match v {
|
||||
Vectors::ImplicitlyUserProvided(_) => true,
|
||||
Vectors::Explicit(ExplicitVectors { embeddings: _, user_provided }) => {
|
||||
*user_provided
|
||||
// if the embedder is not in the config, then never touch it
|
||||
|| !embedders.contains(k)
|
||||
}
|
||||
});
|
||||
pub fn retain_not_embedded_vectors(&mut self, embedders: &BTreeSet<String>) {
|
||||
self.0.retain(|k, _v| !embedders.contains(k))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -150,6 +212,22 @@ impl VectorOrArrayOfVectors {
|
|||
pub fn from_array_of_vectors(array_of_vec: Vec<Embedding>) -> Self {
|
||||
Self { inner: Some(either::Either::Left(array_of_vec)) }
|
||||
}
|
||||
|
||||
pub fn from_vector(vec: Embedding) -> Self {
|
||||
Self { inner: Some(either::Either::Right(vec)) }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Embedding> for VectorOrArrayOfVectors {
|
||||
fn from(vec: Embedding) -> Self {
|
||||
Self::from_vector(vec)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Vec<Embedding>> for VectorOrArrayOfVectors {
|
||||
fn from(vec: Vec<Embedding>) -> Self {
|
||||
Self::from_array_of_vectors(vec)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
use deserr::Deserr;
|
||||
use roaring::RoaringBitmap;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::rest::InputType;
|
||||
|
@ -72,6 +73,238 @@ pub fn check_unset<T>(
|
|||
}
|
||||
}
|
||||
|
||||
/// The kind of reindexing an embedder has to go through after a settings change.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ReindexAction {
    /// Run an indexing operation for this embedder while keeping the existing vectors,
    /// checking whether the document template changed or not.
    RegeneratePrompts,
    /// Run an indexing operation over all documents for this embedder, removing the
    /// existing vectors (except the userProvided ones).
    FullReindex,
}
|
||||
|
||||
pub enum SettingsDiff {
|
||||
Remove,
|
||||
Reindex { action: ReindexAction, updated_settings: EmbeddingSettings },
|
||||
UpdateWithoutReindex { updated_settings: EmbeddingSettings },
|
||||
}
|
||||
|
||||
pub enum EmbedderAction {
|
||||
WriteBackToDocuments(WriteBackToDocuments),
|
||||
Reindex(ReindexAction),
|
||||
}
|
||||
|
||||
pub struct WriteBackToDocuments {
|
||||
pub embedder_id: u8,
|
||||
pub user_provided: RoaringBitmap,
|
||||
}
|
||||
|
||||
impl SettingsDiff {
|
||||
pub fn from_settings(old: EmbeddingSettings, new: Setting<EmbeddingSettings>) -> Self {
|
||||
match new {
|
||||
Setting::Set(new) => {
|
||||
let EmbeddingSettings {
|
||||
mut source,
|
||||
mut model,
|
||||
mut revision,
|
||||
mut api_key,
|
||||
mut dimensions,
|
||||
mut document_template,
|
||||
mut url,
|
||||
mut query,
|
||||
mut input_field,
|
||||
mut path_to_embeddings,
|
||||
mut embedding_object,
|
||||
mut input_type,
|
||||
mut distribution,
|
||||
} = old;
|
||||
|
||||
let EmbeddingSettings {
|
||||
source: new_source,
|
||||
model: new_model,
|
||||
revision: new_revision,
|
||||
api_key: new_api_key,
|
||||
dimensions: new_dimensions,
|
||||
document_template: new_document_template,
|
||||
url: new_url,
|
||||
query: new_query,
|
||||
input_field: new_input_field,
|
||||
path_to_embeddings: new_path_to_embeddings,
|
||||
embedding_object: new_embedding_object,
|
||||
input_type: new_input_type,
|
||||
distribution: new_distribution,
|
||||
} = new;
|
||||
|
||||
let mut reindex_action = None;
|
||||
|
||||
// **Warning**: do not use short-circuiting || here, we want all these operations applied
|
||||
if source.apply(new_source) {
|
||||
ReindexAction::push_action(&mut reindex_action, ReindexAction::FullReindex);
|
||||
// when the source changes, we need to reapply the default settings for the new source
|
||||
apply_default_for_source(
|
||||
&source,
|
||||
&mut model,
|
||||
&mut revision,
|
||||
&mut dimensions,
|
||||
&mut url,
|
||||
&mut query,
|
||||
&mut input_field,
|
||||
&mut path_to_embeddings,
|
||||
&mut embedding_object,
|
||||
&mut input_type,
|
||||
&mut document_template,
|
||||
)
|
||||
}
|
||||
if model.apply(new_model) {
|
||||
ReindexAction::push_action(&mut reindex_action, ReindexAction::FullReindex);
|
||||
}
|
||||
if revision.apply(new_revision) {
|
||||
ReindexAction::push_action(&mut reindex_action, ReindexAction::FullReindex);
|
||||
}
|
||||
if dimensions.apply(new_dimensions) {
|
||||
ReindexAction::push_action(&mut reindex_action, ReindexAction::FullReindex);
|
||||
}
|
||||
if url.apply(new_url) {
|
||||
ReindexAction::push_action(&mut reindex_action, ReindexAction::FullReindex);
|
||||
}
|
||||
if query.apply(new_query) {
|
||||
ReindexAction::push_action(&mut reindex_action, ReindexAction::FullReindex);
|
||||
}
|
||||
if input_field.apply(new_input_field) {
|
||||
ReindexAction::push_action(&mut reindex_action, ReindexAction::FullReindex);
|
||||
}
|
||||
if path_to_embeddings.apply(new_path_to_embeddings) {
|
||||
ReindexAction::push_action(&mut reindex_action, ReindexAction::FullReindex);
|
||||
}
|
||||
if embedding_object.apply(new_embedding_object) {
|
||||
ReindexAction::push_action(&mut reindex_action, ReindexAction::FullReindex);
|
||||
}
|
||||
if input_type.apply(new_input_type) {
|
||||
ReindexAction::push_action(&mut reindex_action, ReindexAction::FullReindex);
|
||||
}
|
||||
if document_template.apply(new_document_template) {
|
||||
ReindexAction::push_action(
|
||||
&mut reindex_action,
|
||||
ReindexAction::RegeneratePrompts,
|
||||
);
|
||||
}
|
||||
|
||||
distribution.apply(new_distribution);
|
||||
api_key.apply(new_api_key);
|
||||
|
||||
let updated_settings = EmbeddingSettings {
|
||||
source,
|
||||
model,
|
||||
revision,
|
||||
api_key,
|
||||
dimensions,
|
||||
document_template,
|
||||
url,
|
||||
query,
|
||||
input_field,
|
||||
path_to_embeddings,
|
||||
embedding_object,
|
||||
input_type,
|
||||
distribution,
|
||||
};
|
||||
|
||||
match reindex_action {
|
||||
Some(action) => Self::Reindex { action, updated_settings },
|
||||
None => Self::UpdateWithoutReindex { updated_settings },
|
||||
}
|
||||
}
|
||||
Setting::Reset => Self::Remove,
|
||||
Setting::NotSet => Self::UpdateWithoutReindex { updated_settings: old },
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ReindexAction {
|
||||
fn push_action(this: &mut Option<Self>, other: Self) {
|
||||
*this = match (*this, other) {
|
||||
(_, ReindexAction::FullReindex) => Some(ReindexAction::FullReindex),
|
||||
(Some(ReindexAction::FullReindex), _) => Some(ReindexAction::FullReindex),
|
||||
(_, ReindexAction::RegeneratePrompts) => Some(ReindexAction::RegeneratePrompts),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)] // private function
|
||||
fn apply_default_for_source(
|
||||
source: &Setting<EmbedderSource>,
|
||||
model: &mut Setting<String>,
|
||||
revision: &mut Setting<String>,
|
||||
dimensions: &mut Setting<usize>,
|
||||
url: &mut Setting<String>,
|
||||
query: &mut Setting<serde_json::Value>,
|
||||
input_field: &mut Setting<Vec<String>>,
|
||||
path_to_embeddings: &mut Setting<Vec<String>>,
|
||||
embedding_object: &mut Setting<Vec<String>>,
|
||||
input_type: &mut Setting<InputType>,
|
||||
document_template: &mut Setting<String>,
|
||||
) {
|
||||
match source {
|
||||
Setting::Set(EmbedderSource::HuggingFace) => {
|
||||
*model = Setting::Reset;
|
||||
*revision = Setting::Reset;
|
||||
*dimensions = Setting::NotSet;
|
||||
*url = Setting::NotSet;
|
||||
*query = Setting::NotSet;
|
||||
*input_field = Setting::NotSet;
|
||||
*path_to_embeddings = Setting::NotSet;
|
||||
*embedding_object = Setting::NotSet;
|
||||
*input_type = Setting::NotSet;
|
||||
}
|
||||
Setting::Set(EmbedderSource::Ollama) => {
|
||||
*model = Setting::Reset;
|
||||
*revision = Setting::NotSet;
|
||||
*dimensions = Setting::Reset;
|
||||
*url = Setting::NotSet;
|
||||
*query = Setting::NotSet;
|
||||
*input_field = Setting::NotSet;
|
||||
*path_to_embeddings = Setting::NotSet;
|
||||
*embedding_object = Setting::NotSet;
|
||||
*input_type = Setting::NotSet;
|
||||
}
|
||||
Setting::Set(EmbedderSource::OpenAi) | Setting::Reset => {
|
||||
*model = Setting::Reset;
|
||||
*revision = Setting::NotSet;
|
||||
*dimensions = Setting::NotSet;
|
||||
*url = Setting::NotSet;
|
||||
*query = Setting::NotSet;
|
||||
*input_field = Setting::NotSet;
|
||||
*path_to_embeddings = Setting::NotSet;
|
||||
*embedding_object = Setting::NotSet;
|
||||
*input_type = Setting::NotSet;
|
||||
}
|
||||
Setting::Set(EmbedderSource::Rest) => {
|
||||
*model = Setting::NotSet;
|
||||
*revision = Setting::NotSet;
|
||||
*dimensions = Setting::Reset;
|
||||
*url = Setting::Reset;
|
||||
*query = Setting::Reset;
|
||||
*input_field = Setting::Reset;
|
||||
*path_to_embeddings = Setting::Reset;
|
||||
*embedding_object = Setting::Reset;
|
||||
*input_type = Setting::Reset;
|
||||
}
|
||||
Setting::Set(EmbedderSource::UserProvided) => {
|
||||
*model = Setting::NotSet;
|
||||
*revision = Setting::NotSet;
|
||||
*dimensions = Setting::Reset;
|
||||
*url = Setting::NotSet;
|
||||
*query = Setting::NotSet;
|
||||
*input_field = Setting::NotSet;
|
||||
*path_to_embeddings = Setting::NotSet;
|
||||
*embedding_object = Setting::NotSet;
|
||||
*input_type = Setting::NotSet;
|
||||
*document_template = Setting::NotSet;
|
||||
}
|
||||
Setting::NotSet => {}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn check_set<T>(
|
||||
key: &Setting<T>,
|
||||
field: &'static str,
|
||||
|
@ -210,66 +443,6 @@ impl EmbeddingSettings {
|
|||
*model = Setting::Set(openai::EmbeddingModel::default().name().to_owned())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn apply_and_need_reindex(
|
||||
old: &mut Setting<EmbeddingSettings>,
|
||||
new: Setting<EmbeddingSettings>,
|
||||
) -> bool {
|
||||
match (old, new) {
|
||||
(
|
||||
Setting::Set(EmbeddingSettings {
|
||||
source: old_source,
|
||||
model: old_model,
|
||||
revision: old_revision,
|
||||
api_key: old_api_key,
|
||||
dimensions: old_dimensions,
|
||||
document_template: old_document_template,
|
||||
url: old_url,
|
||||
query: old_query,
|
||||
input_field: old_input_field,
|
||||
path_to_embeddings: old_path_to_embeddings,
|
||||
embedding_object: old_embedding_object,
|
||||
input_type: old_input_type,
|
||||
distribution: old_distribution,
|
||||
}),
|
||||
Setting::Set(EmbeddingSettings {
|
||||
source: new_source,
|
||||
model: new_model,
|
||||
revision: new_revision,
|
||||
api_key: new_api_key,
|
||||
dimensions: new_dimensions,
|
||||
document_template: new_document_template,
|
||||
url: new_url,
|
||||
query: new_query,
|
||||
input_field: new_input_field,
|
||||
path_to_embeddings: new_path_to_embeddings,
|
||||
embedding_object: new_embedding_object,
|
||||
input_type: new_input_type,
|
||||
distribution: new_distribution,
|
||||
}),
|
||||
) => {
|
||||
let mut needs_reindex = false;
|
||||
|
||||
needs_reindex |= old_source.apply(new_source);
|
||||
needs_reindex |= old_model.apply(new_model);
|
||||
needs_reindex |= old_revision.apply(new_revision);
|
||||
needs_reindex |= old_dimensions.apply(new_dimensions);
|
||||
needs_reindex |= old_document_template.apply(new_document_template);
|
||||
needs_reindex |= old_url.apply(new_url);
|
||||
needs_reindex |= old_query.apply(new_query);
|
||||
needs_reindex |= old_input_field.apply(new_input_field);
|
||||
needs_reindex |= old_path_to_embeddings.apply(new_path_to_embeddings);
|
||||
needs_reindex |= old_embedding_object.apply(new_embedding_object);
|
||||
needs_reindex |= old_input_type.apply(new_input_type);
|
||||
|
||||
old_distribution.apply(new_distribution);
|
||||
old_api_key.apply(new_api_key);
|
||||
needs_reindex
|
||||
}
|
||||
(Setting::Reset, Setting::Reset) | (_, Setting::NotSet) => false,
|
||||
_ => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq, Deserr)]
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue