From 0fc446c62f07ce4e5802a2affc39abdcd6a0ef1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 1 Nov 2023 10:07:03 +0100 Subject: [PATCH] Add more timing logs to the Transform --- milli/src/update/index_documents/transform.rs | 130 ++++++------------ 1 file changed, 44 insertions(+), 86 deletions(-) diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index 840bade2e..23b5c78c1 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -150,6 +150,7 @@ impl<'a, 'i> Transform<'a, 'i> { }) } + #[logging_timer::time] pub fn read_documents( &mut self, reader: EnrichedDocumentsBatchReader, @@ -162,6 +163,8 @@ impl<'a, 'i> Transform<'a, 'i> { FP: Fn(UpdateIndexingStep) + Sync, FA: Fn() -> bool + Sync, { + puffin::profile_function!(); + let (mut cursor, fields_index) = reader.into_cursor_and_fields_index(); let external_documents_ids = self.index.external_documents_ids(); let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?; @@ -212,13 +215,12 @@ impl<'a, 'i> Transform<'a, 'i> { field_buffer_cache.sort_unstable_by(|(f1, _), (f2, _)| f1.cmp(f2)); // Build the new obkv document. - let mut writer = obkv::KvWriter::new(&mut obkv_buffer); + let mut writer = KvWriter::new(&mut obkv_buffer); for (k, v) in field_buffer_cache.iter() { writer.insert(*k, v)?; } let mut original_docid = None; - let docid = match self.new_external_documents_ids_builder.entry((*external_id).into()) { HEntry::Occupied(entry) => *entry.get() as u32, HEntry::Vacant(entry) => { @@ -275,24 +277,19 @@ impl<'a, 'i> Transform<'a, 'i> { &mut document_sorter_buffer, )?; self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; - match self.flatten_from_fields_ids_map(KvReader::new(base_obkv))? { - Some(flattened_obkv) => { - // we recreate our buffer with the flattened documents - document_sorter_buffer.clear(); - document_sorter_buffer.push(Operation::Addition as u8); - into_del_add_obkv( - KvReaderU16::new(&flattened_obkv), - true, - keep_original_version, - &mut document_sorter_buffer, - )?; - self.flattened_sorter - .insert(docid.to_be_bytes(), &document_sorter_buffer)? - } - None => self - .flattened_sorter - .insert(docid.to_be_bytes(), &document_sorter_buffer)?, + let base_obkv = KvReader::new(base_obkv); + if let Some(flattened_obkv) = self.flatten_from_fields_ids_map(base_obkv)? { + // we recreate our buffer with the flattened documents + document_sorter_buffer.clear(); + document_sorter_buffer.push(Operation::Addition as u8); + into_del_add_obkv( + KvReaderU16::new(&flattened_obkv), + true, + keep_original_version, + &mut document_sorter_buffer, + )?; } + self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; } } @@ -310,23 +307,18 @@ impl<'a, 'i> Transform<'a, 'i> { // We use the extracted/generated user id as the key for this document. self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; - match self.flatten_from_fields_ids_map(KvReader::new(&obkv_buffer))? { - Some(flattened_obkv) => { - document_sorter_buffer.clear(); - document_sorter_buffer.push(Operation::Addition as u8); - into_del_add_obkv( - KvReaderU16::new(&flattened_obkv), - false, - true, - &mut document_sorter_buffer, - )?; - self.flattened_sorter - .insert(docid.to_be_bytes(), &document_sorter_buffer)? - } - None => self - .flattened_sorter - .insert(docid.to_be_bytes(), &document_sorter_buffer)?, + let flattened_obkv = KvReader::new(&obkv_buffer); + if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? { + document_sorter_buffer.clear(); + document_sorter_buffer.push(Operation::Addition as u8); + into_del_add_obkv( + KvReaderU16::new(&obkv), + false, + true, + &mut document_sorter_buffer, + )? } + self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; } documents_count += 1; @@ -361,6 +353,7 @@ impl<'a, 'i> Transform<'a, 'i> { /// - If the document to remove was inserted by the `read_documents` method before but was NOT present in the db, /// it's added into the grenad to ensure we don't insert it + removed from the list of new documents ids. /// - If the document to remove was not present in either the db or the transform we do nothing. + #[logging_timer::time] pub fn remove_documents( &mut self, mut to_remove: Vec, @@ -370,6 +363,8 @@ impl<'a, 'i> Transform<'a, 'i> { where FA: Fn() -> bool + Sync, { + puffin::profile_function!(); + // there may be duplicates in the documents to remove. to_remove.sort_unstable(); to_remove.dedup(); @@ -439,24 +434,19 @@ impl<'a, 'i> Transform<'a, 'i> { self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; // flatten it and push it as to delete in the flattened_sorter - match self.flatten_from_fields_ids_map(KvReader::new(base_obkv))? { - Some(flattened_obkv) => { - // we recreate our buffer with the flattened documents - document_sorter_buffer.clear(); - document_sorter_buffer.push(Operation::Deletion as u8); - into_del_add_obkv( - KvReaderU16::new(&flattened_obkv), - true, - false, - &mut document_sorter_buffer, - )?; - self.flattened_sorter - .insert(docid.to_be_bytes(), &document_sorter_buffer)? - } - None => self - .flattened_sorter - .insert(docid.to_be_bytes(), &document_sorter_buffer)?, + let flattened_obkv = KvReader::new(base_obkv); + if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? { + // we recreate our buffer with the flattened documents + document_sorter_buffer.clear(); + document_sorter_buffer.push(Operation::Deletion as u8); + into_del_add_obkv( + KvReaderU16::new(&obkv), + true, + false, + &mut document_sorter_buffer, + )?; } + self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; true } @@ -591,42 +581,10 @@ impl<'a, 'i> Transform<'a, 'i> { Ok(()) } - fn remove_deleted_documents_from_field_distribution( - &self, - rtxn: &RoTxn, - field_distribution: &mut FieldDistribution, - ) -> Result<()> { - for deleted_docid in self.replaced_documents_ids.iter() { - let obkv = self.index.documents.get(rtxn, &BEU32::new(deleted_docid))?.ok_or( - InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None }, - )?; - - for (key, _) in obkv.iter() { - let name = - self.fields_ids_map.name(key).ok_or(FieldIdMapMissingEntry::FieldId { - field_id: key, - process: "Computing field distribution in transform.", - })?; - // We checked that the document was in the db earlier. If we can't find it it means - // there is an inconsistency between the field distribution and the field id map. - let field = - field_distribution.get_mut(name).ok_or(FieldIdMapMissingEntry::FieldId { - field_id: key, - process: "Accessing field distribution in transform.", - })?; - *field -= 1; - if *field == 0 { - // since we were able to get the field right before it's safe to unwrap here - field_distribution.remove(name).unwrap(); - } - } - } - Ok(()) - } - /// Generate the `TransformOutput` based on the given sorter that can be generated from any /// format like CSV, JSON or JSON stream. This sorter must contain a key that is the document /// id for the user side and the value must be an obkv where keys are valid fields ids. + #[logging_timer::time] pub(crate) fn output_from_sorter( self, wtxn: &mut heed::RwTxn, @@ -816,7 +774,7 @@ impl<'a, 'i> Transform<'a, 'i> { let (docid, obkv) = result?; obkv_buffer.clear(); - let mut obkv_writer = obkv::KvWriter::<_, FieldId>::new(&mut obkv_buffer); + let mut obkv_writer = KvWriter::<_, FieldId>::new(&mut obkv_buffer); // We iterate over the new `FieldsIdsMap` ids in order and construct the new obkv. for (id, name) in new_fields_ids_map.iter() {