minor fixes

ManyTheFish 2025-06-26 18:07:08 +02:00
parent 0687cf058a
commit d35b2d8d33
6 changed files with 21 additions and 27 deletions

@@ -186,10 +186,10 @@ impl<'a, 'b> SettingsChangeDocumentExtractor<'a, 'b> {
 }
 
 impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeDocumentExtractor<'_, '_> {
-    type Data = FullySend<RefCell<DocumentExtractorData>>;
+    type Data = FullySend<()>;
 
     fn init_data(&self, _extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
-        Ok(FullySend(Default::default()))
+        Ok(FullySend(()))
     }
 
     fn process<'doc>(
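The hunk above drops the per-thread accumulator: this extractor apparently keeps no per-thread state anymore, so its associated `Data` becomes `FullySend<()>`. A minimal sketch of that pattern, with simplified, hypothetical stand-ins for the real trait and wrapper:

use std::cell::RefCell;

// Hypothetical stand-ins: a Send wrapper and a trait whose associated `Data`
// type is the per-thread state an extractor asks for.
struct FullySend<T>(T);

trait SettingsChangeExtractor {
    type Data;
    fn init_data(&self) -> Self::Data;
}

struct StatefulExtractor;
impl SettingsChangeExtractor for StatefulExtractor {
    // An extractor that accumulates results allocates real state per thread.
    type Data = FullySend<RefCell<Vec<u32>>>;
    fn init_data(&self) -> Self::Data {
        FullySend(RefCell::new(Vec::new()))
    }
}

struct StatelessExtractor;
impl SettingsChangeExtractor for StatelessExtractor {
    // An extractor with nothing to accumulate says so with `()`,
    // which is the shape the hunk above switches to.
    type Data = FullySend<()>;
    fn init_data(&self) -> Self::Data {
        FullySend(())
    }
}

fn main() {
    let FullySend(state) = StatefulExtractor.init_data();
    state.borrow_mut().push(1);
    let FullySend(()) = StatelessExtractor.init_data();
}

Using `()` instead of a defaulted `RefCell` makes the absence of state explicit and avoids initializing data no thread ever reads.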

@@ -322,12 +322,13 @@ where
     Result::Ok((facet_field_ids_delta, index_embeddings))
 }
 
-pub(super) fn extract_all_settings_changes<'extractor, MSP, SD>(
+#[allow(clippy::too_many_arguments)]
+pub(super) fn extract_all_settings_changes<MSP, SD>(
     indexing_context: IndexingContext<MSP>,
     indexer_span: Span,
     extractor_sender: ExtractorBbqueueSender,
     settings_delta: &SD,
-    extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
+    extractor_allocs: &mut ThreadLocal<FullySend<Bump>>,
     finished_extraction: &AtomicBool,
     field_distribution: &mut BTreeMap<String, u64>,
     mut index_embeddings: Vec<IndexEmbeddingConfig>,
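Two mechanical changes in the hunk above: the new `#[allow(clippy::too_many_arguments)]` silences clippy's argument-count lint (it fires above seven parameters by default), and the `'extractor` lifetime is dropped from the generics, apparently because it only annotated the `extractor_allocs` parameter and can be elided. A small sketch with made-up function names:

// Made-up names; only the two mechanics from the hunk above are shown.

// Clippy warns once a function takes more than its configured number of
// arguments (seven by default); the attribute opts this one item out.
#[allow(clippy::too_many_arguments)]
fn extract(a: u8, b: u8, c: u8, d: u8, e: u8, f: u8, g: u8, h: u8) -> u32 {
    [a, b, c, d, e, f, g, h].iter().map(|&x| u32::from(x)).sum()
}

// A named lifetime that only ever annotates a single parameter adds nothing:
fn fill_explicit<'alloc>(buf: &'alloc mut Vec<u8>) {
    buf.push(1);
}

// ...so the elided form is equivalent and one generic parameter shorter.
fn fill_elided(buf: &mut Vec<u8>) {
    buf.push(2);
}

fn main() {
    let mut buf = Vec::new();
    fill_explicit(&mut buf);
    fill_elided(&mut buf);
    assert_eq!(extract(1, 2, 3, 4, 5, 6, 7, 8), 36);
}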
@@ -342,7 +343,7 @@ where
     let all_document_ids =
         indexing_context.index.documents_ids(&rtxn)?.into_iter().collect::<Vec<_>>();
     let primary_key =
-        primary_key_from_db(&indexing_context.index, &rtxn, &indexing_context.db_fields_ids_map)?;
+        primary_key_from_db(indexing_context.index, &rtxn, &indexing_context.db_fields_ids_map)?;
     let documents = DatabaseDocuments::new(&all_document_ids, primary_key);
 
     let span =
@@ -364,7 +365,7 @@ where
         let embedding_sender = extractor_sender.embeddings();
 
-        // extract the remaining embedders
+        // extract the remaining embeddings
         let extractor = SettingsChangeEmbeddingExtractor::new(
             settings_delta.new_embedders(),
             settings_delta.old_embedders(),
@@ -410,9 +411,9 @@ where
     Result::Ok(index_embeddings)
 }
 
-fn primary_key_from_db<'indexer, 'index>(
+fn primary_key_from_db<'indexer>(
     index: &'indexer Index,
-    rtxn: &'indexer heed::RoTxn<'index>,
+    rtxn: &'indexer heed::RoTxn<'_>,
     fields: &'indexer impl FieldIdMapper,
 ) -> Result<PrimaryKey<'indexer>> {
     let Some(primary_key) = index.primary_key(rtxn)? else {
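`primary_key_from_db` loses its `'index` parameter: the transaction's inner lifetime never needed a name, so the anonymous `'_` is enough. A sketch of the same signature change, using a hypothetical `RoTxn`-like type rather than the real heed one:

// Hypothetical read-transaction type standing in for the lifetime-parameterized
// one in the hunk above.
struct RoTxn<'env> {
    _env: &'env str,
}

// Naming the environment lifetime forces an extra generic parameter that
// nothing else in the signature refers to.
fn key_with_named_lifetime<'a, 'env>(_txn: &'a RoTxn<'env>, key: &'a str) -> &'a str {
    key
}

// The anonymous `'_` keeps the borrow visible without naming it, which is the
// shape the function above ends up with.
fn key_with_elided_lifetime<'a>(_txn: &'a RoTxn<'_>, key: &'a str) -> &'a str {
    key
}

fn main() {
    let env = String::from("env");
    let txn = RoTxn { _env: &env };
    assert_eq!(key_with_named_lifetime(&txn, "id"), "id");
    assert_eq!(key_with_elided_lifetime(&txn, "id"), "id");
}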

@@ -205,8 +205,7 @@ where
     Ok(congestion)
 }
 
-#[allow(clippy::too_many_arguments)] // clippy: 😝
-pub fn reindex<'pl, 'indexer, 'index, MSP, SD>(
+pub fn reindex<'indexer, 'index, MSP, SD>(
     wtxn: &mut RwTxn<'index>,
     index: &'index Index,
     pool: &ThreadPoolNoAbort,
@@ -307,7 +306,7 @@ where
                 index_embeddings,
                 arroy_memory,
                 &mut arroy_writers,
-                Some(&embedder_actions),
+                Some(embedder_actions),
                 &indexing_context.must_stop_processing,
             )
         })
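`embedder_actions` is presumably already a reference at this call site, so wrapping it in another `&` only produced a double reference for the compiler to peel off again; passing it as-is is what clippy's `needless_borrow` lint suggests. A tiny illustration with hypothetical names:

fn count_actions(actions: &[&str]) -> usize {
    actions.len()
}

fn main() {
    let owned = vec!["write_back", "reindex"];
    let actions: &[&str] = &owned;
    // `actions` is already a reference; `&actions` builds a `&&[&str]` that the
    // compiler immediately derefs again (clippy: `needless_borrow`).
    let with_extra_borrow = count_actions(&actions);
    // Passing the existing reference directly is the equivalent, cleaner call.
    let direct = count_actions(actions);
    assert_eq!(with_extra_borrow, direct);
}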
@@ -336,8 +335,8 @@ where
     Ok(congestion)
 }
 
-fn arroy_writers_from_embedder_actions<'indexer, 'index>(
-    index: &'index Index,
+fn arroy_writers_from_embedder_actions<'indexer>(
+    index: &Index,
     embedder_actions: &'indexer BTreeMap<String, EmbedderAction>,
     embedders: &'indexer EmbeddingConfigs,
     index_embedder_category_ids: &'indexer std::collections::HashMap<String, u8>,
@@ -372,15 +371,11 @@ fn arroy_writers_from_embedder_actions<'indexer, 'index>(
         .collect()
 }
 
-fn delete_old_embedders<'indexer, 'index, SD>(
-    wtxn: &mut RwTxn<'_>,
-    index: &'index Index,
-    settings_delta: &'indexer SD,
-) -> Result<()>
+fn delete_old_embedders<SD>(wtxn: &mut RwTxn<'_>, index: &Index, settings_delta: &SD) -> Result<()>
 where
     SD: SettingsDelta,
 {
-    for (_name, action) in settings_delta.embedder_actions() {
+    for action in settings_delta.embedder_actions().values() {
         if let Some(WriteBackToDocuments { embedder_id, .. }) = action.write_back() {
             let reader = ArroyWrapper::new(index.vector_arroy, *embedder_id, action.was_quantized);
             let dimensions = reader.dimensions(wtxn)?;
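The loop in `delete_old_embedders` no longer destructures `(_name, action)` just to throw the key away; `.values()` states the intent directly (clippy's `for_kv_map` lint flags the old form). A self-contained example of the same change:

use std::collections::BTreeMap;

fn main() {
    let mut actions: BTreeMap<String, u8> = BTreeMap::new();
    actions.insert("default".to_string(), 1);
    actions.insert("rerank".to_string(), 2);

    // Destructuring the entry and discarding the key works, but reads as if
    // the key mattered.
    for (_name, action) in &actions {
        assert!(*action > 0);
    }

    // `.values()` states directly that only the values are used.
    for action in actions.values() {
        assert!(*action > 0);
    }
}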

@@ -23,7 +23,7 @@ pub trait SettingsChangeExtractor<'extractor>: Sync {
     fn process<'doc>(
         &'doc self,
-        changes: impl Iterator<Item = Result<DatabaseDocument<'doc>>>,
+        documents: impl Iterator<Item = Result<DatabaseDocument<'doc>>>,
         context: &'doc DocumentContext<Self::Data>,
     ) -> Result<()>;
 }
@@ -128,12 +128,11 @@ pub fn settings_change_extract<
             // Clean up and reuse the document-specific allocator
             context.doc_alloc.reset();
 
-            let items = items.as_ref();
-            let changes = items
+            let documents = items
                 .iter()
                 .filter_map(|item| documents.item_to_database_document(context, item).transpose());
 
-            let res = extractor.process(changes, context).map_err(Arc::new);
+            let res = extractor.process(documents, context).map_err(Arc::new);
             step.fetch_add(items.as_ref().len() as u32, Ordering::Relaxed);
 
             // send back the doc_alloc in the pool

@@ -101,6 +101,7 @@ impl ChannelCongestion {
 }
 
 #[tracing::instrument(level = "debug", skip_all, target = "indexing::vectors")]
+#[allow(clippy::too_many_arguments)]
 pub fn build_vectors<MSP>(
     index: &Index,
     wtxn: &mut RwTxn<'_>,

@@ -1476,7 +1476,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
             chat: Setting::NotSet,
             wtxn: _,
             index: _,
-            indexer_config: _, // TODO: this is not used
+            indexer_config: _,
         } = &self
         {
            self.index.set_updated_at(self.wtxn, &OffsetDateTime::now_utc())?;
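The only change in this hunk is dropping a stale TODO comment, but the surrounding pattern is worth noting: `Settings` is destructured field by field (binding unused fields such as `indexer_config` to `_`) instead of using `..`, so adding a field to the struct becomes a compile error at this spot. A trimmed-down, hypothetical sketch of that technique:

// Hypothetical, simplified struct: listing every field in the pattern (and
// binding the irrelevant ones to `_`) forces the author to revisit this code
// whenever a new field is added, which `..` would silently absorb.
struct Settings {
    searchable: Option<Vec<String>>,
    filterable: Option<Vec<String>>,
    chat: Option<String>,
}

fn is_noop(settings: &Settings) -> bool {
    let Settings { searchable, filterable, chat: _ } = settings;
    searchable.is_none() && filterable.is_none()
}

fn main() {
    let settings = Settings { searchable: None, filterable: None, chat: None };
    assert!(is_noop(&settings));
}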
@@ -1486,9 +1486,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
         // Update index settings
         let embedding_config_updates = self.update_embedding_configs()?;
 
-        let mut new_inner_settings =
-            InnerIndexSettings::from_index(self.index, self.wtxn, None)?;
-        new_inner_settings.recompute_searchables(self.wtxn, self.index)?;
+        let new_inner_settings = InnerIndexSettings::from_index(self.index, self.wtxn, None)?;
 
         let primary_key_id = self
             .index