mirror of https://github.com/meilisearch/MeiliSearch
Introduce a first version of a number of candidates reducer
It works by ignoring the postings lists associated with documents that the previous words did not return
This commit is contained in:
parent 106b886873
commit 99d35fb940
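
In rough terms, the reducer keeps a running set of candidate document ids: the first query word seeds the set, and every following word only keeps the postings groups whose document id is already in it. A minimal standalone sketch of that idea (plain `u32` ids and `Vec`s stand in for the real MeiliSearch types shown in the diff below):

```rust
use std::collections::HashSet;

// Sketch only: each entry of `postings_per_word` holds the document ids matched
// by one query word, in query order.
fn reduce_candidates(postings_per_word: &[Vec<u32>]) -> Vec<(usize, u32)> {
    let mut documents_ids = HashSet::new();
    let mut matches = Vec::new();

    for (query_index, document_ids) in postings_per_word.iter().enumerate() {
        for &document_id in document_ids {
            // ignore postings of documents that the previous words did not return
            if query_index != 0 && !documents_ids.contains(&document_id) { continue }
            documents_ids.insert(document_id);

            matches.push((query_index, document_id));
        }
    }

    matches
}
```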
@@ -1,11 +1,12 @@
-use std::ops::Deref;
-use std::{cmp, fmt};
 use std::borrow::Cow;
+use std::collections::HashSet;
 use std::mem;
+use std::ops::Deref;
 use std::ops::Range;
 use std::rc::Rc;
-use std::time::{Duration, Instant};
 use std::sync::atomic::{AtomicUsize, Ordering};
+use std::time::{Duration, Instant};
+use std::{cmp, fmt};
 
 use compact_arena::{SmallArena, Idx32, mk_arena};
 use fst::{IntoStreamer, Streamer};
@@ -496,6 +497,7 @@ fn fetch_matches<'txn, 'tag>(
     debug!("words fst len {} and size {}", words.len(), words.as_fst().as_bytes().len());
 
     let mut total_postings_lists = Vec::new();
+    let mut documents_ids = HashSet::<DocumentId>::new();
 
     let mut dfa_time = Duration::default();
     let mut postings_lists_fetching_time = Duration::default();
@@ -509,6 +511,8 @@ fn fetch_matches<'txn, 'tag>(
 
         let mut stream_next_time = Duration::default();
         let mut number_of_words = 0;
+        let mut postings_lists_original_length = 0;
+        let mut postings_lists_length = 0;
 
         let byte = query.as_bytes()[0];
         let mut stream = if byte == u8::max_value() {
@@ -535,14 +539,22 @@ fn fetch_matches<'txn, 'tag>(
 
             let before_postings_lists_fetching = Instant::now();
             if let Some(postings_list) = postings_lists_store.postings_list(reader, input)? {
+                postings_lists_original_length += postings_list.len();
+
                 let input = Rc::from(input);
                 let postings_list = Rc::new(postings_list);
                 let postings_list_view = PostingsListView::original(input, postings_list);
 
                 let mut offset = 0;
                 for group in postings_list_view.linear_group_by_key(|di| di.document_id) {
-                    let posting_list_index = arena.add(postings_list_view.range(offset, group.len()));
                     let document_id = group[0].document_id;
+
+                    if query_index != 0 && !documents_ids.contains(&document_id) { continue }
+                    documents_ids.insert(document_id);
+
+                    postings_lists_length += group.len();
+
+                    let posting_list_index = arena.add(postings_list_view.range(offset, group.len()));
                     let bare_match = BareMatch {
                         document_id,
                         query_index: query_index as u16,
@@ -559,6 +571,8 @@ fn fetch_matches<'txn, 'tag>(
         }
 
         debug!("{:?} gives {} words", query, number_of_words);
+        debug!("{:?} gives postings lists of length {} (original was {})",
+            query, postings_lists_length, postings_lists_original_length);
         debug!("stream next took {:.02?}", stream_next_time);
     }
 
@@ -2,7 +2,7 @@ use std::collections::HashMap;
 use std::borrow::Cow;
 
 use fst::{set::OpBuilder, SetBuilder, IntoStreamer, Streamer};
-use sdset::{duo::Union, SetOperation, SetBuf};
+use sdset::{duo::Union, SetOperation, Set, SetBuf};
 use serde::{Deserialize, Serialize};
 use log::debug;
 
@@ -196,34 +196,64 @@ pub fn apply_documents_addition<'a, 'b>(
     let pplc_store = prefix_postings_lists_cache_store;
     pplc_store.clear(writer)?;
 
-    const MAX_PREFIX_LENGTH: usize = 1;
+    let mut previous_prefix: Option<([u8; 4], Vec<_>)> = None;
 
     // compute prefixes and store those in the PrefixPostingsListsCache.
     let mut stream = words_fst.into_stream();
     while let Some(input) = stream.next() {
-        for i in 1..=MAX_PREFIX_LENGTH {
-            let prefix = &input[..i];
-            if let Some(postings_list) = postings_lists_store.postings_list(writer, prefix)? {
-                if let (Ok(input), Ok(prefix)) = (std::str::from_utf8(input), std::str::from_utf8(prefix)) {
-                    debug!("{:?} postings list (prefix {:?}) length {}", input, prefix, postings_list.len());
-                }
-
-                // compute the new prefix postings lists
-                let mut p = [0; 4];
-                let len = std::cmp::min(4, prefix.len());
-                p[..len].copy_from_slice(&prefix[..len]);
-
-                let previous = match pplc_store.prefix_postings_list(writer, p)? {
-                    Some(previous) => previous,
-                    None => Cow::Owned(SetBuf::default()),
-                };
-
-                let new_postings_list = Union::new(&postings_list, &previous).into_set_buf();
-                pplc_store.put_prefix_postings_list(writer, p, &new_postings_list)?;
-
-                debug!("new length {}", new_postings_list.len());
-            }
+        if let Some(postings_list) = postings_lists_store.postings_list(writer, input)?.map(Cow::into_owned) {
+            let prefix = &input[..1];
+
+            let mut arr = [0; 4];
+            let len = std::cmp::min(4, prefix.len());
+            arr[..len].copy_from_slice(prefix);
+            let arr_prefix = arr;
+
+            // if let (Ok(input), Ok(prefix)) = (std::str::from_utf8(input), std::str::from_utf8(prefix)) {
+            //     debug!("{:?} postings list (prefix {:?}) length {}", input, prefix, postings_list.len());
+            // }
+
+            match previous_prefix {
+                Some((ref mut prev_prefix, ref mut prev_postings_list)) if *prev_prefix != arr_prefix => {
+                    prev_postings_list.sort_unstable();
+                    prev_postings_list.dedup();
+
+                    if let Ok(prefix) = std::str::from_utf8(&prev_prefix[..1]) {
+                        debug!("writing the prefix of {:?} of length {}",
+                            prefix, prev_postings_list.len());
+                    }
+
+                    let pls = Set::new_unchecked(&prev_postings_list);
+                    pplc_store.put_prefix_postings_list(writer, *prev_prefix, &pls)?;
+
+                    *prev_prefix = arr_prefix;
+                    prev_postings_list.clear();
+                    prev_postings_list.extend_from_slice(&postings_list);
+                },
+                Some((_, ref mut prev_postings_list)) => {
+                    prev_postings_list.extend_from_slice(&postings_list);
+                },
+                None => {
+                    let mut arr = [0; 4];
+                    let len = std::cmp::min(4, prefix.len());
+                    arr[..len].copy_from_slice(&prefix[..len]);
+
+                    let prev_prefix = arr;
+                    previous_prefix = Some((prev_prefix, postings_list.to_vec()));
+                },
+            }
+
+            // debug!("new length {}", new_postings_list.len());
         }
     }
 
+    // write the last prefix postings lists
+    if let Some((prev_prefix, mut prev_postings_list)) = previous_prefix.take() {
+        prev_postings_list.sort_unstable();
+        prev_postings_list.dedup();
+
+        let pls = Set::new_unchecked(&prev_postings_list);
+        pplc_store.put_prefix_postings_list(writer, prev_prefix, &pls)?;
+    }
+
     Ok(())
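
The prefix cache update above relies on the words fst being streamed in lexicographic order: every word sharing a first byte is contiguous, so the postings can be buffered per prefix and written once (with a final flush after the loop) instead of being read and merged on every word. A standalone sketch of that accumulate-then-flush pattern, with plain `Vec<u32>` ids standing in for the sdset/heed types:

```rust
// Sketch only: `words` must be sorted by word, as an fst stream would be.
fn group_postings_by_prefix(words: &[(&str, Vec<u32>)]) -> Vec<(u8, Vec<u32>)> {
    let mut previous_prefix: Option<(u8, Vec<u32>)> = None;
    let mut output = Vec::new();

    for (word, postings) in words {
        let prefix = word.as_bytes()[0];

        match previous_prefix {
            // a new prefix starts: sort, dedup and emit the buffered postings
            Some((ref mut prev, ref mut buffered)) if *prev != prefix => {
                buffered.sort_unstable();
                buffered.dedup();
                output.push((*prev, buffered.clone()));

                *prev = prefix;
                buffered.clear();
                buffered.extend_from_slice(postings);
            },
            // same prefix as before: keep accumulating
            Some((_, ref mut buffered)) => buffered.extend_from_slice(postings),
            // very first word: start the first buffer
            None => previous_prefix = Some((prefix, postings.clone())),
        }
    }

    // emit the last buffered prefix
    if let Some((prefix, mut buffered)) = previous_prefix.take() {
        buffered.sort_unstable();
        buffered.dedup();
        output.push((prefix, buffered));
    }

    output
}
```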
@@ -23,15 +23,12 @@ use std::collections::{BTreeMap, BTreeSet, HashMap};
 use std::time::Instant;
 
 use chrono::{DateTime, Utc};
-use fst::{IntoStreamer, Streamer};
 use heed::Result as ZResult;
 use log::debug;
 use serde::{Deserialize, Serialize};
 
 use crate::{store, DocumentId, MResult};
 use crate::database::{MainT, UpdateT};
-use crate::bucket_sort::bucket_sort;
-use crate::criterion::Criteria;
 use meilisearch_schema::Schema;
 
 #[derive(Debug, Clone, Serialize, Deserialize)]