From 48026aa75c6c12257a83e40b65accbec1712e455 Mon Sep 17 00:00:00 2001 From: ManyTheFish Date: Tue, 13 Feb 2024 15:14:03 +0100 Subject: [PATCH] fix PR comments --- milli/src/update/facet/mod.rs | 188 ++++++++++-------- .../helpers/merge_functions.rs | 21 -- .../src/update/index_documents/helpers/mod.rs | 8 +- milli/src/update/index_documents/mod.rs | 6 +- .../src/update/index_documents/typed_chunk.rs | 13 +- milli/src/update/mod.rs | 5 +- 6 files changed, 117 insertions(+), 124 deletions(-) diff --git a/milli/src/update/facet/mod.rs b/milli/src/update/facet/mod.rs index ef40e3469..ca5a21ce2 100644 --- a/milli/src/update/facet/mod.rs +++ b/milli/src/update/facet/mod.rs @@ -173,98 +173,110 @@ impl<'i> FacetsUpdate<'i> { incremental_update.execute(wtxn)?; } - if let Some(normalized_delta_data) = self.normalized_delta_data { - let mut iter = normalized_delta_data.into_stream_merger_iter()?; - while let Some((key_bytes, delta_bytes)) = iter.next()? { - let deladd_reader = KvReaderDelAdd::new(delta_bytes); - - let database_set = self - .index - .facet_id_normalized_string_strings - .remap_key_type::() - .get(wtxn, key_bytes)? - .unwrap_or_default(); - - let add_set = deladd_reader - .get(DelAdd::Addition) - .and_then(|bytes| serde_json::from_slice::>(bytes).ok()) - .unwrap_or_default(); - - let del_set = match deladd_reader - .get(DelAdd::Deletion) - .and_then(|bytes| serde_json::from_slice::>(bytes).ok()) - { - Some(del_set) => { - let (field_id_bytes, _) = try_split_array_at(key_bytes).unwrap(); - let field_id = FieldId::from_be_bytes(field_id_bytes); - let mut set = BTreeSet::new(); - for facet in del_set { - let key = - FacetGroupKey { field_id, level: 0, left_bound: facet.as_str() }; - // Check if the referenced value doesn't exist anymore before deleting it. - if self.index.facet_id_string_docids.get(wtxn, &key)?.remap_data::().is_none() { - set.insert(facet); - } - } - set - } - None => BTreeSet::new(), - }; - - let set: BTreeSet<_> = - database_set.difference(&del_set).chain(add_set.iter()).cloned().collect(); - - if set.is_empty() { - self.index - .facet_id_normalized_string_strings - .remap_key_type::() - .delete(wtxn, key_bytes)?; - } else { - self.index - .facet_id_normalized_string_strings - .remap_key_type::() - .put(wtxn, key_bytes, &set)?; - } - } - - // We clear the FST of normalized-for-search to compute everything from scratch. - self.index.facet_id_string_fst.clear(wtxn)?; - // We compute one FST by string facet - let mut text_fsts = vec![]; - let mut current_fst: Option<(u16, fst::SetBuilder>)> = None; - let database = - self.index.facet_id_normalized_string_strings.remap_data_type::(); - for result in database.iter(wtxn)? { - let ((field_id, normalized_facet), _) = result?; - current_fst = match current_fst.take() { - Some((fid, fst_builder)) if fid != field_id => { - let fst = fst_builder.into_set(); - text_fsts.push((fid, fst)); - Some((field_id, fst::SetBuilder::memory())) - } - Some((field_id, fst_builder)) => Some((field_id, fst_builder)), - None => Some((field_id, fst::SetBuilder::memory())), - }; - - if let Some((_, fst_builder)) = current_fst.as_mut() { - fst_builder.insert(normalized_facet)?; - } - } - - if let Some((field_id, fst_builder)) = current_fst { - let fst = fst_builder.into_set(); - text_fsts.push((field_id, fst)); - } - - // We write those FSTs in LMDB now - for (field_id, fst) in text_fsts { - self.index.facet_id_string_fst.put(wtxn, &field_id, &fst)?; - } + match self.normalized_delta_data { + Some(data) => index_facet_search(wtxn, data, self.index), + None => Ok(()), } - Ok(()) } } +fn index_facet_search( + wtxn: &mut heed::RwTxn, + normalized_delta_data: Merger, MergeFn>, + index: &Index, +) -> Result<()> { + let mut iter = normalized_delta_data.into_stream_merger_iter()?; + while let Some((key_bytes, delta_bytes)) = iter.next()? { + let deladd_reader = KvReaderDelAdd::new(delta_bytes); + + let database_set = index + .facet_id_normalized_string_strings + .remap_key_type::() + .get(wtxn, key_bytes)? + .unwrap_or_default(); + + let add_set = deladd_reader + .get(DelAdd::Addition) + .and_then(|bytes| serde_json::from_slice::>(bytes).ok()) + .unwrap_or_default(); + + let del_set = match deladd_reader + .get(DelAdd::Deletion) + .and_then(|bytes| serde_json::from_slice::>(bytes).ok()) + { + Some(del_set) => { + let (field_id_bytes, _) = try_split_array_at(key_bytes).unwrap(); + let field_id = FieldId::from_be_bytes(field_id_bytes); + let mut set = BTreeSet::new(); + for facet in del_set { + let key = FacetGroupKey { field_id, level: 0, left_bound: facet.as_str() }; + // Check if the referenced value doesn't exist anymore before deleting it. + if index + .facet_id_string_docids + .remap_data_type::() + .get(wtxn, &key)? + .is_none() + { + set.insert(facet); + } + } + set + } + None => BTreeSet::new(), + }; + + let set: BTreeSet<_> = + database_set.difference(&del_set).chain(add_set.iter()).cloned().collect(); + + if set.is_empty() { + index + .facet_id_normalized_string_strings + .remap_key_type::() + .delete(wtxn, key_bytes)?; + } else { + index + .facet_id_normalized_string_strings + .remap_key_type::() + .put(wtxn, key_bytes, &set)?; + } + } + + // We clear the FST of normalized-for-search to compute everything from scratch. + index.facet_id_string_fst.clear(wtxn)?; + // We compute one FST by string facet + let mut text_fsts = vec![]; + let mut current_fst: Option<(u16, fst::SetBuilder>)> = None; + let database = index.facet_id_normalized_string_strings.remap_data_type::(); + for result in database.iter(wtxn)? { + let ((field_id, normalized_facet), _) = result?; + current_fst = match current_fst.take() { + Some((fid, fst_builder)) if fid != field_id => { + let fst = fst_builder.into_set(); + text_fsts.push((fid, fst)); + Some((field_id, fst::SetBuilder::memory())) + } + Some((field_id, fst_builder)) => Some((field_id, fst_builder)), + None => Some((field_id, fst::SetBuilder::memory())), + }; + + if let Some((_, fst_builder)) = current_fst.as_mut() { + fst_builder.insert(normalized_facet)?; + } + } + + if let Some((field_id, fst_builder)) = current_fst { + let fst = fst_builder.into_set(); + text_fsts.push((field_id, fst)); + } + + // We write those FSTs in LMDB now + for (field_id, fst) in text_fsts { + index.facet_id_string_fst.put(wtxn, &field_id, &fst)?; + } + + Ok(()) +} + #[cfg(test)] pub(crate) mod test_helpers { use std::cell::Cell; diff --git a/milli/src/update/index_documents/helpers/merge_functions.rs b/milli/src/update/index_documents/helpers/merge_functions.rs index 7f5cc5dcd..a265d152f 100644 --- a/milli/src/update/index_documents/helpers/merge_functions.rs +++ b/milli/src/update/index_documents/helpers/merge_functions.rs @@ -223,27 +223,6 @@ pub fn merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap<'a>( )?) } -pub fn merge_btreeset_string<'a>(_key: &[u8], values: &[Cow<'a, [u8]>]) -> Result> { - if values.len() == 1 { - Ok(values[0].clone()) - } else { - // TODO improve the perf by using a `#[borrow] Cow`. - let strings: BTreeSet = values - .iter() - .map(AsRef::as_ref) - .map(serde_json::from_slice::>) - .map(StdResult::unwrap) - .reduce(|mut current, new| { - for x in new { - current.insert(x); - } - current - }) - .unwrap(); - Ok(Cow::Owned(serde_json::to_vec(&strings).unwrap())) - } -} - /// Do a union of BtreeSet on both sides of a DelAdd obkv /// separately and outputs a new DelAdd with both unions. pub fn merge_deladd_btreeset_string<'a>( diff --git a/milli/src/update/index_documents/helpers/mod.rs b/milli/src/update/index_documents/helpers/mod.rs index b60f7be7d..5d8f16fae 100644 --- a/milli/src/update/index_documents/helpers/mod.rs +++ b/milli/src/update/index_documents/helpers/mod.rs @@ -13,10 +13,10 @@ pub use grenad_helpers::{ GrenadParameters, }; pub use merge_functions::{ - keep_first, keep_latest_obkv, merge_btreeset_string, merge_cbo_roaring_bitmaps, - merge_deladd_btreeset_string, merge_deladd_cbo_roaring_bitmaps, - merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, merge_roaring_bitmaps, - obkvs_keep_last_addition_merge_deletions, obkvs_merge_additions_and_deletions, MergeFn, + keep_first, keep_latest_obkv, merge_cbo_roaring_bitmaps, merge_deladd_btreeset_string, + merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, + merge_roaring_bitmaps, obkvs_keep_last_addition_merge_deletions, + obkvs_merge_additions_and_deletions, MergeFn, }; use crate::MAX_WORD_LENGTH; diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index de797541d..61ca1a024 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -25,9 +25,9 @@ use self::enrich::enrich_documents_batch; pub use self::enrich::{extract_finite_float_from_value, DocumentId}; pub use self::helpers::{ as_cloneable_grenad, create_sorter, create_writer, fst_stream_into_hashset, - fst_stream_into_vec, merge_btreeset_string, merge_cbo_roaring_bitmaps, - merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, - merge_roaring_bitmaps, valid_lmdb_key, write_sorter_into_database, writer_into_reader, MergeFn, + fst_stream_into_vec, merge_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, merge_roaring_bitmaps, + valid_lmdb_key, write_sorter_into_database, writer_into_reader, MergeFn, }; use self::helpers::{grenad_obkv_into_chunks, GrenadParameters}; pub use self::transform::{Transform, TransformOutput}; diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index ef9b6707d..07d77c68f 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -41,7 +41,7 @@ impl ChunkAccumulator { pub fn pop_longest(&mut self) -> Option> { match self.inner.iter().max_by_key(|v| v.len()) { Some(left) => { - let position = self.inner.iter().position(|right| left == right); + let position = self.inner.iter().position(|right| left.len() == right.len()); position.map(|p| self.inner.remove(p)).filter(|v| !v.is_empty()) } None => None, @@ -49,7 +49,11 @@ impl ChunkAccumulator { } pub fn insert(&mut self, chunk: TypedChunk) { - match self.inner.iter().position(|right| Some(&chunk) == right.first()) { + match self + .inner + .iter() + .position(|right| right.first().map_or(false, |right| chunk.is_batchable_with(right))) + { Some(position) => { let v = self.inner.get_mut(position).unwrap(); v.push(chunk); @@ -87,8 +91,8 @@ pub(crate) enum TypedChunk { ScriptLanguageDocids(HashMap<(Script, Language), (RoaringBitmap, RoaringBitmap)>), } -impl PartialEq for TypedChunk { - fn eq(&self, other: &Self) -> bool { +impl TypedChunk { + fn is_batchable_with(&self, other: &Self) -> bool { use TypedChunk::*; match (self, other) { (FieldIdDocidFacetStrings(_), FieldIdDocidFacetStrings(_)) @@ -113,7 +117,6 @@ impl PartialEq for TypedChunk { } } } -impl Eq for TypedChunk {} impl TypedChunk { pub fn to_debug_string(&self) -> String { diff --git a/milli/src/update/mod.rs b/milli/src/update/mod.rs index 66c52a52f..195b95d1e 100644 --- a/milli/src/update/mod.rs +++ b/milli/src/update/mod.rs @@ -3,9 +3,8 @@ pub use self::clear_documents::ClearDocuments; pub use self::facet::bulk::FacetsUpdateBulk; pub use self::facet::incremental::FacetsUpdateIncrementalInner; pub use self::index_documents::{ - merge_btreeset_string, merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, - DocumentAdditionResult, DocumentId, IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, - MergeFn, + merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, DocumentAdditionResult, DocumentId, + IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, MergeFn, }; pub use self::indexer_config::IndexerConfig; pub use self::settings::{validate_embedding_settings, Setting, Settings};