mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-12-23 13:10:06 +01:00
Work on fid docid facet values rewrite
This commit is contained in:
parent
313b16bec2
commit
0c47defeee
@ -133,6 +133,8 @@ impl<R: std::io::Read + std::io::Seek> FacetsUpdateBulkInner<R> {
|
||||
self.db.delete_range(wtxn, &range).map(drop)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// TODO the new_data is an Reader<Obkv<Key, Obkv<DelAdd, RoaringBitmap>>>
|
||||
fn update_level0(&mut self, wtxn: &mut RwTxn) -> Result<()> {
|
||||
let new_data = match self.new_data.take() {
|
||||
Some(x) => x,
|
||||
|
@ -115,6 +115,7 @@ pub struct FacetsUpdate<'i> {
|
||||
min_level_size: u8,
|
||||
}
|
||||
impl<'i> FacetsUpdate<'i> {
|
||||
// TODO grenad::Reader<Key, Obkv<DelAdd, RoaringBitmap>>
|
||||
pub fn new(
|
||||
index: &'i Index,
|
||||
facet_type: FacetType,
|
||||
|
@ -17,6 +17,7 @@ use crate::Result;
|
||||
/// documents ids from the given chunk of docid facet number positions.
|
||||
#[logging_timer::time]
|
||||
pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
|
||||
// TODO Reader<Key, Obkv<DelAdd, ()>>
|
||||
docid_fid_facet_number: grenad::Reader<R>,
|
||||
indexer: GrenadParameters,
|
||||
) -> Result<grenad::Reader<BufReader<File>>> {
|
||||
@ -26,6 +27,7 @@ pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
|
||||
|
||||
let mut facet_number_docids_sorter = create_sorter(
|
||||
grenad::SortAlgorithm::Unstable,
|
||||
// TODO We must modify the merger to do unions of Del and Add separately
|
||||
merge_cbo_roaring_bitmaps,
|
||||
indexer.chunk_compression_type,
|
||||
indexer.chunk_compression_level,
|
||||
@ -34,12 +36,14 @@ pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
|
||||
);
|
||||
|
||||
let mut cursor = docid_fid_facet_number.into_cursor()?;
|
||||
// TODO the value is a Obkv<DelAdd, ()> and must be taken into account
|
||||
while let Some((key_bytes, _)) = cursor.move_on_next()? {
|
||||
let (field_id, document_id, number) =
|
||||
FieldDocIdFacetF64Codec::bytes_decode(key_bytes).unwrap();
|
||||
|
||||
let key = FacetGroupKey { field_id, level: 0, left_bound: number };
|
||||
let key_bytes = FacetGroupKeyCodec::<OrderedF64Codec>::bytes_encode(&key).unwrap();
|
||||
// TODO We must put a Obkv<DelAdd, RoaringBitmap>
|
||||
facet_number_docids_sorter.insert(key_bytes, document_id.to_ne_bytes())?;
|
||||
}
|
||||
|
||||
|
@ -15,6 +15,7 @@ use crate::{FieldId, Result, MAX_FACET_VALUE_LENGTH};
|
||||
/// documents ids from the given chunk of docid facet string positions.
|
||||
#[logging_timer::time]
|
||||
pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
|
||||
// TODO Reader<Key, Obkv<DelAdd, OriginalString>>
|
||||
docid_fid_facet_string: grenad::Reader<R>,
|
||||
indexer: GrenadParameters,
|
||||
) -> Result<grenad::Reader<BufReader<File>>> {
|
||||
@ -24,6 +25,7 @@ pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
|
||||
|
||||
let mut facet_string_docids_sorter = create_sorter(
|
||||
grenad::SortAlgorithm::Stable,
|
||||
// TODO We must modify the merger to do unions of Del and Add separately
|
||||
merge_cbo_roaring_bitmaps,
|
||||
indexer.chunk_compression_type,
|
||||
indexer.chunk_compression_level,
|
||||
@ -33,6 +35,7 @@ pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
|
||||
|
||||
let mut cursor = docid_fid_facet_string.into_cursor()?;
|
||||
while let Some((key, _original_value_bytes)) = cursor.move_on_next()? {
|
||||
// TODO the value is a Obkv<DelAdd, OriginalString> and must be taken into account
|
||||
let (field_id_bytes, bytes) = try_split_array_at(key).unwrap();
|
||||
let field_id = FieldId::from_be_bytes(field_id_bytes);
|
||||
|
||||
@ -54,6 +57,7 @@ pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
|
||||
let key = FacetGroupKey { field_id, level: 0, left_bound: normalised_value };
|
||||
let key_bytes = FacetGroupKeyCodec::<StrRefCodec>::bytes_encode(&key).unwrap();
|
||||
// document id is encoded in native-endian because of the CBO roaring bitmap codec
|
||||
// TODO Reader<KeyBytes, Obkv<DelAdd, RoaringBitmap>>
|
||||
facet_string_docids_sorter.insert(&key_bytes, document_id.to_ne_bytes())?;
|
||||
}
|
||||
|
||||
|
@ -6,17 +6,21 @@ use std::mem::size_of;
|
||||
|
||||
use heed::zerocopy::AsBytes;
|
||||
use heed::BytesEncode;
|
||||
use itertools::EitherOrBoth;
|
||||
use ordered_float::OrderedFloat;
|
||||
use roaring::RoaringBitmap;
|
||||
use serde_json::{from_slice, Value};
|
||||
|
||||
use super::helpers::{create_sorter, keep_first, sorter_into_reader, GrenadParameters};
|
||||
use crate::error::InternalError;
|
||||
use crate::facet::value_encoding::f64_into_bytes;
|
||||
use crate::update::del_add::{DelAdd, KvWriterDelAdd};
|
||||
use crate::update::index_documents::{create_writer, writer_into_reader};
|
||||
use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result, BEU32, MAX_FACET_VALUE_LENGTH};
|
||||
|
||||
/// The extracted facet values stored in grenad files by type.
|
||||
pub struct ExtractedFacetValues {
|
||||
// TOOD rename into `fid_docid_*`
|
||||
pub docid_fid_facet_numbers_chunk: grenad::Reader<BufReader<File>>,
|
||||
pub docid_fid_facet_strings_chunk: grenad::Reader<BufReader<File>>,
|
||||
pub fid_facet_is_null_docids_chunk: grenad::Reader<BufReader<File>>,
|
||||
@ -31,6 +35,7 @@ pub struct ExtractedFacetValues {
|
||||
/// We need the fid of the geofields to correctly parse them as numbers if they were sent as strings initially.
|
||||
#[logging_timer::time]
|
||||
pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
|
||||
// TODO Reader<Obkv<FieldId, Obkv<DelAdd, serde_json::Value>>>
|
||||
obkv_documents: grenad::Reader<R>,
|
||||
indexer: GrenadParameters,
|
||||
faceted_fields: &HashSet<FieldId>,
|
||||
@ -58,13 +63,15 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
|
||||
max_memory.map(|m| m / 2),
|
||||
);
|
||||
|
||||
let mut facet_exists_docids = BTreeMap::<FieldId, RoaringBitmap>::new();
|
||||
let mut facet_is_null_docids = BTreeMap::<FieldId, RoaringBitmap>::new();
|
||||
let mut facet_is_empty_docids = BTreeMap::<FieldId, RoaringBitmap>::new();
|
||||
// The tuples represents the Del and Add side for a bitmap
|
||||
let mut facet_exists_docids = BTreeMap::<FieldId, (RoaringBitmap, RoaringBitmap)>::new();
|
||||
let mut facet_is_null_docids = BTreeMap::<FieldId, (RoaringBitmap, RoaringBitmap)>::new();
|
||||
let mut facet_is_empty_docids = BTreeMap::<FieldId, (RoaringBitmap, RoaringBitmap)>::new();
|
||||
|
||||
let mut key_buffer = Vec::new();
|
||||
let mut cursor = obkv_documents.into_cursor()?;
|
||||
while let Some((docid_bytes, value)) = cursor.move_on_next()? {
|
||||
// TODO Obkv<FieldId, Obkv<DelAdd, serde_json::Value>>
|
||||
let obkv = obkv::KvReader::new(value);
|
||||
|
||||
for (field_id, field_bytes) in obkv.iter() {
|
||||
@ -79,50 +86,233 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
|
||||
let document: [u8; 4] = docid_bytes[..4].try_into().ok().unwrap();
|
||||
let document = BEU32::from(document).get();
|
||||
|
||||
facet_exists_docids.entry(field_id).or_default().insert(document);
|
||||
|
||||
// For the other extraction tasks, prefix the key with the field_id and the document_id
|
||||
key_buffer.extend_from_slice(docid_bytes);
|
||||
|
||||
let value = from_slice(field_bytes).map_err(InternalError::SerdeJson)?;
|
||||
let del_add_obkv = obkv::KvReader::new(field_bytes);
|
||||
let del_value = match del_add_obkv.get(DelAdd::Deletion) {
|
||||
Some(bytes) => from_slice(bytes).map_err(InternalError::SerdeJson)?,
|
||||
None => None,
|
||||
};
|
||||
let add_value = match del_add_obkv.get(DelAdd::Addition) {
|
||||
Some(bytes) => from_slice(bytes).map_err(InternalError::SerdeJson)?,
|
||||
None => None,
|
||||
};
|
||||
|
||||
match extract_facet_values(
|
||||
&value,
|
||||
geo_fields_ids.map_or(false, |(lat, lng)| field_id == lat || field_id == lng),
|
||||
) {
|
||||
FilterableValues::Null => {
|
||||
facet_is_null_docids.entry(field_id).or_default().insert(document);
|
||||
}
|
||||
FilterableValues::Empty => {
|
||||
facet_is_empty_docids.entry(field_id).or_default().insert(document);
|
||||
}
|
||||
FilterableValues::Values { numbers, strings } => {
|
||||
// insert facet numbers in sorter
|
||||
for number in numbers {
|
||||
key_buffer.truncate(size_of::<FieldId>() + size_of::<DocumentId>());
|
||||
if let Some(value_bytes) = f64_into_bytes(number) {
|
||||
key_buffer.extend_from_slice(&value_bytes);
|
||||
key_buffer.extend_from_slice(&number.to_be_bytes());
|
||||
// We insert the document id on the Del and the Add side if the field exists.
|
||||
let (mut del_exists, mut add_exists) =
|
||||
facet_exists_docids.entry(field_id).or_default();
|
||||
if del_value.is_some() {
|
||||
del_exists.insert(document);
|
||||
}
|
||||
if add_value.is_some() {
|
||||
add_exists.insert(document);
|
||||
}
|
||||
|
||||
fid_docid_facet_numbers_sorter
|
||||
.insert(&key_buffer, ().as_bytes())?;
|
||||
// TODO extract both Del and Add numbers an strings (dedup)
|
||||
// TODO use the `itertools::merge_join_by` method to sort and diff both sides (Del and Add)
|
||||
// TODO if there is a Left generate a Del
|
||||
// TODO if there is a Right generate an Add
|
||||
// TODO if there is a Both don't insert
|
||||
// TODO compare numbers using OrderedFloat and strings using both normalized and original values.
|
||||
|
||||
let geo_support =
|
||||
geo_fields_ids.map_or(false, |(lat, lng)| field_id == lat || field_id == lng);
|
||||
|
||||
let del_filterable_values =
|
||||
del_value.map(|value| extract_facet_values(&value, geo_support));
|
||||
let add_filterable_values =
|
||||
add_value.map(|value| extract_facet_values(&value, geo_support));
|
||||
|
||||
use FilterableValues::{Empty, Null, Values};
|
||||
|
||||
match (del_filterable_values, add_filterable_values) {
|
||||
(None, None) => (),
|
||||
(Some(del_filterable_values), None) => match del_filterable_values {
|
||||
Null => {
|
||||
let (mut del_is_null, _) =
|
||||
facet_is_null_docids.entry(field_id).or_default();
|
||||
del_is_null.insert(document);
|
||||
}
|
||||
Empty => {
|
||||
let (mut del_is_empty, _) =
|
||||
facet_is_empty_docids.entry(field_id).or_default();
|
||||
del_is_empty.insert(document);
|
||||
}
|
||||
Values { numbers, strings } => {
|
||||
// insert facet numbers in sorter
|
||||
for number in numbers {
|
||||
key_buffer.truncate(size_of::<FieldId>() + size_of::<DocumentId>());
|
||||
if let Some(value_bytes) = f64_into_bytes(number) {
|
||||
key_buffer.extend_from_slice(&value_bytes);
|
||||
key_buffer.extend_from_slice(&number.to_be_bytes());
|
||||
|
||||
// We insert only the Del part of the Obkv to inform
|
||||
// that we only want to remove all those numbers.
|
||||
let mut obkv = KvWriterDelAdd::memory();
|
||||
obkv.insert(DelAdd::Deletion, ().as_bytes())?;
|
||||
let bytes = obkv.into_inner()?;
|
||||
fid_docid_facet_numbers_sorter.insert(&key_buffer, bytes)?;
|
||||
}
|
||||
}
|
||||
|
||||
// insert normalized and original facet string in sorter
|
||||
for (normalized, original) in
|
||||
strings.into_iter().filter(|(n, _)| !n.is_empty())
|
||||
{
|
||||
let normalized_truncated_value: String = normalized
|
||||
.char_indices()
|
||||
.take_while(|(idx, _)| idx + 4 < MAX_FACET_VALUE_LENGTH)
|
||||
.map(|(_, c)| c)
|
||||
.collect();
|
||||
|
||||
key_buffer.truncate(size_of::<FieldId>() + size_of::<DocumentId>());
|
||||
key_buffer.extend_from_slice(normalized_truncated_value.as_bytes());
|
||||
|
||||
// We insert only the Del part of the Obkv to inform
|
||||
// that we only want to remove all those strings.
|
||||
let mut obkv = KvWriterDelAdd::memory();
|
||||
obkv.insert(DelAdd::Deletion, original.as_bytes())?;
|
||||
let bytes = obkv.into_inner()?;
|
||||
fid_docid_facet_strings_sorter.insert(&key_buffer, bytes)?;
|
||||
}
|
||||
}
|
||||
},
|
||||
(None, Some(add_filterable_values)) => {
|
||||
todo!()
|
||||
}
|
||||
(Some(del_filterable_values), Some(add_filterable_values)) => {
|
||||
let (mut del_is_null, mut add_is_null) =
|
||||
facet_is_null_docids.entry(field_id).or_default();
|
||||
let (mut del_is_empty, mut add_is_empty) =
|
||||
facet_is_empty_docids.entry(field_id).or_default();
|
||||
|
||||
// insert normalized and original facet string in sorter
|
||||
for (normalized, original) in
|
||||
strings.into_iter().filter(|(n, _)| !n.is_empty())
|
||||
{
|
||||
let normalized_truncated_value: String = normalized
|
||||
.char_indices()
|
||||
.take_while(|(idx, _)| idx + 4 < MAX_FACET_VALUE_LENGTH)
|
||||
.map(|(_, c)| c)
|
||||
.collect();
|
||||
match (del_filterable_values, add_filterable_values) {
|
||||
(Null, Null) | (Empty, Empty) => (),
|
||||
(Null, Empty) => {
|
||||
del_is_null.insert(document);
|
||||
add_is_empty.insert(document);
|
||||
}
|
||||
(Empty, Null) => {
|
||||
del_is_empty.insert(document);
|
||||
add_is_null.insert(document);
|
||||
}
|
||||
(Null, Values { numbers, strings }) => {
|
||||
del_is_null.insert(document);
|
||||
todo!()
|
||||
}
|
||||
(Empty, Values { numbers, strings }) => {
|
||||
del_is_empty.insert(document);
|
||||
todo!()
|
||||
}
|
||||
(Values { numbers, strings }, Null) => {
|
||||
todo!();
|
||||
add_is_null.insert(document);
|
||||
}
|
||||
(Values { numbers, strings }, Empty) => {
|
||||
todo!();
|
||||
add_is_empty.insert(document);
|
||||
}
|
||||
(
|
||||
Values { numbers: mut del_numbers, strings: mut del_strings },
|
||||
Values { numbers: mut add_numbers, strings: mut add_strings },
|
||||
) => {
|
||||
// We sort and dedup the float numbers
|
||||
del_numbers.sort_unstable_by_key(|f| OrderedFloat(*f));
|
||||
add_numbers.sort_unstable_by_key(|f| OrderedFloat(*f));
|
||||
del_numbers.dedup_by_key(|f| OrderedFloat(*f));
|
||||
add_numbers.dedup_by_key(|f| OrderedFloat(*f));
|
||||
|
||||
key_buffer.truncate(size_of::<FieldId>() + size_of::<DocumentId>());
|
||||
key_buffer.extend_from_slice(normalized_truncated_value.as_bytes());
|
||||
fid_docid_facet_strings_sorter
|
||||
.insert(&key_buffer, original.as_bytes())?;
|
||||
let merged_numbers_iter = itertools::merge_join_by(
|
||||
del_numbers.into_iter().map(OrderedFloat),
|
||||
add_numbers.into_iter().map(OrderedFloat),
|
||||
|del, add| del.cmp(&add),
|
||||
);
|
||||
|
||||
// insert facet numbers in sorter
|
||||
for eob in merged_numbers_iter {
|
||||
key_buffer
|
||||
.truncate(size_of::<FieldId>() + size_of::<DocumentId>());
|
||||
match eob {
|
||||
EitherOrBoth::Both(_, _) => (), // no need to touch anything
|
||||
EitherOrBoth::Left(OrderedFloat(number)) => {
|
||||
if let Some(value_bytes) = f64_into_bytes(number) {
|
||||
key_buffer.extend_from_slice(&value_bytes);
|
||||
key_buffer.extend_from_slice(&number.to_be_bytes());
|
||||
|
||||
// We insert only the Del part of the Obkv to inform
|
||||
// that we only want to remove all those numbers.
|
||||
let mut obkv = KvWriterDelAdd::memory();
|
||||
obkv.insert(DelAdd::Deletion, ().as_bytes())?;
|
||||
let bytes = obkv.into_inner()?;
|
||||
fid_docid_facet_numbers_sorter
|
||||
.insert(&key_buffer, bytes)?;
|
||||
}
|
||||
}
|
||||
EitherOrBoth::Right(OrderedFloat(number)) => {
|
||||
if let Some(value_bytes) = f64_into_bytes(number) {
|
||||
key_buffer.extend_from_slice(&value_bytes);
|
||||
key_buffer.extend_from_slice(&number.to_be_bytes());
|
||||
|
||||
// We insert only the Del part of the Obkv to inform
|
||||
// that we only want to remove all those numbers.
|
||||
let mut obkv = KvWriterDelAdd::memory();
|
||||
obkv.insert(DelAdd::Addition, ().as_bytes())?;
|
||||
let bytes = obkv.into_inner()?;
|
||||
fid_docid_facet_numbers_sorter
|
||||
.insert(&key_buffer, bytes)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We sort and dedup the normalized and original strings
|
||||
del_strings.sort_unstable();
|
||||
add_strings.sort_unstable();
|
||||
del_strings.dedup();
|
||||
add_strings.dedup();
|
||||
|
||||
let merged_strings_iter = itertools::merge_join_by(
|
||||
del_strings.into_iter().filter(|(n, _)| !n.is_empty()),
|
||||
add_strings.into_iter().filter(|(n, _)| !n.is_empty()),
|
||||
|del, add| del.cmp(&add),
|
||||
);
|
||||
|
||||
// insert normalized and original facet string in sorter
|
||||
for eob in merged_strings_iter {
|
||||
match eob {
|
||||
EitherOrBoth::Both(_, _) => (), // no need to touch anything
|
||||
EitherOrBoth::Left((normalized, original)) => {
|
||||
let truncated = truncate_string(normalized);
|
||||
|
||||
key_buffer.truncate(
|
||||
size_of::<FieldId>() + size_of::<DocumentId>(),
|
||||
);
|
||||
key_buffer.extend_from_slice(truncated.as_bytes());
|
||||
|
||||
let mut obkv = KvWriterDelAdd::memory();
|
||||
obkv.insert(DelAdd::Deletion, original)?;
|
||||
let bytes = obkv.into_inner()?;
|
||||
fid_docid_facet_strings_sorter
|
||||
.insert(&key_buffer, bytes)?;
|
||||
}
|
||||
EitherOrBoth::Right((normalized, original)) => {
|
||||
let truncated = truncate_string(normalized);
|
||||
|
||||
key_buffer.truncate(
|
||||
size_of::<FieldId>() + size_of::<DocumentId>(),
|
||||
);
|
||||
key_buffer.extend_from_slice(truncated.as_bytes());
|
||||
|
||||
let mut obkv = KvWriterDelAdd::memory();
|
||||
obkv.insert(DelAdd::Addition, original)?;
|
||||
let bytes = obkv.into_inner()?;
|
||||
fid_docid_facet_strings_sorter
|
||||
.insert(&key_buffer, bytes)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -135,6 +325,7 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
|
||||
indexer.chunk_compression_level,
|
||||
tempfile::tempfile()?,
|
||||
);
|
||||
// TODO generate an Obkv<DelAdd, Bitmap>
|
||||
for (fid, bitmap) in facet_exists_docids.into_iter() {
|
||||
let bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&bitmap).unwrap();
|
||||
facet_exists_docids_writer.insert(fid.to_be_bytes(), &bitmap_bytes)?;
|
||||
@ -146,12 +337,14 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
|
||||
indexer.chunk_compression_level,
|
||||
tempfile::tempfile()?,
|
||||
);
|
||||
// TODO generate an Obkv<DelAdd, Bitmap>
|
||||
for (fid, bitmap) in facet_is_null_docids.into_iter() {
|
||||
let bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&bitmap).unwrap();
|
||||
facet_is_null_docids_writer.insert(fid.to_be_bytes(), &bitmap_bytes)?;
|
||||
}
|
||||
let facet_is_null_docids_reader = writer_into_reader(facet_is_null_docids_writer)?;
|
||||
|
||||
// TODO generate an Obkv<DelAdd, Bitmap>
|
||||
let mut facet_is_empty_docids_writer = create_writer(
|
||||
indexer.chunk_compression_type,
|
||||
indexer.chunk_compression_level,
|
||||
@ -243,3 +436,10 @@ fn extract_facet_values(value: &Value, geo_field: bool) -> FilterableValues {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn truncate_string(mut s: String) -> String {
|
||||
s.char_indices()
|
||||
.take_while(|(idx, _)| idx + 4 < MAX_FACET_VALUE_LENGTH)
|
||||
.map(|(_, c)| c)
|
||||
.collect()
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user