Implement Incremental document database stats computing

This commit is contained in:
ManyTheFish 2025-02-17 16:36:33 +01:00
parent 1bd57a9a94
commit 9a33628331
9 changed files with 116 additions and 53 deletions

View File

@ -142,7 +142,7 @@ impl IndexStats {
Ok(IndexStats { Ok(IndexStats {
number_of_embeddings: Some(arroy_stats.number_of_embeddings), number_of_embeddings: Some(arroy_stats.number_of_embeddings),
number_of_embedded_documents: Some(arroy_stats.documents.len()), number_of_embedded_documents: Some(arroy_stats.documents.len()),
documents_database_stats: index.documents_database_stats(rtxn)?, documents_database_stats: index.documents_stats(rtxn)?.unwrap_or_default(),
database_size: index.on_disk_size()?, database_size: index.on_disk_size()?,
used_database_size: index.used_size()?, used_database_size: index.used_size()?,
primary_key: index.primary_key(rtxn)?.map(|s| s.to_string()), primary_key: index.primary_key(rtxn)?.map(|s| s.to_string()),

View File

@ -3,8 +3,6 @@ use heed::Database;
use heed::RoTxn; use heed::RoTxn;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::Result;
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)] #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
/// The stats of a database. /// The stats of a database.
@ -15,14 +13,6 @@ pub struct DatabaseStats {
total_key_size: u64, total_key_size: u64,
/// The total size of the values in the database. /// The total size of the values in the database.
total_value_size: u64, total_value_size: u64,
/// The maximum size of a key in the database.
max_key_size: u64,
/// The maximum size of a value in the database.
max_value_size: u64,
/// The minimum size of a key in the database.
min_key_size: u64,
/// The minimum size of a value in the database.
min_value_size: u64,
} }
impl DatabaseStats { impl DatabaseStats {
@ -30,38 +20,60 @@ impl DatabaseStats {
/// ///
/// This function iterates over the whole database and computes the stats. /// This function iterates over the whole database and computes the stats.
/// It is not efficient and should be cached somewhere. /// It is not efficient and should be cached somewhere.
pub(crate) fn new(database: Database<Bytes, Bytes>, rtxn: &RoTxn<'_>) -> Result<Self> { pub(crate) fn new(database: Database<Bytes, Bytes>, rtxn: &RoTxn<'_>) -> heed::Result<Self> {
let mut database_stats = Self { let mut database_stats =
number_of_entries: 0, Self { number_of_entries: 0, total_key_size: 0, total_value_size: 0 };
total_key_size: 0,
total_value_size: 0,
max_key_size: 0,
max_value_size: 0,
min_key_size: u64::MAX,
min_value_size: u64::MAX,
};
let mut iter = database.iter(rtxn)?; let mut iter = database.iter(rtxn)?;
while let Some((key, value)) = iter.next().transpose()? { while let Some((key, value)) = iter.next().transpose()? {
let key_size = key.len() as u64; let key_size = key.len() as u64;
let value_size = value.len() as u64; let value_size = value.len() as u64;
database_stats.number_of_entries += 1;
database_stats.total_key_size += key_size; database_stats.total_key_size += key_size;
database_stats.total_value_size += value_size; database_stats.total_value_size += value_size;
database_stats.max_key_size = database_stats.max_key_size.max(key_size);
database_stats.max_value_size = database_stats.max_value_size.max(value_size);
database_stats.min_key_size = database_stats.min_key_size.min(key_size);
database_stats.min_value_size = database_stats.min_value_size.min(value_size);
} }
if database_stats.number_of_entries == 0 { database_stats.number_of_entries = database.len(rtxn)?;
database_stats.min_key_size = 0;
database_stats.min_value_size = 0;
}
Ok(database_stats) Ok(database_stats)
} }
/// Recomputes the stats of the database and returns the new stats.
///
/// This function is used to update the stats of the database when some keys are modified.
/// It is more efficient than the `new` function because it does not iterate over the whole
/// database but only the modified keys comparing the before and after states.
pub(crate) fn recompute<'a, I, K>(
    mut stats: Self,
    database: Database<Bytes, Bytes>,
    before_rtxn: &RoTxn<'_>,
    after_rtxn: &RoTxn<'_>,
    modified_keys: I,
) -> heed::Result<Self>
where
    I: IntoIterator<Item = K>,
    K: AsRef<[u8]>,
{
    // For every modified key, apply the net size delta to the running totals:
    // add the sizes seen in the *after* state, subtract the sizes seen in the
    // *before* state. A key present in both states contributes its size
    // difference; an inserted key only adds; a deleted key only subtracts.
    for key in modified_keys {
        let key = key.as_ref();
        if let Some(value) = database.get(after_rtxn, key)? {
            let key_size = key.len() as u64;
            let value_size = value.len() as u64;
            // Saturating arithmetic guards against wrap-around if the
            // incoming `stats` are inconsistent with the database contents.
            stats.total_key_size = stats.total_key_size.saturating_add(key_size);
            stats.total_value_size = stats.total_value_size.saturating_add(value_size);
        }
        if let Some(value) = database.get(before_rtxn, key)? {
            let key_size = key.len() as u64;
            let value_size = value.len() as u64;
            stats.total_key_size = stats.total_key_size.saturating_sub(key_size);
            stats.total_value_size = stats.total_value_size.saturating_sub(value_size);
        }
    }

    // The entry count is taken directly from the after-state metadata,
    // which is exact, rather than being derived from the modified keys.
    stats.number_of_entries = database.len(after_rtxn)?;

    Ok(stats)
}
pub fn average_key_size(&self) -> u64 { pub fn average_key_size(&self) -> u64 {
self.total_key_size.checked_div(self.number_of_entries).unwrap_or(0) self.total_key_size.checked_div(self.number_of_entries).unwrap_or(0)
} }
@ -81,20 +93,4 @@ impl DatabaseStats {
pub fn total_value_size(&self) -> u64 { pub fn total_value_size(&self) -> u64 {
self.total_value_size self.total_value_size
} }
pub fn max_key_size(&self) -> u64 {
self.max_key_size
}
pub fn max_value_size(&self) -> u64 {
self.max_value_size
}
pub fn min_key_size(&self) -> u64 {
self.min_key_size
}
pub fn min_value_size(&self) -> u64 {
self.min_value_size
}
} }

View File

@ -75,6 +75,7 @@ pub mod main_key {
pub const LOCALIZED_ATTRIBUTES_RULES: &str = "localized_attributes_rules"; pub const LOCALIZED_ATTRIBUTES_RULES: &str = "localized_attributes_rules";
pub const FACET_SEARCH: &str = "facet_search"; pub const FACET_SEARCH: &str = "facet_search";
pub const PREFIX_SEARCH: &str = "prefix_search"; pub const PREFIX_SEARCH: &str = "prefix_search";
pub const DOCUMENTS_STATS: &str = "documents_stats";
} }
pub mod db_name { pub mod db_name {
@ -404,9 +405,58 @@ impl Index {
Ok(count.unwrap_or_default()) Ok(count.unwrap_or_default())
} }
/// Returns the stats of the database. /// Updates the stats of the documents database based on the previous stats and the modified docids.
pub fn documents_database_stats(&self, rtxn: &RoTxn<'_>) -> Result<DatabaseStats> { pub fn update_documents_stats(
DatabaseStats::new(self.documents.remap_types::<Bytes, Bytes>(), rtxn) &self,
wtxn: &mut RwTxn<'_>,
modified_docids: roaring::RoaringBitmap,
) -> Result<()> {
let before_rtxn = self.read_txn()?;
let document_stats = match self.documents_stats(&before_rtxn)? {
Some(before_stats) => DatabaseStats::recompute(
before_stats,
self.documents.remap_types(),
&before_rtxn,
wtxn,
modified_docids.iter().map(|docid| docid.to_be_bytes()),
)?,
None => {
// This should never happen when there are already documents in the index, the documents stats should be present.
// If it happens, it means that the index was not properly initialized/upgraded.
debug_assert_eq!(
self.documents.len(&before_rtxn)?,
0,
"The documents stats should be present when there are documents in the index"
);
tracing::warn!("No documents stats found, creating new ones");
DatabaseStats::new(self.documents.remap_types(), &*wtxn)?
}
};
self.put_documents_stats(wtxn, document_stats)?;
Ok(())
}
/// Writes the stats of the documents database.
pub fn put_documents_stats(
    &self,
    wtxn: &mut RwTxn<'_>,
    stats: DatabaseStats,
) -> heed::Result<()> {
    // Persist the stats as JSON under a dedicated key of the main database.
    // (Removed a leftover `eprintln!` debug print that fired on every write.)
    self.main.remap_types::<Str, SerdeJson<DatabaseStats>>().put(
        wtxn,
        main_key::DOCUMENTS_STATS,
        &stats,
    )
}
/// Returns the stats of the documents database.
pub fn documents_stats(&self, rtxn: &RoTxn<'_>) -> heed::Result<Option<DatabaseStats>> {
dbg!(self
.main
.remap_types::<Str, SerdeJson<DatabaseStats>>()
.get(rtxn, main_key::DOCUMENTS_STATS))
} }
/* primary key */ /* primary key */

View File

@ -307,6 +307,7 @@ where
let current_span = tracing::Span::current(); let current_span = tracing::Span::current();
// Run extraction pipeline in parallel. // Run extraction pipeline in parallel.
let mut modified_docids = RoaringBitmap::new();
pool.install(|| { pool.install(|| {
let settings_diff_cloned = settings_diff.clone(); let settings_diff_cloned = settings_diff.clone();
rayon::spawn(move || { rayon::spawn(move || {
@ -367,7 +368,7 @@ where
Err(status) => { Err(status) => {
if let Some(typed_chunks) = chunk_accumulator.pop_longest() { if let Some(typed_chunks) = chunk_accumulator.pop_longest() {
let (docids, is_merged_database) = let (docids, is_merged_database) =
write_typed_chunk_into_index(self.wtxn, self.index, &settings_diff, typed_chunks)?; write_typed_chunk_into_index(self.wtxn, self.index, &settings_diff, typed_chunks, &mut modified_docids)?;
if !docids.is_empty() { if !docids.is_empty() {
final_documents_ids |= docids; final_documents_ids |= docids;
let documents_seen_count = final_documents_ids.len(); let documents_seen_count = final_documents_ids.len();
@ -467,6 +468,10 @@ where
Ok(()) Ok(())
}).map_err(InternalError::from)??; }).map_err(InternalError::from)??;
if !settings_diff.settings_update_only {
// Update the stats of the documents database when there is a document update.
self.index.update_documents_stats(self.wtxn, modified_docids)?;
}
// We write the field distribution into the main database // We write the field distribution into the main database
self.index.put_field_distribution(self.wtxn, &field_distribution)?; self.index.put_field_distribution(self.wtxn, &field_distribution)?;

View File

@ -129,6 +129,7 @@ pub(crate) fn write_typed_chunk_into_index(
index: &Index, index: &Index,
settings_diff: &InnerIndexSettingsDiff, settings_diff: &InnerIndexSettingsDiff,
typed_chunks: Vec<TypedChunk>, typed_chunks: Vec<TypedChunk>,
modified_docids: &mut RoaringBitmap,
) -> Result<(RoaringBitmap, bool)> { ) -> Result<(RoaringBitmap, bool)> {
let mut is_merged_database = false; let mut is_merged_database = false;
match typed_chunks[0] { match typed_chunks[0] {
@ -214,6 +215,7 @@ pub(crate) fn write_typed_chunk_into_index(
kind: DocumentOperationKind::Create, kind: DocumentOperationKind::Create,
}); });
docids.insert(docid); docids.insert(docid);
modified_docids.insert(docid);
} else { } else {
db.delete(wtxn, &docid)?; db.delete(wtxn, &docid)?;
operations.push(DocumentOperation { operations.push(DocumentOperation {
@ -222,6 +224,7 @@ pub(crate) fn write_typed_chunk_into_index(
kind: DocumentOperationKind::Delete, kind: DocumentOperationKind::Delete,
}); });
docids.remove(docid); docids.remove(docid);
modified_docids.insert(docid);
} }
} }
let external_documents_docids = index.external_documents_ids(); let external_documents_docids = index.external_documents_ids();

View File

@ -711,15 +711,17 @@ impl DelAddRoaringBitmap {
DelAddRoaringBitmap { del, add } DelAddRoaringBitmap { del, add }
} }
pub fn apply_to(&self, documents_ids: &mut RoaringBitmap) { pub fn apply_to(&self, documents_ids: &mut RoaringBitmap, modified_docids: &mut RoaringBitmap) {
let DelAddRoaringBitmap { del, add } = self; let DelAddRoaringBitmap { del, add } = self;
if let Some(del) = del { if let Some(del) = del {
*documents_ids -= del; *documents_ids -= del;
*modified_docids |= del;
} }
if let Some(add) = add { if let Some(add) = add {
*documents_ids |= add; *documents_ids |= add;
*modified_docids |= add;
} }
} }
} }

View File

@ -32,6 +32,7 @@ pub(super) fn extract_all<'pl, 'extractor, DC, MSP>(
field_distribution: &mut BTreeMap<String, u64>, field_distribution: &mut BTreeMap<String, u64>,
mut index_embeddings: Vec<IndexEmbeddingConfig>, mut index_embeddings: Vec<IndexEmbeddingConfig>,
document_ids: &mut RoaringBitmap, document_ids: &mut RoaringBitmap,
modified_docids: &mut RoaringBitmap,
) -> Result<(FacetFieldIdsDelta, Vec<IndexEmbeddingConfig>)> ) -> Result<(FacetFieldIdsDelta, Vec<IndexEmbeddingConfig>)>
where where
DC: DocumentChanges<'pl>, DC: DocumentChanges<'pl>,
@ -70,7 +71,7 @@ where
// adding the delta should never cause a negative result, as we are removing fields that previously existed. // adding the delta should never cause a negative result, as we are removing fields that previously existed.
*current = current.saturating_add_signed(delta); *current = current.saturating_add_signed(delta);
} }
document_extractor_data.docids_delta.apply_to(document_ids); document_extractor_data.docids_delta.apply_to(document_ids, modified_docids);
} }
field_distribution.retain(|_, v| *v != 0); field_distribution.retain(|_, v| *v != 0);
@ -256,7 +257,7 @@ where
let Some(deladd) = data.remove(&config.name) else { let Some(deladd) = data.remove(&config.name) else {
continue 'data; continue 'data;
}; };
deladd.apply_to(&mut config.user_provided); deladd.apply_to(&mut config.user_provided, modified_docids);
} }
} }
} }

View File

@ -129,6 +129,7 @@ where
let index_embeddings = index.embedding_configs(wtxn)?; let index_embeddings = index.embedding_configs(wtxn)?;
let mut field_distribution = index.field_distribution(wtxn)?; let mut field_distribution = index.field_distribution(wtxn)?;
let mut document_ids = index.documents_ids(wtxn)?; let mut document_ids = index.documents_ids(wtxn)?;
let mut modified_docids = roaring::RoaringBitmap::new();
thread::scope(|s| -> Result<()> { thread::scope(|s| -> Result<()> {
let indexer_span = tracing::Span::current(); let indexer_span = tracing::Span::current();
@ -137,6 +138,7 @@ where
// prevent moving the field_distribution and document_ids in the inner closure... // prevent moving the field_distribution and document_ids in the inner closure...
let field_distribution = &mut field_distribution; let field_distribution = &mut field_distribution;
let document_ids = &mut document_ids; let document_ids = &mut document_ids;
let modified_docids = &mut modified_docids;
let extractor_handle = let extractor_handle =
Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || { Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
pool.install(move || { pool.install(move || {
@ -151,6 +153,7 @@ where
field_distribution, field_distribution,
index_embeddings, index_embeddings,
document_ids, document_ids,
modified_docids,
) )
}) })
.unwrap() .unwrap()
@ -225,6 +228,7 @@ where
embedders, embedders,
field_distribution, field_distribution,
document_ids, document_ids,
modified_docids,
)?; )?;
Ok(()) Ok(())

View File

@ -121,6 +121,7 @@ pub(super) fn update_index(
embedders: EmbeddingConfigs, embedders: EmbeddingConfigs,
field_distribution: std::collections::BTreeMap<String, u64>, field_distribution: std::collections::BTreeMap<String, u64>,
document_ids: roaring::RoaringBitmap, document_ids: roaring::RoaringBitmap,
modified_docids: roaring::RoaringBitmap,
) -> Result<()> { ) -> Result<()> {
index.put_fields_ids_map(wtxn, new_fields_ids_map.as_fields_ids_map())?; index.put_fields_ids_map(wtxn, new_fields_ids_map.as_fields_ids_map())?;
if let Some(new_primary_key) = new_primary_key { if let Some(new_primary_key) = new_primary_key {
@ -132,6 +133,7 @@ pub(super) fn update_index(
index.put_field_distribution(wtxn, &field_distribution)?; index.put_field_distribution(wtxn, &field_distribution)?;
index.put_documents_ids(wtxn, &document_ids)?; index.put_documents_ids(wtxn, &document_ids)?;
index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?; index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?;
index.update_documents_stats(wtxn, modified_docids)?;
Ok(()) Ok(())
} }