Reimplement reindexing shell

This commit is contained in:
ManyTheFish 2025-06-25 14:00:00 +02:00
parent 2ae545f38c
commit 576f2c64cd
3 changed files with 282 additions and 45 deletions

View File

@ -12,13 +12,18 @@ use super::super::steps::IndexingStep;
use super::super::thread_local::{FullySend, ThreadLocal}; use super::super::thread_local::{FullySend, ThreadLocal};
use super::super::FacetFieldIdsDelta; use super::super::FacetFieldIdsDelta;
use super::document_changes::{extract, DocumentChanges, IndexingContext}; use super::document_changes::{extract, DocumentChanges, IndexingContext};
use crate::documents::FieldIdMapper;
use crate::documents::PrimaryKey;
use crate::index::IndexEmbeddingConfig; use crate::index::IndexEmbeddingConfig;
use crate::progress::MergingWordCache; use crate::progress::MergingWordCache;
use crate::proximity::ProximityPrecision; use crate::proximity::ProximityPrecision;
use crate::update::new::extract::EmbeddingExtractor; use crate::update::new::extract::EmbeddingExtractor;
use crate::update::new::merger::merge_and_send_rtree; use crate::update::new::merger::merge_and_send_rtree;
use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases}; use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
use crate::update::settings::SettingsDelta;
use crate::vector::EmbeddingConfigs; use crate::vector::EmbeddingConfigs;
use crate::Index;
use crate::InternalError;
use crate::{Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder}; use crate::{Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
@ -312,6 +317,28 @@ where
Result::Ok((facet_field_ids_delta, index_embeddings)) Result::Ok((facet_field_ids_delta, index_embeddings))
} }
pub(super) fn extract_all_settings_changes<'extractor, MSP, SD>(
indexing_context: IndexingContext<MSP>,
indexer_span: Span,
extractor_sender: ExtractorBbqueueSender,
settings_delta: &SD,
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
finished_extraction: &AtomicBool,
field_distribution: &mut BTreeMap<String, u64>,
mut index_embeddings: Vec<IndexEmbeddingConfig>,
modified_docids: &mut RoaringBitmap,
) -> Result<Vec<IndexEmbeddingConfig>>
where
MSP: Fn() -> bool + Sync,
SD: SettingsDelta,
{
indexing_context.progress.update_progress(IndexingStep::WaitingForDatabaseWrites);
finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed);
Result::Ok(index_embeddings)
}
fn request_threads() -> &'static ThreadPoolNoAbort { fn request_threads() -> &'static ThreadPoolNoAbort {
static REQUEST_THREADS: OnceLock<ThreadPoolNoAbort> = OnceLock::new(); static REQUEST_THREADS: OnceLock<ThreadPoolNoAbort> = OnceLock::new();

View File

@ -1,3 +1,4 @@
use std::collections::BTreeMap;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::{Once, RwLock}; use std::sync::{Once, RwLock};
use std::thread::{self, Builder}; use std::thread::{self, Builder};
@ -20,8 +21,10 @@ use super::thread_local::ThreadLocal;
use crate::documents::PrimaryKey; use crate::documents::PrimaryKey;
use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder}; use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
use crate::progress::Progress; use crate::progress::Progress;
use crate::update::settings::SettingsDelta;
use crate::update::GrenadParameters; use crate::update::GrenadParameters;
use crate::vector::{ArroyWrapper, EmbeddingConfigs}; use crate::vector::settings::{EmbedderAction, WriteBackToDocuments};
use crate::vector::{ArroyWrapper, Embedder, EmbeddingConfigs};
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort}; use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort};
pub(crate) mod de; pub(crate) mod de;
@ -32,6 +35,7 @@ mod extract;
mod guess_primary_key; mod guess_primary_key;
mod partial_dump; mod partial_dump;
mod post_processing; mod post_processing;
pub mod settings_changes;
mod update_by_function; mod update_by_function;
mod write; mod write;
@ -40,8 +44,6 @@ static LOG_MEMORY_METRICS_ONCE: Once = Once::new();
/// This is the main function of this crate. /// This is the main function of this crate.
/// ///
/// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`]. /// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`].
///
/// TODO return stats
#[allow(clippy::too_many_arguments)] // clippy: 😝 #[allow(clippy::too_many_arguments)] // clippy: 😝
pub fn index<'pl, 'indexer, 'index, DC, MSP>( pub fn index<'pl, 'indexer, 'index, DC, MSP>(
wtxn: &mut RwTxn, wtxn: &mut RwTxn,
@ -65,48 +67,8 @@ where
let arroy_memory = grenad_parameters.max_memory; let arroy_memory = grenad_parameters.max_memory;
// We reduce the actual memory used to 5%. The reason we do this here and not in Meilisearch let (grenad_parameters, total_bbbuffer_capacity) =
// is because we still use the old indexer for the settings and it is highly impacted by the indexer_memory_settings(pool.current_num_threads(), grenad_parameters);
// max memory. So we keep the changes here and will remove these changes once we use the new
// indexer to also index settings. Related to #5125 and #5141.
let grenad_parameters = GrenadParameters {
max_memory: grenad_parameters.max_memory.map(|mm| mm * 5 / 100),
..grenad_parameters
};
// 5% percent of the allocated memory for the extractors, or min 100MiB
// 5% percent of the allocated memory for the bbqueues, or min 50MiB
//
// Minimum capacity for bbqueues
let minimum_total_bbbuffer_capacity = 50 * 1024 * 1024 * pool.current_num_threads(); // 50 MiB
let minimum_total_extractors_capacity = minimum_total_bbbuffer_capacity * 2;
let (grenad_parameters, total_bbbuffer_capacity) = grenad_parameters.max_memory.map_or(
(
GrenadParameters {
max_memory: Some(minimum_total_extractors_capacity),
..grenad_parameters
},
minimum_total_bbbuffer_capacity,
), // 100 MiB by thread by default
|max_memory| {
let total_bbbuffer_capacity = max_memory.max(minimum_total_bbbuffer_capacity);
let new_grenad_parameters = GrenadParameters {
max_memory: Some(max_memory.max(minimum_total_extractors_capacity)),
..grenad_parameters
};
(new_grenad_parameters, total_bbbuffer_capacity)
},
);
LOG_MEMORY_METRICS_ONCE.call_once(|| {
tracing::debug!(
"Indexation allocated memory metrics - \
Total BBQueue size: {total_bbbuffer_capacity}, \
Total extractor memory: {:?}",
grenad_parameters.max_memory,
);
});
let (extractor_sender, writer_receiver) = pool let (extractor_sender, writer_receiver) = pool
.install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000)) .install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
@ -239,3 +201,219 @@ where
Ok(congestion) Ok(congestion)
} }
#[allow(clippy::too_many_arguments)] // clippy: 😝
pub fn reindex<'pl, 'indexer, 'index, MSP, SD>(
wtxn: &mut RwTxn<'index>,
index: &'index Index,
pool: &ThreadPoolNoAbort,
grenad_parameters: GrenadParameters,
settings_delta: &'indexer SD,
must_stop_processing: &'indexer MSP,
progress: &'indexer Progress,
) -> Result<ChannelCongestion>
where
MSP: Fn() -> bool + Sync,
SD: SettingsDelta + Sync,
{
let mut bbbuffers = Vec::new();
let finished_extraction = AtomicBool::new(false);
let arroy_memory = grenad_parameters.max_memory;
let (grenad_parameters, total_bbbuffer_capacity) =
indexer_memory_settings(pool.current_num_threads(), grenad_parameters);
let (extractor_sender, writer_receiver) = pool
.install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
.unwrap();
let mut extractor_allocs = ThreadLocal::with_capacity(rayon::current_num_threads());
let db_fields_ids_map = index.fields_ids_map(wtxn)?;
let new_fields_ids_map = settings_delta.new_fields_ids_map().clone();
let new_fields_ids_map = RwLock::new(new_fields_ids_map);
let fields_ids_map_store = ThreadLocal::with_capacity(rayon::current_num_threads());
let doc_allocs = ThreadLocal::with_capacity(rayon::current_num_threads());
let indexing_context = IndexingContext {
index,
db_fields_ids_map: &db_fields_ids_map,
new_fields_ids_map: &new_fields_ids_map,
doc_allocs: &doc_allocs,
fields_ids_map_store: &fields_ids_map_store,
must_stop_processing,
progress,
grenad_parameters: &grenad_parameters,
};
let index_embeddings = index.embedding_configs(wtxn)?;
let mut field_distribution = index.field_distribution(wtxn)?;
let mut modified_docids = roaring::RoaringBitmap::new();
let congestion = thread::scope(|s| -> Result<ChannelCongestion> {
let indexer_span = tracing::Span::current();
let finished_extraction = &finished_extraction;
// prevent moving the field_distribution and document_ids in the inner closure...
let field_distribution = &mut field_distribution;
let modified_docids = &mut modified_docids;
let extractor_handle =
Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
pool.install(move || {
extract::extract_all_settings_changes(
indexing_context,
indexer_span,
extractor_sender,
settings_delta,
&mut extractor_allocs,
finished_extraction,
field_distribution,
index_embeddings,
modified_docids,
)
})
.unwrap()
})?;
let new_embedders = settings_delta.new_embedders();
let embedder_actions = settings_delta.embedder_actions();
let index_embedder_category_ids = settings_delta.new_embedder_category_id();
let mut arroy_writers = arroy_writers_from_embedder_actions(
index,
embedder_actions,
new_embedders,
index_embedder_category_ids,
)?;
let congestion =
write_to_db(writer_receiver, finished_extraction, index, wtxn, &arroy_writers)?;
indexing_context.progress.update_progress(IndexingStep::WaitingForExtractors);
let index_embeddings = extractor_handle.join().unwrap()?;
indexing_context.progress.update_progress(IndexingStep::WritingEmbeddingsToDatabase);
pool.install(|| {
build_vectors(
index,
wtxn,
indexing_context.progress,
index_embeddings,
arroy_memory,
&mut arroy_writers,
Some(&embedder_actions),
&indexing_context.must_stop_processing,
)
})
.unwrap()?;
indexing_context.progress.update_progress(IndexingStep::Finalizing);
Ok(congestion) as Result<_>
})?;
// required to into_inner the new_fields_ids_map
drop(fields_ids_map_store);
let new_fields_ids_map = new_fields_ids_map.into_inner().unwrap();
let document_ids = index.documents_ids(wtxn)?;
update_index(
index,
wtxn,
new_fields_ids_map,
None,
settings_delta.new_embedders().clone(),
field_distribution,
document_ids,
)?;
Ok(congestion)
}
fn arroy_writers_from_embedder_actions<'indexer, 'index>(
index: &'index Index,
embedder_actions: &'indexer BTreeMap<String, EmbedderAction>,
embedders: &'indexer EmbeddingConfigs,
index_embedder_category_ids: &'indexer std::collections::HashMap<String, u8>,
) -> Result<HashMap<u8, (&'indexer str, &'indexer Embedder, ArroyWrapper, usize)>> {
let vector_arroy = index.vector_arroy;
embedders
.inner_as_ref()
.iter()
.filter_map(|(embedder_name, (embedder, _, _))| match embedder_actions.get(embedder_name) {
None => None,
Some(action) if action.write_back().is_some() => None,
Some(action) => {
let Some(&embedder_category_id) = index_embedder_category_ids.get(embedder_name)
else {
return Some(Err(crate::error::Error::InternalError(
crate::InternalError::DatabaseMissingEntry {
db_name: crate::index::db_name::VECTOR_EMBEDDER_CATEGORY_ID,
key: None,
},
)));
};
let writer =
ArroyWrapper::new(vector_arroy, embedder_category_id, action.was_quantized);
let dimensions = embedder.dimensions();
Some(Ok((
embedder_category_id,
(embedder_name.as_str(), embedder.as_ref(), writer, dimensions),
)))
}
})
.collect()
}
fn indexer_memory_settings(
current_num_threads: usize,
grenad_parameters: GrenadParameters,
) -> (GrenadParameters, usize) {
// We reduce the actual memory used to 5%. The reason we do this here and not in Meilisearch
// is because we still use the old indexer for the settings and it is highly impacted by the
// max memory. So we keep the changes here and will remove these changes once we use the new
// indexer to also index settings. Related to #5125 and #5141.
let grenad_parameters = GrenadParameters {
max_memory: grenad_parameters.max_memory.map(|mm| mm * 5 / 100),
..grenad_parameters
};
// 5% percent of the allocated memory for the extractors, or min 100MiB
// 5% percent of the allocated memory for the bbqueues, or min 50MiB
//
// Minimum capacity for bbqueues
let minimum_total_bbbuffer_capacity = 50 * 1024 * 1024 * current_num_threads;
// 50 MiB
let minimum_total_extractors_capacity = minimum_total_bbbuffer_capacity * 2;
let (grenad_parameters, total_bbbuffer_capacity) = grenad_parameters.max_memory.map_or(
(
GrenadParameters {
max_memory: Some(minimum_total_extractors_capacity),
..grenad_parameters
},
minimum_total_bbbuffer_capacity,
), // 100 MiB by thread by default
|max_memory| {
let total_bbbuffer_capacity = max_memory.max(minimum_total_bbbuffer_capacity);
let new_grenad_parameters = GrenadParameters {
max_memory: Some(max_memory.max(minimum_total_extractors_capacity)),
..grenad_parameters
};
(new_grenad_parameters, total_bbbuffer_capacity)
},
);
LOG_MEMORY_METRICS_ONCE.call_once(|| {
tracing::debug!(
"Indexation allocated memory metrics - \
Total BBQueue size: {total_bbbuffer_capacity}, \
Total extractor memory: {:?}",
grenad_parameters.max_memory,
);
});
(grenad_parameters, total_bbbuffer_capacity)
}

View File

@ -2218,6 +2218,38 @@ fn deserialize_sub_embedder(
} }
} }
/// Implement this trait for the settings delta type.
/// This is used in the new settings update flow and will allow to easily replace the old settings delta type: `InnerIndexSettingsDiff`.
pub trait SettingsDelta {
fn new_embedders(&self) -> &EmbeddingConfigs;
fn old_embedders(&self) -> &EmbeddingConfigs;
fn new_embedder_category_id(&self) -> &HashMap<String, u8>;
fn embedder_actions(&self) -> &BTreeMap<String, EmbedderAction>;
fn new_fields_ids_map(&self) -> &FieldIdMapWithMetadata;
}
impl SettingsDelta for InnerIndexSettingsDiff {
fn new_embedders(&self) -> &EmbeddingConfigs {
&self.new.embedding_configs
}
fn old_embedders(&self) -> &EmbeddingConfigs {
&self.old.embedding_configs
}
fn new_embedder_category_id(&self) -> &HashMap<String, u8> {
&self.new.embedder_category_id
}
fn embedder_actions(&self) -> &BTreeMap<String, EmbedderAction> {
&self.embedding_config_updates
}
fn new_fields_ids_map(&self) -> &FieldIdMapWithMetadata {
&self.new.fields_ids_map
}
}
#[cfg(test)] #[cfg(test)]
#[path = "test_settings.rs"] #[path = "test_settings.rs"]
mod tests; mod tests;