Mirror of https://github.com/meilisearch/MeiliSearch, synced 2025-07-03 20:07:09 +02:00
Merge branch 'main' into enhance-language-detection
Commit 8aa808d51b
202 changed files with 10490 additions and 3066 deletions
@@ -591,9 +591,9 @@ fn remove_from_word_docids(
     Ok(())
 }
 
-fn remove_docids_from_field_id_docid_facet_value<'i, 'a>(
-    index: &'i Index,
-    wtxn: &'a mut heed::RwTxn,
+fn remove_docids_from_field_id_docid_facet_value(
+    index: &Index,
+    wtxn: &mut heed::RwTxn,
     facet_type: FacetType,
     field_id: FieldId,
     to_remove: &RoaringBitmap,

@@ -157,9 +157,9 @@ impl FacetsUpdateIncrementalInner {
     ///
     /// ## Return
     /// See documentation of `insert_in_level`
-    fn insert_in_level_0<'t>(
+    fn insert_in_level_0(
         &self,
-        txn: &'t mut RwTxn,
+        txn: &mut RwTxn,
         field_id: u16,
         facet_value: &[u8],
         docids: &RoaringBitmap,
@@ -211,9 +211,9 @@ impl FacetsUpdateIncrementalInner {
     /// - `InsertionResult::Insert` means that inserting the `facet_value` into the `level` resulted
     /// in the addition of a new key in that level, and that therefore the number of children
     /// of the parent node should be incremented.
-    fn insert_in_level<'t>(
+    fn insert_in_level(
         &self,
-        txn: &'t mut RwTxn,
+        txn: &mut RwTxn,
         field_id: u16,
         level: u8,
         facet_value: &[u8],
@@ -348,9 +348,9 @@ impl FacetsUpdateIncrementalInner {
     }
 
     /// Insert the given facet value and corresponding document ids in the database.
-    pub fn insert<'t>(
+    pub fn insert(
         &self,
-        txn: &'t mut RwTxn,
+        txn: &mut RwTxn,
         field_id: u16,
         facet_value: &[u8],
         docids: &RoaringBitmap,
@@ -470,9 +470,9 @@ impl FacetsUpdateIncrementalInner {
     /// in level 1, the key with the left bound `3` had to be changed to the next facet value (e.g. 4).
     /// In that case `DeletionResult::Reduce` is returned. The parent of the reduced key may need to adjust
     /// its left bound as well.
-    fn delete_in_level<'t>(
+    fn delete_in_level(
         &self,
-        txn: &'t mut RwTxn,
+        txn: &mut RwTxn,
         field_id: u16,
         level: u8,
         facet_value: &[u8],
@@ -529,9 +529,9 @@ impl FacetsUpdateIncrementalInner {
         }
     }
 
-    fn delete_in_level_0<'t>(
+    fn delete_in_level_0(
         &self,
-        txn: &'t mut RwTxn,
+        txn: &mut RwTxn,
         field_id: u16,
         facet_value: &[u8],
         docids: &RoaringBitmap,
@@ -557,9 +557,9 @@ impl FacetsUpdateIncrementalInner {
         }
     }
 
-    pub fn delete<'t>(
+    pub fn delete(
         &self,
-        txn: &'t mut RwTxn,
+        txn: &mut RwTxn,
         field_id: u16,
         facet_value: &[u8],
         docids: &RoaringBitmap,

@@ -98,7 +98,12 @@ pub fn enrich_documents_batch<R: Read + Seek>(
     // If the settings specifies that a _geo field must be used therefore we must check the
     // validity of it in all the documents of this batch and this is when we return `Some`.
     let geo_field_id = match documents_batch_index.id("_geo") {
-        Some(geo_field_id) if index.sortable_fields(rtxn)?.contains("_geo") => Some(geo_field_id),
+        Some(geo_field_id)
+            if index.sortable_fields(rtxn)?.contains("_geo")
+                || index.filterable_fields(rtxn)?.contains("_geo") =>
+        {
+            Some(geo_field_id)
+        }
         _otherwise => None,
     };
 
@@ -367,11 +372,17 @@ pub fn extract_finite_float_from_value(value: Value) -> StdResult<f64, Value> {
 
 pub fn validate_geo_from_json(id: &DocumentId, bytes: &[u8]) -> Result<StdResult<(), GeoError>> {
     use GeoError::*;
-    let debug_id = || Value::from(id.debug());
+    let debug_id = || {
+        serde_json::from_slice(id.value().as_bytes()).unwrap_or_else(|_| Value::from(id.debug()))
+    };
     match serde_json::from_slice(bytes).map_err(InternalError::SerdeJson)? {
         Value::Object(mut object) => match (object.remove("lat"), object.remove("lng")) {
             (Some(lat), Some(lng)) => {
                 match (extract_finite_float_from_value(lat), extract_finite_float_from_value(lng)) {
+                    (Ok(_), Ok(_)) if !object.is_empty() => Ok(Err(UnexpectedExtraFields {
+                        document_id: debug_id(),
+                        value: object.into(),
+                    })),
                     (Ok(_), Ok(_)) => Ok(Ok(())),
                     (Err(value), Ok(_)) => Ok(Err(BadLatitude { document_id: debug_id(), value })),
                     (Ok(_), Err(value)) => Ok(Err(BadLongitude { document_id: debug_id(), value })),
@@ -384,6 +395,7 @@ pub fn validate_geo_from_json(id: &DocumentId, bytes: &[u8]) -> Result<StdResult
             (Some(_), None) => Ok(Err(MissingLongitude { document_id: debug_id() })),
             (None, None) => Ok(Err(MissingLatitudeAndLongitude { document_id: debug_id() })),
         },
+        Value::Null => Ok(Ok(())),
         value => Ok(Err(NotAnObject { document_id: debug_id(), value })),
     }
 }

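For reference, the validator in this hunk accepts a `_geo` value that is either `null` or an object whose only keys are `lat` and `lng` holding finite numbers (or numeric strings); anything else is reported back as a user-facing error. A standalone sketch of the same decision tree, written against plain `serde_json` values and skipping the finite-float parsing for brevity (the `classify_geo` name and the string errors are illustrative, not milli APIs), is:

use serde_json::{json, Value};

// Illustrative only: mirrors the shape checks performed by `validate_geo_from_json`,
// but returns plain strings instead of milli's `GeoError` variants and does not
// re-check that `lat`/`lng` parse as finite floats.
fn classify_geo(value: &Value) -> Result<(), String> {
    match value {
        // A `null` `_geo` field is explicitly allowed and means "no geo point".
        Value::Null => Ok(()),
        Value::Object(object) => match (object.get("lat"), object.get("lng")) {
            (Some(_), Some(_)) if object.len() > 2 => Err("unexpected extra fields".into()),
            (Some(_), Some(_)) => Ok(()),
            (None, Some(_)) => Err("missing latitude".into()),
            (Some(_), None) => Err("missing longitude".into()),
            (None, None) => Err("missing latitude and longitude".into()),
        },
        _ => Err("the `_geo` field is not an object".into()),
    }
}

fn main() {
    assert!(classify_geo(&json!({ "lat": 31, "lng": 42 })).is_ok());
    assert!(classify_geo(&json!({ "lat": 31, "lng": 42, "extra": true })).is_err());
    assert!(classify_geo(&json!({ "lat": 31 })).is_err());
    assert!(classify_geo(&Value::Null).is_ok());
}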
@@ -59,6 +59,7 @@ pub fn extract_geo_points<R: io::Read + io::Seek>(
         } else if lat.is_some() && lng.is_none() {
             return Err(GeoError::MissingLongitude { document_id: document_id() })?;
         }
+        // else => the _geo object was `null`, there is nothing to do
     }
 
     writer_into_reader(writer)

@@ -1,6 +1,6 @@
 use std::borrow::Cow;
 use std::fs::File;
-use std::io::{self, Seek, SeekFrom};
+use std::io::{self, Seek};
 use std::time::Instant;
 
 use grenad::{CompressionType, Sorter};
@@ -66,7 +66,7 @@ pub fn sorter_into_reader(
 
 pub fn writer_into_reader(writer: grenad::Writer<File>) -> Result<grenad::Reader<File>> {
     let mut file = writer.into_inner()?;
-    file.seek(SeekFrom::Start(0))?;
+    file.rewind()?;
     grenad::Reader::new(file).map_err(Into::into)
 }
 

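This change (and the similar ones later in the transform) replaces `seek(SeekFrom::Start(0))` with `Seek::rewind`, the standard-library shorthand for the same operation, which makes the `SeekFrom` import unnecessary. A small, self-contained illustration of the equivalence (the temporary file is only there to have something to seek on; `tempfile` already appears as a dependency elsewhere in this diff):

use std::io::{Read, Seek, SeekFrom, Write};

fn main() -> std::io::Result<()> {
    let mut file = tempfile::tempfile()?;
    file.write_all(b"hello")?;

    // These two calls are equivalent; `rewind` is just the more readable spelling.
    file.seek(SeekFrom::Start(0))?;
    file.rewind()?;

    let mut contents = String::new();
    file.read_to_string(&mut contents)?;
    assert_eq!(contents, "hello");
    Ok(())
}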
@@ -6,6 +6,7 @@ use roaring::RoaringBitmap;
 
 use super::read_u32_ne_bytes;
 use crate::heed_codec::CboRoaringBitmapCodec;
+use crate::update::index_documents::transform::Operation;
 use crate::Result;
 
 pub type MergeFn = for<'a> fn(&[u8], &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>>;
@@ -57,21 +58,6 @@ pub fn keep_latest_obkv<'a>(_key: &[u8], obkvs: &[Cow<'a, [u8]>]) -> Result<Cow<
     Ok(obkvs.last().unwrap().clone())
 }
 
-/// Merge all the obks in the order we see them.
-pub fn merge_obkvs<'a>(_key: &[u8], obkvs: &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>> {
-    Ok(obkvs
-        .iter()
-        .cloned()
-        .reduce(|acc, current| {
-            let first = obkv::KvReader::new(&acc);
-            let second = obkv::KvReader::new(&current);
-            let mut buffer = Vec::new();
-            merge_two_obkvs(first, second, &mut buffer);
-            Cow::from(buffer)
-        })
-        .unwrap())
-}
-
 pub fn merge_two_obkvs(base: obkv::KvReaderU16, update: obkv::KvReaderU16, buffer: &mut Vec<u8>) {
     use itertools::merge_join_by;
     use itertools::EitherOrBoth::{Both, Left, Right};
@@ -88,6 +74,41 @@ pub fn merge_two_obkvs(base: obkv::KvReaderU16, update: obkv::KvReaderU16, buffe
     writer.finish().unwrap();
 }
 
+/// Merge all the obks in the order we see them.
+pub fn merge_obkvs_and_operations<'a>(
+    _key: &[u8],
+    obkvs: &[Cow<'a, [u8]>],
+) -> Result<Cow<'a, [u8]>> {
+    // [add, add, delete, add, add]
+    // we can ignore everything that happened before the last delete.
+    let starting_position =
+        obkvs.iter().rposition(|obkv| obkv[0] == Operation::Deletion as u8).unwrap_or(0);
+
+    // [add, add, delete]
+    // if the last operation was a deletion then we simply return the deletion
+    if starting_position == obkvs.len() - 1 && obkvs.last().unwrap()[0] == Operation::Deletion as u8
+    {
+        return Ok(obkvs[obkvs.len() - 1].clone());
+    }
+    let mut buffer = Vec::new();
+
+    // (add, add, delete) [add, add]
+    // in the other case, no deletion will be encountered during the merge
+    let mut ret =
+        obkvs[starting_position..].iter().cloned().fold(Vec::new(), |mut acc, current| {
+            let first = obkv::KvReader::new(&acc);
+            let second = obkv::KvReader::new(&current[1..]);
+            merge_two_obkvs(first, second, &mut buffer);
+
+            // we want the result of the merge into our accumulator
+            std::mem::swap(&mut acc, &mut buffer);
+            acc
+        });
+
+    ret.insert(0, Operation::Addition as u8);
+    Ok(Cow::from(ret))
+}
+
 pub fn merge_cbo_roaring_bitmaps<'a>(
     _key: &[u8],
     values: &[Cow<'a, [u8]>],

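Each value that reaches `merge_obkvs_and_operations` is framed as one `Operation` byte followed by the obkv payload, so the merge can collapse the chronological history of a single document id. A simplified, self-contained sketch of that collapsing rule (the `Op`/`collapse` names are hypothetical, and byte concatenation stands in for the real field-by-field obkv merge) is:

#[derive(Debug, Clone, Copy, PartialEq)]
enum Op {
    Addition,
    Deletion,
}

// Illustrative only: everything before the last deletion is irrelevant; if the history
// ends with a deletion the document ends up deleted, otherwise the surviving additions
// are merged (here simply concatenated).
fn collapse(history: &[(Op, Vec<u8>)]) -> (Op, Vec<u8>) {
    let start = history.iter().rposition(|(op, _)| *op == Op::Deletion).unwrap_or(0);
    if start == history.len() - 1 && history[start].0 == Op::Deletion {
        return (Op::Deletion, Vec::new());
    }
    let merged = history[start..]
        .iter()
        .filter(|entry| entry.0 == Op::Addition)
        .flat_map(|entry| entry.1.iter().copied())
        .collect();
    (Op::Addition, merged)
}

fn main() {
    // [add, add, delete, add] -> only what follows the last deletion survives.
    let history = vec![
        (Op::Addition, vec![1]),
        (Op::Addition, vec![2]),
        (Op::Deletion, Vec::new()),
        (Op::Addition, vec![3]),
    ];
    assert_eq!(collapse(&history), (Op::Addition, vec![3]));

    // [add, delete] -> the document ends up deleted.
    let history = vec![(Op::Addition, vec![1]), (Op::Deletion, Vec::new())];
    assert_eq!(collapse(&history), (Op::Deletion, Vec::new()));
}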
@@ -13,9 +13,9 @@ pub use grenad_helpers::{
     GrenadParameters, MergeableReader,
 };
 pub use merge_functions::{
-    concat_u32s_array, keep_first, keep_latest_obkv, merge_cbo_roaring_bitmaps, merge_obkvs,
-    merge_roaring_bitmaps, merge_two_obkvs, roaring_bitmap_from_u32s_array,
-    serialize_roaring_bitmap, MergeFn,
+    concat_u32s_array, keep_first, keep_latest_obkv, merge_cbo_roaring_bitmaps,
+    merge_obkvs_and_operations, merge_roaring_bitmaps, merge_two_obkvs,
+    roaring_bitmap_from_u32s_array, serialize_roaring_bitmap, MergeFn,
 };
 
 use crate::MAX_WORD_LENGTH;

@@ -79,6 +79,7 @@ pub struct IndexDocuments<'t, 'u, 'i, 'a, FP, FA> {
     progress: FP,
     should_abort: FA,
     added_documents: u64,
+    deleted_documents: u64,
 }
 
 #[derive(Default, Debug, Clone)]
@@ -122,6 +123,7 @@ where
             wtxn,
             index,
             added_documents: 0,
+            deleted_documents: 0,
         })
     }
 
@@ -166,6 +168,30 @@ where
         Ok((self, Ok(indexed_documents)))
     }
 
+    /// Remove a batch of documents from the current builder.
+    ///
+    /// Returns the number of documents deleted from the builder.
+    pub fn remove_documents(
+        mut self,
+        to_delete: Vec<String>,
+    ) -> Result<(Self, StdResult<u64, UserError>)> {
+        // Early return when there is no document to add
+        if to_delete.is_empty() {
+            return Ok((self, Ok(0)));
+        }
+
+        let deleted_documents = self
+            .transform
+            .as_mut()
+            .expect("Invalid document deletion state")
+            .remove_documents(to_delete, self.wtxn, &self.should_abort)?
+            as u64;
+
+        self.deleted_documents += deleted_documents;
+
+        Ok((self, Ok(deleted_documents)))
+    }
+
     #[logging_timer::time("IndexDocuments::{}")]
     pub fn execute(mut self) -> Result<DocumentAdditionResult> {
         if self.added_documents == 0 {
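Together with `add_documents`, the new method keeps the familiar consuming-builder shape: each call hands the builder back along with the per-call count, and everything is resolved when `execute` runs. A condensed sketch of the call pattern (mirroring the tests added further down; setup and error handling elided) looks like:

// Sketch only: `wtxn`, `index` and the two configs come from the surrounding test setup.
let builder = IndexDocuments::new(
    &mut wtxn,
    &index,
    &index.indexer_config,
    index.index_documents_config.clone(),
    |_| (),
    || false,
)?;

// Queue additions and deletions on the same transform...
let (builder, added) = builder.add_documents(documents!([{ "id": 1, "doggo": "kevin" }]))?;
let (builder, removed) = builder.remove_documents(vec!["1".to_string()])?;

// ...and resolve them in order when the batch is executed.
let result = builder.execute()?;
wtxn.commit()?;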
@@ -965,34 +991,6 @@ mod tests {
             .unwrap();
     }
 
-    #[test]
-    fn index_all_flavour_of_geo() {
-        let mut index = TempIndex::new();
-        index.index_documents_config.update_method = IndexDocumentsMethod::ReplaceDocuments;
-
-        index
-            .update_settings(|settings| {
-                settings.set_filterable_fields(hashset!(S("_geo")));
-            })
-            .unwrap();
-
-        index
-            .add_documents(documents!([
-                { "id": 0, "_geo": { "lat": 31, "lng": [42] } },
-                { "id": 1, "_geo": { "lat": "31" }, "_geo.lng": 42 },
-                { "id": 2, "_geo": { "lng": "42" }, "_geo.lat": "31" },
-                { "id": 3, "_geo.lat": 31, "_geo.lng": "42" },
-            ]))
-            .unwrap();
-
-        let rtxn = index.read_txn().unwrap();
-
-        let mut search = crate::Search::new(&rtxn, &index);
-        search.filter(crate::Filter::from_str("_geoRadius(31, 42, 0.000001)").unwrap().unwrap());
-        let crate::SearchResult { documents_ids, .. } = search.execute().unwrap();
-        assert_eq!(documents_ids, vec![0, 1, 2, 3]);
-    }
-
     #[test]
     fn geo_error() {
         let mut index = TempIndex::new();
@@ -1934,4 +1932,328 @@ mod tests {
         let expected_cj_cmn_docids = [1, 5].iter().collect();
         assert_eq!(cj_cmn_docs, expected_cj_cmn_docids);
     }
+
+    #[test]
+    fn add_and_delete_documents_in_single_transform() {
+        let mut index = TempIndex::new();
+        index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments;
+
+        let mut wtxn = index.write_txn().unwrap();
+        let builder = IndexDocuments::new(
+            &mut wtxn,
+            &index,
+            &index.indexer_config,
+            index.index_documents_config.clone(),
+            |_| (),
+            || false,
+        )
+        .unwrap();
+
+        let documents = documents!([
+            { "id": 1, "doggo": "kevin" },
+            { "id": 2, "doggo": { "name": "bob", "age": 20 } },
+            { "id": 3, "name": "jean", "age": 25 },
+        ]);
+        let (builder, added) = builder.add_documents(documents).unwrap();
+        insta::assert_display_snapshot!(added.unwrap(), @"3");
+
+        let (builder, removed) = builder.remove_documents(vec![S("2")]).unwrap();
+        insta::assert_display_snapshot!(removed.unwrap(), @"1");
+
+        let addition = builder.execute().unwrap();
+        insta::assert_debug_snapshot!(addition, @r###"
+        DocumentAdditionResult {
+            indexed_documents: 3,
+            number_of_documents: 2,
+        }
+        "###);
+        wtxn.commit().unwrap();
+
+        db_snap!(index, documents, @r###"
+        {"id":1,"doggo":"kevin"}
+        {"id":3,"name":"jean","age":25}
+        "###);
+    }
+
+    #[test]
+    fn add_update_and_delete_documents_in_single_transform() {
+        let mut index = TempIndex::new();
+        index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments;
+
+        let mut wtxn = index.write_txn().unwrap();
+        let builder = IndexDocuments::new(
+            &mut wtxn,
+            &index,
+            &index.indexer_config,
+            index.index_documents_config.clone(),
+            |_| (),
+            || false,
+        )
+        .unwrap();
+
+        let documents = documents!([
+            { "id": 1, "doggo": "kevin" },
+            { "id": 2, "doggo": { "name": "bob", "age": 20 } },
+            { "id": 3, "name": "jean", "age": 25 },
+        ]);
+        let (builder, added) = builder.add_documents(documents).unwrap();
+        insta::assert_display_snapshot!(added.unwrap(), @"3");
+
+        let documents = documents!([
+            { "id": 2, "catto": "jorts" },
+            { "id": 3, "legs": 4 },
+        ]);
+        let (builder, added) = builder.add_documents(documents).unwrap();
+        insta::assert_display_snapshot!(added.unwrap(), @"2");
+
+        let (builder, removed) = builder.remove_documents(vec![S("1"), S("2")]).unwrap();
+        insta::assert_display_snapshot!(removed.unwrap(), @"2");
+
+        let addition = builder.execute().unwrap();
+        insta::assert_debug_snapshot!(addition, @r###"
+        DocumentAdditionResult {
+            indexed_documents: 5,
+            number_of_documents: 1,
+        }
+        "###);
+        wtxn.commit().unwrap();
+
+        db_snap!(index, documents, @r###"
+        {"id":3,"name":"jean","age":25,"legs":4}
+        "###);
+    }
+
+    #[test]
+    fn add_document_and_in_another_transform_update_and_delete_documents() {
+        let mut index = TempIndex::new();
+        index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments;
+
+        let mut wtxn = index.write_txn().unwrap();
+        let builder = IndexDocuments::new(
+            &mut wtxn,
+            &index,
+            &index.indexer_config,
+            index.index_documents_config.clone(),
+            |_| (),
+            || false,
+        )
+        .unwrap();
+
+        let documents = documents!([
+            { "id": 1, "doggo": "kevin" },
+            { "id": 2, "doggo": { "name": "bob", "age": 20 } },
+            { "id": 3, "name": "jean", "age": 25 },
+        ]);
+        let (builder, added) = builder.add_documents(documents).unwrap();
+        insta::assert_display_snapshot!(added.unwrap(), @"3");
+
+        let addition = builder.execute().unwrap();
+        insta::assert_debug_snapshot!(addition, @r###"
+        DocumentAdditionResult {
+            indexed_documents: 3,
+            number_of_documents: 3,
+        }
+        "###);
+        wtxn.commit().unwrap();
+
+        db_snap!(index, documents, @r###"
+        {"id":1,"doggo":"kevin"}
+        {"id":2,"doggo":{"name":"bob","age":20}}
+        {"id":3,"name":"jean","age":25}
+        "###);
+
+        // A first batch of documents has been inserted
+
+        let mut wtxn = index.write_txn().unwrap();
+        let builder = IndexDocuments::new(
+            &mut wtxn,
+            &index,
+            &index.indexer_config,
+            index.index_documents_config.clone(),
+            |_| (),
+            || false,
+        )
+        .unwrap();
+
+        let documents = documents!([
+            { "id": 2, "catto": "jorts" },
+            { "id": 3, "legs": 4 },
+        ]);
+        let (builder, added) = builder.add_documents(documents).unwrap();
+        insta::assert_display_snapshot!(added.unwrap(), @"2");
+
+        let (builder, removed) = builder.remove_documents(vec![S("1"), S("2")]).unwrap();
+        insta::assert_display_snapshot!(removed.unwrap(), @"2");
+
+        let addition = builder.execute().unwrap();
+        insta::assert_debug_snapshot!(addition, @r###"
+        DocumentAdditionResult {
+            indexed_documents: 2,
+            number_of_documents: 1,
+        }
+        "###);
+        wtxn.commit().unwrap();
+
+        db_snap!(index, documents, @r###"
+        {"id":3,"name":"jean","age":25,"legs":4}
+        "###);
+    }
+
+    #[test]
+    fn delete_document_and_then_add_documents_in_the_same_transform() {
+        let mut index = TempIndex::new();
+        index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments;
+
+        let mut wtxn = index.write_txn().unwrap();
+        let builder = IndexDocuments::new(
+            &mut wtxn,
+            &index,
+            &index.indexer_config,
+            index.index_documents_config.clone(),
+            |_| (),
+            || false,
+        )
+        .unwrap();
+
+        let (builder, removed) = builder.remove_documents(vec![S("1"), S("2")]).unwrap();
+        insta::assert_display_snapshot!(removed.unwrap(), @"0");
+
+        let documents = documents!([
+            { "id": 2, "doggo": { "name": "jean", "age": 20 } },
+            { "id": 3, "name": "bob", "age": 25 },
+        ]);
+        let (builder, added) = builder.add_documents(documents).unwrap();
+        insta::assert_display_snapshot!(added.unwrap(), @"2");
+
+        let addition = builder.execute().unwrap();
+        insta::assert_debug_snapshot!(addition, @r###"
+        DocumentAdditionResult {
+            indexed_documents: 2,
+            number_of_documents: 2,
+        }
+        "###);
+        wtxn.commit().unwrap();
+
+        db_snap!(index, documents, @r###"
+        {"id":2,"doggo":{"name":"jean","age":20}}
+        {"id":3,"name":"bob","age":25}
+        "###);
+    }
+
+    #[test]
+    fn delete_the_same_document_multiple_time() {
+        let mut index = TempIndex::new();
+        index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments;
+
+        let mut wtxn = index.write_txn().unwrap();
+        let builder = IndexDocuments::new(
+            &mut wtxn,
+            &index,
+            &index.indexer_config,
+            index.index_documents_config.clone(),
+            |_| (),
+            || false,
+        )
+        .unwrap();
+
+        let (builder, removed) =
+            builder.remove_documents(vec![S("1"), S("2"), S("1"), S("2")]).unwrap();
+        insta::assert_display_snapshot!(removed.unwrap(), @"0");
+
+        let documents = documents!([
+            { "id": 1, "doggo": "kevin" },
+            { "id": 2, "doggo": { "name": "jean", "age": 20 } },
+            { "id": 3, "name": "bob", "age": 25 },
+        ]);
+        let (builder, added) = builder.add_documents(documents).unwrap();
+        insta::assert_display_snapshot!(added.unwrap(), @"3");
+
+        let (builder, removed) =
+            builder.remove_documents(vec![S("1"), S("2"), S("1"), S("2")]).unwrap();
+        insta::assert_display_snapshot!(removed.unwrap(), @"2");
+
+        let addition = builder.execute().unwrap();
+        insta::assert_debug_snapshot!(addition, @r###"
+        DocumentAdditionResult {
+            indexed_documents: 3,
+            number_of_documents: 1,
+        }
+        "###);
+        wtxn.commit().unwrap();
+
+        db_snap!(index, documents, @r###"
+        {"id":3,"name":"bob","age":25}
+        "###);
+    }
+
+    #[test]
+    fn add_document_and_in_another_transform_delete_the_document_then_add_it_again() {
+        let mut index = TempIndex::new();
+        index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments;
+
+        let mut wtxn = index.write_txn().unwrap();
+        let builder = IndexDocuments::new(
+            &mut wtxn,
+            &index,
+            &index.indexer_config,
+            index.index_documents_config.clone(),
+            |_| (),
+            || false,
+        )
+        .unwrap();
+
+        let documents = documents!([
+            { "id": 1, "doggo": "kevin" },
+        ]);
+        let (builder, added) = builder.add_documents(documents).unwrap();
+        insta::assert_display_snapshot!(added.unwrap(), @"1");
+
+        let addition = builder.execute().unwrap();
+        insta::assert_debug_snapshot!(addition, @r###"
+        DocumentAdditionResult {
+            indexed_documents: 1,
+            number_of_documents: 1,
+        }
+        "###);
+        wtxn.commit().unwrap();
+
+        db_snap!(index, documents, @r###"
+        {"id":1,"doggo":"kevin"}
+        "###);
+
+        // A first batch of documents has been inserted
+
+        let mut wtxn = index.write_txn().unwrap();
+        let builder = IndexDocuments::new(
+            &mut wtxn,
+            &index,
+            &index.indexer_config,
+            index.index_documents_config.clone(),
+            |_| (),
+            || false,
+        )
+        .unwrap();
+
+        let (builder, removed) = builder.remove_documents(vec![S("1")]).unwrap();
+        insta::assert_display_snapshot!(removed.unwrap(), @"1");
+
+        let documents = documents!([
+            { "id": 1, "catto": "jorts" },
+        ]);
+        let (builder, added) = builder.add_documents(documents).unwrap();
+        insta::assert_display_snapshot!(added.unwrap(), @"1");
+
+        let addition = builder.execute().unwrap();
+        insta::assert_debug_snapshot!(addition, @r###"
+        DocumentAdditionResult {
+            indexed_documents: 1,
+            number_of_documents: 1,
+        }
+        "###);
+        wtxn.commit().unwrap();
+
+        db_snap!(index, documents, @r###"
+        {"id":1,"catto":"jorts"}
+        "###);
+    }
 }

@@ -2,7 +2,7 @@ use std::borrow::Cow;
 use std::collections::hash_map::Entry;
 use std::collections::{HashMap, HashSet};
 use std::fs::File;
-use std::io::{Read, Seek, SeekFrom};
+use std::io::{Read, Seek};
 
 use fxhash::FxHashMap;
 use heed::RoTxn;
@@ -12,7 +12,9 @@ use roaring::RoaringBitmap;
 use serde_json::Value;
 use smartstring::SmartString;
 
-use super::helpers::{create_sorter, create_writer, keep_latest_obkv, merge_obkvs, MergeFn};
+use super::helpers::{
+    create_sorter, create_writer, keep_latest_obkv, merge_obkvs_and_operations, MergeFn,
+};
 use super::{IndexDocumentsMethod, IndexerConfig};
 use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader};
 use crate::error::{Error, InternalError, UserError};
@@ -50,8 +52,12 @@ pub struct Transform<'a, 'i> {
     pub index_documents_method: IndexDocumentsMethod,
     available_documents_ids: AvailableDocumentsIds,
 
+    // Both grenad follows the same format:
+    // key | value
+    // u32 | 1 byte for the Operation byte, the rest is the obkv of the document stored
     original_sorter: grenad::Sorter<MergeFn>,
     flattened_sorter: grenad::Sorter<MergeFn>,
+
     replaced_documents_ids: RoaringBitmap,
     new_documents_ids: RoaringBitmap,
     // To increase the cache locality and decrease the heap usage we use compact smartstring.
@@ -59,6 +65,14 @@ pub struct Transform<'a, 'i> {
     documents_count: usize,
 }
 
+/// This enum is specific to the grenad sorter stored in the transform.
+/// It's used as the first byte of the grenads and tells you if the document id was an addition or a deletion.
+#[repr(u8)]
+pub enum Operation {
+    Addition,
+    Deletion,
+}
+
 /// Create a mapping between the field ids found in the document batch and the one that were
 /// already present in the index.
 ///
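Every value pushed into the transform's two grenad sorters is framed as one `Operation` tag byte followed by the serialized document, which is how the merge function and the final writers distinguish additions from deletions. A minimal sketch of that framing, using plain byte vectors instead of grenad (the `frame`/`unframe` helpers are illustrative, not crate APIs):

#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum Operation {
    Addition,
    Deletion,
}

// Frame a document payload: the first byte is the operation, the rest is the payload.
fn frame(op: Operation, payload: &[u8]) -> Vec<u8> {
    let mut buffer = Vec::with_capacity(1 + payload.len());
    buffer.push(op as u8);
    buffer.extend_from_slice(payload);
    buffer
}

// Split a framed value back into its operation tag and payload.
fn unframe(value: &[u8]) -> (Operation, &[u8]) {
    let op = if value[0] == Operation::Deletion as u8 {
        Operation::Deletion
    } else {
        Operation::Addition
    };
    (op, &value[1..])
}

fn main() {
    let framed = frame(Operation::Addition, b"obkv bytes go here");
    let (op, payload) = unframe(&framed);
    assert_eq!(op, Operation::Addition);
    assert_eq!(payload, &b"obkv bytes go here"[..]);

    // A deletion entry carries no payload, exactly like the sorter entries in the transform.
    let deleted = frame(Operation::Deletion, &[]);
    assert_eq!(unframe(&deleted).0, Operation::Deletion);
}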
|
@ -94,7 +108,7 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||
// with the same user id must be merged or fully replaced in the same batch.
|
||||
let merge_function = match index_documents_method {
|
||||
IndexDocumentsMethod::ReplaceDocuments => keep_latest_obkv,
|
||||
IndexDocumentsMethod::UpdateDocuments => merge_obkvs,
|
||||
IndexDocumentsMethod::UpdateDocuments => merge_obkvs_and_operations,
|
||||
};
|
||||
|
||||
// We initialize the sorter with the user indexing settings.
|
||||
|
@@ -151,9 +165,7 @@ impl<'a, 'i> Transform<'a, 'i> {
         FA: Fn() -> bool + Sync,
     {
         let (mut cursor, fields_index) = reader.into_cursor_and_fields_index();
-
-        let external_documents_ids = self.index.external_documents_ids(wtxn)?;
 
         let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?;
 
         let primary_key = cursor.primary_key().to_string();
@@ -161,6 +173,7 @@ impl<'a, 'i> Transform<'a, 'i> {
         self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?;
 
         let mut obkv_buffer = Vec::new();
+        let mut document_sorter_buffer = Vec::new();
         let mut documents_count = 0;
         let mut docid_buffer: Vec<u8> = Vec::new();
         let mut field_buffer: Vec<(u16, Cow<[u8]>)> = Vec::new();
@@ -212,10 +225,13 @@ impl<'a, 'i> Transform<'a, 'i> {
                 Entry::Occupied(entry) => *entry.get() as u32,
                 Entry::Vacant(entry) => {
                     // If the document was already in the db we mark it as a replaced document.
-                    // It'll be deleted later. We keep its original docid to insert it in the grenad.
+                    // It'll be deleted later.
                     if let Some(docid) = external_documents_ids.get(entry.key()) {
-                        self.replaced_documents_ids.insert(docid);
-                        original_docid = Some(docid);
+                        // If it was already in the list of replaced documents it means it was deleted
+                        // by the remove_document method. We should starts as if it never existed.
+                        if self.replaced_documents_ids.insert(docid) {
+                            original_docid = Some(docid);
+                        }
                     }
                     let docid = self
                         .available_documents_ids
@@ -248,26 +264,46 @@ impl<'a, 'i> Transform<'a, 'i> {
                         skip_insertion = true;
                     } else {
                         // we associate the base document with the new key, everything will get merged later.
-                        self.original_sorter.insert(docid.to_be_bytes(), base_obkv)?;
+                        document_sorter_buffer.clear();
+                        document_sorter_buffer.push(Operation::Addition as u8);
+                        document_sorter_buffer.extend_from_slice(base_obkv);
+                        self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
                         match self.flatten_from_fields_ids_map(KvReader::new(base_obkv))? {
-                            Some(buffer) => {
-                                self.flattened_sorter.insert(docid.to_be_bytes(), &buffer)?
+                            Some(flattened_obkv) => {
+                                // we recreate our buffer with the flattened documents
+                                document_sorter_buffer.clear();
+                                document_sorter_buffer.push(Operation::Addition as u8);
+                                document_sorter_buffer.extend_from_slice(&flattened_obkv);
+                                self.flattened_sorter
+                                    .insert(docid.to_be_bytes(), &document_sorter_buffer)?
                             }
-                            None => self.flattened_sorter.insert(docid.to_be_bytes(), base_obkv)?,
+                            None => self
+                                .flattened_sorter
+                                .insert(docid.to_be_bytes(), &document_sorter_buffer)?,
                         }
                     }
                 }
 
                 if !skip_insertion {
                     self.new_documents_ids.insert(docid);
+
+                    document_sorter_buffer.clear();
+                    document_sorter_buffer.push(Operation::Addition as u8);
+                    document_sorter_buffer.extend_from_slice(&obkv_buffer);
                     // We use the extracted/generated user id as the key for this document.
-                    self.original_sorter.insert(docid.to_be_bytes(), obkv_buffer.clone())?;
+                    self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
+
                     match self.flatten_from_fields_ids_map(KvReader::new(&obkv_buffer))? {
-                        Some(buffer) => self.flattened_sorter.insert(docid.to_be_bytes(), &buffer)?,
-                        None => {
-                            self.flattened_sorter.insert(docid.to_be_bytes(), obkv_buffer.clone())?
+                        Some(flattened_obkv) => {
+                            document_sorter_buffer.clear();
+                            document_sorter_buffer.push(Operation::Addition as u8);
+                            document_sorter_buffer.extend_from_slice(&flattened_obkv);
+                            self.flattened_sorter
+                                .insert(docid.to_be_bytes(), &document_sorter_buffer)?
                         }
+                        None => self
+                            .flattened_sorter
+                            .insert(docid.to_be_bytes(), &document_sorter_buffer)?,
                     }
                 }
                 documents_count += 1;
@@ -293,6 +329,73 @@ impl<'a, 'i> Transform<'a, 'i> {
         Ok(documents_count)
     }
 
+    /// The counter part of `read_documents` that removes documents either from the transform or the database.
+    /// It can be called before, after or in between two calls of the `read_documents`.
+    ///
+    /// It needs to update all the internal datastructure in the transform.
+    /// - If the document is coming from the database -> it's marked as a to_delete document
+    /// - If the document to remove was inserted by the `read_documents` method before AND was present in the db,
+    /// it's marked as `to_delete` + added into the grenad to ensure we don't reinsert it.
+    /// - If the document to remove was inserted by the `read_documents` method before but was NOT present in the db,
+    /// it's added into the grenad to ensure we don't insert it + removed from the list of new documents ids.
+    /// - If the document to remove was not present in either the db or the transform we do nothing.
+    pub fn remove_documents<FA>(
+        &mut self,
+        mut to_remove: Vec<String>,
+        wtxn: &mut heed::RwTxn,
+        should_abort: FA,
+    ) -> Result<usize>
+    where
+        FA: Fn() -> bool + Sync,
+    {
+        // there may be duplicates in the documents to remove.
+        to_remove.sort_unstable();
+        to_remove.dedup();
+
+        let external_documents_ids = self.index.external_documents_ids(wtxn)?;
+
+        let mut documents_deleted = 0;
+        for to_remove in to_remove {
+            if should_abort() {
+                return Err(Error::InternalError(InternalError::AbortedIndexation));
+            }
+
+            match self.new_external_documents_ids_builder.entry((*to_remove).into()) {
+                // if the document was added in a previous iteration of the transform we make it as deleted in the sorters.
+                Entry::Occupied(entry) => {
+                    let doc_id = *entry.get() as u32;
+                    self.original_sorter
+                        .insert(doc_id.to_be_bytes(), [Operation::Deletion as u8])?;
+                    self.flattened_sorter
+                        .insert(doc_id.to_be_bytes(), [Operation::Deletion as u8])?;
+
+                    // we must NOT update the list of replaced_documents_ids
+                    // Either:
+                    // 1. It's already in it and there is nothing to do
+                    // 2. It wasn't in it because the document was created by a previous batch and since
+                    // we're removing it there is nothing to do.
+                    self.new_documents_ids.remove(doc_id);
+                    entry.remove_entry();
+                }
+                Entry::Vacant(entry) => {
+                    // If the document was already in the db we mark it as a `to_delete` document.
+                    // It'll be deleted later. We don't need to push anything to the sorters.
+                    if let Some(docid) = external_documents_ids.get(entry.key()) {
+                        self.replaced_documents_ids.insert(docid);
+                    } else {
+                        // if the document is nowehere to be found, there is nothing to do and we must NOT
+                        // increment the count of documents_deleted
+                        continue;
+                    }
+                }
+            };
+
+            documents_deleted += 1;
+        }
+
+        Ok(documents_deleted)
+    }
+
     // Flatten a document from the fields ids map contained in self and insert the new
     // created fields. Returns `None` if the document doesn't need to be flattened.
     fn flatten_from_fields_ids_map(&mut self, obkv: KvReader<FieldId>) -> Result<Option<Vec<u8>>> {
@@ -487,6 +590,11 @@ impl<'a, 'i> Transform<'a, 'i> {
         let mut documents_count = 0;
 
         while let Some((key, val)) = iter.next()? {
+            if val[0] == Operation::Deletion as u8 {
+                continue;
+            }
+            let val = &val[1..];
+
             // send a callback to show at which step we are
             documents_count += 1;
             progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments {
@@ -510,7 +618,7 @@ impl<'a, 'i> Transform<'a, 'i> {
 
         let mut original_documents = writer.into_inner()?;
         // We then extract the file and reset the seek to be able to read it again.
-        original_documents.seek(SeekFrom::Start(0))?;
+        original_documents.rewind()?;
 
         // We create a final writer to write the new documents in order from the sorter.
         let mut writer = create_writer(
@@ -518,11 +626,20 @@ impl<'a, 'i> Transform<'a, 'i> {
             self.indexer_settings.chunk_compression_level,
             tempfile::tempfile()?,
         );
-        // Once we have written all the documents into the final sorter, we write the documents
-        // into this writer, extract the file and reset the seek to be able to read it again.
-        self.flattened_sorter.write_into_stream_writer(&mut writer)?;
+
+        // Once we have written all the documents into the final sorter, we write the nested documents
+        // into this writer.
+        // We get rids of the `Operation` byte and skip the deleted documents as well.
+        let mut iter = self.flattened_sorter.into_stream_merger_iter()?;
+        while let Some((key, val)) = iter.next()? {
+            if val[0] == Operation::Deletion as u8 {
+                continue;
+            }
+            let val = &val[1..];
+            writer.insert(key, val)?;
+        }
         let mut flattened_documents = writer.into_inner()?;
-        flattened_documents.seek(SeekFrom::Start(0))?;
+        flattened_documents.rewind()?;
 
         let mut new_external_documents_ids_builder: Vec<_> =
             self.new_external_documents_ids_builder.into_iter().collect();
@@ -650,10 +767,10 @@ impl<'a, 'i> Transform<'a, 'i> {
         // Once we have written all the documents, we extract
         // the file and reset the seek to be able to read it again.
         let mut original_documents = original_writer.into_inner()?;
-        original_documents.seek(SeekFrom::Start(0))?;
+        original_documents.rewind()?;
 
         let mut flattened_documents = flattened_writer.into_inner()?;
-        flattened_documents.seek(SeekFrom::Start(0))?;
+        flattened_documents.rewind()?;
 
         let output = TransformOutput {
             primary_key,
@@ -701,3 +818,45 @@ impl TransformOutput {
             .collect())
     }
 }
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn merge_obkvs() {
+        let mut doc_0 = Vec::new();
+        let mut kv_writer = KvWriter::new(&mut doc_0);
+        kv_writer.insert(0_u8, [0]).unwrap();
+        kv_writer.finish().unwrap();
+        doc_0.insert(0, Operation::Addition as u8);
+
+        let ret = merge_obkvs_and_operations(&[], &[Cow::from(doc_0.as_slice())]).unwrap();
+        assert_eq!(*ret, doc_0);
+
+        let ret = merge_obkvs_and_operations(
+            &[],
+            &[Cow::from([Operation::Deletion as u8].as_slice()), Cow::from(doc_0.as_slice())],
+        )
+        .unwrap();
+        assert_eq!(*ret, doc_0);
+
+        let ret = merge_obkvs_and_operations(
+            &[],
+            &[Cow::from(doc_0.as_slice()), Cow::from([Operation::Deletion as u8].as_slice())],
+        )
+        .unwrap();
+        assert_eq!(*ret, [Operation::Deletion as u8]);
+
+        let ret = merge_obkvs_and_operations(
+            &[],
+            &[
+                Cow::from([Operation::Addition as u8, 1].as_slice()),
+                Cow::from([Operation::Deletion as u8].as_slice()),
+                Cow::from(doc_0.as_slice()),
+            ],
+        )
+        .unwrap();
+        assert_eq!(*ret, doc_0);
+    }
+}

@@ -2,7 +2,7 @@ use std::collections::{BTreeSet, HashMap, HashSet};
 use std::result::Result as StdResult;
 
 use charabia::{Tokenizer, TokenizerBuilder};
-use deserr::{DeserializeError, DeserializeFromValue};
+use deserr::{DeserializeError, Deserr};
 use itertools::Itertools;
 use serde::{Deserialize, Deserializer, Serialize, Serializer};
 use time::OffsetDateTime;
@@ -23,9 +23,9 @@ pub enum Setting<T> {
     NotSet,
 }
 
-impl<T, E> DeserializeFromValue<E> for Setting<T>
+impl<T, E> Deserr<E> for Setting<T>
 where
-    T: DeserializeFromValue<E>,
+    T: Deserr<E>,
     E: DeserializeError,
 {
     fn deserialize_from_value<V: deserr::IntoValue>(
@@ -37,9 +37,6 @@ where
             _ => T::deserialize_from_value(value, location).map(Setting::Set),
         }
     }
-    fn default() -> Option<Self> {
-        Some(Self::NotSet)
-    }
 }
 
 impl<T> Default for Setting<T> {

@@ -140,16 +140,20 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> {
 
         // We remove all the entries that are no more required in this word prefix position
        // docids database.
-        let mut iter =
-            self.index.word_prefix_position_docids.iter_mut(self.wtxn)?.lazily_decode_data();
-        while let Some(((prefix, _), _)) = iter.next().transpose()? {
-            if del_prefix_fst_words.contains(prefix.as_bytes()) {
-                unsafe { iter.del_current()? };
-            }
-        }
-
-        drop(iter);
+        // We also avoid iterating over the whole `word_prefix_position_docids` database if we know in
+        // advance that the `if del_prefix_fst_words.contains(prefix.as_bytes()) {` condition below
+        // will always be false (i.e. if `del_prefix_fst_words` is empty).
+        if !del_prefix_fst_words.is_empty() {
+            let mut iter =
+                self.index.word_prefix_position_docids.iter_mut(self.wtxn)?.lazily_decode_data();
+            while let Some(((prefix, _), _)) = iter.next().transpose()? {
+                if del_prefix_fst_words.contains(prefix.as_bytes()) {
+                    unsafe { iter.del_current()? };
+                }
+            }
+            drop(iter);
+        }
 
         // We finally write all the word prefix position docids into the LMDB database.
         sorter_into_lmdb_database(
             self.wtxn,