From 092a383419152fd0008eeebc1d143ec4ca1a8646 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 17 Jul 2024 16:27:39 +0200 Subject: [PATCH] Use the sorter cache when extracting the word pair proximity docids --- .../extract_word_pair_proximity_docids.rs | 67 ++++++++++--------- 1 file changed, 35 insertions(+), 32 deletions(-) diff --git a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs index 23bad9b2e..881a17818 100644 --- a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs @@ -1,6 +1,7 @@ use std::collections::{BTreeMap, VecDeque}; use std::fs::File; use std::io::BufReader; +use std::num::NonZeroUsize; use std::{cmp, io}; use obkv::KvReaderU16; @@ -12,7 +13,8 @@ use super::helpers::{ use crate::error::SerializationError; use crate::index::db_name::DOCID_WORD_POSITIONS; use crate::proximity::{index_proximity, ProximityPrecision, MAX_DISTANCE}; -use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; +use crate::update::del_add::{DelAdd, KvReaderDelAdd}; +use crate::update::index_documents::cache::SorterCacheDelAddCboRoaringBitmap; use crate::update::settings::InnerIndexSettingsDiff; use crate::{DocumentId, Result}; @@ -26,8 +28,6 @@ pub fn extract_word_pair_proximity_docids( indexer: GrenadParameters, settings_diff: &InnerIndexSettingsDiff, ) -> Result>> { - let mut conn = super::REDIS_CLIENT.get_connection().unwrap(); - // early return if the data shouldn't be deleted nor created. if settings_diff.settings_update_only && !settings_diff.reindex_proximities() { let writer = create_writer( @@ -42,15 +42,20 @@ pub fn extract_word_pair_proximity_docids( let any_addition = settings_diff.new.proximity_precision == ProximityPrecision::ByWord; let max_memory = indexer.max_memory_by_thread(); - let mut word_pair_proximity_docids_sorters: Vec<_> = (1..MAX_DISTANCE) + let mut cached_word_pair_proximity_docids_sorters: Vec<_> = (1..MAX_DISTANCE) .map(|_| { - create_sorter( + let sorter = create_sorter( grenad::SortAlgorithm::Unstable, merge_deladd_cbo_roaring_bitmaps, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, max_memory.map(|m| m / MAX_DISTANCE as usize), + ); + SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new( + NonZeroUsize::new(100).unwrap(), + sorter, + super::REDIS_CLIENT.get_connection().unwrap(), ) }) .collect(); @@ -79,8 +84,7 @@ pub fn extract_word_pair_proximity_docids( current_document_id.unwrap(), &del_word_pair_proximity, &add_word_pair_proximity, - &mut word_pair_proximity_docids_sorters, - &mut conn, + &mut cached_word_pair_proximity_docids_sorters, )?; del_word_pair_proximity.clear(); add_word_pair_proximity.clear(); @@ -170,8 +174,7 @@ pub fn extract_word_pair_proximity_docids( document_id, &del_word_pair_proximity, &add_word_pair_proximity, - &mut word_pair_proximity_docids_sorters, - &mut conn, + &mut cached_word_pair_proximity_docids_sorters, )?; } { @@ -185,8 +188,8 @@ pub fn extract_word_pair_proximity_docids( tempfile::tempfile()?, ); - for sorter in word_pair_proximity_docids_sorters { - sorter.write_into_stream_writer(&mut writer)?; + for cached_sorter in cached_word_pair_proximity_docids_sorters { + cached_sorter.into_sorter()?.write_into_stream_writer(&mut writer)?; } writer_into_reader(writer) @@ -201,35 +204,24 @@ fn document_word_positions_into_sorter( document_id: DocumentId, del_word_pair_proximity: &BTreeMap<(String, String), u8>, add_word_pair_proximity: &BTreeMap<(String, String), u8>, - word_pair_proximity_docids_sorters: &mut [grenad::Sorter], - conn: &mut redis::Connection, + cached_word_pair_proximity_docids_sorters: &mut [SorterCacheDelAddCboRoaringBitmap< + 20, + MergeFn, + >], ) -> Result<()> { use itertools::merge_join_by; use itertools::EitherOrBoth::{Both, Left, Right}; - let mut buffer = Vec::new(); let mut key_buffer = Vec::new(); for eob in merge_join_by(del_word_pair_proximity.iter(), add_word_pair_proximity.iter(), |d, a| { d.cmp(a) }) { - buffer.clear(); - let mut value_writer = KvWriterDelAdd::new(&mut buffer); let ((w1, w2), prox) = match eob { - Left(key_value) => { - value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap(); - key_value - } - Right(key_value) => { - value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap(); - key_value - } - Both(key_value, _) => { - value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap(); - value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap(); - key_value - } + Left(key_value) => key_value, + Right(key_value) => key_value, + Both(key_value, _) => key_value, }; key_buffer.clear(); @@ -238,9 +230,20 @@ fn document_word_positions_into_sorter( key_buffer.push(0); key_buffer.extend_from_slice(w2.as_bytes()); - redis::cmd("INCR").arg(key_buffer.as_slice()).query::(conn).unwrap(); - word_pair_proximity_docids_sorters[*prox as usize - 1] - .insert(&key_buffer, value_writer.into_inner().unwrap())?; + match eob { + Left(_) => { + cached_word_pair_proximity_docids_sorters[*prox as usize - 1] + .insert_del_u32(&key_buffer, document_id)?; + } + Right(_) => { + cached_word_pair_proximity_docids_sorters[*prox as usize - 1] + .insert_add_u32(&key_buffer, document_id)?; + } + Both(_, _) => { + cached_word_pair_proximity_docids_sorters[*prox as usize - 1] + .insert_del_add_u32(&key_buffer, document_id)?; + } + } } Ok(())