From bcb1aa3d2294aa54d77e581f04b23ccde5cbb553 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?=
Date: Mon, 2 Sep 2024 19:39:48 +0200
Subject: [PATCH] Find a temporary solution to par into iter on a HashMap

Spoiler: Do not use a HashMap but drain it into a Vec

---
 Cargo.lock                                    |   3 +-
 Cargo.toml                                    |   3 +
 index-scheduler/src/batch.rs                  | 156 ++++++++++--------
 milli/src/fields_ids_map.rs                   |   2 +
 milli/src/fields_ids_map/global.rs            |  84 ++++++++++
 milli/src/update/new/global_fields_ids_map.rs |  65 --------
 .../update/new/indexer/document_deletion.rs   |   2 +-
 .../update/new/indexer/document_operation.rs  |  12 +-
 milli/src/update/new/indexer/mod.rs           |  66 +++++++-
 milli/src/update/new/indexer/partial_dump.rs  |   6 +-
 .../update/new/indexer/update_by_function.rs  |   5 +-
 milli/src/update/new/mod.rs                   |   2 +-
 12 files changed, 254 insertions(+), 152 deletions(-)
 create mode 100644 milli/src/fields_ids_map/global.rs
 delete mode 100644 milli/src/update/new/global_fields_ids_map.rs

diff --git a/Cargo.lock b/Cargo.lock
index 281c0ab9d..e169dbd52 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4657,8 +4657,7 @@ dependencies = [
 [[package]]
 name = "roaring"
 version = "0.10.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8f4b84ba6e838ceb47b41de5194a60244fac43d9fe03b71dbe8c5a201081d6d1"
+source = "git+https://github.com/RoaringBitmap/roaring-rs?branch=clone-iter-slice#348e58c2312fc37c0f351373cc7338cea86cf828"
 dependencies = [
  "bytemuck",
  "byteorder",
diff --git a/Cargo.toml b/Cargo.toml
index 0fbfa9b12..3b9219ebc 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -64,3 +64,6 @@ opt-level = 3
 opt-level = 3
 [profile.bench.package.yada]
 opt-level = 3
+
+[patch.crates-io]
+roaring = { git = "https://github.com/RoaringBitmap/roaring-rs", branch = "clone-iter-slice" }
diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs
index 1a056dde9..6ec2b17bf 100644
--- a/index-scheduler/src/batch.rs
+++ b/index-scheduler/src/batch.rs
@@ -22,19 +22,21 @@ use std::ffi::OsStr;
 use std::fmt;
 use std::fs::{self, File};
 use std::io::BufWriter;
+use std::sync::RwLock;
 
 use dump::IndexMetadata;
 use meilisearch_types::error::Code;
 use meilisearch_types::heed::{RoTxn, RwTxn};
-use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
+use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey};
 use meilisearch_types::milli::heed::CompactionOption;
+use meilisearch_types::milli::update::new::indexer::{self, guess_primary_key, DocumentChanges};
 use meilisearch_types::milli::update::{
-    IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings as MilliSettings,
+    self, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings as MilliSettings,
 };
 use meilisearch_types::milli::vector::parsed_vectors::{
     ExplicitVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME,
 };
-use meilisearch_types::milli::{self, Filter, Object};
+use meilisearch_types::milli::{self, Filter, Object, UserError};
 use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
 use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
 use meilisearch_types::{compression, Index, VERSION_FILE_NAME};
@@ -1284,58 +1286,72 @@ impl IndexScheduler {
                 let must_stop_processing = self.must_stop_processing.clone();
                 let indexer_config = self.index_mapper.indexer_config();
 
-                if let Some(primary_key) = primary_key {
-                    match index.primary_key(index_wtxn)? {
-                        // if a primary key was set AND had already been defined in the index
-                        // but to a different value, we can make the whole batch fail.
-                        Some(pk) => {
-                            if primary_key != pk {
-                                return Err(milli::Error::from(
-                                    milli::UserError::PrimaryKeyCannotBeChanged(pk.to_string()),
-                                )
-                                .into());
-                            }
-                        }
-                        // if the primary key was set and there was no primary key set for this index
-                        // we set it to the received value before starting the indexing process.
-                        None => {
-                            let mut builder =
-                                milli::update::Settings::new(index_wtxn, index, indexer_config);
-                            builder.set_primary_key(primary_key);
-                            builder.execute(
-                                |indexing_step| tracing::debug!(update = ?indexing_step),
-                                || must_stop_processing.clone().get(),
-                            )?;
-                            primary_key_has_been_set = true;
-                        }
-                    }
-                }
+                // TODO: manage errors correctly
+                let rtxn = index.read_txn()?;
+                let first_addition_uuid = operations
+                    .iter()
+                    .find_map(|op| match op {
+                        DocumentOperation::Add(content_uuid) => Some(content_uuid),
+                        _ => None,
+                    })
+                    .unwrap();
+                let content_file = self.file_store.get_update(*first_addition_uuid)?;
+                let reader =
+                    DocumentsBatchReader::from_reader(content_file).map_err(milli::Error::from)?;
+                let (cursor, documents_batch_index) = reader.into_cursor_and_fields_index();
+                let primary_key =
+                    guess_primary_key(&rtxn, index, cursor, &documents_batch_index)?.unwrap();
 
-                let config = IndexDocumentsConfig { update_method: method, ..Default::default() };
+                // if let Some(primary_key) = primary_key {
+                //     match index.primary_key(index_wtxn)? {
+                //         // if a primary key was set AND had already been defined in the index
+                //         // but to a different value, we can make the whole batch fail.
+                //         Some(pk) => {
+                //             if primary_key != pk {
+                //                 return Err(milli::Error::from(
+                //                     milli::UserError::PrimaryKeyCannotBeChanged(pk.to_string()),
+                //                 )
+                //                 .into());
+                //             }
+                //         }
+                //         // if the primary key was set and there was no primary key set for this index
+                //         // we set it to the received value before starting the indexing process.
+ // None => { + // todo!(); + // let mut builder = + // milli::update::Settings::new(index_wtxn, index, indexer_config); + // builder.set_primary_key(primary_key); + // builder.execute( + // |indexing_step| tracing::debug!(update = ?indexing_step), + // || must_stop_processing.clone().get(), + // )?; + // primary_key_has_been_set = true; + // } + // } + // } - let embedder_configs = index.embedding_configs(index_wtxn)?; - // TODO: consider Arc'ing the map too (we only need read access + we'll be cloning it multiple times, so really makes sense) - let embedders = self.embedders(embedder_configs)?; + // let config = IndexDocumentsConfig { update_method: method, ..Default::default() }; - let mut builder = milli::update::IndexDocuments::new( - index_wtxn, - index, - indexer_config, - config, - |indexing_step| tracing::trace!(?indexing_step, "Update"), - || must_stop_processing.get(), - )?; + // let embedder_configs = index.embedding_configs(index_wtxn)?; + // // TODO: consider Arc'ing the map too (we only need read access + we'll be cloning it multiple times, so really makes sense) + // let embedders = self.embedders(embedder_configs)?; + // let mut builder = milli::update::IndexDocuments::new( + // index_wtxn, + // index, + // indexer_config, + // config, + // |indexing_step| tracing::trace!(?indexing_step, "Update"), + // || must_stop_processing.get(), + // )?; + + let mut indexer = indexer::DocumentOperation::new(method); for (operation, task) in operations.into_iter().zip(tasks.iter_mut()) { match operation { DocumentOperation::Add(content_uuid) => { let content_file = self.file_store.get_update(content_uuid)?; - let reader = DocumentsBatchReader::from_reader(content_file) - .map_err(milli::Error::from)?; - let (new_builder, user_result) = builder.add_documents(reader)?; - builder = new_builder; - - builder = builder.with_embedders(embedders.clone()); + let stats = indexer.add_documents(content_file)?; + // builder = builder.with_embedders(embedders.clone()); let received_documents = if let Some(Details::DocumentAdditionOrUpdate { @@ -1349,30 +1365,17 @@ impl IndexScheduler { unreachable!(); }; - match user_result { - Ok(count) => { - task.status = Status::Succeeded; - task.details = Some(Details::DocumentAdditionOrUpdate { - received_documents, - indexed_documents: Some(count), - }) - } - Err(e) => { - task.status = Status::Failed; - task.details = Some(Details::DocumentAdditionOrUpdate { - received_documents, - indexed_documents: Some(0), - }); - task.error = Some(milli::Error::from(e).into()); - } - } + task.status = Status::Succeeded; + task.details = Some(Details::DocumentAdditionOrUpdate { + received_documents, + indexed_documents: Some(stats.document_count as u64), + }) } DocumentOperation::Delete(document_ids) => { - let (new_builder, user_result) = - builder.remove_documents(document_ids)?; - builder = new_builder; + let count = document_ids.len(); + indexer.delete_documents(document_ids); // Uses Invariant: remove documents actually always returns Ok for the inner result - let count = user_result.unwrap(); + // let count = user_result.unwrap(); let provided_ids = if let Some(Details::DocumentDeletion { provided_ids, .. 
                         }) = task.details
                         {
                             provided_ids
                         } else {
                             // In the case of a `documentDeletion` the details MUST be set
                             unreachable!()
                         };
 
                         task.status = Status::Succeeded;
                         task.details = Some(Details::DocumentDeletion {
                             provided_ids,
-                            deleted_documents: Some(count),
+                            deleted_documents: Some(count as u64),
                         });
                     }
                 }
             }
 
             if !tasks.iter().all(|res| res.error.is_some()) {
-                let addition = builder.execute()?;
-                tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
+                let mut fields_ids_map = index.fields_ids_map(&rtxn)?;
+                // TODO: create a pool if needed
+                // let pool = indexer_config.thread_pool.unwrap();
+                let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
+                // let fields_ids_map = RwLock::new(fields_ids_map);
+                let param = (index, &rtxn, &mut fields_ids_map, &primary_key);
+                let document_changes = indexer.document_changes(param)?;
+                indexer::index(index_wtxn, index, &pool, document_changes)?;
+
+                // TODO: should we store it or not?
+                let fields_ids_map = fields_ids_map;
+
+                // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
             } else if primary_key_has_been_set {
                 // Everything failed but we've set a primary key.
                 // We need to remove it.
diff --git a/milli/src/fields_ids_map.rs b/milli/src/fields_ids_map.rs
index f9d7c3704..39d67f20c 100644
--- a/milli/src/fields_ids_map.rs
+++ b/milli/src/fields_ids_map.rs
@@ -4,6 +4,8 @@ use serde::{Deserialize, Serialize};
 
 use crate::FieldId;
 
+mod global;
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct FieldsIdsMap {
     names_ids: BTreeMap<String, FieldId>,
diff --git a/milli/src/fields_ids_map/global.rs b/milli/src/fields_ids_map/global.rs
new file mode 100644
index 000000000..857d13a2a
--- /dev/null
+++ b/milli/src/fields_ids_map/global.rs
@@ -0,0 +1,84 @@
+use std::collections::BTreeMap;
+use std::sync::RwLock;
+
+use crate::{FieldId, FieldsIdsMap};
+
+/// A fields ids map that can be globally updated to add fields
+pub struct GlobalFieldsIdsMap<'indexing> {
+    global: &'indexing RwLock<FieldsIdsMap>,
+    local: LocalFieldsIdsMap,
+}
+
+struct LocalFieldsIdsMap {
+    names_ids: BTreeMap<String, FieldId>,
+    ids_names: BTreeMap<FieldId, String>,
+}
+
+impl LocalFieldsIdsMap {
+    fn new(global: &RwLock<FieldsIdsMap>) -> Self {
+        let global = global.read().unwrap();
+        Self { names_ids: global.names_ids.clone(), ids_names: global.ids_names.clone() }
+    }
+
+    fn insert(&mut self, name: &str, field_id: FieldId) {
+        self.names_ids.insert(name.to_owned(), field_id);
+        self.ids_names.insert(field_id, name.to_owned());
+    }
+
+    fn name(&self, id: FieldId) -> Option<&str> {
+        self.ids_names.get(&id).map(String::as_str)
+    }
+
+    fn id(&self, name: &str) -> Option<FieldId> {
+        self.names_ids.get(name).copied()
+    }
+}
+
+impl<'indexing> GlobalFieldsIdsMap<'indexing> {
+    pub fn new(global: &'indexing RwLock<FieldsIdsMap>) -> Self {
+        Self { local: LocalFieldsIdsMap::new(global), global }
+    }
+
+    /// Returns the field id related to a field name; it will create a new field id if the
+    /// name is not already known. Returns `None` if the maximum field id has been reached.
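+    ///
+    /// The lookup is tiered: the thread-local map is checked first, then the
+    /// shared map under a read lock, and only on a miss is the write lock
+    /// taken, with a re-check before inserting (double-checked locking).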
+    pub fn id_or_insert(&mut self, name: &str) -> Option<FieldId> {
+        if let Some(field_id) = self.local.id(name) {
+            return Some(field_id);
+        }
+
+        {
+            // optimistically lookup the global map
+            let global = self.global.read().unwrap();
+
+            if let Some(field_id) = global.id(name) {
+                self.local.insert(name, field_id);
+                return Some(field_id);
+            }
+        }
+
+        {
+            let mut global = self.global.write().unwrap();
+
+            if let Some(field_id) = global.id(name) {
+                self.local.insert(name, field_id);
+                return Some(field_id);
+            }
+
+            let field_id = global.insert(name)?;
+            self.local.insert(name, field_id);
+            Some(field_id)
+        }
+    }
+
+    /// Get the name of a field based on its id.
+    pub fn name(&mut self, id: FieldId) -> Option<&str> {
+        if self.local.name(id).is_none() {
+            let global = self.global.read().unwrap();
+
+            let name = global.name(id)?;
+            self.local.insert(name, id);
+        }
+
+        self.local.name(id)
+    }
+}
diff --git a/milli/src/update/new/global_fields_ids_map.rs b/milli/src/update/new/global_fields_ids_map.rs
deleted file mode 100644
index 4bd7b27d9..000000000
--- a/milli/src/update/new/global_fields_ids_map.rs
+++ /dev/null
@@ -1,65 +0,0 @@
-use std::sync::{Arc, RwLock};
-
-use crate::{FieldId, FieldsIdsMap};
-
-/// A fields ids map that can be globally updated to add fields
-pub struct GlobalFieldsIdsMap {
-    global: Arc<RwLock<FieldsIdsMap>>,
-    local: FieldsIdsMap,
-}
-
-impl GlobalFieldsIdsMap {
-    pub fn new(global: FieldsIdsMap) -> Self {
-        Self { local: global.clone(), global: Arc::new(RwLock::new(global)) }
-    }
-
-    /// Returns the number of fields ids in the map.
-    pub fn global_len(&self) -> usize {
-        todo!()
-    }
-
-    /// Returns `true` if the map is empty.
-    pub fn global_is_empty(&self) -> bool {
-        todo!()
-    }
-
-    /// Returns the field id related to a field name, it will create a new field id if the
-    /// name is not already known. Returns `None` if the maximum field id as been reached.
-    pub fn insert(&mut self, name: &str) -> Option<FieldId> {
-        match self.names_ids.get(name) {
-            Some(id) => Some(*id),
-            None => {
-                let id = self.next_id?;
-                self.next_id = id.checked_add(1);
-                self.names_ids.insert(name.to_owned(), id);
-                self.ids_names.insert(id, name.to_owned());
-                Some(id)
-            }
-        }
-    }
-
-    /// Get the id of a field based on its name.
-    pub fn id(&self, name: &str) -> Option<FieldId> {
-        self.names_ids.get(name).copied()
-    }
-
-    /// Get the name of a field based on its id.
-    pub fn name(&self, id: FieldId) -> Option<&str> {
-        self.ids_names.get(&id).map(String::as_str)
-    }
-
-    /// Iterate over the ids and names in the ids order.
-    pub fn iter(&self) -> impl Iterator<Item = (FieldId, &str)> {
-        self.ids_names.iter().map(|(id, name)| (*id, name.as_str()))
-    }
-
-    /// Iterate over the ids in the order of the ids.
-    pub fn ids(&'_ self) -> impl Iterator<Item = FieldId> + '_ {
-        self.ids_names.keys().copied()
-    }
-
-    /// Iterate over the names in the order of the ids.
-    pub fn names(&self) -> impl Iterator<Item = &str> {
-        self.ids_names.values().map(AsRef::as_ref)
-    }
-}
diff --git a/milli/src/update/new/indexer/document_deletion.rs b/milli/src/update/new/indexer/document_deletion.rs
index 5e43b5816..3444d58f7 100644
--- a/milli/src/update/new/indexer/document_deletion.rs
+++ b/milli/src/update/new/indexer/document_deletion.rs
@@ -28,7 +28,7 @@ impl<'p> DocumentChanges<'p> for DocumentDeletion {
     fn document_changes(
         self,
         param: Self::Parameter,
-    ) -> Result<impl ParallelIterator<Item = Result<DocumentChange>> + 'p> {
+    ) -> Result<impl ParallelIterator<Item = Result<DocumentChange>> + Clone + 'p> {
         let (index, fields, primary_key) = param;
         let items = Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from)));
         Ok(self.to_delete.into_iter().par_bridge().map_with(items, |items, docid| {
diff --git a/milli/src/update/new/indexer/document_operation.rs b/milli/src/update/new/indexer/document_operation.rs
index 26228c354..568df654e 100644
--- a/milli/src/update/new/indexer/document_operation.rs
+++ b/milli/src/update/new/indexer/document_operation.rs
@@ -34,6 +34,7 @@ pub struct PayloadStats {
     pub bytes: u64,
 }
 
+#[derive(Clone)]
 enum InnerDocOp {
     Addition(DocumentOffset),
     Deletion,
 }
 
 /// Represents an offset where a document lives
 /// in an mmapped grenad reader file.
+#[derive(Clone)]
 pub struct DocumentOffset {
     /// The mmapped grenad reader file.
     pub content: Arc<Mmap>, // grenad::Reader
@@ -76,7 +78,7 @@ impl<'p> DocumentChanges<'p> for DocumentOperation {
     fn document_changes(
         self,
         param: Self::Parameter,
-    ) -> Result<impl ParallelIterator<Item = Result<DocumentChange>> + 'p> {
+    ) -> Result<impl ParallelIterator<Item = Result<DocumentChange>> + Clone + 'p> {
         let (index, rtxn, fields_ids_map, primary_key) = param;
 
         let documents_ids = index.documents_ids(rtxn)?;
@@ -170,6 +172,11 @@ impl<'p> DocumentChanges<'p> for DocumentOperation {
             }
         }
 
+        // TODO: is this the best way to provide the FieldsIdsMap to the parallel iterator?
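+        // The iterator returned below must be `Clone` (see the `DocumentChanges`
+        // trait), so everything it captures and yields has to be `Clone` as well,
+        // hence the new derives on `InnerDocOp` and `DocumentOffset` above and the
+        // owned copy of the `FieldsIdsMap` below.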
+        let fields_ids_map = fields_ids_map.clone();
+        // We must drain the HashMap into a Vec because rayon::collections::hash_map::IntoIter: !Clone
+        let docids_version_offsets: Vec<_> = docids_version_offsets.drain().collect();
+
         Ok(docids_version_offsets
             .into_par_iter()
             .map_with(
@@ -177,6 +184,7 @@ impl<'p> DocumentChanges<'p> for DocumentOperation {
                 move |context_pool, (external_docid, (internal_docid, operations))| {
                     context_pool.with(|rtxn| {
                         use IndexDocumentsMethod as Idm;
+
                         let document_merge_function = match self.index_documents_method {
                             Idm::ReplaceDocuments => merge_document_for_replacements,
                             Idm::UpdateDocuments => merge_document_for_updates,
                         };
 
                         document_merge_function(
                             rtxn,
                             index,
-                            fields_ids_map,
+                            &fields_ids_map,
                             internal_docid,
                             external_docid,
                             &operations,
diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs
index ba4356288..ca5bb71eb 100644
--- a/milli/src/update/new/indexer/mod.rs
+++ b/milli/src/update/new/indexer/mod.rs
@@ -1,9 +1,10 @@
+use std::fs::File;
 use std::thread::{self, Builder};
 
 use big_s::S;
 pub use document_deletion::DocumentDeletion;
 pub use document_operation::DocumentOperation;
-use heed::RwTxn;
+use heed::{RoTxn, RwTxn};
 pub use partial_dump::PartialDump;
 use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use rayon::ThreadPool;
@@ -15,7 +16,11 @@ use super::channel::{
 };
 use super::document_change::DocumentChange;
 use super::merger::merge_grenad_entries;
-use crate::{Index, Result};
+use super::StdResult;
+use crate::documents::{
+    obkv_to_object, DocumentsBatchCursor, DocumentsBatchIndex, PrimaryKey, DEFAULT_PRIMARY_KEY,
+};
+use crate::{Index, Result, UserError};
 
 mod document_deletion;
 mod document_operation;
@@ -28,7 +33,7 @@ pub trait DocumentChanges<'p> {
     fn document_changes(
         self,
         param: Self::Parameter,
-    ) -> Result<impl ParallelIterator<Item = Result<DocumentChange>> + 'p>;
+    ) -> Result<impl ParallelIterator<Item = Result<DocumentChange>> + Clone + 'p>;
 }
 
 /// This is the main function of this crate.
@@ -40,7 +45,7 @@ pub fn index(
     wtxn: &mut RwTxn,
     index: &Index,
     pool: &ThreadPool,
-    document_changes: PI,
+    _document_changes: PI,
 ) -> Result<()>
 where
     PI: IntoParallelIterator<Item = Result<DocumentChange>> + Send,
@@ -88,3 +93,56 @@
         Ok(())
     })
 }
+
+/// TODO move this elsewhere
+pub fn guess_primary_key<'a>(
+    rtxn: &'a RoTxn<'a>,
+    index: &Index,
+    mut cursor: DocumentsBatchCursor<File>,
+    documents_batch_index: &'a DocumentsBatchIndex,
+) -> Result<StdResult<PrimaryKey<'a>, UserError>> {
+    // The primary key *field id* that has already been set for this index or the one
+    // we will guess by searching for the first key whose lowercased name ends with "id".
+    match index.primary_key(rtxn)? {
+        Some(primary_key) => match PrimaryKey::new(primary_key, documents_batch_index) {
+            Some(primary_key) => Ok(Ok(primary_key)),
+            None => match cursor.next_document()? {
+                Some(first_document) => Ok(Err(UserError::MissingDocumentId {
+                    primary_key: primary_key.to_string(),
+                    document: obkv_to_object(first_document, documents_batch_index)?,
+                })),
+                None => unreachable!("Called with reader.is_empty()"),
+            },
+        },
+        None => {
+            let mut guesses: Vec<(u16, &str)> = documents_batch_index
+                .iter()
+                .filter(|(_, name)| name.to_lowercase().ends_with(DEFAULT_PRIMARY_KEY))
+                .map(|(field_id, name)| (*field_id, name.as_str()))
+                .collect();
+
+            // sort the keys in a deterministic, obvious way, so that fields are always in the same order.
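+            // e.g. ["docid", "uid", "id"] sorts to ["id", "uid", "docid"]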
+            guesses.sort_by(|(_, left_name), (_, right_name)| {
+                // shortest name first
+                left_name.len().cmp(&right_name.len()).then_with(
+                    // then alphabetical order
+                    || left_name.cmp(right_name),
+                )
+            });
+
+            match guesses.as_slice() {
+                [] => Ok(Err(UserError::NoPrimaryKeyCandidateFound)),
+                [(field_id, name)] => {
+                    tracing::info!("Primary key was not specified in index. Inferred to '{name}'");
+                    Ok(Ok(PrimaryKey::Flat { name, field_id: *field_id }))
+                }
+                multiple => Ok(Err(UserError::MultiplePrimaryKeyCandidatesFound {
+                    candidates: multiple
+                        .iter()
+                        .map(|(_, candidate)| candidate.to_string())
+                        .collect(),
+                })),
+            }
+        }
+    }
+}
diff --git a/milli/src/update/new/indexer/partial_dump.rs b/milli/src/update/new/indexer/partial_dump.rs
index d324322a7..6699a6ba7 100644
--- a/milli/src/update/new/indexer/partial_dump.rs
+++ b/milli/src/update/new/indexer/partial_dump.rs
@@ -7,7 +7,7 @@ use crate::update::new::{DocumentChange, Insertion, KvWriterFieldId};
 use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError};
 
 pub struct PartialDump<I> {
-    pub iter: I,
+    iter: I,
 }
 
 impl<I> PartialDump<I> {
@@ -19,7 +19,7 @@ impl<'p, I> DocumentChanges<'p> for PartialDump<I>
 where
     I: IntoIterator<Item = Object>,
-    I::IntoIter: Send + 'p,
+    I::IntoIter: Send + Clone + 'p,
     I::Item: Send,
 {
     type Parameter = (&'p FieldsIdsMap, &'p ConcurrentAvailableIds, &'p PrimaryKey<'p>);
@@ -31,7 +31,7 @@ where
     fn document_changes(
         self,
         param: Self::Parameter,
-    ) -> Result<impl ParallelIterator<Item = Result<DocumentChange>> + 'p> {
+    ) -> Result<impl ParallelIterator<Item = Result<DocumentChange>> + Clone + 'p> {
         let (fields_ids_map, concurrent_available_ids, primary_key) = param;
 
         Ok(self.iter.into_iter().par_bridge().map(|object| {
diff --git a/milli/src/update/new/indexer/update_by_function.rs b/milli/src/update/new/indexer/update_by_function.rs
index 91e1fd4ee..fc908e31a 100644
--- a/milli/src/update/new/indexer/update_by_function.rs
+++ b/milli/src/update/new/indexer/update_by_function.rs
@@ -12,8 +12,7 @@ impl<'p> DocumentChanges<'p> for UpdateByFunction {
     fn document_changes(
         self,
         _param: Self::Parameter,
-    ) -> Result<impl ParallelIterator<Item = Result<DocumentChange>> + 'p> {
-        todo!();
-        Ok(vec![].into_par_iter())
+    ) -> Result<impl ParallelIterator<Item = Result<DocumentChange>> + Clone + 'p> {
+        Ok((0..100).into_par_iter().map(|_| todo!()))
     }
 }
diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs
index cd94bd5d2..ad61d8343 100644
--- a/milli/src/update/new/mod.rs
+++ b/milli/src/update/new/mod.rs
@@ -7,8 +7,8 @@ use crate::FieldId;
 
 mod document_change;
 mod merger;
 // mod extract;
-// mod global_fields_ids_map;
 mod channel;
+//mod global_fields_ids_map;
 pub mod indexer;
 mod items_pool;
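
A minimal, self-contained sketch of the workaround this patch applies (the
names are illustrative, not from the codebase): rayon's
`collections::hash_map::IntoIter` does not implement `Clone`, while its
`vec::IntoIter` derives `Clone` whenever the items are `Clone`, so draining
the map into a `Vec` first yields a parallel iterator that can be cloned and
consumed more than once:

    use std::collections::HashMap;
    use rayon::prelude::*;

    fn main() {
        let mut offsets: HashMap<String, u32> =
            [("doc-1".to_string(), 2), ("doc-2".to_string(), 40)].into();

        // `offsets.into_par_iter()` would yield rayon's hash_map::IntoIter,
        // which is not `Clone`, so the iterator could not be consumed twice.
        // Draining into a `Vec` gives rayon's vec::IntoIter instead, which
        // is `Clone` because `(String, u32)` is `Clone`.
        let entries: Vec<(String, u32)> = offsets.drain().collect();

        let pass_one = entries.into_par_iter();
        let pass_two = pass_one.clone(); // this is what the HashMap forbids

        let total: u32 = pass_one.map(|(_, count)| count).sum();
        let largest = pass_two.map(|(_, count)| count).max();
        println!("total: {total}, largest: {largest:?}");
    }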