diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index ba99eb418..506ba6581 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -1318,7 +1318,7 @@ impl IndexScheduler { match operation { DocumentOperation::Add(_content_uuid) => { let mmap = content_files_iter.next().unwrap(); - let stats = indexer.add_documents(&mmap)?; + let stats = indexer.add_documents(mmap)?; // builder = builder.with_embedders(embedders.clone()); let received_documents = diff --git a/milli/src/update/new/indexer/document_operation.rs b/milli/src/update/new/indexer/document_operation.rs index 93e051aa2..3cbaf836d 100644 --- a/milli/src/update/new/indexer/document_operation.rs +++ b/milli/src/update/new/indexer/document_operation.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use heed::types::Bytes; use heed::RoTxn; +use memmap2::Mmap; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use super::super::document_change::DocumentChange; @@ -50,9 +51,10 @@ impl<'pl> DocumentOperation<'pl> { /// TODO please give me a type /// The payload is expected to be in the grenad format - pub fn add_documents(&mut self, payload: &'pl [u8]) -> Result { - let document_count = memchr::Memchr::new(b'\n', payload).count(); - self.operations.push(Payload::Addition(payload)); + pub fn add_documents(&mut self, payload: &'pl Mmap) -> Result { + payload.advise(memmap2::Advice::Sequential)?; + let document_count = memchr::Memchr::new(b'\n', &payload[..]).count(); + self.operations.push(Payload::Addition(&payload[..])); Ok(PayloadStats { bytes: payload.len() as u64, document_count }) } @@ -181,7 +183,18 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> { /// TODO is it the best way to provide FieldsIdsMap to the parallel iterator? let fields_ids_map = fields_ids_map.clone(); // TODO We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone - let docids_version_offsets: Vec<_> = docids_version_offsets.drain().collect(); + let mut docids_version_offsets: Vec<_> = docids_version_offsets.drain().collect(); + // Reorder the offsets to make sure we iterate on the file sequentially + docids_version_offsets.sort_unstable_by_key(|(_, (_, offsets))| { + offsets + .iter() + .rev() + .find_map(|ido| match ido { + InnerDocOp::Addition(add) => Some(add.content.as_ptr() as usize), + InnerDocOp::Deletion => None, + }) + .unwrap_or(0) + }); Ok(docids_version_offsets .into_par_iter()