Introduce a dedicated function to write proximity entries in database

This commit is contained in:
Clément Renault 2024-05-29 15:17:51 +02:00 committed by ManyTheFish
parent fad4675abe
commit 0f578348f1

View File

@ -7,7 +7,7 @@ use bytemuck::allocation::pod_collect_to_vec;
use charabia::{Language, Script}; use charabia::{Language, Script};
use grenad::{Merger, MergerBuilder}; use grenad::{Merger, MergerBuilder};
use heed::types::Bytes; use heed::types::Bytes;
use heed::RwTxn; use heed::{BytesDecode, RwTxn};
use obkv::{KvReader, KvWriter}; use obkv::{KvReader, KvWriter};
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
@ -20,13 +20,16 @@ use super::MergeFn;
use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind}; use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind};
use crate::facet::FacetType; use crate::facet::FacetType;
use crate::index::db_name::DOCUMENTS; use crate::index::db_name::DOCUMENTS;
use crate::proximity::MAX_DISTANCE;
use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvReaderDelAdd}; use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvReaderDelAdd};
use crate::update::facet::FacetsUpdate; use crate::update::facet::FacetsUpdate;
use crate::update::index_documents::helpers::{ use crate::update::index_documents::helpers::{
as_cloneable_grenad, keep_latest_obkv, try_split_array_at, as_cloneable_grenad, keep_latest_obkv, try_split_array_at,
}; };
use crate::update::settings::InnerIndexSettingsDiff;
use crate::{ use crate::{
lat_lng_to_xyz, DocumentId, FieldId, GeoPoint, Index, InternalError, Result, SerializationError, lat_lng_to_xyz, CboRoaringBitmapCodec, DocumentId, FieldId, GeoPoint, Index, InternalError,
Result, SerializationError, U8StrStrCodec,
}; };
/// This struct accumulates and group the TypedChunks /// This struct accumulates and group the TypedChunks
@ -122,7 +125,6 @@ impl TypedChunk {
/// Return new documents seen. /// Return new documents seen.
#[tracing::instrument(level = "trace", skip_all, target = "indexing::write_db")] #[tracing::instrument(level = "trace", skip_all, target = "indexing::write_db")]
pub(crate) fn write_typed_chunk_into_index( pub(crate) fn write_typed_chunk_into_index(
typed_chunks: Vec<TypedChunk>,
wtxn: &mut RwTxn, wtxn: &mut RwTxn,
index: &Index, index: &Index,
settings_diff: &InnerIndexSettingsDiff, settings_diff: &InnerIndexSettingsDiff,
@ -487,13 +489,22 @@ pub(crate) fn write_typed_chunk_into_index(
} }
let merger = builder.build(); let merger = builder.build();
write_entries_into_database( if settings_diff.only_additional_fields().is_some() {
merger, write_proximity_entries_into_database_additional_searchables(
&index.word_pair_proximity_docids, merger,
wtxn, &index.word_pair_proximity_docids,
deladd_serialize_add_side, wtxn,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, )?;
)?; } else {
write_entries_into_database(
merger,
&index.word_pair_proximity_docids,
wtxn,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?;
}
is_merged_database = true; is_merged_database = true;
} }
TypedChunk::FieldIdDocidFacetNumbers(_) => { TypedChunk::FieldIdDocidFacetNumbers(_) => {
@ -832,3 +843,51 @@ where
} }
Ok(()) Ok(())
} }
/// Akin to the `write_entries_into_database` function but specialized
/// for the case when we only index additional searchable fields only.
#[tracing::instrument(level = "trace", skip_all, target = "indexing::write_db")]
fn write_proximity_entries_into_database_additional_searchables<R>(
merger: Merger<R, MergeFn>,
database: &heed::Database<U8StrStrCodec, CboRoaringBitmapCodec>,
wtxn: &mut RwTxn,
) -> Result<()>
where
R: io::Read + io::Seek,
{
let mut iter = merger.into_stream_merger_iter()?;
while let Some((key, value)) = iter.next()? {
if valid_lmdb_key(key) {
let (proximity_to_insert, word1, word2) =
U8StrStrCodec::bytes_decode(key).map_err(heed::Error::Decoding)?;
let data_to_insert = match KvReaderDelAdd::new(value).get(DelAdd::Addition) {
Some(value) => {
CboRoaringBitmapCodec::bytes_decode(value).map_err(heed::Error::Decoding)?
}
None => continue,
};
let mut data_to_remove = RoaringBitmap::new();
for prox in 1..(MAX_DISTANCE as u8) {
let key = (prox, word1, word2);
let database_value = database.get(wtxn, &key)?.unwrap_or_default();
let value = if prox == proximity_to_insert {
// Proximity that should be changed.
// Union values and remove lower proximity data
(&database_value | &data_to_insert) - &data_to_remove
} else {
// Remove lower proximity data
&database_value - &data_to_remove
};
// add the current data in data_to_remove for the next proximities
data_to_remove |= &value;
if database_value != value {
database.put(wtxn, &key, &value)?;
}
}
}
}
Ok(())
}