Make the original sorter optional

This commit is contained in:
Clément Renault 2024-05-21 14:53:26 +02:00
parent abe29772db
commit 1aa8ed9ef7
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
2 changed files with 42 additions and 18 deletions

View File

@@ -6,6 +6,7 @@ mod typed_chunk;
 use std::collections::{HashMap, HashSet};
 use std::io::{Read, Seek};
+use std::iter;
 use std::num::NonZeroU32;
 use std::result::Result as StdResult;
 use std::sync::Arc;
@@ -373,8 +374,11 @@ where
             }
         };
-        let original_documents = grenad::Reader::new(original_documents)?;
         let flattened_documents = grenad::Reader::new(flattened_documents)?;
+        let original_documents = match original_documents {
+            Some(original_documents) => Some(grenad::Reader::new(original_documents)?),
+            None => None,
+        };

         let max_positions_per_attributes = self.indexer_config.max_positions_per_attributes;
@@ -397,7 +401,17 @@ where
             puffin::profile_scope!("extract_and_send_grenad_chunks");
             // split obkv file into several chunks
             let original_chunk_iter =
-                grenad_obkv_into_chunks(original_documents, pool_params, documents_chunk_size);
+                match original_documents {
+                    Some(original_documents) => {
+                        grenad_obkv_into_chunks(
+                            original_documents,
+                            pool_params,
+                            documents_chunk_size
+                        )
+                        .map(either::Either::Left)
+                    },
+                    None => Ok(either::Right(iter::empty())),
+                };

             // split obkv file into several chunks
             let flattened_chunk_iter =

View File

@@ -33,7 +33,7 @@ pub struct TransformOutput {
     pub settings_diff: InnerIndexSettingsDiff,
     pub field_distribution: FieldDistribution,
     pub documents_count: usize,
-    pub original_documents: File,
+    pub original_documents: Option<File>,
     pub flattened_documents: File,
 }
@@ -822,7 +822,9 @@ impl<'a, 'i> Transform<'a, 'i> {
             settings_diff,
             field_distribution,
             documents_count: self.documents_count,
-            original_documents: original_documents.into_inner().map_err(|err| err.into_error())?,
+            original_documents: Some(
+                original_documents.into_inner().map_err(|err| err.into_error())?,
+            ),
             flattened_documents: flattened_documents
                 .into_inner()
                 .map_err(|err| err.into_error())?,
@@ -891,14 +893,18 @@ impl<'a, 'i> Transform<'a, 'i> {
         let documents_count = documents_ids.len() as usize;

         // We initialize the sorter with the user indexing settings.
-        let mut original_sorter = create_sorter(
-            grenad::SortAlgorithm::Stable,
-            keep_first,
-            self.indexer_settings.chunk_compression_type,
-            self.indexer_settings.chunk_compression_level,
-            self.indexer_settings.max_nb_chunks,
-            self.indexer_settings.max_memory.map(|mem| mem / 2),
-        );
+        let mut original_sorter = if settings_diff.reindex_vectors() {
+            Some(create_sorter(
+                grenad::SortAlgorithm::Stable,
+                keep_first,
+                self.indexer_settings.chunk_compression_type,
+                self.indexer_settings.chunk_compression_level,
+                self.indexer_settings.max_nb_chunks,
+                self.indexer_settings.max_memory.map(|mem| mem / 2),
+            ))
+        } else {
+            None
+        };

         // We initialize the sorter with the user indexing settings.
         let mut flattened_sorter = create_sorter(
@@ -929,7 +935,9 @@ impl<'a, 'i> Transform<'a, 'i> {
                 document_sorter_key_buffer.clear();
                 document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
                 document_sorter_key_buffer.extend_from_slice(external_id.as_bytes());
-                original_sorter.insert(&document_sorter_key_buffer, &original_obkv_buffer)?;
+                if let Some(original_sorter) = original_sorter.as_mut() {
+                    original_sorter.insert(&document_sorter_key_buffer, &original_obkv_buffer)?;
+                }
                 flattened_sorter.insert(docid.to_be_bytes(), &flattened_obkv_buffer)?;
             }
@@ -941,16 +949,18 @@ impl<'a, 'i> Transform<'a, 'i> {
         };

         // Once we have written all the documents, we merge everything into a Reader.
-        let original_documents = sorter_into_reader(original_sorter, grenad_params)?;
         let flattened_documents = sorter_into_reader(flattened_sorter, grenad_params)?;
+        let original_documents = match original_sorter {
+            Some(original_sorter) => Some(sorter_into_reader(original_sorter, grenad_params)?),
+            None => None,
+        };

         Ok(TransformOutput {
             primary_key,
             field_distribution,
             settings_diff,
             documents_count,
-            original_documents: original_documents.into_inner().into_inner(),
+            original_documents: original_documents.map(|od| od.into_inner().into_inner()),
             flattened_documents: flattened_documents.into_inner().into_inner(),
         })
     }