Make sure we iterate over the payload documents in order

This commit is contained in:
Clément Renault 2024-09-05 22:31:17 +02:00
parent 72c6a21a30
commit 8fd0afaaaa
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
2 changed files with 18 additions and 5 deletions

View File

@ -1318,7 +1318,7 @@ impl IndexScheduler {
match operation {
DocumentOperation::Add(_content_uuid) => {
let mmap = content_files_iter.next().unwrap();
let stats = indexer.add_documents(&mmap)?;
let stats = indexer.add_documents(mmap)?;
// builder = builder.with_embedders(embedders.clone());
let received_documents =

View File

@ -4,6 +4,7 @@ use std::sync::Arc;
use heed::types::Bytes;
use heed::RoTxn;
use memmap2::Mmap;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use super::super::document_change::DocumentChange;
@ -50,9 +51,10 @@ impl<'pl> DocumentOperation<'pl> {
/// TODO please give me a type
/// The payload is expected to be in the grenad format
pub fn add_documents(&mut self, payload: &'pl [u8]) -> Result<PayloadStats> {
let document_count = memchr::Memchr::new(b'\n', payload).count();
self.operations.push(Payload::Addition(payload));
pub fn add_documents(&mut self, payload: &'pl Mmap) -> Result<PayloadStats> {
payload.advise(memmap2::Advice::Sequential)?;
let document_count = memchr::Memchr::new(b'\n', &payload[..]).count();
self.operations.push(Payload::Addition(&payload[..]));
Ok(PayloadStats { bytes: payload.len() as u64, document_count })
}
@ -181,7 +183,18 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> {
/// TODO is it the best way to provide FieldsIdsMap to the parallel iterator?
let fields_ids_map = fields_ids_map.clone();
// TODO We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone
let docids_version_offsets: Vec<_> = docids_version_offsets.drain().collect();
let mut docids_version_offsets: Vec<_> = docids_version_offsets.drain().collect();
// Reorder the offsets to make sure we iterate on the file sequentially
docids_version_offsets.sort_unstable_by_key(|(_, (_, offsets))| {
offsets
.iter()
.rev()
.find_map(|ido| match ido {
InnerDocOp::Addition(add) => Some(add.content.as_ptr() as usize),
InnerDocOp::Deletion => None,
})
.unwrap_or(0)
});
Ok(docids_version_offsets
.into_par_iter()