refactor index serach for better error handling

This commit is contained in:
mpostma 2021-03-03 11:53:01 +01:00
parent 7c7143d435
commit 70d935a2da
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A

View File

@ -87,85 +87,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
async fn handle_search(&self, uuid: Uuid, query: SearchQuery, ret: oneshot::Sender<anyhow::Result<SearchResult>>) { async fn handle_search(&self, uuid: Uuid, query: SearchQuery, ret: oneshot::Sender<anyhow::Result<SearchResult>>) {
let index = self.store.get(uuid).await.unwrap().unwrap(); let index = self.store.get(uuid).await.unwrap().unwrap();
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let result = perform_search(&index, query);
let before_search = Instant::now();
let rtxn = index.read_txn().unwrap();
let mut search = index.search(&rtxn);
if let Some(ref query) = query.q {
search.query(query);
}
search.limit(query.limit);
search.offset(query.offset.unwrap_or_default());
//if let Some(ref facets) = query.facet_filters {
//if let Some(facets) = parse_facets(facets, index, &rtxn)? {
//search.facet_condition(facets);
//}
//}
let milli::SearchResult {
documents_ids,
found_words,
candidates,
..
} = search.execute().unwrap();
let mut documents = Vec::new();
let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
let displayed_fields_ids = index.displayed_fields_ids(&rtxn).unwrap();
let attributes_to_retrieve_ids = match query.attributes_to_retrieve {
Some(ref attrs) if attrs.iter().any(|f| f == "*") => None,
Some(ref attrs) => attrs
.iter()
.filter_map(|f| fields_ids_map.id(f))
.collect::<Vec<_>>()
.into(),
None => None,
};
let displayed_fields_ids = match (displayed_fields_ids, attributes_to_retrieve_ids) {
(_, Some(ids)) => ids,
(Some(ids), None) => ids,
(None, None) => fields_ids_map.iter().map(|(id, _)| id).collect(),
};
let stop_words = fst::Set::default();
let highlighter = crate::data::search::Highlighter::new(&stop_words);
for (_id, obkv) in index.documents(&rtxn, documents_ids).unwrap() {
let mut object = milli::obkv_to_json(&displayed_fields_ids, &fields_ids_map, obkv).unwrap();
if let Some(ref attributes_to_highlight) = query.attributes_to_highlight {
highlighter.highlight_record(&mut object, &found_words, attributes_to_highlight);
}
documents.push(object);
}
let nb_hits = candidates.len();
let facet_distributions = match query.facet_distributions {
Some(ref fields) => {
let mut facet_distribution = index.facets_distribution(&rtxn);
if fields.iter().all(|f| f != "*") {
facet_distribution.facets(fields);
}
Some(facet_distribution.candidates(candidates).execute().unwrap())
}
None => None,
};
let result = Ok(SearchResult {
hits: documents,
nb_hits,
query: query.q.clone().unwrap_or_default(),
limit: query.limit,
offset: query.offset.unwrap_or_default(),
processing_time_ms: before_search.elapsed().as_millis(),
facet_distributions,
});
ret.send(result) ret.send(result)
}); });
@ -177,7 +99,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
} }
async fn handle_update(&self, meta: Processing<UpdateMeta>, data: File, ret: oneshot::Sender<UpdateResult>) { async fn handle_update(&self, meta: Processing<UpdateMeta>, data: File, ret: oneshot::Sender<UpdateResult>) {
info!("processing update"); info!("processing update {}", meta.id());
let uuid = meta.index_uuid().clone(); let uuid = meta.index_uuid().clone();
let index = self.store.get_or_create(uuid).await.unwrap(); let index = self.store.get_or_create(uuid).await.unwrap();
let update_handler = self.update_handler.clone(); let update_handler = self.update_handler.clone();
@ -187,6 +109,87 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
} }
} }
fn perform_search(index: &Index, query: SearchQuery) -> anyhow::Result<SearchResult> {
let before_search = Instant::now();
let rtxn = index.read_txn()?;
let mut search = index.search(&rtxn);
if let Some(ref query) = query.q {
search.query(query);
}
search.limit(query.limit);
search.offset(query.offset.unwrap_or_default());
//if let Some(ref facets) = query.facet_filters {
//if let Some(facets) = parse_facets(facets, index, &rtxn)? {
//search.facet_condition(facets);
//}
//}
let milli::SearchResult {
documents_ids,
found_words,
candidates,
..
} = search.execute()?;
let mut documents = Vec::new();
let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
let displayed_fields_ids = index.displayed_fields_ids(&rtxn).unwrap();
let attributes_to_retrieve_ids = match query.attributes_to_retrieve {
Some(ref attrs) if attrs.iter().any(|f| f == "*") => None,
Some(ref attrs) => attrs
.iter()
.filter_map(|f| fields_ids_map.id(f))
.collect::<Vec<_>>()
.into(),
None => None,
};
let displayed_fields_ids = match (displayed_fields_ids, attributes_to_retrieve_ids) {
(_, Some(ids)) => ids,
(Some(ids), None) => ids,
(None, None) => fields_ids_map.iter().map(|(id, _)| id).collect(),
};
let stop_words = fst::Set::default();
let highlighter = crate::data::search::Highlighter::new(&stop_words);
for (_id, obkv) in index.documents(&rtxn, documents_ids)? {
let mut object = milli::obkv_to_json(&displayed_fields_ids, &fields_ids_map, obkv).unwrap();
if let Some(ref attributes_to_highlight) = query.attributes_to_highlight {
highlighter.highlight_record(&mut object, &found_words, attributes_to_highlight);
}
documents.push(object);
}
let nb_hits = candidates.len();
let facet_distributions = match query.facet_distributions {
Some(ref fields) => {
let mut facet_distribution = index.facets_distribution(&rtxn);
if fields.iter().all(|f| f != "*") {
facet_distribution.facets(fields);
}
Some(facet_distribution.candidates(candidates).execute()?)
}
None => None,
};
let result = SearchResult {
hits: documents,
nb_hits,
query: query.q.clone().unwrap_or_default(),
limit: query.limit,
offset: query.offset.unwrap_or_default(),
processing_time_ms: before_search.elapsed().as_millis(),
facet_distributions,
};
Ok(result)
}
#[derive(Clone)] #[derive(Clone)]
pub struct IndexActorHandle { pub struct IndexActorHandle {
sender: mpsc::Sender<IndexMsg>, sender: mpsc::Sender<IndexMsg>,