mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-04 20:37:15 +02:00
Fasten the document deletion
When a document deletion occurs, instead of deleting the document we mark it as deleted in the new “soft deleted” bitmap. It is then removed from the search, and all the other endpoints.
This commit is contained in:
parent
77c837fc1b
commit
3b309f654a
10 changed files with 413 additions and 284 deletions
|
@ -4,7 +4,6 @@ use std::collections::{HashMap, HashSet};
|
|||
use std::fs::File;
|
||||
use std::io::{Read, Seek, SeekFrom};
|
||||
|
||||
use byteorder::ReadBytesExt;
|
||||
use fxhash::FxHashMap;
|
||||
use heed::RoTxn;
|
||||
use itertools::Itertools;
|
||||
|
@ -57,7 +56,7 @@ pub struct Transform<'a, 'i> {
|
|||
flattened_sorter: grenad::Sorter<MergeFn>,
|
||||
replaced_documents_ids: RoaringBitmap,
|
||||
new_documents_ids: RoaringBitmap,
|
||||
// To increase the cache locality and the heap usage we use smartstring.
|
||||
// To increase the cache locality and decrease the heap usage we use compact smartstring.
|
||||
new_external_documents_ids_builder: FxHashMap<SmartString<smartstring::Compact>, u64>,
|
||||
documents_count: usize,
|
||||
}
|
||||
|
@ -130,13 +129,17 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||
indexer_settings.max_memory.map(|mem| mem / 2),
|
||||
);
|
||||
let documents_ids = index.documents_ids(wtxn)?;
|
||||
let soft_deleted_documents_ids = index.soft_deleted_documents_ids(wtxn)?;
|
||||
|
||||
Ok(Transform {
|
||||
index,
|
||||
fields_ids_map: index.fields_ids_map(wtxn)?,
|
||||
indexer_settings,
|
||||
autogenerate_docids,
|
||||
available_documents_ids: AvailableDocumentsIds::from_documents_ids(&documents_ids),
|
||||
available_documents_ids: AvailableDocumentsIds::from_documents_ids(
|
||||
&documents_ids,
|
||||
&soft_deleted_documents_ids,
|
||||
),
|
||||
original_sorter,
|
||||
flattened_sorter,
|
||||
index_documents_method,
|
||||
|
@ -248,45 +251,39 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||
writer.insert(*k, v)?;
|
||||
}
|
||||
|
||||
let (docid, should_insert_original_document) =
|
||||
match external_documents_ids.get(&*external_id) {
|
||||
// if the document is in the db but has already been inserted
|
||||
// (ie: already exists in the list of replaced documents ids),
|
||||
// we should not add the original document a second time.
|
||||
Some(docid) => (docid, !self.replaced_documents_ids.contains(docid)),
|
||||
None => {
|
||||
// if the document has already been inserted in this
|
||||
// batch we need to get its docid
|
||||
match self.new_external_documents_ids_builder.entry(external_id.into()) {
|
||||
Entry::Occupied(entry) => (*entry.get() as u32, false),
|
||||
// if the document has never been encountered we give it a new docid
|
||||
// and push this new docid to the external documents ids builder
|
||||
Entry::Vacant(entry) => {
|
||||
let new_docid = self
|
||||
.available_documents_ids
|
||||
.next()
|
||||
.ok_or(UserError::DocumentLimitReached)?;
|
||||
entry.insert(new_docid as u64);
|
||||
(new_docid, false)
|
||||
}
|
||||
}
|
||||
let mut original_docid = None;
|
||||
|
||||
let docid = match self.new_external_documents_ids_builder.entry(external_id.into()) {
|
||||
Entry::Occupied(entry) => *entry.get() as u32,
|
||||
Entry::Vacant(entry) => {
|
||||
// If the document was already in the db we mark it as a replaced document.
|
||||
// It'll be deleted later. We keep its original docid to insert it in the grenad.
|
||||
if let Some(docid) = external_documents_ids.get(entry.key()) {
|
||||
self.replaced_documents_ids.insert(docid);
|
||||
original_docid = Some(docid);
|
||||
}
|
||||
};
|
||||
let docid = self
|
||||
.available_documents_ids
|
||||
.next()
|
||||
.ok_or(UserError::DocumentLimitReached)?;
|
||||
entry.insert(docid as u64);
|
||||
docid
|
||||
}
|
||||
};
|
||||
|
||||
if should_insert_original_document {
|
||||
self.replaced_documents_ids.insert(docid);
|
||||
|
||||
let key = BEU32::new(docid);
|
||||
if let Some(original_docid) = original_docid {
|
||||
let original_key = BEU32::new(original_docid);
|
||||
let base_obkv = self
|
||||
.index
|
||||
.documents
|
||||
.remap_data_type::<heed::types::ByteSlice>()
|
||||
.get(wtxn, &key)?
|
||||
.get(wtxn, &original_key)?
|
||||
.ok_or(InternalError::DatabaseMissingEntry {
|
||||
db_name: db_name::DOCUMENTS,
|
||||
key: None,
|
||||
})?;
|
||||
|
||||
// we associate the base document with the new key, everything will get merged later.
|
||||
self.original_sorter.insert(&docid.to_be_bytes(), base_obkv)?;
|
||||
match self.flatten_from_fields_ids_map(KvReader::new(&base_obkv))? {
|
||||
Some(buffer) => self.flattened_sorter.insert(docid.to_be_bytes(), &buffer)?,
|
||||
|
@ -506,6 +503,39 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn remove_deleted_documents_from_field_distribution(
|
||||
&self,
|
||||
rtxn: &RoTxn,
|
||||
field_distribution: &mut FieldDistribution,
|
||||
) -> Result<()> {
|
||||
for deleted_docid in self.replaced_documents_ids.iter() {
|
||||
let obkv = self.index.documents.get(rtxn, &BEU32::new(deleted_docid))?.ok_or(
|
||||
InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
|
||||
)?;
|
||||
|
||||
for (key, _) in obkv.iter() {
|
||||
let name =
|
||||
self.fields_ids_map.name(key).ok_or(FieldIdMapMissingEntry::FieldId {
|
||||
field_id: key,
|
||||
process: "Computing field distribution in transform.",
|
||||
})?;
|
||||
// We checked that the document was in the db earlier. If we can't find it it means
|
||||
// there is an inconsistency between the field distribution and the field id map.
|
||||
let field =
|
||||
field_distribution.get_mut(name).ok_or(FieldIdMapMissingEntry::FieldId {
|
||||
field_id: key,
|
||||
process: "Accessing field distribution in transform.",
|
||||
})?;
|
||||
*field -= 1;
|
||||
if *field == 0 {
|
||||
// since we were able to get the field right before it's safe to unwrap here
|
||||
field_distribution.remove(name).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Generate the `TransformOutput` based on the given sorter that can be generated from any
|
||||
/// format like CSV, JSON or JSON stream. This sorter must contain a key that is the document
|
||||
/// id for the user side and the value must be an obkv where keys are valid fields ids.
|
||||
|
@ -532,9 +562,14 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||
tempfile::tempfile()?,
|
||||
);
|
||||
|
||||
// Once we have all the documents in the sorter, we write the documents
|
||||
// in the writer. We also generate the field distribution.
|
||||
// To compute the field distribution we need to;
|
||||
// 1. Remove all the deleted documents from the field distribution
|
||||
// 2. Add all the new documents to the field distribution
|
||||
let mut field_distribution = self.index.field_distribution(wtxn)?;
|
||||
|
||||
self.remove_deleted_documents_from_field_distribution(wtxn, &mut field_distribution)?;
|
||||
|
||||
// Here we are going to do the document count + field distribution + `write_into_stream_writer`
|
||||
let mut iter = self.original_sorter.into_stream_merger_iter()?;
|
||||
// used only for the callback
|
||||
let mut documents_count = 0;
|
||||
|
@ -547,36 +582,6 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||
total_documents: self.documents_count,
|
||||
});
|
||||
|
||||
let u32_key = key.clone().read_u32::<byteorder::BigEndian>()?;
|
||||
// if the document was already in the db we remove all of its field
|
||||
// from the field distribution.
|
||||
if self.replaced_documents_ids.contains(u32_key) {
|
||||
let obkv = self.index.documents.get(wtxn, &BEU32::new(u32_key))?.ok_or(
|
||||
InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
|
||||
)?;
|
||||
|
||||
for (key, _) in obkv.iter() {
|
||||
let name =
|
||||
self.fields_ids_map.name(key).ok_or(FieldIdMapMissingEntry::FieldId {
|
||||
field_id: key,
|
||||
process: "Computing field distribution in transform.",
|
||||
})?;
|
||||
// We checked that the document was in the db earlier. If we can't find it it means
|
||||
// there is an inconsistency between the field distribution and the field id map.
|
||||
let field = field_distribution.get_mut(name).ok_or(
|
||||
FieldIdMapMissingEntry::FieldId {
|
||||
field_id: key,
|
||||
process: "Accessing field distribution in transform.",
|
||||
},
|
||||
)?;
|
||||
*field -= 1;
|
||||
if *field == 0 {
|
||||
// since we were able to get the field right before it's safe to unwrap here
|
||||
field_distribution.remove(name).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We increment all the field of the current document in the field distribution.
|
||||
let obkv = KvReader::new(val);
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue