diff --git a/meilisearch/src/search/federated.rs b/meilisearch/src/search/federated.rs index f1acf5aa4..9d16ca59d 100644 --- a/meilisearch/src/search/federated.rs +++ b/meilisearch/src/search/federated.rs @@ -9,20 +9,24 @@ use std::vec::{IntoIter, Vec}; use actix_http::StatusCode; use index_scheduler::{IndexScheduler, RoFeatures}; +use indexmap::IndexMap; use meilisearch_types::deserr::DeserrJsonError; use meilisearch_types::error::deserr_codes::{ - InvalidMultiSearchWeight, InvalidSearchLimit, InvalidSearchOffset, + InvalidMultiSearchFacetsByIndex, InvalidMultiSearchMaxValuesPerFacet, + InvalidMultiSearchMergeFacets, InvalidMultiSearchSortFacetValuesBy, InvalidMultiSearchWeight, + InvalidSearchLimit, InvalidSearchOffset, }; use meilisearch_types::error::ResponseError; +use meilisearch_types::index_uid::IndexUid; use meilisearch_types::milli::score_details::{ScoreDetails, ScoreValue}; -use meilisearch_types::milli::{self, DocumentId, TimeBudget}; +use meilisearch_types::milli::{self, DocumentId, OrderBy, TimeBudget}; use roaring::RoaringBitmap; use serde::Serialize; use super::ranking_rules::{self, RankingRules}; use super::{ - prepare_search, AttributesFormat, HitMaker, HitsInfo, RetrieveVectors, SearchHit, SearchKind, - SearchQuery, SearchQueryWithIndex, + compute_facet_distribution_stats, prepare_search, AttributesFormat, ComputedFacets, FacetStats, + HitMaker, HitsInfo, RetrieveVectors, SearchHit, SearchKind, SearchQuery, SearchQueryWithIndex, }; use crate::error::MeilisearchHttpError; use crate::routes::indexes::search::search_kind; @@ -73,6 +77,59 @@ pub struct Federation { pub limit: usize, #[deserr(default = super::DEFAULT_SEARCH_OFFSET(), error = DeserrJsonError)] pub offset: usize, + #[deserr(default, error = DeserrJsonError)] + pub facets_by_index: BTreeMap>>, + #[deserr(default, error = DeserrJsonError)] + pub merge_facets: Option, +} + +#[derive(Copy, Clone, Debug, deserr::Deserr, Default)] +#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)] +pub struct MergeFacets { + #[deserr(default, error = DeserrJsonError)] + pub sort_facet_values_by: SortFacetValuesBy, + #[deserr(default, error = DeserrJsonError)] + pub max_values_per_facet: Option, +} + +impl MergeFacets { + pub fn to_components(this: Option) -> (Option, Option) { + match this { + Some(MergeFacets { sort_facet_values_by, max_values_per_facet }) => { + (sort_facet_values_by.into(), max_values_per_facet) + } + None => (None, None), + } + } +} + +#[derive(Debug, deserr::Deserr, Default, Clone, Copy)] +#[deserr(rename_all = camelCase, deny_unknown_fields)] +pub enum SortFacetValuesBy { + #[default] + IndexSettings, + /// By lexicographic order... + Alpha, + /// Or by number of docids in common? + Count, +} + +impl From for Option { + fn from(value: SortFacetValuesBy) -> Self { + match value { + SortFacetValuesBy::Alpha => Some(OrderBy::Lexicographic), + SortFacetValuesBy::Count => Some(OrderBy::Count), + SortFacetValuesBy::IndexSettings => None, + } + } +} + +#[derive(Debug, deserr::Deserr, Default)] +#[deserr(rename_all = camelCase, deny_unknown_fields)] +pub enum GroupFacetsBy { + Facet, + #[default] + Index, } #[derive(Debug, deserr::Deserr)] @@ -82,7 +139,7 @@ pub struct FederatedSearch { #[deserr(default)] pub federation: Option, } -#[derive(Serialize, Clone, PartialEq)] +#[derive(Serialize, Clone)] #[serde(rename_all = "camelCase")] pub struct FederatedSearchResult { pub hits: Vec, @@ -93,6 +150,13 @@ pub struct FederatedSearchResult { #[serde(skip_serializing_if = "Option::is_none")] pub semantic_hit_count: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub facet_distribution: Option>>, + #[serde(skip_serializing_if = "Option::is_none")] + pub facet_stats: Option>, + #[serde(skip_serializing_if = "FederatedFacets::is_empty")] + pub facets_by_index: FederatedFacets, + // These fields are only used for analytics purposes #[serde(skip)] pub degraded: bool, @@ -109,6 +173,9 @@ impl fmt::Debug for FederatedSearchResult { semantic_hit_count, degraded, used_negative_operator, + facet_distribution, + facet_stats, + facets_by_index, } = self; let mut debug = f.debug_struct("SearchResult"); @@ -122,9 +189,18 @@ impl fmt::Debug for FederatedSearchResult { if *degraded { debug.field("degraded", degraded); } + if let Some(facet_distribution) = facet_distribution { + debug.field("facet_distribution", &facet_distribution); + } + if let Some(facet_stats) = facet_stats { + debug.field("facet_stats", &facet_stats); + } if let Some(semantic_hit_count) = semantic_hit_count { debug.field("semantic_hit_count", &semantic_hit_count); } + if !facets_by_index.is_empty() { + debug.field("facets_by_index", &facets_by_index); + } debug.finish() } @@ -313,16 +389,111 @@ struct SearchHitByIndex { } struct SearchResultByIndex { + index: String, hits: Vec, - candidates: RoaringBitmap, + estimated_total_hits: usize, degraded: bool, used_negative_operator: bool, + facets: Option, +} + +#[derive(Debug, Clone, Default, Serialize)] +pub struct FederatedFacets(pub BTreeMap); + +impl FederatedFacets { + pub fn insert(&mut self, index: String, facets: Option) { + if let Some(facets) = facets { + self.0.insert(index, facets); + } + } + + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + pub fn merge( + self, + MergeFacets { sort_facet_values_by, max_values_per_facet }: MergeFacets, + facet_order: Option>, + ) -> Option { + if self.is_empty() { + return None; + } + + let mut distribution: BTreeMap = Default::default(); + let mut stats: BTreeMap = Default::default(); + + for facets_by_index in self.0.into_values() { + for (facet, index_distribution) in facets_by_index.distribution { + match distribution.entry(facet) { + std::collections::btree_map::Entry::Vacant(entry) => { + entry.insert(index_distribution); + } + std::collections::btree_map::Entry::Occupied(mut entry) => { + let distribution = entry.get_mut(); + + for (value, index_count) in index_distribution { + distribution + .entry(value) + .and_modify(|count| *count += index_count) + .or_insert(index_count); + } + } + } + } + + for (facet, index_stats) in facets_by_index.stats { + match stats.entry(facet) { + std::collections::btree_map::Entry::Vacant(entry) => { + entry.insert(index_stats); + } + std::collections::btree_map::Entry::Occupied(mut entry) => { + let stats = entry.get_mut(); + + stats.min = + if stats.min <= index_stats.min { stats.min } else { index_stats.min }; + stats.max = + if stats.max >= index_stats.max { stats.max } else { index_stats.max }; + } + } + } + } + + // fixup order + for (facet, values) in &mut distribution { + let order_by = Option::::from(sort_facet_values_by) + .or_else(|| match &facet_order { + Some(facet_order) => facet_order.get(facet).map(|(_, order)| *order), + None => None, + }) + .unwrap_or_default(); + + match order_by { + OrderBy::Lexicographic => { + values.sort_unstable_by(|left, _, right, _| left.cmp(right)) + } + OrderBy::Count => { + values.sort_unstable_by(|_, left, _, right| { + left.cmp(right) + // biggest first + .reverse() + }) + } + } + + if let Some(max_values_per_facet) = max_values_per_facet { + values.truncate(max_values_per_facet) + }; + } + + Some(ComputedFacets { distribution, stats }) + } } pub fn perform_federated_search( index_scheduler: &IndexScheduler, queries: Vec, - federation: Federation, + mut federation: Federation, features: RoFeatures, ) -> Result { let before_search = std::time::Instant::now(); @@ -357,13 +528,29 @@ pub fn perform_federated_search( // 2. perform queries, merge and make hits index by index let required_hit_count = federation.limit + federation.offset; + + let (override_sort_facet_values_by, override_max_values_per_facet) = + MergeFacets::to_components(federation.merge_facets); + // In step (2), semantic_hit_count will be set to Some(0) if any search kind uses semantic // Then in step (3), we'll update its value if there is any semantic search let mut semantic_hit_count = None; let mut results_by_index = Vec::with_capacity(queries_by_index.len()); let mut previous_query_data: Option<(RankingRules, usize, String)> = None; + // remember the order and name of first index for each facet when merging with index settings + // to detect if the order is inconsistent for a facet. + let mut facet_order: Option> = match federation.merge_facets + { + Some(MergeFacets { sort_facet_values_by: SortFacetValuesBy::IndexSettings, .. }) => { + Some(Default::default()) + } + _ => None, + }; + for (index_uid, queries) in queries_by_index { + let first_query_index = queries.first().map(|query| query.query_index); + let index = match index_scheduler.index(&index_uid) { Ok(index) => index, Err(err) => { @@ -371,9 +558,8 @@ pub fn perform_federated_search( // Patch the HTTP status code to 400 as it defaults to 404 for `index_not_found`, but // here the resource not found is not part of the URL. err.code = StatusCode::BAD_REQUEST; - if let Some(query) = queries.first() { - err.message = - format!("Inside `.queries[{}]`: {}", query.query_index, err.message); + if let Some(query_index) = first_query_index { + err.message = format!("Inside `.queries[{}]`: {}", query_index, err.message); } return Err(err); } @@ -398,6 +584,23 @@ pub fn perform_federated_search( let mut used_negative_operator = false; let mut candidates = RoaringBitmap::new(); + let facets_by_index = federation.facets_by_index.remove(&index_uid).flatten(); + + // TODO: recover the max size + facets_by_index as return value of this function so as not to ask it for all queries + if let Err(mut error) = + check_facet_order(&mut facet_order, &index_uid, &facets_by_index, &index, &rtxn) + { + error.message = format!( + "Inside `.federation.facetsByIndex.{index_uid}`: {error}{}", + if let Some(query_index) = first_query_index { + format!("\n Note: index `{index_uid}` used in `.queries[{query_index}]`") + } else { + Default::default() + } + ); + return Err(error); + } + // 2.1. Compute all candidates for each query in the index let mut results_by_query = Vec::with_capacity(queries.len()); @@ -566,34 +769,118 @@ pub fn perform_federated_search( .collect(); let merged_result = merged_result?; + + let estimated_total_hits = candidates.len() as usize; + + let facets = facets_by_index + .map(|facets_by_index| { + compute_facet_distribution_stats( + &facets_by_index, + &index, + &rtxn, + candidates, + override_max_values_per_facet, + override_sort_facet_values_by, + ) + }) + .transpose() + .map_err(|mut error| { + error.message = format!( + "Inside `.federation.facetsByIndex.{index_uid}`: {}{}", + error.message, + if let Some(query_index) = first_query_index { + format!("\n Note: index `{index_uid}` used in `.queries[{query_index}]`") + } else { + Default::default() + } + ); + error + })?; + results_by_index.push(SearchResultByIndex { + index: index_uid, hits: merged_result, - candidates, + estimated_total_hits, degraded, used_negative_operator, + facets, }); } + // bonus step, make sure to return an error if an index wants a non-faceted field, even if no query actually uses that index. + for (index_uid, facets) in federation.facets_by_index { + let index = match index_scheduler.index(&index_uid) { + Ok(index) => index, + Err(err) => { + let mut err = ResponseError::from(err); + // Patch the HTTP status code to 400 as it defaults to 404 for `index_not_found`, but + // here the resource not found is not part of the URL. + err.code = StatusCode::BAD_REQUEST; + err.message = format!( + "Inside `.federation.facetsByIndex.{index_uid}`: {}\n Note: index `{index_uid}` is not used in queries", + err.message + ); + return Err(err); + } + }; + + // Important: this is the only transaction we'll use for this index during this federated search + let rtxn = index.read_txn()?; + + if let Err(mut error) = + check_facet_order(&mut facet_order, &index_uid, &facets, &index, &rtxn) + { + error.message = format!( + "Inside `.federation.facetsByIndex.{index_uid}`: {error}\n Note: index `{index_uid}` is not used in queries", + ); + return Err(error); + } + + if let Some(facets) = facets { + if let Err(mut error) = compute_facet_distribution_stats( + &facets, + &index, + &rtxn, + Default::default(), + override_max_values_per_facet, + override_sort_facet_values_by, + ) { + error.message = + format!("Inside `.federation.facetsByIndex.{index_uid}`: {}\n Note: index `{index_uid}` is not used in queries", error.message); + return Err(error); + } + } + } + // 3. merge hits and metadata across indexes // 3.1 merge metadata - let (estimated_total_hits, degraded, used_negative_operator) = { + let (estimated_total_hits, degraded, used_negative_operator, facets) = { let mut estimated_total_hits = 0; let mut degraded = false; let mut used_negative_operator = false; + let mut facets: FederatedFacets = FederatedFacets::default(); + for SearchResultByIndex { + index, hits: _, - candidates, + estimated_total_hits: estimated_total_hits_by_index, + facets: facets_by_index, degraded: degraded_by_index, used_negative_operator: used_negative_operator_by_index, - } in &results_by_index + } in &mut results_by_index { - estimated_total_hits += candidates.len() as usize; + estimated_total_hits += *estimated_total_hits_by_index; degraded |= *degraded_by_index; used_negative_operator |= *used_negative_operator_by_index; + + let facets_by_index = std::mem::take(facets_by_index); + let index = std::mem::take(index); + + facets.insert(index, facets_by_index); } - (estimated_total_hits, degraded, used_negative_operator) + (estimated_total_hits, degraded, used_negative_operator, facets) }; // 3.2 merge hits @@ -610,6 +897,18 @@ pub fn perform_federated_search( .map(|hit| hit.hit) .collect(); + let (facet_distribution, facet_stats, facets_by_index) = match federation.merge_facets { + Some(merge_facets) => { + let facets = facets.merge(merge_facets, facet_order); + + let (facet_distribution, facet_stats) = + facets.map(|ComputedFacets { distribution, stats }| (distribution, stats)).unzip(); + + (facet_distribution, facet_stats, FederatedFacets::default()) + } + None => (None, None, facets), + }; + let search_result = FederatedSearchResult { hits: merged_hits, processing_time_ms: before_search.elapsed().as_millis(), @@ -621,7 +920,39 @@ pub fn perform_federated_search( semantic_hit_count, degraded, used_negative_operator, + facet_distribution, + facet_stats, + facets_by_index, }; Ok(search_result) } + +fn check_facet_order( + facet_order: &mut Option>, + current_index: &str, + facets_by_index: &Option>, + index: &milli::Index, + rtxn: &milli::heed::RoTxn<'_>, +) -> Result<(), ResponseError> { + if let (Some(facet_order), Some(facets_by_index)) = (facet_order, facets_by_index) { + let index_facet_order = index.sort_facet_values_by(rtxn)?; + for facet in facets_by_index { + let index_facet_order = index_facet_order.get(facet); + let (previous_index, previous_facet_order) = facet_order + .entry(facet.to_owned()) + .or_insert_with(|| (current_index.to_owned(), index_facet_order)); + if previous_facet_order != &index_facet_order { + return Err(MeilisearchHttpError::InconsistentFacetOrder { + facet: facet.clone(), + previous_facet_order: *previous_facet_order, + previous_uid: previous_index.clone(), + current_uid: current_index.to_owned(), + index_facet_order, + } + .into()); + } + } + }; + Ok(()) +}