From da01511fff35696ee86c88cc265f662e5bf396fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 14 Nov 2024 18:16:42 +0100 Subject: [PATCH] Fix a lot of primary key related tests --- crates/index-scheduler/src/batch.rs | 43 +-------- crates/index-scheduler/src/lib.rs | 9 +- .../update/new/indexer/document_operation.rs | 90 +++++++++++++++---- 3 files changed, 84 insertions(+), 58 deletions(-) diff --git a/crates/index-scheduler/src/batch.rs b/crates/index-scheduler/src/batch.rs index 43c5e5df6..0ebd2d120 100644 --- a/crates/index-scheduler/src/batch.rs +++ b/crates/index-scheduler/src/batch.rs @@ -32,9 +32,7 @@ use meilisearch_types::error::Code; use meilisearch_types::heed::{RoTxn, RwTxn}; use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey}; use meilisearch_types::milli::heed::CompactionOption; -use meilisearch_types::milli::update::new::indexer::{ - self, retrieve_or_guess_primary_key, UpdateByFunction, -}; +use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction}; use meilisearch_types::milli::update::{IndexDocumentsMethod, Settings as MilliSettings}; use meilisearch_types::milli::vector::parsed_vectors::{ ExplicitVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME, @@ -43,7 +41,6 @@ use meilisearch_types::milli::{self, Filter, ThreadPoolNoAbortBuilder}; use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked}; use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task}; use meilisearch_types::{compression, Index, VERSION_FILE_NAME}; -use raw_collections::RawMap; use roaring::RoaringBitmap; use time::macros::format_description; use time::OffsetDateTime; @@ -1278,16 +1275,6 @@ impl IndexScheduler { // TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches. // this is made difficult by the fact we're doing private clones of the index scheduler and sending it // to a fresh thread. - - /// TODO manage errors correctly - let first_addition_uuid = operations - .iter() - .find_map(|op| match op { - DocumentOperation::Add(content_uuid) => Some(content_uuid), - _ => None, - }) - .unwrap(); - let mut content_files = Vec::new(); for operation in &operations { if let DocumentOperation::Add(content_uuid) = operation { @@ -1303,28 +1290,6 @@ impl IndexScheduler { let db_fields_ids_map = index.fields_ids_map(&rtxn)?; let mut new_fields_ids_map = db_fields_ids_map.clone(); - let first_document = match content_files.first() { - Some(mmap) => { - let mut iter = serde_json::Deserializer::from_slice(mmap).into_iter(); - iter.next().transpose().map_err(|e| e.into()).map_err(Error::IoError)? - } - None => None, - }; - - let (primary_key, primary_key_has_been_set) = retrieve_or_guess_primary_key( - &rtxn, - index, - &mut new_fields_ids_map, - primary_key.as_deref(), - first_document - .map(|raw| RawMap::from_raw_value(raw, &indexer_alloc)) - .transpose() - .map_err(|error| { - milli::Error::UserError(milli::UserError::SerdeJson(error)) - })?, - )? - .map_err(milli::Error::from)?; - let indexer_config = self.index_mapper.indexer_config(); let mut content_files_iter = content_files.iter(); let mut indexer = indexer::DocumentOperation::new(method); @@ -1356,11 +1321,11 @@ impl IndexScheduler { } }; - let (document_changes, operation_stats) = indexer.into_changes( + let (document_changes, operation_stats, primary_key) = indexer.into_changes( &indexer_alloc, index, &rtxn, - &primary_key, + primary_key.as_deref(), &mut new_fields_ids_map, )?; @@ -1403,7 +1368,7 @@ impl IndexScheduler { index, &db_fields_ids_map, new_fields_ids_map, - primary_key_has_been_set.then_some(primary_key), + primary_key, &document_changes, embedders, &|| must_stop_processing.get(), diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index b57a0fe9f..83431d45c 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -4296,11 +4296,11 @@ mod tests { snapshot!(snapshot_index_scheduler(&index_scheduler), name: "only_first_task_succeed"); // The second batch should fail. - handle.advance_one_failed_batch(); + handle.advance_one_successful_batch(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_task_fails"); // The second batch should fail. - handle.advance_one_failed_batch(); + handle.advance_one_successful_batch(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "third_task_fails"); // Is the primary key still what we expect? @@ -4361,7 +4361,7 @@ mod tests { snapshot!(snapshot_index_scheduler(&index_scheduler), name: "only_first_task_succeed"); // The second batch should fail and contains two tasks. - handle.advance_one_failed_batch(); + handle.advance_one_successful_batch(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_and_third_tasks_fails"); // Is the primary key still what we expect? @@ -4440,7 +4440,8 @@ mod tests { snapshot!(primary_key, @"id"); // We're trying to `bork` again, but now there is already a primary key set for this index. - handle.advance_one_failed_batch(); + // NOTE: it's marked as successful because the batch didn't fails, it's the individual tasks that failed. + handle.advance_one_successful_batch(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "fourth_task_fails"); // Finally the last task should succeed since its primary key is the same as the valid one. diff --git a/crates/milli/src/update/new/indexer/document_operation.rs b/crates/milli/src/update/new/indexer/document_operation.rs index 0b586a795..634a7f207 100644 --- a/crates/milli/src/update/new/indexer/document_operation.rs +++ b/crates/milli/src/update/new/indexer/document_operation.rs @@ -3,12 +3,14 @@ use bumpalo::Bump; use hashbrown::hash_map::Entry; use heed::RoTxn; use memmap2::Mmap; +use raw_collections::RawMap; use rayon::slice::ParallelSlice; use serde_json::value::RawValue; use serde_json::Deserializer; use super::super::document_change::DocumentChange; use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend}; +use super::retrieve_or_guess_primary_key; use crate::documents::PrimaryKey; use crate::update::new::document::Versions; use crate::update::new::{Deletion, Insertion, Update}; @@ -41,16 +43,17 @@ impl<'pl> DocumentOperation<'pl> { self, indexer: &'pl Bump, index: &Index, - rtxn: &RoTxn, - primary_key: &PrimaryKey, + rtxn: &'pl RoTxn<'pl>, + primary_key_from_op: Option<&'pl str>, new_fields_ids_map: &mut FieldsIdsMap, - ) -> Result<(DocumentOperationChanges<'pl>, Vec)> { + ) -> Result<(DocumentOperationChanges<'pl>, Vec, Option>)> { let Self { operations, method } = self; let documents_ids = index.documents_ids(rtxn)?; let mut operations_stats = Vec::new(); let mut available_docids = AvailableIds::new(&documents_ids); let mut docids_version_offsets = hashbrown::HashMap::new(); + let mut primary_key = None; for operation in operations { let (bytes, document_count, result) = match operation { @@ -58,7 +61,8 @@ impl<'pl> DocumentOperation<'pl> { indexer, index, rtxn, - primary_key, + primary_key_from_op, + &mut primary_key, new_fields_ids_map, &mut available_docids, &docids_version_offsets, @@ -98,30 +102,30 @@ impl<'pl> DocumentOperation<'pl> { docids_version_offsets.sort_unstable_by_key(|(_, po)| method.sort_key(&po.operations)); let docids_version_offsets = docids_version_offsets.into_bump_slice(); - Ok((DocumentOperationChanges { docids_version_offsets }, operations_stats)) + Ok((DocumentOperationChanges { docids_version_offsets }, operations_stats, primary_key)) } } -fn extract_addition_payload_changes<'s, 'pl: 's>( +fn extract_addition_payload_changes<'r, 'pl: 'r>( indexer: &'pl Bump, index: &Index, - rtxn: &RoTxn, - primary_key: &PrimaryKey, - fields_ids_map: &mut FieldsIdsMap, + rtxn: &'r RoTxn<'r>, + primary_key_from_op: Option<&'r str>, + primary_key: &mut Option>, + new_fields_ids_map: &mut FieldsIdsMap, available_docids: &mut AvailableIds, - main_docids_version_offsets: &hashbrown::HashMap<&'s str, PayloadOperations<'pl>>, + main_docids_version_offsets: &hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>, method: MergeMethod, payload: &'pl [u8], -) -> (u64, u64, Result>>) { +) -> (u64, u64, Result>>) { let mut new_docids_version_offsets = hashbrown::HashMap::<&str, PayloadOperations<'pl>>::new(); /// TODO manage the error let mut previous_offset = 0; let mut iter = Deserializer::from_slice(payload).into_iter::<&RawValue>(); loop { - let doc = match iter.next().transpose() { - Ok(Some(doc)) => doc, - Ok(None) => break, + let optdoc = match iter.next().transpose() { + Ok(optdoc) => optdoc, Err(e) => { return ( payload.len() as u64, @@ -131,7 +135,63 @@ fn extract_addition_payload_changes<'s, 'pl: 's>( } }; - let external_id = match primary_key.extract_fields_and_docid(doc, fields_ids_map, indexer) { + // Only guess the primary key if it is the first document + let retrieved_primary_key = if previous_offset == 0 { + let optdoc = match optdoc { + Some(doc) => match RawMap::from_raw_value(doc, indexer) { + Ok(docmap) => Some(docmap), + Err(error) => { + return ( + payload.len() as u64, + new_docids_version_offsets.len() as u64, + Err(Error::UserError(UserError::SerdeJson(error))), + ) + } + }, + None => None, + }; + + let result = retrieve_or_guess_primary_key( + rtxn, + index, + new_fields_ids_map, + primary_key_from_op, + optdoc, + ); + + let (pk, _has_been_changed) = match result { + Ok(Ok(pk)) => pk, + Ok(Err(user_error)) => { + return ( + payload.len() as u64, + new_docids_version_offsets.len() as u64, + Err(Error::UserError(user_error)), + ) + } + Err(error) => { + return ( + payload.len() as u64, + new_docids_version_offsets.len() as u64, + Err(error), + ) + } + }; + + primary_key.get_or_insert(pk) + } else { + primary_key.as_ref().unwrap() + }; + + let doc = match optdoc { + Some(doc) => doc, + None => break, + }; + + let external_id = match retrieved_primary_key.extract_fields_and_docid( + doc, + new_fields_ids_map, + indexer, + ) { Ok(edi) => edi, Err(e) => { return (payload.len() as u64, new_docids_version_offsets.len() as u64, Err(e))