diff --git a/meilidb-core/Cargo.toml b/meilidb-core/Cargo.toml index 25fb57119..29d2e61ef 100644 --- a/meilidb-core/Cargo.toml +++ b/meilidb-core/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" [dependencies] byteorder = "1.3.1" +crossbeam-channel = "0.3.9" deunicode = "1.0.0" hashbrown = "0.2.2" lazy_static = "1.2.0" diff --git a/meilidb-core/src/lib.rs b/meilidb-core/src/lib.rs index 6f6e46359..0a7844292 100644 --- a/meilidb-core/src/lib.rs +++ b/meilidb-core/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(checked_duration_since)] + #[cfg(test)] #[macro_use] extern crate assert_matches; diff --git a/meilidb-core/src/query_builder.rs b/meilidb-core/src/query_builder.rs index 76e47e1ab..97a750d18 100644 --- a/meilidb-core/src/query_builder.rs +++ b/meilidb-core/src/query_builder.rs @@ -2,14 +2,15 @@ use std::hash::Hash; use std::ops::Range; use std::rc::Rc; use std::time::{Instant, Duration}; -use std::{mem, cmp, cmp::Reverse}; +use std::{iter, mem, cmp, cmp::Reverse}; use fst::{Streamer, IntoStreamer}; use hashbrown::HashMap; use levenshtein_automata::DFA; -use log::info; +use log::{info, error}; use meilidb_tokenizer::{is_cjk, split_query_string}; use rayon::slice::ParallelSliceMut; +use rayon::iter::{ParallelIterator, ParallelBridge}; use sdset::SetBuf; use slice_group_by::{GroupBy, GroupByMut}; @@ -315,66 +316,101 @@ fn multiword_rewrite_matches( } impl<'c, S, FI> QueryBuilder<'c, S, FI> -where S: Store, +where S: 'static + Store + Send + Clone, + S::Error: Send, { fn query_all(&self, query: &str) -> Result, S::Error> { let (automatons, query_enhancer) = generate_automatons(query, &self.store)?; - let words = self.store.words()?; - let searchables = self.searchable_attrs.as_ref(); + let searchables = self.searchable_attrs.clone(); + let store = self.store.clone(); let mut matches = Vec::new(); let mut highlights = Vec::new(); - let fetching_end_time = Instant::now() + Duration::from_millis(30); - let mut query_db = Duration::default(); + let recv_end_time = Instant::now() + Duration::from_millis(30); let start = Instant::now(); - 'automatons: for automaton in automatons { - let Automaton { index, is_exact, query_len, dfa, .. } = automaton; - let mut stream = words.search(&dfa).into_stream(); + let (sender, receiver) = crossbeam_channel::bounded(10); - while let Some(input) = stream.next() { - let distance = dfa.eval(input).to_u8(); - let is_exact = is_exact && distance == 0 && input.len() == query_len; - - let start = Instant::now(); - let doc_indexes = self.store.word_indexes(input)?; - let doc_indexes = match doc_indexes { - Some(doc_indexes) => doc_indexes, - None => continue, - }; - query_db += start.elapsed(); - - for di in doc_indexes.as_slice() { - - if Instant::now() > fetching_end_time { - break 'automatons - } - - let attribute = searchables.map_or(Some(di.attribute), |r| r.get(di.attribute)); - if let Some(attribute) = attribute { - let match_ = TmpMatch { - query_index: index as u32, - distance, - attribute, - word_index: di.word_index, - is_exact, - }; - - let highlight = Highlight { - attribute: di.attribute, - char_index: di.char_index, - char_length: di.char_length, - }; - - matches.push((di.document_id, match_)); - highlights.push((di.document_id, highlight)); - } - } + rayon::spawn(move || { + enum Error { + SendError, + StoreError(E), } + + let result = automatons + .into_iter() + .par_bridge() + .try_for_each_with((sender, store, searchables.as_ref()), |data, automaton| { + let (sender, store, searchables) = data; + let Automaton { index, is_exact, query_len, dfa, .. } = automaton; + + let words = store.words().map_err(Error::StoreError)?; + let mut stream = words.search(&dfa).into_stream(); + + let mut matches = Vec::new(); + let mut highlights = Vec::new(); + + while let Some(input) = stream.next() { + let distance = dfa.eval(input).to_u8(); + let is_exact = is_exact && distance == 0 && input.len() == query_len; + + let doc_indexes = store.word_indexes(input).map_err(Error::StoreError)?; + let doc_indexes = match doc_indexes { + Some(doc_indexes) => doc_indexes, + None => continue, + }; + + matches.reserve(doc_indexes.len()); + highlights.reserve(doc_indexes.len()); + + for di in doc_indexes.as_slice() { + + let attribute = searchables.map_or(Some(di.attribute), |r| r.get(di.attribute)); + if let Some(attribute) = attribute { + let match_ = TmpMatch { + query_index: index as u32, + distance, + attribute, + word_index: di.word_index, + is_exact, + }; + + let highlight = Highlight { + attribute: di.attribute, + char_index: di.char_index, + char_length: di.char_length, + }; + + matches.push((di.document_id, match_)); + highlights.push((di.document_id, highlight)); + } + } + } + + sender.send((matches, highlights)).map_err(|_| Error::SendError) + }); + + if let Err(Error::StoreError(e)) = result { + error!("{}", e); + } + }); + + let iter = receiver.recv().into_iter().chain(iter::from_fn(|| { + match recv_end_time.checked_duration_since(Instant::now()) { + Some(timeout) => receiver.recv_timeout(timeout).ok(), + None => None, + } + })); + + for (mut rcv_matches, mut rcv_highlights) in iter { + matches.append(&mut rcv_matches); + highlights.append(&mut rcv_highlights); } - info!("main query all took {:.2?} (get indexes {:.2?})", start.elapsed(), query_db); + drop(receiver); + + info!("main query all took {:.2?}", start.elapsed()); info!("{} total matches to rewrite", matches.len()); let start = Instant::now(); @@ -401,7 +437,8 @@ where S: Store, } impl<'c, S, FI> QueryBuilder<'c, S, FI> -where S: Store, +where S: 'static + Store + Send + Clone, + S::Error: Send, FI: Fn(DocumentId) -> bool, { pub fn query(self, query: &str, range: Range) -> Result, S::Error> { @@ -478,7 +515,8 @@ impl<'c, I, FI, FD> DistinctQueryBuilder<'c, I, FI, FD> } impl<'c, S, FI, FD, K> DistinctQueryBuilder<'c, S, FI, FD> -where S: Store, +where S: 'static + Store + Send + Clone, + S::Error: Send, FI: Fn(DocumentId) -> bool, FD: Fn(DocumentId) -> Option, K: Hash + Eq, diff --git a/meilidb-core/src/reordered_attrs.rs b/meilidb-core/src/reordered_attrs.rs index ad7b2c324..ed11045ab 100644 --- a/meilidb-core/src/reordered_attrs.rs +++ b/meilidb-core/src/reordered_attrs.rs @@ -1,4 +1,4 @@ -#[derive(Default)] +#[derive(Default, Clone)] pub struct ReorderedAttrs { count: usize, reorders: Vec>,