Only flatten the required fields

apply review comments

Co-authored-by: Kerollmops <kero@meilisearch.com>
This commit is contained in:
Tamo 2022-04-25 14:09:52 +02:00
parent e1e362fa43
commit f19d2dc548
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69
2 changed files with 135 additions and 47 deletions

View File

@ -1337,32 +1337,34 @@ mod tests {
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let mut builder = update::Settings::new(&mut wtxn, &index, &config); let mut builder = update::Settings::new(&mut wtxn, &index, &config);
builder.set_primary_key("nested.id".to_owned()); builder.set_primary_key("complex.nested.id".to_owned());
builder.execute(|_| ()).unwrap(); builder.execute(|_| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = documents!([ let content = documents!([
{ {
"complex": {
"nested": { "nested": {
"id": 0, "id": 0,
}, },
},
"title": "The zeroth document", "title": "The zeroth document",
}, },
{ {
"nested": { "complex.nested": {
"id": 1, "id": 1,
}, },
"title": "The first document", "title": "The first document",
}, },
{ {
"nested": { "complex": {
"id": 2, "nested.id": 2,
}, },
"title": "The second document", "title": "The second document",
}, },
{ {
"nested.id": 3, "complex.nested.id": 3,
"title": "The third document", "title": "The third document",
}, },
]); ]);

View File

@ -340,35 +340,48 @@ impl<'a, 'i> Transform<'a, 'i> {
return Ok(None); return Ok(None);
} }
// store the keys and values the original obkv + the flattened json
// We first extract all the key+value out of the obkv. If a value is not nested
// we keep a reference on its value. If the value is nested we'll get its value
// as an owned `Vec<u8>` after flattening it.
let mut key_value: Vec<(FieldId, Cow<[u8]>)> = Vec::new();
// the object we're going to use to store the fields that need to be flattened.
let mut doc = serde_json::Map::new(); let mut doc = serde_json::Map::new();
for (k, v) in obkv.iter() { // we recreate a json containing only the fields that needs to be flattened.
let key = self.fields_ids_map.name(k).ok_or(FieldIdMapMissingEntry::FieldId { // all the raw values get inserted directly in the `key_value` vec.
field_id: k, for (key, value) in obkv.iter() {
if json_depth_checker::should_flatten_from_unchecked_slice(value) {
let key = self.fields_ids_map.name(key).ok_or(FieldIdMapMissingEntry::FieldId {
field_id: key,
process: "Flatten from fields ids map.", process: "Flatten from fields ids map.",
})?; })?;
let value = serde_json::from_slice::<serde_json::Value>(v)
let value = serde_json::from_slice::<Value>(value)
.map_err(crate::error::InternalError::SerdeJson)?; .map_err(crate::error::InternalError::SerdeJson)?;
doc.insert(key.to_string(), value); doc.insert(key.to_string(), value);
} else {
key_value.push((key, value.into()));
}
} }
let flattened = flatten_serde_json::flatten(&doc); let flattened = flatten_serde_json::flatten(&doc);
// Once we have the flattened version we can convert it back to obkv and // Once we have the flattened version we insert all the new generated fields_ids
// insert all the new generated fields_ids (if any) in the fields ids map. // (if any) in the fields ids map and serialize the value.
let mut buffer: Vec<u8> = Vec::new(); for (key, value) in flattened.into_iter() {
let mut writer = KvWriter::new(&mut buffer);
let mut flattened: Vec<_> = flattened.into_iter().collect();
// we reorder the field to get all the known field first
flattened
.sort_unstable_by_key(|(key, _)| self.fields_ids_map.id(&key).unwrap_or(FieldId::MAX));
for (key, value) in flattened {
let fid = self.fields_ids_map.insert(&key).ok_or(UserError::AttributeLimitReached)?; let fid = self.fields_ids_map.insert(&key).ok_or(UserError::AttributeLimitReached)?;
let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?; let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?;
writer.insert(fid, &value)?; key_value.push((fid, value.into()));
} }
// we sort the key. If there was a conflict between the obkv and the new generated value the
// keys will be consecutive.
key_value.sort_unstable_by_key(|(key, _)| *key);
let mut buffer = Vec::new();
Self::create_obkv_from_key_value(&mut key_value, &mut buffer)?;
Ok(Some(buffer)) Ok(Some(buffer))
} }
@ -380,41 +393,114 @@ impl<'a, 'i> Transform<'a, 'i> {
output_buffer: &mut Vec<u8>, output_buffer: &mut Vec<u8>,
field_buffer_cache: &mut Vec<(u16, Cow<[u8]>)>, field_buffer_cache: &mut Vec<(u16, Cow<[u8]>)>,
) -> Result<()> { ) -> Result<()> {
// store the keys and values of the json + the original obkv
let mut key_value: Vec<(FieldId, Cow<[u8]>)> = Vec::new();
// if the primary_key is nested we need to flatten the document before being able to do anything // if the primary_key is nested we need to flatten the document before being able to do anything
let mut doc = serde_json::Map::new(); let mut doc = serde_json::Map::new();
for (k, v) in obkv.iter() { // we recreate a json containing only the fields that needs to be flattened.
// all the raw values get inserted directly in the `key_value` vec.
for (key, value) in obkv.iter() {
if json_depth_checker::should_flatten_from_unchecked_slice(value) {
let key = let key =
mapping.get(&k).ok_or(InternalError::FieldIdMappingMissingEntry { key: k })?; mapping.get(&key).ok_or(InternalError::FieldIdMappingMissingEntry { key })?;
let key = self.fields_ids_map.name(*key).ok_or(FieldIdMapMissingEntry::FieldId { let key =
self.fields_ids_map.name(*key).ok_or(FieldIdMapMissingEntry::FieldId {
field_id: *key, field_id: *key,
process: "Flatten from field mapping.", process: "Flatten from field mapping.",
})?; })?;
let value = let value = serde_json::from_slice::<serde_json::Value>(value)
serde_json::from_slice::<serde_json::Value>(v).map_err(InternalError::SerdeJson)?; .map_err(InternalError::SerdeJson)?;
doc.insert(key.to_string(), value); doc.insert(key.to_string(), value);
} else {
key_value.push((key, value.into()));
}
} }
let flattened = flatten_serde_json::flatten(&doc); let flattened = flatten_serde_json::flatten(&doc);
// Once we have the flattened version we can convert it back to obkv and // Once we have the flattened version we insert all the new generated fields_ids
// insert all the new generated fields_ids (if any) in the fields ids map. // (if any) in the fields ids map and serialize the value.
output_buffer.clear(); for (key, value) in flattened.into_iter() {
let mut writer = KvWriter::new(output_buffer);
let mut flattened: Vec<_> = flattened.into_iter().collect();
// we reorder the field to get all the known field first
flattened
.sort_unstable_by_key(|(key, _)| self.fields_ids_map.id(&key).unwrap_or(FieldId::MAX));
for (key, value) in flattened {
let fid = self.fields_ids_map.insert(&key).ok_or(UserError::AttributeLimitReached)?; let fid = self.fields_ids_map.insert(&key).ok_or(UserError::AttributeLimitReached)?;
let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?; let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?;
writer.insert(fid, &value)?; key_value.push((fid, value.clone().into()));
if field_buffer_cache.iter().find(|(id, _)| *id == fid).is_none() { if field_buffer_cache.iter().find(|(id, _)| *id == fid).is_none() {
field_buffer_cache.push((fid, value.into())); field_buffer_cache.push((fid, value.into()));
} }
} }
// we sort the key. If there was a conflict between the obkv and the new generated value the
// keys will be consecutive.
key_value.sort_unstable_by_key(|(key, _)| *key);
Self::create_obkv_from_key_value(&mut key_value, output_buffer)?;
Ok(())
}
/// Generate an obkv from a slice of key / value sorted by key.
fn create_obkv_from_key_value(
key_value: &mut [(FieldId, Cow<[u8]>)],
output_buffer: &mut Vec<u8>,
) -> Result<()> {
debug_assert!(
key_value.windows(2).all(|vec| vec[0].0 <= vec[1].0),
"The slice of key / value pair must be sorted."
);
output_buffer.clear();
let mut writer = KvWriter::new(output_buffer);
let mut skip_next_value = false;
for things in key_value.windows(2) {
if skip_next_value {
skip_next_value = false;
continue;
}
let (key1, value1) = &things[0];
let (key2, value2) = &things[1];
// now we're going to look for conflicts between the keys. For example the following documents would cause a conflict:
// { "doggo.name": "jean", "doggo": { "name": "paul" } }
// we should find a first "doggo.name" from the obkv and a second one from the flattening.
// but we must generate the following document:
// { "doggo.name": ["jean", "paul"] }
// thus we're going to merge the value from the obkv and the flattened document in a single array and skip the next
// iteration.
if key1 == key2 {
skip_next_value = true;
let value1 = serde_json::from_slice(value1)
.map_err(crate::error::InternalError::SerdeJson)?;
let value2 = serde_json::from_slice(value2)
.map_err(crate::error::InternalError::SerdeJson)?;
let value = match (value1, value2) {
(Value::Array(mut left), Value::Array(mut right)) => {
left.append(&mut right);
Value::Array(left)
}
(Value::Array(mut array), value) | (value, Value::Array(mut array)) => {
array.push(value);
Value::Array(array)
}
(left, right) => Value::Array(vec![left, right]),
};
let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?;
writer.insert(*key1, value)?;
} else {
writer.insert(*key1, value1)?;
}
}
if !skip_next_value {
// the unwrap is safe here, we know there was at least one value in the document
let (key, value) = key_value.last().unwrap();
writer.insert(*key, value)?;
}
Ok(()) Ok(())
} }