diff --git a/meilidb-core/src/distinct_map.rs b/meilidb-core/src/distinct_map.rs new file mode 100644 index 000000000..c53ad0ea4 --- /dev/null +++ b/meilidb-core/src/distinct_map.rs @@ -0,0 +1,103 @@ +use std::hash::Hash; +use hashbrown::HashMap; + +pub struct DistinctMap { + inner: HashMap, + limit: usize, + len: usize, +} + +impl DistinctMap { + pub fn new(limit: usize) -> Self { + DistinctMap { + inner: HashMap::new(), + limit, + len: 0, + } + } + + pub fn len(&self) -> usize { + self.len + } +} + +pub struct BufferedDistinctMap<'a, K> { + internal: &'a mut DistinctMap, + inner: HashMap, + len: usize, +} + +impl<'a, K: Hash + Eq> BufferedDistinctMap<'a, K> { + pub fn new(internal: &'a mut DistinctMap) -> BufferedDistinctMap<'a, K> { + BufferedDistinctMap { + internal, + inner: HashMap::new(), + len: 0, + } + } + + pub fn register(&mut self, key: K) -> bool { + let internal_seen = self.internal.inner.get(&key).unwrap_or(&0); + let inner_seen = self.inner.entry(key).or_insert(0); + let seen = *internal_seen + *inner_seen; + + if seen < self.internal.limit { + *inner_seen += 1; + self.len += 1; + true + } else { + false + } + } + + pub fn register_without_key(&mut self) -> bool { + self.len += 1; + true + } + + pub fn transfert_to_internal(&mut self) { + for (k, v) in self.inner.drain() { + let value = self.internal.inner.entry(k).or_insert(0); + *value += v; + } + + self.internal.len += self.len; + self.len = 0; + } + + pub fn len(&self) -> usize { + self.internal.len() + self.len + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn easy_distinct_map() { + let mut map = DistinctMap::new(2); + let mut buffered = BufferedDistinctMap::new(&mut map); + + for x in &[1, 1, 1, 2, 3, 4, 5, 6, 6, 6, 6, 6] { + buffered.register(x); + } + buffered.transfert_to_internal(); + assert_eq!(map.len(), 8); + + let mut map = DistinctMap::new(2); + let mut buffered = BufferedDistinctMap::new(&mut map); + assert_eq!(buffered.register(1), true); + assert_eq!(buffered.register(1), true); + assert_eq!(buffered.register(1), false); + assert_eq!(buffered.register(1), false); + + assert_eq!(buffered.register(2), true); + assert_eq!(buffered.register(3), true); + assert_eq!(buffered.register(2), true); + assert_eq!(buffered.register(2), false); + + buffered.transfert_to_internal(); + assert_eq!(map.len(), 5); + } +} diff --git a/meilidb-core/src/lib.rs b/meilidb-core/src/lib.rs index 9802ff2d5..83b0d9424 100644 --- a/meilidb-core/src/lib.rs +++ b/meilidb-core/src/lib.rs @@ -3,6 +3,7 @@ mod automaton; mod database; +mod distinct_map; mod error; mod number; mod query_builder; diff --git a/meilidb-core/src/query_builder.rs b/meilidb-core/src/query_builder.rs index efdb07ea1..7bbcf94fb 100644 --- a/meilidb-core/src/query_builder.rs +++ b/meilidb-core/src/query_builder.rs @@ -1,19 +1,24 @@ -use std::time::{Instant, Duration}; -use std::ops::Range; +use hashbrown::HashMap; +use std::hash::Hash; use std::mem; +use std::ops::Range; +use std::rc::Rc; +use std::time::{Instant, Duration}; use fst::{IntoStreamer, Streamer}; use sdset::SetBuf; use slice_group_by::{GroupBy, GroupByMut}; use crate::automaton::{Automaton, AutomatonProducer, QueryEnhancer}; +use crate::distinct_map::{DistinctMap, BufferedDistinctMap}; use crate::raw_document::{RawDocument, raw_documents_from}; use crate::{Document, DocumentId, Highlight, TmpMatch, criterion::Criteria}; use crate::{store, MResult, reordered_attrs::ReorderedAttrs}; -pub struct QueryBuilder<'a> { - criteria: Criteria<'a>, - searchables_attrs: Option, +pub struct QueryBuilder<'c, FI = fn(DocumentId) -> bool> { + criteria: Criteria<'c>, + searchable_attrs: Option, + filter: Option, timeout: Duration, main_store: store::Main, postings_lists_store: store::PostingsLists, @@ -185,22 +190,68 @@ fn fetch_raw_documents( Ok(raw_documents_from(matches, highlights)) } -impl<'a> QueryBuilder<'a> { +impl<'c> QueryBuilder<'c> { pub fn new( main: store::Main, postings_lists: store::PostingsLists, synonyms: store::Synonyms, - ) -> QueryBuilder<'a> { + ) -> QueryBuilder<'c> + { + QueryBuilder::with_criteria(main, postings_lists, synonyms, Criteria::default()) + } + + pub fn with_criteria( + main: store::Main, + postings_lists: store::PostingsLists, + synonyms: store::Synonyms, + criteria: Criteria<'c>, + ) -> QueryBuilder<'c> + { QueryBuilder { - criteria: Criteria::default(), - searchables_attrs: None, - timeout: Duration::from_secs(1), + criteria, + searchable_attrs: None, + filter: None, + timeout: Duration::from_millis(30), main_store: main, postings_lists_store: postings_lists, synonyms_store: synonyms, } } +} +impl<'c, FI> QueryBuilder<'c, FI> { + pub fn with_filter(self, function: F) -> QueryBuilder<'c, F> + where F: Fn(DocumentId) -> bool, + { + QueryBuilder { + criteria: self.criteria, + searchable_attrs: self.searchable_attrs, + filter: Some(function), + timeout: self.timeout, + main_store: self.main_store, + postings_lists_store: self.postings_lists_store, + synonyms_store: self.synonyms_store, + } + } + + pub fn with_fetch_timeout(self, timeout: Duration) -> QueryBuilder<'c, FI> { + QueryBuilder { timeout, ..self } + } + + pub fn with_distinct(self, function: F, size: usize) -> DistinctQueryBuilder<'c, FI, F> + where F: Fn(DocumentId) -> Option, + K: Hash + Eq, + { + DistinctQueryBuilder { inner: self, function, size } + } + + pub fn add_searchable_attribute(&mut self, attribute: u16) { + let reorders = self.searchable_attrs.get_or_insert_with(ReorderedAttrs::new); + reorders.insert_attribute(attribute); + } +} + +impl QueryBuilder<'_, FI> where FI: Fn(DocumentId) -> bool { pub fn query( self, reader: &impl rkv::Readable, @@ -208,15 +259,22 @@ impl<'a> QueryBuilder<'a> { range: Range, ) -> MResult> { + // We delegate the filter work to the distinct query builder, + // specifying a distinct rule that has no effect. + if self.filter.is_some() { + let builder = self.with_distinct(|_| None as Option<()>, 1); + return builder.query(reader, query, range); + } + let start_processing = Instant::now(); - let mut raw_documents_processed = Vec::new(); + let mut raw_documents_processed = Vec::with_capacity(range.len()); let (automaton_producer, query_enhancer) = AutomatonProducer::new( - reader, - query, - self.main_store, - self.synonyms_store, - )?; + reader, + query, + self.main_store, + self.synonyms_store, + )?; let mut automaton_producer = automaton_producer.into_iter(); let mut automatons = Vec::new(); @@ -231,11 +289,16 @@ impl<'a> QueryBuilder<'a> { reader, &automatons, &query_enhancer, - self.searchables_attrs.as_ref(), + self.searchable_attrs.as_ref(), &self.main_store, &self.postings_lists_store, )?; + // stop processing when time is running out + if !raw_documents_processed.is_empty() && start_processing.elapsed() > self.timeout { + break + } + let mut groups = vec![raw_documents.as_mut_slice()]; 'criteria: for criterion in self.criteria.as_ref() { @@ -270,7 +333,7 @@ impl<'a> QueryBuilder<'a> { raw_documents_processed.clear(); raw_documents_processed.extend(iter); - // stop processing after there is no time + // stop processing when time is running out if start_processing.elapsed() > self.timeout { break } } @@ -285,6 +348,189 @@ impl<'a> QueryBuilder<'a> { } } +pub struct DistinctQueryBuilder<'c, FI, FD> { + inner: QueryBuilder<'c, FI>, + function: FD, + size: usize, +} + +impl<'c, FI, FD> DistinctQueryBuilder<'c, FI, FD> { + pub fn with_filter(self, function: F) -> DistinctQueryBuilder<'c, F, FD> + where F: Fn(DocumentId) -> bool, + { + DistinctQueryBuilder { + inner: self.inner.with_filter(function), + function: self.function, + size: self.size, + } + } + + pub fn with_fetch_timeout(self, timeout: Duration) -> DistinctQueryBuilder<'c, FI, FD> { + DistinctQueryBuilder { + inner: self.inner.with_fetch_timeout(timeout), + function: self.function, + size: self.size, + } + } + + pub fn add_searchable_attribute(&mut self, attribute: u16) { + self.inner.add_searchable_attribute(attribute); + } +} + +impl<'c, FI, FD, K> DistinctQueryBuilder<'c, FI, FD> +where FI: Fn(DocumentId) -> bool, + FD: Fn(DocumentId) -> Option, + K: Hash + Eq, +{ + pub fn query( + self, + reader: &impl rkv::Readable, + query: &str, + range: Range, + ) -> MResult> + { + let start_processing = Instant::now(); + let mut raw_documents_processed = Vec::new(); + + let (automaton_producer, query_enhancer) = AutomatonProducer::new( + reader, + query, + self.inner.main_store, + self.inner.synonyms_store, + )?; + + let mut automaton_producer = automaton_producer.into_iter(); + let mut automatons = Vec::new(); + + // aggregate automatons groups by groups after time + while let Some(auts) = automaton_producer.next() { + automatons.extend(auts); + + // we must retrieve the documents associated + // with the current automatons + let mut raw_documents = fetch_raw_documents( + reader, + &automatons, + &query_enhancer, + self.inner.searchable_attrs.as_ref(), + &self.inner.main_store, + &self.inner.postings_lists_store, + )?; + + // stop processing when time is running out + if !raw_documents_processed.is_empty() && start_processing.elapsed() > self.inner.timeout { + break + } + + let mut groups = vec![raw_documents.as_mut_slice()]; + let mut key_cache = HashMap::new(); + + let mut filter_map = HashMap::new(); + // these two variables informs on the current distinct map and + // on the raw offset of the start of the group where the + // range.start bound is located according to the distinct function + let mut distinct_map = DistinctMap::new(self.size); + let mut distinct_raw_offset = 0; + + 'criteria: for criterion in self.inner.criteria.as_ref() { + let tmp_groups = mem::replace(&mut groups, Vec::new()); + let mut buf_distinct = BufferedDistinctMap::new(&mut distinct_map); + let mut documents_seen = 0; + + for group in tmp_groups { + // if this group does not overlap with the requested range, + // push it without sorting and splitting it + if documents_seen + group.len() < distinct_raw_offset { + documents_seen += group.len(); + groups.push(group); + continue; + } + + group.sort_unstable_by(|a, b| criterion.evaluate(a, b)); + + for group in group.binary_group_by_mut(|a, b| criterion.eq(a, b)) { + // we must compute the real distinguished len of this sub-group + for document in group.iter() { + let filter_accepted = match &self.inner.filter { + Some(filter) => { + let entry = filter_map.entry(document.id); + *entry.or_insert_with(|| (filter)(document.id)) + }, + None => true, + }; + + if filter_accepted { + let entry = key_cache.entry(document.id); + let key = entry.or_insert_with(|| (self.function)(document.id).map(Rc::new)); + + match key.clone() { + Some(key) => buf_distinct.register(key), + None => buf_distinct.register_without_key(), + }; + } + + // the requested range end is reached: stop computing distinct + if buf_distinct.len() >= range.end { break } + } + + documents_seen += group.len(); + groups.push(group); + + // if this sub-group does not overlap with the requested range + // we must update the distinct map and its start index + if buf_distinct.len() < range.start { + buf_distinct.transfert_to_internal(); + distinct_raw_offset = documents_seen; + } + + // we have sort enough documents if the last document sorted is after + // the end of the requested range, we can continue to the next criterion + if buf_distinct.len() >= range.end { continue 'criteria } + } + } + } + + // once we classified the documents related to the current + // automatons we save that as the next valid result + let mut seen = BufferedDistinctMap::new(&mut distinct_map); + raw_documents_processed.clear(); + + for document in raw_documents.into_iter().skip(distinct_raw_offset) { + let filter_accepted = match &self.inner.filter { + Some(_) => filter_map.remove(&document.id).unwrap(), + None => true, + }; + + if filter_accepted { + let key = key_cache.remove(&document.id).unwrap(); + let distinct_accepted = match key { + Some(key) => seen.register(key), + None => seen.register_without_key(), + }; + + if distinct_accepted && seen.len() > range.start { + raw_documents_processed.push(document); + if raw_documents_processed.len() == range.len() { break } + } + } + } + + // stop processing when time is running out + if start_processing.elapsed() > self.inner.timeout { break } + } + + // make real documents now that we know + // those must be returned + let documents = raw_documents_processed + .into_iter() + .map(|d| Document::from_raw(d)) + .collect(); + + Ok(documents) + } +} + #[cfg(test)] mod tests { use super::*;