From f156d7dd3b45f524a9e716ee88b09b7bb363b280 Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 5 Jul 2022 19:15:16 +0200 Subject: [PATCH 1/2] Stop reindexing already indexed documents --- milli/src/update/index_documents/transform.rs | 77 ++++++++++++------- 1 file changed, 50 insertions(+), 27 deletions(-) diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index 0de90924a..705bbb21c 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -200,24 +200,26 @@ impl<'a, 'i> Transform<'a, 'i> { let mut original_docid = None; - let docid = match self.new_external_documents_ids_builder.entry(external_id.into()) { - Entry::Occupied(entry) => *entry.get() as u32, - Entry::Vacant(entry) => { - // If the document was already in the db we mark it as a replaced document. - // It'll be deleted later. We keep its original docid to insert it in the grenad. - if let Some(docid) = external_documents_ids.get(entry.key()) { - self.replaced_documents_ids.insert(docid); - original_docid = Some(docid); + let docid = + match self.new_external_documents_ids_builder.entry(external_id.clone().into()) { + Entry::Occupied(entry) => *entry.get() as u32, + Entry::Vacant(entry) => { + // If the document was already in the db we mark it as a replaced document. + // It'll be deleted later. We keep its original docid to insert it in the grenad. + if let Some(docid) = external_documents_ids.get(entry.key()) { + self.replaced_documents_ids.insert(docid); + original_docid = Some(docid); + } + let docid = self + .available_documents_ids + .next() + .ok_or(UserError::DocumentLimitReached)?; + entry.insert(docid as u64); + docid } - let docid = self - .available_documents_ids - .next() - .ok_or(UserError::DocumentLimitReached)?; - entry.insert(docid as u64); - docid - } - }; + }; + let mut skip_insertion = false; if let Some(original_docid) = original_docid { let original_key = BEU32::new(original_docid); let base_obkv = self @@ -230,24 +232,39 @@ impl<'a, 'i> Transform<'a, 'i> { key: None, })?; - // we associate the base document with the new key, everything will get merged later. - self.original_sorter.insert(&docid.to_be_bytes(), base_obkv)?; - match self.flatten_from_fields_ids_map(KvReader::new(&base_obkv))? { - Some(buffer) => self.flattened_sorter.insert(docid.to_be_bytes(), &buffer)?, - None => self.flattened_sorter.insert(docid.to_be_bytes(), base_obkv)?, + // we check if the two documents are exactly equal. If it's the case we can skip this document entirely + if base_obkv == obkv_buffer { + // we're not replacing anything + self.replaced_documents_ids.remove(original_docid); + // and we need to put back the original id as it was before + self.new_external_documents_ids_builder.remove(&*external_id); + skip_insertion = true; + } else { + // we associate the base document with the new key, everything will get merged later. + self.original_sorter.insert(&docid.to_be_bytes(), base_obkv)?; + match self.flatten_from_fields_ids_map(KvReader::new(&base_obkv))? { + Some(buffer) => { + self.flattened_sorter.insert(docid.to_be_bytes(), &buffer)? + } + None => self.flattened_sorter.insert(docid.to_be_bytes(), base_obkv)?, + } } } else { self.new_documents_ids.insert(docid); } - // We use the extracted/generated user id as the key for this document. - self.original_sorter.insert(&docid.to_be_bytes(), &obkv_buffer)?; - documents_count += 1; + if !skip_insertion { + // We use the extracted/generated user id as the key for this document. + self.original_sorter.insert(&docid.to_be_bytes(), obkv_buffer.clone())?; - match self.flatten_from_fields_ids_map(KvReader::new(&obkv_buffer))? { - Some(buffer) => self.flattened_sorter.insert(docid.to_be_bytes(), &buffer)?, - None => self.flattened_sorter.insert(docid.to_be_bytes(), &obkv_buffer)?, + match self.flatten_from_fields_ids_map(KvReader::new(&obkv_buffer))? { + Some(buffer) => self.flattened_sorter.insert(docid.to_be_bytes(), &buffer)?, + None => { + self.flattened_sorter.insert(docid.to_be_bytes(), obkv_buffer.clone())? + } + } } + documents_count += 1; progress_callback(UpdateIndexingStep::RemapDocumentAddition { documents_seen: documents_count, @@ -394,6 +411,11 @@ impl<'a, 'i> Transform<'a, 'i> { rtxn: &RoTxn, field_distribution: &mut FieldDistribution, ) -> Result<()> { + println!( + "The following documents are going to be deleted from the field distribution: {:?}", + self.replaced_documents_ids + ); + for deleted_docid in self.replaced_documents_ids.iter() { let obkv = self.index.documents.get(rtxn, &BEU32::new(deleted_docid))?.ok_or( InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None }, @@ -461,6 +483,7 @@ impl<'a, 'i> Transform<'a, 'i> { let mut documents_count = 0; while let Some((key, val)) = iter.next()? { + println!("Reading a document"); // send a callback to show at which step we are documents_count += 1; progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments { From 7fc35c558616abd5178529d2cc73d19ac1e7ccb9 Mon Sep 17 00:00:00 2001 From: Tamo Date: Thu, 7 Jul 2022 15:02:06 +0200 Subject: [PATCH 2/2] remove the useless prints --- milli/src/update/index_documents/transform.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index 705bbb21c..b61395a96 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -411,11 +411,6 @@ impl<'a, 'i> Transform<'a, 'i> { rtxn: &RoTxn, field_distribution: &mut FieldDistribution, ) -> Result<()> { - println!( - "The following documents are going to be deleted from the field distribution: {:?}", - self.replaced_documents_ids - ); - for deleted_docid in self.replaced_documents_ids.iter() { let obkv = self.index.documents.get(rtxn, &BEU32::new(deleted_docid))?.ok_or( InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None }, @@ -483,7 +478,6 @@ impl<'a, 'i> Transform<'a, 'i> { let mut documents_count = 0; while let Some((key, val)) = iter.next()? { - println!("Reading a document"); // send a callback to show at which step we are documents_count += 1; progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments {