mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-26 23:04:26 +01:00
Merge pull request #176 from meilisearch/no-more-hanging-threads
Replace the rayon::scope by always checking time
This commit is contained in:
commit
5b8bc09826
@ -6,13 +6,12 @@ edition = "2018"
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
byteorder = "1.3.1"
|
byteorder = "1.3.1"
|
||||||
crossbeam-channel = "0.3.9"
|
|
||||||
deunicode = "1.0.0"
|
deunicode = "1.0.0"
|
||||||
hashbrown = "0.2.2"
|
hashbrown = "0.2.2"
|
||||||
lazy_static = "1.2.0"
|
lazy_static = "1.2.0"
|
||||||
log = "0.4.6"
|
log = "0.4.6"
|
||||||
meilidb-tokenizer = { path = "../meilidb-tokenizer", version = "0.1.0" }
|
meilidb-tokenizer = { path = "../meilidb-tokenizer", version = "0.1.0" }
|
||||||
rayon = "1.0.3"
|
rayon = "1.2.0"
|
||||||
sdset = "0.3.2"
|
sdset = "0.3.2"
|
||||||
serde = { version = "1.0.88", features = ["derive"] }
|
serde = { version = "1.0.88", features = ["derive"] }
|
||||||
slice-group-by = "0.2.6"
|
slice-group-by = "0.2.6"
|
||||||
|
@ -2,12 +2,12 @@ use std::hash::Hash;
|
|||||||
use std::ops::Range;
|
use std::ops::Range;
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
use std::time::{Instant, Duration};
|
use std::time::{Instant, Duration};
|
||||||
use std::{iter, mem, cmp, cmp::Reverse};
|
use std::{mem, cmp, cmp::Reverse};
|
||||||
|
|
||||||
use fst::{Streamer, IntoStreamer};
|
use fst::{Streamer, IntoStreamer};
|
||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
use levenshtein_automata::DFA;
|
use levenshtein_automata::DFA;
|
||||||
use log::{info, error};
|
use log::info;
|
||||||
use meilidb_tokenizer::{is_cjk, split_query_string};
|
use meilidb_tokenizer::{is_cjk, split_query_string};
|
||||||
use rayon::slice::ParallelSliceMut;
|
use rayon::slice::ParallelSliceMut;
|
||||||
use rayon::iter::{ParallelIterator, ParallelBridge};
|
use rayon::iter::{ParallelIterator, ParallelBridge};
|
||||||
@ -246,12 +246,6 @@ fn multiword_rewrite_matches(
|
|||||||
// for each attribute of each document
|
// for each attribute of each document
|
||||||
for same_document_attribute in matches.linear_group_by_key(|(id, m)| (*id, m.attribute)) {
|
for same_document_attribute in matches.linear_group_by_key(|(id, m)| (*id, m.attribute)) {
|
||||||
|
|
||||||
let elapsed = start.elapsed();
|
|
||||||
if timeout.map_or(false, |timeout| elapsed > timeout) {
|
|
||||||
info!("abort multiword rewrite after {:.2?}", elapsed);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// padding will only be applied
|
// padding will only be applied
|
||||||
// to word indices in the same attribute
|
// to word indices in the same attribute
|
||||||
let mut padding = 0;
|
let mut padding = 0;
|
||||||
@ -328,6 +322,9 @@ fn multiword_rewrite_matches(
|
|||||||
|
|
||||||
padding += biggest;
|
padding += biggest;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check the timeout *after* having processed at least one element
|
||||||
|
if timeout.map_or(false, |timeout| start.elapsed() > timeout) { break }
|
||||||
}
|
}
|
||||||
info!("main multiword rewrite took {:.2?}", start.elapsed());
|
info!("main multiword rewrite took {:.2?}", start.elapsed());
|
||||||
|
|
||||||
@ -350,124 +347,102 @@ where S: Store + Sync,
|
|||||||
let store = &self.store;
|
let store = &self.store;
|
||||||
let fetch_timeout = &self.fetch_timeout;
|
let fetch_timeout = &self.fetch_timeout;
|
||||||
|
|
||||||
rayon::scope(move |s| {
|
let mut matches = Vec::new();
|
||||||
enum Error<E> {
|
let mut highlights = Vec::new();
|
||||||
SendError,
|
|
||||||
StoreError(E),
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut matches = Vec::new();
|
let timeout = fetch_timeout.map(|d| d * 75 / 100);
|
||||||
let mut highlights = Vec::new();
|
let start = Instant::now();
|
||||||
|
|
||||||
let recv_end_time = fetch_timeout.map(|d| Instant::now() + d * 75 / 100);
|
let results: Vec<_> = automatons
|
||||||
let start = Instant::now();
|
.into_iter()
|
||||||
|
.par_bridge()
|
||||||
|
.map_with((store, searchables), |(store, searchables), automaton| {
|
||||||
|
let Automaton { index, is_exact, query_len, .. } = automaton;
|
||||||
|
let dfa = automaton.dfa();
|
||||||
|
|
||||||
let (sender, receiver) = crossbeam_channel::unbounded();
|
let words = match store.words() {
|
||||||
|
Ok(words) => words,
|
||||||
s.spawn(move |_| {
|
Err(err) => return Some(Err(err)),
|
||||||
let result = automatons
|
|
||||||
.into_iter()
|
|
||||||
.par_bridge()
|
|
||||||
.try_for_each_with((sender, store, searchables), |data, automaton| {
|
|
||||||
let (sender, store, searchables) = data;
|
|
||||||
let Automaton { index, is_exact, query_len, .. } = automaton;
|
|
||||||
let dfa = automaton.dfa();
|
|
||||||
|
|
||||||
let words = store.words().map_err(Error::StoreError)?;
|
|
||||||
let mut stream = words.search(&dfa).into_stream();
|
|
||||||
|
|
||||||
let mut matches = Vec::new();
|
|
||||||
let mut highlights = Vec::new();
|
|
||||||
|
|
||||||
while let Some(input) = stream.next() {
|
|
||||||
let distance = dfa.eval(input).to_u8();
|
|
||||||
let is_exact = is_exact && distance == 0 && input.len() == query_len;
|
|
||||||
|
|
||||||
let doc_indexes = store.word_indexes(input).map_err(Error::StoreError)?;
|
|
||||||
let doc_indexes = match doc_indexes {
|
|
||||||
Some(doc_indexes) => doc_indexes,
|
|
||||||
None => continue,
|
|
||||||
};
|
|
||||||
|
|
||||||
matches.reserve(doc_indexes.len());
|
|
||||||
highlights.reserve(doc_indexes.len());
|
|
||||||
|
|
||||||
for di in doc_indexes.as_slice() {
|
|
||||||
|
|
||||||
let attribute = searchables.map_or(Some(di.attribute), |r| r.get(di.attribute));
|
|
||||||
if let Some(attribute) = attribute {
|
|
||||||
let match_ = TmpMatch {
|
|
||||||
query_index: index as u32,
|
|
||||||
distance,
|
|
||||||
attribute,
|
|
||||||
word_index: di.word_index,
|
|
||||||
is_exact,
|
|
||||||
};
|
|
||||||
|
|
||||||
let highlight = Highlight {
|
|
||||||
attribute: di.attribute,
|
|
||||||
char_index: di.char_index,
|
|
||||||
char_length: di.char_length,
|
|
||||||
};
|
|
||||||
|
|
||||||
matches.push((di.document_id, match_));
|
|
||||||
highlights.push((di.document_id, highlight));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
sender.send((matches, highlights)).map_err(|_| Error::SendError)
|
|
||||||
});
|
|
||||||
|
|
||||||
if let Err(Error::StoreError(e)) = result {
|
|
||||||
error!("{}", e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let iter = receiver.recv().into_iter().chain(iter::from_fn(|| {
|
|
||||||
let recv_end_time = match recv_end_time {
|
|
||||||
Some(time) => time,
|
|
||||||
None => return receiver.recv().ok(),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
match recv_end_time.checked_duration_since(Instant::now()) {
|
let mut stream = words.search(&dfa).into_stream();
|
||||||
Some(timeout) => receiver.recv_timeout(timeout).ok(),
|
let mut matches = Vec::new();
|
||||||
None => None,
|
let mut highlights = Vec::new();
|
||||||
|
|
||||||
|
while let Some(input) = stream.next() {
|
||||||
|
let distance = dfa.eval(input).to_u8();
|
||||||
|
let is_exact = is_exact && distance == 0 && input.len() == query_len;
|
||||||
|
|
||||||
|
let doc_indexes = match store.word_indexes(input) {
|
||||||
|
Ok(Some(doc_indexes)) => doc_indexes,
|
||||||
|
Ok(None) => continue,
|
||||||
|
Err(err) => return Some(Err(err)),
|
||||||
|
};
|
||||||
|
|
||||||
|
matches.reserve(doc_indexes.len());
|
||||||
|
highlights.reserve(doc_indexes.len());
|
||||||
|
|
||||||
|
for di in doc_indexes.as_slice() {
|
||||||
|
let attribute = searchables.map_or(Some(di.attribute), |r| r.get(di.attribute));
|
||||||
|
if let Some(attribute) = attribute {
|
||||||
|
let match_ = TmpMatch {
|
||||||
|
query_index: index as u32,
|
||||||
|
distance,
|
||||||
|
attribute,
|
||||||
|
word_index: di.word_index,
|
||||||
|
is_exact,
|
||||||
|
};
|
||||||
|
|
||||||
|
let highlight = Highlight {
|
||||||
|
attribute: di.attribute,
|
||||||
|
char_index: di.char_index,
|
||||||
|
char_length: di.char_length,
|
||||||
|
};
|
||||||
|
|
||||||
|
matches.push((di.document_id, match_));
|
||||||
|
highlights.push((di.document_id, highlight));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// check the timeout *after* having processed at least one element
|
||||||
|
if timeout.map_or(false, |timeout| start.elapsed() > timeout) { break }
|
||||||
}
|
}
|
||||||
}));
|
|
||||||
|
|
||||||
for (mut rcv_matches, mut rcv_highlights) in iter {
|
Some(Ok((matches, highlights)))
|
||||||
matches.append(&mut rcv_matches);
|
})
|
||||||
highlights.append(&mut rcv_highlights);
|
.while_some()
|
||||||
}
|
.collect();
|
||||||
|
|
||||||
drop(receiver);
|
for result in results {
|
||||||
|
let (mut rcv_matches, mut rcv_highlights) = result?;
|
||||||
|
matches.append(&mut rcv_matches);
|
||||||
|
highlights.append(&mut rcv_highlights);
|
||||||
|
}
|
||||||
|
|
||||||
info!("main query all took {:.2?}", start.elapsed());
|
info!("main query all took {:.2?}", start.elapsed());
|
||||||
info!("{} total matches to rewrite", matches.len());
|
info!("{} total matches to rewrite", matches.len());
|
||||||
|
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
let timeout = fetch_timeout.map(|d| d * 25 / 100);
|
let timeout = fetch_timeout.map(|d| d * 25 / 100);
|
||||||
let matches = multiword_rewrite_matches(matches, &query_enhancer, timeout);
|
let matches = multiword_rewrite_matches(matches, &query_enhancer, timeout);
|
||||||
info!("multiword rewrite took {:.2?}", start.elapsed());
|
info!("multiword rewrite took {:.2?}", start.elapsed());
|
||||||
|
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
let highlights = {
|
let highlights = {
|
||||||
highlights.par_sort_unstable_by_key(|(id, _)| *id);
|
highlights.par_sort_unstable_by_key(|(id, _)| *id);
|
||||||
SetBuf::new_unchecked(highlights)
|
SetBuf::new_unchecked(highlights)
|
||||||
};
|
};
|
||||||
info!("sorting highlights took {:.2?}", start.elapsed());
|
info!("sorting highlights took {:.2?}", start.elapsed());
|
||||||
|
|
||||||
info!("{} total matches to classify", matches.len());
|
info!("{} total matches to classify", matches.len());
|
||||||
|
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
let raw_documents = raw_documents_from(matches, highlights);
|
let raw_documents = raw_documents_from(matches, highlights);
|
||||||
info!("making raw documents took {:.2?}", start.elapsed());
|
info!("making raw documents took {:.2?}", start.elapsed());
|
||||||
|
|
||||||
info!("{} total documents to classify", raw_documents.len());
|
info!("{} total documents to classify", raw_documents.len());
|
||||||
|
|
||||||
Ok(raw_documents)
|
Ok(raw_documents)
|
||||||
})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user