diff --git a/milli/src/update/index_documents/cache.rs b/milli/src/update/index_documents/cache.rs index 8c6a99347..5d3664301 100644 --- a/milli/src/update/index_documents/cache.rs +++ b/milli/src/update/index_documents/cache.rs @@ -39,7 +39,24 @@ where del.get_or_insert_with(RoaringBitmap::new).insert(n); Ok(()) } - None => match self.cache.push(key.into(), DelAddRoaringBitmap::new_del(n)) { + None => match self.cache.push(key.into(), DelAddRoaringBitmap::new_del_u32(n)) { + Some((key, deladd)) => self.write_entry_to_sorter(key, deladd), + None => Ok(()), + }, + } + } + + pub fn insert_del( + &mut self, + key: &[u8], + bitmap: RoaringBitmap, + ) -> Result<(), grenad::Error> { + match self.cache.get_mut(key) { + Some(DelAddRoaringBitmap { del, add: _ }) => { + *del.get_or_insert_with(RoaringBitmap::new) |= bitmap; + Ok(()) + } + None => match self.cache.push(key.into(), DelAddRoaringBitmap::new_del(bitmap)) { Some((key, deladd)) => self.write_entry_to_sorter(key, deladd), None => Ok(()), }, @@ -52,7 +69,24 @@ where add.get_or_insert_with(RoaringBitmap::new).insert(n); Ok(()) } - None => match self.cache.push(key.into(), DelAddRoaringBitmap::new_add(n)) { + None => match self.cache.push(key.into(), DelAddRoaringBitmap::new_add_u32(n)) { + Some((key, deladd)) => self.write_entry_to_sorter(key, deladd), + None => Ok(()), + }, + } + } + + pub fn insert_add( + &mut self, + key: &[u8], + bitmap: RoaringBitmap, + ) -> Result<(), grenad::Error> { + match self.cache.get_mut(key) { + Some(DelAddRoaringBitmap { del: _, add }) => { + *add.get_or_insert_with(RoaringBitmap::new) |= bitmap; + Ok(()) + } + None => match self.cache.push(key.into(), DelAddRoaringBitmap::new_add(bitmap)) { Some((key, deladd)) => self.write_entry_to_sorter(key, deladd), None => Ok(()), }, @@ -66,7 +100,7 @@ where add.get_or_insert_with(RoaringBitmap::new).insert(n); Ok(()) } - None => match self.cache.push(key.into(), DelAddRoaringBitmap::new_del_add(n)) { + None => match self.cache.push(key.into(), DelAddRoaringBitmap::new_del_add_u32(n)) { Some((key, deladd)) => self.write_entry_to_sorter(key, deladd), None => Ok(()), }, @@ -121,18 +155,30 @@ pub struct DelAddRoaringBitmap { } impl DelAddRoaringBitmap { - fn new_del_add(n: u32) -> Self { + fn new_del_add(bitmap: RoaringBitmap) -> Self { + DelAddRoaringBitmap { del: Some(bitmap.clone()), add: Some(bitmap) } + } + + fn new_del_add_u32(n: u32) -> Self { DelAddRoaringBitmap { del: Some(RoaringBitmap::from([n])), add: Some(RoaringBitmap::from([n])), } } - fn new_del(n: u32) -> Self { + fn new_del(bitmap: RoaringBitmap) -> Self { + DelAddRoaringBitmap { del: Some(bitmap), add: None } + } + + fn new_del_u32(n: u32) -> Self { DelAddRoaringBitmap { del: Some(RoaringBitmap::from([n])), add: None } } - fn new_add(n: u32) -> Self { + fn new_add(bitmap: RoaringBitmap) -> Self { + DelAddRoaringBitmap { del: None, add: Some(bitmap) } + } + + fn new_add_u32(n: u32) -> Self { DelAddRoaringBitmap { del: None, add: Some(RoaringBitmap::from([n])) } } } diff --git a/milli/src/update/index_documents/extract/extract_word_docids.rs b/milli/src/update/index_documents/extract/extract_word_docids.rs index e70ea7666..dbe52e426 100644 --- a/milli/src/update/index_documents/extract/extract_word_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_docids.rs @@ -1,6 +1,7 @@ use std::collections::BTreeSet; use std::fs::File; use std::io::{self, BufReader}; +use std::num::NonZeroUsize; use heed::{BytesDecode, BytesEncode}; use obkv::KvReaderU16; @@ -15,6 +16,7 @@ use crate::error::SerializationError; use crate::heed_codec::StrBEU16Codec; use crate::index::db_name::DOCID_WORD_POSITIONS; use crate::update::del_add::{is_noop_del_add_obkv, DelAdd, KvReaderDelAdd, KvWriterDelAdd}; +use crate::update::index_documents::cache::SorterCacheDelAddCboRoaringBitmap; use crate::update::index_documents::helpers::sorter_into_reader; use crate::update::settings::InnerIndexSettingsDiff; use crate::update::MergeFn; @@ -38,9 +40,8 @@ pub fn extract_word_docids( grenad::Reader>, )> { let max_memory = indexer.max_memory_by_thread(); - let mut conn = REDIS_CLIENT.get_connection().unwrap(); - let mut word_fid_docids_sorter = create_sorter( + let word_fid_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, merge_deladd_cbo_roaring_bitmaps, indexer.chunk_compression_type, @@ -48,6 +49,12 @@ pub fn extract_word_docids( indexer.max_nb_chunks, max_memory.map(|m| m / 3), ); + let mut cached_word_fid_docids_sorter = SorterCacheDelAddCboRoaringBitmap::<20, _>::new( + NonZeroUsize::new(300).unwrap(), + word_fid_docids_sorter, + REDIS_CLIENT.get_connection().unwrap(), + ); + let mut key_buffer = Vec::new(); let mut del_words = BTreeSet::new(); let mut add_words = BTreeSet::new(); @@ -81,8 +88,7 @@ pub fn extract_word_docids( &mut key_buffer, &del_words, &add_words, - &mut word_fid_docids_sorter, - &mut conn, + &mut cached_word_fid_docids_sorter, )?; del_words.clear(); @@ -95,7 +101,7 @@ pub fn extract_word_docids( tempfile::tempfile()?, ); - let mut word_docids_sorter = create_sorter( + let word_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, merge_deladd_cbo_roaring_bitmaps, indexer.chunk_compression_type, @@ -103,8 +109,13 @@ pub fn extract_word_docids( indexer.max_nb_chunks, max_memory.map(|m| m / 3), ); + let mut cached_word_docids_sorter = SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new( + NonZeroUsize::new(100).unwrap(), + word_docids_sorter, + REDIS_CLIENT.get_connection().unwrap(), + ); - let mut exact_word_docids_sorter = create_sorter( + let exact_word_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, merge_deladd_cbo_roaring_bitmaps, indexer.chunk_compression_type, @@ -112,9 +123,13 @@ pub fn extract_word_docids( indexer.max_nb_chunks, max_memory.map(|m| m / 3), ); + let mut cached_exact_word_docids_sorter = SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new( + NonZeroUsize::new(100).unwrap(), + exact_word_docids_sorter, + REDIS_CLIENT.get_connection().unwrap(), + ); - let mut iter = word_fid_docids_sorter.into_stream_merger_iter()?; - let mut buffer = Vec::new(); + let mut iter = cached_word_fid_docids_sorter.into_sorter()?.into_stream_merger_iter()?; // NOTE: replacing sorters by bitmap merging is less efficient, so, use sorters. while let Some((key, value)) = iter.next()? { // only keep the value if their is a change to apply in the DB. @@ -127,36 +142,31 @@ pub fn extract_word_docids( // merge all deletions let obkv = KvReaderDelAdd::new(value); + let w = w.as_bytes(); if let Some(value) = obkv.get(DelAdd::Deletion) { let delete_from_exact = settings_diff.old.exact_attributes.contains(&fid); - buffer.clear(); - let mut obkv = KvWriterDelAdd::new(&mut buffer); - obkv.insert(DelAdd::Deletion, value)?; - redis::cmd("INCR").arg(w.as_bytes()).query::(&mut conn).unwrap(); + let bitmap = CboRoaringBitmapCodec::deserialize_from(value)?; if delete_from_exact { - exact_word_docids_sorter.insert(w, obkv.into_inner().unwrap())?; + cached_exact_word_docids_sorter.insert_del(w, bitmap)?; } else { - word_docids_sorter.insert(w, obkv.into_inner().unwrap())?; + cached_word_docids_sorter.insert_del(w, bitmap)?; } } // merge all additions if let Some(value) = obkv.get(DelAdd::Addition) { let add_in_exact = settings_diff.new.exact_attributes.contains(&fid); - buffer.clear(); - let mut obkv = KvWriterDelAdd::new(&mut buffer); - obkv.insert(DelAdd::Addition, value)?; - redis::cmd("INCR").arg(w.as_bytes()).query::(&mut conn).unwrap(); + let bitmap = CboRoaringBitmapCodec::deserialize_from(value)?; if add_in_exact { - exact_word_docids_sorter.insert(w, obkv.into_inner().unwrap())?; + cached_exact_word_docids_sorter.insert_add(w, bitmap)?; } else { - word_docids_sorter.insert(w, obkv.into_inner().unwrap())?; + cached_word_docids_sorter.insert_add(w, bitmap)?; } } } Ok(( - sorter_into_reader(word_docids_sorter, indexer)?, - sorter_into_reader(exact_word_docids_sorter, indexer)?, + sorter_into_reader(cached_word_docids_sorter.into_sorter()?, indexer)?, + sorter_into_reader(cached_exact_word_docids_sorter.into_sorter()?, indexer)?, writer_into_reader(word_fid_docids_writer)?, )) } @@ -168,38 +178,34 @@ fn words_into_sorter( key_buffer: &mut Vec, del_words: &BTreeSet>, add_words: &BTreeSet>, - word_fid_docids_sorter: &mut grenad::Sorter, - conn: &mut redis::Connection, + cached_word_fid_docids_sorter: &mut SorterCacheDelAddCboRoaringBitmap<20, MergeFn>, ) -> Result<()> { use itertools::merge_join_by; use itertools::EitherOrBoth::{Both, Left, Right}; - let mut buffer = Vec::new(); for eob in merge_join_by(del_words.iter(), add_words.iter(), |d, a| d.cmp(a)) { - buffer.clear(); - let mut value_writer = KvWriterDelAdd::new(&mut buffer); let word_bytes = match eob { - Left(word_bytes) => { - value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap(); - word_bytes - } - Right(word_bytes) => { - value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap(); - word_bytes - } - Both(word_bytes, _) => { - value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap(); - value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap(); - word_bytes - } + Left(word_bytes) => word_bytes, + Right(word_bytes) => word_bytes, + Both(word_bytes, _) => word_bytes, }; key_buffer.clear(); key_buffer.extend_from_slice(word_bytes); key_buffer.push(0); key_buffer.extend_from_slice(&fid.to_be_bytes()); - redis::cmd("INCR").arg(key_buffer.as_slice()).query::(conn).unwrap(); - word_fid_docids_sorter.insert(&key_buffer, value_writer.into_inner().unwrap())?; + + match eob { + Left(_) => { + cached_word_fid_docids_sorter.insert_del_u32(key_buffer, document_id)?; + } + Right(_) => { + cached_word_fid_docids_sorter.insert_add_u32(key_buffer, document_id)?; + } + Both(_, _) => { + cached_word_fid_docids_sorter.insert_del_add_u32(key_buffer, document_id)?; + } + } } Ok(())