diff --git a/meilidb-core/Cargo.toml b/meilidb-core/Cargo.toml
index 29d2e61ef..b214edde8 100644
--- a/meilidb-core/Cargo.toml
+++ b/meilidb-core/Cargo.toml
@@ -6,13 +6,12 @@ edition = "2018"
 
 [dependencies]
 byteorder = "1.3.1"
-crossbeam-channel = "0.3.9"
 deunicode = "1.0.0"
 hashbrown = "0.2.2"
 lazy_static = "1.2.0"
 log = "0.4.6"
 meilidb-tokenizer = { path = "../meilidb-tokenizer", version = "0.1.0" }
-rayon = "1.0.3"
+rayon = "1.2.0"
 sdset = "0.3.2"
 serde = { version = "1.0.88", features = ["derive"] }
 slice-group-by = "0.2.6"
diff --git a/meilidb-core/src/query_builder.rs b/meilidb-core/src/query_builder.rs
index 5847f9f1b..a013af40c 100644
--- a/meilidb-core/src/query_builder.rs
+++ b/meilidb-core/src/query_builder.rs
@@ -2,12 +2,12 @@ use std::hash::Hash;
 use std::ops::Range;
 use std::rc::Rc;
 use std::time::{Instant, Duration};
-use std::{iter, mem, cmp, cmp::Reverse};
+use std::{mem, cmp, cmp::Reverse};
 
 use fst::{Streamer, IntoStreamer};
 use hashbrown::HashMap;
 use levenshtein_automata::DFA;
-use log::{info, error};
+use log::info;
 use meilidb_tokenizer::{is_cjk, split_query_string};
 use rayon::slice::ParallelSliceMut;
 use rayon::iter::{ParallelIterator, ParallelBridge};
@@ -246,12 +246,6 @@ fn multiword_rewrite_matches(
 
     // for each attribute of each document
     for same_document_attribute in matches.linear_group_by_key(|(id, m)| (*id, m.attribute)) {
-        let elapsed = start.elapsed();
-        if timeout.map_or(false, |timeout| elapsed > timeout) {
-            info!("abort multiword rewrite after {:.2?}", elapsed);
-            break;
-        }
-
         // padding will only be applied
         // to word indices in the same attribute
         let mut padding = 0;
@@ -328,6 +322,9 @@ fn multiword_rewrite_matches(
 
             padding += biggest;
         }
+
+        // check the timeout *after* having processed at least one element
+        if timeout.map_or(false, |timeout| start.elapsed() > timeout) { break }
     }
 
     info!("main multiword rewrite took {:.2?}", start.elapsed());
@@ -350,124 +347,102 @@ where S: Store + Sync,
         let store = &self.store;
         let fetch_timeout = &self.fetch_timeout;
 
-        rayon::scope(move |s| {
-            enum Error {
-                SendError,
-                StoreError(E),
-            }
+        let mut matches = Vec::new();
+        let mut highlights = Vec::new();
 
-            let mut matches = Vec::new();
-            let mut highlights = Vec::new();
+        let timeout = fetch_timeout.map(|d| d * 75 / 100);
+        let start = Instant::now();
 
-            let recv_end_time = fetch_timeout.map(|d| Instant::now() + d * 75 / 100);
-            let start = Instant::now();
+        let results: Vec<_> = automatons
+            .into_iter()
+            .par_bridge()
+            .map_with((store, searchables), |(store, searchables), automaton| {
+                let Automaton { index, is_exact, query_len, .. } = automaton;
+                let dfa = automaton.dfa();
 
-            let (sender, receiver) = crossbeam_channel::unbounded();
-
-            s.spawn(move |_| {
-                let result = automatons
-                    .into_iter()
-                    .par_bridge()
-                    .try_for_each_with((sender, store, searchables), |data, automaton| {
-                        let (sender, store, searchables) = data;
-                        let Automaton { index, is_exact, query_len, .. } = automaton;
-                        let dfa = automaton.dfa();
-
-                        let words = store.words().map_err(Error::StoreError)?;
-                        let mut stream = words.search(&dfa).into_stream();
-
-                        let mut matches = Vec::new();
-                        let mut highlights = Vec::new();
-
-                        while let Some(input) = stream.next() {
-                            let distance = dfa.eval(input).to_u8();
-                            let is_exact = is_exact && distance == 0 && input.len() == query_len;
-
-                            let doc_indexes = store.word_indexes(input).map_err(Error::StoreError)?;
-                            let doc_indexes = match doc_indexes {
-                                Some(doc_indexes) => doc_indexes,
-                                None => continue,
-                            };
-
-                            matches.reserve(doc_indexes.len());
-                            highlights.reserve(doc_indexes.len());
-
-                            for di in doc_indexes.as_slice() {
-
-                                let attribute = searchables.map_or(Some(di.attribute), |r| r.get(di.attribute));
-                                if let Some(attribute) = attribute {
-                                    let match_ = TmpMatch {
-                                        query_index: index as u32,
-                                        distance,
-                                        attribute,
-                                        word_index: di.word_index,
-                                        is_exact,
-                                    };
-
-                                    let highlight = Highlight {
-                                        attribute: di.attribute,
-                                        char_index: di.char_index,
-                                        char_length: di.char_length,
-                                    };
-
-                                    matches.push((di.document_id, match_));
-                                    highlights.push((di.document_id, highlight));
-                                }
-                            }
-                        }
-
-                        sender.send((matches, highlights)).map_err(|_| Error::SendError)
-                    });
-
-                if let Err(Error::StoreError(e)) = result {
-                    error!("{}", e);
-                }
-            });
-
-            let iter = receiver.recv().into_iter().chain(iter::from_fn(|| {
-                let recv_end_time = match recv_end_time {
-                    Some(time) => time,
-                    None => return receiver.recv().ok(),
+                let words = match store.words() {
+                    Ok(words) => words,
+                    Err(err) => return Some(Err(err)),
                 };
-                match recv_end_time.checked_duration_since(Instant::now()) {
-                    Some(timeout) => receiver.recv_timeout(timeout).ok(),
-                    None => None,
+                let mut stream = words.search(&dfa).into_stream();
+                let mut matches = Vec::new();
+                let mut highlights = Vec::new();
+
+                while let Some(input) = stream.next() {
+                    let distance = dfa.eval(input).to_u8();
+                    let is_exact = is_exact && distance == 0 && input.len() == query_len;
+
+                    let doc_indexes = match store.word_indexes(input) {
+                        Ok(Some(doc_indexes)) => doc_indexes,
+                        Ok(None) => continue,
+                        Err(err) => return Some(Err(err)),
+                    };
+
+                    matches.reserve(doc_indexes.len());
+                    highlights.reserve(doc_indexes.len());
+
+                    for di in doc_indexes.as_slice() {
+                        let attribute = searchables.map_or(Some(di.attribute), |r| r.get(di.attribute));
+                        if let Some(attribute) = attribute {
+                            let match_ = TmpMatch {
+                                query_index: index as u32,
+                                distance,
+                                attribute,
+                                word_index: di.word_index,
+                                is_exact,
+                            };
+
+                            let highlight = Highlight {
+                                attribute: di.attribute,
+                                char_index: di.char_index,
+                                char_length: di.char_length,
+                            };
+
+                            matches.push((di.document_id, match_));
+                            highlights.push((di.document_id, highlight));
+                        }
+                    }
+
+                    // check the timeout *after* having processed at least one element
+                    if timeout.map_or(false, |timeout| start.elapsed() > timeout) { break }
                 }
-            }));
 
-            for (mut rcv_matches, mut rcv_highlights) in iter {
-                matches.append(&mut rcv_matches);
-                highlights.append(&mut rcv_highlights);
-            }
+                Some(Ok((matches, highlights)))
+            })
+            .while_some()
+            .collect();
 
-            drop(receiver);
+        for result in results {
+            let (mut rcv_matches, mut rcv_highlights) = result?;
+            matches.append(&mut rcv_matches);
+            highlights.append(&mut rcv_highlights);
+        }
 
-            info!("main query all took {:.2?}", start.elapsed());
-            info!("{} total matches to rewrite", matches.len());
+        info!("main query all took {:.2?}", start.elapsed());
+        info!("{} total matches to rewrite", matches.len());
 
-            let start = Instant::now();
-            let timeout = fetch_timeout.map(|d| d * 25 / 100);
-            let matches = multiword_rewrite_matches(matches, &query_enhancer, timeout);
-            info!("multiword rewrite took {:.2?}", start.elapsed());
+        let start = Instant::now();
+        let timeout = fetch_timeout.map(|d| d * 25 / 100);
+        let matches = multiword_rewrite_matches(matches, &query_enhancer, timeout);
+        info!("multiword rewrite took {:.2?}", start.elapsed());
 
-            let start = Instant::now();
-            let highlights = {
-                highlights.par_sort_unstable_by_key(|(id, _)| *id);
-                SetBuf::new_unchecked(highlights)
-            };
-            info!("sorting highlights took {:.2?}", start.elapsed());
+        let start = Instant::now();
+        let highlights = {
+            highlights.par_sort_unstable_by_key(|(id, _)| *id);
+            SetBuf::new_unchecked(highlights)
+        };
+        info!("sorting highlights took {:.2?}", start.elapsed());
 
-            info!("{} total matches to classify", matches.len());
+        info!("{} total matches to classify", matches.len());
 
-            let start = Instant::now();
-            let raw_documents = raw_documents_from(matches, highlights);
-            info!("making raw documents took {:.2?}", start.elapsed());
+        let start = Instant::now();
+        let raw_documents = raw_documents_from(matches, highlights);
+        info!("making raw documents took {:.2?}", start.elapsed());
 
-            info!("{} total documents to classify", raw_documents.len());
+        info!("{} total documents to classify", raw_documents.len());
 
-            Ok(raw_documents)
-        })
+        Ok(raw_documents)
     }
 }
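
Note on the timeout change: both the multiword rewrite loop and the new fetch loop now consult the deadline only after a full iteration, so even a zero or already-expired timeout lets at least one element through instead of aborting with an empty result. A minimal standalone sketch of that pattern (drain_with_deadline and its parameters are hypothetical names, not meilidb code):

    use std::time::{Duration, Instant};

    fn drain_with_deadline<T>(items: Vec<T>, timeout: Option<Duration>, mut process: impl FnMut(T)) {
        let start = Instant::now();
        for item in items {
            process(item);
            // check the timeout *after* having processed at least one element,
            // so a tight deadline can never starve the loop completely
            if timeout.map_or(false, |timeout| start.elapsed() > timeout) { break }
        }
    }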
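Note on the crossbeam-channel removal: the sender/receiver pair, the spawned scope thread, and the recv_timeout() draining loop are replaced by rayon alone. Each parallel task now returns an Option<Result<_, _>>: Some(Err(_)) carries a store error out to the sequential loop that applies the ? operator, and None, produced once the deadline has passed, makes while_some() stop the whole parallel stream. A minimal sketch of that shape, built only on the rayon combinators the patch itself uses (par_bridge, map_with, while_some); lookup() and fetch_all() are hypothetical stand-ins for the store calls, not meilidb code:

    use std::time::{Duration, Instant};
    use rayon::iter::{ParallelBridge, ParallelIterator};

    // stand-in for the fallible store lookups (store.words(), store.word_indexes())
    fn lookup(base: u32, n: u32) -> Result<u32, String> {
        Ok(base + n)
    }

    fn fetch_all(inputs: Vec<u32>, timeout: Option<Duration>) -> Result<Vec<u32>, String> {
        let start = Instant::now();
        let base: u32 = 10; // stands in for the shared (store, searchables) context

        let results: Vec<_> = inputs
            .into_iter()
            .par_bridge() // feed the sequential iterator to the rayon thread pool
            .map_with(base, |base, n| {
                let value = match lookup(*base, n) {
                    Ok(value) => value,
                    Err(err) => return Some(Err(err)), // surface the error to the caller
                };
                // check the timeout *after* having produced at least one result
                if timeout.map_or(false, |timeout| start.elapsed() > timeout) {
                    return None; // a None makes while_some() cut the stream short
                }
                Some(Ok(value))
            })
            .while_some() // truncates at the first None, replacing recv_timeout()
            .collect();

        // propagate the first store error, like the sequential loop in the patch
        results.into_iter().collect()
    }

One behavioral consequence worth noting: results gathered before the deadline are still returned, so a timeout degrades to partial results rather than an error, matching what the old receiver loop did.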