diff --git a/crates/index-scheduler/src/index_mapper/mod.rs b/crates/index-scheduler/src/index_mapper/mod.rs index 7b226ac01..48e29508f 100644 --- a/crates/index-scheduler/src/index_mapper/mod.rs +++ b/crates/index-scheduler/src/index_mapper/mod.rs @@ -142,7 +142,7 @@ impl IndexStats { Ok(IndexStats { number_of_embeddings: Some(arroy_stats.number_of_embeddings), number_of_embedded_documents: Some(arroy_stats.documents.len()), - documents_database_stats: index.documents_database_stats(rtxn)?, + documents_database_stats: index.documents_stats(rtxn)?.unwrap_or_default(), database_size: index.on_disk_size()?, used_database_size: index.used_size()?, primary_key: index.primary_key(rtxn)?.map(|s| s.to_string()), diff --git a/crates/milli/src/database_stats.rs b/crates/milli/src/database_stats.rs index c15280b78..cd7adab4d 100644 --- a/crates/milli/src/database_stats.rs +++ b/crates/milli/src/database_stats.rs @@ -3,8 +3,6 @@ use heed::Database; use heed::RoTxn; use serde::{Deserialize, Serialize}; -use crate::Result; - #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)] #[serde(rename_all = "camelCase")] /// The stats of a database. @@ -15,14 +13,6 @@ pub struct DatabaseStats { total_key_size: u64, /// The total size of the values in the database. total_value_size: u64, - /// The maximum size of a key in the database. - max_key_size: u64, - /// The maximum size of a value in the database. - max_value_size: u64, - /// The minimum size of a key in the database. - min_key_size: u64, - /// The minimum size of a value in the database. - min_value_size: u64, } impl DatabaseStats { @@ -30,38 +20,60 @@ impl DatabaseStats { /// /// This function iterates over the whole database and computes the stats. /// It is not efficient and should be cached somewhere. - pub(crate) fn new(database: Database, rtxn: &RoTxn<'_>) -> Result { - let mut database_stats = Self { - number_of_entries: 0, - total_key_size: 0, - total_value_size: 0, - max_key_size: 0, - max_value_size: 0, - min_key_size: u64::MAX, - min_value_size: u64::MAX, - }; + pub(crate) fn new(database: Database, rtxn: &RoTxn<'_>) -> heed::Result { + let mut database_stats = + Self { number_of_entries: 0, total_key_size: 0, total_value_size: 0 }; let mut iter = database.iter(rtxn)?; while let Some((key, value)) = iter.next().transpose()? { let key_size = key.len() as u64; let value_size = value.len() as u64; - database_stats.number_of_entries += 1; database_stats.total_key_size += key_size; database_stats.total_value_size += value_size; - database_stats.max_key_size = database_stats.max_key_size.max(key_size); - database_stats.max_value_size = database_stats.max_value_size.max(value_size); - database_stats.min_key_size = database_stats.min_key_size.min(key_size); - database_stats.min_value_size = database_stats.min_value_size.min(value_size); } - if database_stats.number_of_entries == 0 { - database_stats.min_key_size = 0; - database_stats.min_value_size = 0; - } + database_stats.number_of_entries = database.len(rtxn)?; Ok(database_stats) } + /// Recomputes the stats of the database and returns the new stats. + /// + /// This function is used to update the stats of the database when some keys are modified. + /// It is more efficient than the `new` function because it does not iterate over the whole database but only the modified keys comparing the before and after states. + pub(crate) fn recompute<'a, I, K>( + mut stats: Self, + database: Database, + before_rtxn: &RoTxn<'_>, + after_rtxn: &RoTxn<'_>, + modified_keys: I, + ) -> heed::Result + where + I: IntoIterator, + K: AsRef<[u8]>, + { + for key in modified_keys { + let key = key.as_ref(); + if let Some(value) = database.get(after_rtxn, key)? { + let key_size = key.len() as u64; + let value_size = value.len() as u64; + stats.total_key_size = stats.total_key_size.saturating_add(key_size); + stats.total_value_size = stats.total_value_size.saturating_add(value_size); + } + + if let Some(value) = database.get(before_rtxn, key)? { + let key_size = key.len() as u64; + let value_size = value.len() as u64; + stats.total_key_size = stats.total_key_size.saturating_sub(key_size); + stats.total_value_size = stats.total_value_size.saturating_sub(value_size); + } + } + + stats.number_of_entries = database.len(after_rtxn)?; + + Ok(stats) + } + pub fn average_key_size(&self) -> u64 { self.total_key_size.checked_div(self.number_of_entries).unwrap_or(0) } @@ -81,20 +93,4 @@ impl DatabaseStats { pub fn total_value_size(&self) -> u64 { self.total_value_size } - - pub fn max_key_size(&self) -> u64 { - self.max_key_size - } - - pub fn max_value_size(&self) -> u64 { - self.max_value_size - } - - pub fn min_key_size(&self) -> u64 { - self.min_key_size - } - - pub fn min_value_size(&self) -> u64 { - self.min_value_size - } } diff --git a/crates/milli/src/index.rs b/crates/milli/src/index.rs index 11e0ff1f9..4d1e7a2b9 100644 --- a/crates/milli/src/index.rs +++ b/crates/milli/src/index.rs @@ -75,6 +75,7 @@ pub mod main_key { pub const LOCALIZED_ATTRIBUTES_RULES: &str = "localized_attributes_rules"; pub const FACET_SEARCH: &str = "facet_search"; pub const PREFIX_SEARCH: &str = "prefix_search"; + pub const DOCUMENTS_STATS: &str = "documents_stats"; } pub mod db_name { @@ -404,9 +405,58 @@ impl Index { Ok(count.unwrap_or_default()) } - /// Returns the stats of the database. - pub fn documents_database_stats(&self, rtxn: &RoTxn<'_>) -> Result { - DatabaseStats::new(self.documents.remap_types::(), rtxn) + /// Updates the stats of the documents database based on the previous stats and the modified docids. + pub fn update_documents_stats( + &self, + wtxn: &mut RwTxn<'_>, + modified_docids: roaring::RoaringBitmap, + ) -> Result<()> { + let before_rtxn = self.read_txn()?; + let document_stats = match self.documents_stats(&before_rtxn)? { + Some(before_stats) => DatabaseStats::recompute( + before_stats, + self.documents.remap_types(), + &before_rtxn, + wtxn, + modified_docids.iter().map(|docid| docid.to_be_bytes()), + )?, + None => { + // This should never happen when there are already documents in the index, the documents stats should be present. + // If it happens, it means that the index was not properly initialized/upgraded. + debug_assert_eq!( + self.documents.len(&before_rtxn)?, + 0, + "The documents stats should be present when there are documents in the index" + ); + tracing::warn!("No documents stats found, creating new ones"); + DatabaseStats::new(self.documents.remap_types(), &*wtxn)? + } + }; + + self.put_documents_stats(wtxn, document_stats)?; + Ok(()) + } + + /// Writes the stats of the documents database. + pub fn put_documents_stats( + &self, + wtxn: &mut RwTxn<'_>, + stats: DatabaseStats, + ) -> heed::Result<()> { + eprintln!("putting documents stats: {:?}", stats); + self.main.remap_types::>().put( + wtxn, + main_key::DOCUMENTS_STATS, + &stats, + ) + } + + /// Returns the stats of the documents database. + pub fn documents_stats(&self, rtxn: &RoTxn<'_>) -> heed::Result> { + dbg!(self + .main + .remap_types::>() + .get(rtxn, main_key::DOCUMENTS_STATS)) } /* primary key */ diff --git a/crates/milli/src/update/index_documents/mod.rs b/crates/milli/src/update/index_documents/mod.rs index 154db7875..4cb44c91b 100644 --- a/crates/milli/src/update/index_documents/mod.rs +++ b/crates/milli/src/update/index_documents/mod.rs @@ -307,6 +307,7 @@ where let current_span = tracing::Span::current(); // Run extraction pipeline in parallel. + let mut modified_docids = RoaringBitmap::new(); pool.install(|| { let settings_diff_cloned = settings_diff.clone(); rayon::spawn(move || { @@ -367,7 +368,7 @@ where Err(status) => { if let Some(typed_chunks) = chunk_accumulator.pop_longest() { let (docids, is_merged_database) = - write_typed_chunk_into_index(self.wtxn, self.index, &settings_diff, typed_chunks)?; + write_typed_chunk_into_index(self.wtxn, self.index, &settings_diff, typed_chunks, &mut modified_docids)?; if !docids.is_empty() { final_documents_ids |= docids; let documents_seen_count = final_documents_ids.len(); @@ -467,6 +468,10 @@ where Ok(()) }).map_err(InternalError::from)??; + if !settings_diff.settings_update_only { + // Update the stats of the documents database when there is a document update. + self.index.update_documents_stats(self.wtxn, modified_docids)?; + } // We write the field distribution into the main database self.index.put_field_distribution(self.wtxn, &field_distribution)?; diff --git a/crates/milli/src/update/index_documents/typed_chunk.rs b/crates/milli/src/update/index_documents/typed_chunk.rs index d5c250e2d..0809d9601 100644 --- a/crates/milli/src/update/index_documents/typed_chunk.rs +++ b/crates/milli/src/update/index_documents/typed_chunk.rs @@ -129,6 +129,7 @@ pub(crate) fn write_typed_chunk_into_index( index: &Index, settings_diff: &InnerIndexSettingsDiff, typed_chunks: Vec, + modified_docids: &mut RoaringBitmap, ) -> Result<(RoaringBitmap, bool)> { let mut is_merged_database = false; match typed_chunks[0] { @@ -214,6 +215,7 @@ pub(crate) fn write_typed_chunk_into_index( kind: DocumentOperationKind::Create, }); docids.insert(docid); + modified_docids.insert(docid); } else { db.delete(wtxn, &docid)?; operations.push(DocumentOperation { @@ -222,6 +224,7 @@ pub(crate) fn write_typed_chunk_into_index( kind: DocumentOperationKind::Delete, }); docids.remove(docid); + modified_docids.insert(docid); } } let external_documents_docids = index.external_documents_ids(); diff --git a/crates/milli/src/update/new/extract/cache.rs b/crates/milli/src/update/new/extract/cache.rs index 47bca6193..f9829032b 100644 --- a/crates/milli/src/update/new/extract/cache.rs +++ b/crates/milli/src/update/new/extract/cache.rs @@ -711,15 +711,17 @@ impl DelAddRoaringBitmap { DelAddRoaringBitmap { del, add } } - pub fn apply_to(&self, documents_ids: &mut RoaringBitmap) { + pub fn apply_to(&self, documents_ids: &mut RoaringBitmap, modified_docids: &mut RoaringBitmap) { let DelAddRoaringBitmap { del, add } = self; if let Some(del) = del { *documents_ids -= del; + *modified_docids |= del; } if let Some(add) = add { *documents_ids |= add; + *modified_docids |= add; } } } diff --git a/crates/milli/src/update/new/indexer/extract.rs b/crates/milli/src/update/new/indexer/extract.rs index 53478f029..3299d610f 100644 --- a/crates/milli/src/update/new/indexer/extract.rs +++ b/crates/milli/src/update/new/indexer/extract.rs @@ -32,6 +32,7 @@ pub(super) fn extract_all<'pl, 'extractor, DC, MSP>( field_distribution: &mut BTreeMap, mut index_embeddings: Vec, document_ids: &mut RoaringBitmap, + modified_docids: &mut RoaringBitmap, ) -> Result<(FacetFieldIdsDelta, Vec)> where DC: DocumentChanges<'pl>, @@ -70,7 +71,7 @@ where // adding the delta should never cause a negative result, as we are removing fields that previously existed. *current = current.saturating_add_signed(delta); } - document_extractor_data.docids_delta.apply_to(document_ids); + document_extractor_data.docids_delta.apply_to(document_ids, modified_docids); } field_distribution.retain(|_, v| *v != 0); @@ -256,7 +257,7 @@ where let Some(deladd) = data.remove(&config.name) else { continue 'data; }; - deladd.apply_to(&mut config.user_provided); + deladd.apply_to(&mut config.user_provided, modified_docids); } } } diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index 890191323..9717b358b 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -129,6 +129,7 @@ where let index_embeddings = index.embedding_configs(wtxn)?; let mut field_distribution = index.field_distribution(wtxn)?; let mut document_ids = index.documents_ids(wtxn)?; + let mut modified_docids = roaring::RoaringBitmap::new(); thread::scope(|s| -> Result<()> { let indexer_span = tracing::Span::current(); @@ -137,6 +138,7 @@ where // prevent moving the field_distribution and document_ids in the inner closure... let field_distribution = &mut field_distribution; let document_ids = &mut document_ids; + let modified_docids = &mut modified_docids; let extractor_handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || { pool.install(move || { @@ -151,6 +153,7 @@ where field_distribution, index_embeddings, document_ids, + modified_docids, ) }) .unwrap() @@ -225,6 +228,7 @@ where embedders, field_distribution, document_ids, + modified_docids, )?; Ok(()) diff --git a/crates/milli/src/update/new/indexer/write.rs b/crates/milli/src/update/new/indexer/write.rs index 707599ba3..c4c046360 100644 --- a/crates/milli/src/update/new/indexer/write.rs +++ b/crates/milli/src/update/new/indexer/write.rs @@ -121,6 +121,7 @@ pub(super) fn update_index( embedders: EmbeddingConfigs, field_distribution: std::collections::BTreeMap, document_ids: roaring::RoaringBitmap, + modified_docids: roaring::RoaringBitmap, ) -> Result<()> { index.put_fields_ids_map(wtxn, new_fields_ids_map.as_fields_ids_map())?; if let Some(new_primary_key) = new_primary_key { @@ -132,6 +133,7 @@ pub(super) fn update_index( index.put_field_distribution(wtxn, &field_distribution)?; index.put_documents_ids(wtxn, &document_ids)?; index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?; + index.update_documents_stats(wtxn, modified_docids)?; Ok(()) }