mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-12-05 02:55:46 +01:00
Merge pull request #4136 from meilisearch/diff-indexing-facet-values
Diff Indexing on the facet values extractors
This commit is contained in:
commit
0f6a0b1ab8
@ -60,12 +60,16 @@ impl CboRoaringBitmapCodec {
|
|||||||
/// if the merged values length is under the threshold, values are directly
|
/// if the merged values length is under the threshold, values are directly
|
||||||
/// serialized in the buffer else a RoaringBitmap is created from the
|
/// serialized in the buffer else a RoaringBitmap is created from the
|
||||||
/// values and is serialized in the buffer.
|
/// values and is serialized in the buffer.
|
||||||
pub fn merge_into(slices: &[Cow<[u8]>], buffer: &mut Vec<u8>) -> io::Result<()> {
|
pub fn merge_into<I, A>(slices: I, buffer: &mut Vec<u8>) -> io::Result<()>
|
||||||
|
where
|
||||||
|
I: IntoIterator<Item = A>,
|
||||||
|
A: AsRef<[u8]>,
|
||||||
|
{
|
||||||
let mut roaring = RoaringBitmap::new();
|
let mut roaring = RoaringBitmap::new();
|
||||||
let mut vec = Vec::new();
|
let mut vec = Vec::new();
|
||||||
|
|
||||||
for bytes in slices {
|
for bytes in slices {
|
||||||
if bytes.len() <= THRESHOLD * size_of::<u32>() {
|
if bytes.as_ref().len() <= THRESHOLD * size_of::<u32>() {
|
||||||
let mut reader = bytes.as_ref();
|
let mut reader = bytes.as_ref();
|
||||||
while let Ok(integer) = reader.read_u32::<NativeEndian>() {
|
while let Ok(integer) = reader.read_u32::<NativeEndian>() {
|
||||||
vec.push(integer);
|
vec.push(integer);
|
||||||
@ -85,7 +89,7 @@ impl CboRoaringBitmapCodec {
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// We can unwrap safely because the vector is sorted upper.
|
// We can unwrap safely because the vector is sorted upper.
|
||||||
let roaring = RoaringBitmap::from_sorted_iter(vec.into_iter()).unwrap();
|
let roaring = RoaringBitmap::from_sorted_iter(vec).unwrap();
|
||||||
roaring.serialize_into(buffer)?;
|
roaring.serialize_into(buffer)?;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -132,6 +132,8 @@ impl<R: std::io::Read + std::io::Seek> FacetsUpdateBulkInner<R> {
|
|||||||
self.db.delete_range(wtxn, &range).map(drop)?;
|
self.db.delete_range(wtxn, &range).map(drop)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO the new_data is an Reader<Obkv<Key, Obkv<DelAdd, RoaringBitmap>>>
|
||||||
fn update_level0(&mut self, wtxn: &mut RwTxn) -> Result<()> {
|
fn update_level0(&mut self, wtxn: &mut RwTxn) -> Result<()> {
|
||||||
let new_data = match self.new_data.take() {
|
let new_data = match self.new_data.take() {
|
||||||
Some(x) => x,
|
Some(x) => x,
|
||||||
|
@ -114,6 +114,7 @@ pub struct FacetsUpdate<'i> {
|
|||||||
min_level_size: u8,
|
min_level_size: u8,
|
||||||
}
|
}
|
||||||
impl<'i> FacetsUpdate<'i> {
|
impl<'i> FacetsUpdate<'i> {
|
||||||
|
// TODO grenad::Reader<Key, Obkv<DelAdd, RoaringBitmap>>
|
||||||
pub fn new(index: &'i Index, facet_type: FacetType, new_data: grenad::Reader<File>) -> Self {
|
pub fn new(index: &'i Index, facet_type: FacetType, new_data: grenad::Reader<File>) -> Self {
|
||||||
let database = match facet_type {
|
let database = match facet_type {
|
||||||
FacetType::String => index
|
FacetType::String => index
|
||||||
|
@ -4,11 +4,12 @@ use std::io;
|
|||||||
use heed::{BytesDecode, BytesEncode};
|
use heed::{BytesDecode, BytesEncode};
|
||||||
|
|
||||||
use super::helpers::{
|
use super::helpers::{
|
||||||
create_sorter, merge_cbo_roaring_bitmaps, sorter_into_reader, GrenadParameters,
|
create_sorter, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader, GrenadParameters,
|
||||||
};
|
};
|
||||||
use crate::heed_codec::facet::{
|
use crate::heed_codec::facet::{
|
||||||
FacetGroupKey, FacetGroupKeyCodec, FieldDocIdFacetF64Codec, OrderedF64Codec,
|
FacetGroupKey, FacetGroupKeyCodec, FieldDocIdFacetF64Codec, OrderedF64Codec,
|
||||||
};
|
};
|
||||||
|
use crate::update::del_add::{KvReaderDelAdd, KvWriterDelAdd};
|
||||||
use crate::Result;
|
use crate::Result;
|
||||||
|
|
||||||
/// Extracts the facet number and the documents ids where this facet number appear.
|
/// Extracts the facet number and the documents ids where this facet number appear.
|
||||||
@ -17,7 +18,7 @@ use crate::Result;
|
|||||||
/// documents ids from the given chunk of docid facet number positions.
|
/// documents ids from the given chunk of docid facet number positions.
|
||||||
#[logging_timer::time]
|
#[logging_timer::time]
|
||||||
pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
|
pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
|
||||||
docid_fid_facet_number: grenad::Reader<R>,
|
fid_docid_facet_number: grenad::Reader<R>,
|
||||||
indexer: GrenadParameters,
|
indexer: GrenadParameters,
|
||||||
) -> Result<grenad::Reader<File>> {
|
) -> Result<grenad::Reader<File>> {
|
||||||
puffin::profile_function!();
|
puffin::profile_function!();
|
||||||
@ -26,21 +27,30 @@ pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
|
|||||||
|
|
||||||
let mut facet_number_docids_sorter = create_sorter(
|
let mut facet_number_docids_sorter = create_sorter(
|
||||||
grenad::SortAlgorithm::Unstable,
|
grenad::SortAlgorithm::Unstable,
|
||||||
merge_cbo_roaring_bitmaps,
|
merge_deladd_cbo_roaring_bitmaps,
|
||||||
indexer.chunk_compression_type,
|
indexer.chunk_compression_type,
|
||||||
indexer.chunk_compression_level,
|
indexer.chunk_compression_level,
|
||||||
indexer.max_nb_chunks,
|
indexer.max_nb_chunks,
|
||||||
max_memory,
|
max_memory,
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut cursor = docid_fid_facet_number.into_cursor()?;
|
let mut buffer = Vec::new();
|
||||||
while let Some((key_bytes, _)) = cursor.move_on_next()? {
|
let mut cursor = fid_docid_facet_number.into_cursor()?;
|
||||||
|
while let Some((key_bytes, deladd_obkv_bytes)) = cursor.move_on_next()? {
|
||||||
let (field_id, document_id, number) =
|
let (field_id, document_id, number) =
|
||||||
FieldDocIdFacetF64Codec::bytes_decode(key_bytes).unwrap();
|
FieldDocIdFacetF64Codec::bytes_decode(key_bytes).unwrap();
|
||||||
|
|
||||||
let key = FacetGroupKey { field_id, level: 0, left_bound: number };
|
let key = FacetGroupKey { field_id, level: 0, left_bound: number };
|
||||||
let key_bytes = FacetGroupKeyCodec::<OrderedF64Codec>::bytes_encode(&key).unwrap();
|
let key_bytes = FacetGroupKeyCodec::<OrderedF64Codec>::bytes_encode(&key).unwrap();
|
||||||
facet_number_docids_sorter.insert(key_bytes, document_id.to_ne_bytes())?;
|
|
||||||
|
buffer.clear();
|
||||||
|
let mut obkv = KvWriterDelAdd::new(&mut buffer);
|
||||||
|
for (deladd_key, _) in KvReaderDelAdd::new(deladd_obkv_bytes).iter() {
|
||||||
|
obkv.insert(deladd_key, document_id.to_ne_bytes())?;
|
||||||
|
}
|
||||||
|
obkv.finish()?;
|
||||||
|
|
||||||
|
facet_number_docids_sorter.insert(key_bytes, &buffer)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
sorter_into_reader(facet_number_docids_sorter, indexer)
|
sorter_into_reader(facet_number_docids_sorter, indexer)
|
||||||
|
@ -1,13 +1,14 @@
|
|||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io;
|
use std::{io, str};
|
||||||
|
|
||||||
use heed::BytesEncode;
|
use heed::BytesEncode;
|
||||||
|
|
||||||
use super::helpers::{create_sorter, sorter_into_reader, try_split_array_at, GrenadParameters};
|
use super::helpers::{create_sorter, sorter_into_reader, try_split_array_at, GrenadParameters};
|
||||||
use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec};
|
use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec};
|
||||||
use crate::heed_codec::StrRefCodec;
|
use crate::heed_codec::StrRefCodec;
|
||||||
use crate::update::index_documents::merge_cbo_roaring_bitmaps;
|
use crate::update::del_add::{KvReaderDelAdd, KvWriterDelAdd};
|
||||||
use crate::{FieldId, Result, MAX_FACET_VALUE_LENGTH};
|
use crate::update::index_documents::helpers::merge_deladd_cbo_roaring_bitmaps;
|
||||||
|
use crate::{FieldId, Result};
|
||||||
|
|
||||||
/// Extracts the facet string and the documents ids where this facet string appear.
|
/// Extracts the facet string and the documents ids where this facet string appear.
|
||||||
///
|
///
|
||||||
@ -24,15 +25,16 @@ pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
|
|||||||
|
|
||||||
let mut facet_string_docids_sorter = create_sorter(
|
let mut facet_string_docids_sorter = create_sorter(
|
||||||
grenad::SortAlgorithm::Stable,
|
grenad::SortAlgorithm::Stable,
|
||||||
merge_cbo_roaring_bitmaps,
|
merge_deladd_cbo_roaring_bitmaps,
|
||||||
indexer.chunk_compression_type,
|
indexer.chunk_compression_type,
|
||||||
indexer.chunk_compression_level,
|
indexer.chunk_compression_level,
|
||||||
indexer.max_nb_chunks,
|
indexer.max_nb_chunks,
|
||||||
max_memory,
|
max_memory,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let mut buffer = Vec::new();
|
||||||
let mut cursor = docid_fid_facet_string.into_cursor()?;
|
let mut cursor = docid_fid_facet_string.into_cursor()?;
|
||||||
while let Some((key, _original_value_bytes)) = cursor.move_on_next()? {
|
while let Some((key, deladd_original_value_bytes)) = cursor.move_on_next()? {
|
||||||
let (field_id_bytes, bytes) = try_split_array_at(key).unwrap();
|
let (field_id_bytes, bytes) = try_split_array_at(key).unwrap();
|
||||||
let field_id = FieldId::from_be_bytes(field_id_bytes);
|
let field_id = FieldId::from_be_bytes(field_id_bytes);
|
||||||
|
|
||||||
@ -40,21 +42,17 @@ pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
|
|||||||
try_split_array_at::<_, 4>(bytes).unwrap();
|
try_split_array_at::<_, 4>(bytes).unwrap();
|
||||||
let document_id = u32::from_be_bytes(document_id_bytes);
|
let document_id = u32::from_be_bytes(document_id_bytes);
|
||||||
|
|
||||||
let mut normalised_value = std::str::from_utf8(normalized_value_bytes)?;
|
let normalized_value = str::from_utf8(normalized_value_bytes)?;
|
||||||
|
let key = FacetGroupKey { field_id, level: 0, left_bound: normalized_value };
|
||||||
let normalised_truncated_value: String;
|
|
||||||
if normalised_value.len() > MAX_FACET_VALUE_LENGTH {
|
|
||||||
normalised_truncated_value = normalised_value
|
|
||||||
.char_indices()
|
|
||||||
.take_while(|(idx, _)| *idx < MAX_FACET_VALUE_LENGTH)
|
|
||||||
.map(|(_, c)| c)
|
|
||||||
.collect();
|
|
||||||
normalised_value = normalised_truncated_value.as_str();
|
|
||||||
}
|
|
||||||
let key = FacetGroupKey { field_id, level: 0, left_bound: normalised_value };
|
|
||||||
let key_bytes = FacetGroupKeyCodec::<StrRefCodec>::bytes_encode(&key).unwrap();
|
let key_bytes = FacetGroupKeyCodec::<StrRefCodec>::bytes_encode(&key).unwrap();
|
||||||
// document id is encoded in native-endian because of the CBO roaring bitmap codec
|
|
||||||
facet_string_docids_sorter.insert(&key_bytes, document_id.to_ne_bytes())?;
|
buffer.clear();
|
||||||
|
let mut obkv = KvWriterDelAdd::new(&mut buffer);
|
||||||
|
for (deladd_key, _) in KvReaderDelAdd::new(deladd_original_value_bytes).iter() {
|
||||||
|
obkv.insert(deladd_key, document_id.to_ne_bytes())?;
|
||||||
|
}
|
||||||
|
obkv.finish()?;
|
||||||
|
facet_string_docids_sorter.insert(&key_bytes, &buffer)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
sorter_into_reader(facet_string_docids_sorter, indexer)
|
sorter_into_reader(facet_string_docids_sorter, indexer)
|
||||||
|
@ -1,24 +1,36 @@
|
|||||||
|
use std::borrow::Cow;
|
||||||
use std::collections::{BTreeMap, HashSet};
|
use std::collections::{BTreeMap, HashSet};
|
||||||
use std::convert::TryInto;
|
use std::convert::TryInto;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::mem::size_of;
|
use std::mem::size_of;
|
||||||
|
use std::result::Result as StdResult;
|
||||||
|
|
||||||
|
use grenad::Sorter;
|
||||||
use heed::zerocopy::AsBytes;
|
use heed::zerocopy::AsBytes;
|
||||||
use heed::BytesEncode;
|
use heed::BytesEncode;
|
||||||
|
use itertools::EitherOrBoth;
|
||||||
|
use ordered_float::OrderedFloat;
|
||||||
use roaring::RoaringBitmap;
|
use roaring::RoaringBitmap;
|
||||||
use serde_json::{from_slice, Value};
|
use serde_json::{from_slice, Value};
|
||||||
|
use FilterableValues::{Empty, Null, Values};
|
||||||
|
|
||||||
use super::helpers::{create_sorter, keep_first, sorter_into_reader, GrenadParameters};
|
use super::helpers::{create_sorter, keep_first, sorter_into_reader, GrenadParameters};
|
||||||
use crate::error::InternalError;
|
use crate::error::InternalError;
|
||||||
use crate::facet::value_encoding::f64_into_bytes;
|
use crate::facet::value_encoding::f64_into_bytes;
|
||||||
|
use crate::update::del_add::{DelAdd, KvWriterDelAdd};
|
||||||
use crate::update::index_documents::{create_writer, writer_into_reader};
|
use crate::update::index_documents::{create_writer, writer_into_reader};
|
||||||
use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result, BEU32, MAX_FACET_VALUE_LENGTH};
|
use crate::{
|
||||||
|
CboRoaringBitmapCodec, DocumentId, Error, FieldId, Result, BEU32, MAX_FACET_VALUE_LENGTH,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// The length of the elements that are always in the buffer when inserting new values.
|
||||||
|
const TRUNCATE_SIZE: usize = size_of::<FieldId>() + size_of::<DocumentId>();
|
||||||
|
|
||||||
/// The extracted facet values stored in grenad files by type.
|
/// The extracted facet values stored in grenad files by type.
|
||||||
pub struct ExtractedFacetValues {
|
pub struct ExtractedFacetValues {
|
||||||
pub docid_fid_facet_numbers_chunk: grenad::Reader<File>,
|
pub fid_docid_facet_numbers_chunk: grenad::Reader<File>,
|
||||||
pub docid_fid_facet_strings_chunk: grenad::Reader<File>,
|
pub fid_docid_facet_strings_chunk: grenad::Reader<File>,
|
||||||
pub fid_facet_is_null_docids_chunk: grenad::Reader<File>,
|
pub fid_facet_is_null_docids_chunk: grenad::Reader<File>,
|
||||||
pub fid_facet_is_empty_docids_chunk: grenad::Reader<File>,
|
pub fid_facet_is_empty_docids_chunk: grenad::Reader<File>,
|
||||||
pub fid_facet_exists_docids_chunk: grenad::Reader<File>,
|
pub fid_facet_exists_docids_chunk: grenad::Reader<File>,
|
||||||
@ -58,71 +70,150 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
|
|||||||
max_memory.map(|m| m / 2),
|
max_memory.map(|m| m / 2),
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut facet_exists_docids = BTreeMap::<FieldId, RoaringBitmap>::new();
|
// The tuples represents the Del and Add side for a bitmap
|
||||||
let mut facet_is_null_docids = BTreeMap::<FieldId, RoaringBitmap>::new();
|
let mut facet_exists_docids = BTreeMap::<FieldId, (RoaringBitmap, RoaringBitmap)>::new();
|
||||||
let mut facet_is_empty_docids = BTreeMap::<FieldId, RoaringBitmap>::new();
|
let mut facet_is_null_docids = BTreeMap::<FieldId, (RoaringBitmap, RoaringBitmap)>::new();
|
||||||
|
let mut facet_is_empty_docids = BTreeMap::<FieldId, (RoaringBitmap, RoaringBitmap)>::new();
|
||||||
|
|
||||||
|
// We create two buffer for mutable ref issues with closures.
|
||||||
|
let mut numbers_key_buffer = Vec::new();
|
||||||
|
let mut strings_key_buffer = Vec::new();
|
||||||
|
|
||||||
let mut key_buffer = Vec::new();
|
|
||||||
let mut cursor = obkv_documents.into_cursor()?;
|
let mut cursor = obkv_documents.into_cursor()?;
|
||||||
while let Some((docid_bytes, value)) = cursor.move_on_next()? {
|
while let Some((docid_bytes, value)) = cursor.move_on_next()? {
|
||||||
let obkv = obkv::KvReader::new(value);
|
let obkv = obkv::KvReader::new(value);
|
||||||
|
|
||||||
for (field_id, field_bytes) in obkv.iter() {
|
for (field_id, field_bytes) in obkv.iter() {
|
||||||
if faceted_fields.contains(&field_id) {
|
if faceted_fields.contains(&field_id) {
|
||||||
key_buffer.clear();
|
numbers_key_buffer.clear();
|
||||||
|
strings_key_buffer.clear();
|
||||||
|
|
||||||
// Set key to the field_id
|
// Set key to the field_id
|
||||||
// Note: this encoding is consistent with FieldIdCodec
|
// Note: this encoding is consistent with FieldIdCodec
|
||||||
key_buffer.extend_from_slice(&field_id.to_be_bytes());
|
numbers_key_buffer.extend_from_slice(&field_id.to_be_bytes());
|
||||||
|
strings_key_buffer.extend_from_slice(&field_id.to_be_bytes());
|
||||||
|
|
||||||
// Here, we know already that the document must be added to the “field id exists” database
|
|
||||||
let document: [u8; 4] = docid_bytes[..4].try_into().ok().unwrap();
|
let document: [u8; 4] = docid_bytes[..4].try_into().ok().unwrap();
|
||||||
let document = BEU32::from(document).get();
|
let document = BEU32::from(document).get();
|
||||||
|
|
||||||
facet_exists_docids.entry(field_id).or_default().insert(document);
|
|
||||||
|
|
||||||
// For the other extraction tasks, prefix the key with the field_id and the document_id
|
// For the other extraction tasks, prefix the key with the field_id and the document_id
|
||||||
key_buffer.extend_from_slice(docid_bytes);
|
numbers_key_buffer.extend_from_slice(docid_bytes);
|
||||||
|
strings_key_buffer.extend_from_slice(docid_bytes);
|
||||||
|
|
||||||
let value = from_slice(field_bytes).map_err(InternalError::SerdeJson)?;
|
let del_add_obkv = obkv::KvReader::new(field_bytes);
|
||||||
|
let del_value = match del_add_obkv.get(DelAdd::Deletion) {
|
||||||
|
Some(bytes) => from_slice(bytes).map_err(InternalError::SerdeJson)?,
|
||||||
|
None => None,
|
||||||
|
};
|
||||||
|
let add_value = match del_add_obkv.get(DelAdd::Addition) {
|
||||||
|
Some(bytes) => from_slice(bytes).map_err(InternalError::SerdeJson)?,
|
||||||
|
None => None,
|
||||||
|
};
|
||||||
|
|
||||||
match extract_facet_values(
|
// We insert the document id on the Del and the Add side if the field exists.
|
||||||
&value,
|
let (ref mut del_exists, ref mut add_exists) =
|
||||||
geo_fields_ids.map_or(false, |(lat, lng)| field_id == lat || field_id == lng),
|
facet_exists_docids.entry(field_id).or_default();
|
||||||
) {
|
let (ref mut del_is_null, ref mut add_is_null) =
|
||||||
FilterableValues::Null => {
|
facet_is_null_docids.entry(field_id).or_default();
|
||||||
facet_is_null_docids.entry(field_id).or_default().insert(document);
|
let (ref mut del_is_empty, ref mut add_is_empty) =
|
||||||
}
|
facet_is_empty_docids.entry(field_id).or_default();
|
||||||
FilterableValues::Empty => {
|
|
||||||
facet_is_empty_docids.entry(field_id).or_default().insert(document);
|
|
||||||
}
|
|
||||||
FilterableValues::Values { numbers, strings } => {
|
|
||||||
// insert facet numbers in sorter
|
|
||||||
for number in numbers {
|
|
||||||
key_buffer.truncate(size_of::<FieldId>() + size_of::<DocumentId>());
|
|
||||||
if let Some(value_bytes) = f64_into_bytes(number) {
|
|
||||||
key_buffer.extend_from_slice(&value_bytes);
|
|
||||||
key_buffer.extend_from_slice(&number.to_be_bytes());
|
|
||||||
|
|
||||||
fid_docid_facet_numbers_sorter
|
if del_value.is_some() {
|
||||||
.insert(&key_buffer, ().as_bytes())?;
|
del_exists.insert(document);
|
||||||
}
|
}
|
||||||
|
if add_value.is_some() {
|
||||||
|
add_exists.insert(document);
|
||||||
}
|
}
|
||||||
|
|
||||||
// insert normalized and original facet string in sorter
|
let geo_support =
|
||||||
for (normalized, original) in
|
geo_fields_ids.map_or(false, |(lat, lng)| field_id == lat || field_id == lng);
|
||||||
strings.into_iter().filter(|(n, _)| !n.is_empty())
|
let del_filterable_values =
|
||||||
{
|
del_value.map(|value| extract_facet_values(&value, geo_support));
|
||||||
let normalized_truncated_value: String = normalized
|
let add_filterable_values =
|
||||||
.char_indices()
|
add_value.map(|value| extract_facet_values(&value, geo_support));
|
||||||
.take_while(|(idx, _)| idx + 4 < MAX_FACET_VALUE_LENGTH)
|
|
||||||
.map(|(_, c)| c)
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
key_buffer.truncate(size_of::<FieldId>() + size_of::<DocumentId>());
|
// Those closures are just here to simplify things a bit.
|
||||||
key_buffer.extend_from_slice(normalized_truncated_value.as_bytes());
|
let mut insert_numbers_diff = |del_numbers, add_numbers| {
|
||||||
fid_docid_facet_strings_sorter
|
insert_numbers_diff(
|
||||||
.insert(&key_buffer, original.as_bytes())?;
|
&mut fid_docid_facet_numbers_sorter,
|
||||||
|
&mut numbers_key_buffer,
|
||||||
|
del_numbers,
|
||||||
|
add_numbers,
|
||||||
|
)
|
||||||
|
};
|
||||||
|
let mut insert_strings_diff = |del_strings, add_strings| {
|
||||||
|
insert_strings_diff(
|
||||||
|
&mut fid_docid_facet_strings_sorter,
|
||||||
|
&mut strings_key_buffer,
|
||||||
|
del_strings,
|
||||||
|
add_strings,
|
||||||
|
)
|
||||||
|
};
|
||||||
|
|
||||||
|
match (del_filterable_values, add_filterable_values) {
|
||||||
|
(None, None) => (),
|
||||||
|
(Some(del_filterable_values), None) => match del_filterable_values {
|
||||||
|
Null => {
|
||||||
|
del_is_null.insert(document);
|
||||||
|
}
|
||||||
|
Empty => {
|
||||||
|
del_is_empty.insert(document);
|
||||||
|
}
|
||||||
|
Values { numbers, strings } => {
|
||||||
|
insert_numbers_diff(numbers, vec![])?;
|
||||||
|
insert_strings_diff(strings, vec![])?;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
(None, Some(add_filterable_values)) => match add_filterable_values {
|
||||||
|
Null => {
|
||||||
|
add_is_null.insert(document);
|
||||||
|
}
|
||||||
|
Empty => {
|
||||||
|
add_is_empty.insert(document);
|
||||||
|
}
|
||||||
|
Values { numbers, strings } => {
|
||||||
|
insert_numbers_diff(vec![], numbers)?;
|
||||||
|
insert_strings_diff(vec![], strings)?;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
(Some(del_filterable_values), Some(add_filterable_values)) => {
|
||||||
|
match (del_filterable_values, add_filterable_values) {
|
||||||
|
(Null, Null) | (Empty, Empty) => (),
|
||||||
|
(Null, Empty) => {
|
||||||
|
del_is_null.insert(document);
|
||||||
|
add_is_empty.insert(document);
|
||||||
|
}
|
||||||
|
(Empty, Null) => {
|
||||||
|
del_is_empty.insert(document);
|
||||||
|
add_is_null.insert(document);
|
||||||
|
}
|
||||||
|
(Null, Values { numbers, strings }) => {
|
||||||
|
insert_numbers_diff(vec![], numbers)?;
|
||||||
|
insert_strings_diff(vec![], strings)?;
|
||||||
|
del_is_null.insert(document);
|
||||||
|
}
|
||||||
|
(Empty, Values { numbers, strings }) => {
|
||||||
|
insert_numbers_diff(vec![], numbers)?;
|
||||||
|
insert_strings_diff(vec![], strings)?;
|
||||||
|
del_is_empty.insert(document);
|
||||||
|
}
|
||||||
|
(Values { numbers, strings }, Null) => {
|
||||||
|
add_is_null.insert(document);
|
||||||
|
insert_numbers_diff(numbers, vec![])?;
|
||||||
|
insert_strings_diff(strings, vec![])?;
|
||||||
|
}
|
||||||
|
(Values { numbers, strings }, Empty) => {
|
||||||
|
add_is_empty.insert(document);
|
||||||
|
insert_numbers_diff(numbers, vec![])?;
|
||||||
|
insert_strings_diff(strings, vec![])?;
|
||||||
|
}
|
||||||
|
(
|
||||||
|
Values { numbers: del_numbers, strings: del_strings },
|
||||||
|
Values { numbers: add_numbers, strings: add_strings },
|
||||||
|
) => {
|
||||||
|
insert_numbers_diff(del_numbers, add_numbers)?;
|
||||||
|
insert_strings_diff(del_strings, add_strings)?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -130,14 +221,15 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let mut buffer = Vec::new();
|
||||||
let mut facet_exists_docids_writer = create_writer(
|
let mut facet_exists_docids_writer = create_writer(
|
||||||
indexer.chunk_compression_type,
|
indexer.chunk_compression_type,
|
||||||
indexer.chunk_compression_level,
|
indexer.chunk_compression_level,
|
||||||
tempfile::tempfile()?,
|
tempfile::tempfile()?,
|
||||||
);
|
);
|
||||||
for (fid, bitmap) in facet_exists_docids.into_iter() {
|
for (fid, (del_bitmap, add_bitmap)) in facet_exists_docids.into_iter() {
|
||||||
let bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&bitmap).unwrap();
|
deladd_obkv_cbo_roaring_bitmaps(&mut buffer, &del_bitmap, &add_bitmap)?;
|
||||||
facet_exists_docids_writer.insert(fid.to_be_bytes(), &bitmap_bytes)?;
|
facet_exists_docids_writer.insert(fid.to_be_bytes(), &buffer)?;
|
||||||
}
|
}
|
||||||
let facet_exists_docids_reader = writer_into_reader(facet_exists_docids_writer)?;
|
let facet_exists_docids_reader = writer_into_reader(facet_exists_docids_writer)?;
|
||||||
|
|
||||||
@ -146,9 +238,9 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
|
|||||||
indexer.chunk_compression_level,
|
indexer.chunk_compression_level,
|
||||||
tempfile::tempfile()?,
|
tempfile::tempfile()?,
|
||||||
);
|
);
|
||||||
for (fid, bitmap) in facet_is_null_docids.into_iter() {
|
for (fid, (del_bitmap, add_bitmap)) in facet_is_null_docids.into_iter() {
|
||||||
let bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&bitmap).unwrap();
|
deladd_obkv_cbo_roaring_bitmaps(&mut buffer, &del_bitmap, &add_bitmap)?;
|
||||||
facet_is_null_docids_writer.insert(fid.to_be_bytes(), &bitmap_bytes)?;
|
facet_is_null_docids_writer.insert(fid.to_be_bytes(), &buffer)?;
|
||||||
}
|
}
|
||||||
let facet_is_null_docids_reader = writer_into_reader(facet_is_null_docids_writer)?;
|
let facet_is_null_docids_reader = writer_into_reader(facet_is_null_docids_writer)?;
|
||||||
|
|
||||||
@ -157,21 +249,156 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
|
|||||||
indexer.chunk_compression_level,
|
indexer.chunk_compression_level,
|
||||||
tempfile::tempfile()?,
|
tempfile::tempfile()?,
|
||||||
);
|
);
|
||||||
for (fid, bitmap) in facet_is_empty_docids.into_iter() {
|
for (fid, (del_bitmap, add_bitmap)) in facet_is_empty_docids.into_iter() {
|
||||||
let bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&bitmap).unwrap();
|
deladd_obkv_cbo_roaring_bitmaps(&mut buffer, &del_bitmap, &add_bitmap)?;
|
||||||
facet_is_empty_docids_writer.insert(fid.to_be_bytes(), &bitmap_bytes)?;
|
facet_is_empty_docids_writer.insert(fid.to_be_bytes(), &buffer)?;
|
||||||
}
|
}
|
||||||
let facet_is_empty_docids_reader = writer_into_reader(facet_is_empty_docids_writer)?;
|
let facet_is_empty_docids_reader = writer_into_reader(facet_is_empty_docids_writer)?;
|
||||||
|
|
||||||
Ok(ExtractedFacetValues {
|
Ok(ExtractedFacetValues {
|
||||||
docid_fid_facet_numbers_chunk: sorter_into_reader(fid_docid_facet_numbers_sorter, indexer)?,
|
fid_docid_facet_numbers_chunk: sorter_into_reader(fid_docid_facet_numbers_sorter, indexer)?,
|
||||||
docid_fid_facet_strings_chunk: sorter_into_reader(fid_docid_facet_strings_sorter, indexer)?,
|
fid_docid_facet_strings_chunk: sorter_into_reader(fid_docid_facet_strings_sorter, indexer)?,
|
||||||
fid_facet_is_null_docids_chunk: facet_is_null_docids_reader,
|
fid_facet_is_null_docids_chunk: facet_is_null_docids_reader,
|
||||||
fid_facet_is_empty_docids_chunk: facet_is_empty_docids_reader,
|
fid_facet_is_empty_docids_chunk: facet_is_empty_docids_reader,
|
||||||
fid_facet_exists_docids_chunk: facet_exists_docids_reader,
|
fid_facet_exists_docids_chunk: facet_exists_docids_reader,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Generates a vector of bytes containing a DelAdd obkv with two bitmaps.
|
||||||
|
fn deladd_obkv_cbo_roaring_bitmaps(
|
||||||
|
buffer: &mut Vec<u8>,
|
||||||
|
del_bitmap: &RoaringBitmap,
|
||||||
|
add_bitmap: &RoaringBitmap,
|
||||||
|
) -> io::Result<()> {
|
||||||
|
buffer.clear();
|
||||||
|
let mut obkv = KvWriterDelAdd::new(buffer);
|
||||||
|
let del_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(del_bitmap).unwrap();
|
||||||
|
let add_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(add_bitmap).unwrap();
|
||||||
|
obkv.insert(DelAdd::Deletion, del_bitmap_bytes)?;
|
||||||
|
obkv.insert(DelAdd::Addition, add_bitmap_bytes)?;
|
||||||
|
obkv.finish()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Truncates a string to the biggest valid LMDB key size.
|
||||||
|
fn truncate_string(s: String) -> String {
|
||||||
|
s.char_indices()
|
||||||
|
.take_while(|(idx, _)| idx + 4 < MAX_FACET_VALUE_LENGTH)
|
||||||
|
.map(|(_, c)| c)
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Computes the diff between both Del and Add numbers and
|
||||||
|
/// only inserts the parts that differ in the sorter.
|
||||||
|
fn insert_numbers_diff<MF>(
|
||||||
|
fid_docid_facet_numbers_sorter: &mut Sorter<MF>,
|
||||||
|
key_buffer: &mut Vec<u8>,
|
||||||
|
mut del_numbers: Vec<f64>,
|
||||||
|
mut add_numbers: Vec<f64>,
|
||||||
|
) -> Result<()>
|
||||||
|
where
|
||||||
|
MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> StdResult<Cow<'a, [u8]>, Error>,
|
||||||
|
{
|
||||||
|
// We sort and dedup the float numbers
|
||||||
|
del_numbers.sort_unstable_by_key(|f| OrderedFloat(*f));
|
||||||
|
add_numbers.sort_unstable_by_key(|f| OrderedFloat(*f));
|
||||||
|
del_numbers.dedup_by_key(|f| OrderedFloat(*f));
|
||||||
|
add_numbers.dedup_by_key(|f| OrderedFloat(*f));
|
||||||
|
|
||||||
|
let merged_numbers_iter = itertools::merge_join_by(
|
||||||
|
del_numbers.into_iter().map(OrderedFloat),
|
||||||
|
add_numbers.into_iter().map(OrderedFloat),
|
||||||
|
|del, add| del.cmp(add),
|
||||||
|
);
|
||||||
|
|
||||||
|
// insert facet numbers in sorter
|
||||||
|
for eob in merged_numbers_iter {
|
||||||
|
key_buffer.truncate(TRUNCATE_SIZE);
|
||||||
|
match eob {
|
||||||
|
EitherOrBoth::Both(_, _) => (), // no need to touch anything
|
||||||
|
EitherOrBoth::Left(OrderedFloat(number)) => {
|
||||||
|
if let Some(value_bytes) = f64_into_bytes(number) {
|
||||||
|
key_buffer.extend_from_slice(&value_bytes);
|
||||||
|
key_buffer.extend_from_slice(&number.to_be_bytes());
|
||||||
|
|
||||||
|
// We insert only the Del part of the Obkv to inform
|
||||||
|
// that we only want to remove all those numbers.
|
||||||
|
let mut obkv = KvWriterDelAdd::memory();
|
||||||
|
obkv.insert(DelAdd::Deletion, ().as_bytes())?;
|
||||||
|
let bytes = obkv.into_inner()?;
|
||||||
|
fid_docid_facet_numbers_sorter.insert(&key_buffer, bytes)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
EitherOrBoth::Right(OrderedFloat(number)) => {
|
||||||
|
if let Some(value_bytes) = f64_into_bytes(number) {
|
||||||
|
key_buffer.extend_from_slice(&value_bytes);
|
||||||
|
key_buffer.extend_from_slice(&number.to_be_bytes());
|
||||||
|
|
||||||
|
// We insert only the Del part of the Obkv to inform
|
||||||
|
// that we only want to remove all those numbers.
|
||||||
|
let mut obkv = KvWriterDelAdd::memory();
|
||||||
|
obkv.insert(DelAdd::Addition, ().as_bytes())?;
|
||||||
|
let bytes = obkv.into_inner()?;
|
||||||
|
fid_docid_facet_numbers_sorter.insert(&key_buffer, bytes)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Computes the diff between both Del and Add strings and
|
||||||
|
/// only inserts the parts that differ in the sorter.
|
||||||
|
fn insert_strings_diff<MF>(
|
||||||
|
fid_docid_facet_strings_sorter: &mut Sorter<MF>,
|
||||||
|
key_buffer: &mut Vec<u8>,
|
||||||
|
mut del_strings: Vec<(String, String)>,
|
||||||
|
mut add_strings: Vec<(String, String)>,
|
||||||
|
) -> Result<()>
|
||||||
|
where
|
||||||
|
MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> StdResult<Cow<'a, [u8]>, Error>,
|
||||||
|
{
|
||||||
|
// We sort and dedup the normalized and original strings
|
||||||
|
del_strings.sort_unstable();
|
||||||
|
add_strings.sort_unstable();
|
||||||
|
del_strings.dedup();
|
||||||
|
add_strings.dedup();
|
||||||
|
|
||||||
|
let merged_strings_iter = itertools::merge_join_by(
|
||||||
|
del_strings.into_iter().filter(|(n, _)| !n.is_empty()),
|
||||||
|
add_strings.into_iter().filter(|(n, _)| !n.is_empty()),
|
||||||
|
|del, add| del.cmp(add),
|
||||||
|
);
|
||||||
|
|
||||||
|
// insert normalized and original facet string in sorter
|
||||||
|
for eob in merged_strings_iter {
|
||||||
|
key_buffer.truncate(TRUNCATE_SIZE);
|
||||||
|
match eob {
|
||||||
|
EitherOrBoth::Both(_, _) => (), // no need to touch anything
|
||||||
|
EitherOrBoth::Left((normalized, original)) => {
|
||||||
|
let truncated = truncate_string(normalized);
|
||||||
|
key_buffer.extend_from_slice(truncated.as_bytes());
|
||||||
|
|
||||||
|
let mut obkv = KvWriterDelAdd::memory();
|
||||||
|
obkv.insert(DelAdd::Deletion, original)?;
|
||||||
|
let bytes = obkv.into_inner()?;
|
||||||
|
fid_docid_facet_strings_sorter.insert(&key_buffer, bytes)?;
|
||||||
|
}
|
||||||
|
EitherOrBoth::Right((normalized, original)) => {
|
||||||
|
let truncated = truncate_string(normalized);
|
||||||
|
key_buffer.extend_from_slice(truncated.as_bytes());
|
||||||
|
|
||||||
|
let mut obkv = KvWriterDelAdd::memory();
|
||||||
|
obkv.insert(DelAdd::Addition, original)?;
|
||||||
|
let bytes = obkv.into_inner()?;
|
||||||
|
fid_docid_facet_strings_sorter.insert(&key_buffer, bytes)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Represent what a document field contains.
|
/// Represent what a document field contains.
|
||||||
enum FilterableValues {
|
enum FilterableValues {
|
||||||
/// Corresponds to the JSON `null` value.
|
/// Corresponds to the JSON `null` value.
|
||||||
@ -182,6 +409,7 @@ enum FilterableValues {
|
|||||||
Values { numbers: Vec<f64>, strings: Vec<(String, String)> },
|
Values { numbers: Vec<f64>, strings: Vec<(String, String)> },
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Extracts the facet values of a JSON field.
|
||||||
fn extract_facet_values(value: &Value, geo_field: bool) -> FilterableValues {
|
fn extract_facet_values(value: &Value, geo_field: bool) -> FilterableValues {
|
||||||
fn inner_extract_facet_values(
|
fn inner_extract_facet_values(
|
||||||
value: &Value,
|
value: &Value,
|
||||||
|
@ -91,9 +91,9 @@ pub(crate) fn data_from_obkv_documents(
|
|||||||
let (
|
let (
|
||||||
docid_word_positions_chunks,
|
docid_word_positions_chunks,
|
||||||
(
|
(
|
||||||
docid_fid_facet_numbers_chunks,
|
fid_docid_facet_numbers_chunks,
|
||||||
(
|
(
|
||||||
docid_fid_facet_strings_chunks,
|
fid_docid_facet_strings_chunks,
|
||||||
(
|
(
|
||||||
facet_is_null_docids_chunks,
|
facet_is_null_docids_chunks,
|
||||||
(facet_is_empty_docids_chunks, facet_exists_docids_chunks),
|
(facet_is_empty_docids_chunks, facet_exists_docids_chunks),
|
||||||
@ -201,7 +201,7 @@ pub(crate) fn data_from_obkv_documents(
|
|||||||
);
|
);
|
||||||
|
|
||||||
spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
|
spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
|
||||||
docid_fid_facet_strings_chunks,
|
fid_docid_facet_strings_chunks,
|
||||||
indexer,
|
indexer,
|
||||||
lmdb_writer_sx.clone(),
|
lmdb_writer_sx.clone(),
|
||||||
extract_facet_string_docids,
|
extract_facet_string_docids,
|
||||||
@ -211,7 +211,7 @@ pub(crate) fn data_from_obkv_documents(
|
|||||||
);
|
);
|
||||||
|
|
||||||
spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
|
spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
|
||||||
docid_fid_facet_numbers_chunks,
|
fid_docid_facet_numbers_chunks,
|
||||||
indexer,
|
indexer,
|
||||||
lmdb_writer_sx,
|
lmdb_writer_sx,
|
||||||
extract_facet_number_docids,
|
extract_facet_number_docids,
|
||||||
@ -344,7 +344,7 @@ fn send_and_extract_flattened_documents_data(
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
let (docid_word_positions_chunk, docid_fid_facet_values_chunks): (Result<_>, Result<_>) =
|
let (docid_word_positions_chunk, fid_docid_facet_values_chunks): (Result<_>, Result<_>) =
|
||||||
rayon::join(
|
rayon::join(
|
||||||
|| {
|
|| {
|
||||||
let (documents_ids, docid_word_positions_chunk, script_language_pair) =
|
let (documents_ids, docid_word_positions_chunk, script_language_pair) =
|
||||||
@ -372,8 +372,8 @@ fn send_and_extract_flattened_documents_data(
|
|||||||
},
|
},
|
||||||
|| {
|
|| {
|
||||||
let ExtractedFacetValues {
|
let ExtractedFacetValues {
|
||||||
docid_fid_facet_numbers_chunk,
|
fid_docid_facet_numbers_chunk,
|
||||||
docid_fid_facet_strings_chunk,
|
fid_docid_facet_strings_chunk,
|
||||||
fid_facet_is_null_docids_chunk,
|
fid_facet_is_null_docids_chunk,
|
||||||
fid_facet_is_empty_docids_chunk,
|
fid_facet_is_empty_docids_chunk,
|
||||||
fid_facet_exists_docids_chunk,
|
fid_facet_exists_docids_chunk,
|
||||||
@ -384,26 +384,26 @@ fn send_and_extract_flattened_documents_data(
|
|||||||
geo_fields_ids,
|
geo_fields_ids,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
// send docid_fid_facet_numbers_chunk to DB writer
|
// send fid_docid_facet_numbers_chunk to DB writer
|
||||||
let docid_fid_facet_numbers_chunk =
|
let fid_docid_facet_numbers_chunk =
|
||||||
unsafe { as_cloneable_grenad(&docid_fid_facet_numbers_chunk)? };
|
unsafe { as_cloneable_grenad(&fid_docid_facet_numbers_chunk)? };
|
||||||
|
|
||||||
let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdDocidFacetNumbers(
|
let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdDocidFacetNumbers(
|
||||||
docid_fid_facet_numbers_chunk.clone(),
|
fid_docid_facet_numbers_chunk.clone(),
|
||||||
)));
|
)));
|
||||||
|
|
||||||
// send docid_fid_facet_strings_chunk to DB writer
|
// send fid_docid_facet_strings_chunk to DB writer
|
||||||
let docid_fid_facet_strings_chunk =
|
let fid_docid_facet_strings_chunk =
|
||||||
unsafe { as_cloneable_grenad(&docid_fid_facet_strings_chunk)? };
|
unsafe { as_cloneable_grenad(&fid_docid_facet_strings_chunk)? };
|
||||||
|
|
||||||
let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdDocidFacetStrings(
|
let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdDocidFacetStrings(
|
||||||
docid_fid_facet_strings_chunk.clone(),
|
fid_docid_facet_strings_chunk.clone(),
|
||||||
)));
|
)));
|
||||||
|
|
||||||
Ok((
|
Ok((
|
||||||
docid_fid_facet_numbers_chunk,
|
fid_docid_facet_numbers_chunk,
|
||||||
(
|
(
|
||||||
docid_fid_facet_strings_chunk,
|
fid_docid_facet_strings_chunk,
|
||||||
(
|
(
|
||||||
fid_facet_is_null_docids_chunk,
|
fid_facet_is_null_docids_chunk,
|
||||||
(fid_facet_is_empty_docids_chunk, fid_facet_exists_docids_chunk),
|
(fid_facet_is_empty_docids_chunk, fid_facet_exists_docids_chunk),
|
||||||
@ -413,5 +413,5 @@ fn send_and_extract_flattened_documents_data(
|
|||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok((docid_word_positions_chunk?, docid_fid_facet_values_chunks?))
|
Ok((docid_word_positions_chunk?, fid_docid_facet_values_chunks?))
|
||||||
}
|
}
|
||||||
|
@ -193,6 +193,7 @@ pub fn obkvs_keep_last_addition_merge_deletions<'a>(
|
|||||||
inner_merge_del_add_obkvs(obkvs, false)
|
inner_merge_del_add_obkvs(obkvs, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Do a union of all the CboRoaringBitmaps in the values.
|
||||||
pub fn merge_cbo_roaring_bitmaps<'a>(
|
pub fn merge_cbo_roaring_bitmaps<'a>(
|
||||||
_key: &[u8],
|
_key: &[u8],
|
||||||
values: &[Cow<'a, [u8]>],
|
values: &[Cow<'a, [u8]>],
|
||||||
@ -205,3 +206,36 @@ pub fn merge_cbo_roaring_bitmaps<'a>(
|
|||||||
Ok(Cow::from(vec))
|
Ok(Cow::from(vec))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Do a union of CboRoaringBitmaps on both sides of a DelAdd obkv
|
||||||
|
/// separately and outputs a new DelAdd with both unions.
|
||||||
|
pub fn merge_deladd_cbo_roaring_bitmaps<'a>(
|
||||||
|
_key: &[u8],
|
||||||
|
values: &[Cow<'a, [u8]>],
|
||||||
|
) -> Result<Cow<'a, [u8]>> {
|
||||||
|
if values.len() == 1 {
|
||||||
|
Ok(values[0].clone())
|
||||||
|
} else {
|
||||||
|
// Retrieve the bitmaps from both sides
|
||||||
|
let mut del_bitmaps_bytes = Vec::new();
|
||||||
|
let mut add_bitmaps_bytes = Vec::new();
|
||||||
|
for value in values {
|
||||||
|
let obkv = KvReaderDelAdd::new(value);
|
||||||
|
if let Some(bitmap_bytes) = obkv.get(DelAdd::Deletion) {
|
||||||
|
del_bitmaps_bytes.push(bitmap_bytes);
|
||||||
|
}
|
||||||
|
if let Some(bitmap_bytes) = obkv.get(DelAdd::Addition) {
|
||||||
|
add_bitmaps_bytes.push(bitmap_bytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut output_deladd_obkv = KvWriterDelAdd::memory();
|
||||||
|
let mut buffer = Vec::new();
|
||||||
|
CboRoaringBitmapCodec::merge_into(del_bitmaps_bytes, &mut buffer)?;
|
||||||
|
output_deladd_obkv.insert(DelAdd::Deletion, &buffer)?;
|
||||||
|
buffer.clear();
|
||||||
|
CboRoaringBitmapCodec::merge_into(add_bitmaps_bytes, &mut buffer)?;
|
||||||
|
output_deladd_obkv.insert(DelAdd::Addition, &buffer)?;
|
||||||
|
output_deladd_obkv.into_inner().map(Cow::from).map_err(Into::into)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -14,8 +14,9 @@ pub use grenad_helpers::{
|
|||||||
};
|
};
|
||||||
pub use merge_functions::{
|
pub use merge_functions::{
|
||||||
concat_u32s_array, keep_first, keep_latest_obkv, merge_btreeset_string,
|
concat_u32s_array, keep_first, keep_latest_obkv, merge_btreeset_string,
|
||||||
merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, obkvs_keep_last_addition_merge_deletions,
|
merge_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps, merge_roaring_bitmaps,
|
||||||
obkvs_merge_additions_and_deletions, serialize_roaring_bitmap, MergeFn,
|
obkvs_keep_last_addition_merge_deletions, obkvs_merge_additions_and_deletions,
|
||||||
|
serialize_roaring_bitmap, MergeFn,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::MAX_WORD_LENGTH;
|
use crate::MAX_WORD_LENGTH;
|
||||||
|
Loading…
Reference in New Issue
Block a user