From 70d935a2da2fc73db469aa9ccc73a87f6e06ade5 Mon Sep 17 00:00:00 2001 From: mpostma Date: Wed, 3 Mar 2021 11:53:01 +0100 Subject: [PATCH] refactor index serach for better error handling --- .../actor_index_controller/index_actor.rs | 163 +++++++++--------- 1 file changed, 83 insertions(+), 80 deletions(-) diff --git a/src/index_controller/actor_index_controller/index_actor.rs b/src/index_controller/actor_index_controller/index_actor.rs index 7100edef6..48b10c8a3 100644 --- a/src/index_controller/actor_index_controller/index_actor.rs +++ b/src/index_controller/actor_index_controller/index_actor.rs @@ -87,85 +87,7 @@ impl IndexActor { async fn handle_search(&self, uuid: Uuid, query: SearchQuery, ret: oneshot::Sender>) { let index = self.store.get(uuid).await.unwrap().unwrap(); tokio::task::spawn_blocking(move || { - - let before_search = Instant::now(); - let rtxn = index.read_txn().unwrap(); - - let mut search = index.search(&rtxn); - - if let Some(ref query) = query.q { - search.query(query); - } - - search.limit(query.limit); - search.offset(query.offset.unwrap_or_default()); - - //if let Some(ref facets) = query.facet_filters { - //if let Some(facets) = parse_facets(facets, index, &rtxn)? { - //search.facet_condition(facets); - //} - //} - let milli::SearchResult { - documents_ids, - found_words, - candidates, - .. - } = search.execute().unwrap(); - let mut documents = Vec::new(); - let fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); - - let displayed_fields_ids = index.displayed_fields_ids(&rtxn).unwrap(); - - let attributes_to_retrieve_ids = match query.attributes_to_retrieve { - Some(ref attrs) if attrs.iter().any(|f| f == "*") => None, - Some(ref attrs) => attrs - .iter() - .filter_map(|f| fields_ids_map.id(f)) - .collect::>() - .into(), - None => None, - }; - - let displayed_fields_ids = match (displayed_fields_ids, attributes_to_retrieve_ids) { - (_, Some(ids)) => ids, - (Some(ids), None) => ids, - (None, None) => fields_ids_map.iter().map(|(id, _)| id).collect(), - }; - - let stop_words = fst::Set::default(); - let highlighter = crate::data::search::Highlighter::new(&stop_words); - - for (_id, obkv) in index.documents(&rtxn, documents_ids).unwrap() { - let mut object = milli::obkv_to_json(&displayed_fields_ids, &fields_ids_map, obkv).unwrap(); - if let Some(ref attributes_to_highlight) = query.attributes_to_highlight { - highlighter.highlight_record(&mut object, &found_words, attributes_to_highlight); - } - documents.push(object); - } - - let nb_hits = candidates.len(); - - let facet_distributions = match query.facet_distributions { - Some(ref fields) => { - let mut facet_distribution = index.facets_distribution(&rtxn); - if fields.iter().all(|f| f != "*") { - facet_distribution.facets(fields); - } - Some(facet_distribution.candidates(candidates).execute().unwrap()) - } - None => None, - }; - - let result = Ok(SearchResult { - hits: documents, - nb_hits, - query: query.q.clone().unwrap_or_default(), - limit: query.limit, - offset: query.offset.unwrap_or_default(), - processing_time_ms: before_search.elapsed().as_millis(), - facet_distributions, - }); - + let result = perform_search(&index, query); ret.send(result) }); @@ -177,7 +99,7 @@ impl IndexActor { } async fn handle_update(&self, meta: Processing, data: File, ret: oneshot::Sender) { - info!("processing update"); + info!("processing update {}", meta.id()); let uuid = meta.index_uuid().clone(); let index = self.store.get_or_create(uuid).await.unwrap(); let update_handler = self.update_handler.clone(); @@ -187,6 +109,87 @@ impl IndexActor { } } +fn perform_search(index: &Index, query: SearchQuery) -> anyhow::Result { + let before_search = Instant::now(); + let rtxn = index.read_txn()?; + + let mut search = index.search(&rtxn); + + if let Some(ref query) = query.q { + search.query(query); + } + + search.limit(query.limit); + search.offset(query.offset.unwrap_or_default()); + + //if let Some(ref facets) = query.facet_filters { + //if let Some(facets) = parse_facets(facets, index, &rtxn)? { + //search.facet_condition(facets); + //} + //} + let milli::SearchResult { + documents_ids, + found_words, + candidates, + .. + } = search.execute()?; + let mut documents = Vec::new(); + let fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + + let displayed_fields_ids = index.displayed_fields_ids(&rtxn).unwrap(); + + let attributes_to_retrieve_ids = match query.attributes_to_retrieve { + Some(ref attrs) if attrs.iter().any(|f| f == "*") => None, + Some(ref attrs) => attrs + .iter() + .filter_map(|f| fields_ids_map.id(f)) + .collect::>() + .into(), + None => None, + }; + + let displayed_fields_ids = match (displayed_fields_ids, attributes_to_retrieve_ids) { + (_, Some(ids)) => ids, + (Some(ids), None) => ids, + (None, None) => fields_ids_map.iter().map(|(id, _)| id).collect(), + }; + + let stop_words = fst::Set::default(); + let highlighter = crate::data::search::Highlighter::new(&stop_words); + + for (_id, obkv) in index.documents(&rtxn, documents_ids)? { + let mut object = milli::obkv_to_json(&displayed_fields_ids, &fields_ids_map, obkv).unwrap(); + if let Some(ref attributes_to_highlight) = query.attributes_to_highlight { + highlighter.highlight_record(&mut object, &found_words, attributes_to_highlight); + } + documents.push(object); + } + + let nb_hits = candidates.len(); + + let facet_distributions = match query.facet_distributions { + Some(ref fields) => { + let mut facet_distribution = index.facets_distribution(&rtxn); + if fields.iter().all(|f| f != "*") { + facet_distribution.facets(fields); + } + Some(facet_distribution.candidates(candidates).execute()?) + } + None => None, + }; + + let result = SearchResult { + hits: documents, + nb_hits, + query: query.q.clone().unwrap_or_default(), + limit: query.limit, + offset: query.offset.unwrap_or_default(), + processing_time_ms: before_search.elapsed().as_millis(), + facet_distributions, + }; + Ok(result) +} + #[derive(Clone)] pub struct IndexActorHandle { sender: mpsc::Sender,