diff --git a/crates/index-scheduler/src/batch.rs b/crates/index-scheduler/src/batch.rs
index 4ae8c7d46..fb47c705a 100644
--- a/crates/index-scheduler/src/batch.rs
+++ b/crates/index-scheduler/src/batch.rs
@@ -39,7 +39,7 @@ use meilisearch_types::milli::update::{IndexDocumentsMethod, Settings as MilliSe
 use meilisearch_types::milli::vector::parsed_vectors::{
     ExplicitVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME,
 };
-use meilisearch_types::milli::{self, Filter};
+use meilisearch_types::milli::{self, Filter, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
 use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
 use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
 use meilisearch_types::{compression, Index, VERSION_FILE_NAME};
@@ -1277,7 +1277,6 @@ impl IndexScheduler {
                 operations,
                 mut tasks,
             } => {
-                let indexer_config = self.index_mapper.indexer_config();
                 // TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches.
                 // this is made difficult by the fact we're doing private clones of the index scheduler and sending it
                 // to a fresh thread.
@@ -1386,10 +1385,16 @@ impl IndexScheduler {
                 }
 
                 if tasks.iter().any(|res| res.error.is_none()) {
-                    /// TODO create a pool if needed
-                    // let pool = indexer_config.thread_pool.unwrap();
-                    let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
+                    let local_pool;
+                    let pool = match &self.index_mapper.indexer_config().thread_pool {
+                        Some(pool) => pool,
+                        None => {
+                            local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
+                            &local_pool
+                        }
+                    };
 
+                    // TODO we want to multithread this
                     let document_changes = indexer.into_changes(
                         &indexer_alloc,
                         index,
@@ -1398,18 +1403,20 @@
                         &mut new_fields_ids_map,
                     )?;
 
-                    indexer::index(
-                        index_wtxn,
-                        index,
-                        &db_fields_ids_map,
-                        new_fields_ids_map,
-                        primary_key_has_been_set.then_some(primary_key),
-                        &pool,
-                        &document_changes,
-                        embedders,
-                        &|| must_stop_processing.get(),
-                        &send_progress,
-                    )?;
+                    pool.install(|| {
+                        indexer::index(
+                            index_wtxn,
+                            index,
+                            &db_fields_ids_map,
+                            new_fields_ids_map,
+                            primary_key_has_been_set.then_some(primary_key),
+                            &document_changes,
+                            embedders,
+                            &|| must_stop_processing.get(),
+                            &send_progress,
+                        )
+                    })
+                    .unwrap()?;
 
                     // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
                 }
@@ -1489,27 +1496,37 @@ impl IndexScheduler {
                 let result_count = Ok((candidates.len(), candidates.len())) as Result<_>;
 
                 if task.error.is_none() {
-                    /// TODO create a pool if needed
-                    // let pool = indexer_config.thread_pool.unwrap();
-                    let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
+                    let local_pool;
+                    let pool = match &self.index_mapper.indexer_config().thread_pool {
+                        Some(pool) => pool,
+                        None => {
+                            local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
+                            &local_pool
+                        }
+                    };
 
-                    let indexer = UpdateByFunction::new(candidates, context.clone(), code.clone());
-                    let document_changes = indexer.into_changes(&primary_key)?;
-                    let embedders = index.embedding_configs(index_wtxn)?;
-                    let embedders = self.embedders(embedders)?;
+                    pool.install(|| {
+                        let indexer =
+                            UpdateByFunction::new(candidates, context.clone(), code.clone());
+                        let document_changes = indexer.into_changes(&primary_key)?;
+                        let embedders = index.embedding_configs(index_wtxn)?;
+                        let embedders = self.embedders(embedders)?;
 
-                    indexer::index(
-                        index_wtxn,
-                        index,
-                        &db_fields_ids_map,
-                        new_fields_ids_map,
-                        None, // cannot change primary key in DocumentEdition
-                        &pool,
-                        &document_changes,
-                        embedders,
-                        &|| must_stop_processing.get(),
-                        &send_progress,
-                    )?;
+                        indexer::index(
+                            index_wtxn,
+                            index,
+                            &db_fields_ids_map,
+                            new_fields_ids_map,
+                            None, // cannot change primary key in DocumentEdition
+                            &document_changes,
+                            embedders,
+                            &|| must_stop_processing.get(),
+                            &send_progress,
+                        )?;
+
+                        Result::Ok(())
+                    })
+                    .unwrap()?;
 
                     // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
                 }
@@ -1629,9 +1646,14 @@ impl IndexScheduler {
                     .map_err(milli::Error::from)?;
 
                 if !tasks.iter().all(|res| res.error.is_some()) {
-                    /// TODO create a pool if needed
-                    // let pool = indexer_config.thread_pool.unwrap();
-                    let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
+                    let local_pool;
+                    let pool = match &self.index_mapper.indexer_config().thread_pool {
+                        Some(pool) => pool,
+                        None => {
+                            local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
+                            &local_pool
+                        }
+                    };
 
                     let mut indexer = indexer::DocumentDeletion::new();
                     indexer.delete_documents_by_docids(to_delete);
@@ -1639,18 +1661,20 @@
                     let embedders = index.embedding_configs(index_wtxn)?;
                     let embedders = self.embedders(embedders)?;
 
-                    indexer::index(
-                        index_wtxn,
-                        index,
-                        &db_fields_ids_map,
-                        new_fields_ids_map,
-                        None, // document deletion never changes primary key
-                        &pool,
-                        &document_changes,
-                        embedders,
-                        &|| must_stop_processing.get(),
-                        &send_progress,
-                    )?;
+                    pool.install(|| {
+                        indexer::index(
+                            index_wtxn,
+                            index,
+                            &db_fields_ids_map,
+                            new_fields_ids_map,
+                            None, // document deletion never changes primary key
+                            &document_changes,
+                            embedders,
+                            &|| must_stop_processing.get(),
+                            &send_progress,
+                        )
+                    })
+                    .unwrap()?;
 
                     // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
                 }
diff --git a/crates/milli/src/search/new/matches/mod.rs b/crates/milli/src/search/new/matches/mod.rs
index 80e3ec7b2..ba639b7f2 100644
--- a/crates/milli/src/search/new/matches/mod.rs
+++ b/crates/milli/src/search/new/matches/mod.rs
@@ -3,6 +3,9 @@ mod r#match;
 mod matching_words;
 mod simple_token_kind;
 
+use std::borrow::Cow;
+use std::cmp::{max, min};
+
 use charabia::{Language, SeparatorKind, Token, Tokenizer};
 use either::Either;
 pub use matching_words::MatchingWords;
@@ -10,10 +13,6 @@ use matching_words::{MatchType, PartialMatch};
 use r#match::{Match, MatchPosition};
 use serde::Serialize;
 use simple_token_kind::SimpleTokenKind;
-use std::{
-    borrow::Cow,
-    cmp::{max, min},
-};
 
 const DEFAULT_CROP_MARKER: &str = "…";
 const DEFAULT_HIGHLIGHT_PREFIX: &str = "<em>";
diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs
index fbf102f18..9e8039ffd 100644
--- a/crates/milli/src/update/new/channel.rs
+++ b/crates/milli/src/update/new/channel.rs
@@ -366,14 +366,14 @@ pub struct FieldIdDocidFacetSender<'a>(&'a ExtractorSender);
 impl FieldIdDocidFacetSender<'_> {
     pub fn write_facet_string(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> {
         debug_assert!(FieldDocIdFacetStringCodec::bytes_decode(key).is_ok());
-        let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(&key, &value));
+        let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value));
         self.0
             .send_db_operation(DbOperation { database: Database::FieldIdDocidFacetStrings, entry })
     }
 
     pub fn write_facet_f64(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
         debug_assert!(FieldDocIdFacetF64Codec::bytes_decode(key).is_ok());
-        let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(&key, &[]));
+        let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, &[]));
         self.0.send_db_operation(DbOperation { database: Database::FieldIdDocidFacetF64s, entry })
     }
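Reviewer note: the three `batch.rs` hunks above all introduce the same fallback pattern. A distilled version is sketched below using plain `rayon` types (`ThreadPoolNoAbort` is Meilisearch's wrapper around a rayon pool; the `IndexerConfig` struct here is a stand-in, not the real definition). The point of declaring `local_pool` before the `match` is that a pool built on the fly in the `None` arm lives long enough to be borrowed after the `match` ends:

```rust
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rayon::{ThreadPool, ThreadPoolBuilder};

// Stand-in for the indexer configuration: only the field we care about.
struct IndexerConfig {
    thread_pool: Option<ThreadPool>,
}

fn run_on_configured_pool(config: &IndexerConfig) {
    // Declared first so a freshly built pool outlives the borrow below.
    let local_pool;
    let pool = match &config.thread_pool {
        Some(pool) => pool,
        None => {
            local_pool = ThreadPoolBuilder::new().build().unwrap();
            &local_pool
        }
    };

    // Everything spawned inside `install` runs on `pool`.
    let sum: u64 = pool.install(|| (0..1_000u64).into_par_iter().sum());
    assert_eq!(sum, 499_500);
}

fn main() {
    run_on_configured_pool(&IndexerConfig { thread_pool: None });
}
```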
diff --git a/crates/milli/src/update/new/extract/documents.rs b/crates/milli/src/update/new/extract/documents.rs
index 42fce3c3d..a324d2914 100644
--- a/crates/milli/src/update/new/extract/documents.rs
+++ b/crates/milli/src/update/new/extract/documents.rs
@@ -58,7 +58,8 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
                         context.index,
                         &context.db_fields_ids_map,
                     )?;
-                    let geo_iter = content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
+                    let geo_iter =
+                        content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
                     for res in content.iter_top_level_fields().chain(geo_iter) {
                         let (f, _) = res?;
                         let entry = document_extractor_data
@@ -74,7 +75,8 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
                     let docid = update.docid();
                     let content =
                         update.current(&context.rtxn, context.index, &context.db_fields_ids_map)?;
-                    let geo_iter = content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
+                    let geo_iter =
+                        content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
                     for res in content.iter_top_level_fields().chain(geo_iter) {
                         let (f, _) = res?;
                         let entry = document_extractor_data
@@ -84,7 +86,8 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
                         *entry -= 1;
                     }
                     let content = update.updated();
-                    let geo_iter = content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
+                    let geo_iter =
+                        content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
                     for res in content.iter_top_level_fields().chain(geo_iter) {
                         let (f, _) = res?;
                         let entry = document_extractor_data
@@ -114,7 +117,8 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
                 DocumentChange::Insertion(insertion) => {
                     let docid = insertion.docid();
                     let content = insertion.inserted();
-                    let geo_iter = content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
+                    let geo_iter =
+                        content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
                     for res in content.iter_top_level_fields().chain(geo_iter) {
                         let (f, _) = res?;
                         let entry = document_extractor_data
diff --git a/crates/milli/src/update/new/facet_search_builder.rs b/crates/milli/src/update/new/facet_search_builder.rs
index 7eaec95a5..0c924bff4 100644
--- a/crates/milli/src/update/new/facet_search_builder.rs
+++ b/crates/milli/src/update/new/facet_search_builder.rs
@@ -6,11 +6,9 @@ use grenad::Sorter;
 use heed::types::{Bytes, SerdeJson};
 use heed::{BytesDecode, BytesEncode, RoTxn, RwTxn};
 
-use super::extract::FacetKind;
 use super::fst_merger_builder::FstMergerBuilder;
 use super::KvReaderDelAdd;
-use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec};
-use crate::heed_codec::StrRefCodec;
+use crate::heed_codec::facet::FacetGroupKey;
 use crate::update::del_add::{DelAdd, KvWriterDelAdd};
 use crate::update::{create_sorter, MergeDeladdBtreesetString};
 use crate::{
diff --git a/crates/milli/src/update/new/fst_merger_builder.rs b/crates/milli/src/update/new/fst_merger_builder.rs
index 9fd259ce6..1c584ef53 100644
--- a/crates/milli/src/update/new/fst_merger_builder.rs
+++ b/crates/milli/src/update/new/fst_merger_builder.rs
@@ -1,10 +1,12 @@
-use std::{fs::File, io::BufWriter};
+use std::fs::File;
+use std::io::BufWriter;
 
 use fst::{Set, SetBuilder, Streamer};
 use memmap2::Mmap;
 use tempfile::tempfile;
 
-use crate::{update::del_add::DelAdd, InternalError, Result};
+use crate::update::del_add::DelAdd;
+use crate::{InternalError, Result};
 
 pub struct FstMergerBuilder<'a> {
     stream: Option<fst::set::Stream<'a>>,
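Reviewer note: the import hunks in this file, in `matches/mod.rs` above, and in `word_fst_builder.rs` further down are pure style churn in one direction: nested `use std::{...}` / `use crate::{...}` groups are split apart and regrouped std / external / crate. Presumably this matches the rustfmt settings the repo enforces (the exact `rustfmt.toml` is not part of this diff, so the options named below are an assumption):

```rust
// Hypothetical rustfmt.toml producing this layout:
//     group_imports = "StdExternalCrate"
//     imports_granularity = "Module"
//
// Before (one nested statement mixing modules):
//     use std::{fs::File, io::BufWriter};
//
// After (one module path per statement, std group first):
use std::fs::File;
use std::io::BufWriter;

fn main() {
    // Trivial use of both imports so the snippet compiles warning-free.
    let _writer = BufWriter::new(File::create("example.tmp").unwrap());
}
```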
diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs
index 2cdeca76d..1a5e4fc23 100644
--- a/crates/milli/src/update/new/indexer/mod.rs
+++ b/crates/milli/src/update/new/indexer/mod.rs
@@ -13,7 +13,6 @@ use itertools::{merge_join_by, EitherOrBoth};
 pub use partial_dump::PartialDump;
 use rand::SeedableRng as _;
 use raw_collections::RawMap;
-use rayon::ThreadPool;
 use time::OffsetDateTime;
 pub use update_by_function::UpdateByFunction;
 
@@ -136,7 +135,6 @@ pub fn index<'pl, 'indexer, 'index, DC, MSP, SP>(
     db_fields_ids_map: &'indexer FieldsIdsMap,
     new_fields_ids_map: FieldsIdsMap,
     new_primary_key: Option<PrimaryKey<'pl>>,
-    pool: &ThreadPool,
     document_changes: &DC,
     embedders: EmbeddingConfigs,
     must_stop_processing: &'indexer MSP,
@@ -152,9 +150,9 @@ where
     let metadata_builder = MetadataBuilder::from_index(index, wtxn)?;
     let new_fields_ids_map = FieldIdMapWithMetadata::new(new_fields_ids_map, metadata_builder);
     let new_fields_ids_map = RwLock::new(new_fields_ids_map);
-    let fields_ids_map_store = ThreadLocal::with_capacity(pool.current_num_threads());
-    let mut extractor_allocs = ThreadLocal::with_capacity(pool.current_num_threads());
-    let doc_allocs = ThreadLocal::with_capacity(pool.current_num_threads());
+    let fields_ids_map_store = ThreadLocal::with_capacity(rayon::current_num_threads());
+    let mut extractor_allocs = ThreadLocal::with_capacity(rayon::current_num_threads());
+    let doc_allocs = ThreadLocal::with_capacity(rayon::current_num_threads());
 
     let indexing_context = IndexingContext {
         index,
@@ -179,248 +177,260 @@ where
         let document_ids = &mut document_ids;
         // TODO manage the errors correctly
         let extractor_handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
-            let result = pool.in_place_scope(|_s| {
-                let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract");
-                let _entered = span.enter();
-
-                let rtxn = index.read_txn()?;
-
-                // document but we need to create a function that collects and compresses documents.
-                let document_sender = extractor_sender.documents();
-                let document_extractor = DocumentsExtractor::new(&document_sender, embedders);
-                let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
-                let (finished_steps, step_name) = steps::extract_documents();
-                extract(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
-
-                for document_extractor_data in datastore {
-                    let document_extractor_data = document_extractor_data.0.into_inner();
-                    for (field, delta) in document_extractor_data.field_distribution_delta {
-                        let current = field_distribution.entry(field).or_default();
-                        // adding the delta should never cause a negative result, as we are removing fields that previously existed.
-                        *current = current.saturating_add_signed(delta);
-                    }
-                    document_extractor_data.docids_delta.apply_to(document_ids);
-                }
-
-                field_distribution.retain(|_, v| *v != 0);
-
-                const TEN_GIB: usize = 10 * 1024 * 1024 * 1024;
-                let current_num_threads = rayon::current_num_threads();
-                let max_memory = TEN_GIB / current_num_threads;
-                eprintln!("A maximum of {max_memory} bytes will be used for each of the {current_num_threads} threads");
-
-                let grenad_parameters = GrenadParameters {
-                    max_memory: Some(max_memory),
-                    ..GrenadParameters::default()
-                };
-
-                let facet_field_ids_delta;
-
-                {
-                    let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted");
-                    let _entered = span.enter();
-
-                    let (finished_steps, step_name) = steps::extract_facets();
-
-                    facet_field_ids_delta = merge_and_send_facet_docids(
-                        FacetedDocidsExtractor::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, &extractor_sender.field_id_docid_facet_sender(), finished_steps, total_steps, step_name)?,
-                        FacetDatabases::new(index),
-                        index,
-                        extractor_sender.facet_docids(),
-                    )?;
-                }
-
-                {
-                    let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
-                    let _entered = span.enter();
-                    let (finished_steps, step_name) = steps::extract_words();
-
-                    let WordDocidsCaches {
-                        word_docids,
-                        word_fid_docids,
-                        exact_word_docids,
-                        word_position_docids,
-                        fid_word_count_docids,
-                    } = WordDocidsExtractors::run_extraction(
-                        grenad_parameters,
-                        document_changes,
-                        indexing_context,
-                        &mut extractor_allocs,
-                        finished_steps,
-                        total_steps,
-                        step_name,
-                    )?;
-
-                    // TODO Word Docids Merger
-                    // extractor_sender.send_searchable::<WordDocids>(word_docids).unwrap();
-                    {
-                        let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
-                        let _entered = span.enter();
-                        merge_and_send_docids(
-                            word_docids,
-                            index.word_docids.remap_types(),
-                            index,
-                            extractor_sender.docids::<WordDocids>(),
-                            &indexing_context.must_stop_processing,
-                        )?;
-                    }
-
-                    // Word Fid Docids Merging
-                    // extractor_sender.send_searchable::<WordFidDocids>(word_fid_docids).unwrap();
-                    {
-                        let span = tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids");
-                        let _entered = span.enter();
-                        merge_and_send_docids(
-                            word_fid_docids,
-                            index.word_fid_docids.remap_types(),
-                            index,
-                            extractor_sender.docids::<WordFidDocids>(),
-                            &indexing_context.must_stop_processing,
-                        )?;
-                    }
-
-                    // Exact Word Docids Merging
-                    // extractor_sender.send_searchable::<ExactWordDocids>(exact_word_docids).unwrap();
-                    {
-                        let span = tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids");
-                        let _entered = span.enter();
-                        merge_and_send_docids(
-                            exact_word_docids,
-                            index.exact_word_docids.remap_types(),
-                            index,
-                            extractor_sender.docids::<ExactWordDocids>(),
-                            &indexing_context.must_stop_processing,
-                        )?;
-                    }
-
-                    // Word Position Docids Merging
-                    // extractor_sender.send_searchable::<WordPositionDocids>(word_position_docids).unwrap();
-                    {
-                        let span = tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids");
-                        let _entered = span.enter();
-                        merge_and_send_docids(
-                            word_position_docids,
-                            index.word_position_docids.remap_types(),
-                            index,
-                            extractor_sender.docids::<WordPositionDocids>(),
-                            &indexing_context.must_stop_processing,
-                        )?;
-                    }
-
-                    // Fid Word Count Docids Merging
-                    // extractor_sender.send_searchable::<FidWordCountDocids>(fid_word_count_docids).unwrap();
-                    {
-                        let span = tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids");
-                        let _entered = span.enter();
-                        merge_and_send_docids(
-                            fid_word_count_docids,
-                            index.field_id_word_count_docids.remap_types(),
-                            index,
-                            extractor_sender.docids::<FidWordCountDocids>(),
-                            &indexing_context.must_stop_processing,
-                        )?;
-                    }
-                }
-
-                // run the proximity extraction only if the precision is by word
-                // this works only if the settings didn't change during this transaction.
-                let proximity_precision = index.proximity_precision(&rtxn)?.unwrap_or_default();
-                if proximity_precision == ProximityPrecision::ByWord {
-                    let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids");
-                    let _entered = span.enter();
-
-                    let (finished_steps, step_name) = steps::extract_word_proximity();
-
-                    let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(grenad_parameters,
-                        document_changes,
-                        indexing_context,
-                        &mut extractor_allocs,
-                        finished_steps,
-                        total_steps,
-                        step_name,
-                    )?;
-
-                    merge_and_send_docids(
-                        caches,
-                        index.word_pair_proximity_docids.remap_types(),
-                        index,
-                        extractor_sender.docids::<WordPairProximityDocids>(),
-                        &indexing_context.must_stop_processing,
-                    )?;
-                }
-
-                'vectors: {
-                    let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
-                    let _entered = span.enter();
-
-                    let mut index_embeddings = index.embedding_configs(&rtxn)?;
-                    if index_embeddings.is_empty() {
-                        break 'vectors;
-                    }
-
-                    let embedding_sender = extractor_sender.embeddings();
-                    let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads());
-                    let mut datastore = ThreadLocal::with_capacity(pool.current_num_threads());
-                    let (finished_steps, step_name) = steps::extract_embeddings();
-                    extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
-
-                    for config in &mut index_embeddings {
-                        'data: for data in datastore.iter_mut() {
-                            let data = &mut data.get_mut().0;
-                            let Some(deladd) = data.remove(&config.name) else { continue 'data; };
-                            deladd.apply_to(&mut config.user_provided);
-                        }
-                    }
-
-                    embedding_sender.finish(index_embeddings).unwrap();
-                }
-
-                'geo: {
-                    let span = tracing::trace_span!(target: "indexing::documents::extract", "geo");
-                    let _entered = span.enter();
-
-                    // let geo_sender = extractor_sender.geo_points();
-                    let Some(extractor) = GeoExtractor::new(&rtxn, index, grenad_parameters)? else {
-                        break 'geo;
-                    };
-                    let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
-                    let (finished_steps, step_name) = steps::extract_geo_points();
-                    extract(document_changes,
-                        &extractor,
-                        indexing_context,
-                        &mut extractor_allocs,
-                        &datastore,
-                        finished_steps,
-                        total_steps,
-                        step_name,
-                    )?;
-
-                    merge_and_send_rtree(
-                        datastore,
-                        &rtxn,
-                        index,
-                        extractor_sender.geo(),
-                        &indexing_context.must_stop_processing,
-                    )?;
-                }
-
-                // TODO THIS IS TOO MUCH
-                // - [ ] Extract fieldid docid facet number
-                // - [ ] Extract fieldid docid facet string
-                // - [ ] Extract facetid string fst
-                // - [ ] Extract facetid normalized string strings
-
-                // TODO Inverted Indexes again
-                // - [x] Extract fieldid facet isempty docids
-                // - [x] Extract fieldid facet isnull docids
-                // - [x] Extract fieldid facet exists docids
-
-                // TODO This is the normal system
-                // - [x] Extract fieldid facet number docids
-                // - [x] Extract fieldid facet string docids
-
-                Result::Ok(facet_field_ids_delta)
-            });
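Reviewer note: the removed block above sized every `ThreadLocal` with `pool.current_num_threads()`; the replacement block that follows relies instead on the code now running inside `pool.install(...)` (see the `batch.rs` hunks), where rayon's free function resolves against the installed pool. A minimal check of that assumption, using only documented rayon behavior rather than Meilisearch code:

```rust
use rayon::ThreadPoolBuilder;

fn main() {
    let pool = ThreadPoolBuilder::new().num_threads(3).build().unwrap();

    // Outside any pool, this reports the global pool's thread count.
    let outside = rayon::current_num_threads();

    // Inside `install`, it reports the size of the pool we installed,
    // so capacity hints stay correct without passing the pool around.
    pool.install(|| assert_eq!(rayon::current_num_threads(), 3));

    println!("global: {outside}, installed: 3");
}
```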
+            let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract");
+            let _entered = span.enter();
+
+            let rtxn = index.read_txn()?;
+
+            // document but we need to create a function that collects and compresses documents.
+            let document_sender = extractor_sender.documents();
+            let document_extractor = DocumentsExtractor::new(&document_sender, embedders);
+            let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
+            let (finished_steps, step_name) = steps::extract_documents();
+            extract(document_changes,
+                &document_extractor,
+                indexing_context,
+                &mut extractor_allocs,
+                &datastore,
+                finished_steps,
+                total_steps,
+                step_name,
+            )?;
+
+            for document_extractor_data in datastore {
+                let document_extractor_data = document_extractor_data.0.into_inner();
+                for (field, delta) in document_extractor_data.field_distribution_delta {
+                    let current = field_distribution.entry(field).or_default();
+                    // adding the delta should never cause a negative result, as we are removing fields that previously existed.
+                    *current = current.saturating_add_signed(delta);
+                }
+                document_extractor_data.docids_delta.apply_to(document_ids);
+            }
+
+            field_distribution.retain(|_, v| *v != 0);
+
+            const TEN_GIB: usize = 10 * 1024 * 1024 * 1024;
+            let current_num_threads = rayon::current_num_threads();
+            let max_memory = TEN_GIB / current_num_threads;
+            eprintln!("A maximum of {max_memory} bytes will be used for each of the {current_num_threads} threads");
+
+            let grenad_parameters = GrenadParameters {
+                max_memory: Some(max_memory),
+                ..GrenadParameters::default()
+            };
+
+            let facet_field_ids_delta;
+
+            {
+                let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted");
+                let _entered = span.enter();
+
+                let (finished_steps, step_name) = steps::extract_facets();
+
+                facet_field_ids_delta = merge_and_send_facet_docids(
+                    FacetedDocidsExtractor::run_extraction(grenad_parameters,
+                        document_changes,
+                        indexing_context,
+                        &mut extractor_allocs,
+                        &extractor_sender.field_id_docid_facet_sender(),
+                        finished_steps,
+                        total_steps,
+                        step_name,
+                    )?,
+                    FacetDatabases::new(index),
+                    index,
+                    extractor_sender.facet_docids(),
+                )?;
+            }
+
+            {
+                let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
+                let _entered = span.enter();
+                let (finished_steps, step_name) = steps::extract_words();
+
+                let WordDocidsCaches {
+                    word_docids,
+                    word_fid_docids,
+                    exact_word_docids,
+                    word_position_docids,
+                    fid_word_count_docids,
+                } = WordDocidsExtractors::run_extraction(
+                    grenad_parameters,
+                    document_changes,
+                    indexing_context,
+                    &mut extractor_allocs,
+                    finished_steps,
+                    total_steps,
+                    step_name,
+                )?;
+
+                // TODO Word Docids Merger
+                // extractor_sender.send_searchable::<WordDocids>(word_docids).unwrap();
+                {
+                    let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
+                    let _entered = span.enter();
+                    merge_and_send_docids(
+                        word_docids,
+                        index.word_docids.remap_types(),
+                        index,
+                        extractor_sender.docids::<WordDocids>(),
+                        &indexing_context.must_stop_processing,
+                    )?;
+                }
+
+                // Word Fid Docids Merging
+                // extractor_sender.send_searchable::<WordFidDocids>(word_fid_docids).unwrap();
+                {
+                    let span = tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids");
+                    let _entered = span.enter();
+                    merge_and_send_docids(
+                        word_fid_docids,
+                        index.word_fid_docids.remap_types(),
+                        index,
+                        extractor_sender.docids::<WordFidDocids>(),
+                        &indexing_context.must_stop_processing,
+                    )?;
+                }
+
+                // Exact Word Docids Merging
+                // extractor_sender.send_searchable::<ExactWordDocids>(exact_word_docids).unwrap();
+                {
+                    let span = tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids");
+                    let _entered = span.enter();
+                    merge_and_send_docids(
+                        exact_word_docids,
+                        index.exact_word_docids.remap_types(),
+                        index,
+                        extractor_sender.docids::<ExactWordDocids>(),
+                        &indexing_context.must_stop_processing,
+                    )?;
+                }
+
+                // Word Position Docids Merging
+                // extractor_sender.send_searchable::<WordPositionDocids>(word_position_docids).unwrap();
+                {
+                    let span = tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids");
+                    let _entered = span.enter();
+                    merge_and_send_docids(
+                        word_position_docids,
+                        index.word_position_docids.remap_types(),
+                        index,
+                        extractor_sender.docids::<WordPositionDocids>(),
+                        &indexing_context.must_stop_processing,
+                    )?;
+                }
+
+                // Fid Word Count Docids Merging
+                // extractor_sender.send_searchable::<FidWordCountDocids>(fid_word_count_docids).unwrap();
+                {
+                    let span = tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids");
+                    let _entered = span.enter();
+                    merge_and_send_docids(
+                        fid_word_count_docids,
+                        index.field_id_word_count_docids.remap_types(),
+                        index,
+                        extractor_sender.docids::<FidWordCountDocids>(),
+                        &indexing_context.must_stop_processing,
+                    )?;
+                }
+            }
+
+            // run the proximity extraction only if the precision is by word
+            // this works only if the settings didn't change during this transaction.
+            let proximity_precision = index.proximity_precision(&rtxn)?.unwrap_or_default();
+            if proximity_precision == ProximityPrecision::ByWord {
+                let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids");
+                let _entered = span.enter();
+
+                let (finished_steps, step_name) = steps::extract_word_proximity();
+
+                let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(grenad_parameters,
+                    document_changes,
+                    indexing_context,
+                    &mut extractor_allocs,
+                    finished_steps,
+                    total_steps,
+                    step_name,
+                )?;
+
+                merge_and_send_docids(
+                    caches,
+                    index.word_pair_proximity_docids.remap_types(),
+                    index,
+                    extractor_sender.docids::<WordPairProximityDocids>(),
+                    &indexing_context.must_stop_processing,
+                )?;
+            }
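Reviewer note: one behavioral detail of the added block worth calling out is the extraction budget: a hard-coded 10 GiB is split evenly across threads, so the per-sorter `max_memory` shrinks as parallelism grows. In plain numbers (same arithmetic as `TEN_GIB / current_num_threads` above, values illustrative):

```rust
const TEN_GIB: usize = 10 * 1024 * 1024 * 1024;

fn per_thread_budget(threads: usize) -> usize {
    TEN_GIB / threads
}

fn main() {
    // 16 threads => each grenad sorter may buffer at most 640 MiB.
    assert_eq!(per_thread_budget(16), 640 * 1024 * 1024);
    // 4 threads => 2.5 GiB each.
    assert_eq!(per_thread_budget(4), 2_684_354_560);
}
```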
+            'vectors: {
+                let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
+                let _entered = span.enter();
+
+                let mut index_embeddings = index.embedding_configs(&rtxn)?;
+                if index_embeddings.is_empty() {
+                    break 'vectors;
+                }
+
+                let embedding_sender = extractor_sender.embeddings();
+                let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads());
+                let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
+                let (finished_steps, step_name) = steps::extract_embeddings();
+                extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
+
+                for config in &mut index_embeddings {
+                    'data: for data in datastore.iter_mut() {
+                        let data = &mut data.get_mut().0;
+                        let Some(deladd) = data.remove(&config.name) else { continue 'data; };
+                        deladd.apply_to(&mut config.user_provided);
+                    }
+                }
+
+                embedding_sender.finish(index_embeddings).unwrap();
+            }
+
+            'geo: {
+                let span = tracing::trace_span!(target: "indexing::documents::extract", "geo");
+                let _entered = span.enter();
+
+                // let geo_sender = extractor_sender.geo_points();
+                let Some(extractor) = GeoExtractor::new(&rtxn, index, grenad_parameters)? else {
+                    break 'geo;
+                };
+                let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
+                let (finished_steps, step_name) = steps::extract_geo_points();
+                extract(document_changes,
+                    &extractor,
+                    indexing_context,
+                    &mut extractor_allocs,
+                    &datastore,
+                    finished_steps,
+                    total_steps,
+                    step_name,
+                )?;
+
+                merge_and_send_rtree(
+                    datastore,
+                    &rtxn,
+                    index,
+                    extractor_sender.geo(),
+                    &indexing_context.must_stop_processing,
+                )?;
+            }
+
+            // TODO THIS IS TOO MUCH
+            // - [ ] Extract fieldid docid facet number
+            // - [ ] Extract fieldid docid facet string
+            // - [ ] Extract facetid string fst
+            // - [ ] Extract facetid normalized string strings
+
+            // TODO Inverted Indexes again
+            // - [x] Extract fieldid facet isempty docids
+            // - [x] Extract fieldid facet isnull docids
+            // - [x] Extract fieldid facet exists docids
+
+            // TODO This is the normal system
+            // - [x] Extract fieldid facet number docids
+            // - [x] Extract fieldid facet string docids
 
             {
                 let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH");
@@ -429,7 +439,7 @@ where
                 (indexing_context.send_progress)(Progress { finished_steps, total_steps, step_name, finished_total_documents: None });
             }
 
-            result
+            Result::Ok(facet_field_ids_delta)
         })?;
 
     let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);
diff --git a/crates/milli/src/update/new/merger.rs b/crates/milli/src/update/new/merger.rs
index c0ff93901..9d0d8e176 100644
--- a/crates/milli/src/update/new/merger.rs
+++ b/crates/milli/src/update/new/merger.rs
@@ -1,5 +1,4 @@
 use std::cell::RefCell;
-use std::io;
 
 use hashbrown::HashSet;
 use heed::types::Bytes;
diff --git a/crates/milli/src/update/new/vector_document.rs b/crates/milli/src/update/new/vector_document.rs
index 736456f0f..319730db0 100644
--- a/crates/milli/src/update/new/vector_document.rs
+++ b/crates/milli/src/update/new/vector_document.rs
@@ -286,7 +286,7 @@ impl<'doc> MergedVectorDocument<'doc> {
     ) -> Result<Option<Self>> {
         let db = VectorDocumentFromDb::new(docid, index, rtxn, db_fields_ids_map, doc_alloc)?;
         let new_doc =
-            VectorDocumentFromVersions::new(&external_document_id, versions, doc_alloc, embedders)?;
+            VectorDocumentFromVersions::new(external_document_id, versions, doc_alloc, embedders)?;
         Ok(if db.is_none() && new_doc.is_none() { None } else { Some(Self { new_doc, db }) })
     }
diff --git a/crates/milli/src/update/new/word_fst_builder.rs b/crates/milli/src/update/new/word_fst_builder.rs
index 834266045..2b1c4604b 100644
--- a/crates/milli/src/update/new/word_fst_builder.rs
+++ b/crates/milli/src/update/new/word_fst_builder.rs
@@ -1,13 +1,14 @@
+use std::collections::HashSet;
 use std::io::BufWriter;
 
 use fst::{Set, SetBuilder, Streamer};
 use memmap2::Mmap;
-use std::collections::HashSet;
 use tempfile::tempfile;
 
-use crate::{index::PrefixSettings, update::del_add::DelAdd, InternalError, Prefix, Result};
-
 use super::fst_merger_builder::FstMergerBuilder;
+use crate::index::PrefixSettings;
+use crate::update::del_add::DelAdd;
+use crate::{InternalError, Prefix, Result};
 
 pub struct WordFstBuilder<'a> {
     word_fst_builder: FstMergerBuilder<'a>,
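Reviewer note: finally, the call sites in `batch.rs` now read `pool.install(|| ...).unwrap()?`. That double step falls out of `ThreadPoolNoAbort`'s contract: unlike `rayon::ThreadPool::install`, its `install` is understood to return a `Result`, so a panicked worker surfaces as an error instead of aborting the process. A sketch of that shape (the real type lives in milli; the `PanicCatched` name and exact signature are assumptions here, not the actual API):

```rust
#[derive(Debug)]
struct PanicCatched; // pool-level failure: a worker panicked

struct NoAbortPool; // stand-in for milli's ThreadPoolNoAbort

impl NoAbortPool {
    fn install<R>(&self, op: impl FnOnce() -> R) -> Result<R, PanicCatched> {
        // The real implementation would run `op` on the rayon pool and
        // convert a caught panic into Err(PanicCatched) instead of aborting.
        Ok(op())
    }
}

fn index_documents(pool: &NoAbortPool) -> Result<(), String> {
    // Outer `unwrap()`: pool-level failure. Inner `?`: the indexing
    // Result produced by the closure itself.
    pool.install(|| Ok::<(), String>(())).unwrap()?;
    Ok(())
}

fn main() {
    index_documents(&NoAbortPool).unwrap();
}
```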