fix(update): fields distribution after documents merge

This commit is contained in:
Alexey Shekhirin 2021-05-04 22:01:11 +03:00
parent 1207a058d0
commit f8d0f5265f
No known key found for this signature in database
GPG Key ID: AF9A26AA133B5B98
2 changed files with 13 additions and 30 deletions

View File

@ -566,11 +566,11 @@ pub(crate) mod tests {
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = &br#"[ let content = &br#"[
{ "name": "kevin" }, { "id": 1, "name": "kevin" },
{ "name": "bob", "age": 20 } { "id": 2, "name": "bob", "age": 20 },
{ "id": 2, "name": "bob", "age": 20 }
]"#[..]; ]"#[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.enable_autogenerate_docids();
builder.update_format(UpdateFormat::Json); builder.update_format(UpdateFormat::Json);
builder.execute(content, |_, _| ()).unwrap(); builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
@ -579,6 +579,7 @@ pub(crate) mod tests {
let fields_distribution = index.fields_distribution(&rtxn).unwrap(); let fields_distribution = index.fields_distribution(&rtxn).unwrap();
assert_eq!(fields_distribution, hashmap! { assert_eq!(fields_distribution, hashmap! {
"id".to_string() => 2,
"name".to_string() => 2, "name".to_string() => 2,
"age".to_string() => 1, "age".to_string() => 1,
}); });

View File

@ -1,5 +1,4 @@
use std::borrow::Cow; use std::borrow::Cow;
use std::collections::HashMap;
use std::fs::File; use std::fs::File;
use std::io::{Read, Seek, SeekFrom}; use std::io::{Read, Seek, SeekFrom};
use std::iter::Peekable; use std::iter::Peekable;
@ -76,7 +75,6 @@ impl Transform<'_, '_> {
F: Fn(UpdateIndexingStep) + Sync, F: Fn(UpdateIndexingStep) + Sync,
{ {
let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?; let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?;
let mut fields_distribution = self.index.fields_distribution(self.rtxn)?;
let external_documents_ids = self.index.external_documents_ids(self.rtxn).unwrap(); let external_documents_ids = self.index.external_documents_ids(self.rtxn).unwrap();
// Deserialize the whole batch of documents in memory. // Deserialize the whole batch of documents in memory.
@ -106,7 +104,7 @@ impl Transform<'_, '_> {
return Ok(TransformOutput { return Ok(TransformOutput {
primary_key, primary_key,
fields_ids_map, fields_ids_map,
fields_distribution, fields_distribution: self.index.fields_distribution(self.rtxn)?,
external_documents_ids: ExternalDocumentsIds::default(), external_documents_ids: ExternalDocumentsIds::default(),
new_documents_ids: RoaringBitmap::new(), new_documents_ids: RoaringBitmap::new(),
replaced_documents_ids: RoaringBitmap::new(), replaced_documents_ids: RoaringBitmap::new(),
@ -137,8 +135,6 @@ impl Transform<'_, '_> {
let mut uuid_buffer = [0; uuid::adapter::Hyphenated::LENGTH]; let mut uuid_buffer = [0; uuid::adapter::Hyphenated::LENGTH];
let mut documents_count = 0; let mut documents_count = 0;
let mut fields_ids_distribution = HashMap::new();
for result in documents { for result in documents {
let document = result?; let document = result?;
@ -153,9 +149,7 @@ impl Transform<'_, '_> {
// We prepare the fields ids map with the documents keys. // We prepare the fields ids map with the documents keys.
for (key, _value) in &document { for (key, _value) in &document {
let field_id = fields_ids_map.insert(&key).context("field id limit reached")?; fields_ids_map.insert(&key).context("field id limit reached")?;
*fields_ids_distribution.entry(field_id).or_insert(0) += 1;
} }
// We retrieve the user id from the document based on the primary key name, // We retrieve the user id from the document based on the primary key name,
@ -198,11 +192,6 @@ impl Transform<'_, '_> {
documents_count += 1; documents_count += 1;
} }
for (field_id, count) in fields_ids_distribution {
let field_name = fields_ids_map.name(field_id).unwrap();
*fields_distribution.entry(field_name.to_string()).or_default() += count;
}
progress_callback(UpdateIndexingStep::TransformFromUserIntoGenericFormat { progress_callback(UpdateIndexingStep::TransformFromUserIntoGenericFormat {
documents_seen: documents_count, documents_seen: documents_count,
}); });
@ -213,7 +202,6 @@ impl Transform<'_, '_> {
sorter, sorter,
primary_key, primary_key,
fields_ids_map, fields_ids_map,
fields_distribution,
documents_count, documents_count,
external_documents_ids, external_documents_ids,
progress_callback, progress_callback,
@ -226,7 +214,6 @@ impl Transform<'_, '_> {
F: Fn(UpdateIndexingStep) + Sync, F: Fn(UpdateIndexingStep) + Sync,
{ {
let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?; let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?;
let mut fields_distribution = self.index.fields_distribution(self.rtxn)?;
let external_documents_ids = self.index.external_documents_ids(self.rtxn).unwrap(); let external_documents_ids = self.index.external_documents_ids(self.rtxn).unwrap();
let mut csv = csv::Reader::from_reader(reader); let mut csv = csv::Reader::from_reader(reader);
@ -284,8 +271,6 @@ impl Transform<'_, '_> {
let mut uuid_buffer = [0; uuid::adapter::Hyphenated::LENGTH]; let mut uuid_buffer = [0; uuid::adapter::Hyphenated::LENGTH];
let mut documents_count = 0; let mut documents_count = 0;
let mut fields_ids_distribution = HashMap::new();
let mut record = csv::StringRecord::new(); let mut record = csv::StringRecord::new();
while csv.read_record(&mut record)? { while csv.read_record(&mut record)? {
obkv_buffer.clear(); obkv_buffer.clear();
@ -324,8 +309,6 @@ impl Transform<'_, '_> {
json_buffer.clear(); json_buffer.clear();
serde_json::to_writer(&mut json_buffer, &field)?; serde_json::to_writer(&mut json_buffer, &field)?;
writer.insert(*field_id, &json_buffer)?; writer.insert(*field_id, &json_buffer)?;
*fields_ids_distribution.entry(*field_id).or_insert(0) += 1;
} }
// We use the extracted/generated user id as the key for this document. // We use the extracted/generated user id as the key for this document.
@ -333,11 +316,6 @@ impl Transform<'_, '_> {
documents_count += 1; documents_count += 1;
} }
for (field_id, count) in fields_ids_distribution {
let field_name = fields_ids_map.name(field_id).unwrap();
*fields_distribution.entry(field_name.to_string()).or_default() += count;
}
progress_callback(UpdateIndexingStep::TransformFromUserIntoGenericFormat { progress_callback(UpdateIndexingStep::TransformFromUserIntoGenericFormat {
documents_seen: documents_count, documents_seen: documents_count,
}); });
@ -352,7 +330,6 @@ impl Transform<'_, '_> {
sorter, sorter,
primary_key_name, primary_key_name,
fields_ids_map, fields_ids_map,
fields_distribution,
documents_count, documents_count,
external_documents_ids, external_documents_ids,
progress_callback, progress_callback,
@ -367,7 +344,6 @@ impl Transform<'_, '_> {
sorter: grenad::Sorter<MergeFn>, sorter: grenad::Sorter<MergeFn>,
primary_key: String, primary_key: String,
fields_ids_map: FieldsIdsMap, fields_ids_map: FieldsIdsMap,
fields_distribution: FieldsDistribution,
approximate_number_of_documents: usize, approximate_number_of_documents: usize,
mut external_documents_ids: ExternalDocumentsIds<'_>, mut external_documents_ids: ExternalDocumentsIds<'_>,
progress_callback: F, progress_callback: F,
@ -376,6 +352,7 @@ impl Transform<'_, '_> {
F: Fn(UpdateIndexingStep) + Sync, F: Fn(UpdateIndexingStep) + Sync,
{ {
let documents_ids = self.index.documents_ids(self.rtxn)?; let documents_ids = self.index.documents_ids(self.rtxn)?;
let mut fields_distribution = self.index.fields_distribution(self.rtxn)?;
let mut available_documents_ids = AvailableDocumentsIds::from_documents_ids(&documents_ids); let mut available_documents_ids = AvailableDocumentsIds::from_documents_ids(&documents_ids);
// Once we have sort and deduplicated the documents we write them into a final file. // Once we have sort and deduplicated the documents we write them into a final file.
@ -396,7 +373,6 @@ impl Transform<'_, '_> {
let mut documents_count = 0; let mut documents_count = 0;
let mut iter = sorter.into_iter()?; let mut iter = sorter.into_iter()?;
while let Some((external_id, update_obkv)) = iter.next()? { while let Some((external_id, update_obkv)) = iter.next()? {
if self.log_every_n.map_or(false, |len| documents_count % len == 0) { if self.log_every_n.map_or(false, |len| documents_count % len == 0) {
progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments { progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments {
documents_seen: documents_count, documents_seen: documents_count,
@ -438,6 +414,12 @@ impl Transform<'_, '_> {
// We insert the document under the documents ids map into the final file. // We insert the document under the documents ids map into the final file.
final_sorter.insert(docid.to_be_bytes(), obkv)?; final_sorter.insert(docid.to_be_bytes(), obkv)?;
documents_count += 1; documents_count += 1;
let reader = obkv::KvReader::new(obkv);
for (field_id, _) in reader.iter() {
let field_name = fields_ids_map.name(field_id).unwrap();
*fields_distribution.entry(field_name.to_string()).or_default() += 1;
}
} }
progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments { progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments {