457: Avoid iterating on big databases when useless r=Kerollmops a=Kerollmops

This PR makes the prefix database updates to avoid iterating on big grenad files when it is unnecessary. We introduced this regression in #436 but it went unnoticed.

---

According to the following benchmark results, we take more time when we index documents in one run than before #436. It looks like it is probably due to the fact that, now, instead of computing the prefixes database by iterating on the LMDB we directly iterate on the grenad file. Those could be slower to iterate on and could be the slowdown cause.

I just pushed a commit that tests this branch with the new unreleased version of grenad where some work was done to speed up the iteration on grenad files. [The benchmarks for this last commit](https://github.com/meilisearch/milli/actions/runs/1927187408) are currently running. You can [see the diff](https://github.com/meilisearch/grenad/compare/v0.4.1...main) between the v0.4 and the unreleased v0.5 version of grenad.

```diff
  group                                                             indexing_benchmark-multi-batch-indexing-before-speed-up_45f52620    indexing_stop-iterating-on-big-grenad-files_ac8b85c4
  -----                                                             ----------------------------------------------------------------    ----------------------------------------------------
+ indexing/Indexing songs in three batches with default settings    1.12      57.7±2.14s        ? ?/sec                                 1.00      51.3±2.76s        ? ?/sec
- indexing/Indexing wiki                                            1.00    917.3±30.01s        ? ?/sec                                 1.10   1008.4±38.27s        ? ?/sec
+ indexing/Indexing wiki in three batches                           1.10   1091.2±32.73s        ? ?/sec                                 1.00    995.5±24.33s        ? ?/sec
```

Co-authored-by: Kerollmops <clement@meilisearch.com>
This commit is contained in:
bors[bot] 2022-03-09 16:46:34 +00:00 committed by GitHub
commit 290a29b5fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 111 additions and 98 deletions

View File

@ -50,35 +50,38 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
self.max_memory,
);
let mut new_word_docids_iter = new_word_docids.into_cursor()?;
let mut current_prefixes: Option<&&[String]> = None;
let mut prefixes_cache = HashMap::new();
while let Some((word, data)) = new_word_docids_iter.move_on_next()? {
current_prefixes = match current_prefixes.take() {
Some(prefixes) if word.starts_with(&prefixes[0].as_bytes()) => Some(prefixes),
_otherwise => {
write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter)?;
common_prefix_fst_words
.iter()
.find(|prefixes| word.starts_with(&prefixes[0].as_bytes()))
}
};
if !common_prefix_fst_words.is_empty() {
let mut new_word_docids_iter = new_word_docids.into_cursor()?;
let mut current_prefixes: Option<&&[String]> = None;
let mut prefixes_cache = HashMap::new();
while let Some((word, data)) = new_word_docids_iter.move_on_next()? {
current_prefixes = match current_prefixes.take() {
Some(prefixes) if word.starts_with(&prefixes[0].as_bytes()) => Some(prefixes),
_otherwise => {
write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter)?;
common_prefix_fst_words
.iter()
.find(|prefixes| word.starts_with(&prefixes[0].as_bytes()))
}
};
if let Some(prefixes) = current_prefixes {
for prefix in prefixes.iter() {
if word.starts_with(prefix.as_bytes()) {
match prefixes_cache.get_mut(prefix.as_bytes()) {
Some(value) => value.push(data.to_owned()),
None => {
prefixes_cache.insert(prefix.clone().into(), vec![data.to_owned()]);
if let Some(prefixes) = current_prefixes {
for prefix in prefixes.iter() {
if word.starts_with(prefix.as_bytes()) {
match prefixes_cache.get_mut(prefix.as_bytes()) {
Some(value) => value.push(data.to_owned()),
None => {
prefixes_cache
.insert(prefix.clone().into(), vec![data.to_owned()]);
}
}
}
}
}
}
}
write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter)?;
write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter)?;
}
// We fetch the docids associated to the newly added word prefix fst only.
let db = self.index.word_docids.remap_data_type::<ByteSlice>();

View File

@ -83,70 +83,76 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> {
self.max_memory,
);
// We compute the prefix docids associated with the common prefixes between
// the old and new word prefix fst.
let mut buffer = Vec::new();
let mut current_prefixes: Option<&&[String]> = None;
let mut prefixes_cache = HashMap::new();
while let Some((key, data)) = new_wppd_iter.move_on_next()? {
let (w1, w2, prox) = StrStrU8Codec::bytes_decode(key).ok_or(heed::Error::Decoding)?;
if prox > self.max_proximity {
continue;
if !common_prefix_fst_words.is_empty() {
// We compute the prefix docids associated with the common prefixes between
// the old and new word prefix fst.
let mut buffer = Vec::new();
let mut current_prefixes: Option<&&[String]> = None;
let mut prefixes_cache = HashMap::new();
while let Some((key, data)) = new_wppd_iter.move_on_next()? {
let (w1, w2, prox) =
StrStrU8Codec::bytes_decode(key).ok_or(heed::Error::Decoding)?;
if prox > self.max_proximity {
continue;
}
insert_current_prefix_data_in_sorter(
&mut buffer,
&mut current_prefixes,
&mut prefixes_cache,
&mut word_prefix_pair_proximity_docids_sorter,
common_prefix_fst_words,
self.max_prefix_length,
w1,
w2,
prox,
data,
)?;
}
insert_current_prefix_data_in_sorter(
&mut buffer,
&mut current_prefixes,
write_prefixes_in_sorter(
&mut prefixes_cache,
&mut word_prefix_pair_proximity_docids_sorter,
common_prefix_fst_words,
self.max_prefix_length,
w1,
w2,
prox,
data,
)?;
}
write_prefixes_in_sorter(
&mut prefixes_cache,
&mut word_prefix_pair_proximity_docids_sorter,
)?;
if !new_prefix_fst_words.is_empty() {
// We compute the prefix docids associated with the newly added prefixes
// in the new word prefix fst.
let mut db_iter = self
.index
.word_pair_proximity_docids
.remap_data_type::<ByteSlice>()
.iter(self.wtxn)?;
// We compute the prefix docids associated with the newly added prefixes
// in the new word prefix fst.
let mut db_iter =
self.index.word_pair_proximity_docids.remap_data_type::<ByteSlice>().iter(self.wtxn)?;
let mut buffer = Vec::new();
let mut current_prefixes: Option<&&[String]> = None;
let mut prefixes_cache = HashMap::new();
while let Some(((w1, w2, prox), data)) = db_iter.next().transpose()? {
if prox > self.max_proximity {
continue;
}
let mut buffer = Vec::new();
let mut current_prefixes: Option<&&[String]> = None;
let mut prefixes_cache = HashMap::new();
while let Some(((w1, w2, prox), data)) = db_iter.next().transpose()? {
if prox > self.max_proximity {
continue;
insert_current_prefix_data_in_sorter(
&mut buffer,
&mut current_prefixes,
&mut prefixes_cache,
&mut word_prefix_pair_proximity_docids_sorter,
&new_prefix_fst_words,
self.max_prefix_length,
w1,
w2,
prox,
data,
)?;
}
insert_current_prefix_data_in_sorter(
&mut buffer,
&mut current_prefixes,
write_prefixes_in_sorter(
&mut prefixes_cache,
&mut word_prefix_pair_proximity_docids_sorter,
&new_prefix_fst_words,
self.max_prefix_length,
w1,
w2,
prox,
data,
)?;
}
write_prefixes_in_sorter(
&mut prefixes_cache,
&mut word_prefix_pair_proximity_docids_sorter,
)?;
drop(db_iter);
// All of the word prefix pairs in the database that have a w2
// that is contained in the `suppr_pw` set must be removed as well.
let mut iter = self

View File

@ -74,42 +74,46 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> {
let mut new_word_position_docids_iter = new_word_position_docids.into_cursor()?;
// We fetch all the new common prefixes between the previous and new prefix fst.
let mut buffer = Vec::new();
let mut current_prefixes: Option<&&[String]> = None;
let mut prefixes_cache = HashMap::new();
while let Some((key, data)) = new_word_position_docids_iter.move_on_next()? {
let (word, pos) = StrBEU32Codec::bytes_decode(key).ok_or(heed::Error::Decoding)?;
if !common_prefix_fst_words.is_empty() {
// We fetch all the new common prefixes between the previous and new prefix fst.
let mut buffer = Vec::new();
let mut current_prefixes: Option<&&[String]> = None;
let mut prefixes_cache = HashMap::new();
while let Some((key, data)) = new_word_position_docids_iter.move_on_next()? {
let (word, pos) = StrBEU32Codec::bytes_decode(key).ok_or(heed::Error::Decoding)?;
current_prefixes = match current_prefixes.take() {
Some(prefixes) if word.starts_with(&prefixes[0]) => Some(prefixes),
_otherwise => {
write_prefixes_in_sorter(
&mut prefixes_cache,
&mut prefix_position_docids_sorter,
)?;
common_prefix_fst_words.iter().find(|prefixes| word.starts_with(&prefixes[0]))
}
};
current_prefixes = match current_prefixes.take() {
Some(prefixes) if word.starts_with(&prefixes[0]) => Some(prefixes),
_otherwise => {
write_prefixes_in_sorter(
&mut prefixes_cache,
&mut prefix_position_docids_sorter,
)?;
common_prefix_fst_words
.iter()
.find(|prefixes| word.starts_with(&prefixes[0]))
}
};
if let Some(prefixes) = current_prefixes {
for prefix in prefixes.iter() {
if word.starts_with(prefix) {
buffer.clear();
buffer.extend_from_slice(prefix.as_bytes());
buffer.extend_from_slice(&pos.to_be_bytes());
match prefixes_cache.get_mut(&buffer) {
Some(value) => value.push(data.to_owned()),
None => {
prefixes_cache.insert(buffer.clone(), vec![data.to_owned()]);
if let Some(prefixes) = current_prefixes {
for prefix in prefixes.iter() {
if word.starts_with(prefix) {
buffer.clear();
buffer.extend_from_slice(prefix.as_bytes());
buffer.extend_from_slice(&pos.to_be_bytes());
match prefixes_cache.get_mut(&buffer) {
Some(value) => value.push(data.to_owned()),
None => {
prefixes_cache.insert(buffer.clone(), vec![data.to_owned()]);
}
}
}
}
}
}
}
write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_position_docids_sorter)?;
write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_position_docids_sorter)?;
}
// We fetch the docids associated to the newly added word prefix fst only.
let db = self.index.word_position_docids.remap_data_type::<ByteSlice>();