Speed-up facet aggregation by using a FacetIter

This commit is contained in:
Kerollmops 2021-01-06 15:10:30 +01:00
parent 33945a3115
commit d893e83622
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
3 changed files with 124 additions and 60 deletions

View File

@ -9,7 +9,7 @@ use serde_json::Value;
use crate::facet::FacetType; use crate::facet::FacetType;
use crate::heed_codec::facet::{FacetValueStringCodec, FacetLevelValueF64Codec, FacetLevelValueI64Codec}; use crate::heed_codec::facet::{FacetValueStringCodec, FacetLevelValueF64Codec, FacetLevelValueI64Codec};
use crate::heed_codec::facet::{FieldDocIdFacetStringCodec, FieldDocIdFacetF64Codec, FieldDocIdFacetI64Codec}; use crate::heed_codec::facet::{FieldDocIdFacetStringCodec, FieldDocIdFacetF64Codec, FieldDocIdFacetI64Codec};
use crate::search::facet::FacetRange; use crate::search::facet::{FacetIter, FacetRange};
use crate::{Index, FieldId}; use crate::{Index, FieldId};
pub struct FacetDistribution<'a> { pub struct FacetDistribution<'a> {
@ -41,61 +41,99 @@ impl<'a> FacetDistribution<'a> {
} }
fn facet_values(&self, field_id: FieldId, facet_type: FacetType) -> heed::Result<Vec<Value>> { fn facet_values(&self, field_id: FieldId, facet_type: FacetType) -> heed::Result<Vec<Value>> {
if let Some(candidates) = self.candidates.as_ref().filter(|c| c.len() <= 1000) { if let Some(candidates) = self.candidates.as_ref() {
let mut key_buffer = vec![field_id]; if candidates.len() <= 1000 {
match facet_type { let mut key_buffer = vec![field_id];
FacetType::Float => { match facet_type {
let mut facet_values = HashSet::new(); FacetType::Float => {
for docid in candidates { let mut facet_values = HashSet::new();
key_buffer.truncate(1); for docid in candidates {
key_buffer.extend_from_slice(&docid.to_be_bytes()); key_buffer.truncate(1);
let iter = self.index.field_id_docid_facet_values key_buffer.extend_from_slice(&docid.to_be_bytes());
.prefix_iter(self.rtxn, &key_buffer)? let iter = self.index.field_id_docid_facet_values
.remap_key_type::<FieldDocIdFacetF64Codec>(); .prefix_iter(self.rtxn, &key_buffer)?
for result in iter { .remap_key_type::<FieldDocIdFacetF64Codec>();
let ((_, _, value), ()) = result?; for result in iter {
facet_values.insert(OrderedFloat(value)); let ((_, _, value), ()) = result?;
facet_values.insert(OrderedFloat(value));
}
} }
} Ok(facet_values.into_iter().map(|f| Value::from(*f)).collect())
Ok(facet_values.into_iter().map(|f| Value::from(*f)).collect()) },
}, FacetType::Integer => {
FacetType::Integer => { let mut facet_values = HashSet::new();
let mut facet_values = HashSet::new(); for docid in candidates {
for docid in candidates { key_buffer.truncate(1);
key_buffer.truncate(1); key_buffer.extend_from_slice(&docid.to_be_bytes());
key_buffer.extend_from_slice(&docid.to_be_bytes()); let iter = self.index.field_id_docid_facet_values
let iter = self.index.field_id_docid_facet_values .prefix_iter(self.rtxn, &key_buffer)?
.prefix_iter(self.rtxn, &key_buffer)? .remap_key_type::<FieldDocIdFacetI64Codec>();
.remap_key_type::<FieldDocIdFacetI64Codec>(); for result in iter {
for result in iter { let ((_, _, value), ()) = result?;
let ((_, _, value), ()) = result?; facet_values.insert(value);
facet_values.insert(value); }
} }
} Ok(facet_values.into_iter().map(Value::from).collect())
Ok(facet_values.into_iter().map(Value::from).collect()) },
}, FacetType::String => {
FacetType::String => { let mut facet_values = HashSet::new();
let mut facet_values = HashSet::new(); for docid in candidates {
for docid in candidates { key_buffer.truncate(1);
key_buffer.truncate(1); key_buffer.extend_from_slice(&docid.to_be_bytes());
key_buffer.extend_from_slice(&docid.to_be_bytes()); let iter = self.index.field_id_docid_facet_values
let iter = self.index.field_id_docid_facet_values .prefix_iter(self.rtxn, &key_buffer)?
.prefix_iter(self.rtxn, &key_buffer)? .remap_key_type::<FieldDocIdFacetStringCodec>();
.remap_key_type::<FieldDocIdFacetStringCodec>(); for result in iter {
for result in iter { let ((_, _, value), ()) = result?;
let ((_, _, value), ()) = result?; facet_values.insert(value);
facet_values.insert(value); }
} }
Ok(facet_values.into_iter().map(Value::from).collect())
},
}
} else {
let iter = match facet_type {
FacetType::String => {
let db = self.index.facet_field_id_value_docids;
let iter = db
.prefix_iter(self.rtxn, &[field_id])?
.remap_key_type::<FacetValueStringCodec>()
.map(|r| r.map(|((_, v), docids)| (Value::from(v), docids)));
Box::new(iter) as Box::<dyn Iterator<Item=_>>
},
FacetType::Integer => {
let iter = FacetIter::<i64, FacetLevelValueI64Codec>::new_non_reducing(
self.rtxn, self.index, field_id, candidates.clone(),
)?;
Box::new(iter.map(|r| r.map(|(v, docids)| (Value::from(v), docids))))
},
FacetType::Float => {
let iter = FacetIter::<f64, FacetLevelValueF64Codec>::new_non_reducing(
self.rtxn, self.index, field_id, candidates.clone(),
)?;
Box::new(iter.map(|r| r.map(|(v, docids)| (Value::from(v), docids))))
},
};
let mut facet_values = Vec::new();
for result in iter {
let (value, docids) = result?;
if self.candidates.as_ref().map_or(true, |c| docids.is_disjoint(c)) {
facet_values.push(value);
} }
Ok(facet_values.into_iter().map(Value::from).collect()) if facet_values.len() == self.max_values_by_facet {
}, break;
}
}
Ok(facet_values)
} }
} else { } else {
let db = self.index.facet_field_id_value_docids; let db = self.index.facet_field_id_value_docids;
let iter = match facet_type { let iter = match facet_type {
FacetType::String => { FacetType::String => {
let iter = db let iter = db
.prefix_iter(&self.rtxn, &[field_id])? .prefix_iter(self.rtxn, &[field_id])?
.remap_key_type::<FacetValueStringCodec>() .remap_key_type::<FacetValueStringCodec>()
.map(|r| r.map(|((_, v), docids)| (Value::from(v), docids))); .map(|r| r.map(|((_, v), docids)| (Value::from(v), docids)));
Box::new(iter) as Box::<dyn Iterator<Item=_>> Box::new(iter) as Box::<dyn Iterator<Item=_>>
@ -119,11 +157,8 @@ impl<'a> FacetDistribution<'a> {
let mut facet_values = Vec::new(); let mut facet_values = Vec::new();
for result in iter { for result in iter {
let (value, docids) = result?; let (value, docids) = result?;
match &self.candidates { if self.candidates.as_ref().map_or(true, |c| docids.is_disjoint(c)) {
Some(candidates) => if !docids.is_disjoint(candidates) { facet_values.push(value);
facet_values.push(value);
},
None => facet_values.push(value),
} }
if facet_values.len() == self.max_values_by_facet { if facet_values.len() == self.max_values_by_facet {
break; break;

View File

@ -147,6 +147,7 @@ pub struct FacetIter<'t, T: 't, KC> {
db: Database<KC, CboRoaringBitmapCodec>, db: Database<KC, CboRoaringBitmapCodec>,
field_id: FieldId, field_id: FieldId,
level_iters: Vec<(RoaringBitmap, Either<FacetRange<'t, T, KC>, FacetRevRange<'t, T, KC>>)>, level_iters: Vec<(RoaringBitmap, Either<FacetRange<'t, T, KC>, FacetRevRange<'t, T, KC>>)>,
must_reduce: bool,
} }
impl<'t, T, KC> FacetIter<'t, T, KC> impl<'t, T, KC> FacetIter<'t, T, KC>
@ -155,7 +156,10 @@ where
KC: for<'a> BytesEncode<'a, EItem = (FieldId, u8, T, T)>, KC: for<'a> BytesEncode<'a, EItem = (FieldId, u8, T, T)>,
T: PartialOrd + Copy + Bounded, T: PartialOrd + Copy + Bounded,
{ {
pub fn new( /// Create a `FacetIter` that will iterate on the different facet entries
/// (facet value + documents ids) and that will reduce the given documents ids
/// while iterating on the different facet levels.
pub fn new_reducing(
rtxn: &'t heed::RoTxn, rtxn: &'t heed::RoTxn,
index: &'t Index, index: &'t Index,
field_id: FieldId, field_id: FieldId,
@ -165,10 +169,14 @@ where
let db = index.facet_field_id_value_docids.remap_key_type::<KC>(); let db = index.facet_field_id_value_docids.remap_key_type::<KC>();
let highest_level = Self::highest_level(rtxn, db, field_id)?.unwrap_or(0); let highest_level = Self::highest_level(rtxn, db, field_id)?.unwrap_or(0);
let highest_iter = FacetRange::new(rtxn, db, field_id, highest_level, Unbounded, Unbounded)?; let highest_iter = FacetRange::new(rtxn, db, field_id, highest_level, Unbounded, Unbounded)?;
Ok(FacetIter { rtxn, db, field_id, level_iters: vec![(documents_ids, Left(highest_iter))] }) let level_iters = vec![(documents_ids, Left(highest_iter))];
Ok(FacetIter { rtxn, db, field_id, level_iters, must_reduce: true })
} }
pub fn new_reverse( /// Create a `FacetIter` that will iterate on the different facet entries in reverse
/// (facet value + documents ids) and that will reduce the given documents ids
/// while iterating on the different facet levels.
pub fn new_reverse_reducing(
rtxn: &'t heed::RoTxn, rtxn: &'t heed::RoTxn,
index: &'t Index, index: &'t Index,
field_id: FieldId, field_id: FieldId,
@ -178,7 +186,26 @@ where
let db = index.facet_field_id_value_docids.remap_key_type::<KC>(); let db = index.facet_field_id_value_docids.remap_key_type::<KC>();
let highest_level = Self::highest_level(rtxn, db, field_id)?.unwrap_or(0); let highest_level = Self::highest_level(rtxn, db, field_id)?.unwrap_or(0);
let highest_iter = FacetRevRange::new(rtxn, db, field_id, highest_level, Unbounded, Unbounded)?; let highest_iter = FacetRevRange::new(rtxn, db, field_id, highest_level, Unbounded, Unbounded)?;
Ok(FacetIter { rtxn, db, field_id, level_iters: vec![(documents_ids, Right(highest_iter))] }) let level_iters = vec![(documents_ids, Right(highest_iter))];
Ok(FacetIter { rtxn, db, field_id, level_iters, must_reduce: true })
}
/// Create a `FacetIter` that will iterate on the different facet entries
/// (facet value + documents ids) and that will not reduce the given documents ids
/// while iterating on the different facet levels, possibly returning multiple times
/// a document id associated with multiple facet values.
pub fn new_non_reducing(
rtxn: &'t heed::RoTxn,
index: &'t Index,
field_id: FieldId,
documents_ids: RoaringBitmap,
) -> heed::Result<FacetIter<'t, T, KC>>
{
let db = index.facet_field_id_value_docids.remap_key_type::<KC>();
let highest_level = Self::highest_level(rtxn, db, field_id)?.unwrap_or(0);
let highest_iter = FacetRange::new(rtxn, db, field_id, highest_level, Unbounded, Unbounded)?;
let level_iters = vec![(documents_ids, Left(highest_iter))];
Ok(FacetIter { rtxn, db, field_id, level_iters, must_reduce: false })
} }
fn highest_level<X>(rtxn: &'t heed::RoTxn, db: Database<KC, X>, fid: FieldId) -> heed::Result<Option<u8>> { fn highest_level<X>(rtxn: &'t heed::RoTxn, db: Database<KC, X>, fid: FieldId) -> heed::Result<Option<u8>> {
@ -216,7 +243,9 @@ where
docids.intersect_with(&documents_ids); docids.intersect_with(&documents_ids);
if !docids.is_empty() { if !docids.is_empty() {
documents_ids.difference_with(&docids); if self.must_reduce {
documents_ids.difference_with(&docids);
}
if level == 0 { if level == 0 {
debug!("found {:?} at {:?}", docids, left); debug!("found {:?} at {:?}", docids, left);

View File

@ -189,9 +189,9 @@ impl<'a> Search<'a> {
} }
} else { } else {
let facet_fn = if ascending { let facet_fn = if ascending {
FacetIter::<f64, FacetLevelValueF64Codec>::new FacetIter::<f64, FacetLevelValueF64Codec>::new_reducing
} else { } else {
FacetIter::<f64, FacetLevelValueF64Codec>::new_reverse FacetIter::<f64, FacetLevelValueF64Codec>::new_reverse_reducing
}; };
let mut limit_tmp = limit; let mut limit_tmp = limit;
let mut output = Vec::new(); let mut output = Vec::new();
@ -226,9 +226,9 @@ impl<'a> Search<'a> {
} }
} else { } else {
let facet_fn = if ascending { let facet_fn = if ascending {
FacetIter::<i64, FacetLevelValueI64Codec>::new FacetIter::<i64, FacetLevelValueI64Codec>::new_reducing
} else { } else {
FacetIter::<i64, FacetLevelValueI64Codec>::new_reverse FacetIter::<i64, FacetLevelValueI64Codec>::new_reverse_reducing
}; };
let mut limit_tmp = limit; let mut limit_tmp = limit;
let mut output = Vec::new(); let mut output = Vec::new();