From bc51d6157adceee09f529789e66f66f9d08d8fac Mon Sep 17 00:00:00 2001
From: ManyTheFish
Date: Thu, 2 Nov 2023 13:37:54 +0100
Subject: [PATCH] Fix transform reindexing path

---
 milli/src/external_documents_ids.rs           |  2 +-
 milli/src/update/index_documents/transform.rs | 82 +++++++++++++------
 2 files changed, 56 insertions(+), 28 deletions(-)

diff --git a/milli/src/external_documents_ids.rs b/milli/src/external_documents_ids.rs
index ee8d29ffc..a002fc064 100644
--- a/milli/src/external_documents_ids.rs
+++ b/milli/src/external_documents_ids.rs
@@ -18,7 +18,7 @@ pub struct DocumentOperation {
     pub kind: DocumentOperationKind,
 }
 
-pub struct ExternalDocumentsIds(Database<Str, OwnedType<BEU32>>);
+pub struct ExternalDocumentsIds(pub Database<Str, OwnedType<BEU32>>);
 
 impl ExternalDocumentsIds {
     pub fn new(db: Database<Str, OwnedType<BEU32>>) -> ExternalDocumentsIds {
diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs
index 3863d5a54..82cf55d42 100644
--- a/milli/src/update/index_documents/transform.rs
+++ b/milli/src/update/index_documents/transform.rs
@@ -14,14 +14,15 @@ use serde_json::Value;
 use smartstring::SmartString;
 
 use super::helpers::{
-    create_sorter, create_writer, obkvs_keep_last_addition_merge_deletions,
-    obkvs_merge_additions_and_deletions, MergeFn,
+    create_sorter, create_writer, keep_first, obkvs_keep_last_addition_merge_deletions,
+    obkvs_merge_additions_and_deletions, sorter_into_reader, MergeFn,
 };
 use super::{IndexDocumentsMethod, IndexerConfig};
 use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader};
 use crate::error::{Error, InternalError, UserError};
 use crate::index::{db_name, main_key};
 use crate::update::del_add::{into_del_add_obkv, DelAdd, KvReaderDelAdd};
+use crate::update::index_documents::GrenadParameters;
 use crate::update::{AvailableDocumentsIds, ClearDocuments, UpdateIndexingStep};
 use crate::{
     FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, Result, BEU32,
@@ -772,24 +773,35 @@ impl<'a, 'i> Transform<'a, 'i> {
         let documents_ids = self.index.documents_ids(wtxn)?;
         let documents_count = documents_ids.len() as usize;
 
-        // We create a final writer to write the new documents in order from the sorter.
-        let mut original_writer = create_writer(
+        // We initialize the sorter with the user indexing settings.
+        let mut original_sorter = create_sorter(
+            grenad::SortAlgorithm::Stable,
+            keep_first,
             self.indexer_settings.chunk_compression_type,
             self.indexer_settings.chunk_compression_level,
-            tempfile::tempfile()?,
+            self.indexer_settings.max_nb_chunks,
+            self.indexer_settings.max_memory.map(|mem| mem / 2),
         );
 
-        // We create a final writer to write the new documents in order from the sorter.
-        let mut flattened_writer = create_writer(
+        // We initialize the sorter with the user indexing settings.
+        let mut flattened_sorter = create_sorter(
+            grenad::SortAlgorithm::Stable,
+            keep_first,
             self.indexer_settings.chunk_compression_type,
             self.indexer_settings.chunk_compression_level,
-            tempfile::tempfile()?,
+            self.indexer_settings.max_nb_chunks,
+            self.indexer_settings.max_memory.map(|mem| mem / 2),
         );
 
         let mut obkv_buffer = Vec::new();
-        let mut document_sorter_buffer = Vec::new();
-        for result in self.index.all_documents(wtxn)? {
-            let (docid, obkv) = result?;
+        let mut document_sorter_key_buffer = Vec::new();
+        let mut document_sorter_value_buffer = Vec::new();
+        for result in self.index.external_documents_ids().0.iter(wtxn)? {
+            let (external_id, docid) = result?;
+            let obkv = self.index.documents.get(wtxn, &docid)?.ok_or(
+                InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
+            )?;
+            let docid = docid.get();
 
             obkv_buffer.clear();
             let mut obkv_writer = KvWriter::<_, FieldId>::new(&mut obkv_buffer);
@@ -802,9 +814,18 @@ impl<'a, 'i> Transform<'a, 'i> {
             }
 
             let buffer = obkv_writer.into_inner()?;
-            document_sorter_buffer.clear();
-            into_del_add_obkv(KvReaderU16::new(buffer), false, true, &mut document_sorter_buffer)?;
-            original_writer.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
+
+            document_sorter_key_buffer.clear();
+            document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
+            document_sorter_key_buffer.extend_from_slice(external_id.as_bytes());
+            document_sorter_value_buffer.clear();
+            into_del_add_obkv(
+                KvReaderU16::new(buffer),
+                false,
+                true,
+                &mut document_sorter_value_buffer,
+            )?;
+            original_sorter.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
 
             // Once we have the document. We're going to flatten it
             // and insert it in the flattened sorter.
@@ -839,18 +860,27 @@ impl<'a, 'i> Transform<'a, 'i> {
                 let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?;
                 writer.insert(fid, &value)?;
             }
-            document_sorter_buffer.clear();
-            into_del_add_obkv(KvReaderU16::new(&buffer), false, true, &mut document_sorter_buffer)?;
-            flattened_writer.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
+            document_sorter_value_buffer.clear();
+            into_del_add_obkv(
+                KvReaderU16::new(&buffer),
+                false,
+                true,
+                &mut document_sorter_value_buffer,
+            )?;
+            flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_value_buffer)?;
         }
 
-        // Once we have written all the documents, we extract
-        // the file and reset the seek to be able to read it again.
-        let mut original_documents = original_writer.into_inner()?;
-        original_documents.rewind()?;
+        let grenad_params = GrenadParameters {
+            chunk_compression_type: self.indexer_settings.chunk_compression_type,
+            chunk_compression_level: self.indexer_settings.chunk_compression_level,
+            max_memory: self.indexer_settings.max_memory,
+            max_nb_chunks: self.indexer_settings.max_nb_chunks, // default value, may be chosen.
+        };
 
-        let mut flattened_documents = flattened_writer.into_inner()?;
-        flattened_documents.rewind()?;
+        // Once we have written all the documents, we merge everything into a Reader.
+        let original_documents = sorter_into_reader(original_sorter, grenad_params)?;
+
+        let flattened_documents = sorter_into_reader(flattened_sorter, grenad_params)?;
 
         let output = TransformOutput {
             primary_key,
@@ -862,10 +892,8 @@ impl<'a, 'i> Transform<'a, 'i> {
             // FIXME: remove this now unused field
             replaced_documents_ids: RoaringBitmap::default(),
            documents_count,
-            original_documents: original_documents.into_inner().map_err(|err| err.into_error())?,
-            flattened_documents: flattened_documents
-                .into_inner()
-                .map_err(|err| err.into_error())?,
+            original_documents: original_documents.into_inner().into_inner(),
+            flattened_documents: flattened_documents.into_inner().into_inner(),
         };
 
         let new_facets = output.compute_real_facets(wtxn, self.index)?;
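
Note on the sorter key layout introduced above: the original-documents sorter is now keyed by the 4-byte big-endian internal docid followed by the raw external id bytes, so the sorter's lexicographic order follows the internal docid order while each entry still carries its external id. Below is a minimal standalone sketch of that key composition in plain Rust; the helper name compose_sorter_key and the example ids are illustrative only and are not part of milli.

    // Illustrative sketch: build a sorter key the same way the patch does,
    // i.e. big-endian internal docid first, external id bytes second.
    fn compose_sorter_key(buffer: &mut Vec<u8>, docid: u32, external_id: &str) {
        buffer.clear();
        buffer.extend_from_slice(&docid.to_be_bytes());
        buffer.extend_from_slice(external_id.as_bytes());
    }

    fn main() {
        let mut key = Vec::new();
        compose_sorter_key(&mut key, 42, "movie-42");
        assert_eq!(&key[..4], &42u32.to_be_bytes()); // docid prefix
        assert_eq!(&key[4..], b"movie-42"); // external id suffix
        println!("{key:?}");
    }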