Make the clear-all operation clear the prefix caches

Clément Renault 2020-01-16 16:19:04 +01:00
parent 96139da0d2
commit be31a14326
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
6 changed files with 136 additions and 65 deletions

meilisearch-core/src/update/clear_all.rs

@@ -9,6 +9,8 @@ pub fn apply_clear_all(
documents_fields_counts_store: store::DocumentsFieldsCounts,
postings_lists_store: store::PostingsLists,
docs_words_store: store::DocsWords,
prefix_documents_cache: store::PrefixDocumentsCache,
prefix_postings_lists_cache: store::PrefixPostingsListsCache,
) -> MResult<()> {
main_store.put_words_fst(writer, &fst::Set::default())?;
main_store.put_ranked_map(writer, &RankedMap::default())?;
@@ -17,6 +19,8 @@ pub fn apply_clear_all(
documents_fields_counts_store.clear(writer)?;
postings_lists_store.clear(writer)?;
docs_words_store.clear(writer)?;
prefix_documents_cache.clear(writer)?;
prefix_postings_lists_cache.clear(writer)?;
Ok(())
}
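
For intuition, this is the bug the hunk above fixes, reduced to a toy model: clearing the word and postings stores while leaving the prefix caches intact lets stale entries survive the wipe, so a later prefix query can still surface documents that were supposed to be gone. The types below are hypothetical stand-ins, not the real heed-backed stores:

```rust
use std::collections::HashMap;

// Toy model of an index: a postings map plus a prefix cache.
// Illustrative only; the real stores are LMDB databases.
struct ToyIndex {
    postings: HashMap<String, Vec<u64>>,
    prefix_cache: HashMap<String, Vec<u64>>,
}

impl ToyIndex {
    // Before this commit: clear-all forgot the caches.
    fn clear_all_buggy(&mut self) {
        self.postings.clear();
        // prefix_cache keeps its stale document ids.
    }

    // After this commit: the caches are wiped too.
    fn clear_all_fixed(&mut self) {
        self.postings.clear();
        self.prefix_cache.clear();
    }
}

fn main() {
    let mut index = ToyIndex {
        postings: HashMap::from([("hello".to_string(), vec![1, 2])]),
        prefix_cache: HashMap::from([("he".to_string(), vec![1, 2])]),
    };

    index.clear_all_buggy();
    // A prefix search for "he" would still find documents 1 and 2.
    assert!(!index.prefix_cache.is_empty());

    index.clear_all_fixed();
    assert!(index.prefix_cache.is_empty());
}
```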

meilisearch-core/src/update/documents_addition.rs

@@ -1,16 +1,15 @@
use std::collections::HashMap;
use fst::{set::OpBuilder, SetBuilder, IntoStreamer, Streamer};
use sdset::{duo::Union, SetOperation, Set};
use fst::{set::OpBuilder, SetBuilder};
use sdset::{duo::Union, SetOperation};
use serde::{Deserialize, Serialize};
use log::debug;
use crate::database::{MainT, UpdateT};
use crate::database::{UpdateEvent, UpdateEventsEmitter};
use crate::raw_indexer::RawIndexer;
use crate::serde::{extract_document_id, serialize_value, Deserializer, Serializer};
use crate::store;
use crate::update::{apply_documents_deletion, next_update_id, Update};
use crate::update::{apply_documents_deletion, compute_short_prefixes, next_update_id, Update};
use crate::{Error, MResult, RankedMap};
pub struct DocumentsAddition<D> {
@@ -143,6 +142,7 @@ pub fn apply_documents_addition<'a, 'b>(
documents_fields_counts_store,
postings_lists_store,
docs_words_store,
prefix_postings_lists_cache_store,
documents_ids,
)?;
@@ -179,70 +179,18 @@ pub fn apply_documents_addition<'a, 'b>(
postings_lists_store,
docs_words_store,
prefix_documents_cache_store,
prefix_postings_lists_cache_store,
&ranked_map,
number_of_inserted_documents,
indexer,
)?;
// retrieve the words fst to compute all those prefixes
let words_fst = match main_store.words_fst(writer)? {
Some(fst) => fst,
None => return Ok(()),
};
// clear the prefixes
let pplc_store = prefix_postings_lists_cache_store;
pplc_store.clear(writer)?;
for prefix_len in 1..=2 {
// compute prefixes and store those in the PrefixPostingsListsCache store.
let mut previous_prefix: Option<([u8; 4], Vec<_>)> = None;
let mut stream = words_fst.into_stream();
while let Some(input) = stream.next() {
// We skip words that are strictly shorter than the prefix length
// we want to cache (<). We must also ignore a word of exactly the
// prefix length, because matching it must be treated as an exact
// match on the word itself and not as a prefix match (=).
if input.len() <= prefix_len { continue }
if let Some(postings_list) = postings_lists_store.postings_list(writer, input)?.map(|p| p.matches.into_owned()) {
let prefix = &input[..prefix_len];
let mut arr_prefix = [0; 4];
arr_prefix[..prefix_len].copy_from_slice(prefix);
match previous_prefix {
Some((ref mut prev_prefix, ref mut prev_pl)) if *prev_prefix != arr_prefix => {
prev_pl.sort_unstable();
prev_pl.dedup();
if let Ok(prefix) = std::str::from_utf8(&prev_prefix[..prefix_len]) {
debug!("writing the prefix of {:?} of length {}", prefix, prev_pl.len());
}
let pls = Set::new_unchecked(&prev_pl);
pplc_store.put_prefix_postings_list(writer, *prev_prefix, &pls)?;
*prev_prefix = arr_prefix;
prev_pl.clear();
prev_pl.extend_from_slice(&postings_list);
},
Some((_, ref mut prev_pl)) => prev_pl.extend_from_slice(&postings_list),
None => previous_prefix = Some((arr_prefix, postings_list.to_vec())),
}
}
}
// write the last prefix postings lists
if let Some((prev_prefix, mut prev_pl)) = previous_prefix.take() {
prev_pl.sort_unstable();
prev_pl.dedup();
let pls = Set::new_unchecked(&prev_pl);
pplc_store.put_prefix_postings_list(writer, prev_prefix, &pls)?;
}
}
compute_short_prefixes(
writer,
main_store,
postings_lists_store,
prefix_postings_lists_cache_store,
)?;
Ok(())
}
@@ -255,6 +203,7 @@ pub fn apply_documents_partial_addition<'a, 'b>(
postings_lists_store: store::PostingsLists,
docs_words_store: store::DocsWords,
prefix_documents_cache_store: store::PrefixDocumentsCache,
prefix_postings_lists_cache_store: store::PrefixPostingsListsCache,
addition: Vec<HashMap<String, serde_json::Value>>,
) -> MResult<()> {
let mut documents_additions = HashMap::new();
@@ -303,6 +252,7 @@ pub fn apply_documents_partial_addition<'a, 'b>(
documents_fields_counts_store,
postings_lists_store,
docs_words_store,
prefix_postings_lists_cache_store,
documents_ids,
)?;
@@ -339,10 +289,20 @@ pub fn apply_documents_partial_addition<'a, 'b>(
postings_lists_store,
docs_words_store,
prefix_documents_cache_store,
prefix_postings_lists_cache_store,
&ranked_map,
number_of_inserted_documents,
indexer,
)
)?;
compute_short_prefixes(
writer,
main_store,
postings_lists_store,
prefix_postings_lists_cache_store,
)?;
Ok(())
}
pub fn reindex_all_documents(
@@ -353,6 +313,7 @@ pub fn reindex_all_documents(
postings_lists_store: store::PostingsLists,
docs_words_store: store::DocsWords,
prefix_documents_cache_store: store::PrefixDocumentsCache,
prefix_postings_lists_cache_store: store::PrefixPostingsListsCache,
) -> MResult<()> {
let schema = match main_store.schema(writer)? {
Some(schema) => schema,
@@ -415,12 +376,20 @@ pub fn reindex_all_documents(
postings_lists_store,
docs_words_store,
prefix_documents_cache_store,
prefix_postings_lists_cache_store,
&ranked_map,
number_of_inserted_documents,
indexer,
)?;
}
compute_short_prefixes(
writer,
main_store,
postings_lists_store,
prefix_postings_lists_cache_store,
)?;
Ok(())
}
@@ -430,6 +399,7 @@ pub fn write_documents_addition_index(
postings_lists_store: store::PostingsLists,
docs_words_store: store::DocsWords,
_prefix_documents_cache_store: store::PrefixDocumentsCache,
prefix_postings_lists_cache_store: store::PrefixPostingsListsCache,
ranked_map: &RankedMap,
number_of_inserted_documents: usize,
indexer: RawIndexer,
@@ -478,5 +448,12 @@ pub fn write_documents_addition_index(
main_store.put_ranked_map(writer, ranked_map)?;
main_store.put_number_of_documents(writer, |old| old + number_of_inserted_documents as u64)?;
compute_short_prefixes(
writer,
main_store,
postings_lists_store,
prefix_postings_lists_cache_store,
)?;
Ok(())
}

meilisearch-core/src/update/documents_deletion.rs

@@ -8,7 +8,7 @@ use crate::database::{MainT, UpdateT};
use crate::database::{UpdateEvent, UpdateEventsEmitter};
use crate::serde::extract_document_id;
use crate::store;
use crate::update::{next_update_id, Update};
use crate::update::{next_update_id, compute_short_prefixes, Update};
use crate::{DocumentId, Error, MResult, RankedMap};
pub struct DocumentsDeletion {
@@ -90,6 +90,7 @@ pub fn apply_documents_deletion(
documents_fields_counts_store: store::DocumentsFieldsCounts,
postings_lists_store: store::PostingsLists,
docs_words_store: store::DocsWords,
prefix_postings_lists_cache_store: store::PrefixPostingsListsCache,
deletion: Vec<DocumentId>,
) -> MResult<()> {
let idset = SetBuf::from_dirty(deletion);
@@ -189,5 +190,12 @@ pub fn apply_documents_deletion(
main_store.put_ranked_map(writer, &ranked_map)?;
main_store.put_number_of_documents(writer, |old| old - deleted_documents_len)?;
compute_short_prefixes(
writer,
main_store,
postings_lists_store,
prefix_postings_lists_cache_store,
)?;
Ok(())
}
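
Note the pattern this file shares with the addition paths: after the postings lists are mutated, the prefix cache is cleared and rebuilt from scratch by compute_short_prefixes rather than patched entry by entry. A toy sketch of why a deletion needs the rebuild, using hypothetical map-based types in place of the real stores:

```rust
use std::collections::HashMap;

// Rebuild the whole prefix cache from the postings lists, the
// source of truth. Toy types; the real code streams a words FST.
fn rebuild_prefix_cache(
    postings: &HashMap<String, Vec<u64>>,
    prefix_len: usize,
) -> HashMap<String, Vec<u64>> {
    let mut cache: HashMap<String, Vec<u64>> = HashMap::new();
    for (word, ids) in postings {
        // Same rule as the real code: only words strictly longer
        // than the prefix take part in the prefix postings lists.
        if word.len() <= prefix_len {
            continue;
        }
        cache
            .entry(word[..prefix_len].to_string())
            .or_default()
            .extend_from_slice(ids);
    }
    for ids in cache.values_mut() {
        ids.sort_unstable();
        ids.dedup();
    }
    cache
}

fn main() {
    let mut postings = HashMap::from([
        ("hello".to_string(), vec![1, 2]),
        ("help".to_string(), vec![2, 3]),
    ]);

    // Delete document 1, then rebuild the cache wholesale: the
    // stale id cannot survive because nothing old is kept.
    postings.get_mut("hello").unwrap().retain(|&id| id != 1);
    let cache = rebuild_prefix_cache(&postings, 2);
    assert_eq!(cache["he"], vec![2, 3]);
}
```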

meilisearch-core/src/update/mod.rs

@@ -26,6 +26,8 @@ use chrono::{DateTime, Utc};
use heed::Result as ZResult;
use log::debug;
use serde::{Deserialize, Serialize};
use fst::{IntoStreamer, Streamer};
use sdset::Set;
use crate::{store, DocumentId, MResult};
use crate::database::{MainT, UpdateT};
@@ -262,6 +264,8 @@ pub fn update_task<'a, 'b>(
index.documents_fields_counts,
index.postings_lists,
index.docs_words,
index.prefix_documents_cache,
index.prefix_postings_lists_cache,
);
(update_type, result, start.elapsed())
@@ -279,6 +283,7 @@ pub fn update_task<'a, 'b>(
index.postings_lists,
index.docs_words,
index.prefix_documents_cache,
index.prefix_postings_lists_cache,
);
(update_type, result, start.elapsed())
@@ -327,6 +332,7 @@ pub fn update_task<'a, 'b>(
index.postings_lists,
index.docs_words,
index.prefix_documents_cache,
index.prefix_postings_lists_cache,
documents,
);
@@ -346,6 +352,7 @@ pub fn update_task<'a, 'b>(
index.documents_fields_counts,
index.postings_lists,
index.docs_words,
index.prefix_postings_lists_cache,
documents,
);
@@ -389,6 +396,7 @@ pub fn update_task<'a, 'b>(
index.postings_lists,
index.docs_words,
index.prefix_documents_cache,
index.prefix_postings_lists_cache,
stop_words,
);
@@ -412,3 +420,73 @@ pub fn update_task<'a, 'b>(
Ok(status)
}
fn compute_short_prefixes(
writer: &mut heed::RwTxn<MainT>,
main_store: store::Main,
postings_lists_store: store::PostingsLists,
prefix_postings_lists_cache_store: store::PrefixPostingsListsCache,
) -> MResult<()>
{
// retrieve the words fst to compute all those prefixes
let words_fst = match main_store.words_fst(writer)? {
Some(fst) => fst,
None => return Ok(()),
};
// clear the prefixes
let pplc_store = prefix_postings_lists_cache_store;
pplc_store.clear(writer)?;
for prefix_len in 1..=2 {
// compute prefixes and store those in the PrefixPostingsListsCache store.
let mut previous_prefix: Option<([u8; 4], Vec<_>)> = None;
let mut stream = words_fst.into_stream();
while let Some(input) = stream.next() {
// We skip words that are strictly shorter than the prefix length
// we want to cache (<). We must also ignore a word of exactly the
// prefix length, because matching it must be treated as an exact
// match on the word itself and not as a prefix match (=).
if input.len() <= prefix_len { continue }
if let Some(postings_list) = postings_lists_store.postings_list(writer, input)?.map(|p| p.matches.into_owned()) {
let prefix = &input[..prefix_len];
let mut arr_prefix = [0; 4];
arr_prefix[..prefix_len].copy_from_slice(prefix);
match previous_prefix {
Some((ref mut prev_prefix, ref mut prev_pl)) if *prev_prefix != arr_prefix => {
prev_pl.sort_unstable();
prev_pl.dedup();
if let Ok(prefix) = std::str::from_utf8(&prev_prefix[..prefix_len]) {
debug!("writing the prefix of {:?} of length {}", prefix, prev_pl.len());
}
let pls = Set::new_unchecked(&prev_pl);
pplc_store.put_prefix_postings_list(writer, *prev_prefix, &pls)?;
*prev_prefix = arr_prefix;
prev_pl.clear();
prev_pl.extend_from_slice(&postings_list);
},
Some((_, ref mut prev_pl)) => prev_pl.extend_from_slice(&postings_list),
None => previous_prefix = Some((arr_prefix, postings_list.to_vec())),
}
}
}
// write the last prefix postings lists
if let Some((prev_prefix, mut prev_pl)) = previous_prefix.take() {
prev_pl.sort_unstable();
prev_pl.dedup();
let pls = Set::new_unchecked(&prev_pl);
pplc_store.put_prefix_postings_list(writer, prev_prefix, &pls)?;
}
}
Ok(())
}
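
The single-pass, flush-on-prefix-change shape of compute_short_prefixes works because an fst set streams its words in lexicographic order, so every word sharing a prefix arrives consecutively. A minimal standalone sketch of that grouping idea, assuming a pre-sorted word list and plain u32 document ids instead of the FST, DocIndex matches, and LMDB stores:

```rust
// Group sorted (word, postings) pairs by their fixed-length prefix,
// merging consecutive runs exactly like compute_short_prefixes does.
// Simplified types; illustrative only.
fn group_by_prefix(
    words: &[(&str, Vec<u32>)],
    prefix_len: usize,
) -> Vec<(String, Vec<u32>)> {
    let mut out: Vec<(String, Vec<u32>)> = Vec::new();
    for (word, postings) in words {
        // Words not strictly longer than the prefix are exact matches,
        // not prefix matches, so they are skipped (same rule as above).
        if word.len() <= prefix_len {
            continue;
        }
        let prefix = &word[..prefix_len];
        let same_run = out.last().map_or(false, |(prev, _)| prev == prefix);
        if same_run {
            // Still inside the run: merge into the pending postings list.
            out.last_mut().unwrap().1.extend_from_slice(postings);
        } else {
            // Prefix changed: start a new run (the real code flushes the
            // previous one to the PrefixPostingsListsCache store here).
            out.push((prefix.to_string(), postings.clone()));
        }
    }
    // The real code sorts and dedups each list before writing it out.
    for (_, ids) in &mut out {
        ids.sort_unstable();
        ids.dedup();
    }
    out
}

fn main() {
    let words = [
        ("hello", vec![2, 1]),
        ("help", vec![3, 2]),
        ("world", vec![4]),
    ];
    assert_eq!(
        group_by_prefix(&words, 2),
        vec![
            ("he".to_string(), vec![1, 2, 3]),
            ("wo".to_string(), vec![4]),
        ],
    );
}
```

The real function runs this pass twice, for prefixes of one and two bytes, padding each prefix into a fixed [u8; 4] key before writing it to the cache store.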

meilisearch-core/src/update/schema_update.rs

@@ -14,6 +14,7 @@ pub fn apply_schema_update(
postings_lists_store: store::PostingsLists,
docs_words_store: store::DocsWords,
prefix_documents_cache_store: store::PrefixDocumentsCache,
prefix_postings_lists_cache_store: store::PrefixPostingsListsCache,
) -> MResult<()> {
use UnsupportedOperation::{
CanOnlyIntroduceNewSchemaAttributesAtEnd, CannotRemoveSchemaAttribute,
@@ -57,6 +58,7 @@ pub fn apply_schema_update(
postings_lists_store,
docs_words_store,
prefix_documents_cache_store,
prefix_postings_lists_cache_store,
)?
}

meilisearch-core/src/update/stop_words_deletion.rs

@@ -69,6 +69,7 @@ pub fn apply_stop_words_deletion(
postings_lists_store: store::PostingsLists,
docs_words_store: store::DocsWords,
prefix_documents_cache_store: store::PrefixDocumentsCache,
prefix_postings_lists_cache_store: store::PrefixPostingsListsCache,
deletion: BTreeSet<String>,
) -> MResult<()> {
let mut stop_words_builder = SetBuilder::memory();
@@ -112,6 +113,7 @@ pub fn apply_stop_words_deletion(
postings_lists_store,
docs_words_store,
prefix_documents_cache_store,
prefix_postings_lists_cache_store,
)?;
}
}