Post processing of the merge

This commit is contained in:
Louis Dureuil 2024-11-06 17:50:12 +01:00
parent ee03743355
commit 10f49f0d75
No known key found for this signature in database
12 changed files with 512 additions and 996 deletions

View file

@ -1,17 +1,17 @@
use std::cmp::Ordering;
use std::sync::RwLock;
use std::sync::{OnceLock, RwLock};
use std::thread::{self, Builder};
use big_s::S;
use document_changes::{
for_each_document_change, DocumentChanges, FullySend, IndexingContext, ThreadLocal,
};
use document_changes::{extract, DocumentChanges, IndexingContext, Progress, ThreadLocal};
pub use document_deletion::DocumentDeletion;
pub use document_operation::DocumentOperation;
use hashbrown::HashMap;
use heed::types::{Bytes, DecodeIgnore, Str};
use heed::{RoTxn, RwTxn};
use itertools::{merge_join_by, EitherOrBoth};
pub use partial_dump::PartialDump;
use rand::SeedableRng as _;
use rayon::ThreadPool;
use time::OffsetDateTime;
pub use update_by_function::UpdateByFunction;
@ -19,37 +19,100 @@ pub use update_by_function::UpdateByFunction;
use super::channel::*;
use super::extract::*;
use super::facet_search_builder::FacetSearchBuilder;
use super::merger::{FacetDatabases, FacetFieldIdsDelta};
use super::word_fst_builder::PrefixDelta;
use super::merger::FacetFieldIdsDelta;
use super::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder};
use super::words_prefix_docids::{
compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids,
};
use super::{StdResult, TopLevelMap};
use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY};
use crate::facet::FacetType;
use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY};
use crate::proximity::ProximityPrecision;
use crate::update::del_add::DelAdd;
use crate::update::new::word_fst_builder::{PrefixData, WordFstBuilder};
use crate::update::new::extract::EmbeddingExtractor;
use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids;
use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids};
use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
use crate::update::settings::InnerIndexSettings;
use crate::update::{FacetsUpdateBulk, GrenadParameters};
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError};
use crate::vector::{ArroyWrapper, EmbeddingConfigs, Embeddings};
use crate::{
FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort,
ThreadPoolNoAbortBuilder, UserError,
};
pub mod de;
pub(crate) mod de;
pub mod document_changes;
mod document_deletion;
mod document_operation;
mod partial_dump;
mod update_by_function;
mod steps {
pub const STEPS: &[&str] = &[
"extracting documents",
"extracting facets",
"extracting words",
"extracting word proximity",
"extracting embeddings",
"writing to database",
"post-processing facets",
"post-processing words",
"finalizing",
];
const fn step(step: u16) -> (u16, &'static str) {
(step, STEPS[step as usize])
}
pub const fn total_steps() -> u16 {
STEPS.len() as u16
}
pub const fn extract_documents() -> (u16, &'static str) {
step(0)
}
pub const fn extract_facets() -> (u16, &'static str) {
step(1)
}
pub const fn extract_words() -> (u16, &'static str) {
step(2)
}
pub const fn extract_word_proximity() -> (u16, &'static str) {
step(3)
}
pub const fn extract_embeddings() -> (u16, &'static str) {
step(4)
}
pub const fn write_db() -> (u16, &'static str) {
step(5)
}
pub const fn post_processing_facets() -> (u16, &'static str) {
step(6)
}
pub const fn post_processing_words() -> (u16, &'static str) {
step(7)
}
pub const fn finalizing() -> (u16, &'static str) {
step(8)
}
}
/// This is the main function of this crate.
///
/// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`].
///
/// TODO return stats
pub fn index<'pl, 'indexer, 'index, DC>(
#[allow(clippy::too_many_arguments)] // clippy: 😝
pub fn index<'pl, 'indexer, 'index, DC, MSP, SP>(
wtxn: &mut RwTxn,
index: &'index Index,
db_fields_ids_map: &'indexer FieldsIdsMap,
@ -57,15 +120,23 @@ pub fn index<'pl, 'indexer, 'index, DC>(
new_primary_key: Option<PrimaryKey<'pl>>,
pool: &ThreadPool,
document_changes: &DC,
embedders: EmbeddingConfigs,
must_stop_processing: &'indexer MSP,
send_progress: &'indexer SP,
) -> Result<()>
where
DC: DocumentChanges<'pl>,
MSP: Fn() -> bool + Sync,
SP: Fn(Progress) + Sync,
{
// TODO find a better channel limit
let (extractor_sender, writer_receiver) = extractor_writer_channel(10_000);
let metadata_builder = MetadataBuilder::from_index(index, wtxn)?;
let new_fields_ids_map = FieldIdMapWithMetadata::new(new_fields_ids_map, metadata_builder);
let new_fields_ids_map = RwLock::new(new_fields_ids_map);
let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);
let fields_ids_map_store = ThreadLocal::with_capacity(pool.current_num_threads());
let mut extractor_allocs = ThreadLocal::with_capacity(pool.current_num_threads());
let doc_allocs = ThreadLocal::with_capacity(pool.current_num_threads());
@ -76,46 +147,69 @@ where
new_fields_ids_map: &new_fields_ids_map,
doc_allocs: &doc_allocs,
fields_ids_map_store: &fields_ids_map_store,
must_stop_processing,
send_progress,
};
thread::scope(|s| -> crate::Result<_> {
let total_steps = steps::total_steps();
let mut field_distribution = index.field_distribution(wtxn)?;
let mut document_ids = index.documents_ids(wtxn)?;
thread::scope(|s| -> Result<()> {
let indexer_span = tracing::Span::current();
let embedders = &embedders;
// prevent moving the field_distribution and document_ids in the inner closure...
let field_distribution = &mut field_distribution;
let document_ids = &mut document_ids;
// TODO manage the errors correctly
let extractor_handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
pool.in_place_scope(|_s| {
let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract");
let _entered = span.enter();
let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract");
let _entered = span.enter();
// document but we need to create a function that collects and compresses documents.
let rtxn = index.read_txn().unwrap();
let document_sender = extractor_sender.documents();
let document_extractor = DocumentsExtractor::new(&document_sender);
let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
for_each_document_change(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore)?;
let rtxn = index.read_txn()?;
let mut documents_ids = index.documents_ids(&rtxn)?;
let delta_documents_ids = datastore.into_iter().map(|FullySend(d)| d.into_inner()).reduce(DelAddRoaringBitmap::merge).unwrap_or_default();
delta_documents_ids.apply_to(&mut documents_ids);
extractor_sender.send_documents_ids(documents_ids).unwrap();
// document but we need to create a function that collects and compresses documents.
let document_sender = extractor_sender.documents();
let document_extractor = DocumentsExtractor::new(&document_sender, embedders);
let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
let (finished_steps, step_name) = steps::extract_documents();
extract(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
// document_sender.finish().unwrap();
const TEN_GIB: usize = 10 * 1024 * 1024 * 1024;
let current_num_threads = rayon::current_num_threads();
let max_memory = TEN_GIB / current_num_threads;
eprintln!("A maximum of {max_memory} bytes will be used for each of the {current_num_threads} threads");
let grenad_parameters = GrenadParameters {
max_memory: Some(max_memory),
..GrenadParameters::default()
};
for document_extractor_data in datastore {
let document_extractor_data = document_extractor_data.0.into_inner();
for (field, delta) in document_extractor_data.field_distribution_delta {
let current = field_distribution.entry(field).or_default();
// adding the delta should never cause a negative result, as we are removing fields that previously existed.
*current = current.saturating_add_signed(delta);
}
document_extractor_data.docids_delta.apply_to(document_ids);
}
let facet_field_ids_delta;
field_distribution.retain(|_, v| *v == 0);
const TEN_GIB: usize = 10 * 1024 * 1024 * 1024;
let current_num_threads = rayon::current_num_threads();
let max_memory = TEN_GIB / current_num_threads;
eprintln!("A maximum of {max_memory} bytes will be used for each of the {current_num_threads} threads");
let grenad_parameters = GrenadParameters {
max_memory: Some(max_memory),
..GrenadParameters::default()
};
let facet_field_ids_delta;
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted");
let _entered = span.enter();
let (finished_steps, step_name) = steps::extract_facets();
facet_field_ids_delta = merge_and_send_facet_docids(
FacetedDocidsExtractor::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?,
FacetedDocidsExtractor::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, finished_steps, total_steps, step_name)?,
FacetDatabases::new(index),
index,
extractor_sender.facet_docids(),
@ -125,6 +219,7 @@ where
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
let _entered = span.enter();
let (finished_steps, step_name) = steps::extract_words();
let WordDocidsCaches {
word_docids,
@ -132,7 +227,7 @@ where
exact_word_docids,
word_position_docids,
fid_word_count_docids,
} = WordDocidsExtractors::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?;
} = WordDocidsExtractors::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, finished_steps, total_steps, step_name)?;
// TODO Word Docids Merger
// extractor_sender.send_searchable::<WordDocids>(word_docids).unwrap();
@ -206,7 +301,10 @@ where
if proximity_precision == ProximityPrecision::ByWord {
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids");
let _entered = span.enter();
let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?;
let (finished_steps, step_name) = steps::extract_word_proximity();
let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, finished_steps, total_steps, step_name)?;
merge_and_send_docids(
caches,
index.word_pair_proximity_docids.remap_types(),
@ -215,62 +313,212 @@ where
)?;
}
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH");
let _entered = span.enter();
}
'vectors: {
let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
let _entered = span.enter();
// TODO THIS IS TOO MUCH
// - [ ] Extract fieldid docid facet number
// - [ ] Extract fieldid docid facet string
// - [ ] Extract facetid string fst
// - [ ] Extract facetid normalized string strings
let index_embeddings = index.embedding_configs(&rtxn)?;
if index_embeddings.is_empty() {
break 'vectors;
}
// TODO Inverted Indexes again
// - [x] Extract fieldid facet isempty docids
// - [x] Extract fieldid facet isnull docids
// - [x] Extract fieldid facet exists docids
let embedding_sender = extractor_sender.embeddings();
let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads());
let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
let (finished_steps, step_name) = steps::extract_embeddings();
// TODO This is the normal system
// - [x] Extract fieldid facet number docids
// - [x] Extract fieldid facet string docids
// TODO use None when needed
Result::Ok(facet_field_ids_delta)
})
extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
let mut user_provided = HashMap::new();
for data in datastore {
let data = data.into_inner().0;
for (embedder, deladd) in data.into_iter() {
let user_provided = user_provided.entry(embedder).or_insert(Default::default());
if let Some(del) = deladd.del {
*user_provided -= del;
}
if let Some(add) = deladd.add {
*user_provided |= add;
}
}
}
embedding_sender.finish(user_provided).unwrap();
}
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH");
let _entered = span.enter();
let (finished_steps, step_name) = steps::write_db();
(indexing_context.send_progress)(Progress { finished_steps, total_steps, step_name, finished_total_documents: None });
}
// TODO THIS IS TOO MUCH
// - [ ] Extract fieldid docid facet number
// - [ ] Extract fieldid docid facet string
// - [ ] Extract facetid string fst
// - [ ] Extract facetid normalized string strings
// TODO Inverted Indexes again
// - [x] Extract fieldid facet isempty docids
// - [x] Extract fieldid facet isnull docids
// - [x] Extract fieldid facet exists docids
// TODO This is the normal system
// - [x] Extract fieldid facet number docids
// - [x] Extract fieldid facet string docids
Result::Ok(facet_field_ids_delta)
})
})?;
let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);
let indexer_span = tracing::Span::current();
let vector_arroy = index.vector_arroy;
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
let indexer_span = tracing::Span::current();
let arroy_writers: Result<HashMap<_, _>> = embedders
.inner_as_ref()
.iter()
.map(|(embedder_name, (embedder, _, was_quantized))| {
let embedder_index = index.embedder_category_id.get(wtxn, embedder_name)?.ok_or(
InternalError::DatabaseMissingEntry {
db_name: "embedder_category_id",
key: None,
},
)?;
let dimensions = embedder.dimensions();
let writer = ArroyWrapper::new(vector_arroy, embedder_index, *was_quantized);
Ok((
embedder_index,
(embedder_name.as_str(), embedder.as_ref(), writer, dimensions),
))
})
.collect();
let mut arroy_writers = arroy_writers?;
for operation in writer_receiver {
let database = operation.database(index);
match operation.entry() {
EntryOperation::Delete(e) => {
if !database.delete(wtxn, e.entry())? {
unreachable!("We tried to delete an unknown key")
match operation {
WriterOperation::DbOperation(db_operation) => {
let database = db_operation.database(index);
match db_operation.entry() {
EntryOperation::Delete(e) => {
if !database.delete(wtxn, e.entry())? {
unreachable!("We tried to delete an unknown key")
}
}
EntryOperation::Write(e) => database.put(wtxn, e.key(), e.value())?,
}
}
EntryOperation::Write(e) => database.put(wtxn, e.key(), e.value())?,
WriterOperation::ArroyOperation(arroy_operation) => match arroy_operation {
ArroyOperation::DeleteVectors { docid } => {
for (_embedder_index, (_embedder_name, _embedder, writer, dimensions)) in
&mut arroy_writers
{
let dimensions = *dimensions;
writer.del_items(wtxn, dimensions, docid)?;
}
}
ArroyOperation::SetVectors {
docid,
embedder_id,
embeddings: raw_embeddings,
} => {
let (_, _, writer, dimensions) =
arroy_writers.get(&embedder_id).expect("requested a missing embedder");
// TODO: switch to Embeddings
let mut embeddings = Embeddings::new(*dimensions);
for embedding in raw_embeddings {
embeddings.append(embedding).unwrap();
}
writer.del_items(wtxn, *dimensions, docid)?;
writer.add_items(wtxn, docid, &embeddings)?;
}
ArroyOperation::SetVector { docid, embedder_id, embedding } => {
let (_, _, writer, dimensions) =
arroy_writers.get(&embedder_id).expect("requested a missing embedder");
writer.del_items(wtxn, *dimensions, docid)?;
writer.add_item(wtxn, docid, &embedding)?;
}
ArroyOperation::Finish { mut user_provided } => {
let span = tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build");
let _entered = span.enter();
for (_embedder_index, (_embedder_name, _embedder, writer, dimensions)) in
&mut arroy_writers
{
let dimensions = *dimensions;
writer.build_and_quantize(
wtxn,
&mut rng,
dimensions,
false,
&indexing_context.must_stop_processing,
)?;
}
let mut configs = index.embedding_configs(wtxn)?;
for config in &mut configs {
if let Some(user_provided) = user_provided.remove(&config.name) {
config.user_provided = user_provided;
}
}
index.put_embedding_configs(wtxn, configs)?;
}
},
}
}
/// TODO handle the panicking threads
let facet_field_ids_delta = extractor_handle.join().unwrap()?;
let (finished_steps, step_name) = steps::post_processing_facets();
(indexing_context.send_progress)(Progress {
finished_steps,
total_steps,
step_name,
finished_total_documents: None,
});
compute_facet_level_database(index, wtxn, facet_field_ids_delta)?;
compute_facet_search_database(index, wtxn, global_fields_ids_map)?;
let (finished_steps, step_name) = steps::post_processing_words();
(indexing_context.send_progress)(Progress {
finished_steps,
total_steps,
step_name,
finished_total_documents: None,
});
if let Some(prefix_delta) = compute_word_fst(index, wtxn)? {
compute_prefix_database(index, wtxn, prefix_delta)?;
}
compute_facet_search_database(index, wtxn, global_fields_ids_map)?;
let (finished_steps, step_name) = steps::finalizing();
(indexing_context.send_progress)(Progress {
finished_steps,
total_steps,
step_name,
finished_total_documents: None,
});
compute_facet_level_database(index, wtxn, facet_field_ids_delta)?;
Result::Ok(())
Ok(()) as Result<_>
})?;
// required to into_inner the new_fields_ids_map
drop(fields_ids_map_store);
let fields_ids_map = new_fields_ids_map.into_inner().unwrap();
index.put_fields_ids_map(wtxn, &fields_ids_map)?;
let new_fields_ids_map = new_fields_ids_map.into_inner().unwrap();
index.put_fields_ids_map(wtxn, new_fields_ids_map.as_fields_ids_map())?;
if let Some(new_primary_key) = new_primary_key {
index.put_primary_key(wtxn, new_primary_key.name())?;
@ -280,7 +528,8 @@ where
let mut inner_index_settings = InnerIndexSettings::from_index(index, wtxn)?;
inner_index_settings.recompute_facets(wtxn, index)?;
inner_index_settings.recompute_searchables(wtxn, index)?;
index.put_field_distribution(wtxn, &field_distribution)?;
index.put_documents_ids(wtxn, &document_ids)?;
index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?;
Ok(())
@ -517,3 +766,15 @@ pub fn retrieve_or_guess_primary_key<'a>(
Err(err) => Ok(Err(err)),
}
}
fn request_threads() -> &'static ThreadPoolNoAbort {
static REQUEST_THREADS: OnceLock<ThreadPoolNoAbort> = OnceLock::new();
REQUEST_THREADS.get_or_init(|| {
ThreadPoolNoAbortBuilder::new()
.num_threads(crate::vector::REQUEST_PARALLELISM)
.thread_name(|index| format!("embedding-request-{index}"))
.build()
.unwrap()
})
}