diff --git a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs index 4f9b273ef..7d366ae1a 100644 --- a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs +++ b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs @@ -1,22 +1,31 @@ +use std::borrow::Cow; use std::collections::{BTreeMap, HashSet}; use std::convert::TryInto; use std::fs::File; use std::io; use std::mem::size_of; +use std::result::Result as StdResult; +use grenad::Sorter; use heed::zerocopy::AsBytes; use heed::BytesEncode; use itertools::EitherOrBoth; use ordered_float::OrderedFloat; use roaring::RoaringBitmap; use serde_json::{from_slice, Value}; +use FilterableValues::{Empty, Null, Values}; use super::helpers::{create_sorter, keep_first, sorter_into_reader, GrenadParameters}; use crate::error::InternalError; use crate::facet::value_encoding::f64_into_bytes; use crate::update::del_add::{DelAdd, KvWriterDelAdd}; use crate::update::index_documents::{create_writer, writer_into_reader}; -use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result, BEU32, MAX_FACET_VALUE_LENGTH}; +use crate::{ + CboRoaringBitmapCodec, DocumentId, Error, FieldId, Result, BEU32, MAX_FACET_VALUE_LENGTH, +}; + +/// The length of the elements that are always in the buffer when inserting new values. +const TRUNCATE_SIZE: usize = size_of::() + size_of::(); /// The extracted facet values stored in grenad files by type. pub struct ExtractedFacetValues { @@ -68,7 +77,10 @@ pub fn extract_fid_docid_facet_values( let mut facet_is_null_docids = BTreeMap::::new(); let mut facet_is_empty_docids = BTreeMap::::new(); - let mut key_buffer = Vec::new(); + // We create two buffer for mutable ref issues with closures. + let mut numbers_key_buffer = Vec::new(); + let mut strings_key_buffer = Vec::new(); + let mut cursor = obkv_documents.into_cursor()?; while let Some((docid_bytes, value)) = cursor.move_on_next()? { // TODO Obkv> @@ -76,18 +88,21 @@ pub fn extract_fid_docid_facet_values( for (field_id, field_bytes) in obkv.iter() { if faceted_fields.contains(&field_id) { - key_buffer.clear(); + numbers_key_buffer.clear(); + strings_key_buffer.clear(); // Set key to the field_id // Note: this encoding is consistent with FieldIdCodec - key_buffer.extend_from_slice(&field_id.to_be_bytes()); + numbers_key_buffer.extend_from_slice(&field_id.to_be_bytes()); + strings_key_buffer.extend_from_slice(&field_id.to_be_bytes()); // Here, we know already that the document must be added to the “field id exists” database let document: [u8; 4] = docid_bytes[..4].try_into().ok().unwrap(); let document = BEU32::from(document).get(); // For the other extraction tasks, prefix the key with the field_id and the document_id - key_buffer.extend_from_slice(docid_bytes); + numbers_key_buffer.extend_from_slice(docid_bytes); + strings_key_buffer.extend_from_slice(docid_bytes); let del_add_obkv = obkv::KvReader::new(field_bytes); let del_value = match del_add_obkv.get(DelAdd::Deletion) { @@ -100,8 +115,13 @@ pub fn extract_fid_docid_facet_values( }; // We insert the document id on the Del and the Add side if the field exists. - let (mut del_exists, mut add_exists) = + let (ref mut del_exists, ref mut add_exists) = facet_exists_docids.entry(field_id).or_default(); + let (ref mut del_is_null, ref mut add_is_null) = + facet_is_null_docids.entry(field_id).or_default(); + let (ref mut del_is_empty, ref mut add_is_empty) = + facet_is_empty_docids.entry(field_id).or_default(); + if del_value.is_some() { del_exists.insert(document); } @@ -109,84 +129,58 @@ pub fn extract_fid_docid_facet_values( add_exists.insert(document); } - // TODO extract both Del and Add numbers an strings (dedup) - // TODO use the `itertools::merge_join_by` method to sort and diff both sides (Del and Add) - // TODO if there is a Left generate a Del - // TODO if there is a Right generate an Add - // TODO if there is a Both don't insert - // TODO compare numbers using OrderedFloat and strings using both normalized and original values. - let geo_support = geo_fields_ids.map_or(false, |(lat, lng)| field_id == lat || field_id == lng); - let del_filterable_values = del_value.map(|value| extract_facet_values(&value, geo_support)); let add_filterable_values = add_value.map(|value| extract_facet_values(&value, geo_support)); - use FilterableValues::{Empty, Null, Values}; + // Those closures are just here to simplify things a bit. + let mut insert_numbers_diff = |del_numbers, add_numbers| { + insert_numbers_diff( + &mut fid_docid_facet_numbers_sorter, + &mut numbers_key_buffer, + del_numbers, + add_numbers, + ) + }; + let mut insert_strings_diff = |del_strings, add_strings| { + insert_strings_diff( + &mut fid_docid_facet_strings_sorter, + &mut strings_key_buffer, + del_strings, + add_strings, + ) + }; match (del_filterable_values, add_filterable_values) { (None, None) => (), (Some(del_filterable_values), None) => match del_filterable_values { Null => { - let (mut del_is_null, _) = - facet_is_null_docids.entry(field_id).or_default(); del_is_null.insert(document); } Empty => { - let (mut del_is_empty, _) = - facet_is_empty_docids.entry(field_id).or_default(); del_is_empty.insert(document); } Values { numbers, strings } => { - // insert facet numbers in sorter - for number in numbers { - key_buffer.truncate(size_of::() + size_of::()); - if let Some(value_bytes) = f64_into_bytes(number) { - key_buffer.extend_from_slice(&value_bytes); - key_buffer.extend_from_slice(&number.to_be_bytes()); - - // We insert only the Del part of the Obkv to inform - // that we only want to remove all those numbers. - let mut obkv = KvWriterDelAdd::memory(); - obkv.insert(DelAdd::Deletion, ().as_bytes())?; - let bytes = obkv.into_inner()?; - fid_docid_facet_numbers_sorter.insert(&key_buffer, bytes)?; - } - } - - // insert normalized and original facet string in sorter - for (normalized, original) in - strings.into_iter().filter(|(n, _)| !n.is_empty()) - { - let normalized_truncated_value: String = normalized - .char_indices() - .take_while(|(idx, _)| idx + 4 < MAX_FACET_VALUE_LENGTH) - .map(|(_, c)| c) - .collect(); - - key_buffer.truncate(size_of::() + size_of::()); - key_buffer.extend_from_slice(normalized_truncated_value.as_bytes()); - - // We insert only the Del part of the Obkv to inform - // that we only want to remove all those strings. - let mut obkv = KvWriterDelAdd::memory(); - obkv.insert(DelAdd::Deletion, original.as_bytes())?; - let bytes = obkv.into_inner()?; - fid_docid_facet_strings_sorter.insert(&key_buffer, bytes)?; - } + insert_numbers_diff(numbers, vec![])?; + insert_strings_diff(strings, vec![])?; + } + }, + (None, Some(add_filterable_values)) => match add_filterable_values { + Null => { + add_is_null.insert(document); + } + Empty => { + add_is_empty.insert(document); + } + Values { numbers, strings } => { + insert_numbers_diff(vec![], numbers)?; + insert_strings_diff(vec![], strings)?; } }, - (None, Some(add_filterable_values)) => { - todo!() - } (Some(del_filterable_values), Some(add_filterable_values)) => { - let (mut del_is_null, mut add_is_null) = - facet_is_null_docids.entry(field_id).or_default(); - let (mut del_is_empty, mut add_is_empty) = - facet_is_empty_docids.entry(field_id).or_default(); - match (del_filterable_values, add_filterable_values) { (Null, Null) | (Empty, Empty) => (), (Null, Empty) => { @@ -198,120 +192,31 @@ pub fn extract_fid_docid_facet_values( add_is_null.insert(document); } (Null, Values { numbers, strings }) => { + insert_numbers_diff(vec![], numbers)?; + insert_strings_diff(vec![], strings)?; del_is_null.insert(document); - todo!() } (Empty, Values { numbers, strings }) => { + insert_numbers_diff(vec![], numbers)?; + insert_strings_diff(vec![], strings)?; del_is_empty.insert(document); - todo!() } (Values { numbers, strings }, Null) => { - todo!(); add_is_null.insert(document); + insert_numbers_diff(numbers, vec![])?; + insert_strings_diff(strings, vec![])?; } (Values { numbers, strings }, Empty) => { - todo!(); add_is_empty.insert(document); + insert_numbers_diff(numbers, vec![])?; + insert_strings_diff(strings, vec![])?; } ( - Values { numbers: mut del_numbers, strings: mut del_strings }, - Values { numbers: mut add_numbers, strings: mut add_strings }, + Values { numbers: del_numbers, strings: del_strings }, + Values { numbers: add_numbers, strings: add_strings }, ) => { - // We sort and dedup the float numbers - del_numbers.sort_unstable_by_key(|f| OrderedFloat(*f)); - add_numbers.sort_unstable_by_key(|f| OrderedFloat(*f)); - del_numbers.dedup_by_key(|f| OrderedFloat(*f)); - add_numbers.dedup_by_key(|f| OrderedFloat(*f)); - - let merged_numbers_iter = itertools::merge_join_by( - del_numbers.into_iter().map(OrderedFloat), - add_numbers.into_iter().map(OrderedFloat), - |del, add| del.cmp(&add), - ); - - // insert facet numbers in sorter - for eob in merged_numbers_iter { - key_buffer - .truncate(size_of::() + size_of::()); - match eob { - EitherOrBoth::Both(_, _) => (), // no need to touch anything - EitherOrBoth::Left(OrderedFloat(number)) => { - if let Some(value_bytes) = f64_into_bytes(number) { - key_buffer.extend_from_slice(&value_bytes); - key_buffer.extend_from_slice(&number.to_be_bytes()); - - // We insert only the Del part of the Obkv to inform - // that we only want to remove all those numbers. - let mut obkv = KvWriterDelAdd::memory(); - obkv.insert(DelAdd::Deletion, ().as_bytes())?; - let bytes = obkv.into_inner()?; - fid_docid_facet_numbers_sorter - .insert(&key_buffer, bytes)?; - } - } - EitherOrBoth::Right(OrderedFloat(number)) => { - if let Some(value_bytes) = f64_into_bytes(number) { - key_buffer.extend_from_slice(&value_bytes); - key_buffer.extend_from_slice(&number.to_be_bytes()); - - // We insert only the Del part of the Obkv to inform - // that we only want to remove all those numbers. - let mut obkv = KvWriterDelAdd::memory(); - obkv.insert(DelAdd::Addition, ().as_bytes())?; - let bytes = obkv.into_inner()?; - fid_docid_facet_numbers_sorter - .insert(&key_buffer, bytes)?; - } - } - } - } - - // We sort and dedup the normalized and original strings - del_strings.sort_unstable(); - add_strings.sort_unstable(); - del_strings.dedup(); - add_strings.dedup(); - - let merged_strings_iter = itertools::merge_join_by( - del_strings.into_iter().filter(|(n, _)| !n.is_empty()), - add_strings.into_iter().filter(|(n, _)| !n.is_empty()), - |del, add| del.cmp(&add), - ); - - // insert normalized and original facet string in sorter - for eob in merged_strings_iter { - match eob { - EitherOrBoth::Both(_, _) => (), // no need to touch anything - EitherOrBoth::Left((normalized, original)) => { - let truncated = truncate_string(normalized); - - key_buffer.truncate( - size_of::() + size_of::(), - ); - key_buffer.extend_from_slice(truncated.as_bytes()); - - let mut obkv = KvWriterDelAdd::memory(); - obkv.insert(DelAdd::Deletion, original)?; - let bytes = obkv.into_inner()?; - fid_docid_facet_strings_sorter - .insert(&key_buffer, bytes)?; - } - EitherOrBoth::Right((normalized, original)) => { - let truncated = truncate_string(normalized); - - key_buffer.truncate( - size_of::() + size_of::(), - ); - key_buffer.extend_from_slice(truncated.as_bytes()); - - let mut obkv = KvWriterDelAdd::memory(); - obkv.insert(DelAdd::Addition, original)?; - let bytes = obkv.into_inner()?; - fid_docid_facet_strings_sorter - .insert(&key_buffer, bytes)?; - } - } - } + insert_numbers_diff(del_numbers, add_numbers)?; + insert_strings_diff(del_strings, add_strings)?; } } } @@ -320,19 +225,15 @@ pub fn extract_fid_docid_facet_values( } } + let mut buffer = Vec::new(); let mut facet_exists_docids_writer = create_writer( indexer.chunk_compression_type, indexer.chunk_compression_level, tempfile::tempfile()?, ); for (fid, (del_bitmap, add_bitmap)) in facet_exists_docids.into_iter() { - let mut obkv = KvWriterDelAdd::memory(); - let del_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&del_bitmap).unwrap(); - let add_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&add_bitmap).unwrap(); - obkv.insert(DelAdd::Deletion, del_bitmap_bytes)?; - obkv.insert(DelAdd::Addition, add_bitmap_bytes)?; - let bytes = obkv.into_inner()?; - facet_exists_docids_writer.insert(fid.to_be_bytes(), &bytes)?; + deladd_obkv_cbo_roaring_bitmaps(&mut buffer, &del_bitmap, &add_bitmap)?; + facet_exists_docids_writer.insert(fid.to_be_bytes(), &buffer)?; } let facet_exists_docids_reader = writer_into_reader(facet_exists_docids_writer)?; @@ -342,13 +243,8 @@ pub fn extract_fid_docid_facet_values( tempfile::tempfile()?, ); for (fid, (del_bitmap, add_bitmap)) in facet_is_null_docids.into_iter() { - let mut obkv = KvWriterDelAdd::memory(); - let del_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&del_bitmap).unwrap(); - let add_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&add_bitmap).unwrap(); - obkv.insert(DelAdd::Deletion, del_bitmap_bytes)?; - obkv.insert(DelAdd::Addition, add_bitmap_bytes)?; - let bytes = obkv.into_inner()?; - facet_is_null_docids_writer.insert(fid.to_be_bytes(), &bytes)?; + deladd_obkv_cbo_roaring_bitmaps(&mut buffer, &del_bitmap, &add_bitmap)?; + facet_is_null_docids_writer.insert(fid.to_be_bytes(), &buffer)?; } let facet_is_null_docids_reader = writer_into_reader(facet_is_null_docids_writer)?; @@ -358,13 +254,8 @@ pub fn extract_fid_docid_facet_values( tempfile::tempfile()?, ); for (fid, (del_bitmap, add_bitmap)) in facet_is_empty_docids.into_iter() { - let mut obkv = KvWriterDelAdd::memory(); - let del_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&del_bitmap).unwrap(); - let add_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&add_bitmap).unwrap(); - obkv.insert(DelAdd::Deletion, del_bitmap_bytes)?; - obkv.insert(DelAdd::Addition, add_bitmap_bytes)?; - let bytes = obkv.into_inner()?; - facet_is_empty_docids_writer.insert(fid.to_be_bytes(), &bytes)?; + deladd_obkv_cbo_roaring_bitmaps(&mut buffer, &del_bitmap, &add_bitmap)?; + facet_is_empty_docids_writer.insert(fid.to_be_bytes(), &buffer)?; } let facet_is_empty_docids_reader = writer_into_reader(facet_is_empty_docids_writer)?; @@ -377,6 +268,141 @@ pub fn extract_fid_docid_facet_values( }) } +/// Generates a vector of bytes containing a DelAdd obkv with two bitmaps. +fn deladd_obkv_cbo_roaring_bitmaps( + buffer: &mut Vec, + del_bitmap: &RoaringBitmap, + add_bitmap: &RoaringBitmap, +) -> io::Result<()> { + buffer.clear(); + let mut obkv = KvWriterDelAdd::new(buffer); + let del_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(del_bitmap).unwrap(); + let add_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(add_bitmap).unwrap(); + obkv.insert(DelAdd::Deletion, del_bitmap_bytes)?; + obkv.insert(DelAdd::Addition, add_bitmap_bytes)?; + obkv.finish() +} + +/// Truncates a string to the biggest valid LMDB key size. +fn truncate_string(s: String) -> String { + s.char_indices() + .take_while(|(idx, _)| idx + 4 < MAX_FACET_VALUE_LENGTH) + .map(|(_, c)| c) + .collect() +} + +/// Computes the diff between both Del and Add numbers and +/// only inserts the parts that differ in the sorter. +fn insert_numbers_diff( + fid_docid_facet_numbers_sorter: &mut Sorter, + key_buffer: &mut Vec, + mut del_numbers: Vec, + mut add_numbers: Vec, +) -> Result<()> +where + MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> StdResult, Error>, +{ + // We sort and dedup the float numbers + del_numbers.sort_unstable_by_key(|f| OrderedFloat(*f)); + add_numbers.sort_unstable_by_key(|f| OrderedFloat(*f)); + del_numbers.dedup_by_key(|f| OrderedFloat(*f)); + add_numbers.dedup_by_key(|f| OrderedFloat(*f)); + + let merged_numbers_iter = itertools::merge_join_by( + del_numbers.into_iter().map(OrderedFloat), + add_numbers.into_iter().map(OrderedFloat), + |del, add| del.cmp(add), + ); + + // insert facet numbers in sorter + for eob in merged_numbers_iter { + key_buffer.truncate(TRUNCATE_SIZE); + match eob { + EitherOrBoth::Both(_, _) => (), // no need to touch anything + EitherOrBoth::Left(OrderedFloat(number)) => { + if let Some(value_bytes) = f64_into_bytes(number) { + key_buffer.extend_from_slice(&value_bytes); + key_buffer.extend_from_slice(&number.to_be_bytes()); + + // We insert only the Del part of the Obkv to inform + // that we only want to remove all those numbers. + let mut obkv = KvWriterDelAdd::memory(); + obkv.insert(DelAdd::Deletion, ().as_bytes())?; + let bytes = obkv.into_inner()?; + fid_docid_facet_numbers_sorter.insert(&key_buffer, bytes)?; + } + } + EitherOrBoth::Right(OrderedFloat(number)) => { + if let Some(value_bytes) = f64_into_bytes(number) { + key_buffer.extend_from_slice(&value_bytes); + key_buffer.extend_from_slice(&number.to_be_bytes()); + + // We insert only the Del part of the Obkv to inform + // that we only want to remove all those numbers. + let mut obkv = KvWriterDelAdd::memory(); + obkv.insert(DelAdd::Addition, ().as_bytes())?; + let bytes = obkv.into_inner()?; + fid_docid_facet_numbers_sorter.insert(&key_buffer, bytes)?; + } + } + } + } + + Ok(()) +} + +/// Computes the diff between both Del and Add strings and +/// only inserts the parts that differ in the sorter. +fn insert_strings_diff( + fid_docid_facet_strings_sorter: &mut Sorter, + key_buffer: &mut Vec, + mut del_strings: Vec<(String, String)>, + mut add_strings: Vec<(String, String)>, +) -> Result<()> +where + MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> StdResult, Error>, +{ + // We sort and dedup the normalized and original strings + del_strings.sort_unstable(); + add_strings.sort_unstable(); + del_strings.dedup(); + add_strings.dedup(); + + let merged_strings_iter = itertools::merge_join_by( + del_strings.into_iter().filter(|(n, _)| !n.is_empty()), + add_strings.into_iter().filter(|(n, _)| !n.is_empty()), + |del, add| del.cmp(add), + ); + + // insert normalized and original facet string in sorter + for eob in merged_strings_iter { + key_buffer.truncate(TRUNCATE_SIZE); + match eob { + EitherOrBoth::Both(_, _) => (), // no need to touch anything + EitherOrBoth::Left((normalized, original)) => { + let truncated = truncate_string(normalized); + key_buffer.extend_from_slice(truncated.as_bytes()); + + let mut obkv = KvWriterDelAdd::memory(); + obkv.insert(DelAdd::Deletion, original)?; + let bytes = obkv.into_inner()?; + fid_docid_facet_strings_sorter.insert(&key_buffer, bytes)?; + } + EitherOrBoth::Right((normalized, original)) => { + let truncated = truncate_string(normalized); + key_buffer.extend_from_slice(truncated.as_bytes()); + + let mut obkv = KvWriterDelAdd::memory(); + obkv.insert(DelAdd::Addition, original)?; + let bytes = obkv.into_inner()?; + fid_docid_facet_strings_sorter.insert(&key_buffer, bytes)?; + } + } + } + + Ok(()) +} + /// Represent what a document field contains. enum FilterableValues { /// Corresponds to the JSON `null` value. @@ -387,6 +413,7 @@ enum FilterableValues { Values { numbers: Vec, strings: Vec<(String, String)> }, } +/// Extracts the facet values of a JSON field. fn extract_facet_values(value: &Value, geo_field: bool) -> FilterableValues { fn inner_extract_facet_values( value: &Value, @@ -448,10 +475,3 @@ fn extract_facet_values(value: &Value, geo_field: bool) -> FilterableValues { } } } - -fn truncate_string(mut s: String) -> String { - s.char_indices() - .take_while(|(idx, _)| idx + 4 < MAX_FACET_VALUE_LENGTH) - .map(|(_, c)| c) - .collect() -}