456: Remove useless grenad merging r=Kerollmops a=Kerollmops

This PR must be merged after #454.

This PR removes the part of code that was merging all of the grenad Readers merging that we don't need as the indexer should have merged them and, therefore, we should only have one final grenad Reader. We reduce the amount of CPU usage and memory pressure we were doing uselessly.

`@ManyTheFish` are you sure I can skip merging the `word_docids` database?

Here is the benchmark comparison with the previously merged PR #454:
```
group                                              indexing_reintroduce-appending-sorted-values_c05e42a8    indexing_remove-useless-grenad-merging_d5b8b5a2
-----                                              -----------------------------------------------------    -----------------------------------------------
indexing/Indexing movies with default settings     1.06      16.6±1.04s        ? ?/sec                      1.00      15.7±0.93s        ? ?/sec
indexing/Indexing songs with default settings      1.16      60.1±7.07s        ? ?/sec                      1.00      51.7±5.98s        ? ?/sec
indexing/Indexing songs without faceted numbers    1.06      55.4±6.14s        ? ?/sec                      1.00      52.2±4.13s        ? ?/sec
```

And the comparison with multi-batch indexing before #436, we can see that we gain time for benchmarks that index datasets in multiple batches but there is _so much_ variance that it's not clear.

```
group                                                             indexing_benchmark-multi-batch-indexing-before-speed-up_45f52620    indexing_remove-useless-grenad-merging_d5b8b5a2
-----                                                             ----------------------------------------------------------------    -----------------------------------------------
indexing/Indexing geo_point                                       1.07       6.6±0.08s        ? ?/sec                                 1.00       6.2±0.11s        ? ?/sec
indexing/Indexing songs in three batches with default settings    1.12      57.7±2.14s        ? ?/sec                                 1.00      51.5±3.80s        ? ?/sec
indexing/Indexing songs with default settings                     1.00      47.5±2.52s        ? ?/sec                                 1.09      51.7±5.98s        ? ?/sec
indexing/Indexing songs without any facets                        1.00      43.5±1.43s        ? ?/sec                                 1.12      48.8±3.73s        ? ?/sec
indexing/Indexing songs without faceted numbers                   1.00      47.1±2.23s        ? ?/sec                                 1.11      52.2±4.13s        ? ?/sec
indexing/Indexing wiki                                            1.00    917.3±30.01s        ? ?/sec                                 1.09    998.7±38.92s        ? ?/sec
indexing/Indexing wiki in three batches                           1.09   1091.2±32.73s        ? ?/sec                                 1.00    996.5±15.70s        ? ?/sec
```

What do you think `@irevoire?` Should we change the benchmarks to make them do more runs?

Co-authored-by: Kerollmops <clement@meilisearch.com>
This commit is contained in:
bors[bot] 2022-03-01 16:48:08 +00:00 committed by GitHub
commit 51cf44d6fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 68 additions and 79 deletions

View File

@ -279,9 +279,9 @@ where
let index_documents_ids = self.index.documents_ids(self.wtxn)?;
let index_is_empty = index_documents_ids.len() == 0;
let mut final_documents_ids = RoaringBitmap::new();
let mut word_pair_proximity_docids = Vec::new();
let mut word_position_docids = Vec::new();
let mut word_docids = Vec::new();
let mut word_pair_proximity_docids = None;
let mut word_position_docids = None;
let mut word_docids = None;
let mut databases_seen = 0;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
@ -293,17 +293,17 @@ where
let typed_chunk = match result? {
TypedChunk::WordDocids(chunk) => {
let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? };
word_docids.push(cloneable_chunk);
word_docids = Some(cloneable_chunk);
TypedChunk::WordDocids(chunk)
}
TypedChunk::WordPairProximityDocids(chunk) => {
let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? };
word_pair_proximity_docids.push(cloneable_chunk);
word_pair_proximity_docids = Some(cloneable_chunk);
TypedChunk::WordPairProximityDocids(chunk)
}
TypedChunk::WordPositionDocids(chunk) => {
let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? };
word_position_docids.push(cloneable_chunk);
word_position_docids = Some(cloneable_chunk);
TypedChunk::WordPositionDocids(chunk)
}
otherwise => otherwise,
@ -356,9 +356,9 @@ where
#[logging_timer::time("IndexDocuments::{}")]
pub fn execute_prefix_databases(
self,
word_docids: Vec<grenad::Reader<CursorClonableMmap>>,
word_pair_proximity_docids: Vec<grenad::Reader<CursorClonableMmap>>,
word_position_docids: Vec<grenad::Reader<CursorClonableMmap>>,
word_docids: Option<grenad::Reader<CursorClonableMmap>>,
word_pair_proximity_docids: Option<grenad::Reader<CursorClonableMmap>>,
word_position_docids: Option<grenad::Reader<CursorClonableMmap>>,
) -> Result<()>
where
F: Fn(UpdateIndexingStep) + Sync,
@ -424,18 +424,20 @@ where
total_databases: TOTAL_POSTING_DATABASE_COUNT,
});
// Run the word prefix docids update operation.
let mut builder = WordPrefixDocids::new(self.wtxn, self.index);
builder.chunk_compression_type = self.indexer_config.chunk_compression_type;
builder.chunk_compression_level = self.indexer_config.chunk_compression_level;
builder.max_nb_chunks = self.indexer_config.max_nb_chunks;
builder.max_memory = self.indexer_config.max_memory;
builder.execute(
word_docids,
&new_prefix_fst_words,
&common_prefix_fst_words,
&del_prefix_fst_words,
)?;
if let Some(word_docids) = word_docids {
// Run the word prefix docids update operation.
let mut builder = WordPrefixDocids::new(self.wtxn, self.index);
builder.chunk_compression_type = self.indexer_config.chunk_compression_type;
builder.chunk_compression_level = self.indexer_config.chunk_compression_level;
builder.max_nb_chunks = self.indexer_config.max_nb_chunks;
builder.max_memory = self.indexer_config.max_memory;
builder.execute(
word_docids,
&new_prefix_fst_words,
&common_prefix_fst_words,
&del_prefix_fst_words,
)?;
}
databases_seen += 1;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
@ -443,18 +445,20 @@ where
total_databases: TOTAL_POSTING_DATABASE_COUNT,
});
// Run the word prefix pair proximity docids update operation.
let mut builder = WordPrefixPairProximityDocids::new(self.wtxn, self.index);
builder.chunk_compression_type = self.indexer_config.chunk_compression_type;
builder.chunk_compression_level = self.indexer_config.chunk_compression_level;
builder.max_nb_chunks = self.indexer_config.max_nb_chunks;
builder.max_memory = self.indexer_config.max_memory;
builder.execute(
word_pair_proximity_docids,
&new_prefix_fst_words,
&common_prefix_fst_words,
&del_prefix_fst_words,
)?;
if let Some(word_pair_proximity_docids) = word_pair_proximity_docids {
// Run the word prefix pair proximity docids update operation.
let mut builder = WordPrefixPairProximityDocids::new(self.wtxn, self.index);
builder.chunk_compression_type = self.indexer_config.chunk_compression_type;
builder.chunk_compression_level = self.indexer_config.chunk_compression_level;
builder.max_nb_chunks = self.indexer_config.max_nb_chunks;
builder.max_memory = self.indexer_config.max_memory;
builder.execute(
word_pair_proximity_docids,
&new_prefix_fst_words,
&common_prefix_fst_words,
&del_prefix_fst_words,
)?;
}
databases_seen += 1;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
@ -462,24 +466,26 @@ where
total_databases: TOTAL_POSTING_DATABASE_COUNT,
});
// Run the words prefix position docids update operation.
let mut builder = WordPrefixPositionDocids::new(self.wtxn, self.index);
builder.chunk_compression_type = self.indexer_config.chunk_compression_type;
builder.chunk_compression_level = self.indexer_config.chunk_compression_level;
builder.max_nb_chunks = self.indexer_config.max_nb_chunks;
builder.max_memory = self.indexer_config.max_memory;
if let Some(value) = self.config.words_positions_level_group_size {
builder.level_group_size(value);
if let Some(word_position_docids) = word_position_docids {
// Run the words prefix position docids update operation.
let mut builder = WordPrefixPositionDocids::new(self.wtxn, self.index);
builder.chunk_compression_type = self.indexer_config.chunk_compression_type;
builder.chunk_compression_level = self.indexer_config.chunk_compression_level;
builder.max_nb_chunks = self.indexer_config.max_nb_chunks;
builder.max_memory = self.indexer_config.max_memory;
if let Some(value) = self.config.words_positions_level_group_size {
builder.level_group_size(value);
}
if let Some(value) = self.config.words_positions_min_level_size {
builder.min_level_size(value);
}
builder.execute(
word_position_docids,
&new_prefix_fst_words,
&common_prefix_fst_words,
&del_prefix_fst_words,
)?;
}
if let Some(value) = self.config.words_positions_min_level_size {
builder.min_level_size(value);
}
builder.execute(
word_position_docids,
&new_prefix_fst_words,
&common_prefix_fst_words,
&del_prefix_fst_words,
)?;
databases_seen += 1;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {

View File

@ -1,6 +1,6 @@
use std::collections::{HashMap, HashSet};
use grenad::{CompressionType, MergerBuilder};
use grenad::CompressionType;
use heed::types::ByteSlice;
use crate::update::index_documents::{
@ -35,7 +35,7 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
#[logging_timer::time("WordPrefixDocids::{}")]
pub fn execute(
self,
new_word_docids: Vec<grenad::Reader<CursorClonableMmap>>,
new_word_docids: grenad::Reader<CursorClonableMmap>,
new_prefix_fst_words: &[String],
common_prefix_fst_words: &[&[String]],
del_prefix_fst_words: &HashSet<Vec<u8>>,
@ -50,15 +50,10 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
self.max_memory,
);
let mut word_docids_merger = MergerBuilder::new(merge_roaring_bitmaps);
for reader in new_word_docids {
word_docids_merger.push(reader.into_cursor()?);
}
let mut word_docids_iter = word_docids_merger.build().into_stream_merger_iter()?;
let mut new_word_docids_iter = new_word_docids.into_cursor()?;
let mut current_prefixes: Option<&&[String]> = None;
let mut prefixes_cache = HashMap::new();
while let Some((word, data)) = word_docids_iter.next()? {
while let Some((word, data)) = new_word_docids_iter.move_on_next()? {
current_prefixes = match current_prefixes.take() {
Some(prefixes) if word.starts_with(&prefixes[0].as_bytes()) => Some(prefixes),
_otherwise => {

View File

@ -1,6 +1,6 @@
use std::collections::{HashMap, HashSet};
use grenad::{CompressionType, MergerBuilder};
use grenad::CompressionType;
use heed::types::ByteSlice;
use heed::BytesDecode;
use log::debug;
@ -64,7 +64,7 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> {
#[logging_timer::time("WordPrefixPairProximityDocids::{}")]
pub fn execute(
self,
new_word_pair_proximity_docids: Vec<grenad::Reader<CursorClonableMmap>>,
new_word_pair_proximity_docids: grenad::Reader<CursorClonableMmap>,
new_prefix_fst_words: &[String],
common_prefix_fst_words: &[&[String]],
del_prefix_fst_words: &HashSet<Vec<u8>>,
@ -74,14 +74,7 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> {
let new_prefix_fst_words: Vec<_> =
new_prefix_fst_words.linear_group_by_key(|x| x.chars().nth(0).unwrap()).collect();
// We retrieve and merge the created word pair proximities docids entries
// for the newly added documents.
let mut wppd_merger = MergerBuilder::new(merge_cbo_roaring_bitmaps);
for reader in new_word_pair_proximity_docids {
wppd_merger.push(reader.into_cursor()?);
}
let mut wppd_iter = wppd_merger.build().into_stream_merger_iter()?;
let mut new_wppd_iter = new_word_pair_proximity_docids.into_cursor()?;
let mut word_prefix_pair_proximity_docids_sorter = create_sorter(
merge_cbo_roaring_bitmaps,
self.chunk_compression_type,
@ -95,7 +88,7 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> {
let mut buffer = Vec::new();
let mut current_prefixes: Option<&&[String]> = None;
let mut prefixes_cache = HashMap::new();
while let Some((key, data)) = wppd_iter.next()? {
while let Some((key, data)) = new_wppd_iter.move_on_next()? {
let (w1, w2, prox) = StrStrU8Codec::bytes_decode(key).ok_or(heed::Error::Decoding)?;
if prox > self.max_proximity {
continue;

View File

@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet};
use std::num::NonZeroU32;
use std::{cmp, str};
use grenad::{CompressionType, MergerBuilder};
use grenad::CompressionType;
use heed::types::ByteSlice;
use heed::{BytesDecode, BytesEncode};
use log::debug;
@ -57,7 +57,7 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> {
#[logging_timer::time("WordPrefixPositionDocids::{}")]
pub fn execute(
self,
new_word_position_docids: Vec<grenad::Reader<CursorClonableMmap>>,
new_word_position_docids: grenad::Reader<CursorClonableMmap>,
new_prefix_fst_words: &[String],
common_prefix_fst_words: &[&[String]],
del_prefix_fst_words: &HashSet<Vec<u8>>,
@ -72,18 +72,13 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> {
self.max_memory,
);
let mut word_position_docids_merger = MergerBuilder::new(merge_cbo_roaring_bitmaps);
for reader in new_word_position_docids {
word_position_docids_merger.push(reader.into_cursor()?);
}
let mut word_position_docids_iter =
word_position_docids_merger.build().into_stream_merger_iter()?;
let mut new_word_position_docids_iter = new_word_position_docids.into_cursor()?;
// We fetch all the new common prefixes between the previous and new prefix fst.
let mut buffer = Vec::new();
let mut current_prefixes: Option<&&[String]> = None;
let mut prefixes_cache = HashMap::new();
while let Some((key, data)) = word_position_docids_iter.next()? {
while let Some((key, data)) = new_word_position_docids_iter.move_on_next()? {
let (word, pos) = StrBEU32Codec::bytes_decode(key).ok_or(heed::Error::Decoding)?;
current_prefixes = match current_prefixes.take() {