From 85f42fbc036e850cf55b044eb948de72abbf5ebe Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Tue, 24 Oct 2023 17:04:48 +0200 Subject: [PATCH] Handle external to internal id mapping from TypedChunk::Documents --- .../src/update/index_documents/typed_chunk.rs | 81 +++++++++++++++---- 1 file changed, 65 insertions(+), 16 deletions(-) diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index 39537cce7..1f1ac4adf 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -15,13 +15,16 @@ use super::helpers::{self, merge_ignore_values, valid_lmdb_key, CursorClonableMm use super::{ClonableMmap, MergeFn}; use crate::distance::NDotProductPoint; use crate::error::UserError; +use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind}; use crate::facet::FacetType; use crate::index::Hnsw; use crate::update::del_add::{DelAdd, KvReaderDelAdd}; use crate::update::facet::FacetsUpdate; use crate::update::index_documents::helpers::{as_cloneable_grenad, try_split_array_at}; +use crate::update::index_documents::validate_document_id_value; use crate::{ - lat_lng_to_xyz, CboRoaringBitmapCodec, DocumentId, FieldId, GeoPoint, Index, Result, BEU32, + lat_lng_to_xyz, CboRoaringBitmapCodec, DocumentId, FieldId, GeoPoint, Index, InternalError, + Result, BEU32, }; pub(crate) enum TypedChunk { @@ -118,36 +121,82 @@ pub(crate) fn write_typed_chunk_into_index( let mut is_merged_database = false; match typed_chunk { TypedChunk::Documents(obkv_documents_iter) => { - let mut docids = index.documents_ids(wtxn)?; + let mut operations: Vec = Default::default(); + let mut docids = index.documents_ids(wtxn)?; + let primary_key = index.primary_key(wtxn)?.unwrap(); + let primary_key = index.fields_ids_map(wtxn)?.id(primary_key).unwrap(); let mut cursor = obkv_documents_iter.into_cursor()?; while let Some((docid, reader)) = cursor.move_on_next()? { let mut writer: KvWriter<_, FieldId> = KvWriter::memory(); let reader: KvReader = KvReader::new(reader); - let mut written = false; + let docid = docid.try_into().map(DocumentId::from_be_bytes).unwrap(); + for (field_id, value) in reader.iter() { - let Some(value) = KvReaderDelAdd::new(value).get(DelAdd::Addition) else { - continue; - }; - // TODO: writer.is_empty - written = true; - writer.insert(field_id, value)?; + let del_add_reader = KvReaderDelAdd::new(value); + match ( + del_add_reader.get(DelAdd::Deletion), + del_add_reader.get(DelAdd::Addition), + ) { + (None, None) => {} + (None, Some(value)) => { + // if primary key, new document + if field_id == primary_key { + // FIXME: we already extracted the external docid before. We should retrieve it in the typed chunk + // rather than re-extract it here + // FIXME: unwraps + let document_id = serde_json::from_slice(value) + .map_err(InternalError::SerdeJson) + .unwrap(); + let external_id = + validate_document_id_value(document_id).unwrap().unwrap(); + operations.push(DocumentOperation { + external_id, + internal_id: docid, + kind: DocumentOperationKind::Create, + }); + docids.insert(docid); + } + // anyway, write + writer.insert(field_id, value)?; + } + (Some(value), None) => { + // if primary key, deleted document + if field_id == primary_key { + // FIXME: we already extracted the external docid before. We should retrieve it in the typed chunk + // rather than re-extract it here + // FIXME: unwraps + let document_id = serde_json::from_slice(value) + .map_err(InternalError::SerdeJson) + .unwrap(); + let external_id = + validate_document_id_value(document_id).unwrap().unwrap(); + operations.push(DocumentOperation { + external_id, + internal_id: docid, + kind: DocumentOperationKind::Delete, + }); + docids.remove(docid); + } + } + (Some(_), Some(value)) => { + // updated field, write + writer.insert(field_id, value)?; + } + } } let db = index.documents.remap_data_type::(); - let docid = docid.try_into().map(DocumentId::from_be_bytes).unwrap(); - if written { + if !writer.is_empty() { db.put(wtxn, &BEU32::new(docid), &writer.into_inner().unwrap())?; - docids.insert(docid); } else { db.delete(wtxn, &BEU32::new(docid))?; - // FIXME: unwrap - if !docids.remove(docid) { - panic!("Attempt to remove a document id that doesn't exist") - } } } + let mut external_documents_docids = index.external_documents_ids(wtxn)?.into_static(); + external_documents_docids.apply(operations); + index.put_external_documents_ids(wtxn, &external_documents_docids)?; index.put_documents_ids(wtxn, &docids)?; }