diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 6ec2b17bf..ecb44fc14 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -36,7 +36,7 @@ use meilisearch_types::milli::update::{ use meilisearch_types::milli::vector::parsed_vectors::{ ExplicitVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME, }; -use meilisearch_types::milli::{self, Filter, Object, UserError}; +use meilisearch_types::milli::{self, Filter, GlobalFieldsIdsMap, Object, UserError}; use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked}; use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task}; use meilisearch_types::{compression, Index, VERSION_FILE_NAME}; @@ -1302,49 +1302,6 @@ impl IndexScheduler { let primary_key = guess_primary_key(&rtxn, index, cursor, &documents_batch_index)?.unwrap(); - // if let Some(primary_key) = primary_key { - // match index.primary_key(index_wtxn)? { - // // if a primary key was set AND had already been defined in the index - // // but to a different value, we can make the whole batch fail. - // Some(pk) => { - // if primary_key != pk { - // return Err(milli::Error::from( - // milli::UserError::PrimaryKeyCannotBeChanged(pk.to_string()), - // ) - // .into()); - // } - // } - // // if the primary key was set and there was no primary key set for this index - // // we set it to the received value before starting the indexing process. - // None => { - // todo!(); - // let mut builder = - // milli::update::Settings::new(index_wtxn, index, indexer_config); - // builder.set_primary_key(primary_key); - // builder.execute( - // |indexing_step| tracing::debug!(update = ?indexing_step), - // || must_stop_processing.clone().get(), - // )?; - // primary_key_has_been_set = true; - // } - // } - // } - - // let config = IndexDocumentsConfig { update_method: method, ..Default::default() }; - - // let embedder_configs = index.embedding_configs(index_wtxn)?; - // // TODO: consider Arc'ing the map too (we only need read access + we'll be cloning it multiple times, so really makes sense) - // let embedders = self.embedders(embedder_configs)?; - - // let mut builder = milli::update::IndexDocuments::new( - // index_wtxn, - // index, - // indexer_config, - // config, - // |indexing_step| tracing::trace!(?indexing_step, "Update"), - // || must_stop_processing.get(), - // )?; - let mut indexer = indexer::DocumentOperation::new(method); for (operation, task) in operations.into_iter().zip(tasks.iter_mut()) { match operation { @@ -1401,12 +1358,10 @@ impl IndexScheduler { // let pool = indexer_config.thread_pool.unwrap(); let pool = rayon::ThreadPoolBuilder::new().build().unwrap(); // let fields_ids_map = RwLock::new(fields_ids_map); - let param = (index, &rtxn, &mut fields_ids_map, &primary_key); - let document_changes = indexer.document_changes(param)?; - indexer::index(index_wtxn, index, &pool, document_changes)?; - - /// TODO we must store it or not? - let fields_ids_map = fields_ids_map; + let param = (index, &rtxn, &primary_key); + let document_changes = indexer.document_changes(&mut fields_ids_map, param)?; + /// TODO pass/write the FieldsIdsMap + indexer::index(index_wtxn, index, fields_ids_map, &pool, document_changes)?; // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done"); } else if primary_key_has_been_set { diff --git a/milli/src/fields_ids_map.rs b/milli/src/fields_ids_map.rs index 39d67f20c..52e02045d 100644 --- a/milli/src/fields_ids_map.rs +++ b/milli/src/fields_ids_map.rs @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize}; use crate::FieldId; mod global; +pub use global::GlobalFieldsIdsMap; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FieldsIdsMap { diff --git a/milli/src/fields_ids_map/global.rs b/milli/src/fields_ids_map/global.rs index 857d13a2a..93908aea8 100644 --- a/milli/src/fields_ids_map/global.rs +++ b/milli/src/fields_ids_map/global.rs @@ -4,11 +4,13 @@ use std::sync::RwLock; use crate::{FieldId, FieldsIdsMap}; /// A fields ids map that can be globally updated to add fields +#[derive(Debug, Clone)] pub struct GlobalFieldsIdsMap<'indexing> { global: &'indexing RwLock, local: LocalFieldsIdsMap, } +#[derive(Debug, Clone)] struct LocalFieldsIdsMap { names_ids: BTreeMap, ids_names: BTreeMap, diff --git a/milli/src/lib.rs b/milli/src/lib.rs index 8b2468bea..45418c074 100644 --- a/milli/src/lib.rs +++ b/milli/src/lib.rs @@ -55,7 +55,7 @@ pub use self::error::{ }; pub use self::external_documents_ids::ExternalDocumentsIds; pub use self::fieldids_weights_map::FieldidsWeightsMap; -pub use self::fields_ids_map::FieldsIdsMap; +pub use self::fields_ids_map::{FieldsIdsMap, GlobalFieldsIdsMap}; pub use self::heed_codec::{ BEU16StrCodec, BEU32StrCodec, BoRoaringBitmapCodec, BoRoaringBitmapLenCodec, CboRoaringBitmapCodec, CboRoaringBitmapLenCodec, FieldIdWordCountCodec, ObkvCodec, diff --git a/milli/src/update/new/extract/extract_word_docids.rs b/milli/src/update/new/extract/extract_word_docids.rs index e2261748a..1f52ee086 100644 --- a/milli/src/update/new/extract/extract_word_docids.rs +++ b/milli/src/update/new/extract/extract_word_docids.rs @@ -1,32 +1,20 @@ use std::fs::File; use charabia::TokenizerBuilder; -use grenad::Merger; -use grenad::ReaderCursor; +use grenad::{Merger, ReaderCursor}; use heed::RoTxn; -use rayon::iter::IntoParallelIterator; -use rayon::iter::ParallelBridge; -use rayon::iter::ParallelIterator; +use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}; -use crate::update::MergeDeladdCboRoaringBitmaps; -use crate::{ - update::{ - create_sorter, - new::{DocumentChange, ItemsPool}, - GrenadParameters, - }, - FieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE, -}; - -use super::{ - cache::{CachedSorter, DelAddRoaringBitmapMerger}, - tokenize_document::DocumentTokenizer, -}; +use super::cache::CachedSorter; +use super::tokenize_document::DocumentTokenizer; +use crate::update::new::{DocumentChange, ItemsPool}; +use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; +use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE}; pub trait SearchableExtractor { fn run_extraction( index: &Index, - fields_ids_map: &FieldsIdsMap, + fields_ids_map: &GlobalFieldsIdsMap, indexer: GrenadParameters, document_changes: impl IntoParallelIterator>, ) -> Result> { @@ -62,12 +50,13 @@ pub trait SearchableExtractor { Ok(( index.read_txn()?, &document_tokenizer, + fields_ids_map.clone(), CachedSorter::new( // TODO use a better value 100.try_into().unwrap(), create_sorter( grenad::SortAlgorithm::Stable, - DelAddRoaringBitmapMerger, + MergeDeladdCboRoaringBitmaps, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, @@ -78,12 +67,12 @@ pub trait SearchableExtractor { }); document_changes.into_par_iter().try_for_each(|document_change| { - context_pool.with(|(rtxn, document_tokenizer, cached_sorter)| { + context_pool.with(|(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| { Self::extract_document_change( &*rtxn, index, document_tokenizer, - &fields_ids_map, + fields_ids_map, cached_sorter, document_change?, ) @@ -91,7 +80,7 @@ pub trait SearchableExtractor { })?; let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps); - for (_rtxn, _tokenizer, cache) in context_pool.into_items() { + for (_rtxn, _tokenizer, _fields_ids_map, cache) in context_pool.into_items() { let sorter = cache.into_sorter()?; let readers = sorter.into_reader_cursors()?; builder.extend(readers); @@ -104,8 +93,8 @@ pub trait SearchableExtractor { rtxn: &RoTxn, index: &Index, document_tokenizer: &DocumentTokenizer, - fields_ids_map: &FieldsIdsMap, - cached_sorter: &mut CachedSorter, + fields_ids_map: &mut GlobalFieldsIdsMap, + cached_sorter: &mut CachedSorter, document_change: DocumentChange, ) -> Result<()>; } @@ -116,9 +105,8 @@ impl SearchableExtractor for WordDocidsExtractor { rtxn: &RoTxn, index: &Index, document_tokenizer: &DocumentTokenizer, - fields_ids_map: &FieldsIdsMap, - // TODO: DelAddRoaringBitmapMerger should be CBO - cached_sorter: &mut CachedSorter, + fields_ids_map: &mut GlobalFieldsIdsMap, + cached_sorter: &mut CachedSorter, document_change: DocumentChange, ) -> crate::Result<()> { match document_change { diff --git a/milli/src/update/new/extract/tokenize_document.rs b/milli/src/update/new/extract/tokenize_document.rs index 40f0b4374..9f0a1c4d8 100644 --- a/milli/src/update/new/extract/tokenize_document.rs +++ b/milli/src/update/new/extract/tokenize_document.rs @@ -1,11 +1,14 @@ -use crate::{ - update::new::KvReaderFieldId, FieldId, FieldsIdsMap, Index, InternalError, - LocalizedAttributesRule, Result, MAX_POSITION_PER_ATTRIBUTE, MAX_WORD_LENGTH, -}; +use std::collections::HashMap; + use charabia::{SeparatorKind, Token, TokenKind, Tokenizer, TokenizerBuilder}; use heed::RoTxn; use serde_json::Value; -use std::collections::HashMap; + +use crate::update::new::KvReaderFieldId; +use crate::{ + FieldId, FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, LocalizedAttributesRule, + Result, MAX_POSITION_PER_ATTRIBUTE, MAX_WORD_LENGTH, +}; pub struct DocumentTokenizer<'a> { pub tokenizer: &'a Tokenizer<'a>, @@ -18,18 +21,24 @@ impl<'a> DocumentTokenizer<'a> { pub fn tokenize_document( &self, obkv: &KvReaderFieldId, - field_id_map: &FieldsIdsMap, + field_id_map: &mut GlobalFieldsIdsMap, token_fn: &mut impl FnMut(FieldId, u16, &str), ) -> Result<()> { let mut field_position = HashMap::new(); + let mut field_name = String::new(); for (field_id, field_bytes) in obkv { - let Some(field_name) = field_id_map.name(field_id) else { + let Some(field_name) = field_id_map.name(field_id).map(|s| { + field_name.clear(); + field_name.push_str(s); + &field_name + }) else { unreachable!("field id not found in field id map"); }; let mut tokenize_field = |name: &str, value: &Value| { - let Some(field_id) = field_id_map.id(name) else { - unreachable!("field name not found in field id map"); + let Some(field_id) = field_id_map.id_or_insert(name) else { + /// TODO: better error + panic!("it's over 9000"); }; let position = @@ -75,7 +84,7 @@ impl<'a> DocumentTokenizer<'a> { // if the current field is searchable or contains a searchable attribute if self.searchable_attributes.map_or(true, |attributes| { - attributes.iter().any(|name| perm_json_p::contained_in(name, field_name)) + attributes.iter().any(|name| perm_json_p::contained_in(name, &field_name)) }) { // parse json. match serde_json::from_slice(field_bytes).map_err(InternalError::SerdeJson)? { @@ -224,11 +233,12 @@ mod perm_json_p { #[cfg(test)] mod test { - use super::*; use charabia::TokenizerBuilder; use meili_snap::snapshot; use obkv::KvReader; use serde_json::json; + + use super::*; #[test] fn test_tokenize_document() { let mut fields_ids_map = FieldsIdsMap::new(); diff --git a/milli/src/update/new/indexer/document_deletion.rs b/milli/src/update/new/indexer/document_deletion.rs index 3444d58f7..b4336c14a 100644 --- a/milli/src/update/new/indexer/document_deletion.rs +++ b/milli/src/update/new/indexer/document_deletion.rs @@ -27,6 +27,7 @@ impl<'p> DocumentChanges<'p> for DocumentDeletion { fn document_changes( self, + _fields_ids_map: &mut FieldsIdsMap, param: Self::Parameter, ) -> Result> + Clone + 'p> { let (index, fields, primary_key) = param; diff --git a/milli/src/update/new/indexer/document_operation.rs b/milli/src/update/new/indexer/document_operation.rs index f5dcfcfe6..c54ffd140 100644 --- a/milli/src/update/new/indexer/document_operation.rs +++ b/milli/src/update/new/indexer/document_operation.rs @@ -73,13 +73,14 @@ impl DocumentOperation { } impl<'p> DocumentChanges<'p> for DocumentOperation { - type Parameter = (&'p Index, &'p RoTxn<'p>, &'p mut FieldsIdsMap, &'p PrimaryKey<'p>); + type Parameter = (&'p Index, &'p RoTxn<'p>, &'p PrimaryKey<'p>); fn document_changes( self, + fields_ids_map: &mut FieldsIdsMap, param: Self::Parameter, ) -> Result> + Clone + 'p> { - let (index, rtxn, fields_ids_map, primary_key) = param; + let (index, rtxn, primary_key) = param; let documents_ids = index.documents_ids(rtxn)?; let mut available_docids = AvailableIds::new(&documents_ids); @@ -174,7 +175,7 @@ impl<'p> DocumentChanges<'p> for DocumentOperation { /// TODO is it the best way to provide FieldsIdsMap to the parallel iterator? let fields_ids_map = fields_ids_map.clone(); - // We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone + // TODO We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone let docids_version_offsets: Vec<_> = docids_version_offsets.drain().collect(); Ok(docids_version_offsets diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index ebbb8582c..50bb5a401 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -1,4 +1,5 @@ use std::fs::File; +use std::sync::RwLock; use std::thread::{self, Builder}; use big_s::S; @@ -22,7 +23,7 @@ use crate::documents::{ obkv_to_object, DocumentsBatchCursor, DocumentsBatchIndex, PrimaryKey, DEFAULT_PRIMARY_KEY, }; use crate::update::GrenadParameters; -use crate::{Index, Result, UserError}; +use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; mod document_deletion; mod document_operation; @@ -34,6 +35,7 @@ pub trait DocumentChanges<'p> { fn document_changes( self, + fields_ids_map: &mut FieldsIdsMap, param: Self::Parameter, ) -> Result> + Clone + 'p>; } @@ -46,6 +48,7 @@ pub trait DocumentChanges<'p> { pub fn index( wtxn: &mut RwTxn, index: &Index, + fields_ids_map: FieldsIdsMap, pool: &ThreadPool, document_changes: PI, ) -> Result<()> @@ -57,6 +60,9 @@ where let ExtractorsMergerChannels { merger_receiver, deladd_cbo_roaring_bitmap_sender } = extractors_merger_channels(100); + let fields_ids_map_lock = RwLock::new(fields_ids_map); + let global_fields_ids_map = GlobalFieldsIdsMap::new(&fields_ids_map_lock); + thread::scope(|s| { // TODO manage the errors correctly let handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, || { @@ -65,7 +71,7 @@ where // word docids let merger = WordDocidsExtractor::run_extraction( index, - todo!(), + &global_fields_ids_map, /// TODO: GrenadParameters::default() should be removed in favor a passed parameter GrenadParameters::default(), document_changes.clone(), @@ -100,8 +106,13 @@ where handle.join().unwrap()?; handle2.join().unwrap()?; - Ok(()) - }) + Ok(()) as Result<_> + })?; + + let fields_ids_map = fields_ids_map_lock.into_inner().unwrap(); + index.put_fields_ids_map(wtxn, &fields_ids_map)?; + + Ok(()) } /// TODO move this elsewhere diff --git a/milli/src/update/new/indexer/partial_dump.rs b/milli/src/update/new/indexer/partial_dump.rs index 6699a6ba7..fe49ffdd7 100644 --- a/milli/src/update/new/indexer/partial_dump.rs +++ b/milli/src/update/new/indexer/partial_dump.rs @@ -30,6 +30,7 @@ where /// - We recommend sending chunks of documents in this `PartialDumpIndexer` we therefore need to create a custom take_while_size method (that doesn't drop items). fn document_changes( self, + _fields_ids_map: &mut FieldsIdsMap, param: Self::Parameter, ) -> Result> + Clone + 'p> { let (fields_ids_map, concurrent_available_ids, primary_key) = param; diff --git a/milli/src/update/new/indexer/update_by_function.rs b/milli/src/update/new/indexer/update_by_function.rs index fc908e31a..36ff432f8 100644 --- a/milli/src/update/new/indexer/update_by_function.rs +++ b/milli/src/update/new/indexer/update_by_function.rs @@ -2,7 +2,7 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator}; use super::DocumentChanges; use crate::update::new::DocumentChange; -use crate::Result; +use crate::{FieldsIdsMap, Result}; pub struct UpdateByFunction; @@ -11,6 +11,7 @@ impl<'p> DocumentChanges<'p> for UpdateByFunction { fn document_changes( self, + _fields_ids_map: &mut FieldsIdsMap, _param: Self::Parameter, ) -> Result> + Clone + 'p> { Ok((0..100).into_par_iter().map(|_| todo!()))