Implement all the facet extraction paths and simplify them

This commit is contained in:
Clément Renault 2023-10-18 11:01:02 +02:00
parent 7d546b9c22
commit 178a9802fa
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@ -1,22 +1,31 @@
use std::borrow::Cow;
use std::collections::{BTreeMap, HashSet};
use std::convert::TryInto;
use std::fs::File;
use std::io;
use std::mem::size_of;
use std::result::Result as StdResult;
use grenad::Sorter;
use heed::zerocopy::AsBytes;
use heed::BytesEncode;
use itertools::EitherOrBoth;
use ordered_float::OrderedFloat;
use roaring::RoaringBitmap;
use serde_json::{from_slice, Value};
use FilterableValues::{Empty, Null, Values};
use super::helpers::{create_sorter, keep_first, sorter_into_reader, GrenadParameters};
use crate::error::InternalError;
use crate::facet::value_encoding::f64_into_bytes;
use crate::update::del_add::{DelAdd, KvWriterDelAdd};
use crate::update::index_documents::{create_writer, writer_into_reader};
use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result, BEU32, MAX_FACET_VALUE_LENGTH};
use crate::{
CboRoaringBitmapCodec, DocumentId, Error, FieldId, Result, BEU32, MAX_FACET_VALUE_LENGTH,
};
/// The length of the elements that are always in the buffer when inserting new values.
const TRUNCATE_SIZE: usize = size_of::<FieldId>() + size_of::<DocumentId>();
/// The extracted facet values stored in grenad files by type.
pub struct ExtractedFacetValues {
@ -68,7 +77,10 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
let mut facet_is_null_docids = BTreeMap::<FieldId, (RoaringBitmap, RoaringBitmap)>::new();
let mut facet_is_empty_docids = BTreeMap::<FieldId, (RoaringBitmap, RoaringBitmap)>::new();
let mut key_buffer = Vec::new();
// We create two buffer for mutable ref issues with closures.
let mut numbers_key_buffer = Vec::new();
let mut strings_key_buffer = Vec::new();
let mut cursor = obkv_documents.into_cursor()?;
while let Some((docid_bytes, value)) = cursor.move_on_next()? {
// TODO Obkv<FieldId, Obkv<DelAdd, serde_json::Value>>
@ -76,18 +88,21 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
for (field_id, field_bytes) in obkv.iter() {
if faceted_fields.contains(&field_id) {
key_buffer.clear();
numbers_key_buffer.clear();
strings_key_buffer.clear();
// Set key to the field_id
// Note: this encoding is consistent with FieldIdCodec
key_buffer.extend_from_slice(&field_id.to_be_bytes());
numbers_key_buffer.extend_from_slice(&field_id.to_be_bytes());
strings_key_buffer.extend_from_slice(&field_id.to_be_bytes());
// Here, we know already that the document must be added to the “field id exists” database
let document: [u8; 4] = docid_bytes[..4].try_into().ok().unwrap();
let document = BEU32::from(document).get();
// For the other extraction tasks, prefix the key with the field_id and the document_id
key_buffer.extend_from_slice(docid_bytes);
numbers_key_buffer.extend_from_slice(docid_bytes);
strings_key_buffer.extend_from_slice(docid_bytes);
let del_add_obkv = obkv::KvReader::new(field_bytes);
let del_value = match del_add_obkv.get(DelAdd::Deletion) {
@ -100,8 +115,13 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
};
// We insert the document id on the Del and the Add side if the field exists.
let (mut del_exists, mut add_exists) =
let (ref mut del_exists, ref mut add_exists) =
facet_exists_docids.entry(field_id).or_default();
let (ref mut del_is_null, ref mut add_is_null) =
facet_is_null_docids.entry(field_id).or_default();
let (ref mut del_is_empty, ref mut add_is_empty) =
facet_is_empty_docids.entry(field_id).or_default();
if del_value.is_some() {
del_exists.insert(document);
}
@ -109,84 +129,58 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
add_exists.insert(document);
}
// TODO extract both Del and Add numbers an strings (dedup)
// TODO use the `itertools::merge_join_by` method to sort and diff both sides (Del and Add)
// TODO if there is a Left generate a Del
// TODO if there is a Right generate an Add
// TODO if there is a Both don't insert
// TODO compare numbers using OrderedFloat and strings using both normalized and original values.
let geo_support =
geo_fields_ids.map_or(false, |(lat, lng)| field_id == lat || field_id == lng);
let del_filterable_values =
del_value.map(|value| extract_facet_values(&value, geo_support));
let add_filterable_values =
add_value.map(|value| extract_facet_values(&value, geo_support));
use FilterableValues::{Empty, Null, Values};
// Those closures are just here to simplify things a bit.
let mut insert_numbers_diff = |del_numbers, add_numbers| {
insert_numbers_diff(
&mut fid_docid_facet_numbers_sorter,
&mut numbers_key_buffer,
del_numbers,
add_numbers,
)
};
let mut insert_strings_diff = |del_strings, add_strings| {
insert_strings_diff(
&mut fid_docid_facet_strings_sorter,
&mut strings_key_buffer,
del_strings,
add_strings,
)
};
match (del_filterable_values, add_filterable_values) {
(None, None) => (),
(Some(del_filterable_values), None) => match del_filterable_values {
Null => {
let (mut del_is_null, _) =
facet_is_null_docids.entry(field_id).or_default();
del_is_null.insert(document);
}
Empty => {
let (mut del_is_empty, _) =
facet_is_empty_docids.entry(field_id).or_default();
del_is_empty.insert(document);
}
Values { numbers, strings } => {
// insert facet numbers in sorter
for number in numbers {
key_buffer.truncate(size_of::<FieldId>() + size_of::<DocumentId>());
if let Some(value_bytes) = f64_into_bytes(number) {
key_buffer.extend_from_slice(&value_bytes);
key_buffer.extend_from_slice(&number.to_be_bytes());
// We insert only the Del part of the Obkv to inform
// that we only want to remove all those numbers.
let mut obkv = KvWriterDelAdd::memory();
obkv.insert(DelAdd::Deletion, ().as_bytes())?;
let bytes = obkv.into_inner()?;
fid_docid_facet_numbers_sorter.insert(&key_buffer, bytes)?;
}
}
// insert normalized and original facet string in sorter
for (normalized, original) in
strings.into_iter().filter(|(n, _)| !n.is_empty())
{
let normalized_truncated_value: String = normalized
.char_indices()
.take_while(|(idx, _)| idx + 4 < MAX_FACET_VALUE_LENGTH)
.map(|(_, c)| c)
.collect();
key_buffer.truncate(size_of::<FieldId>() + size_of::<DocumentId>());
key_buffer.extend_from_slice(normalized_truncated_value.as_bytes());
// We insert only the Del part of the Obkv to inform
// that we only want to remove all those strings.
let mut obkv = KvWriterDelAdd::memory();
obkv.insert(DelAdd::Deletion, original.as_bytes())?;
let bytes = obkv.into_inner()?;
fid_docid_facet_strings_sorter.insert(&key_buffer, bytes)?;
}
insert_numbers_diff(numbers, vec![])?;
insert_strings_diff(strings, vec![])?;
}
},
(None, Some(add_filterable_values)) => {
todo!()
(None, Some(add_filterable_values)) => match add_filterable_values {
Null => {
add_is_null.insert(document);
}
Empty => {
add_is_empty.insert(document);
}
Values { numbers, strings } => {
insert_numbers_diff(vec![], numbers)?;
insert_strings_diff(vec![], strings)?;
}
},
(Some(del_filterable_values), Some(add_filterable_values)) => {
let (mut del_is_null, mut add_is_null) =
facet_is_null_docids.entry(field_id).or_default();
let (mut del_is_empty, mut add_is_empty) =
facet_is_empty_docids.entry(field_id).or_default();
match (del_filterable_values, add_filterable_values) {
(Null, Null) | (Empty, Empty) => (),
(Null, Empty) => {
@ -198,120 +192,31 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
add_is_null.insert(document);
}
(Null, Values { numbers, strings }) => {
insert_numbers_diff(vec![], numbers)?;
insert_strings_diff(vec![], strings)?;
del_is_null.insert(document);
todo!()
}
(Empty, Values { numbers, strings }) => {
insert_numbers_diff(vec![], numbers)?;
insert_strings_diff(vec![], strings)?;
del_is_empty.insert(document);
todo!()
}
(Values { numbers, strings }, Null) => {
todo!();
add_is_null.insert(document);
insert_numbers_diff(numbers, vec![])?;
insert_strings_diff(strings, vec![])?;
}
(Values { numbers, strings }, Empty) => {
todo!();
add_is_empty.insert(document);
insert_numbers_diff(numbers, vec![])?;
insert_strings_diff(strings, vec![])?;
}
(
Values { numbers: mut del_numbers, strings: mut del_strings },
Values { numbers: mut add_numbers, strings: mut add_strings },
Values { numbers: del_numbers, strings: del_strings },
Values { numbers: add_numbers, strings: add_strings },
) => {
// We sort and dedup the float numbers
del_numbers.sort_unstable_by_key(|f| OrderedFloat(*f));
add_numbers.sort_unstable_by_key(|f| OrderedFloat(*f));
del_numbers.dedup_by_key(|f| OrderedFloat(*f));
add_numbers.dedup_by_key(|f| OrderedFloat(*f));
let merged_numbers_iter = itertools::merge_join_by(
del_numbers.into_iter().map(OrderedFloat),
add_numbers.into_iter().map(OrderedFloat),
|del, add| del.cmp(&add),
);
// insert facet numbers in sorter
for eob in merged_numbers_iter {
key_buffer
.truncate(size_of::<FieldId>() + size_of::<DocumentId>());
match eob {
EitherOrBoth::Both(_, _) => (), // no need to touch anything
EitherOrBoth::Left(OrderedFloat(number)) => {
if let Some(value_bytes) = f64_into_bytes(number) {
key_buffer.extend_from_slice(&value_bytes);
key_buffer.extend_from_slice(&number.to_be_bytes());
// We insert only the Del part of the Obkv to inform
// that we only want to remove all those numbers.
let mut obkv = KvWriterDelAdd::memory();
obkv.insert(DelAdd::Deletion, ().as_bytes())?;
let bytes = obkv.into_inner()?;
fid_docid_facet_numbers_sorter
.insert(&key_buffer, bytes)?;
}
}
EitherOrBoth::Right(OrderedFloat(number)) => {
if let Some(value_bytes) = f64_into_bytes(number) {
key_buffer.extend_from_slice(&value_bytes);
key_buffer.extend_from_slice(&number.to_be_bytes());
// We insert only the Del part of the Obkv to inform
// that we only want to remove all those numbers.
let mut obkv = KvWriterDelAdd::memory();
obkv.insert(DelAdd::Addition, ().as_bytes())?;
let bytes = obkv.into_inner()?;
fid_docid_facet_numbers_sorter
.insert(&key_buffer, bytes)?;
}
}
}
}
// We sort and dedup the normalized and original strings
del_strings.sort_unstable();
add_strings.sort_unstable();
del_strings.dedup();
add_strings.dedup();
let merged_strings_iter = itertools::merge_join_by(
del_strings.into_iter().filter(|(n, _)| !n.is_empty()),
add_strings.into_iter().filter(|(n, _)| !n.is_empty()),
|del, add| del.cmp(&add),
);
// insert normalized and original facet string in sorter
for eob in merged_strings_iter {
match eob {
EitherOrBoth::Both(_, _) => (), // no need to touch anything
EitherOrBoth::Left((normalized, original)) => {
let truncated = truncate_string(normalized);
key_buffer.truncate(
size_of::<FieldId>() + size_of::<DocumentId>(),
);
key_buffer.extend_from_slice(truncated.as_bytes());
let mut obkv = KvWriterDelAdd::memory();
obkv.insert(DelAdd::Deletion, original)?;
let bytes = obkv.into_inner()?;
fid_docid_facet_strings_sorter
.insert(&key_buffer, bytes)?;
}
EitherOrBoth::Right((normalized, original)) => {
let truncated = truncate_string(normalized);
key_buffer.truncate(
size_of::<FieldId>() + size_of::<DocumentId>(),
);
key_buffer.extend_from_slice(truncated.as_bytes());
let mut obkv = KvWriterDelAdd::memory();
obkv.insert(DelAdd::Addition, original)?;
let bytes = obkv.into_inner()?;
fid_docid_facet_strings_sorter
.insert(&key_buffer, bytes)?;
}
}
}
insert_numbers_diff(del_numbers, add_numbers)?;
insert_strings_diff(del_strings, add_strings)?;
}
}
}
@ -320,19 +225,15 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
}
}
let mut buffer = Vec::new();
let mut facet_exists_docids_writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
for (fid, (del_bitmap, add_bitmap)) in facet_exists_docids.into_iter() {
let mut obkv = KvWriterDelAdd::memory();
let del_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&del_bitmap).unwrap();
let add_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&add_bitmap).unwrap();
obkv.insert(DelAdd::Deletion, del_bitmap_bytes)?;
obkv.insert(DelAdd::Addition, add_bitmap_bytes)?;
let bytes = obkv.into_inner()?;
facet_exists_docids_writer.insert(fid.to_be_bytes(), &bytes)?;
deladd_obkv_cbo_roaring_bitmaps(&mut buffer, &del_bitmap, &add_bitmap)?;
facet_exists_docids_writer.insert(fid.to_be_bytes(), &buffer)?;
}
let facet_exists_docids_reader = writer_into_reader(facet_exists_docids_writer)?;
@ -342,13 +243,8 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
tempfile::tempfile()?,
);
for (fid, (del_bitmap, add_bitmap)) in facet_is_null_docids.into_iter() {
let mut obkv = KvWriterDelAdd::memory();
let del_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&del_bitmap).unwrap();
let add_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&add_bitmap).unwrap();
obkv.insert(DelAdd::Deletion, del_bitmap_bytes)?;
obkv.insert(DelAdd::Addition, add_bitmap_bytes)?;
let bytes = obkv.into_inner()?;
facet_is_null_docids_writer.insert(fid.to_be_bytes(), &bytes)?;
deladd_obkv_cbo_roaring_bitmaps(&mut buffer, &del_bitmap, &add_bitmap)?;
facet_is_null_docids_writer.insert(fid.to_be_bytes(), &buffer)?;
}
let facet_is_null_docids_reader = writer_into_reader(facet_is_null_docids_writer)?;
@ -358,13 +254,8 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
tempfile::tempfile()?,
);
for (fid, (del_bitmap, add_bitmap)) in facet_is_empty_docids.into_iter() {
let mut obkv = KvWriterDelAdd::memory();
let del_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&del_bitmap).unwrap();
let add_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&add_bitmap).unwrap();
obkv.insert(DelAdd::Deletion, del_bitmap_bytes)?;
obkv.insert(DelAdd::Addition, add_bitmap_bytes)?;
let bytes = obkv.into_inner()?;
facet_is_empty_docids_writer.insert(fid.to_be_bytes(), &bytes)?;
deladd_obkv_cbo_roaring_bitmaps(&mut buffer, &del_bitmap, &add_bitmap)?;
facet_is_empty_docids_writer.insert(fid.to_be_bytes(), &buffer)?;
}
let facet_is_empty_docids_reader = writer_into_reader(facet_is_empty_docids_writer)?;
@ -377,6 +268,141 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
})
}
/// Generates a vector of bytes containing a DelAdd obkv with two bitmaps.
fn deladd_obkv_cbo_roaring_bitmaps(
buffer: &mut Vec<u8>,
del_bitmap: &RoaringBitmap,
add_bitmap: &RoaringBitmap,
) -> io::Result<()> {
buffer.clear();
let mut obkv = KvWriterDelAdd::new(buffer);
let del_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(del_bitmap).unwrap();
let add_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(add_bitmap).unwrap();
obkv.insert(DelAdd::Deletion, del_bitmap_bytes)?;
obkv.insert(DelAdd::Addition, add_bitmap_bytes)?;
obkv.finish()
}
/// Truncates a string to the biggest valid LMDB key size.
fn truncate_string(s: String) -> String {
s.char_indices()
.take_while(|(idx, _)| idx + 4 < MAX_FACET_VALUE_LENGTH)
.map(|(_, c)| c)
.collect()
}
/// Computes the diff between both Del and Add numbers and
/// only inserts the parts that differ in the sorter.
fn insert_numbers_diff<MF>(
fid_docid_facet_numbers_sorter: &mut Sorter<MF>,
key_buffer: &mut Vec<u8>,
mut del_numbers: Vec<f64>,
mut add_numbers: Vec<f64>,
) -> Result<()>
where
MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> StdResult<Cow<'a, [u8]>, Error>,
{
// We sort and dedup the float numbers
del_numbers.sort_unstable_by_key(|f| OrderedFloat(*f));
add_numbers.sort_unstable_by_key(|f| OrderedFloat(*f));
del_numbers.dedup_by_key(|f| OrderedFloat(*f));
add_numbers.dedup_by_key(|f| OrderedFloat(*f));
let merged_numbers_iter = itertools::merge_join_by(
del_numbers.into_iter().map(OrderedFloat),
add_numbers.into_iter().map(OrderedFloat),
|del, add| del.cmp(add),
);
// insert facet numbers in sorter
for eob in merged_numbers_iter {
key_buffer.truncate(TRUNCATE_SIZE);
match eob {
EitherOrBoth::Both(_, _) => (), // no need to touch anything
EitherOrBoth::Left(OrderedFloat(number)) => {
if let Some(value_bytes) = f64_into_bytes(number) {
key_buffer.extend_from_slice(&value_bytes);
key_buffer.extend_from_slice(&number.to_be_bytes());
// We insert only the Del part of the Obkv to inform
// that we only want to remove all those numbers.
let mut obkv = KvWriterDelAdd::memory();
obkv.insert(DelAdd::Deletion, ().as_bytes())?;
let bytes = obkv.into_inner()?;
fid_docid_facet_numbers_sorter.insert(&key_buffer, bytes)?;
}
}
EitherOrBoth::Right(OrderedFloat(number)) => {
if let Some(value_bytes) = f64_into_bytes(number) {
key_buffer.extend_from_slice(&value_bytes);
key_buffer.extend_from_slice(&number.to_be_bytes());
// We insert only the Del part of the Obkv to inform
// that we only want to remove all those numbers.
let mut obkv = KvWriterDelAdd::memory();
obkv.insert(DelAdd::Addition, ().as_bytes())?;
let bytes = obkv.into_inner()?;
fid_docid_facet_numbers_sorter.insert(&key_buffer, bytes)?;
}
}
}
}
Ok(())
}
/// Computes the diff between both Del and Add strings and
/// only inserts the parts that differ in the sorter.
fn insert_strings_diff<MF>(
fid_docid_facet_strings_sorter: &mut Sorter<MF>,
key_buffer: &mut Vec<u8>,
mut del_strings: Vec<(String, String)>,
mut add_strings: Vec<(String, String)>,
) -> Result<()>
where
MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> StdResult<Cow<'a, [u8]>, Error>,
{
// We sort and dedup the normalized and original strings
del_strings.sort_unstable();
add_strings.sort_unstable();
del_strings.dedup();
add_strings.dedup();
let merged_strings_iter = itertools::merge_join_by(
del_strings.into_iter().filter(|(n, _)| !n.is_empty()),
add_strings.into_iter().filter(|(n, _)| !n.is_empty()),
|del, add| del.cmp(add),
);
// insert normalized and original facet string in sorter
for eob in merged_strings_iter {
key_buffer.truncate(TRUNCATE_SIZE);
match eob {
EitherOrBoth::Both(_, _) => (), // no need to touch anything
EitherOrBoth::Left((normalized, original)) => {
let truncated = truncate_string(normalized);
key_buffer.extend_from_slice(truncated.as_bytes());
let mut obkv = KvWriterDelAdd::memory();
obkv.insert(DelAdd::Deletion, original)?;
let bytes = obkv.into_inner()?;
fid_docid_facet_strings_sorter.insert(&key_buffer, bytes)?;
}
EitherOrBoth::Right((normalized, original)) => {
let truncated = truncate_string(normalized);
key_buffer.extend_from_slice(truncated.as_bytes());
let mut obkv = KvWriterDelAdd::memory();
obkv.insert(DelAdd::Addition, original)?;
let bytes = obkv.into_inner()?;
fid_docid_facet_strings_sorter.insert(&key_buffer, bytes)?;
}
}
}
Ok(())
}
/// Represent what a document field contains.
enum FilterableValues {
/// Corresponds to the JSON `null` value.
@ -387,6 +413,7 @@ enum FilterableValues {
Values { numbers: Vec<f64>, strings: Vec<(String, String)> },
}
/// Extracts the facet values of a JSON field.
fn extract_facet_values(value: &Value, geo_field: bool) -> FilterableValues {
fn inner_extract_facet_values(
value: &Value,
@ -448,10 +475,3 @@ fn extract_facet_values(value: &Value, geo_field: bool) -> FilterableValues {
}
}
}
fn truncate_string(mut s: String) -> String {
s.char_indices()
.take_while(|(idx, _)| idx + 4 < MAX_FACET_VALUE_LENGTH)
.map(|(_, c)| c)
.collect()
}