From 3e9198ebaa04fe0865aaed582a88f55be3388fd5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 11 Sep 2024 15:59:30 +0200 Subject: [PATCH] Support guessing primary key again --- index-scheduler/src/batch.rs | 47 ++++++++------- milli/src/update/new/indexer/mod.rs | 57 ++++++++----------- milli/src/update/new/indexer/top_level_map.rs | 30 +++++++++- 3 files changed, 79 insertions(+), 55 deletions(-) diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 506ba6581..f9463a137 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -29,14 +29,17 @@ use meilisearch_types::error::Code; use meilisearch_types::heed::{RoTxn, RwTxn}; use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey}; use meilisearch_types::milli::heed::CompactionOption; -use meilisearch_types::milli::update::new::indexer::{self, guess_primary_key, DocumentChanges}; +use meilisearch_types::milli::update::new::indexer::{ + self, retrieve_or_guess_primary_key, DocumentChanges, +}; +use meilisearch_types::milli::update::new::TopLevelMap; use meilisearch_types::milli::update::{ IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings as MilliSettings, }; use meilisearch_types::milli::vector::parsed_vectors::{ ExplicitVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME, }; -use meilisearch_types::milli::{self, Filter, Object}; +use meilisearch_types::milli::{self, Filter, InternalError, Object}; use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked}; use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task}; use meilisearch_types::{compression, Index, VERSION_FILE_NAME}; @@ -1296,22 +1299,34 @@ impl IndexScheduler { }) .unwrap(); - // let content_file = self.file_store.get_update(*first_addition_uuid)?; - // let reader = - // DocumentsBatchReader::from_reader(content_file).map_err(milli::Error::from)?; - // let (cursor, documents_batch_index) = reader.into_cursor_and_fields_index(); - // let primary_key = - // guess_primary_key(&rtxn, index, cursor, &documents_batch_index)?.unwrap(); - let mut content_files = Vec::new(); for operation in &operations { if let DocumentOperation::Add(content_uuid) = operation { let content_file = self.file_store.get_update(*content_uuid)?; let mmap = unsafe { memmap2::Mmap::map(&content_file)? }; - content_files.push(mmap); + if !mmap.is_empty() { + content_files.push(mmap); + } } } + let mut fields_ids_map = index.fields_ids_map(&rtxn)?; + let first_document = match content_files.first() { + Some(mmap) => { + let mut iter = serde_json::Deserializer::from_slice(mmap).into_iter(); + iter.next().transpose().map_err(|e| e.into()).map_err(Error::IoError)? + } + None => None, + }; + + let primary_key = retrieve_or_guess_primary_key( + &rtxn, + index, + &mut fields_ids_map, + first_document.as_ref(), + )? + .unwrap(); + let mut content_files_iter = content_files.iter(); let mut indexer = indexer::DocumentOperation::new(method); for (operation, task) in operations.into_iter().zip(tasks.iter_mut()) { @@ -1364,21 +1379,9 @@ impl IndexScheduler { } if !tasks.iter().all(|res| res.error.is_some()) { - let mut fields_ids_map = index.fields_ids_map(&rtxn)?; /// TODO create a pool if needed // let pool = indexer_config.thread_pool.unwrap(); let pool = rayon::ThreadPoolBuilder::new().build().unwrap(); - // let fields_ids_map = RwLock::new(fields_ids_map); - - /// TODO correctly guess the primary key in a NDJSON - let pk = match std::env::var("MEILI_PRIMARY_KEY") { - Ok(pk) => pk, - Err(VarError::NotPresent) => "id".to_string(), - Err(e) => panic!("primary key error: {e}"), - }; - - fields_ids_map.insert(&pk); - let primary_key = PrimaryKey::new(&pk, &fields_ids_map).unwrap(); let param = (index, &rtxn, &primary_key); let document_changes = indexer.document_changes(&mut fields_ids_map, param)?; diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index 5ef3439cc..0273d4fe2 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -22,7 +22,7 @@ use crate::documents::{ }; use crate::update::new::channel::{DatabaseType, ExtractorSender}; use crate::update::GrenadParameters; -use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; +use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, UserError}; mod document_deletion; mod document_operation; @@ -242,53 +242,46 @@ fn extract_and_send_docids( Ok(sender.send_searchable::(merger).unwrap()) } +/// Returns the primary key *field id* that has already been set for this index or the +/// one we will guess by searching for the first key that contains "id" as a substring. /// TODO move this elsewhere -pub fn guess_primary_key<'a>( +pub fn retrieve_or_guess_primary_key<'a>( rtxn: &'a RoTxn<'a>, index: &Index, - mut cursor: DocumentsBatchCursor, - documents_batch_index: &'a DocumentsBatchIndex, + fields_ids_map: &mut FieldsIdsMap, + first_document: Option<&'a TopLevelMap<'_>>, ) -> Result, UserError>> { - // The primary key *field id* that has already been set for this index or the one - // we will guess by searching for the first key that contains "id" as a substring. match index.primary_key(rtxn)? { - Some(primary_key) => match PrimaryKey::new(primary_key, documents_batch_index) { + Some(primary_key) => match PrimaryKey::new(primary_key, fields_ids_map) { Some(primary_key) => Ok(Ok(primary_key)), - None => match cursor.next_document()? { - Some(first_document) => Ok(Err(UserError::MissingDocumentId { - primary_key: primary_key.to_string(), - document: obkv_to_object(first_document, documents_batch_index)?, - })), - None => unreachable!("Called with reader.is_empty()"), - }, + None => unreachable!("Why is the primary key not in the fidmap?"), }, None => { - let mut guesses: Vec<(u16, &str)> = documents_batch_index - .iter() - .filter(|(_, name)| name.to_lowercase().ends_with(DEFAULT_PRIMARY_KEY)) - .map(|(field_id, name)| (*field_id, name.as_str())) + let first_document = match first_document { + Some(document) => document, + None => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)), + }; + + let mut guesses: Vec<&str> = first_document + .keys() + .map(AsRef::as_ref) + .filter(|name| name.to_lowercase().ends_with(DEFAULT_PRIMARY_KEY)) .collect(); - // sort the keys in a deterministic, obvious way, so that fields are always in the same order. - guesses.sort_by(|(_, left_name), (_, right_name)| { - // shortest name first - left_name.len().cmp(&right_name.len()).then_with( - // then alphabetical order - || left_name.cmp(right_name), - ) - }); + // sort the keys in lexicographical order, so that fields are always in the same order. + guesses.sort_unstable(); match guesses.as_slice() { [] => Ok(Err(UserError::NoPrimaryKeyCandidateFound)), - [(field_id, name)] => { + [name] => { tracing::info!("Primary key was not specified in index. Inferred to '{name}'"); - Ok(Ok(PrimaryKey::Flat { name, field_id: *field_id })) + match fields_ids_map.insert(name) { + Some(field_id) => Ok(Ok(PrimaryKey::Flat { name, field_id })), + None => Ok(Err(UserError::AttributeLimitReached)), + } } multiple => Ok(Err(UserError::MultiplePrimaryKeyCandidatesFound { - candidates: multiple - .iter() - .map(|(_, candidate)| candidate.to_string()) - .collect(), + candidates: multiple.iter().map(|candidate| candidate.to_string()).collect(), })), } } diff --git a/milli/src/update/new/indexer/top_level_map.rs b/milli/src/update/new/indexer/top_level_map.rs index d82e42dca..f79b6e9ee 100644 --- a/milli/src/update/new/indexer/top_level_map.rs +++ b/milli/src/update/new/indexer/top_level_map.rs @@ -1,13 +1,41 @@ use std::borrow::{Borrow, Cow}; use std::collections::BTreeMap; -use std::fmt; +use std::{fmt, ops}; use serde::{Deserialize, Serialize}; use serde_json::value::RawValue; +use serde_json::{Map, Value}; #[derive(Deserialize, Serialize)] pub struct TopLevelMap<'p>(#[serde(borrow)] pub BTreeMap, &'p RawValue>); +impl TryFrom<&'_ TopLevelMap<'_>> for Map { + type Error = serde_json::Error; + + fn try_from(tlmap: &TopLevelMap<'_>) -> Result { + let mut object = Map::new(); + for (k, v) in &tlmap.0 { + let value = serde_json::from_str(v.get())?; + object.insert(k.to_string(), value); + } + Ok(object) + } +} + +impl<'p> ops::Deref for TopLevelMap<'p> { + type Target = BTreeMap, &'p RawValue>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl ops::DerefMut for TopLevelMap<'_> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + #[derive(Deserialize, Serialize, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] pub struct CowStr<'p>(#[serde(borrow)] pub Cow<'p, str>);