Compute chunk size based on the input data size ant the number of indexing threads

This commit is contained in:
ManyTheFish 2024-01-22 16:23:12 +01:00 committed by Louis Dureuil
parent 023c2d755f
commit be1b054b05
No known key found for this signature in database
13 changed files with 991 additions and 795 deletions

View file

@ -5,20 +5,21 @@ mod transform;
mod typed_chunk;
use std::collections::{HashMap, HashSet};
use std::io::{Cursor, Read, Seek};
use std::io::{Read, Seek};
use std::iter::FromIterator;
use std::num::NonZeroU32;
use std::result::Result as StdResult;
use crossbeam_channel::{Receiver, Sender};
use grenad::{Merger, MergerBuilder};
use heed::types::Str;
use heed::Database;
use rand::SeedableRng;
use roaring::RoaringBitmap;
use serde::{Deserialize, Serialize};
use slice_group_by::GroupBy;
use tracing::debug_span;
use typed_chunk::{write_typed_chunk_into_index, TypedChunk};
use tracing::debug;
use typed_chunk::{write_typed_chunk_into_index, ChunkAccumulator, TypedChunk};
use self::enrich::enrich_documents_batch;
pub use self::enrich::{extract_finite_float_from_value, DocumentId};
@ -26,8 +27,7 @@ pub use self::helpers::{
as_cloneable_grenad, create_sorter, create_writer, fst_stream_into_hashset,
fst_stream_into_vec, merge_btreeset_string, merge_cbo_roaring_bitmaps,
merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
merge_roaring_bitmaps, valid_lmdb_key, write_sorter_into_database, writer_into_reader,
ClonableMmap, MergeFn,
merge_roaring_bitmaps, valid_lmdb_key, write_sorter_into_database, writer_into_reader, MergeFn,
};
use self::helpers::{grenad_obkv_into_chunks, GrenadParameters};
pub use self::transform::{Transform, TransformOutput};
@ -95,8 +95,8 @@ pub struct IndexDocumentsConfig {
impl<'t, 'i, 'a, FP, FA> IndexDocuments<'t, 'i, 'a, FP, FA>
where
FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync,
FP: Fn(UpdateIndexingStep) + Sync + Send,
FA: Fn() -> bool + Sync + Send,
{
pub fn new(
wtxn: &'t mut heed::RwTxn<'i>,
@ -326,9 +326,6 @@ where
}
};
let original_documents = grenad::Reader::new(original_documents)?;
let flattened_documents = grenad::Reader::new(flattened_documents)?;
// create LMDB writer channel
let (lmdb_writer_sx, lmdb_writer_rx): (
Sender<Result<TypedChunk>>,
@ -367,11 +364,7 @@ where
let stop_words = self.index.stop_words(self.wtxn)?;
let separators = self.index.allowed_separators(self.wtxn)?;
let separators: Option<Vec<_>> =
separators.as_ref().map(|x| x.iter().map(String::as_str).collect());
let dictionary = self.index.dictionary(self.wtxn)?;
let dictionary: Option<Vec<_>> =
dictionary.as_ref().map(|x| x.iter().map(String::as_str).collect());
let exact_attributes = self.index.exact_attributes_ids(self.wtxn)?;
let proximity_precision = self.index.proximity_precision(self.wtxn)?.unwrap_or_default();
@ -381,141 +374,202 @@ where
max_memory: self.indexer_config.max_memory,
max_nb_chunks: self.indexer_config.max_nb_chunks, // default value, may be chosen.
};
let documents_chunk_size =
self.indexer_config.documents_chunk_size.unwrap_or(1024 * 1024 * 4); // 4MiB
let documents_chunk_size = match self.indexer_config.documents_chunk_size {
Some(chunk_size) => chunk_size,
None => {
let default_chunk_size = 1024 * 1024 * 4; // 4MiB
let min_chunk_size = 1024 * 512; // 512KiB
// compute the chunk size from the number of available threads and the inputed data size.
let total_size = flattened_documents.metadata().map(|m| m.len());
let current_num_threads = pool.current_num_threads();
// if we have more than 2 thread, create a number of chunk equal to 3/4 threads count
let chunk_count = if current_num_threads > 2 {
(current_num_threads * 3 / 4).max(2)
} else {
current_num_threads
};
total_size
.map_or(default_chunk_size, |size| (size as usize) / chunk_count)
.max(min_chunk_size)
}
};
let original_documents = grenad::Reader::new(original_documents)?;
let flattened_documents = grenad::Reader::new(flattened_documents)?;
let max_positions_per_attributes = self.indexer_config.max_positions_per_attributes;
let cloned_embedder = self.embedders.clone();
let mut final_documents_ids = RoaringBitmap::new();
let mut databases_seen = 0;
let mut word_position_docids = None;
let mut word_fid_docids = None;
let mut word_docids = None;
let mut exact_word_docids = None;
let mut chunk_accumulator = ChunkAccumulator::default();
let mut dimension = HashMap::new();
let stop_words = stop_words.map(|sw| sw.map_data(Vec::from).unwrap());
let current_span = tracing::Span::current();
// Run extraction pipeline in parallel.
pool.install(|| {
let child_span = tracing::trace_span!(target: "indexing::details", parent: &current_span, "extract_and_send_grenad_chunks");
rayon::spawn(move || {
let child_span = tracing::trace_span!(target: "indexing::details", parent: &current_span, "extract_and_send_grenad_chunks");
let _enter = child_span.enter();
puffin::profile_scope!("extract_and_send_grenad_chunks");
// split obkv file into several chunks
let original_chunk_iter =
grenad_obkv_into_chunks(original_documents, pool_params, documents_chunk_size);
// split obkv file into several chunks
let original_chunk_iter =
grenad_obkv_into_chunks(original_documents, pool_params, documents_chunk_size);
// split obkv file into several chunks
let flattened_chunk_iter =
grenad_obkv_into_chunks(flattened_documents, pool_params, documents_chunk_size);
// split obkv file into several chunks
let flattened_chunk_iter =
grenad_obkv_into_chunks(flattened_documents, pool_params, documents_chunk_size);
let result = original_chunk_iter.and_then(|original_chunk| {
let flattened_chunk = flattened_chunk_iter?;
// extract all databases from the chunked obkv douments
extract::data_from_obkv_documents(
original_chunk,
flattened_chunk,
pool_params,
lmdb_writer_sx.clone(),
searchable_fields,
faceted_fields,
primary_key_id,
geo_fields_ids,
field_id_map,
stop_words,
separators.as_deref(),
dictionary.as_deref(),
max_positions_per_attributes,
exact_attributes,
proximity_precision,
cloned_embedder,
)
let separators: Option<Vec<_>> =
separators.as_ref().map(|x| x.iter().map(String::as_str).collect());
let dictionary: Option<Vec<_>> =
dictionary.as_ref().map(|x| x.iter().map(String::as_str).collect());
let result = original_chunk_iter.and_then(|original_chunk| {
let flattened_chunk = flattened_chunk_iter?;
// extract all databases from the chunked obkv douments
extract::data_from_obkv_documents(
original_chunk,
flattened_chunk,
pool_params,
lmdb_writer_sx.clone(),
searchable_fields,
faceted_fields,
primary_key_id,
geo_fields_ids,
field_id_map,
stop_words,
separators.as_deref(),
dictionary.as_deref(),
max_positions_per_attributes,
exact_attributes,
proximity_precision,
cloned_embedder,
)
});
if let Err(e) = result {
let _ = lmdb_writer_sx.send(Err(e));
}
// needs to be dropped to avoid channel waiting lock.
drop(lmdb_writer_sx);
});
if let Err(e) = result {
let _ = lmdb_writer_sx.send(Err(e));
}
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen,
total_databases: TOTAL_POSTING_DATABASE_COUNT,
});
// needs to be dropped to avoid channel waiting lock.
drop(lmdb_writer_sx);
});
loop {
if (self.should_abort)() {
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
let index_is_empty = self.index.number_of_documents(self.wtxn)? == 0;
let mut final_documents_ids = RoaringBitmap::new();
match lmdb_writer_rx.clone().recv_timeout(std::time::Duration::from_millis(500)) {
Err(status) => {
if let Some(typed_chunks) = chunk_accumulator.pop_longest() {
let (docids, is_merged_database) =
write_typed_chunk_into_index(typed_chunks, self.index, self.wtxn)?;
if !docids.is_empty() {
final_documents_ids |= docids;
let documents_seen_count = final_documents_ids.len();
(self.progress)(UpdateIndexingStep::IndexDocuments {
documents_seen: documents_seen_count as usize,
total_documents: documents_count,
});
debug!(documents = documents_seen_count, total = documents_count, "Seen");
}
if is_merged_database {
databases_seen += 1;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen,
total_databases: TOTAL_POSTING_DATABASE_COUNT,
});
}
// If no more chunk remains in the chunk accumulator and the channel is disconected, break.
} else if status == crossbeam_channel::RecvTimeoutError::Disconnected {
break;
}
}
Ok(result) => {
let typed_chunk = match result? {
TypedChunk::WordDocids {
word_docids_reader,
exact_word_docids_reader,
word_fid_docids_reader,
} => {
let cloneable_chunk =
unsafe { as_cloneable_grenad(&word_docids_reader)? };
let word_docids = word_docids.get_or_insert_with(|| {
MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn)
});
word_docids.push(cloneable_chunk.into_cursor()?);
let cloneable_chunk =
unsafe { as_cloneable_grenad(&exact_word_docids_reader)? };
let exact_word_docids =
exact_word_docids.get_or_insert_with(|| {
MergerBuilder::new(
merge_deladd_cbo_roaring_bitmaps as MergeFn,
)
});
exact_word_docids.push(cloneable_chunk.into_cursor()?);
let cloneable_chunk =
unsafe { as_cloneable_grenad(&word_fid_docids_reader)? };
let word_fid_docids = word_fid_docids.get_or_insert_with(|| {
MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn)
});
word_fid_docids.push(cloneable_chunk.into_cursor()?);
TypedChunk::WordDocids {
word_docids_reader,
exact_word_docids_reader,
word_fid_docids_reader,
}
}
TypedChunk::WordPositionDocids(chunk) => {
let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? };
let word_position_docids =
word_position_docids.get_or_insert_with(|| {
MergerBuilder::new(
merge_deladd_cbo_roaring_bitmaps as MergeFn,
)
});
word_position_docids.push(cloneable_chunk.into_cursor()?);
TypedChunk::WordPositionDocids(chunk)
}
TypedChunk::VectorPoints {
expected_dimension,
remove_vectors,
embeddings,
manual_vectors,
embedder_name,
} => {
dimension.insert(embedder_name.clone(), expected_dimension);
TypedChunk::VectorPoints {
remove_vectors,
embeddings,
expected_dimension,
manual_vectors,
embedder_name,
}
}
otherwise => otherwise,
};
let mut databases_seen = 0;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen,
total_databases: TOTAL_POSTING_DATABASE_COUNT,
});
let mut word_position_docids = None;
let mut word_fid_docids = None;
let mut word_docids = None;
let mut exact_word_docids = None;
let mut dimension = HashMap::new();
for result in lmdb_writer_rx {
if (self.should_abort)() {
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
let typed_chunk = match result? {
TypedChunk::WordDocids {
word_docids_reader,
exact_word_docids_reader,
word_fid_docids_reader,
} => {
let cloneable_chunk = unsafe { as_cloneable_grenad(&word_docids_reader)? };
word_docids = Some(cloneable_chunk);
let cloneable_chunk =
unsafe { as_cloneable_grenad(&exact_word_docids_reader)? };
exact_word_docids = Some(cloneable_chunk);
let cloneable_chunk = unsafe { as_cloneable_grenad(&word_fid_docids_reader)? };
word_fid_docids = Some(cloneable_chunk);
TypedChunk::WordDocids {
word_docids_reader,
exact_word_docids_reader,
word_fid_docids_reader,
chunk_accumulator.insert(typed_chunk);
}
}
TypedChunk::WordPositionDocids(chunk) => {
let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? };
word_position_docids = Some(cloneable_chunk);
TypedChunk::WordPositionDocids(chunk)
}
TypedChunk::VectorPoints {
expected_dimension,
remove_vectors,
embeddings,
manual_vectors,
embedder_name,
} => {
dimension.insert(embedder_name.clone(), expected_dimension);
TypedChunk::VectorPoints {
remove_vectors,
embeddings,
expected_dimension,
manual_vectors,
embedder_name,
}
}
otherwise => otherwise,
};
}
let (docids, is_merged_database) =
write_typed_chunk_into_index(typed_chunk, self.index, self.wtxn, index_is_empty)?;
if !docids.is_empty() {
final_documents_ids |= docids;
let documents_seen_count = final_documents_ids.len();
(self.progress)(UpdateIndexingStep::IndexDocuments {
documents_seen: documents_seen_count as usize,
total_documents: documents_count,
});
debug_span!("Seen", documents = documents_seen_count, total = documents_count);
}
if is_merged_database {
databases_seen += 1;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen,
total_databases: TOTAL_POSTING_DATABASE_COUNT,
});
}
}
Ok(())
})?;
// We write the field distribution into the main database
self.index.put_field_distribution(self.wtxn, &field_distribution)?;
@ -548,10 +602,10 @@ where
}
self.execute_prefix_databases(
word_docids,
exact_word_docids,
word_position_docids,
word_fid_docids,
word_docids.map(MergerBuilder::build),
exact_word_docids.map(MergerBuilder::build),
word_position_docids.map(MergerBuilder::build),
word_fid_docids.map(MergerBuilder::build),
)?;
Ok(number_of_documents)
@ -565,10 +619,10 @@ where
)]
pub fn execute_prefix_databases(
self,
word_docids: Option<grenad::Reader<CursorClonableMmap>>,
exact_word_docids: Option<grenad::Reader<CursorClonableMmap>>,
word_position_docids: Option<grenad::Reader<CursorClonableMmap>>,
word_fid_docids: Option<grenad::Reader<CursorClonableMmap>>,
word_docids: Option<Merger<CursorClonableMmap, MergeFn>>,
exact_word_docids: Option<Merger<CursorClonableMmap, MergeFn>>,
word_position_docids: Option<Merger<CursorClonableMmap, MergeFn>>,
word_fid_docids: Option<Merger<CursorClonableMmap, MergeFn>>,
) -> Result<()>
where
FP: Fn(UpdateIndexingStep) + Sync,
@ -751,7 +805,7 @@ where
)]
fn execute_word_prefix_docids(
txn: &mut heed::RwTxn,
reader: grenad::Reader<Cursor<ClonableMmap>>,
merger: Merger<CursorClonableMmap, MergeFn>,
word_docids_db: Database<Str, CboRoaringBitmapCodec>,
word_prefix_docids_db: Database<Str, CboRoaringBitmapCodec>,
indexer_config: &IndexerConfig,
@ -761,13 +815,12 @@ fn execute_word_prefix_docids(
) -> Result<()> {
puffin::profile_function!();
let cursor = reader.into_cursor()?;
let mut builder = WordPrefixDocids::new(txn, word_docids_db, word_prefix_docids_db);
builder.chunk_compression_type = indexer_config.chunk_compression_type;
builder.chunk_compression_level = indexer_config.chunk_compression_level;
builder.max_nb_chunks = indexer_config.max_nb_chunks;
builder.max_memory = indexer_config.max_memory;
builder.execute(cursor, new_prefix_fst_words, common_prefix_fst_words, del_prefix_fst_words)?;
builder.execute(merger, new_prefix_fst_words, common_prefix_fst_words, del_prefix_fst_words)?;
Ok(())
}