First iteration on exposing puffin profiling

This commit is contained in:
Kerollmops 2023-07-10 18:41:54 +02:00
parent 9daccdf7f0
commit eef95de30e
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
33 changed files with 210 additions and 17 deletions

39
Cargo.lock generated
View File

@ -1973,6 +1973,7 @@ dependencies = [
"meilisearch-types",
"nelson",
"page_size 0.5.0",
"puffin",
"roaring",
"serde",
"serde_json",
@ -2498,6 +2499,12 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "lz4_flex"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b8c72594ac26bfd34f2d99dfced2edfaddfe8a476e3ff2ca0eb293d925c4f83"
[[package]]
name = "manifest-dir-macros"
version = "0.1.17"
@ -2587,6 +2594,8 @@ dependencies = [
"pin-project-lite",
"platform-dirs",
"prometheus",
"puffin",
"puffin_http",
"rand",
"rayon",
"regex",
@ -2731,6 +2740,7 @@ dependencies = [
"obkv",
"once_cell",
"ordered-float",
"puffin",
"rand",
"rand_pcg",
"rayon",
@ -3256,6 +3266,35 @@ version = "2.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
[[package]]
name = "puffin"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76425abd4e1a0ad4bd6995dd974b52f414fca9974171df8e3708b3e660d05a21"
dependencies = [
"anyhow",
"bincode",
"byteorder",
"cfg-if",
"instant",
"lz4_flex",
"once_cell",
"parking_lot",
"serde",
]
[[package]]
name = "puffin_http"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13bffc600c35913d282ae1e96a6ffcdf36dc7a7cdb9310e0ba15914d258c8193"
dependencies = [
"anyhow",
"crossbeam-channel",
"log",
"puffin",
]
[[package]]
name = "quote"
version = "1.0.28"

View File

@ -22,6 +22,7 @@ log = "0.4.17"
meilisearch-auth = { path = "../meilisearch-auth" }
meilisearch-types = { path = "../meilisearch-types" }
page_size = "0.5.0"
puffin = "0.16.0"
roaring = { version = "0.10.1", features = ["serde"] }
serde = { version = "1.0.160", features = ["derive"] }
serde_json = { version = "1.0.95", features = ["preserve_order"] }

View File

@ -471,6 +471,8 @@ impl IndexScheduler {
#[cfg(test)]
self.maybe_fail(crate::tests::FailureLocation::InsideCreateBatch)?;
puffin::profile_function!();
let enqueued = &self.get_status(rtxn, Status::Enqueued)?;
let to_cancel = self.get_kind(rtxn, Kind::TaskCancelation)? & enqueued;
@ -575,6 +577,9 @@ impl IndexScheduler {
self.maybe_fail(crate::tests::FailureLocation::PanicInsideProcessBatch)?;
self.breakpoint(crate::Breakpoint::InsideProcessBatch);
}
puffin::profile_function!(format!("{:?}", batch));
match batch {
Batch::TaskCancelation { mut task, previous_started_at, previous_processing_tasks } => {
// 1. Retrieve the tasks that matched the query at enqueue-time.
@ -1111,6 +1116,8 @@ impl IndexScheduler {
index: &'i Index,
operation: IndexOperation,
) -> Result<Vec<Task>> {
puffin::profile_function!();
match operation {
IndexOperation::DocumentClear { mut tasks, .. } => {
let count = milli::update::ClearDocuments::new(index_wtxn, index).execute()?;

View File

@ -1032,6 +1032,8 @@ impl IndexScheduler {
self.breakpoint(Breakpoint::Start);
}
puffin::GlobalProfiler::lock().new_frame();
self.cleanup_task_queue()?;
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;

View File

@ -67,6 +67,8 @@ permissive-json-pointer = { path = "../permissive-json-pointer" }
pin-project-lite = "0.2.9"
platform-dirs = "0.3.0"
prometheus = { version = "0.13.3", features = ["process"] }
puffin = "0.16.0"
puffin_http = "0.13.0"
rand = "0.8.5"
rayon = "1.7.0"
regex = "1.7.3"

View File

@ -29,6 +29,9 @@ fn setup(opt: &Opt) -> anyhow::Result<()> {
async fn main() -> anyhow::Result<()> {
let (opt, config_read_from) = Opt::try_build()?;
puffin::set_scopes_on(true);
let _server = puffin_http::Server::new(&format!("0.0.0.0:{}", puffin_http::DEFAULT_PORT))?;
anyhow::ensure!(
!(cfg!(windows) && opt.experimental_reduce_indexing_memory_usage),
"The `experimental-reduce-indexing-memory-usage` flag is not supported on Windows"

View File

@ -67,6 +67,9 @@ filter-parser = { path = "../filter-parser" }
# documents words self-join
itertools = "0.10.5"
# profiling
puffin = "0.16.0"
# logging
log = "0.4.17"
logging_timer = "1.1.0"

View File

@ -15,6 +15,8 @@ impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> {
}
pub fn execute(self) -> Result<u64> {
puffin::profile_function!();
self.index.set_updated_at(self.wtxn, &OffsetDateTime::now_utc())?;
let Index {
env: _env,

View File

@ -110,6 +110,8 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
Some(docid)
}
pub fn execute(self) -> Result<DocumentDeletionResult> {
puffin::profile_function!();
let DetailedDocumentDeletionResult { deleted_documents, remaining_documents } =
self.execute_inner()?;

View File

@ -31,6 +31,8 @@ pub fn enrich_documents_batch<R: Read + Seek>(
autogenerate_docids: bool,
reader: DocumentsBatchReader<R>,
) -> Result<StdResult<EnrichedDocumentsBatchReader<R>, UserError>> {
puffin::profile_function!();
let (mut cursor, mut documents_batch_index) = reader.into_cursor_and_fields_index();
let mut external_ids = tempfile::tempfile().map(grenad::Writer::new)?;

View File

@ -30,6 +30,8 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
stop_words: Option<&fst::Set<&[u8]>>,
max_positions_per_attributes: Option<u32>,
) -> Result<(RoaringBitmap, grenad::Reader<File>, ScriptLanguageDocidsMap)> {
puffin::profile_function!();
let max_positions_per_attributes = max_positions_per_attributes
.map_or(MAX_POSITION_PER_ATTRIBUTE, |max| max.min(MAX_POSITION_PER_ATTRIBUTE));
let max_memory = indexer.max_memory_by_thread();

View File

@ -20,6 +20,8 @@ pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
docid_fid_facet_number: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();
let mut facet_number_docids_sorter = create_sorter(

View File

@ -18,6 +18,8 @@ pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
docid_fid_facet_string: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();
let mut facet_string_docids_sorter = create_sorter(

View File

@ -34,6 +34,8 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
indexer: GrenadParameters,
faceted_fields: &HashSet<FieldId>,
) -> Result<ExtractedFacetValues> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();
let mut fid_docid_facet_numbers_sorter = create_sorter(

View File

@ -22,6 +22,8 @@ pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();
let mut fid_word_count_docids_sorter = create_sorter(

View File

@ -19,6 +19,8 @@ pub fn extract_geo_points<R: io::Read + io::Seek>(
primary_key_id: FieldId,
(lat_fid, lng_fid): (FieldId, FieldId),
) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
let mut writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,

View File

@ -19,6 +19,8 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
primary_key_id: FieldId,
vectors_fid: FieldId,
) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
let mut writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,

View File

@ -27,6 +27,8 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
indexer: GrenadParameters,
exact_attributes: &HashSet<FieldId>,
) -> Result<(grenad::Reader<File>, grenad::Reader<File>)> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();
let mut word_docids_sorter = create_sorter(

View File

@ -15,6 +15,8 @@ pub fn extract_word_fid_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();
let mut word_fid_docids_sorter = create_sorter(

View File

@ -21,6 +21,8 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();
let mut word_pair_proximity_docids_sorter = create_sorter(

View File

@ -18,6 +18,8 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();
let mut word_position_docids_sorter = create_sorter(

View File

@ -52,6 +52,8 @@ pub(crate) fn data_from_obkv_documents(
max_positions_per_attributes: Option<u32>,
exact_attributes: HashSet<FieldId>,
) -> Result<()> {
puffin::profile_function!();
original_obkv_chunks
.par_bridge()
.map(|original_documents_chunk| {
@ -238,11 +240,13 @@ fn spawn_extraction_task<FE, FS, M>(
M::Output: Send,
{
rayon::spawn(move || {
puffin::profile_scope!("extract_multiple_chunks", name);
let chunks: Result<M> =
chunks.into_par_iter().map(|chunk| extract_fn(chunk, indexer)).collect();
rayon::spawn(move || match chunks {
Ok(chunks) => {
debug!("merge {} database", name);
puffin::profile_scope!("merge_multiple_chunks", name);
let reader = chunks.merge(merge_fn, &indexer);
let _ = lmdb_writer_sx.send(reader.map(serialize_fn));
}

View File

@ -214,6 +214,7 @@ pub fn sorter_into_lmdb_database(
sorter: Sorter<MergeFn>,
merge: MergeFn,
) -> Result<()> {
puffin::profile_function!();
debug!("Writing MTBL sorter...");
let before = Instant::now();

View File

@ -137,6 +137,8 @@ where
mut self,
reader: DocumentsBatchReader<R>,
) -> Result<(Self, StdResult<u64, UserError>)> {
puffin::profile_function!();
// Early return when there is no document to add
if reader.is_empty() {
return Ok((self, Ok(0)));
@ -175,6 +177,8 @@ where
mut self,
to_delete: Vec<String>,
) -> Result<(Self, StdResult<u64, UserError>)> {
puffin::profile_function!();
// Early return when there is no document to add
if to_delete.is_empty() {
return Ok((self, Ok(0)));
@ -194,6 +198,8 @@ where
#[logging_timer::time("IndexDocuments::{}")]
pub fn execute(mut self) -> Result<DocumentAdditionResult> {
puffin::profile_function!();
if self.added_documents == 0 {
let number_of_documents = self.index.number_of_documents(self.wtxn)?;
return Ok(DocumentAdditionResult { indexed_documents: 0, number_of_documents });
@ -232,6 +238,8 @@ where
FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync,
{
puffin::profile_function!();
let TransformOutput {
primary_key,
fields_ids_map,
@ -322,6 +330,7 @@ where
// Run extraction pipeline in parallel.
pool.install(|| {
puffin::profile_scope!("extract_and_send_grenad_chunks");
// split obkv file into several chunks
let original_chunk_iter =
grenad_obkv_into_chunks(original_documents, pool_params, documents_chunk_size);
@ -477,6 +486,8 @@ where
FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync,
{
puffin::profile_function!();
// Merged databases are already been indexed, we start from this count;
let mut databases_seen = MERGED_DATABASE_COUNT;
@ -511,26 +522,36 @@ where
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
let current_prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
let current_prefix_fst;
let common_prefix_fst_words_tmp;
let common_prefix_fst_words: Vec<_>;
let new_prefix_fst_words;
let del_prefix_fst_words;
{
puffin::profile_scope!("compute_prefix_diffs");
current_prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
// We retrieve the common words between the previous and new prefix word fst.
let common_prefix_fst_words = fst_stream_into_vec(
common_prefix_fst_words_tmp = fst_stream_into_vec(
previous_words_prefixes_fst.op().add(&current_prefix_fst).intersection(),
);
let common_prefix_fst_words: Vec<_> = common_prefix_fst_words
common_prefix_fst_words = common_prefix_fst_words_tmp
.as_slice()
.linear_group_by_key(|x| x.chars().next().unwrap())
.collect();
// We retrieve the newly added words between the previous and new prefix word fst.
let new_prefix_fst_words = fst_stream_into_vec(
new_prefix_fst_words = fst_stream_into_vec(
current_prefix_fst.op().add(&previous_words_prefixes_fst).difference(),
);
// We compute the set of prefixes that are no more part of the prefix fst.
let del_prefix_fst_words = fst_stream_into_hashset(
del_prefix_fst_words = fst_stream_into_hashset(
previous_words_prefixes_fst.op().add(&current_prefix_fst).difference(),
);
}
databases_seen += 1;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
@ -668,6 +689,8 @@ fn execute_word_prefix_docids(
common_prefix_fst_words: &[&[String]],
del_prefix_fst_words: &HashSet<Vec<u8>>,
) -> Result<()> {
puffin::profile_function!();
let cursor = reader.into_cursor()?;
let mut builder = WordPrefixDocids::new(txn, word_docids_db, word_prefix_docids_db);
builder.chunk_compression_type = indexer_config.chunk_compression_type;

View File

@ -558,6 +558,8 @@ impl<'a, 'i> Transform<'a, 'i> {
where
F: Fn(UpdateIndexingStep) + Sync,
{
puffin::profile_function!();
let primary_key = self
.index
.primary_key(wtxn)?

View File

@ -49,6 +49,66 @@ pub(crate) enum TypedChunk {
ScriptLanguageDocids(HashMap<(Script, Language), RoaringBitmap>),
}
impl TypedChunk {
pub fn to_debug_string(&self) -> String {
match self {
TypedChunk::FieldIdDocidFacetStrings(grenad) => {
format!("FieldIdDocidFacetStrings {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdDocidFacetNumbers(grenad) => {
format!("FieldIdDocidFacetNumbers {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::Documents(grenad) => {
format!("Documents {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdWordcountDocids(grenad) => {
format!("FieldIdWordcountDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::NewDocumentsIds(grenad) => {
format!("NewDocumentsIds {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::WordDocids { word_docids_reader, exact_word_docids_reader } => format!(
"WordDocids {{ word_docids_reader: {}, exact_word_docids_reader: {} }}",
word_docids_reader.len(),
exact_word_docids_reader.len()
),
TypedChunk::WordPositionDocids(grenad) => {
format!("WordPositionDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::WordFidDocids(grenad) => {
format!("WordFidDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::WordPairProximityDocids(grenad) => {
format!("WordPairProximityDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdFacetStringDocids(grenad) => {
format!("FieldIdFacetStringDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdFacetNumberDocids(grenad) => {
format!("FieldIdFacetNumberDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdFacetExistsDocids(grenad) => {
format!("FieldIdFacetExistsDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdFacetIsNullDocids(grenad) => {
format!("FieldIdFacetIsNullDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdFacetIsEmptyDocids(grenad) => {
format!("FieldIdFacetIsEmptyDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::GeoPoints(grenad) => {
format!("GeoPoints {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::VectorPoints(grenad) => {
format!("VectorPoints {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::ScriptLanguageDocids(grenad) => {
format!("ScriptLanguageDocids {{ number_of_entries: {} }}", grenad.len())
}
}
}
}
/// Write typed chunk in the corresponding LMDB database of the provided index.
/// Return new documents seen.
pub(crate) fn write_typed_chunk_into_index(
@ -57,6 +117,8 @@ pub(crate) fn write_typed_chunk_into_index(
wtxn: &mut RwTxn,
index_is_empty: bool,
) -> Result<(RoaringBitmap, bool)> {
puffin::profile_function!(typed_chunk.to_debug_string());
let mut is_merged_database = false;
match typed_chunk {
TypedChunk::Documents(obkv_documents_iter) => {
@ -336,6 +398,8 @@ where
FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>,
FM: Fn(&[u8], &[u8], &mut Vec<u8>) -> Result<()>,
{
puffin::profile_function!(format!("number of entries: {}", data.len()));
let mut buffer = Vec::new();
let database = database.remap_types::<ByteSlice, ByteSlice>();
@ -378,6 +442,8 @@ where
FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>,
FM: Fn(&[u8], &[u8], &mut Vec<u8>) -> Result<()>,
{
puffin::profile_function!(format!("number of entries: {}", data.len()));
if !index_is_empty {
return write_entries_into_database(
data,

View File

@ -50,6 +50,8 @@ impl<'t, 'u, 'i> PrefixWordPairsProximityDocids<'t, 'u, 'i> {
common_prefix_fst_words: &[&'a [String]],
del_prefix_fst_words: &HashSet<Vec<u8>>,
) -> Result<()> {
puffin::profile_function!();
index_word_prefix_database(
self.wtxn,
self.index.word_pair_proximity_docids,

View File

@ -27,6 +27,8 @@ pub fn index_prefix_word_database(
chunk_compression_type: CompressionType,
chunk_compression_level: Option<u32>,
) -> Result<()> {
puffin::profile_function!();
let max_proximity = max_proximity - 1;
debug!("Computing and writing the word prefix pair proximity docids into LMDB on disk...");

View File

@ -191,6 +191,7 @@ pub fn index_word_prefix_database(
chunk_compression_type: CompressionType,
chunk_compression_level: Option<u32>,
) -> Result<()> {
puffin::profile_function!();
debug!("Computing and writing the word prefix pair proximity docids into LMDB on disk...");
// Make a prefix trie from the common prefixes that are shorter than self.max_prefix_length

View File

@ -303,6 +303,8 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync,
{
puffin::profile_function!();
let fields_ids_map = self.index.fields_ids_map(self.wtxn)?;
// if the settings are set before any document update, we don't need to do anything, and
// will set the primary key during the first document addition.

View File

@ -45,6 +45,8 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
common_prefix_fst_words: &[&[String]],
del_prefix_fst_words: &HashSet<Vec<u8>>,
) -> Result<()> {
puffin::profile_function!();
// It is forbidden to keep a mutable reference into the database
// and write into it at the same time, therefore we write into another file.
let mut prefix_docids_sorter = create_sorter(

View File

@ -50,6 +50,7 @@ impl<'t, 'u, 'i> WordPrefixIntegerDocids<'t, 'u, 'i> {
common_prefix_fst_words: &[&[String]],
del_prefix_fst_words: &HashSet<Vec<u8>>,
) -> Result<()> {
puffin::profile_function!();
debug!("Computing and writing the word levels integers docids into LMDB on disk...");
let mut prefix_integer_docids_sorter = create_sorter(

View File

@ -42,6 +42,8 @@ impl<'t, 'u, 'i> WordsPrefixesFst<'t, 'u, 'i> {
#[logging_timer::time("WordsPrefixesFst::{}")]
pub fn execute(self) -> Result<()> {
puffin::profile_function!();
let words_fst = self.index.words_fst(self.wtxn)?;
let mut current_prefix = vec![SmallString32::new(); self.max_prefix_length];