Mirror of https://github.com/meilisearch/MeiliSearch, synced 2024-11-05 12:38:55 +01:00
Merge #187
187: Fix fields distribution after documents merge r=Kerollmops a=shekhirin

Resolves https://github.com/meilisearch/milli/issues/174

The problem was with the calculation of the fields distribution before the merge in `output_from_sorter()`: if you imported two documents with the same primary key value, the fields distribution counted them as two documents, while `output_from_sorter()` merged them into one.

---

```console
➜ Downloads cat short_movies.json
[
  {"id":"47474","title":"The Serpent's Egg","poster":"https://image.tmdb.org/t/p/w500/n7z0doFkXHcvo8QQWHLFnkEPXRU.jpg","overview":"The Serpent's Egg follows a week in the life of Abel Rosenberg, an out-of-work American circus acrobat living in poverty-stricken Berlin following Germany's defeat in World War I.","release_date":246844800,"genres":["Thriller","Drama","Mystery"]},
  {"id":"47474","title":"The Serpent's Egg","poster":"https://image.tmdb.org/t/p/w500/n7z0doFkXHcvo8QQWHLFnkEPXRU.jpg","overview":"The Serpent's Egg follows a week in the life of Abel Rosenberg, an out-of-work American circus acrobat living in poverty-stricken Berlin following Germany's defeat in World War I.","release_date":246844800,"genres":["Thriller","Drama","Mystery"]}
]
➜ Downloads curl -X POST -H "Content-Type: text/json" --data-binary @short_movies.json 127.0.0.1:7700/indexes/movies/documents
{"updateId":0}
```

## Before

```console
➜ Downloads curl -s 127.0.0.1:7700/indexes/movies/stats | jq
{
  "numberOfDocuments": 1,
  "isIndexing": false,
  "fieldsDistribution": {
    "release_date": 2,
    "poster": 2,
    "title": 2,
    "overview": 2,
    "genres": 2,
    "id": 2
  }
}
```

## After

```console
➜ Downloads curl -s 127.0.0.1:7700/indexes/movies/stats | jq
{
  "numberOfDocuments": 1,
  "isIndexing": false,
  "fieldsDistribution": {
    "poster": 1,
    "release_date": 1,
    "title": 1,
    "genres": 1,
    "id": 1,
    "overview": 1
  }
}
```

Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
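To make the double-counting concrete, here is a minimal standalone sketch of the idea behind the fix. The types are simplified stand-ins (plain tuples and `HashMap`s rather than milli's `FieldsDistribution` and sorter): fields must be counted only after documents have been deduplicated by primary key, which is what `output_from_sorter()` does.

```rust
use std::collections::HashMap;

// Stand-in for a batch of documents: each entry is a primary-key value
// plus the list of field names that document contains.
fn fields_distribution(docs: &[(&str, Vec<&str>)]) -> HashMap<String, u64> {
    // Deduplicate by primary key first (last write wins), mirroring how
    // the sorter merges documents before they reach the final file.
    let mut merged: HashMap<&str, &Vec<&str>> = HashMap::new();
    for (pk, fields) in docs {
        merged.insert(*pk, fields);
    }

    // Only now count field occurrences: one per *merged* document.
    let mut distribution = HashMap::new();
    for fields in merged.values() {
        for field in fields.iter() {
            *distribution.entry(field.to_string()).or_insert(0) += 1;
        }
    }
    distribution
}

fn main() {
    // Two documents share the primary key "47474", so every field must
    // be counted once; the pre-merge count reported 2 for each field.
    let docs = vec![
        ("47474", vec!["id", "title", "overview"]),
        ("47474", vec!["id", "title", "overview"]),
    ];
    let distribution = fields_distribution(&docs);
    assert_eq!(distribution["title"], 1);
}
```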
Commit: 7e63e32960
```diff
@@ -566,11 +566,11 @@ pub(crate) mod tests {
 
         let mut wtxn = index.write_txn().unwrap();
         let content = &br#"[
-            { "name": "kevin" },
-            { "name": "bob", "age": 20 }
+            { "id": 1, "name": "kevin" },
+            { "id": 2, "name": "bob", "age": 20 },
+            { "id": 2, "name": "bob", "age": 20 }
         ]"#[..];
         let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
-        builder.enable_autogenerate_docids();
         builder.update_format(UpdateFormat::Json);
         builder.execute(content, |_, _| ()).unwrap();
         wtxn.commit().unwrap();
@@ -579,6 +579,7 @@ pub(crate) mod tests {
 
         let fields_distribution = index.fields_distribution(&rtxn).unwrap();
         assert_eq!(fields_distribution, hashmap! {
+            "id".to_string() => 2,
            "name".to_string() => 2,
            "age".to_string() => 1,
         });
```
```diff
@@ -1,5 +1,4 @@
 use std::borrow::Cow;
-use std::collections::HashMap;
 use std::fs::File;
 use std::io::{Read, Seek, SeekFrom};
 use std::iter::Peekable;
@@ -76,7 +75,6 @@ impl Transform<'_, '_> {
         F: Fn(UpdateIndexingStep) + Sync,
     {
         let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?;
-        let mut fields_distribution = self.index.fields_distribution(self.rtxn)?;
         let external_documents_ids = self.index.external_documents_ids(self.rtxn).unwrap();
 
         // Deserialize the whole batch of documents in memory.
@@ -106,7 +104,7 @@ impl Transform<'_, '_> {
             return Ok(TransformOutput {
                 primary_key,
                 fields_ids_map,
-                fields_distribution,
+                fields_distribution: self.index.fields_distribution(self.rtxn)?,
                 external_documents_ids: ExternalDocumentsIds::default(),
                 new_documents_ids: RoaringBitmap::new(),
                 replaced_documents_ids: RoaringBitmap::new(),
@@ -137,8 +135,6 @@ impl Transform<'_, '_> {
         let mut uuid_buffer = [0; uuid::adapter::Hyphenated::LENGTH];
         let mut documents_count = 0;
 
-        let mut fields_ids_distribution = HashMap::new();
-
         for result in documents {
             let document = result?;
 
@@ -153,9 +149,7 @@ impl Transform<'_, '_> {
 
             // We prepare the fields ids map with the documents keys.
             for (key, _value) in &document {
-                let field_id = fields_ids_map.insert(&key).context("field id limit reached")?;
-
-                *fields_ids_distribution.entry(field_id).or_insert(0) += 1;
+                fields_ids_map.insert(&key).context("field id limit reached")?;
             }
 
             // We retrieve the user id from the document based on the primary key name,
@@ -198,11 +192,6 @@ impl Transform<'_, '_> {
             documents_count += 1;
         }
 
-        for (field_id, count) in fields_ids_distribution {
-            let field_name = fields_ids_map.name(field_id).unwrap();
-            *fields_distribution.entry(field_name.to_string()).or_default() += count;
-        }
-
         progress_callback(UpdateIndexingStep::TransformFromUserIntoGenericFormat {
             documents_seen: documents_count,
         });
@@ -213,7 +202,6 @@ impl Transform<'_, '_> {
             sorter,
             primary_key,
             fields_ids_map,
-            fields_distribution,
             documents_count,
             external_documents_ids,
             progress_callback,
@@ -226,7 +214,6 @@ impl Transform<'_, '_> {
         F: Fn(UpdateIndexingStep) + Sync,
     {
         let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?;
-        let mut fields_distribution = self.index.fields_distribution(self.rtxn)?;
         let external_documents_ids = self.index.external_documents_ids(self.rtxn).unwrap();
 
         let mut csv = csv::Reader::from_reader(reader);
@@ -284,8 +271,6 @@ impl Transform<'_, '_> {
         let mut uuid_buffer = [0; uuid::adapter::Hyphenated::LENGTH];
         let mut documents_count = 0;
 
-        let mut fields_ids_distribution = HashMap::new();
-
         let mut record = csv::StringRecord::new();
         while csv.read_record(&mut record)? {
             obkv_buffer.clear();
@@ -324,8 +309,6 @@ impl Transform<'_, '_> {
                 json_buffer.clear();
                 serde_json::to_writer(&mut json_buffer, &field)?;
                 writer.insert(*field_id, &json_buffer)?;
-
-                *fields_ids_distribution.entry(*field_id).or_insert(0) += 1;
             }
 
             // We use the extracted/generated user id as the key for this document.
@@ -333,11 +316,6 @@ impl Transform<'_, '_> {
             documents_count += 1;
         }
 
-        for (field_id, count) in fields_ids_distribution {
-            let field_name = fields_ids_map.name(field_id).unwrap();
-            *fields_distribution.entry(field_name.to_string()).or_default() += count;
-        }
-
         progress_callback(UpdateIndexingStep::TransformFromUserIntoGenericFormat {
             documents_seen: documents_count,
         });
@@ -352,7 +330,6 @@ impl Transform<'_, '_> {
             sorter,
             primary_key_name,
             fields_ids_map,
-            fields_distribution,
             documents_count,
             external_documents_ids,
             progress_callback,
@@ -367,7 +344,6 @@ impl Transform<'_, '_> {
         sorter: grenad::Sorter<MergeFn>,
         primary_key: String,
         fields_ids_map: FieldsIdsMap,
-        fields_distribution: FieldsDistribution,
         approximate_number_of_documents: usize,
         mut external_documents_ids: ExternalDocumentsIds<'_>,
         progress_callback: F,
@@ -376,6 +352,7 @@ impl Transform<'_, '_> {
         F: Fn(UpdateIndexingStep) + Sync,
     {
         let documents_ids = self.index.documents_ids(self.rtxn)?;
+        let mut fields_distribution = self.index.fields_distribution(self.rtxn)?;
         let mut available_documents_ids = AvailableDocumentsIds::from_documents_ids(&documents_ids);
 
         // Once we have sort and deduplicated the documents we write them into a final file.
@@ -396,7 +373,6 @@ impl Transform<'_, '_> {
         let mut documents_count = 0;
         let mut iter = sorter.into_iter()?;
         while let Some((external_id, update_obkv)) = iter.next()? {
-
             if self.log_every_n.map_or(false, |len| documents_count % len == 0) {
                 progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments {
                     documents_seen: documents_count,
@@ -438,6 +414,12 @@ impl Transform<'_, '_> {
             // We insert the document under the documents ids map into the final file.
             final_sorter.insert(docid.to_be_bytes(), obkv)?;
             documents_count += 1;
+
+            let reader = obkv::KvReader::new(obkv);
+            for (field_id, _) in reader.iter() {
+                let field_name = fields_ids_map.name(field_id).unwrap();
+                *fields_distribution.entry(field_name.to_string()).or_default() += 1;
+            }
         }
 
         progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments {
```
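For context on the counting loop added in the last hunk: the merged documents handled by `output_from_sorter()` are obkv-encoded (field id mapped to value bytes). Below is a small self-contained sketch of that pattern, assuming the u8-keyed API of the `obkv` crate as milli used it at the time; the writer setup and the toy `fields_ids_map` are illustrative assumptions, and only the `KvReader::new(..).iter()` loop mirrors the patch.

```rust
use std::collections::HashMap;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Toy fields ids map (illustrative): field id -> field name.
    let fields_ids_map: HashMap<u8, &str> =
        [(0, "id"), (1, "name")].iter().copied().collect();

    // Encode one merged document as obkv: keys must be inserted in
    // ascending order, values are raw JSON bytes here.
    let mut buffer = Vec::new();
    let mut writer = obkv::KvWriter::new(&mut buffer);
    writer.insert(0, b"1")?;
    writer.insert(1, b"\"kevin\"")?;
    writer.finish()?;

    // The fix: iterate the *merged* obkv document and bump each
    // field's count, so duplicates by primary key never inflate it.
    let mut fields_distribution: HashMap<String, u64> = HashMap::new();
    let reader = obkv::KvReader::new(&buffer);
    for (field_id, _value) in reader.iter() {
        let field_name = fields_ids_map[&field_id];
        *fields_distribution.entry(field_name.to_string()).or_default() += 1;
    }

    assert_eq!(fields_distribution["name"], 1);
    Ok(())
}
```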