diff --git a/milli/src/documents/enriched.rs b/milli/src/documents/enriched.rs index 918b47c95..4f45a891a 100644 --- a/milli/src/documents/enriched.rs +++ b/milli/src/documents/enriched.rs @@ -7,6 +7,7 @@ use super::{ DocumentsBatchCursor, DocumentsBatchCursorError, DocumentsBatchIndex, DocumentsBatchReader, Error, }; +use crate::update::DocumentId; use crate::FieldId; /// The `EnrichedDocumentsBatchReader` provides a way to iterate over documents that have @@ -66,10 +67,10 @@ impl EnrichedDocumentsBatchReader { } } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] pub struct EnrichedDocument<'a> { pub document: KvReader<'a, FieldId>, - pub external_id: &'a str, + pub document_id: DocumentId, } pub struct EnrichedDocumentsBatchCursor { @@ -110,13 +111,13 @@ impl EnrichedDocumentsBatchCursor { &mut self, ) -> Result, DocumentsBatchCursorError> { let document = self.documents.next_document()?; - let external_id = match self.external_ids.move_on_next()? { - Some((_, bytes)) => Some(str::from_utf8(bytes)?), + let document_id = match self.external_ids.move_on_next()? 
{ + Some((_, bytes)) => serde_json::from_slice(bytes).map(Some)?, None => None, }; - match document.zip(external_id) { - Some((document, external_id)) => Ok(Some(EnrichedDocument { document, external_id })), + match document.zip(document_id) { + Some((document, document_id)) => Ok(Some(EnrichedDocument { document, document_id })), None => Ok(None), } } diff --git a/milli/src/documents/reader.rs b/milli/src/documents/reader.rs index 7bd6dbd51..70b8b0131 100644 --- a/milli/src/documents/reader.rs +++ b/milli/src/documents/reader.rs @@ -1,5 +1,5 @@ use std::convert::TryInto; -use std::{error, fmt, io, str}; +use std::{error, fmt, io}; use obkv::KvReader; @@ -95,7 +95,7 @@ impl DocumentsBatchCursor { #[derive(Debug)] pub enum DocumentsBatchCursorError { Grenad(grenad::Error), - Utf8(str::Utf8Error), + SerdeJson(serde_json::Error), } impl From for DocumentsBatchCursorError { @@ -104,9 +104,9 @@ impl From for DocumentsBatchCursorError { } } -impl From for DocumentsBatchCursorError { - fn from(error: str::Utf8Error) -> DocumentsBatchCursorError { - DocumentsBatchCursorError::Utf8(error) +impl From for DocumentsBatchCursorError { + fn from(error: serde_json::Error) -> DocumentsBatchCursorError { + DocumentsBatchCursorError::SerdeJson(error) } } @@ -116,7 +116,7 @@ impl fmt::Display for DocumentsBatchCursorError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { DocumentsBatchCursorError::Grenad(e) => e.fmt(f), - DocumentsBatchCursorError::Utf8(e) => e.fmt(f), + DocumentsBatchCursorError::SerdeJson(e) => e.fmt(f), } } } diff --git a/milli/src/error.rs b/milli/src/error.rs index 0419ceeda..0abb41eec 100644 --- a/milli/src/error.rs +++ b/milli/src/error.rs @@ -217,7 +217,7 @@ impl From for Error { fn from(error: DocumentsBatchCursorError) -> Error { match error { DocumentsBatchCursorError::Grenad(e) => Error::from(e), - DocumentsBatchCursorError::Utf8(e) => Error::from(e), + DocumentsBatchCursorError::SerdeJson(e) => Error::from(InternalError::from(e)), } } 
} diff --git a/milli/src/update/index_documents/enrich.rs b/milli/src/update/index_documents/enrich.rs index 5d00565a8..1a0c31c24 100644 --- a/milli/src/update/index_documents/enrich.rs +++ b/milli/src/update/index_documents/enrich.rs @@ -2,6 +2,7 @@ use std::io::{Read, Seek}; use std::result::Result as StdResult; use std::{fmt, iter}; +use serde::{Deserialize, Serialize}; use serde_json::Value; use crate::documents::{DocumentsBatchIndex, DocumentsBatchReader, EnrichedDocumentsBatchReader}; @@ -89,14 +90,15 @@ pub fn enrich_documents_batch( Err(user_error) => return Ok(Err(user_error)), }; - external_ids.insert(count.to_be_bytes(), document_id.value())?; - if let Some(geo_value) = geo_field_id.and_then(|fid| document.get(fid)) { if let Err(user_error) = validate_geo_from_json(&document_id, geo_value)? { return Ok(Err(UserError::from(user_error))); } } + let document_id = serde_json::to_vec(&document_id).map_err(InternalError::SerdeJson)?; + external_ids.insert(count.to_be_bytes(), document_id)?; + count += 1; } @@ -210,7 +212,7 @@ impl PrimaryKey<'_> { /// /// In case the document id has been auto-generated, the document nth is kept to help /// users debug if there is an issue with the document itself. -#[derive(Clone)] +#[derive(Serialize, Deserialize, Clone)] pub enum DocumentId { Retrieved { value: String }, Generated { value: String, document_nth: u32 }, @@ -225,16 +227,20 @@ impl DocumentId { DocumentId::Generated { value, document_nth } } - fn value(&self) -> &str { + fn debug(&self) -> String { + format!("{:?}", self) + } + + pub fn is_generated(&self) -> bool { + matches!(self, DocumentId::Generated { .. }) + } + + pub fn value(&self) -> &str { match self { DocumentId::Retrieved { value } => value, DocumentId::Generated { value, .. 
} => value, } } - - fn debug(&self) -> String { - format!("{:?}", self) - } } impl fmt::Debug for DocumentId { diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index db1a768e6..615e1dfc7 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -22,7 +22,7 @@ use typed_chunk::{write_typed_chunk_into_index, TypedChunk}; use self::enrich::enrich_documents_batch; pub use self::enrich::{ extract_float_from_value, validate_document_id, validate_document_id_value, - validate_geo_from_json, + validate_geo_from_json, DocumentId, }; pub use self::helpers::{ as_cloneable_grenad, create_sorter, create_writer, fst_stream_into_hashset, diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index e82556ec7..a34295a50 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -153,8 +153,9 @@ impl<'a, 'i> Transform<'a, 'i> { let mapping = create_fields_mapping(&mut self.fields_ids_map, fields_index)?; let primary_key = cursor.primary_key().to_string(); - self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?; let primary_key_id_nested = primary_key.contains('.'); + let primary_key_id = + self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?; let mut flattened_document = None; let mut obkv_buffer = Vec::new(); @@ -162,7 +163,7 @@ impl<'a, 'i> Transform<'a, 'i> { let mut documents_count = 0; let mut field_buffer: Vec<(u16, Cow<[u8]>)> = Vec::new(); while let Some(enriched_document) = cursor.next_enriched_document()? 
{ - let EnrichedDocument { document, external_id } = enriched_document; + let EnrichedDocument { document, document_id } = enriched_document; let mut field_buffer_cache = drop_and_reuse(field_buffer); if self.indexer_settings.log_every_n.map_or(false, |len| documents_count % len == 0) { @@ -171,6 +172,14 @@ impl<'a, 'i> Transform<'a, 'i> { }); } + // When the document id has been auto-generated by the `enrich_documents_batch` + // we must insert this document id into the remapped document. + let external_id = document_id.value(); + if document_id.is_generated() { + let docid = serde_json::to_vec(external_id).map_err(InternalError::SerdeJson)?; + field_buffer_cache.push((primary_key_id, Cow::from(docid))); + } + for (k, v) in document.iter() { let mapped_id = *mapping.get(&k).ok_or(InternalError::FieldIdMappingMissingEntry { key: k })?; diff --git a/milli/src/update/mod.rs b/milli/src/update/mod.rs index 965ed4fd2..1bf27a5f0 100644 --- a/milli/src/update/mod.rs +++ b/milli/src/update/mod.rs @@ -3,7 +3,7 @@ pub use self::clear_documents::ClearDocuments; pub use self::delete_documents::{DeleteDocuments, DocumentDeletionResult}; pub use self::facets::Facets; pub use self::index_documents::{ - DocumentAdditionResult, IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, + DocumentAdditionResult, DocumentId, IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, }; pub use self::indexer_config::IndexerConfig; pub use self::settings::{Setting, Settings};