From f7652186e171fdbabe06714e1a6f5d629ca97194 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 12 Sep 2024 18:01:02 +0200 Subject: [PATCH] WIP geo fields --- milli/src/update/new/channel.rs | 54 ++++++++---- milli/src/update/new/document_change.rs | 22 ++--- .../update/new/indexer/document_deletion.rs | 21 +---- .../update/new/indexer/document_operation.rs | 24 +++--- milli/src/update/new/indexer/mod.rs | 18 +++- milli/src/update/new/indexer/partial_dump.rs | 2 +- milli/src/update/new/merger.rs | 82 ++++++++++++++++++- 7 files changed, 152 insertions(+), 71 deletions(-) diff --git a/milli/src/update/new/channel.rs b/milli/src/update/new/channel.rs index 3eafb7754..9b05c7ce4 100644 --- a/milli/src/update/new/channel.rs +++ b/milli/src/update/new/channel.rs @@ -323,6 +323,7 @@ pub enum MergerOperation { WordPositionDocidsMerger(Merger), DeleteDocument { docid: DocumentId }, InsertDocument { docid: DocumentId, document: Box }, + FinishedDocument, } pub struct MergerReceiver(Receiver); @@ -339,22 +340,8 @@ impl IntoIterator for MergerReceiver { pub struct ExtractorSender(Sender); impl ExtractorSender { - pub fn document_insert( - &self, - docid: DocumentId, - document: Box, - ) -> StdResult<(), SendError<()>> { - match self.0.send(MergerOperation::InsertDocument { docid, document }) { - Ok(()) => Ok(()), - Err(SendError(_)) => Err(SendError(())), - } - } - - pub fn document_delete(&self, docid: DocumentId) -> StdResult<(), SendError<()>> { - match self.0.send(MergerOperation::DeleteDocument { docid }) { - Ok(()) => Ok(()), - Err(SendError(_)) => Err(SendError(())), - } + pub fn document_sender(&self) -> DocumentSender<'_> { + DocumentSender(&self.0) } pub fn send_searchable( @@ -367,3 +354,38 @@ impl ExtractorSender { } } } + +pub struct DocumentSender<'a>(&'a Sender); + +impl DocumentSender<'_> { + pub fn insert( + &self, + docid: DocumentId, + document: Box, + ) -> StdResult<(), SendError<()>> { + match 
self.0.send(MergerOperation::InsertDocument { docid, document }) { + Ok(()) => Ok(()), + Err(SendError(_)) => Err(SendError(())), + } + } + + pub fn delete(&self, docid: DocumentId) -> StdResult<(), SendError<()>> { + match self.0.send(MergerOperation::DeleteDocument { docid }) { + Ok(()) => Ok(()), + Err(SendError(_)) => Err(SendError(())), + } + } + + pub fn finish(self) -> StdResult<(), SendError<()>> { + match self.0.send(MergerOperation::FinishedDocument) { + Ok(()) => Ok(()), + Err(SendError(_)) => Err(SendError(())), + } + } +} + +impl Drop for DocumentSender<'_> { + fn drop(&mut self) { + self.0.send(MergerOperation::FinishedDocument); + } +} diff --git a/milli/src/update/new/document_change.rs b/milli/src/update/new/document_change.rs index aa37593c9..3e6473e77 100644 --- a/milli/src/update/new/document_change.rs +++ b/milli/src/update/new/document_change.rs @@ -12,20 +12,17 @@ pub enum DocumentChange { pub struct Deletion { docid: DocumentId, - external_docid: String, // ? - current: Box, // ? + current: Box, } pub struct Update { docid: DocumentId, - external_docid: String, // ? - current: Box, // ? + current: Box, new: Box, } pub struct Insertion { docid: DocumentId, - external_docid: String, // ? 
new: Box, } @@ -40,12 +37,8 @@ impl DocumentChange { } impl Deletion { - pub fn create( - docid: DocumentId, - external_docid: String, - current: Box, - ) -> Self { - Self { docid, external_docid, current } + pub fn create(docid: DocumentId, current: Box) -> Self { + Self { docid, current } } pub fn docid(&self) -> DocumentId { @@ -63,8 +56,8 @@ impl Deletion { } impl Insertion { - pub fn create(docid: DocumentId, external_docid: String, new: Box) -> Self { - Insertion { docid, external_docid, new } + pub fn create(docid: DocumentId, new: Box) -> Self { + Insertion { docid, new } } pub fn docid(&self) -> DocumentId { @@ -79,11 +72,10 @@ impl Insertion { impl Update { pub fn create( docid: DocumentId, - external_docid: String, current: Box, new: Box, ) -> Self { - Update { docid, external_docid, current, new } + Update { docid, current, new } } pub fn docid(&self) -> DocumentId { diff --git a/milli/src/update/new/indexer/document_deletion.rs b/milli/src/update/new/indexer/document_deletion.rs index b4336c14a..b744ec65e 100644 --- a/milli/src/update/new/indexer/document_deletion.rs +++ b/milli/src/update/new/indexer/document_deletion.rs @@ -4,9 +4,8 @@ use rayon::iter::{ParallelBridge, ParallelIterator}; use roaring::RoaringBitmap; use super::DocumentChanges; -use crate::documents::PrimaryKey; use crate::update::new::{Deletion, DocumentChange, ItemsPool}; -use crate::{FieldsIdsMap, Index, InternalError, Result}; +use crate::{FieldsIdsMap, Index, Result}; pub struct DocumentDeletion { pub to_delete: RoaringBitmap, @@ -23,31 +22,19 @@ impl DocumentDeletion { } impl<'p> DocumentChanges<'p> for DocumentDeletion { - type Parameter = (&'p Index, &'p FieldsIdsMap, &'p PrimaryKey<'p>); + type Parameter = &'p Index; fn document_changes( self, _fields_ids_map: &mut FieldsIdsMap, param: Self::Parameter, ) -> Result> + Clone + 'p> { - let (index, fields, primary_key) = param; + let index = param; let items = Arc::new(ItemsPool::new(|| 
index.read_txn().map_err(crate::Error::from))); Ok(self.to_delete.into_iter().par_bridge().map_with(items, |items, docid| { items.with(|rtxn| { let current = index.document(rtxn, docid)?; - let external_docid = match primary_key.document_id(current, fields)? { - Ok(document_id) => Ok(document_id) as Result<_>, - Err(_) => Err(InternalError::DocumentsError( - crate::documents::Error::InvalidDocumentFormat, - ) - .into()), - }?; - - Ok(DocumentChange::Deletion(Deletion::create( - docid, - external_docid, - current.boxed(), - ))) + Ok(DocumentChange::Deletion(Deletion::create(docid, current.boxed()))) }) })) } diff --git a/milli/src/update/new/indexer/document_operation.rs b/milli/src/update/new/indexer/document_operation.rs index 1670c8145..b299124bd 100644 --- a/milli/src/update/new/indexer/document_operation.rs +++ b/milli/src/update/new/indexer/document_operation.rs @@ -6,7 +6,6 @@ use heed::types::Bytes; use heed::RoTxn; use memmap2::Mmap; use rayon::iter::{IntoParallelIterator, ParallelIterator}; -use serde_json::from_str; use IndexDocumentsMethod as Idm; use super::super::document_change::DocumentChange; @@ -289,20 +288,17 @@ impl MergeChanges for MergeDocumentForReplacement { let new = writer.into_boxed(); match current { - Some(current) => { - let update = Update::create(docid, external_docid, current.boxed(), new); - Ok(Some(DocumentChange::Update(update))) - } - None => { - let insertion = Insertion::create(docid, external_docid, new); - Ok(Some(DocumentChange::Insertion(insertion))) - } + Some(current) => Ok(Some(DocumentChange::Update(Update::create( + docid, + current.boxed(), + new, + )))), + None => Ok(Some(DocumentChange::Insertion(Insertion::create(docid, new)))), } } Some(InnerDocOp::Deletion) => match current { Some(current) => { - let deletion = Deletion::create(docid, external_docid, current.boxed()); - Ok(Some(DocumentChange::Deletion(deletion))) + Ok(Some(DocumentChange::Deletion(Deletion::create(docid, current.boxed())))) } None => Ok(None), 
}, @@ -361,7 +357,7 @@ impl MergeChanges for MergeDocumentForUpdates { if operations.is_empty() { match current { Some(current) => { - let deletion = Deletion::create(docid, external_docid, current.boxed()); + let deletion = Deletion::create(docid, current.boxed()); return Ok(Some(DocumentChange::Deletion(deletion))); } None => return Ok(None), @@ -389,11 +385,11 @@ impl MergeChanges for MergeDocumentForUpdates { match current { Some(current) => { - let update = Update::create(docid, external_docid, current.boxed(), new); + let update = Update::create(docid, current.boxed(), new); Ok(Some(DocumentChange::Update(update))) } None => { - let insertion = Insertion::create(docid, external_docid, new); + let insertion = Insertion::create(docid, new); Ok(Some(DocumentChange::Insertion(insertion))) } } diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index c1bcd20cf..7350d9499 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -59,6 +59,7 @@ where let fields_ids_map_lock = RwLock::new(fields_ids_map); let global_fields_ids_map = GlobalFieldsIdsMap::new(&fields_ids_map_lock); + let global_fields_ids_map_clone = global_fields_ids_map.clone(); thread::scope(|s| { // TODO manage the errors correctly @@ -70,27 +71,30 @@ where let document_changes = document_changes.into_par_iter(); // document but we need to create a function that collects and compresses documents. + let document_sender = extractor_sender.document_sender(); document_changes.clone().into_par_iter().try_for_each(|result| { match result? 
{ DocumentChange::Deletion(deletion) => { let docid = deletion.docid(); - extractor_sender.document_delete(docid).unwrap(); + document_sender.delete(docid).unwrap(); } DocumentChange::Update(update) => { let docid = update.docid(); let content = update.new(); - extractor_sender.document_insert(docid, content.boxed()).unwrap(); + document_sender.insert(docid, content.boxed()).unwrap(); } DocumentChange::Insertion(insertion) => { let docid = insertion.docid(); let content = insertion.new(); - extractor_sender.document_insert(docid, content.boxed()).unwrap(); + document_sender.insert(docid, content.boxed()).unwrap(); // extracted_dictionary_sender.send(self, dictionary: &[u8]); } } Ok(()) as Result<_> })?; + document_sender.finish().unwrap(); + const TEN_GIB: usize = 10 * 1024 * 1024 * 1024; let max_memory = TEN_GIB / dbg!(rayon::current_num_threads()); let grenad_parameters = GrenadParameters { @@ -197,7 +201,13 @@ where tracing::trace_span!(target: "indexing::documents", parent: &current_span, "merge"); let _entered = span.enter(); let rtxn = index.read_txn().unwrap(); - merge_grenad_entries(merger_receiver, merger_sender, &rtxn, index) + merge_grenad_entries( + merger_receiver, + merger_sender, + &rtxn, + index, + global_fields_ids_map_clone, + ) })?; for operation in writer_receiver { diff --git a/milli/src/update/new/indexer/partial_dump.rs b/milli/src/update/new/indexer/partial_dump.rs index fe49ffdd7..50768ba82 100644 --- a/milli/src/update/new/indexer/partial_dump.rs +++ b/milli/src/update/new/indexer/partial_dump.rs @@ -68,7 +68,7 @@ where } }?; - let insertion = Insertion::create(docid, external_docid, document); + let insertion = Insertion::create(docid, document); Ok(DocumentChange::Insertion(insertion)) })) } diff --git a/milli/src/update/new/merger.rs b/milli/src/update/new/merger.rs index 22c4baf26..291f79216 100644 --- a/milli/src/update/new/merger.rs +++ b/milli/src/update/new/merger.rs @@ -1,20 +1,23 @@ use std::fs::File; +use std::io::{self, BufWriter};
+use bincode::ErrorKind; use fst::{Set, SetBuilder}; use grenad::Merger; use heed::types::Bytes; -use heed::{Database, RoTxn}; +use heed::{BoxedError, Database, RoTxn}; use memmap2::Mmap; use roaring::RoaringBitmap; -use std::io::BufWriter; use tempfile::tempfile; use super::channel::*; -use super::KvReaderDelAdd; +use super::{Deletion, DocumentChange, Insertion, KvReaderDelAdd, KvReaderFieldId, Update}; use crate::update::del_add::DelAdd; use crate::update::new::channel::MergerOperation; use crate::update::MergeDeladdCboRoaringBitmaps; -use crate::{CboRoaringBitmapCodec, Index, Result}; +use crate::{ + CboRoaringBitmapCodec, Error, GeoPoint, GlobalFieldsIdsMap, Index, InternalError, Result, +}; /// TODO We must return some infos/stats #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents", name = "merge")] @@ -23,9 +26,11 @@ pub fn merge_grenad_entries( sender: MergerSender, rtxn: &RoTxn, index: &Index, + mut global_fields_ids_map: GlobalFieldsIdsMap<'_>, ) -> Result<()> { let mut buffer = Vec::new(); let mut documents_ids = index.documents_ids(rtxn)?; + let mut geo_extractor = GeoExtractor::new(rtxn, index)?; for merger_operation in receiver { match merger_operation { @@ -125,6 +130,18 @@ pub fn merge_grenad_entries( let _entered = span.enter(); documents_ids.insert(docid); sender.documents().uncompressed(docid, &document).unwrap(); + + if let Some(geo_extractor) = geo_extractor.as_mut() { + let current = index.documents.remap_data_type::().get(rtxn, &docid)?; + let current: Option<&KvReaderFieldId> = current.map(Into::into); + let change = match current { + Some(current) => { + DocumentChange::Update(Update::create(docid, current.boxed(), document)) + } + None => DocumentChange::Insertion(Insertion::create(docid, document)), + }; + geo_extractor.manage_change(&mut global_fields_ids_map, &change)?; + } } MergerOperation::DeleteDocument { docid } => { let span = @@ -134,6 +151,15 @@ pub fn merge_grenad_entries( unreachable!("Tried deleting 
a document that we do not know about"); } sender.documents().delete(docid).unwrap(); + + if let Some(geo_extractor) = geo_extractor.as_mut() { + let current = index.document(rtxn, docid)?; + let change = DocumentChange::Deletion(Deletion::create(docid, current.boxed())); + geo_extractor.manage_change(&mut global_fields_ids_map, &change)?; + } + } + MergerOperation::FinishedDocument => { + // send the rtree } } } @@ -153,6 +179,54 @@ pub fn merge_grenad_entries( Ok(()) } +pub struct GeoExtractor { + rtree: Option>, +} + +impl GeoExtractor { + pub fn new(rtxn: &RoTxn, index: &Index) -> Result> { + let is_sortable = index.sortable_fields(rtxn)?.contains("_geo"); + let is_filterable = index.filterable_fields(rtxn)?.contains("_geo"); + if is_sortable || is_filterable { + Ok(Some(GeoExtractor { rtree: index.geo_rtree(rtxn)? })) + } else { + Ok(None) + } + } + + pub fn manage_change( + &mut self, + fidmap: &mut GlobalFieldsIdsMap, + change: &DocumentChange, + ) -> Result<()> { + match change { + DocumentChange::Deletion(_) => todo!(), + DocumentChange::Update(_) => todo!(), + DocumentChange::Insertion(_) => todo!(), + } + } + + pub fn serialize_rtree(self, writer: &mut W) -> Result { + match self.rtree { + Some(rtree) => { + // TODO What should I do? + bincode::serialize_into(writer, &rtree).map(|_| true).map_err(|e| match *e { + ErrorKind::Io(e) => Error::IoError(e), + ErrorKind::InvalidUtf8Encoding(_) => todo!(), + ErrorKind::InvalidBoolEncoding(_) => todo!(), + ErrorKind::InvalidCharEncoding => todo!(), + ErrorKind::InvalidTagEncoding(_) => todo!(), + ErrorKind::DeserializeAnyNotSupported => todo!(), + ErrorKind::SizeLimit => todo!(), + ErrorKind::SequenceMustHaveLength => todo!(), + ErrorKind::Custom(_) => todo!(), + }) + } + None => Ok(false), + } + } +} + fn compute_new_words_fst( add_words_fst: SetBuilder>, del_words_fst: SetBuilder>,