diff --git a/milli/src/external_documents_ids.rs b/milli/src/external_documents_ids.rs index 1bf08396a..e0a71b7cd 100644 --- a/milli/src/external_documents_ids.rs +++ b/milli/src/external_documents_ids.rs @@ -74,10 +74,6 @@ impl ExternalDocumentsIds { for DocumentOperation { external_id, internal_id, kind } in operations { match kind { DocumentOperationKind::Create => { - // TODO should we get before insert to be able to detect bugs? - // if matches!(kind, DocumentOperationKind::Create) { - // panic!("Attempting to create an already-existing document"); - // } self.0.put(wtxn, &external_id, &BEU32::new(internal_id))?; } DocumentOperationKind::Delete => { @@ -90,6 +86,11 @@ impl ExternalDocumentsIds { Ok(()) } + + /// Returns an iterator over all the external ids. + pub fn iter<'t>(&self, rtxn: &'t RoTxn) -> heed::Result>> { + self.0.iter(rtxn) + } } /// An iterator over mappings between requested internal ids and external ids. diff --git a/milli/src/update/index_documents/extract/extract_vector_points.rs b/milli/src/update/index_documents/extract/extract_vector_points.rs index 863bc07c3..1f5edeeeb 100644 --- a/milli/src/update/index_documents/extract/extract_vector_points.rs +++ b/milli/src/update/index_documents/extract/extract_vector_points.rs @@ -7,7 +7,8 @@ use serde_json::{from_slice, Value}; use super::helpers::{create_writer, writer_into_reader, GrenadParameters}; use crate::error::UserError; -use crate::{FieldId, InternalError, Result, VectorOrArrayOfVectors}; +use crate::update::index_documents::helpers::try_split_at; +use crate::{DocumentId, FieldId, InternalError, Result, VectorOrArrayOfVectors}; /// Extracts the embedding vector contained in each document under the `_vectors` field. /// @@ -16,7 +17,6 @@ use crate::{FieldId, InternalError, Result, VectorOrArrayOfVectors}; pub fn extract_vector_points( obkv_documents: grenad::Reader, indexer: GrenadParameters, - primary_key_id: FieldId, vectors_fid: FieldId, ) -> Result>> { puffin::profile_function!(); @@ -28,15 +28,17 @@ pub fn extract_vector_points( ); let mut cursor = obkv_documents.into_cursor()?; - while let Some((docid_bytes, value)) = cursor.move_on_next()? { + while let Some((key, value)) = cursor.move_on_next()? { + // this must always be serialized as (docid, external_docid); + let (docid_bytes, external_id_bytes) = + try_split_at(key, std::mem::size_of::()).unwrap(); + debug_assert!(std::str::from_utf8(external_id_bytes).is_ok()); + let obkv = obkv::KvReader::new(value); // since we only needs the primary key when we throw an error we create this getter to // lazily get it when needed - let document_id = || -> Value { - let document_id = obkv.get(primary_key_id).unwrap(); - from_slice(document_id).unwrap() - }; + let document_id = || -> Value { std::str::from_utf8(external_id_bytes).unwrap().into() }; // first we retrieve the _vectors field if let Some(vectors) = obkv.get(vectors_fid) { diff --git a/milli/src/update/index_documents/extract/mod.rs b/milli/src/update/index_documents/extract/mod.rs index 41722a53e..ee8713ee8 100644 --- a/milli/src/update/index_documents/extract/mod.rs +++ b/milli/src/update/index_documents/extract/mod.rs @@ -63,7 +63,6 @@ pub(crate) fn data_from_obkv_documents( indexer, lmdb_writer_sx.clone(), vectors_field_id, - primary_key_id, ) }) .collect::>()?; @@ -274,7 +273,6 @@ fn send_original_documents_data( indexer: GrenadParameters, lmdb_writer_sx: Sender>, vectors_field_id: Option, - primary_key_id: FieldId, ) -> Result<()> { let original_documents_chunk = original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?; @@ -283,12 +281,7 @@ fn send_original_documents_data( let documents_chunk_cloned = original_documents_chunk.clone(); let lmdb_writer_sx_cloned = lmdb_writer_sx.clone(); rayon::spawn(move || { - let result = extract_vector_points( - documents_chunk_cloned, - indexer, - primary_key_id, - vectors_field_id, - ); + let result = extract_vector_points(documents_chunk_cloned, indexer, vectors_field_id); let _ = match result { Ok(vector_points) => { lmdb_writer_sx_cloned.send(Ok(TypedChunk::VectorPoints(vector_points))) diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index c32f907b2..129b67cf0 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -1387,6 +1387,8 @@ mod tests { index.add_documents(documents!({ "a" : { "b" : { "c" : 1 }}})).unwrap(); let rtxn = index.read_txn().unwrap(); + let all_documents_count = index.all_documents(&rtxn).unwrap().count(); + assert_eq!(all_documents_count, 1); let external_documents_ids = index.external_documents_ids(); assert!(external_documents_ids.get(&rtxn, "1").unwrap().is_some()); } diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index 23b5c78c1..7c500799d 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -14,14 +14,15 @@ use serde_json::Value; use smartstring::SmartString; use super::helpers::{ - create_sorter, create_writer, obkvs_keep_last_addition_merge_deletions, - obkvs_merge_additions_and_deletions, MergeFn, + create_sorter, create_writer, keep_first, obkvs_keep_last_addition_merge_deletions, + obkvs_merge_additions_and_deletions, sorter_into_reader, MergeFn, }; use super::{IndexDocumentsMethod, IndexerConfig}; use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader}; use crate::error::{Error, InternalError, UserError}; use crate::index::{db_name, main_key}; use crate::update::del_add::{into_del_add_obkv, DelAdd, KvReaderDelAdd}; +use crate::update::index_documents::GrenadParameters; use crate::update::{AvailableDocumentsIds, ClearDocuments, UpdateIndexingStep}; use crate::{ FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, Result, BEU32, @@ -174,7 +175,8 @@ impl<'a, 'i> Transform<'a, 'i> { self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?; let mut obkv_buffer = Vec::new(); - let mut document_sorter_buffer = Vec::new(); + let mut document_sorter_value_buffer = Vec::new(); + let mut document_sorter_key_buffer = Vec::new(); let mut documents_count = 0; let mut docid_buffer: Vec = Vec::new(); let mut field_buffer: Vec<(u16, Cow<[u8]>)> = Vec::new(); @@ -268,57 +270,66 @@ impl<'a, 'i> Transform<'a, 'i> { // we associate the base document with the new key, everything will get merged later. let keep_original_version = self.index_documents_method == IndexDocumentsMethod::UpdateDocuments; - document_sorter_buffer.clear(); - document_sorter_buffer.push(Operation::Addition as u8); + document_sorter_key_buffer.clear(); + document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes()); + document_sorter_key_buffer.extend_from_slice(external_id.as_bytes()); + document_sorter_value_buffer.clear(); + document_sorter_value_buffer.push(Operation::Addition as u8); into_del_add_obkv( KvReaderU16::new(base_obkv), true, keep_original_version, - &mut document_sorter_buffer, + &mut document_sorter_value_buffer, )?; - self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; + self.original_sorter + .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; let base_obkv = KvReader::new(base_obkv); if let Some(flattened_obkv) = self.flatten_from_fields_ids_map(base_obkv)? { // we recreate our buffer with the flattened documents - document_sorter_buffer.clear(); - document_sorter_buffer.push(Operation::Addition as u8); + document_sorter_value_buffer.clear(); + document_sorter_value_buffer.push(Operation::Addition as u8); into_del_add_obkv( KvReaderU16::new(&flattened_obkv), true, keep_original_version, - &mut document_sorter_buffer, + &mut document_sorter_value_buffer, )?; } - self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; + self.flattened_sorter + .insert(docid.to_be_bytes(), &document_sorter_value_buffer)?; } } if !skip_insertion { self.new_documents_ids.insert(docid); - document_sorter_buffer.clear(); - document_sorter_buffer.push(Operation::Addition as u8); + document_sorter_key_buffer.clear(); + document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes()); + document_sorter_key_buffer.extend_from_slice(external_id.as_bytes()); + document_sorter_value_buffer.clear(); + document_sorter_value_buffer.push(Operation::Addition as u8); into_del_add_obkv( KvReaderU16::new(&obkv_buffer), false, true, - &mut document_sorter_buffer, + &mut document_sorter_value_buffer, )?; // We use the extracted/generated user id as the key for this document. - self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; + self.original_sorter + .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; let flattened_obkv = KvReader::new(&obkv_buffer); if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? { - document_sorter_buffer.clear(); - document_sorter_buffer.push(Operation::Addition as u8); + document_sorter_value_buffer.clear(); + document_sorter_value_buffer.push(Operation::Addition as u8); into_del_add_obkv( KvReaderU16::new(&obkv), false, true, - &mut document_sorter_buffer, + &mut document_sorter_value_buffer, )? } - self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; + self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_value_buffer)?; } documents_count += 1; @@ -372,37 +383,42 @@ impl<'a, 'i> Transform<'a, 'i> { let external_documents_ids = self.index.external_documents_ids(); let mut documents_deleted = 0; - let mut document_sorter_buffer = Vec::new(); + let mut document_sorter_value_buffer = Vec::new(); + let mut document_sorter_key_buffer = Vec::new(); for to_remove in to_remove { if should_abort() { return Err(Error::InternalError(InternalError::AbortedIndexation)); } // Check if the document has been added in the current indexing process. - let deleted_from_current = match self - .new_external_documents_ids_builder - .entry((*to_remove).into()) - { - // if the document was added in a previous iteration of the transform we make it as deleted in the sorters. - HEntry::Occupied(entry) => { - let doc_id = *entry.get() as u32; - document_sorter_buffer.clear(); - document_sorter_buffer.push(Operation::Deletion as u8); - obkv::KvWriterU16::new(&mut document_sorter_buffer).finish().unwrap(); - self.original_sorter.insert(doc_id.to_be_bytes(), &document_sorter_buffer)?; - self.flattened_sorter.insert(doc_id.to_be_bytes(), &document_sorter_buffer)?; + let deleted_from_current = + match self.new_external_documents_ids_builder.entry((*to_remove).into()) { + // if the document was added in a previous iteration of the transform we make it as deleted in the sorters. + HEntry::Occupied(entry) => { + let docid = *entry.get() as u32; + // Key is the concatenation of the internal docid and the external one. + document_sorter_key_buffer.clear(); + document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes()); + document_sorter_key_buffer.extend_from_slice(to_remove.as_bytes()); + document_sorter_value_buffer.clear(); + document_sorter_value_buffer.push(Operation::Deletion as u8); + obkv::KvWriterU16::new(&mut document_sorter_value_buffer).finish().unwrap(); + self.original_sorter + .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; + self.flattened_sorter + .insert(docid.to_be_bytes(), &document_sorter_value_buffer)?; - // we must NOT update the list of replaced_documents_ids - // Either: - // 1. It's already in it and there is nothing to do - // 2. It wasn't in it because the document was created by a previous batch and since - // we're removing it there is nothing to do. - self.new_documents_ids.remove(doc_id); - entry.remove_entry(); - true - } - HEntry::Vacant(_) => false, - }; + // we must NOT update the list of replaced_documents_ids + // Either: + // 1. It's already in it and there is nothing to do + // 2. It wasn't in it because the document was created by a previous batch and since + // we're removing it there is nothing to do. + self.new_documents_ids.remove(docid); + entry.remove_entry(); + true + } + HEntry::Vacant(_) => false, + }; // If the document was already in the db we mark it as a `to_delete` document. // Then we push the document in sorters in deletion mode. @@ -422,31 +438,37 @@ impl<'a, 'i> Transform<'a, 'i> { key: None, })?; + // Key is the concatenation of the internal docid and the external one. + document_sorter_key_buffer.clear(); + document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes()); + document_sorter_key_buffer.extend_from_slice(to_remove.as_bytes()); // push it as to delete in the original_sorter - document_sorter_buffer.clear(); - document_sorter_buffer.push(Operation::Deletion as u8); + document_sorter_value_buffer.clear(); + document_sorter_value_buffer.push(Operation::Deletion as u8); into_del_add_obkv( KvReaderU16::new(base_obkv), true, false, - &mut document_sorter_buffer, + &mut document_sorter_value_buffer, )?; - self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; + self.original_sorter + .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; // flatten it and push it as to delete in the flattened_sorter let flattened_obkv = KvReader::new(base_obkv); if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? { // we recreate our buffer with the flattened documents - document_sorter_buffer.clear(); - document_sorter_buffer.push(Operation::Deletion as u8); + document_sorter_value_buffer.clear(); + document_sorter_value_buffer.push(Operation::Deletion as u8); into_del_add_obkv( KvReaderU16::new(&obkv), true, false, - &mut document_sorter_buffer, + &mut document_sorter_value_buffer, )?; } - self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; + self.flattened_sorter + .insert(docid.to_be_bytes(), &document_sorter_value_buffer)?; true } @@ -754,24 +776,35 @@ impl<'a, 'i> Transform<'a, 'i> { let documents_ids = self.index.documents_ids(wtxn)?; let documents_count = documents_ids.len() as usize; - // We create a final writer to write the new documents in order from the sorter. - let mut original_writer = create_writer( + // We initialize the sorter with the user indexing settings. + let mut original_sorter = create_sorter( + grenad::SortAlgorithm::Stable, + keep_first, self.indexer_settings.chunk_compression_type, self.indexer_settings.chunk_compression_level, - tempfile::tempfile()?, + self.indexer_settings.max_nb_chunks, + self.indexer_settings.max_memory.map(|mem| mem / 2), ); - // We create a final writer to write the new documents in order from the sorter. - let mut flattened_writer = create_writer( + // We initialize the sorter with the user indexing settings. + let mut flattened_sorter = create_sorter( + grenad::SortAlgorithm::Stable, + keep_first, self.indexer_settings.chunk_compression_type, self.indexer_settings.chunk_compression_level, - tempfile::tempfile()?, + self.indexer_settings.max_nb_chunks, + self.indexer_settings.max_memory.map(|mem| mem / 2), ); let mut obkv_buffer = Vec::new(); - let mut document_sorter_buffer = Vec::new(); - for result in self.index.all_documents(wtxn)? { - let (docid, obkv) = result?; + let mut document_sorter_key_buffer = Vec::new(); + let mut document_sorter_value_buffer = Vec::new(); + for result in self.index.external_documents_ids().iter(wtxn)? { + let (external_id, docid) = result?; + let obkv = self.index.documents.get(wtxn, &docid)?.ok_or( + InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None }, + )?; + let docid = docid.get(); obkv_buffer.clear(); let mut obkv_writer = KvWriter::<_, FieldId>::new(&mut obkv_buffer); @@ -784,9 +817,18 @@ impl<'a, 'i> Transform<'a, 'i> { } let buffer = obkv_writer.into_inner()?; - document_sorter_buffer.clear(); - into_del_add_obkv(KvReaderU16::new(buffer), false, true, &mut document_sorter_buffer)?; - original_writer.insert(docid.to_be_bytes(), &document_sorter_buffer)?; + + document_sorter_key_buffer.clear(); + document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes()); + document_sorter_key_buffer.extend_from_slice(external_id.as_bytes()); + document_sorter_value_buffer.clear(); + into_del_add_obkv( + KvReaderU16::new(buffer), + false, + true, + &mut document_sorter_value_buffer, + )?; + original_sorter.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; // Once we have the document. We're going to flatten it // and insert it in the flattened sorter. @@ -821,18 +863,27 @@ impl<'a, 'i> Transform<'a, 'i> { let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?; writer.insert(fid, &value)?; } - document_sorter_buffer.clear(); - into_del_add_obkv(KvReaderU16::new(&buffer), false, true, &mut document_sorter_buffer)?; - flattened_writer.insert(docid.to_be_bytes(), &document_sorter_buffer)?; + document_sorter_value_buffer.clear(); + into_del_add_obkv( + KvReaderU16::new(&buffer), + false, + true, + &mut document_sorter_value_buffer, + )?; + flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_value_buffer)?; } - // Once we have written all the documents, we extract - // the file and reset the seek to be able to read it again. - let mut original_documents = original_writer.into_inner()?; - original_documents.rewind()?; + let grenad_params = GrenadParameters { + chunk_compression_type: self.indexer_settings.chunk_compression_type, + chunk_compression_level: self.indexer_settings.chunk_compression_level, + max_memory: self.indexer_settings.max_memory, + max_nb_chunks: self.indexer_settings.max_nb_chunks, // default value, may be chosen. + }; - let mut flattened_documents = flattened_writer.into_inner()?; - flattened_documents.rewind()?; + // Once we have written all the documents, we merge everything into a Reader. + let original_documents = sorter_into_reader(original_sorter, grenad_params)?; + + let flattened_documents = sorter_into_reader(flattened_sorter, grenad_params)?; let output = TransformOutput { primary_key, @@ -844,10 +895,8 @@ impl<'a, 'i> Transform<'a, 'i> { // FIXME: remove this now unused field replaced_documents_ids: RoaringBitmap::default(), documents_count, - original_documents: original_documents.into_inner().map_err(|err| err.into_error())?, - flattened_documents: flattened_documents - .into_inner() - .map_err(|err| err.into_error())?, + original_documents: original_documents.into_inner().into_inner(), + flattened_documents: flattened_documents.into_inner().into_inner(), }; let new_facets = output.compute_real_facets(wtxn, self.index)?; diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index 1b38be03b..7c3f587d2 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -17,6 +17,7 @@ use crate::distance::NDotProductPoint; use crate::error::UserError; use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind}; use crate::facet::FacetType; +use crate::index::db_name::DOCUMENTS; use crate::index::Hnsw; use crate::update::del_add::{DelAdd, KvReaderDelAdd}; use crate::update::facet::FacetsUpdate; @@ -24,7 +25,7 @@ use crate::update::index_documents::helpers::{as_cloneable_grenad, try_split_arr use crate::update::index_documents::validate_document_id_value; use crate::{ lat_lng_to_xyz, CboRoaringBitmapCodec, DocumentId, FieldId, GeoPoint, Index, InternalError, - Result, BEU32, + Result, SerializationError, BEU32, }; pub(crate) enum TypedChunk { @@ -124,13 +125,15 @@ pub(crate) fn write_typed_chunk_into_index( let mut operations: Vec = Default::default(); let mut docids = index.documents_ids(wtxn)?; - let primary_key = index.primary_key(wtxn)?.unwrap(); - let primary_key = index.fields_ids_map(wtxn)?.id(primary_key).unwrap(); let mut cursor = obkv_documents_iter.into_cursor()?; - while let Some((docid, reader)) = cursor.move_on_next()? { + while let Some((key, reader)) = cursor.move_on_next()? { let mut writer: KvWriter<_, FieldId> = KvWriter::memory(); let reader: KvReader = KvReader::new(reader); - let docid = docid.try_into().map(DocumentId::from_be_bytes).unwrap(); + + let (document_id_bytes, external_id_bytes) = try_split_array_at(key) + .ok_or(SerializationError::Decoding { db_name: Some(DOCUMENTS) })?; + let docid = DocumentId::from_be_bytes(document_id_bytes); + let external_id = std::str::from_utf8(external_id_bytes)?; for (field_id, value) in reader.iter() { let del_add_reader = KvReaderDelAdd::new(value); @@ -140,45 +143,10 @@ pub(crate) fn write_typed_chunk_into_index( ) { (None, None) => {} (None, Some(value)) => { - // if primary key, new document - if field_id == primary_key { - // FIXME: we already extracted the external docid before. We should retrieve it in the typed chunk - // rather than re-extract it here - // FIXME: unwraps - let document_id = serde_json::from_slice(value) - .map_err(InternalError::SerdeJson) - .unwrap(); - let external_id = - validate_document_id_value(document_id).unwrap().unwrap(); - operations.push(DocumentOperation { - external_id, - internal_id: docid, - kind: DocumentOperationKind::Create, - }); - docids.insert(docid); - } // anyway, write writer.insert(field_id, value)?; } - (Some(value), None) => { - // if primary key, deleted document - if field_id == primary_key { - // FIXME: we already extracted the external docid before. We should retrieve it in the typed chunk - // rather than re-extract it here - // FIXME: unwraps - let document_id = serde_json::from_slice(value) - .map_err(InternalError::SerdeJson) - .unwrap(); - let external_id = - validate_document_id_value(document_id).unwrap().unwrap(); - operations.push(DocumentOperation { - external_id, - internal_id: docid, - kind: DocumentOperationKind::Delete, - }); - docids.remove(docid); - } - } + (Some(_), None) => {} (Some(_), Some(value)) => { // updated field, write writer.insert(field_id, value)?; @@ -190,8 +158,20 @@ pub(crate) fn write_typed_chunk_into_index( if !writer.is_empty() { db.put(wtxn, &BEU32::new(docid), &writer.into_inner().unwrap())?; + operations.push(DocumentOperation { + external_id: external_id.to_string(), + internal_id: docid, + kind: DocumentOperationKind::Create, + }); + docids.insert(docid); } else { db.delete(wtxn, &BEU32::new(docid))?; + operations.push(DocumentOperation { + external_id: external_id.to_string(), + internal_id: docid, + kind: DocumentOperationKind::Delete, + }); + docids.remove(docid); } } let external_documents_docids = index.external_documents_ids();