Compute word docids prefix cache

This commit is contained in:
ManyTheFish 2023-11-08 16:41:26 +01:00
parent 688266c83e
commit 70ce40828c
7 changed files with 116 additions and 53 deletions

View File

@ -102,3 +102,17 @@ pub fn del_add_from_two_obkvs<K: obkv::Key + PartialOrd + Ord>(
pub fn is_noop_del_add_obkv(del_add: KvReaderDelAdd) -> bool { pub fn is_noop_del_add_obkv(del_add: KvReaderDelAdd) -> bool {
del_add.get(DelAdd::Deletion) == del_add.get(DelAdd::Addition) del_add.get(DelAdd::Deletion) == del_add.get(DelAdd::Addition)
} }
/// A function that extracts and returns the Add side of a DelAdd obkv.
/// This is useful when there are no previous value in the database and
/// therefore we don't need to do a diff with what's already there.
///
/// If there is no Add side we currently write an empty buffer
/// which is a valid CboRoaringBitmap.
#[allow(clippy::ptr_arg)] // required to avoid signature mismatch
pub fn deladd_serialize_add_side<'a>(
obkv: &'a [u8],
_buffer: &mut Vec<u8>,
) -> crate::Result<&'a [u8]> {
Ok(KvReaderDelAdd::new(obkv).get(DelAdd::Addition).unwrap_or_default())
}

View File

@ -9,6 +9,7 @@ use log::debug;
use super::{ClonableMmap, MergeFn}; use super::{ClonableMmap, MergeFn};
use crate::error::InternalError; use crate::error::InternalError;
use crate::update::index_documents::valid_lmdb_key;
use crate::Result; use crate::Result;
pub type CursorClonableMmap = io::Cursor<ClonableMmap>; pub type CursorClonableMmap = io::Cursor<ClonableMmap>;
@ -282,6 +283,49 @@ pub fn sorter_into_lmdb_database(
Ok(()) Ok(())
} }
/// Write provided sorter in database using serialize_value function.
/// merge_values function is used if an entry already exist in the database.
pub fn write_sorter_into_database<K, V, FS, FM>(
sorter: Sorter<MergeFn>,
database: &heed::Database<K, V>,
wtxn: &mut heed::RwTxn,
index_is_empty: bool,
serialize_value: FS,
merge_values: FM,
) -> Result<()>
where
FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>,
FM: for<'a> Fn(&[u8], &[u8], &'a mut Vec<u8>) -> Result<Option<&'a [u8]>>,
{
puffin::profile_function!();
let mut buffer = Vec::new();
let database = database.remap_types::<ByteSlice, ByteSlice>();
let mut merger_iter = sorter.into_stream_merger_iter()?;
while let Some((key, value)) = merger_iter.next()? {
if valid_lmdb_key(key) {
buffer.clear();
let value = if index_is_empty {
Some(serialize_value(value, &mut buffer)?)
} else {
match database.get(wtxn, key)? {
Some(prev_value) => merge_values(value, prev_value, &mut buffer)?,
None => Some(serialize_value(value, &mut buffer)?),
}
};
match value {
Some(value) => database.put(wtxn, key, value)?,
None => {
database.delete(wtxn, key)?;
}
}
}
}
Ok(())
}
/// Used when trying to merge readers, but you don't actually care about the values. /// Used when trying to merge readers, but you don't actually care about the values.
pub fn merge_ignore_values<'a>(_key: &[u8], _values: &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>> { pub fn merge_ignore_values<'a>(_key: &[u8], _values: &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>> {
Ok(Cow::Owned(Vec::new())) Ok(Cow::Owned(Vec::new()))

View File

@ -239,3 +239,19 @@ pub fn merge_deladd_cbo_roaring_bitmaps<'a>(
output_deladd_obkv.into_inner().map(Cow::from).map_err(Into::into) output_deladd_obkv.into_inner().map(Cow::from).map_err(Into::into)
} }
} }
/// A function that merges a DelAdd of bitmao into an already existing bitmap.
///
/// The first argument is the DelAdd obkv of CboRoaringBitmaps and
/// the second one is the CboRoaringBitmap to merge into.
pub fn merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap<'a>(
deladd_obkv: &[u8],
previous: &[u8],
buffer: &'a mut Vec<u8>,
) -> Result<Option<&'a [u8]>> {
Ok(CboRoaringBitmapCodec::merge_deladd_into(
KvReaderDelAdd::new(deladd_obkv),
previous,
buffer,
)?)
}

View File

@ -9,12 +9,13 @@ pub use clonable_mmap::{ClonableMmap, CursorClonableMmap};
use fst::{IntoStreamer, Streamer}; use fst::{IntoStreamer, Streamer};
pub use grenad_helpers::{ pub use grenad_helpers::{
as_cloneable_grenad, create_sorter, create_writer, grenad_obkv_into_chunks, as_cloneable_grenad, create_sorter, create_writer, grenad_obkv_into_chunks,
merge_ignore_values, sorter_into_lmdb_database, sorter_into_reader, writer_into_reader, merge_ignore_values, sorter_into_lmdb_database, sorter_into_reader, write_sorter_into_database,
GrenadParameters, MergeableReader, writer_into_reader, GrenadParameters, MergeableReader,
}; };
pub use merge_functions::{ pub use merge_functions::{
concat_u32s_array, keep_first, keep_latest_obkv, merge_btreeset_string, concat_u32s_array, keep_first, keep_latest_obkv, merge_btreeset_string,
merge_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps, merge_roaring_bitmaps, merge_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, merge_roaring_bitmaps,
obkvs_keep_last_addition_merge_deletions, obkvs_merge_additions_and_deletions, obkvs_keep_last_addition_merge_deletions, obkvs_merge_additions_and_deletions,
serialize_roaring_bitmap, MergeFn, serialize_roaring_bitmap, MergeFn,
}; };

View File

@ -26,8 +26,10 @@ pub use self::enrich::{
}; };
pub use self::helpers::{ pub use self::helpers::{
as_cloneable_grenad, create_sorter, create_writer, fst_stream_into_hashset, as_cloneable_grenad, create_sorter, create_writer, fst_stream_into_hashset,
fst_stream_into_vec, merge_btreeset_string, merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, fst_stream_into_vec, merge_btreeset_string, merge_cbo_roaring_bitmaps,
sorter_into_lmdb_database, valid_lmdb_key, writer_into_reader, ClonableMmap, MergeFn, merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
merge_roaring_bitmaps, sorter_into_lmdb_database, valid_lmdb_key, write_sorter_into_database,
writer_into_reader, ClonableMmap, MergeFn,
}; };
use self::helpers::{grenad_obkv_into_chunks, GrenadParameters}; use self::helpers::{grenad_obkv_into_chunks, GrenadParameters};
pub use self::transform::{Transform, TransformOutput}; pub use self::transform::{Transform, TransformOutput};

View File

@ -13,7 +13,10 @@ use obkv::{KvReader, KvWriter};
use ordered_float::OrderedFloat; use ordered_float::OrderedFloat;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use super::helpers::{self, merge_ignore_values, valid_lmdb_key, CursorClonableMmap}; use super::helpers::{
self, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, merge_ignore_values,
valid_lmdb_key, CursorClonableMmap,
};
use super::{ClonableMmap, MergeFn}; use super::{ClonableMmap, MergeFn};
use crate::distance::NDotProductPoint; use crate::distance::NDotProductPoint;
use crate::error::UserError; use crate::error::UserError;
@ -21,12 +24,11 @@ use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind};
use crate::facet::FacetType; use crate::facet::FacetType;
use crate::index::db_name::DOCUMENTS; use crate::index::db_name::DOCUMENTS;
use crate::index::Hnsw; use crate::index::Hnsw;
use crate::update::del_add::{DelAdd, KvReaderDelAdd}; use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvReaderDelAdd};
use crate::update::facet::FacetsUpdate; use crate::update::facet::FacetsUpdate;
use crate::update::index_documents::helpers::{as_cloneable_grenad, try_split_array_at}; use crate::update::index_documents::helpers::{as_cloneable_grenad, try_split_array_at};
use crate::{ use crate::{
lat_lng_to_xyz, CboRoaringBitmapCodec, DocumentId, FieldId, GeoPoint, Index, Result, lat_lng_to_xyz, DocumentId, FieldId, GeoPoint, Index, Result, SerializationError, BEU32,
SerializationError, BEU32,
}; };
pub(crate) enum TypedChunk { pub(crate) enum TypedChunk {
@ -186,7 +188,7 @@ pub(crate) fn write_typed_chunk_into_index(
wtxn, wtxn,
index_is_empty, index_is_empty,
deladd_serialize_add_side, deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?; )?;
is_merged_database = true; is_merged_database = true;
} }
@ -202,7 +204,7 @@ pub(crate) fn write_typed_chunk_into_index(
wtxn, wtxn,
index_is_empty, index_is_empty,
deladd_serialize_add_side, deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?; )?;
let exact_word_docids_iter = unsafe { as_cloneable_grenad(&exact_word_docids_reader) }?; let exact_word_docids_iter = unsafe { as_cloneable_grenad(&exact_word_docids_reader) }?;
@ -212,7 +214,7 @@ pub(crate) fn write_typed_chunk_into_index(
wtxn, wtxn,
index_is_empty, index_is_empty,
deladd_serialize_add_side, deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?; )?;
let word_fid_docids_iter = unsafe { as_cloneable_grenad(&word_fid_docids_reader) }?; let word_fid_docids_iter = unsafe { as_cloneable_grenad(&word_fid_docids_reader) }?;
@ -222,7 +224,7 @@ pub(crate) fn write_typed_chunk_into_index(
wtxn, wtxn,
index_is_empty, index_is_empty,
deladd_serialize_add_side, deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?; )?;
// create fst from word docids // create fst from word docids
@ -244,7 +246,7 @@ pub(crate) fn write_typed_chunk_into_index(
wtxn, wtxn,
index_is_empty, index_is_empty,
deladd_serialize_add_side, deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?; )?;
is_merged_database = true; is_merged_database = true;
} }
@ -265,7 +267,7 @@ pub(crate) fn write_typed_chunk_into_index(
wtxn, wtxn,
index_is_empty, index_is_empty,
deladd_serialize_add_side, deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?; )?;
is_merged_database = true; is_merged_database = true;
} }
@ -276,7 +278,7 @@ pub(crate) fn write_typed_chunk_into_index(
wtxn, wtxn,
index_is_empty, index_is_empty,
deladd_serialize_add_side, deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?; )?;
is_merged_database = true; is_merged_database = true;
} }
@ -287,7 +289,7 @@ pub(crate) fn write_typed_chunk_into_index(
wtxn, wtxn,
index_is_empty, index_is_empty,
deladd_serialize_add_side, deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?; )?;
is_merged_database = true; is_merged_database = true;
} }
@ -298,7 +300,7 @@ pub(crate) fn write_typed_chunk_into_index(
wtxn, wtxn,
index_is_empty, index_is_empty,
deladd_serialize_add_side, deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?; )?;
is_merged_database = true; is_merged_database = true;
} }
@ -495,33 +497,6 @@ fn merge_word_docids_reader_into_fst(
Ok(builder.into_set()) Ok(builder.into_set())
} }
/// A function that extracts and returns the Add side of a DelAdd obkv.
/// This is useful when there are no previous value in the database and
/// therefore we don't need to do a diff with what's already there.
///
/// If there is no Add side we currently write an empty buffer
/// which is a valid CboRoaringBitmap.
#[allow(clippy::ptr_arg)] // required to avoid signature mismatch
fn deladd_serialize_add_side<'a>(obkv: &'a [u8], _buffer: &mut Vec<u8>) -> Result<&'a [u8]> {
Ok(KvReaderDelAdd::new(obkv).get(DelAdd::Addition).unwrap_or_default())
}
/// A function that merges a DelAdd of bitmao into an already existing bitmap.
///
/// The first argument is the DelAdd obkv of CboRoaringBitmaps and
/// the second one is the CboRoaringBitmap to merge into.
fn merge_deladd_cbo_roaring_bitmaps<'a>(
deladd_obkv: &[u8],
previous: &[u8],
buffer: &'a mut Vec<u8>,
) -> Result<Option<&'a [u8]>> {
Ok(CboRoaringBitmapCodec::merge_deladd_into(
KvReaderDelAdd::new(deladd_obkv),
previous,
buffer,
)?)
}
/// Write provided entries in database using serialize_value function. /// Write provided entries in database using serialize_value function.
/// merge_values function is used if an entry already exist in the database. /// merge_values function is used if an entry already exist in the database.
fn write_entries_into_database<R, K, V, FS, FM>( fn write_entries_into_database<R, K, V, FS, FM>(

View File

@ -4,9 +4,11 @@ use grenad::CompressionType;
use heed::types::{ByteSlice, Str}; use heed::types::{ByteSlice, Str};
use heed::Database; use heed::Database;
use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvWriterDelAdd};
use crate::update::index_documents::{ use crate::update::index_documents::{
create_sorter, merge_cbo_roaring_bitmaps, sorter_into_lmdb_database, valid_lmdb_key, create_sorter, merge_deladd_cbo_roaring_bitmaps,
CursorClonableMmap, MergeFn, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, valid_lmdb_key,
write_sorter_into_database, CursorClonableMmap, MergeFn,
}; };
use crate::{CboRoaringBitmapCodec, Result}; use crate::{CboRoaringBitmapCodec, Result};
@ -51,7 +53,7 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
// and write into it at the same time, therefore we write into another file. // and write into it at the same time, therefore we write into another file.
let mut prefix_docids_sorter = create_sorter( let mut prefix_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable, grenad::SortAlgorithm::Unstable,
merge_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps,
self.chunk_compression_type, self.chunk_compression_type,
self.chunk_compression_level, self.chunk_compression_level,
self.max_nb_chunks, self.max_nb_chunks,
@ -92,11 +94,16 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
// We fetch the docids associated to the newly added word prefix fst only. // We fetch the docids associated to the newly added word prefix fst only.
let db = self.word_docids.remap_data_type::<ByteSlice>(); let db = self.word_docids.remap_data_type::<ByteSlice>();
let mut buffer = Vec::new();
for prefix in new_prefix_fst_words { for prefix in new_prefix_fst_words {
let prefix = std::str::from_utf8(prefix.as_bytes())?; let prefix = std::str::from_utf8(prefix.as_bytes())?;
for result in db.prefix_iter(self.wtxn, prefix)? { for result in db.prefix_iter(self.wtxn, prefix)? {
let (_word, data) = result?; let (_word, data) = result?;
prefix_docids_sorter.insert(prefix, data)?; buffer.clear();
let mut writer = KvWriterDelAdd::new(&mut buffer);
writer.insert(DelAdd::Addition, data)?;
prefix_docids_sorter.insert(prefix, writer.into_inner()?)?;
} }
} }
@ -110,12 +117,16 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
drop(iter); drop(iter);
let database_is_empty = self.word_prefix_docids.is_empty(self.wtxn)?;
// We finally write the word prefix docids into the LMDB database. // We finally write the word prefix docids into the LMDB database.
sorter_into_lmdb_database( write_sorter_into_database(
self.wtxn,
*self.word_prefix_docids.as_polymorph(),
prefix_docids_sorter, prefix_docids_sorter,
merge_cbo_roaring_bitmaps, &self.word_prefix_docids,
self.wtxn,
database_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?; )?;
Ok(()) Ok(())