Batch::remove_documents_from_db_no_batch

This commit is contained in:
Louis Dureuil 2023-11-09 14:23:02 +01:00
parent b11c2afac0
commit 3053e01c05
No known key found for this signature in database
2 changed files with 116 additions and 0 deletions

View File

@ -194,6 +194,39 @@ where
Ok((self, Ok(deleted_documents))) Ok((self, Ok(deleted_documents)))
} }
/// Removes documents from db using their internal document ids.
///
/// # Warning
///
/// This function is dangerous and will only work correctly if:
///
/// - All the passed ids currently exist in the database
/// - No batching using the standards `remove_documents` and `add_documents` took place
///
/// TODO: make it impossible to call `remove_documents` or `add_documents` on an instance that calls this function.
pub fn remove_documents_from_db_no_batch(
mut self,
to_delete: &RoaringBitmap,
) -> Result<(Self, u64)> {
puffin::profile_function!();
// Early return when there is no document to add
if to_delete.is_empty() {
return Ok((self, 0));
}
let deleted_documents = self
.transform
.as_mut()
.expect("Invalid document deletion state")
.remove_documents_from_db_no_batch(to_delete, self.wtxn, &self.should_abort)?
as u64;
self.deleted_documents += deleted_documents;
Ok((self, deleted_documents))
}
#[logging_timer::time("IndexDocuments::{}")] #[logging_timer::time("IndexDocuments::{}")]
pub fn execute(mut self) -> Result<DocumentAdditionResult> { pub fn execute(mut self) -> Result<DocumentAdditionResult> {
puffin::profile_function!(); puffin::profile_function!();

View File

@ -481,6 +481,89 @@ impl<'a, 'i> Transform<'a, 'i> {
Ok(documents_deleted) Ok(documents_deleted)
} }
/// The counter part of `read_documents` that removes documents either from the transform or the database.
/// It can be called before, after or in between two calls of the `read_documents`.
///
/// It needs to update all the internal datastructure in the transform.
/// - If the document is coming from the database -> it's marked as a to_delete document
/// - If the document to remove was inserted by the `read_documents` method before AND was present in the db,
/// it's marked as `to_delete` + added into the grenad to ensure we don't reinsert it.
/// - If the document to remove was inserted by the `read_documents` method before but was NOT present in the db,
/// it's added into the grenad to ensure we don't insert it + removed from the list of new documents ids.
/// - If the document to remove was not present in either the db or the transform we do nothing.
#[logging_timer::time]
pub fn remove_documents_from_db_no_batch<FA>(
&mut self,
to_remove: &RoaringBitmap,
wtxn: &mut heed::RwTxn,
should_abort: FA,
) -> Result<usize>
where
FA: Fn() -> bool + Sync,
{
puffin::profile_function!();
let mut documents_deleted = 0;
let mut document_sorter_value_buffer = Vec::new();
let mut document_sorter_key_buffer = Vec::new();
let external_ids = self.index.external_id_of(wtxn, to_remove.iter())?;
for (to_remove, external_docid) in to_remove.iter().zip(external_ids) {
let external_docid = external_docid?;
if should_abort() {
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
self.replaced_documents_ids.insert(to_remove);
// fetch the obkv document
let original_key = BEU32::new(to_remove);
let base_obkv = self
.index
.documents
.remap_data_type::<heed::types::ByteSlice>()
.get(wtxn, &original_key)?
.ok_or(InternalError::DatabaseMissingEntry {
db_name: db_name::DOCUMENTS,
key: None,
})?;
// Key is the concatenation of the internal docid and the external one.
document_sorter_key_buffer.clear();
document_sorter_key_buffer.extend_from_slice(&to_remove.to_be_bytes());
document_sorter_key_buffer.extend_from_slice(external_docid.as_bytes());
// push it as to delete in the original_sorter
document_sorter_value_buffer.clear();
document_sorter_value_buffer.push(Operation::Deletion as u8);
into_del_add_obkv(
KvReaderU16::new(base_obkv),
true,
false,
&mut document_sorter_value_buffer,
)?;
self.original_sorter
.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
// flatten it and push it as to delete in the flattened_sorter
let flattened_obkv = KvReader::new(base_obkv);
if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? {
// we recreate our buffer with the flattened documents
document_sorter_value_buffer.clear();
document_sorter_value_buffer.push(Operation::Deletion as u8);
into_del_add_obkv(
KvReaderU16::new(&obkv),
true,
false,
&mut document_sorter_value_buffer,
)?;
}
self.flattened_sorter.insert(to_remove.to_be_bytes(), &document_sorter_value_buffer)?;
documents_deleted += 1;
}
Ok(documents_deleted)
}
// Flatten a document from the fields ids map contained in self and insert the new // Flatten a document from the fields ids map contained in self and insert the new
// created fields. Returns `None` if the document doesn't need to be flattened. // created fields. Returns `None` if the document doesn't need to be flattened.
fn flatten_from_fields_ids_map(&mut self, obkv: KvReader<FieldId>) -> Result<Option<Vec<u8>>> { fn flatten_from_fields_ids_map(&mut self, obkv: KvReader<FieldId>) -> Result<Option<Vec<u8>>> {