feat: Introduce a new thread to avoid waiting on doc indexes fetchs

This commit is contained in:
Clément Renault 2019-08-16 16:35:19 +02:00
parent d9c9fafd78
commit b7b60b5fe5
No known key found for this signature in database
GPG Key ID: 0151CDAB43460DAE
4 changed files with 93 additions and 52 deletions

View File

@ -6,6 +6,7 @@ edition = "2018"
[dependencies] [dependencies]
byteorder = "1.3.1" byteorder = "1.3.1"
crossbeam-channel = "0.3.9"
deunicode = "1.0.0" deunicode = "1.0.0"
hashbrown = "0.2.2" hashbrown = "0.2.2"
lazy_static = "1.2.0" lazy_static = "1.2.0"

View File

@ -1,3 +1,5 @@
#![feature(checked_duration_since)]
#[cfg(test)] #[cfg(test)]
#[macro_use] extern crate assert_matches; #[macro_use] extern crate assert_matches;

View File

@ -2,14 +2,15 @@ use std::hash::Hash;
use std::ops::Range; use std::ops::Range;
use std::rc::Rc; use std::rc::Rc;
use std::time::{Instant, Duration}; use std::time::{Instant, Duration};
use std::{mem, cmp, cmp::Reverse}; use std::{iter, mem, cmp, cmp::Reverse};
use fst::{Streamer, IntoStreamer}; use fst::{Streamer, IntoStreamer};
use hashbrown::HashMap; use hashbrown::HashMap;
use levenshtein_automata::DFA; use levenshtein_automata::DFA;
use log::info; use log::{info, error};
use meilidb_tokenizer::{is_cjk, split_query_string}; use meilidb_tokenizer::{is_cjk, split_query_string};
use rayon::slice::ParallelSliceMut; use rayon::slice::ParallelSliceMut;
use rayon::iter::{ParallelIterator, ParallelBridge};
use sdset::SetBuf; use sdset::SetBuf;
use slice_group_by::{GroupBy, GroupByMut}; use slice_group_by::{GroupBy, GroupByMut};
@ -315,66 +316,101 @@ fn multiword_rewrite_matches(
} }
impl<'c, S, FI> QueryBuilder<'c, S, FI> impl<'c, S, FI> QueryBuilder<'c, S, FI>
where S: Store, where S: 'static + Store + Send + Clone,
S::Error: Send,
{ {
fn query_all(&self, query: &str) -> Result<Vec<RawDocument>, S::Error> { fn query_all(&self, query: &str) -> Result<Vec<RawDocument>, S::Error> {
let (automatons, query_enhancer) = generate_automatons(query, &self.store)?; let (automatons, query_enhancer) = generate_automatons(query, &self.store)?;
let words = self.store.words()?; let searchables = self.searchable_attrs.clone();
let searchables = self.searchable_attrs.as_ref(); let store = self.store.clone();
let mut matches = Vec::new(); let mut matches = Vec::new();
let mut highlights = Vec::new(); let mut highlights = Vec::new();
let fetching_end_time = Instant::now() + Duration::from_millis(30); let recv_end_time = Instant::now() + Duration::from_millis(30);
let mut query_db = Duration::default();
let start = Instant::now(); let start = Instant::now();
'automatons: for automaton in automatons { let (sender, receiver) = crossbeam_channel::bounded(10);
let Automaton { index, is_exact, query_len, dfa, .. } = automaton;
let mut stream = words.search(&dfa).into_stream();
while let Some(input) = stream.next() { rayon::spawn(move || {
let distance = dfa.eval(input).to_u8(); enum Error<E> {
let is_exact = is_exact && distance == 0 && input.len() == query_len; SendError,
StoreError(E),
let start = Instant::now();
let doc_indexes = self.store.word_indexes(input)?;
let doc_indexes = match doc_indexes {
Some(doc_indexes) => doc_indexes,
None => continue,
};
query_db += start.elapsed();
for di in doc_indexes.as_slice() {
if Instant::now() > fetching_end_time {
break 'automatons
}
let attribute = searchables.map_or(Some(di.attribute), |r| r.get(di.attribute));
if let Some(attribute) = attribute {
let match_ = TmpMatch {
query_index: index as u32,
distance,
attribute,
word_index: di.word_index,
is_exact,
};
let highlight = Highlight {
attribute: di.attribute,
char_index: di.char_index,
char_length: di.char_length,
};
matches.push((di.document_id, match_));
highlights.push((di.document_id, highlight));
}
}
} }
let result = automatons
.into_iter()
.par_bridge()
.try_for_each_with((sender, store, searchables.as_ref()), |data, automaton| {
let (sender, store, searchables) = data;
let Automaton { index, is_exact, query_len, dfa, .. } = automaton;
let words = store.words().map_err(Error::StoreError)?;
let mut stream = words.search(&dfa).into_stream();
let mut matches = Vec::new();
let mut highlights = Vec::new();
while let Some(input) = stream.next() {
let distance = dfa.eval(input).to_u8();
let is_exact = is_exact && distance == 0 && input.len() == query_len;
let doc_indexes = store.word_indexes(input).map_err(Error::StoreError)?;
let doc_indexes = match doc_indexes {
Some(doc_indexes) => doc_indexes,
None => continue,
};
matches.reserve(doc_indexes.len());
highlights.reserve(doc_indexes.len());
for di in doc_indexes.as_slice() {
let attribute = searchables.map_or(Some(di.attribute), |r| r.get(di.attribute));
if let Some(attribute) = attribute {
let match_ = TmpMatch {
query_index: index as u32,
distance,
attribute,
word_index: di.word_index,
is_exact,
};
let highlight = Highlight {
attribute: di.attribute,
char_index: di.char_index,
char_length: di.char_length,
};
matches.push((di.document_id, match_));
highlights.push((di.document_id, highlight));
}
}
}
sender.send((matches, highlights)).map_err(|_| Error::SendError)
});
if let Err(Error::StoreError(e)) = result {
error!("{}", e);
}
});
let iter = receiver.recv().into_iter().chain(iter::from_fn(|| {
match recv_end_time.checked_duration_since(Instant::now()) {
Some(timeout) => receiver.recv_timeout(timeout).ok(),
None => None,
}
}));
for (mut rcv_matches, mut rcv_highlights) in iter {
matches.append(&mut rcv_matches);
highlights.append(&mut rcv_highlights);
} }
info!("main query all took {:.2?} (get indexes {:.2?})", start.elapsed(), query_db); drop(receiver);
info!("main query all took {:.2?}", start.elapsed());
info!("{} total matches to rewrite", matches.len()); info!("{} total matches to rewrite", matches.len());
let start = Instant::now(); let start = Instant::now();
@ -401,7 +437,8 @@ where S: Store,
} }
impl<'c, S, FI> QueryBuilder<'c, S, FI> impl<'c, S, FI> QueryBuilder<'c, S, FI>
where S: Store, where S: 'static + Store + Send + Clone,
S::Error: Send,
FI: Fn(DocumentId) -> bool, FI: Fn(DocumentId) -> bool,
{ {
pub fn query(self, query: &str, range: Range<usize>) -> Result<Vec<Document>, S::Error> { pub fn query(self, query: &str, range: Range<usize>) -> Result<Vec<Document>, S::Error> {
@ -478,7 +515,8 @@ impl<'c, I, FI, FD> DistinctQueryBuilder<'c, I, FI, FD>
} }
impl<'c, S, FI, FD, K> DistinctQueryBuilder<'c, S, FI, FD> impl<'c, S, FI, FD, K> DistinctQueryBuilder<'c, S, FI, FD>
where S: Store, where S: 'static + Store + Send + Clone,
S::Error: Send,
FI: Fn(DocumentId) -> bool, FI: Fn(DocumentId) -> bool,
FD: Fn(DocumentId) -> Option<K>, FD: Fn(DocumentId) -> Option<K>,
K: Hash + Eq, K: Hash + Eq,

View File

@ -1,4 +1,4 @@
#[derive(Default)] #[derive(Default, Clone)]
pub struct ReorderedAttrs { pub struct ReorderedAttrs {
count: usize, count: usize,
reorders: Vec<Option<u16>>, reorders: Vec<Option<u16>>,