mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-11 05:54:30 +01:00
provide a new method on the transform to remove documents
This commit is contained in:
parent
8f64fba1ce
commit
421a9cf05e
@ -6,7 +6,7 @@ use roaring::RoaringBitmap;
|
||||
|
||||
use crate::facet::FacetType;
|
||||
use crate::heed_codec::facet::{FacetGroupKey, FacetGroupValue};
|
||||
use crate::{make_db_snap_from_iter, ExternalDocumentsIds, Index};
|
||||
use crate::{make_db_snap_from_iter, obkv_to_json, ExternalDocumentsIds, Index};
|
||||
|
||||
#[track_caller]
|
||||
pub fn default_db_snapshot_settings_for_test(name: Option<&str>) -> (insta::Settings, String) {
|
||||
@ -427,8 +427,26 @@ pub fn snap_settings(index: &Index) -> String {
|
||||
snap
|
||||
}
|
||||
|
||||
pub fn snap_documents(index: &Index) -> String {
|
||||
let mut snap = String::new();
|
||||
let rtxn = index.read_txn().unwrap();
|
||||
let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
|
||||
let display = fields_ids_map.ids().collect::<Vec<_>>();
|
||||
|
||||
for document in index.all_documents(&rtxn).unwrap() {
|
||||
let doc = obkv_to_json(&display, &fields_ids_map, document.unwrap().1).unwrap();
|
||||
snap.push_str(&serde_json::to_string(&doc).unwrap());
|
||||
snap.push('\n');
|
||||
}
|
||||
|
||||
snap
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! full_snap_of_db {
|
||||
($index:ident, documents) => {{
|
||||
$crate::snapshot_tests::snap_documents(&$index)
|
||||
}};
|
||||
($index:ident, settings) => {{
|
||||
$crate::snapshot_tests::snap_settings(&$index)
|
||||
}};
|
||||
|
@ -79,6 +79,7 @@ pub struct IndexDocuments<'t, 'u, 'i, 'a, FP, FA> {
|
||||
progress: FP,
|
||||
should_abort: FA,
|
||||
added_documents: u64,
|
||||
deleted_documents: u64,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Clone)]
|
||||
@ -122,6 +123,7 @@ where
|
||||
wtxn,
|
||||
index,
|
||||
added_documents: 0,
|
||||
deleted_documents: 0,
|
||||
})
|
||||
}
|
||||
|
||||
@ -166,6 +168,30 @@ where
|
||||
Ok((self, Ok(indexed_documents)))
|
||||
}
|
||||
|
||||
/// Remove a batch of documents from the current builder.
|
||||
///
|
||||
/// Returns the number of documents deleted from the builder.
|
||||
pub fn remove_documents(
|
||||
mut self,
|
||||
to_delete: Vec<String>,
|
||||
) -> Result<(Self, StdResult<u64, UserError>)> {
|
||||
// Early return when there is no document to add
|
||||
if to_delete.is_empty() {
|
||||
return Ok((self, Ok(0)));
|
||||
}
|
||||
|
||||
let deleted_documents = self
|
||||
.transform
|
||||
.as_mut()
|
||||
.expect("Invalid document deletion state")
|
||||
.remove_documents(to_delete, self.wtxn, &self.should_abort)?
|
||||
as u64;
|
||||
|
||||
self.deleted_documents += deleted_documents;
|
||||
|
||||
Ok((self, Ok(deleted_documents)))
|
||||
}
|
||||
|
||||
#[logging_timer::time("IndexDocuments::{}")]
|
||||
pub fn execute(mut self) -> Result<DocumentAdditionResult> {
|
||||
if self.added_documents == 0 {
|
||||
@ -1879,4 +1905,203 @@ mod tests {
|
||||
|
||||
index.add_documents(doc1).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn add_and_delete_documents_in_single_transform() {
|
||||
let index = TempIndex::new();
|
||||
let mut wtxn = index.write_txn().unwrap();
|
||||
let builder = IndexDocuments::new(
|
||||
&mut wtxn,
|
||||
&index,
|
||||
&index.indexer_config,
|
||||
index.index_documents_config.clone(),
|
||||
|_| (),
|
||||
|| false,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let documents = documents!([
|
||||
{ "id": 1, "doggo": "kevin" },
|
||||
{ "id": 2, "doggo": { "name": "bob", "age": 20 } },
|
||||
{ "id": 3, "name": "jean", "age": 25 },
|
||||
]);
|
||||
let (builder, added) = builder.add_documents(documents).unwrap();
|
||||
insta::assert_display_snapshot!(added.unwrap(), @"3");
|
||||
|
||||
let (builder, removed) = builder.remove_documents(vec![S("2")]).unwrap();
|
||||
insta::assert_display_snapshot!(removed.unwrap(), @"1");
|
||||
|
||||
let addition = builder.execute().unwrap();
|
||||
insta::assert_debug_snapshot!(addition, @r###"
|
||||
DocumentAdditionResult {
|
||||
indexed_documents: 3,
|
||||
number_of_documents: 2,
|
||||
}
|
||||
"###);
|
||||
wtxn.commit().unwrap();
|
||||
|
||||
db_snap!(index, documents, @r###"
|
||||
{"id":1,"doggo":"kevin"}
|
||||
{"id":3,"name":"jean","age":25}
|
||||
"###);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn add_update_and_delete_documents_in_single_transform() {
|
||||
let index = TempIndex::new();
|
||||
let mut wtxn = index.write_txn().unwrap();
|
||||
let builder = IndexDocuments::new(
|
||||
&mut wtxn,
|
||||
&index,
|
||||
&index.indexer_config,
|
||||
index.index_documents_config.clone(),
|
||||
|_| (),
|
||||
|| false,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let documents = documents!([
|
||||
{ "id": 1, "doggo": "kevin" },
|
||||
{ "id": 2, "doggo": { "name": "bob", "age": 20 } },
|
||||
{ "id": 3, "name": "jean", "age": 25 },
|
||||
]);
|
||||
let (builder, added) = builder.add_documents(documents).unwrap();
|
||||
insta::assert_display_snapshot!(added.unwrap(), @"3");
|
||||
|
||||
let documents = documents!([
|
||||
{ "id": 2, "doggo": { "name": "jean", "age": 20 } },
|
||||
{ "id": 3, "name": "bob", "age": 25 },
|
||||
]);
|
||||
let (builder, added) = builder.add_documents(documents).unwrap();
|
||||
insta::assert_display_snapshot!(added.unwrap(), @"2");
|
||||
|
||||
let (builder, removed) = builder.remove_documents(vec![S("1"), S("2")]).unwrap();
|
||||
insta::assert_display_snapshot!(removed.unwrap(), @"2");
|
||||
|
||||
let addition = builder.execute().unwrap();
|
||||
insta::assert_debug_snapshot!(addition, @r###"
|
||||
DocumentAdditionResult {
|
||||
indexed_documents: 5,
|
||||
number_of_documents: 1,
|
||||
}
|
||||
"###);
|
||||
wtxn.commit().unwrap();
|
||||
|
||||
db_snap!(index, documents, @r###"
|
||||
{"id":3,"name":"bob","age":25}
|
||||
"###);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn add_document_and_in_another_transform_update_and_delete_documents() {
|
||||
let index = TempIndex::new();
|
||||
let mut wtxn = index.write_txn().unwrap();
|
||||
let builder = IndexDocuments::new(
|
||||
&mut wtxn,
|
||||
&index,
|
||||
&index.indexer_config,
|
||||
index.index_documents_config.clone(),
|
||||
|_| (),
|
||||
|| false,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let documents = documents!([
|
||||
{ "id": 1, "doggo": "kevin" },
|
||||
{ "id": 2, "doggo": { "name": "bob", "age": 20 } },
|
||||
{ "id": 3, "name": "jean", "age": 25 },
|
||||
]);
|
||||
let (builder, added) = builder.add_documents(documents).unwrap();
|
||||
insta::assert_display_snapshot!(added.unwrap(), @"3");
|
||||
|
||||
let addition = builder.execute().unwrap();
|
||||
insta::assert_debug_snapshot!(addition, @r###"
|
||||
DocumentAdditionResult {
|
||||
indexed_documents: 3,
|
||||
number_of_documents: 3,
|
||||
}
|
||||
"###);
|
||||
wtxn.commit().unwrap();
|
||||
|
||||
db_snap!(index, documents, @r###"
|
||||
{"id":1,"doggo":"kevin"}
|
||||
{"id":2,"doggo":{"name":"bob","age":20}}
|
||||
{"id":3,"name":"jean","age":25}
|
||||
"###);
|
||||
|
||||
// A first batch of documents has been inserted
|
||||
|
||||
let mut wtxn = index.write_txn().unwrap();
|
||||
let builder = IndexDocuments::new(
|
||||
&mut wtxn,
|
||||
&index,
|
||||
&index.indexer_config,
|
||||
index.index_documents_config.clone(),
|
||||
|_| (),
|
||||
|| false,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let documents = documents!([
|
||||
{ "id": 2, "doggo": { "name": "jean", "age": 20 } },
|
||||
{ "id": 3, "name": "bob", "age": 25 },
|
||||
]);
|
||||
let (builder, added) = builder.add_documents(documents).unwrap();
|
||||
insta::assert_display_snapshot!(added.unwrap(), @"2");
|
||||
|
||||
let (builder, removed) = builder.remove_documents(vec![S("1"), S("2")]).unwrap();
|
||||
insta::assert_display_snapshot!(removed.unwrap(), @"2");
|
||||
|
||||
let addition = builder.execute().unwrap();
|
||||
insta::assert_debug_snapshot!(addition, @r###"
|
||||
DocumentAdditionResult {
|
||||
indexed_documents: 2,
|
||||
number_of_documents: 1,
|
||||
}
|
||||
"###);
|
||||
wtxn.commit().unwrap();
|
||||
|
||||
db_snap!(index, documents, @r###"
|
||||
{"id":3,"name":"bob","age":25}
|
||||
"###);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn delete_document_and_then_add_documents_in_the_same_transform() {
|
||||
let index = TempIndex::new();
|
||||
let mut wtxn = index.write_txn().unwrap();
|
||||
let builder = IndexDocuments::new(
|
||||
&mut wtxn,
|
||||
&index,
|
||||
&index.indexer_config,
|
||||
index.index_documents_config.clone(),
|
||||
|_| (),
|
||||
|| false,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let (builder, removed) = builder.remove_documents(vec![S("1"), S("2")]).unwrap();
|
||||
insta::assert_display_snapshot!(removed.unwrap(), @"0");
|
||||
|
||||
let documents = documents!([
|
||||
{ "id": 2, "doggo": { "name": "jean", "age": 20 } },
|
||||
{ "id": 3, "name": "bob", "age": 25 },
|
||||
]);
|
||||
let (builder, added) = builder.add_documents(documents).unwrap();
|
||||
insta::assert_display_snapshot!(added.unwrap(), @"2");
|
||||
|
||||
let addition = builder.execute().unwrap();
|
||||
insta::assert_debug_snapshot!(addition, @r###"
|
||||
DocumentAdditionResult {
|
||||
indexed_documents: 2,
|
||||
number_of_documents: 2,
|
||||
}
|
||||
"###);
|
||||
wtxn.commit().unwrap();
|
||||
|
||||
db_snap!(index, documents, @r###"
|
||||
{"id":2,"doggo":{"name":"jean","age":20}}
|
||||
{"id":3,"name":"bob","age":25}
|
||||
"###);
|
||||
}
|
||||
}
|
||||
|
@ -159,9 +159,7 @@ impl<'a, 'i> Transform<'a, 'i> {
|
||||
FA: Fn() -> bool + Sync,
|
||||
{
|
||||
let (mut cursor, fields_index) = reader.into_cursor_and_fields_index();
|
||||
|
||||
let external_documents_ids = self.index.external_documents_ids(wtxn)?;
|
||||
|
||||
let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?;
|
||||
|
||||
let primary_key = cursor.primary_key().to_string();
|
||||
@ -322,6 +320,64 @@ impl<'a, 'i> Transform<'a, 'i> {
|
||||
Ok(documents_count)
|
||||
}
|
||||
|
||||
/// The counter part of `read_documents` that removes documents that may have been inserted into the transform previously.
|
||||
pub fn remove_documents<FA>(
|
||||
&mut self,
|
||||
mut to_remove: Vec<String>,
|
||||
wtxn: &mut heed::RwTxn,
|
||||
should_abort: FA,
|
||||
) -> Result<usize>
|
||||
where
|
||||
FA: Fn() -> bool + Sync,
|
||||
{
|
||||
// there may be duplicates in the documents to remove.
|
||||
to_remove.sort_unstable();
|
||||
to_remove.dedup();
|
||||
|
||||
let external_documents_ids = self.index.external_documents_ids(wtxn)?;
|
||||
|
||||
let mut documents_deleted = 0;
|
||||
for to_remove in to_remove {
|
||||
if should_abort() {
|
||||
return Err(Error::InternalError(InternalError::AbortedIndexation));
|
||||
}
|
||||
|
||||
match self.new_external_documents_ids_builder.entry((*to_remove).into()) {
|
||||
// if the document was added in a previous iteration of the transform we make it as deleted in the sorters.
|
||||
Entry::Occupied(entry) => {
|
||||
let doc_id = *entry.get() as u32;
|
||||
self.original_sorter
|
||||
.insert(doc_id.to_be_bytes(), &[Operation::Deletion as u8])?;
|
||||
self.flattened_sorter
|
||||
.insert(doc_id.to_be_bytes(), &[Operation::Deletion as u8])?;
|
||||
|
||||
// we must NOT update the list of replaced_documents_ids
|
||||
// Either:
|
||||
// 1. It's already in it and there is nothing to do
|
||||
// 2. It wasn't in it because the document was created by a previous batch and since
|
||||
// we're removing it there is nothing to do.
|
||||
self.new_documents_ids.remove(doc_id);
|
||||
entry.remove_entry();
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
// If the document was already in the db we mark it as a `to_delete` document.
|
||||
// It'll be deleted later. We don't need to push anything to the sorters.
|
||||
if let Some(docid) = external_documents_ids.get(entry.key()) {
|
||||
self.replaced_documents_ids.insert(docid);
|
||||
} else {
|
||||
// if the document is nowehere to be found, there is nothing to do and we must NOT
|
||||
// increment the count of documents_deleted
|
||||
continue;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
documents_deleted += 1;
|
||||
}
|
||||
|
||||
Ok(documents_deleted)
|
||||
}
|
||||
|
||||
// Flatten a document from the fields ids map contained in self and insert the new
|
||||
// created fields. Returns `None` if the document doesn't need to be flattened.
|
||||
fn flatten_from_fields_ids_map(&mut self, obkv: KvReader<FieldId>) -> Result<Option<Vec<u8>>> {
|
||||
|
Loading…
x
Reference in New Issue
Block a user