This commit is contained in:
Mubelotix 2025-07-04 01:26:43 +01:00 committed by GitHub
commit a719ed1d11
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 1411 additions and 375 deletions

View file

@ -168,6 +168,16 @@ pub enum SortError {
ReservedNameForFilter { name: String },
}
impl SortError {
pub fn into_search_error(self) -> Error {
Error::UserError(UserError::SortError { error: self, search: true })
}
pub fn into_document_error(self) -> Error {
Error::UserError(UserError::SortError { error: self, search: false })
}
}
impl From<AscDescError> for SortError {
fn from(error: AscDescError) -> Self {
match error {
@ -190,12 +200,6 @@ impl From<AscDescError> for SortError {
}
}
impl From<SortError> for Error {
fn from(error: SortError) -> Self {
Self::UserError(UserError::SortError(error))
}
}
#[cfg(test)]
mod tests {
use big_s::S;

View file

@ -0,0 +1,294 @@
use crate::{
distance_between_two_points,
heed_codec::facet::{FieldDocIdFacetCodec, OrderedF64Codec},
lat_lng_to_xyz,
search::new::{facet_string_values, facet_values_prefix_key},
GeoPoint, Index,
};
use heed::{
types::{Bytes, Unit},
RoPrefix, RoTxn,
};
use roaring::RoaringBitmap;
use rstar::RTree;
use std::collections::VecDeque;
#[derive(Debug, Clone, Copy)]
pub struct GeoSortParameter {
// Define the strategy used by the geo sort
pub strategy: GeoSortStrategy,
// Limit the number of docs in a single bucket to avoid unexpectedly large overhead
pub max_bucket_size: u64,
// Considering the errors of GPS and geographical calculations, distances less than distance_error_margin will be treated as equal
pub distance_error_margin: f64,
}
impl Default for GeoSortParameter {
fn default() -> Self {
Self {
strategy: GeoSortStrategy::default(),
max_bucket_size: 1000,
distance_error_margin: 1.0,
}
}
}
/// Define the strategy used by the geo sort.
/// The parameter represents the cache size, and, in the case of the Dynamic strategy,
/// the point where we move from using the iterative strategy to the rtree.
#[derive(Debug, Clone, Copy)]
pub enum GeoSortStrategy {
AlwaysIterative(usize),
AlwaysRtree(usize),
Dynamic(usize),
}
impl Default for GeoSortStrategy {
fn default() -> Self {
GeoSortStrategy::Dynamic(1000)
}
}
impl GeoSortStrategy {
pub fn use_rtree(&self, candidates: usize) -> bool {
match self {
GeoSortStrategy::AlwaysIterative(_) => false,
GeoSortStrategy::AlwaysRtree(_) => true,
GeoSortStrategy::Dynamic(i) => candidates >= *i,
}
}
pub fn cache_size(&self) -> usize {
match self {
GeoSortStrategy::AlwaysIterative(i)
| GeoSortStrategy::AlwaysRtree(i)
| GeoSortStrategy::Dynamic(i) => *i,
}
}
}
#[allow(clippy::too_many_arguments)]
pub fn fill_cache(
index: &Index,
txn: &RoTxn<heed::AnyTls>,
strategy: GeoSortStrategy,
ascending: bool,
target_point: [f64; 2],
field_ids: &Option<[u16; 2]>,
rtree: &mut Option<RTree<GeoPoint>>,
geo_candidates: &RoaringBitmap,
cached_sorted_docids: &mut VecDeque<(u32, [f64; 2])>,
) -> crate::Result<()> {
debug_assert!(cached_sorted_docids.is_empty());
// lazily initialize the rtree if needed by the strategy, and cache it in `self.rtree`
let rtree = if strategy.use_rtree(geo_candidates.len() as usize) {
if let Some(rtree) = rtree.as_ref() {
// get rtree from cache
Some(rtree)
} else {
let rtree2 = index.geo_rtree(txn)?.expect("geo candidates but no rtree");
// insert rtree in cache and returns it.
// Can't use `get_or_insert_with` because getting the rtree from the DB is a fallible operation.
Some(&*rtree.insert(rtree2))
}
} else {
None
};
let cache_size = strategy.cache_size();
if let Some(rtree) = rtree {
if ascending {
let point = lat_lng_to_xyz(&target_point);
for point in rtree.nearest_neighbor_iter(&point) {
if geo_candidates.contains(point.data.0) {
cached_sorted_docids.push_back(point.data);
if cached_sorted_docids.len() >= cache_size {
break;
}
}
}
} else {
// in the case of the desc geo sort we look for the closest point to the opposite of the queried point
// and we insert the points in reverse order they get reversed when emptying the cache later on
let point = lat_lng_to_xyz(&opposite_of(target_point));
for point in rtree.nearest_neighbor_iter(&point) {
if geo_candidates.contains(point.data.0) {
cached_sorted_docids.push_front(point.data);
if cached_sorted_docids.len() >= cache_size {
break;
}
}
}
}
} else {
// the iterative version
let [lat, lng] = field_ids.expect("fill_buffer can't be called without the lat&lng");
let mut documents = geo_candidates
.iter()
.map(|id| -> crate::Result<_> { Ok((id, geo_value(id, lat, lng, index, txn)?)) })
.collect::<crate::Result<Vec<(u32, [f64; 2])>>>()?;
// computing the distance between two points is expensive thus we cache the result
documents
.sort_by_cached_key(|(_, p)| distance_between_two_points(&target_point, p) as usize);
cached_sorted_docids.extend(documents);
};
Ok(())
}
#[allow(clippy::too_many_arguments)]
pub fn next_bucket(
index: &Index,
txn: &RoTxn<heed::AnyTls>,
universe: &RoaringBitmap,
ascending: bool,
target_point: [f64; 2],
field_ids: &Option<[u16; 2]>,
rtree: &mut Option<RTree<GeoPoint>>,
cached_sorted_docids: &mut VecDeque<(u32, [f64; 2])>,
geo_candidates: &RoaringBitmap,
parameter: GeoSortParameter,
) -> crate::Result<Option<(RoaringBitmap, Option<[f64; 2]>)>> {
let mut geo_candidates = geo_candidates & universe;
if geo_candidates.is_empty() {
return Ok(Some((universe.clone(), None)));
}
let next = |cache: &mut VecDeque<_>| {
if ascending {
cache.pop_front()
} else {
cache.pop_back()
}
};
let put_back = |cache: &mut VecDeque<_>, x: _| {
if ascending {
cache.push_front(x)
} else {
cache.push_back(x)
}
};
let mut current_bucket = RoaringBitmap::new();
// current_distance stores the first point and distance in current bucket
let mut current_distance: Option<([f64; 2], f64)> = None;
loop {
// The loop will only exit when we have found all points with equal distance or have exhausted the candidates.
if let Some((id, point)) = next(cached_sorted_docids) {
if geo_candidates.contains(id) {
let distance = distance_between_two_points(&target_point, &point);
if let Some((point0, bucket_distance)) = current_distance.as_ref() {
if (bucket_distance - distance).abs() > parameter.distance_error_margin {
// different distance, point belongs to next bucket
put_back(cached_sorted_docids, (id, point));
return Ok(Some((current_bucket, Some(point0.to_owned()))));
} else {
// same distance, point belongs to current bucket
current_bucket.insert(id);
// remove from candidates to prevent it from being added to the cache again
geo_candidates.remove(id);
// current bucket size reaches limit, force return
if current_bucket.len() == parameter.max_bucket_size {
return Ok(Some((current_bucket, Some(point0.to_owned()))));
}
}
} else {
// first doc in current bucket
current_distance = Some((point, distance));
current_bucket.insert(id);
geo_candidates.remove(id);
// current bucket size reaches limit, force return
if current_bucket.len() == parameter.max_bucket_size {
return Ok(Some((current_bucket, Some(point.to_owned()))));
}
}
}
} else {
// cache exhausted, we need to refill it
fill_cache(
index,
txn,
parameter.strategy,
ascending,
target_point,
field_ids,
rtree,
&geo_candidates,
cached_sorted_docids,
)?;
if cached_sorted_docids.is_empty() {
// candidates exhausted, exit
if let Some((point0, _)) = current_distance.as_ref() {
return Ok(Some((current_bucket, Some(point0.to_owned()))));
} else {
return Ok(Some((universe.clone(), None)));
}
}
}
}
}
/// Return an iterator over each number value in the given field of the given document.
fn facet_number_values<'a>(
docid: u32,
field_id: u16,
index: &Index,
txn: &'a RoTxn<'a>,
) -> crate::Result<RoPrefix<'a, FieldDocIdFacetCodec<OrderedF64Codec>, Unit>> {
let key = facet_values_prefix_key(field_id, docid);
let iter = index
.field_id_docid_facet_f64s
.remap_key_type::<Bytes>()
.prefix_iter(txn, &key)?
.remap_key_type();
Ok(iter)
}
/// Extracts the lat and long values from a single document.
///
/// If it is not able to find it in the facet number index it will extract it
/// from the facet string index and parse it as f64 (as the geo extraction behaves).
pub(crate) fn geo_value(
docid: u32,
field_lat: u16,
field_lng: u16,
index: &Index,
rtxn: &RoTxn<'_>,
) -> crate::Result<[f64; 2]> {
let extract_geo = |geo_field: u16| -> crate::Result<f64> {
match facet_number_values(docid, geo_field, index, rtxn)?.next() {
Some(Ok(((_, _, geo), ()))) => Ok(geo),
Some(Err(e)) => Err(e.into()),
None => match facet_string_values(docid, geo_field, index, rtxn)?.next() {
Some(Ok((_, geo))) => {
Ok(geo.parse::<f64>().expect("cannot parse geo field as f64"))
}
Some(Err(e)) => Err(e.into()),
None => panic!("A geo faceted document doesn't contain any lat or lng"),
},
}
};
let lat = extract_geo(field_lat)?;
let lng = extract_geo(field_lng)?;
Ok([lat, lng])
}
/// Compute the antipodal coordinate of `coord`
pub(crate) fn opposite_of(mut coord: [f64; 2]) -> [f64; 2] {
coord[0] *= -1.;
// in the case of x,0 we want to return x,180
if coord[1] > 0. {
coord[1] -= 180.;
} else {
coord[1] += 180.;
}
coord
}

View file

@ -1,8 +1,10 @@
mod builder;
mod enriched;
pub mod geo_sort;
mod primary_key;
mod reader;
mod serde_impl;
pub mod sort;
use std::fmt::Debug;
use std::io;
@ -19,6 +21,7 @@ pub use primary_key::{
pub use reader::{DocumentsBatchCursor, DocumentsBatchCursorError, DocumentsBatchReader};
use serde::{Deserialize, Serialize};
pub use self::geo_sort::{GeoSortParameter, GeoSortStrategy};
use crate::error::{FieldIdMapMissingEntry, InternalError};
use crate::{FieldId, Object, Result};

View file

@ -0,0 +1,430 @@
use std::collections::{BTreeSet, VecDeque};
use crate::{
constants::RESERVED_GEO_FIELD_NAME,
documents::{geo_sort::next_bucket, GeoSortParameter},
heed_codec::{
facet::{FacetGroupKeyCodec, FacetGroupValueCodec},
BytesRefCodec,
},
is_faceted,
search::facet::{ascending_facet_sort, descending_facet_sort},
AscDesc, DocumentId, Member, UserError,
};
use heed::Database;
use roaring::RoaringBitmap;
#[derive(Debug, Clone, Copy)]
enum AscDescId {
Facet { field_id: u16, ascending: bool },
Geo { field_ids: [u16; 2], target_point: [f64; 2], ascending: bool },
}
/// A [`SortedDocumentsIterator`] allows efficient access to a continuous range of sorted documents.
/// This is ideal in the context of paginated queries in which only a small number of documents are needed at a time.
/// Search operations will only be performed upon access.
pub enum SortedDocumentsIterator<'ctx> {
Leaf {
/// The exact number of documents remaining
size: usize,
values: Box<dyn Iterator<Item = DocumentId> + 'ctx>,
},
Branch {
/// The current child, got from the children iterator
current_child: Option<Box<SortedDocumentsIterator<'ctx>>>,
/// The exact number of documents remaining, excluding documents in the current child
next_children_size: usize,
/// Iterators to become the current child once it is exhausted
next_children:
Box<dyn Iterator<Item = crate::Result<SortedDocumentsIteratorBuilder<'ctx>>> + 'ctx>,
},
}
impl SortedDocumentsIterator<'_> {
/// Takes care of updating the current child if it is `None`, and also updates the size
fn update_current<'ctx>(
current_child: &mut Option<Box<SortedDocumentsIterator<'ctx>>>,
next_children_size: &mut usize,
next_children: &mut Box<
dyn Iterator<Item = crate::Result<SortedDocumentsIteratorBuilder<'ctx>>> + 'ctx,
>,
) -> crate::Result<()> {
if current_child.is_none() {
*current_child = match next_children.next() {
Some(Ok(builder)) => {
let next_child = Box::new(builder.build()?);
*next_children_size -= next_child.size_hint().0;
Some(next_child)
}
Some(Err(e)) => return Err(e),
None => return Ok(()),
};
}
Ok(())
}
}
impl Iterator for SortedDocumentsIterator<'_> {
type Item = crate::Result<DocumentId>;
/// Implementing the `nth` method allows for efficient access to the nth document in the sorted order.
/// It's used by `skip` internally.
/// The default implementation of `nth` would iterate over all children, which is inefficient for large datasets.
/// This implementation will jump over whole chunks of children until it gets close.
fn nth(&mut self, n: usize) -> Option<Self::Item> {
// If it's at the leaf level, just forward the call to the values iterator
let (current_child, next_children, next_children_size) = match self {
SortedDocumentsIterator::Leaf { values, size } => {
*size = size.saturating_sub(n);
return values.nth(n).map(Ok);
}
SortedDocumentsIterator::Branch {
current_child,
next_children,
next_children_size,
} => (current_child, next_children, next_children_size),
};
// Otherwise don't directly iterate over children, skip them if we know we will go further
let mut to_skip = n - 1;
while to_skip > 0 {
if let Err(e) = SortedDocumentsIterator::update_current(
current_child,
next_children_size,
next_children,
) {
return Some(Err(e));
}
let Some(inner) = current_child else {
return None; // No more inner iterators, everything has been consumed.
};
if to_skip >= inner.size_hint().0 {
// The current child isn't large enough to contain the nth element.
// Skip it and continue with the next one.
to_skip -= inner.size_hint().0;
*current_child = None;
continue;
} else {
// The current iterator is large enough, so we can forward the call to it.
return inner.nth(to_skip + 1);
}
}
self.next()
}
/// Iterators need to keep track of their size so that they can be skipped efficiently by the `nth` method.
fn size_hint(&self) -> (usize, Option<usize>) {
let size = match self {
SortedDocumentsIterator::Leaf { size, .. } => *size,
SortedDocumentsIterator::Branch {
next_children_size,
current_child: Some(current_child),
..
} => current_child.size_hint().0 + next_children_size,
SortedDocumentsIterator::Branch { next_children_size, current_child: None, .. } => {
*next_children_size
}
};
(size, Some(size))
}
fn next(&mut self) -> Option<Self::Item> {
match self {
SortedDocumentsIterator::Leaf { values, size } => {
let result = values.next().map(Ok);
if result.is_some() {
*size -= 1;
}
result
}
SortedDocumentsIterator::Branch {
current_child,
next_children_size,
next_children,
} => {
let mut result = None;
while result.is_none() {
// Ensure we have selected an iterator to work with
if let Err(e) = SortedDocumentsIterator::update_current(
current_child,
next_children_size,
next_children,
) {
return Some(Err(e));
}
let Some(inner) = current_child else {
return None;
};
result = inner.next();
// If the current iterator is exhausted, we need to try the next one
if result.is_none() {
*current_child = None;
}
}
result
}
}
}
}
/// Builder for a [`SortedDocumentsIterator`].
/// Most builders won't ever be built, because pagination will skip them.
pub struct SortedDocumentsIteratorBuilder<'ctx> {
index: &'ctx crate::Index,
rtxn: &'ctx heed::RoTxn<'ctx>,
number_db: Database<FacetGroupKeyCodec<BytesRefCodec>, FacetGroupValueCodec>,
string_db: Database<FacetGroupKeyCodec<BytesRefCodec>, FacetGroupValueCodec>,
fields: &'ctx [AscDescId],
candidates: RoaringBitmap,
geo_candidates: &'ctx RoaringBitmap,
}
impl<'ctx> SortedDocumentsIteratorBuilder<'ctx> {
/// Performs the sort and builds a [`SortedDocumentsIterator`].
fn build(self) -> crate::Result<SortedDocumentsIterator<'ctx>> {
let size = self.candidates.len() as usize;
// There is no point sorting a 1-element array
if size <= 1 {
return Ok(SortedDocumentsIterator::Leaf {
size,
values: Box::new(self.candidates.into_iter()),
});
}
match self.fields.first().copied() {
Some(AscDescId::Facet { field_id, ascending }) => self.build_facet(field_id, ascending),
Some(AscDescId::Geo { field_ids, target_point, ascending }) => {
self.build_geo(field_ids, target_point, ascending)
}
None => Ok(SortedDocumentsIterator::Leaf {
size,
values: Box::new(self.candidates.into_iter()),
}),
}
}
/// Builds a [`SortedDocumentsIterator`] based on the results of a facet sort.
fn build_facet(
self,
field_id: u16,
ascending: bool,
) -> crate::Result<SortedDocumentsIterator<'ctx>> {
let SortedDocumentsIteratorBuilder {
index,
rtxn,
number_db,
string_db,
fields,
candidates,
geo_candidates,
} = self;
let size = candidates.len() as usize;
// Perform the sort on the first field
let (number_iter, string_iter) = if ascending {
let number_iter = ascending_facet_sort(rtxn, number_db, field_id, candidates.clone())?;
let string_iter = ascending_facet_sort(rtxn, string_db, field_id, candidates)?;
(itertools::Either::Left(number_iter), itertools::Either::Left(string_iter))
} else {
let number_iter = descending_facet_sort(rtxn, number_db, field_id, candidates.clone())?;
let string_iter = descending_facet_sort(rtxn, string_db, field_id, candidates)?;
(itertools::Either::Right(number_iter), itertools::Either::Right(string_iter))
};
// Create builders for the next level of the tree
let number_iter = number_iter.map(|r| r.map(|(d, _)| d));
let string_iter = string_iter.map(|r| r.map(|(d, _)| d));
let next_children = number_iter.chain(string_iter).map(move |r| {
Ok(SortedDocumentsIteratorBuilder {
index,
rtxn,
number_db,
string_db,
fields: &fields[1..],
candidates: r?,
geo_candidates,
})
});
Ok(SortedDocumentsIterator::Branch {
current_child: None,
next_children_size: size,
next_children: Box::new(next_children),
})
}
/// Builds a [`SortedDocumentsIterator`] based on the (lazy) results of a geo sort.
fn build_geo(
self,
field_ids: [u16; 2],
target_point: [f64; 2],
ascending: bool,
) -> crate::Result<SortedDocumentsIterator<'ctx>> {
let SortedDocumentsIteratorBuilder {
index,
rtxn,
number_db,
string_db,
fields,
candidates,
geo_candidates,
} = self;
let mut cache = VecDeque::new();
let mut rtree = None;
let size = candidates.len() as usize;
let not_geo_candidates = candidates.clone() - geo_candidates;
let mut geo_remaining = size - not_geo_candidates.len() as usize;
let mut not_geo_candidates = Some(not_geo_candidates);
let next_children = std::iter::from_fn(move || {
// Find the next bucket of geo-sorted documents.
// next_bucket loops and will go back to the beginning so we use a variable to track how many are left.
if geo_remaining > 0 {
if let Ok(Some((docids, _point))) = next_bucket(
index,
rtxn,
&candidates,
ascending,
target_point,
&Some(field_ids),
&mut rtree,
&mut cache,
geo_candidates,
GeoSortParameter::default(),
) {
geo_remaining -= docids.len() as usize;
return Some(Ok(SortedDocumentsIteratorBuilder {
index,
rtxn,
number_db,
string_db,
fields: &fields[1..],
candidates: docids,
geo_candidates,
}));
}
}
// Once all geo candidates have been processed, we can return the others
if let Some(not_geo_candidates) = not_geo_candidates.take() {
if !not_geo_candidates.is_empty() {
return Some(Ok(SortedDocumentsIteratorBuilder {
index,
rtxn,
number_db,
string_db,
fields: &fields[1..],
candidates: not_geo_candidates,
geo_candidates,
}));
}
}
None
});
Ok(SortedDocumentsIterator::Branch {
current_child: None,
next_children_size: size,
next_children: Box::new(next_children),
})
}
}
/// A structure owning the data needed during the lifetime of a [`SortedDocumentsIterator`].
pub struct SortedDocuments<'ctx> {
index: &'ctx crate::Index,
rtxn: &'ctx heed::RoTxn<'ctx>,
fields: Vec<AscDescId>,
number_db: Database<FacetGroupKeyCodec<BytesRefCodec>, FacetGroupValueCodec>,
string_db: Database<FacetGroupKeyCodec<BytesRefCodec>, FacetGroupValueCodec>,
candidates: &'ctx RoaringBitmap,
geo_candidates: RoaringBitmap,
}
impl<'ctx> SortedDocuments<'ctx> {
pub fn iter(&'ctx self) -> crate::Result<SortedDocumentsIterator<'ctx>> {
let builder = SortedDocumentsIteratorBuilder {
index: self.index,
rtxn: self.rtxn,
number_db: self.number_db,
string_db: self.string_db,
fields: &self.fields,
candidates: self.candidates.clone(),
geo_candidates: &self.geo_candidates,
};
builder.build()
}
}
pub fn recursive_sort<'ctx>(
index: &'ctx crate::Index,
rtxn: &'ctx heed::RoTxn<'ctx>,
sort: Vec<AscDesc>,
candidates: &'ctx RoaringBitmap,
) -> crate::Result<SortedDocuments<'ctx>> {
let sortable_fields: BTreeSet<_> = index.sortable_fields(rtxn)?.into_iter().collect();
let fields_ids_map = index.fields_ids_map(rtxn)?;
// Retrieve the field ids that are used for sorting
let mut fields = Vec::new();
let mut need_geo_candidates = false;
for asc_desc in sort {
let (field, geofield) = match asc_desc {
AscDesc::Asc(Member::Field(field)) => (Some((field, true)), None),
AscDesc::Desc(Member::Field(field)) => (Some((field, false)), None),
AscDesc::Asc(Member::Geo(target_point)) => (None, Some((target_point, true))),
AscDesc::Desc(Member::Geo(target_point)) => (None, Some((target_point, false))),
};
if let Some((field, ascending)) = field {
if is_faceted(&field, &sortable_fields) {
if let Some(field_id) = fields_ids_map.id(&field) {
fields.push(AscDescId::Facet { field_id, ascending });
continue;
}
}
return Err(UserError::InvalidDocumentSortableAttribute {
field: field.to_string(),
sortable_fields: sortable_fields.clone(),
}
.into());
}
if let Some((target_point, ascending)) = geofield {
if sortable_fields.contains(RESERVED_GEO_FIELD_NAME) {
if let (Some(lat), Some(lng)) =
(fields_ids_map.id("_geo.lat"), fields_ids_map.id("_geo.lng"))
{
need_geo_candidates = true;
fields.push(AscDescId::Geo { field_ids: [lat, lng], target_point, ascending });
continue;
}
}
return Err(UserError::InvalidDocumentSortableAttribute {
field: RESERVED_GEO_FIELD_NAME.to_string(),
sortable_fields: sortable_fields.clone(),
}
.into());
}
}
let geo_candidates = if need_geo_candidates {
index.geo_faceted_documents_ids(rtxn)?
} else {
RoaringBitmap::new()
};
let number_db = index.facet_id_f64_docids.remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>();
let string_db =
index.facet_id_string_docids.remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>();
Ok(SortedDocuments { index, rtxn, fields, number_db, string_db, candidates, geo_candidates })
}

View file

@ -191,7 +191,21 @@ and can not be more than 511 bytes.", .document_id.to_string()
),
}
)]
InvalidSortableAttribute { field: String, valid_fields: BTreeSet<String>, hidden_fields: bool },
InvalidSearchSortableAttribute {
field: String,
valid_fields: BTreeSet<String>,
hidden_fields: bool,
},
#[error("Attribute `{}` is not sortable. {}",
.field,
match .sortable_fields.is_empty() {
true => "This index does not have configured sortable attributes.".to_string(),
false => format!("Available sortable attributes are: `{}`.",
sortable_fields.iter().map(AsRef::as_ref).collect::<Vec<&str>>().join(", ")
),
}
)]
InvalidDocumentSortableAttribute { field: String, sortable_fields: BTreeSet<String> },
#[error("Attribute `{}` is not filterable and thus, cannot be used as distinct attribute. {}",
.field,
match (.valid_patterns.is_empty(), .matching_rule_index) {
@ -272,8 +286,8 @@ and can not be more than 511 bytes.", .document_id.to_string()
PrimaryKeyCannotBeChanged(String),
#[error(transparent)]
SerdeJson(serde_json::Error),
#[error(transparent)]
SortError(#[from] SortError),
#[error("{error}")]
SortError { error: SortError, search: bool },
#[error("An unknown internal document id have been used: `{document_id}`.")]
UnknownInternalDocumentId { document_id: DocumentId },
#[error("`minWordSizeForTypos` setting is invalid. `oneTypo` and `twoTypos` fields should be between `0` and `255`, and `twoTypos` should be greater or equals to `oneTypo` but found `oneTypo: {0}` and twoTypos: {1}`.")]
@ -616,7 +630,7 @@ fn conditionally_lookup_for_error_message() {
];
for (list, suffix) in messages {
let err = UserError::InvalidSortableAttribute {
let err = UserError::InvalidSearchSortableAttribute {
field: "name".to_string(),
valid_fields: list,
hidden_fields: false,

View file

@ -43,12 +43,13 @@ use std::fmt;
use std::hash::BuildHasherDefault;
use charabia::normalizer::{CharNormalizer, CompatibilityDecompositionNormalizer};
pub use documents::GeoSortStrategy;
pub use filter_parser::{Condition, FilterCondition, Span, Token};
use fxhash::{FxHasher32, FxHasher64};
pub use grenad::CompressionType;
pub use search::new::{
execute_search, filtered_universe, DefaultSearchLogger, GeoSortStrategy, SearchContext,
SearchLogger, VisualSearchLogger,
execute_search, filtered_universe, DefaultSearchLogger, SearchContext, SearchLogger,
VisualSearchLogger,
};
use serde_json::Value;
pub use thread_pool_no_abort::{PanicCatched, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};

View file

@ -9,6 +9,7 @@ use roaring::bitmap::RoaringBitmap;
pub use self::facet::{FacetDistribution, Filter, OrderBy, DEFAULT_VALUES_PER_FACET};
pub use self::new::matches::{FormatOptions, MatchBounds, MatcherBuilder, MatchingWords};
use self::new::{execute_vector_search, PartialSearchResult, VectorStoreStats};
use crate::documents::GeoSortParameter;
use crate::filterable_attributes_rules::{filtered_matching_patterns, matching_features};
use crate::index::MatchingStrategy;
use crate::score_details::{ScoreDetails, ScoringStrategy};
@ -47,7 +48,7 @@ pub struct Search<'a> {
sort_criteria: Option<Vec<AscDesc>>,
distinct: Option<String>,
searchable_attributes: Option<&'a [String]>,
geo_param: new::GeoSortParameter,
geo_param: GeoSortParameter,
terms_matching_strategy: TermsMatchingStrategy,
scoring_strategy: ScoringStrategy,
words_limit: usize,
@ -70,7 +71,7 @@ impl<'a> Search<'a> {
sort_criteria: None,
distinct: None,
searchable_attributes: None,
geo_param: new::GeoSortParameter::default(),
geo_param: GeoSortParameter::default(),
terms_matching_strategy: TermsMatchingStrategy::default(),
scoring_strategy: Default::default(),
exhaustive_number_hits: false,
@ -147,7 +148,7 @@ impl<'a> Search<'a> {
}
#[cfg(test)]
pub fn geo_sort_strategy(&mut self, strategy: new::GeoSortStrategy) -> &mut Search<'a> {
pub fn geo_sort_strategy(&mut self, strategy: crate::GeoSortStrategy) -> &mut Search<'a> {
self.geo_param.strategy = strategy;
self
}

View file

@ -82,7 +82,7 @@ fn facet_value_docids(
}
/// Return an iterator over each number value in the given field of the given document.
fn facet_number_values<'a>(
pub(crate) fn facet_number_values<'a>(
docid: u32,
field_id: u16,
index: &Index,
@ -118,7 +118,7 @@ pub fn facet_string_values<'a>(
}
#[allow(clippy::drop_non_drop)]
fn facet_values_prefix_key(distinct: u16, id: u32) -> [u8; FID_SIZE + DOCID_SIZE] {
pub(crate) fn facet_values_prefix_key(distinct: u16, id: u32) -> [u8; FID_SIZE + DOCID_SIZE] {
concat_arrays::concat_arrays!(distinct.to_be_bytes(), id.to_be_bytes())
}

View file

@ -1,96 +1,18 @@
use std::collections::VecDeque;
use heed::types::{Bytes, Unit};
use heed::{RoPrefix, RoTxn};
use roaring::RoaringBitmap;
use rstar::RTree;
use super::facet_string_values;
use super::ranking_rules::{RankingRule, RankingRuleOutput, RankingRuleQueryTrait};
use crate::heed_codec::facet::{FieldDocIdFacetCodec, OrderedF64Codec};
use crate::documents::geo_sort::{fill_cache, next_bucket};
use crate::documents::{GeoSortParameter, GeoSortStrategy};
use crate::score_details::{self, ScoreDetails};
use crate::{
distance_between_two_points, lat_lng_to_xyz, GeoPoint, Index, Result, SearchContext,
SearchLogger,
};
const FID_SIZE: usize = 2;
const DOCID_SIZE: usize = 4;
#[allow(clippy::drop_non_drop)]
fn facet_values_prefix_key(distinct: u16, id: u32) -> [u8; FID_SIZE + DOCID_SIZE] {
concat_arrays::concat_arrays!(distinct.to_be_bytes(), id.to_be_bytes())
}
/// Return an iterator over each number value in the given field of the given document.
fn facet_number_values<'a>(
docid: u32,
field_id: u16,
index: &Index,
txn: &'a RoTxn<'a>,
) -> Result<RoPrefix<'a, FieldDocIdFacetCodec<OrderedF64Codec>, Unit>> {
let key = facet_values_prefix_key(field_id, docid);
let iter = index
.field_id_docid_facet_f64s
.remap_key_type::<Bytes>()
.prefix_iter(txn, &key)?
.remap_key_type();
Ok(iter)
}
#[derive(Debug, Clone, Copy)]
pub struct Parameter {
// Define the strategy used by the geo sort
pub strategy: Strategy,
// Limit the number of docs in a single bucket to avoid unexpectedly large overhead
pub max_bucket_size: u64,
// Considering the errors of GPS and geographical calculations, distances less than distance_error_margin will be treated as equal
pub distance_error_margin: f64,
}
impl Default for Parameter {
fn default() -> Self {
Self { strategy: Strategy::default(), max_bucket_size: 1000, distance_error_margin: 1.0 }
}
}
/// Define the strategy used by the geo sort.
/// The parameter represents the cache size, and, in the case of the Dynamic strategy,
/// the point where we move from using the iterative strategy to the rtree.
#[derive(Debug, Clone, Copy)]
pub enum Strategy {
AlwaysIterative(usize),
AlwaysRtree(usize),
Dynamic(usize),
}
impl Default for Strategy {
fn default() -> Self {
Strategy::Dynamic(1000)
}
}
impl Strategy {
pub fn use_rtree(&self, candidates: usize) -> bool {
match self {
Strategy::AlwaysIterative(_) => false,
Strategy::AlwaysRtree(_) => true,
Strategy::Dynamic(i) => candidates >= *i,
}
}
pub fn cache_size(&self) -> usize {
match self {
Strategy::AlwaysIterative(i) | Strategy::AlwaysRtree(i) | Strategy::Dynamic(i) => *i,
}
}
}
use crate::{GeoPoint, Result, SearchContext, SearchLogger};
pub struct GeoSort<Q: RankingRuleQueryTrait> {
query: Option<Q>,
strategy: Strategy,
strategy: GeoSortStrategy,
ascending: bool,
point: [f64; 2],
field_ids: Option<[u16; 2]>,
@ -107,12 +29,12 @@ pub struct GeoSort<Q: RankingRuleQueryTrait> {
impl<Q: RankingRuleQueryTrait> GeoSort<Q> {
pub fn new(
parameter: Parameter,
parameter: GeoSortParameter,
geo_faceted_docids: RoaringBitmap,
point: [f64; 2],
ascending: bool,
) -> Result<Self> {
let Parameter { strategy, max_bucket_size, distance_error_margin } = parameter;
let GeoSortParameter { strategy, max_bucket_size, distance_error_margin } = parameter;
Ok(Self {
query: None,
strategy,
@ -134,98 +56,22 @@ impl<Q: RankingRuleQueryTrait> GeoSort<Q> {
ctx: &mut SearchContext<'_>,
geo_candidates: &RoaringBitmap,
) -> Result<()> {
debug_assert!(self.field_ids.is_some(), "fill_buffer can't be called without the lat&lng");
debug_assert!(self.cached_sorted_docids.is_empty());
// lazily initialize the rtree if needed by the strategy, and cache it in `self.rtree`
let rtree = if self.strategy.use_rtree(geo_candidates.len() as usize) {
if let Some(rtree) = self.rtree.as_ref() {
// get rtree from cache
Some(rtree)
} else {
let rtree = ctx.index.geo_rtree(ctx.txn)?.expect("geo candidates but no rtree");
// insert rtree in cache and returns it.
// Can't use `get_or_insert_with` because getting the rtree from the DB is a fallible operation.
Some(&*self.rtree.insert(rtree))
}
} else {
None
};
let cache_size = self.strategy.cache_size();
if let Some(rtree) = rtree {
if self.ascending {
let point = lat_lng_to_xyz(&self.point);
for point in rtree.nearest_neighbor_iter(&point) {
if geo_candidates.contains(point.data.0) {
self.cached_sorted_docids.push_back(point.data);
if self.cached_sorted_docids.len() >= cache_size {
break;
}
}
}
} else {
// in the case of the desc geo sort we look for the closest point to the opposite of the queried point
// and we insert the points in reverse order they get reversed when emptying the cache later on
let point = lat_lng_to_xyz(&opposite_of(self.point));
for point in rtree.nearest_neighbor_iter(&point) {
if geo_candidates.contains(point.data.0) {
self.cached_sorted_docids.push_front(point.data);
if self.cached_sorted_docids.len() >= cache_size {
break;
}
}
}
}
} else {
// the iterative version
let [lat, lng] = self.field_ids.unwrap();
let mut documents = geo_candidates
.iter()
.map(|id| -> Result<_> { Ok((id, geo_value(id, lat, lng, ctx.index, ctx.txn)?)) })
.collect::<Result<Vec<(u32, [f64; 2])>>>()?;
// computing the distance between two points is expensive thus we cache the result
documents
.sort_by_cached_key(|(_, p)| distance_between_two_points(&self.point, p) as usize);
self.cached_sorted_docids.extend(documents);
};
fill_cache(
ctx.index,
ctx.txn,
self.strategy,
self.ascending,
self.point,
&self.field_ids,
&mut self.rtree,
geo_candidates,
&mut self.cached_sorted_docids,
)?;
Ok(())
}
}
/// Extracts the lat and long values from a single document.
///
/// If it is not able to find it in the facet number index it will extract it
/// from the facet string index and parse it as f64 (as the geo extraction behaves).
fn geo_value(
docid: u32,
field_lat: u16,
field_lng: u16,
index: &Index,
rtxn: &RoTxn<'_>,
) -> Result<[f64; 2]> {
let extract_geo = |geo_field: u16| -> Result<f64> {
match facet_number_values(docid, geo_field, index, rtxn)?.next() {
Some(Ok(((_, _, geo), ()))) => Ok(geo),
Some(Err(e)) => Err(e.into()),
None => match facet_string_values(docid, geo_field, index, rtxn)?.next() {
Some(Ok((_, geo))) => {
Ok(geo.parse::<f64>().expect("cannot parse geo field as f64"))
}
Some(Err(e)) => Err(e.into()),
None => panic!("A geo faceted document doesn't contain any lat or lng"),
},
}
};
let lat = extract_geo(field_lat)?;
let lng = extract_geo(field_lng)?;
Ok([lat, lng])
}
impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for GeoSort<Q> {
fn id(&self) -> String {
"geo_sort".to_owned()
@ -267,124 +113,33 @@ impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for GeoSort<Q> {
) -> Result<Option<RankingRuleOutput<Q>>> {
let query = self.query.as_ref().unwrap().clone();
let mut geo_candidates = &self.geo_candidates & universe;
if geo_candidates.is_empty() {
return Ok(Some(RankingRuleOutput {
next_bucket(
ctx.index,
ctx.txn,
universe,
self.ascending,
self.point,
&self.field_ids,
&mut self.rtree,
&mut self.cached_sorted_docids,
&self.geo_candidates,
GeoSortParameter {
strategy: self.strategy,
max_bucket_size: self.max_bucket_size,
distance_error_margin: self.distance_error_margin,
},
)
.map(|o| {
o.map(|(candidates, point)| RankingRuleOutput {
query,
candidates: universe.clone(),
candidates,
score: ScoreDetails::GeoSort(score_details::GeoSort {
target_point: self.point,
ascending: self.ascending,
value: None,
value: point,
}),
}));
}
let ascending = self.ascending;
let next = |cache: &mut VecDeque<_>| {
if ascending {
cache.pop_front()
} else {
cache.pop_back()
}
};
let put_back = |cache: &mut VecDeque<_>, x: _| {
if ascending {
cache.push_front(x)
} else {
cache.push_back(x)
}
};
let mut current_bucket = RoaringBitmap::new();
// current_distance stores the first point and distance in current bucket
let mut current_distance: Option<([f64; 2], f64)> = None;
loop {
// The loop will only exit when we have found all points with equal distance or have exhausted the candidates.
if let Some((id, point)) = next(&mut self.cached_sorted_docids) {
if geo_candidates.contains(id) {
let distance = distance_between_two_points(&self.point, &point);
if let Some((point0, bucket_distance)) = current_distance.as_ref() {
if (bucket_distance - distance).abs() > self.distance_error_margin {
// different distance, point belongs to next bucket
put_back(&mut self.cached_sorted_docids, (id, point));
return Ok(Some(RankingRuleOutput {
query,
candidates: current_bucket,
score: ScoreDetails::GeoSort(score_details::GeoSort {
target_point: self.point,
ascending: self.ascending,
value: Some(point0.to_owned()),
}),
}));
} else {
// same distance, point belongs to current bucket
current_bucket.insert(id);
// remove from cadidates to prevent it from being added to the cache again
geo_candidates.remove(id);
// current bucket size reaches limit, force return
if current_bucket.len() == self.max_bucket_size {
return Ok(Some(RankingRuleOutput {
query,
candidates: current_bucket,
score: ScoreDetails::GeoSort(score_details::GeoSort {
target_point: self.point,
ascending: self.ascending,
value: Some(point0.to_owned()),
}),
}));
}
}
} else {
// first doc in current bucket
current_distance = Some((point, distance));
current_bucket.insert(id);
geo_candidates.remove(id);
// current bucket size reaches limit, force return
if current_bucket.len() == self.max_bucket_size {
return Ok(Some(RankingRuleOutput {
query,
candidates: current_bucket,
score: ScoreDetails::GeoSort(score_details::GeoSort {
target_point: self.point,
ascending: self.ascending,
value: Some(point.to_owned()),
}),
}));
}
}
}
} else {
// cache exhausted, we need to refill it
self.fill_buffer(ctx, &geo_candidates)?;
if self.cached_sorted_docids.is_empty() {
// candidates exhausted, exit
if let Some((point0, _)) = current_distance.as_ref() {
return Ok(Some(RankingRuleOutput {
query,
candidates: current_bucket,
score: ScoreDetails::GeoSort(score_details::GeoSort {
target_point: self.point,
ascending: self.ascending,
value: Some(point0.to_owned()),
}),
}));
} else {
return Ok(Some(RankingRuleOutput {
query,
candidates: universe.clone(),
score: ScoreDetails::GeoSort(score_details::GeoSort {
target_point: self.point,
ascending: self.ascending,
value: None,
}),
}));
}
}
}
}
})
})
}
#[tracing::instrument(level = "trace", skip_all, target = "search::geo_sort")]
@ -394,16 +149,3 @@ impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for GeoSort<Q> {
self.cached_sorted_docids.clear();
}
}
/// Compute the antipodal coordinate of `coord`
fn opposite_of(mut coord: [f64; 2]) -> [f64; 2] {
coord[0] *= -1.;
// in the case of x,0 we want to return x,180
if coord[1] > 0. {
coord[1] -= 180.;
} else {
coord[1] += 180.;
}
coord
}

View file

@ -1,7 +1,7 @@
mod bucket_sort;
mod db_cache;
mod distinct;
mod geo_sort;
pub(crate) mod geo_sort;
mod graph_based_ranking_rule;
mod interner;
mod limits;
@ -46,14 +46,14 @@ use resolve_query_graph::{compute_query_graph_docids, PhraseDocIdsCache};
use roaring::RoaringBitmap;
use sort::Sort;
use self::distinct::facet_string_values;
pub(crate) use self::distinct::{facet_string_values, facet_values_prefix_key};
use self::geo_sort::GeoSort;
pub use self::geo_sort::{Parameter as GeoSortParameter, Strategy as GeoSortStrategy};
use self::graph_based_ranking_rule::Words;
use self::interner::Interned;
use self::vector_sort::VectorSort;
use crate::attribute_patterns::{match_pattern, PatternMatch};
use crate::constants::RESERVED_GEO_FIELD_NAME;
use crate::documents::GeoSortParameter;
use crate::index::PrefixSearch;
use crate::localized_attributes_rules::LocalizedFieldIds;
use crate::score_details::{ScoreDetails, ScoringStrategy};
@ -319,7 +319,7 @@ fn resolve_negative_phrases(
fn get_ranking_rules_for_placeholder_search<'ctx>(
ctx: &SearchContext<'ctx>,
sort_criteria: &Option<Vec<AscDesc>>,
geo_param: geo_sort::Parameter,
geo_param: GeoSortParameter,
) -> Result<Vec<BoxRankingRule<'ctx, PlaceholderQuery>>> {
let mut sort = false;
let mut sorted_fields = HashSet::new();
@ -371,7 +371,7 @@ fn get_ranking_rules_for_placeholder_search<'ctx>(
fn get_ranking_rules_for_vector<'ctx>(
ctx: &SearchContext<'ctx>,
sort_criteria: &Option<Vec<AscDesc>>,
geo_param: geo_sort::Parameter,
geo_param: GeoSortParameter,
limit_plus_offset: usize,
target: &[f32],
embedder_name: &str,
@ -448,7 +448,7 @@ fn get_ranking_rules_for_vector<'ctx>(
fn get_ranking_rules_for_query_graph_search<'ctx>(
ctx: &SearchContext<'ctx>,
sort_criteria: &Option<Vec<AscDesc>>,
geo_param: geo_sort::Parameter,
geo_param: GeoSortParameter,
terms_matching_strategy: TermsMatchingStrategy,
) -> Result<Vec<BoxRankingRule<'ctx, QueryGraph>>> {
// query graph search
@ -559,7 +559,7 @@ fn resolve_sort_criteria<'ctx, Query: RankingRuleQueryTrait>(
ranking_rules: &mut Vec<BoxRankingRule<'ctx, Query>>,
sorted_fields: &mut HashSet<String>,
geo_sorted: &mut bool,
geo_param: geo_sort::Parameter,
geo_param: GeoSortParameter,
) -> Result<()> {
let sort_criteria = sort_criteria.clone().unwrap_or_default();
ranking_rules.reserve(sort_criteria.len());
@ -629,7 +629,7 @@ pub fn execute_vector_search(
universe: RoaringBitmap,
sort_criteria: &Option<Vec<AscDesc>>,
distinct: &Option<String>,
geo_param: geo_sort::Parameter,
geo_param: GeoSortParameter,
from: usize,
length: usize,
embedder_name: &str,
@ -692,7 +692,7 @@ pub fn execute_search(
mut universe: RoaringBitmap,
sort_criteria: &Option<Vec<AscDesc>>,
distinct: &Option<String>,
geo_param: geo_sort::Parameter,
geo_param: GeoSortParameter,
from: usize,
length: usize,
words_limit: Option<usize>,
@ -872,7 +872,7 @@ pub fn execute_search(
})
}
fn check_sort_criteria(
pub(crate) fn check_sort_criteria(
ctx: &SearchContext<'_>,
sort_criteria: Option<&Vec<AscDesc>>,
) -> Result<()> {
@ -902,7 +902,7 @@ fn check_sort_criteria(
let (valid_fields, hidden_fields) =
ctx.index.remove_hidden_fields(ctx.txn, sortable_fields)?;
return Err(UserError::InvalidSortableAttribute {
return Err(UserError::InvalidSearchSortableAttribute {
field: field.to_string(),
valid_fields,
hidden_fields,
@ -913,7 +913,7 @@ fn check_sort_criteria(
let (valid_fields, hidden_fields) =
ctx.index.remove_hidden_fields(ctx.txn, sortable_fields)?;
return Err(UserError::InvalidSortableAttribute {
return Err(UserError::InvalidSearchSortableAttribute {
field: RESERVED_GEO_FIELD_NAME.to_string(),
valid_fields,
hidden_fields,