From 1165ba217197f7abae6ee4e9d9b159bc09cdf275 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Lecrenier?= Date: Wed, 21 Sep 2022 15:53:39 +0200 Subject: [PATCH] Make facet deletion incremental --- milli/src/update/delete_documents.rs | 92 ++++++++------------------- milli/src/update/facet/bulk.rs | 19 ++++-- milli/src/update/facet/delete.rs | 92 +++++++++++++++++++++++++++ milli/src/update/facet/incremental.rs | 48 +++++++------- milli/src/update/facet/mod.rs | 31 +++++++-- 5 files changed, 182 insertions(+), 100 deletions(-) create mode 100644 milli/src/update/facet/delete.rs diff --git a/milli/src/update/delete_documents.rs b/milli/src/update/delete_documents.rs index a56a61026..de2f4480c 100644 --- a/milli/src/update/delete_documents.rs +++ b/milli/src/update/delete_documents.rs @@ -1,4 +1,5 @@ use std::collections::btree_map::Entry; +use std::collections::{HashMap, HashSet}; use fst::IntoStreamer; use heed::types::{ByteSlice, DecodeIgnore, Str}; @@ -8,17 +9,16 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use time::OffsetDateTime; -use super::{ClearDocuments, FacetsUpdateBulk}; +use super::facet::delete::FacetsDelete; +use super::ClearDocuments; use crate::error::{InternalError, UserError}; use crate::facet::FacetType; -use crate::heed_codec::facet::{ - ByteSliceRef, FacetGroupKeyCodec, FacetGroupValueCodec, FieldDocIdFacetIgnoreCodec, -}; +use crate::heed_codec::facet::FieldDocIdFacetCodec; use crate::heed_codec::CboRoaringBitmapCodec; use crate::index::{db_name, main_key}; use crate::{ - ExternalDocumentsIds, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, Result, - RoaringBitmapCodec, SmallString32, BEU32, + ExternalDocumentsIds, FieldId, FieldIdMapMissingEntry, Index, Result, RoaringBitmapCodec, + SmallString32, BEU32, }; pub struct DeleteDocuments<'t, 'u, 'i> { @@ -444,13 +444,7 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { } for facet_type in [FacetType::Number, FacetType::String] { - remove_docids_from_facet_id_docids( - self.wtxn, - self.index, - &self.to_delete_docids, - fields_ids_map.clone(), - facet_type, - )?; + let mut affected_facet_values = HashMap::new(); for field_id in self.index.faceted_fields_ids(self.wtxn)? { // Remove docids from the number faceted documents ids let mut docids = @@ -458,14 +452,24 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { docids -= &self.to_delete_docids; self.index.put_faceted_documents_ids(self.wtxn, field_id, facet_type, &docids)?; - remove_docids_from_field_id_docid_facet_value( + let facet_values = remove_docids_from_field_id_docid_facet_value( &self.index, self.wtxn, facet_type, field_id, &self.to_delete_docids, )?; + if !facet_values.is_empty() { + affected_facet_values.insert(field_id, facet_values); + } } + FacetsDelete::new( + self.index, + facet_type, + affected_facet_values, + &self.to_delete_docids, + ) + .execute(self.wtxn)?; } // We delete the documents ids that are under the facet field id values. @@ -546,7 +550,7 @@ fn remove_docids_from_field_id_docid_facet_value<'i, 'a>( facet_type: FacetType, field_id: FieldId, to_remove: &RoaringBitmap, -) -> heed::Result<()> { +) -> heed::Result>> { let db = match facet_type { FacetType::String => { index.field_id_docid_facet_strings.remap_types::() @@ -555,19 +559,23 @@ fn remove_docids_from_field_id_docid_facet_value<'i, 'a>( index.field_id_docid_facet_f64s.remap_types::() } }; + let mut all_affected_facet_values = HashSet::default(); let mut iter = db .prefix_iter_mut(wtxn, &field_id.to_be_bytes())? - .remap_key_type::(); + .remap_key_type::>(); while let Some(result) = iter.next() { - let ((_, docid, _), _) = result?; + let ((_, docid, facet_value), _) = result?; if to_remove.contains(docid) { + if !all_affected_facet_values.contains(facet_value) { + all_affected_facet_values.insert(facet_value.to_owned()); + } // safety: we don't keep references from inside the LMDB database. unsafe { iter.del_current()? }; } } - Ok(()) + Ok(all_affected_facet_values) } fn remove_docids_from_facet_id_exists_docids<'a, C>( @@ -595,54 +603,6 @@ where Ok(()) } -fn remove_docids_from_facet_id_docids<'a>( - wtxn: &'a mut heed::RwTxn, - index: &Index, - to_remove: &RoaringBitmap, - fields_ids_map: FieldsIdsMap, - facet_type: FacetType, -) -> Result<()> { - let db = match facet_type { - FacetType::String => { - index.facet_id_string_docids.remap_key_type::>() - } - FacetType::Number => { - index.facet_id_f64_docids.remap_key_type::>() - } - }; - let mut modified = false; - for field_id in fields_ids_map.ids() { - let mut level0_prefix = vec![]; - level0_prefix.extend_from_slice(&field_id.to_be_bytes()); - level0_prefix.push(0); - let mut iter = db - .as_polymorph() - .prefix_iter_mut::<_, ByteSlice, FacetGroupValueCodec>(wtxn, &level0_prefix)?; - - while let Some(result) = iter.next() { - let (bytes, mut value) = result?; - let previous_len = value.bitmap.len(); - value.bitmap -= to_remove; - if value.bitmap.is_empty() { - // safety: we don't keep references from inside the LMDB database. - unsafe { iter.del_current()? }; - modified = true; - } else if value.bitmap.len() != previous_len { - let bytes = bytes.to_owned(); - // safety: we don't keep references from inside the LMDB database. - unsafe { iter.put_current(&bytes, &value)? }; - modified = true; - } - } - } - if !modified { - return Ok(()); - } - let builder = FacetsUpdateBulk::new_not_updating_level_0(index, facet_type); - builder.execute(wtxn)?; - - Ok(()) -} #[cfg(test)] mod tests { diff --git a/milli/src/update/facet/bulk.rs b/milli/src/update/facet/bulk.rs index e82af5d66..d3db0a0fa 100644 --- a/milli/src/update/facet/bulk.rs +++ b/milli/src/update/facet/bulk.rs @@ -29,6 +29,7 @@ pub struct FacetsUpdateBulk<'i> { group_size: u8, min_level_size: u8, facet_type: FacetType, + field_ids: Vec, // None if level 0 does not need to be updated new_data: Option>, } @@ -36,20 +37,30 @@ pub struct FacetsUpdateBulk<'i> { impl<'i> FacetsUpdateBulk<'i> { pub fn new( index: &'i Index, + field_ids: Vec, facet_type: FacetType, new_data: grenad::Reader, group_size: u8, min_level_size: u8, ) -> FacetsUpdateBulk<'i> { - FacetsUpdateBulk { index, group_size, min_level_size, facet_type, new_data: Some(new_data) } + FacetsUpdateBulk { + index, + field_ids, + group_size, + min_level_size, + facet_type, + new_data: Some(new_data), + } } pub fn new_not_updating_level_0( index: &'i Index, + field_ids: Vec, facet_type: FacetType, ) -> FacetsUpdateBulk<'i> { FacetsUpdateBulk { index, + field_ids, group_size: FACET_GROUP_SIZE, min_level_size: FACET_MIN_LEVEL_SIZE, facet_type, @@ -61,7 +72,7 @@ impl<'i> FacetsUpdateBulk<'i> { pub fn execute(self, wtxn: &mut heed::RwTxn) -> Result<()> { debug!("Computing and writing the facet values levels docids into LMDB on disk..."); - let Self { index, group_size, min_level_size, facet_type, new_data } = self; + let Self { index, field_ids, group_size, min_level_size, facet_type, new_data } = self; let db = match facet_type { FacetType::String => { @@ -76,8 +87,6 @@ impl<'i> FacetsUpdateBulk<'i> { let inner = FacetsUpdateBulkInner { db, new_data, group_size, min_level_size }; - let field_ids = index.faceted_fields_ids(wtxn)?.iter().copied().collect::>(); - inner.update(wtxn, &field_ids, |wtxn, field_id, all_docids| { index.put_faceted_documents_ids(wtxn, field_id, facet_type, &all_docids)?; Ok(()) @@ -405,7 +414,7 @@ mod tests { index.verify_structure_validity(&wtxn, 1); // delete all the elements for the facet id 0 for i in 0..100u32 { - index.delete(&mut wtxn, 0, &(i as f64), i); + index.delete_single_docid(&mut wtxn, 0, &(i as f64), i); } index.verify_structure_validity(&wtxn, 0); index.verify_structure_validity(&wtxn, 1); diff --git a/milli/src/update/facet/delete.rs b/milli/src/update/facet/delete.rs new file mode 100644 index 000000000..efe1d800a --- /dev/null +++ b/milli/src/update/facet/delete.rs @@ -0,0 +1,92 @@ +use super::{FACET_GROUP_SIZE, FACET_MAX_GROUP_SIZE, FACET_MIN_LEVEL_SIZE}; +use crate::{ + facet::FacetType, + heed_codec::facet::{ByteSliceRef, FacetGroupKey, FacetGroupKeyCodec, FacetGroupValueCodec}, + update::{FacetsUpdateBulk, FacetsUpdateIncrementalInner}, + FieldId, Index, Result, +}; +use heed::RwTxn; +use roaring::RoaringBitmap; +use std::collections::{HashMap, HashSet}; + +pub struct FacetsDelete<'i, 'b> { + index: &'i Index, + database: heed::Database, FacetGroupValueCodec>, + facet_type: FacetType, + affected_facet_values: HashMap>>, + docids_to_delete: &'b RoaringBitmap, + group_size: u8, + max_group_size: u8, + min_level_size: u8, +} +impl<'i, 'b> FacetsDelete<'i, 'b> { + pub fn new( + index: &'i Index, + facet_type: FacetType, + affected_facet_values: HashMap>>, + docids_to_delete: &'b RoaringBitmap, + ) -> Self { + let database = match facet_type { + FacetType::String => { + index.facet_id_string_docids.remap_key_type::>() + } + FacetType::Number => { + index.facet_id_f64_docids.remap_key_type::>() + } + }; + Self { + index, + database, + facet_type, + affected_facet_values, + docids_to_delete, + group_size: FACET_GROUP_SIZE, + max_group_size: FACET_MAX_GROUP_SIZE, + min_level_size: FACET_MIN_LEVEL_SIZE, + } + } + + pub fn execute(self, wtxn: &mut RwTxn) -> Result<()> { + for (field_id, affected_facet_values) in self.affected_facet_values { + if affected_facet_values.len() >= (self.database.len(wtxn)? / 50) { + // Bulk delete + let mut modified = false; + + for facet_value in affected_facet_values { + let key = + FacetGroupKey { field_id, level: 0, left_bound: facet_value.as_slice() }; + let mut old = self.database.get(wtxn, &key)?.unwrap(); + let previous_len = old.bitmap.len(); + old.bitmap -= self.docids_to_delete; + if old.bitmap.is_empty() { + modified = true; + self.database.delete(wtxn, &key)?; + } else if old.bitmap.len() != previous_len { + modified = true; + self.database.put(wtxn, &key, &old)?; + } + } + if modified { + let builder = FacetsUpdateBulk::new_not_updating_level_0( + self.index, + vec![field_id], + self.facet_type, + ); + builder.execute(wtxn)?; + } + } else { + // Incremental + let inc = FacetsUpdateIncrementalInner { + db: self.database, + group_size: self.group_size, + min_level_size: self.min_level_size, + max_group_size: self.max_group_size, + }; + for facet_value in affected_facet_values { + inc.delete(wtxn, field_id, facet_value.as_slice(), &self.docids_to_delete)?; + } + } + } + Ok(()) + } +} diff --git a/milli/src/update/facet/incremental.rs b/milli/src/update/facet/incremental.rs index c2115aee5..895713d43 100644 --- a/milli/src/update/facet/incremental.rs +++ b/milli/src/update/facet/incremental.rs @@ -485,20 +485,20 @@ impl FacetsUpdateIncrementalInner { field_id: u16, level: u8, facet_value: &[u8], - docid: u32, + docids: &RoaringBitmap, ) -> Result { if level == 0 { - return self.delete_in_level_0(txn, field_id, facet_value, docid); + return self.delete_in_level_0(txn, field_id, facet_value, docids); } let (deletion_key, mut bitmap) = self.find_insertion_key_value(field_id, level, facet_value, txn)?; - let result = self.delete_in_level(txn, field_id, level - 1, facet_value.clone(), docid)?; + let result = self.delete_in_level(txn, field_id, level - 1, facet_value.clone(), docids)?; let mut decrease_size = false; let next_key = match result { DeletionResult::InPlace => { - bitmap.bitmap.remove(docid); + bitmap.bitmap -= docids; self.db.put(txn, &deletion_key.as_ref(), &bitmap)?; return Ok(DeletionResult::InPlace); } @@ -527,7 +527,7 @@ impl FacetsUpdateIncrementalInner { if reduced_range { updated_deletion_key.left_bound = next_key.clone().unwrap(); } - updated_value.bitmap.remove(docid); + updated_value.bitmap -= docids; let _ = self.db.delete(txn, &deletion_key.as_ref())?; self.db.put(txn, &updated_deletion_key.as_ref(), &updated_value)?; if reduced_range { @@ -543,11 +543,11 @@ impl FacetsUpdateIncrementalInner { txn: &'t mut RwTxn, field_id: u16, facet_value: &[u8], - docid: u32, + docids: &RoaringBitmap, ) -> Result { let key = FacetGroupKey { field_id, level: 0, left_bound: facet_value }; let mut bitmap = self.db.get(&txn, &key)?.unwrap().bitmap; - bitmap.remove(docid); + bitmap -= docids; if bitmap.is_empty() { let mut next_key = None; @@ -571,7 +571,7 @@ impl FacetsUpdateIncrementalInner { txn: &'t mut RwTxn, field_id: u16, facet_value: &[u8], - docid: u32, + docids: &RoaringBitmap, ) -> Result<()> { if self .db @@ -584,7 +584,7 @@ impl FacetsUpdateIncrementalInner { let highest_level = get_highest_level(&txn, self.db, field_id)?; let result = - self.delete_in_level(txn, field_id, highest_level as u8, facet_value, docid)?; + self.delete_in_level(txn, field_id, highest_level as u8, facet_value, docids)?; match result { DeletionResult::InPlace => return Ok(()), DeletionResult::Reduce { .. } => return Ok(()), @@ -807,7 +807,7 @@ mod tests { for i in (200..256).into_iter().rev() { index.verify_structure_validity(&txn, 0); - index.delete(&mut txn, 0, &(i as f64), i as u32); + index.delete_single_docid(&mut txn, 0, &(i as f64), i as u32); } index.verify_structure_validity(&txn, 0); txn.commit().unwrap(); @@ -816,7 +816,7 @@ mod tests { for i in (150..200).into_iter().rev() { index.verify_structure_validity(&txn, 0); - index.delete(&mut txn, 0, &(i as f64), i as u32); + index.delete_single_docid(&mut txn, 0, &(i as f64), i as u32); } index.verify_structure_validity(&txn, 0); txn.commit().unwrap(); @@ -824,7 +824,7 @@ mod tests { let mut txn = index.env.write_txn().unwrap(); for i in (100..150).into_iter().rev() { index.verify_structure_validity(&txn, 0); - index.delete(&mut txn, 0, &(i as f64), i as u32); + index.delete_single_docid(&mut txn, 0, &(i as f64), i as u32); } index.verify_structure_validity(&txn, 0); txn.commit().unwrap(); @@ -832,14 +832,14 @@ mod tests { let mut txn = index.env.write_txn().unwrap(); for i in (17..100).into_iter().rev() { index.verify_structure_validity(&txn, 0); - index.delete(&mut txn, 0, &(i as f64), i as u32); + index.delete_single_docid(&mut txn, 0, &(i as f64), i as u32); } index.verify_structure_validity(&txn, 0); txn.commit().unwrap(); milli_snap!(format!("{index}"), 17); let mut txn = index.env.write_txn().unwrap(); for i in (15..17).into_iter().rev() { - index.delete(&mut txn, 0, &(i as f64), i as u32); + index.delete_single_docid(&mut txn, 0, &(i as f64), i as u32); } index.verify_structure_validity(&txn, 0); txn.commit().unwrap(); @@ -847,7 +847,7 @@ mod tests { let mut txn = index.env.write_txn().unwrap(); for i in (0..15).into_iter().rev() { index.verify_structure_validity(&txn, 0); - index.delete(&mut txn, 0, &(i as f64), i as u32); + index.delete_single_docid(&mut txn, 0, &(i as f64), i as u32); } index.verify_structure_validity(&txn, 0); txn.commit().unwrap(); @@ -867,7 +867,7 @@ mod tests { } for i in 0..128 { - index.delete(&mut txn, 0, &(i as f64), i as u32); + index.delete_single_docid(&mut txn, 0, &(i as f64), i as u32); } index.verify_structure_validity(&txn, 0); txn.commit().unwrap(); @@ -875,7 +875,7 @@ mod tests { let mut txn = index.env.write_txn().unwrap(); for i in 128..216 { index.verify_structure_validity(&txn, 0); - index.delete(&mut txn, 0, &(i as f64), i as u32); + index.delete_single_docid(&mut txn, 0, &(i as f64), i as u32); } index.verify_structure_validity(&txn, 0); txn.commit().unwrap(); @@ -883,7 +883,7 @@ mod tests { let mut txn = index.env.write_txn().unwrap(); for i in 216..256 { index.verify_structure_validity(&txn, 0); - index.delete(&mut txn, 0, &(i as f64), i as u32); + index.delete_single_docid(&mut txn, 0, &(i as f64), i as u32); } index.verify_structure_validity(&txn, 0); txn.commit().unwrap(); @@ -908,7 +908,7 @@ mod tests { for i in 0..128 { let key = keys[i]; index.verify_structure_validity(&txn, 0); - index.delete(&mut txn, 0, &(key as f64), key as u32); + index.delete_single_docid(&mut txn, 0, &(key as f64), key as u32); } index.verify_structure_validity(&txn, 0); txn.commit().unwrap(); @@ -917,7 +917,7 @@ mod tests { for i in 128..216 { let key = keys[i]; index.verify_structure_validity(&txn, 0); - index.delete(&mut txn, 0, &(key as f64), key as u32); + index.delete_single_docid(&mut txn, 0, &(key as f64), key as u32); } index.verify_structure_validity(&txn, 0); txn.commit().unwrap(); @@ -926,7 +926,7 @@ mod tests { for i in 216..256 { let key = keys[i]; index.verify_structure_validity(&txn, 0); - index.delete(&mut txn, 0, &(key as f64), key as u32); + index.delete_single_docid(&mut txn, 0, &(key as f64), key as u32); } index.verify_structure_validity(&txn, 0); txn.commit().unwrap(); @@ -979,7 +979,7 @@ mod tests { for &key in keys.iter() { index.verify_structure_validity(&txn, 0); - index.delete(&mut txn, 0, &(key as f64), key + 100); + index.delete_single_docid(&mut txn, 0, &(key as f64), key + 100); } index.verify_structure_validity(&txn, 0); txn.commit().unwrap(); @@ -1010,7 +1010,7 @@ mod tests { for &key in keys.iter() { index.verify_structure_validity(&txn, 0); - index.delete(&mut txn, 0, &format!("{key:x}").as_str(), key + 100); + index.delete_single_docid(&mut txn, 0, &format!("{key:x}").as_str(), key + 100); } index.verify_structure_validity(&txn, 0); txn.commit().unwrap(); @@ -1131,7 +1131,7 @@ mod fuzz { OperationKind::Delete(value) => { if let Some(keys) = value_to_keys.get(value) { for key in keys { - index.delete(&mut txn, *field_id, key, *value as u32); + index.delete_single_docid(&mut txn, *field_id, key, *value as u32); trivial_db.delete(*field_id, *key, *value as u32); } } diff --git a/milli/src/update/facet/mod.rs b/milli/src/update/facet/mod.rs index c5046784f..c75713158 100644 --- a/milli/src/update/facet/mod.rs +++ b/milli/src/update/facet/mod.rs @@ -74,15 +74,15 @@ pub const FACET_MAX_GROUP_SIZE: u8 = 8; pub const FACET_GROUP_SIZE: u8 = 4; pub const FACET_MIN_LEVEL_SIZE: u8 = 5; -use std::fs::File; - use self::incremental::FacetsUpdateIncremental; use super::FacetsUpdateBulk; use crate::facet::FacetType; use crate::heed_codec::facet::{ByteSliceRef, FacetGroupKeyCodec, FacetGroupValueCodec}; use crate::{Index, Result}; +use std::fs::File; pub mod bulk; +pub mod delete; pub mod incremental; pub struct FacetsUpdate<'i> { @@ -120,8 +120,11 @@ impl<'i> FacetsUpdate<'i> { return Ok(()); } if self.new_data.len() >= (self.database.len(wtxn)? as u64 / 50) { + let field_ids = + self.index.faceted_fields_ids(wtxn)?.iter().copied().collect::>(); let bulk_update = FacetsUpdateBulk::new( self.index, + field_ids, self.facet_type, self.new_data, self.group_size, @@ -273,12 +276,12 @@ pub(crate) mod tests { let key_bytes = BoundCodec::bytes_encode(&key).unwrap(); update.insert(wtxn, field_id, &key_bytes, docids).unwrap(); } - pub fn delete<'a>( + pub fn delete_single_docid<'a>( &self, wtxn: &'a mut RwTxn, field_id: u16, key: &'a >::EItem, - value: u32, + docid: u32, ) { let update = FacetsUpdateIncrementalInner { db: self.content, @@ -287,7 +290,25 @@ pub(crate) mod tests { max_group_size: self.max_group_size.get(), }; let key_bytes = BoundCodec::bytes_encode(&key).unwrap(); - update.delete(wtxn, field_id, &key_bytes, value).unwrap(); + let mut docids = RoaringBitmap::new(); + docids.insert(docid); + update.delete(wtxn, field_id, &key_bytes, &docids).unwrap(); + } + pub fn delete<'a>( + &self, + wtxn: &'a mut RwTxn, + field_id: u16, + key: &'a >::EItem, + docids: &RoaringBitmap, + ) { + let update = FacetsUpdateIncrementalInner { + db: self.content, + group_size: self.group_size.get(), + min_level_size: self.min_level_size.get(), + max_group_size: self.max_group_size.get(), + }; + let key_bytes = BoundCodec::bytes_encode(&key).unwrap(); + update.delete(wtxn, field_id, &key_bytes, docids).unwrap(); } pub fn bulk_insert<'a, 'b>(