Make the flattened sorter optional

This commit is contained in:
Clément Renault 2024-05-21 16:16:36 +02:00
parent 943f8dba0c
commit 500ddc76b5
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
3 changed files with 49 additions and 31 deletions

View File

@ -360,7 +360,10 @@ where
let min_chunk_size = 1024 * 512; // 512KiB let min_chunk_size = 1024 * 512; // 512KiB
// compute the chunk size from the number of available threads and the inputed data size. // compute the chunk size from the number of available threads and the inputed data size.
let total_size = flattened_documents.metadata().map(|m| m.len()); let total_size = match flattened_documents.as_ref() {
Some(flattened_documents) => flattened_documents.metadata().map(|m| m.len()),
None => Ok(default_chunk_size as u64),
};
let current_num_threads = pool.current_num_threads(); let current_num_threads = pool.current_num_threads();
// if we have more than 2 thread, create a number of chunk equal to 3/4 threads count // if we have more than 2 thread, create a number of chunk equal to 3/4 threads count
let chunk_count = if current_num_threads > 2 { let chunk_count = if current_num_threads > 2 {
@ -374,11 +377,14 @@ where
} }
}; };
let flattened_documents = grenad::Reader::new(flattened_documents)?;
let original_documents = match original_documents { let original_documents = match original_documents {
Some(original_documents) => Some(grenad::Reader::new(original_documents)?), Some(original_documents) => Some(grenad::Reader::new(original_documents)?),
None => None, None => None,
}; };
let flattened_documents = match flattened_documents {
Some(flattened_documents) => Some(grenad::Reader::new(flattened_documents)?),
None => None,
};
let max_positions_per_attributes = self.indexer_config.max_positions_per_attributes; let max_positions_per_attributes = self.indexer_config.max_positions_per_attributes;
@ -400,22 +406,20 @@ where
let _enter = child_span.enter(); let _enter = child_span.enter();
puffin::profile_scope!("extract_and_send_grenad_chunks"); puffin::profile_scope!("extract_and_send_grenad_chunks");
// split obkv file into several chunks // split obkv file into several chunks
let original_chunk_iter = let original_chunk_iter = match original_documents {
match original_documents { Some(original_documents) => {
Some(original_documents) => { grenad_obkv_into_chunks(original_documents,pool_params,documents_chunk_size).map(either::Left)
grenad_obkv_into_chunks( },
original_documents, None => Ok(either::Right(iter::empty())),
pool_params, };
documents_chunk_size
)
.map(either::Either::Left)
},
None => Ok(either::Right(iter::empty())),
};
// split obkv file into several chunks // split obkv file into several chunks
let flattened_chunk_iter = let flattened_chunk_iter = match flattened_documents {
grenad_obkv_into_chunks(flattened_documents, pool_params, documents_chunk_size); Some(flattened_documents) => {
grenad_obkv_into_chunks(flattened_documents, pool_params, documents_chunk_size).map(either::Left)
},
None => Ok(either::Right(iter::empty())),
};
let result = original_chunk_iter.and_then(|original_chunk| { let result = original_chunk_iter.and_then(|original_chunk| {
let flattened_chunk = flattened_chunk_iter?; let flattened_chunk = flattened_chunk_iter?;

View File

@ -34,7 +34,7 @@ pub struct TransformOutput {
pub field_distribution: FieldDistribution, pub field_distribution: FieldDistribution,
pub documents_count: usize, pub documents_count: usize,
pub original_documents: Option<File>, pub original_documents: Option<File>,
pub flattened_documents: File, pub flattened_documents: Option<File>,
} }
/// Extract the external ids, deduplicate and compute the new internal documents ids /// Extract the external ids, deduplicate and compute the new internal documents ids
@ -825,9 +825,9 @@ impl<'a, 'i> Transform<'a, 'i> {
original_documents: Some( original_documents: Some(
original_documents.into_inner().map_err(|err| err.into_error())?, original_documents.into_inner().map_err(|err| err.into_error())?,
), ),
flattened_documents: flattened_documents flattened_documents: Some(
.into_inner() flattened_documents.into_inner().map_err(|err| err.into_error())?,
.map_err(|err| err.into_error())?, ),
}) })
} }
@ -840,6 +840,9 @@ impl<'a, 'i> Transform<'a, 'i> {
original_obkv_buffer: &mut Vec<u8>, original_obkv_buffer: &mut Vec<u8>,
flattened_obkv_buffer: &mut Vec<u8>, flattened_obkv_buffer: &mut Vec<u8>,
) -> Result<()> { ) -> Result<()> {
/// TODO do a XOR of the faceted fields
/// TODO if reindex_searchable returns true store all searchables else none
/// TODO no longer useful after Tamo's PR
let mut old_fields_ids_map = settings_diff.old.fields_ids_map.clone(); let mut old_fields_ids_map = settings_diff.old.fields_ids_map.clone();
let mut new_fields_ids_map = settings_diff.new.fields_ids_map.clone(); let mut new_fields_ids_map = settings_diff.new.fields_ids_map.clone();
let mut obkv_writer = KvWriter::<_, FieldId>::memory(); let mut obkv_writer = KvWriter::<_, FieldId>::memory();
@ -907,14 +910,19 @@ impl<'a, 'i> Transform<'a, 'i> {
}; };
// We initialize the sorter with the user indexing settings. // We initialize the sorter with the user indexing settings.
let mut flattened_sorter = create_sorter( let mut flattened_sorter =
grenad::SortAlgorithm::Stable, if settings_diff.reindex_searchable() || settings_diff.reindex_facets() {
keep_first, Some(create_sorter(
self.indexer_settings.chunk_compression_type, grenad::SortAlgorithm::Stable,
self.indexer_settings.chunk_compression_level, keep_first,
self.indexer_settings.max_nb_chunks, self.indexer_settings.chunk_compression_type,
self.indexer_settings.max_memory.map(|mem| mem / 2), self.indexer_settings.chunk_compression_level,
); self.indexer_settings.max_nb_chunks,
self.indexer_settings.max_memory.map(|mem| mem / 2),
))
} else {
None
};
let mut original_obkv_buffer = Vec::new(); let mut original_obkv_buffer = Vec::new();
let mut flattened_obkv_buffer = Vec::new(); let mut flattened_obkv_buffer = Vec::new();
@ -938,7 +946,9 @@ impl<'a, 'i> Transform<'a, 'i> {
if let Some(original_sorter) = original_sorter.as_mut() { if let Some(original_sorter) = original_sorter.as_mut() {
original_sorter.insert(&document_sorter_key_buffer, &original_obkv_buffer)?; original_sorter.insert(&document_sorter_key_buffer, &original_obkv_buffer)?;
} }
flattened_sorter.insert(docid.to_be_bytes(), &flattened_obkv_buffer)?; if let Some(flattened_sorter) = flattened_sorter.as_mut() {
flattened_sorter.insert(docid.to_be_bytes(), &flattened_obkv_buffer)?;
}
} }
let grenad_params = GrenadParameters { let grenad_params = GrenadParameters {
@ -949,7 +959,10 @@ impl<'a, 'i> Transform<'a, 'i> {
}; };
// Once we have written all the documents, we merge everything into a Reader. // Once we have written all the documents, we merge everything into a Reader.
let flattened_documents = sorter_into_reader(flattened_sorter, grenad_params)?; let flattened_documents = match flattened_sorter {
Some(flattened_sorter) => Some(sorter_into_reader(flattened_sorter, grenad_params)?),
None => None,
};
let original_documents = match original_sorter { let original_documents = match original_sorter {
Some(original_sorter) => Some(sorter_into_reader(original_sorter, grenad_params)?), Some(original_sorter) => Some(sorter_into_reader(original_sorter, grenad_params)?),
None => None, None => None,
@ -961,7 +974,7 @@ impl<'a, 'i> Transform<'a, 'i> {
settings_diff, settings_diff,
documents_count, documents_count,
original_documents: original_documents.map(|od| od.into_inner().into_inner()), original_documents: original_documents.map(|od| od.into_inner().into_inner()),
flattened_documents: flattened_documents.into_inner().into_inner(), flattened_documents: flattened_documents.map(|fd| fd.into_inner().into_inner()),
}) })
} }
} }

View File

@ -1099,6 +1099,7 @@ impl InnerIndexSettingsDiff {
} }
pub fn reindex_searchable(&self) -> bool { pub fn reindex_searchable(&self) -> bool {
// TODO no longer useful after Tamo's PR
self.old self.old
.fields_ids_map .fields_ids_map
.iter() .iter()