From 72c6a21a30fb3db20be4cc6f10eec4c1f16bb68e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?=
Date: Thu, 5 Sep 2024 20:08:23 +0200
Subject: [PATCH] Use raw JSON to read the payloads

---
 Cargo.lock                                   |   1 +
 index-scheduler/Cargo.toml                   |   1 +
 index-scheduler/src/batch.rs                 |  43 ++++-
 meilisearch/src/routes/indexes/documents.rs  |   5 +-
 .../update/new/indexer/document_operation.rs | 175 ++++++++++--------
 5 files changed, 131 insertions(+), 94 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index e169dbd52..6eb12d80f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2570,6 +2570,7 @@ dependencies = [
  "meili-snap",
  "meilisearch-auth",
  "meilisearch-types",
+ "memmap2",
  "page_size",
  "rayon",
  "roaring",
diff --git a/index-scheduler/Cargo.toml b/index-scheduler/Cargo.toml
index 6f099a025..cb37c9151 100644
--- a/index-scheduler/Cargo.toml
+++ b/index-scheduler/Cargo.toml
@@ -29,6 +29,7 @@ serde_json = { version = "1.0.120", features = ["preserve_order"] }
 synchronoise = "1.0.1"
 tempfile = "3.10.1"
 thiserror = "1.0.61"
+memmap2 = "0.9.4"
 time = { version = "0.3.36", features = [
     "serde-well-known",
     "formatting",
diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs
index 129dbec10..ba99eb418 100644
--- a/index-scheduler/src/batch.rs
+++ b/index-scheduler/src/batch.rs
@@ -18,6 +18,7 @@ one indexing operation.
 */
 
 use std::collections::{BTreeSet, HashSet};
+use std::env::VarError;
 use std::ffi::OsStr;
 use std::fmt;
 use std::fs::{self, File};
@@ -26,7 +27,7 @@ use std::io::BufWriter;
 use dump::IndexMetadata;
 use meilisearch_types::error::Code;
 use meilisearch_types::heed::{RoTxn, RwTxn};
-use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
+use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey};
 use meilisearch_types::milli::heed::CompactionOption;
 use meilisearch_types::milli::update::new::indexer::{self, guess_primary_key, DocumentChanges};
 use meilisearch_types::milli::update::{
@@ -1294,19 +1295,30 @@ impl IndexScheduler {
                         _ => None,
                     })
                     .unwrap();
-                let content_file = self.file_store.get_update(*first_addition_uuid)?;
-                let reader =
-                    DocumentsBatchReader::from_reader(content_file).map_err(milli::Error::from)?;
-                let (cursor, documents_batch_index) = reader.into_cursor_and_fields_index();
-                let primary_key =
-                    guess_primary_key(&rtxn, index, cursor, &documents_batch_index)?.unwrap();
+                // let content_file = self.file_store.get_update(*first_addition_uuid)?;
+                // let reader =
+                //     DocumentsBatchReader::from_reader(content_file).map_err(milli::Error::from)?;
+                // let (cursor, documents_batch_index) = reader.into_cursor_and_fields_index();
+                // let primary_key =
+                //     guess_primary_key(&rtxn, index, cursor, &documents_batch_index)?.unwrap();
+
+                let mut content_files = Vec::new();
+                for operation in &operations {
+                    if let DocumentOperation::Add(content_uuid) = operation {
+                        let content_file = self.file_store.get_update(*content_uuid)?;
+                        let mmap = unsafe { memmap2::Mmap::map(&content_file)? };
+                        content_files.push(mmap);
+                    }
+                }
+
+                let mut content_files_iter = content_files.iter();
 
                 let mut indexer = indexer::DocumentOperation::new(method);
                 for (operation, task) in operations.into_iter().zip(tasks.iter_mut()) {
                     match operation {
-                        DocumentOperation::Add(content_uuid) => {
-                            let content_file = self.file_store.get_update(content_uuid)?;
-                            let stats = indexer.add_documents(content_file)?;
+                        DocumentOperation::Add(_content_uuid) => {
+                            let mmap = content_files_iter.next().unwrap();
+                            let stats = indexer.add_documents(&mmap)?;
                             // builder = builder.with_embedders(embedders.clone());
 
                             let received_documents =
@@ -1357,6 +1369,17 @@ impl IndexScheduler {
                 // let pool = indexer_config.thread_pool.unwrap();
                 let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
                 // let fields_ids_map = RwLock::new(fields_ids_map);
+
+                /// TODO correctly guess the primary key in a NDJSON
+                let pk = match std::env::var("MEILI_PRIMARY_KEY") {
+                    Ok(pk) => pk,
+                    Err(VarError::NotPresent) => "id".to_string(),
+                    Err(e) => panic!("primary key error: {e}"),
+                };
+
+                fields_ids_map.insert(&pk);
+                let primary_key = PrimaryKey::new(&pk, &fields_ids_map).unwrap();
+
                 let param = (index, &rtxn, &primary_key);
                 let document_changes = indexer.document_changes(&mut fields_ids_map, param)?;
                 /// TODO pass/write the FieldsIdsMap
diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs
index 85cf33c54..029a125d0 100644
--- a/meilisearch/src/routes/indexes/documents.rs
+++ b/meilisearch/src/routes/indexes/documents.rs
@@ -459,12 +459,13 @@ async fn document_addition(
         return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e))));
     }
 
-    let read_file = buffer.into_inner().into_std().await;
+    let mut read_file = buffer.into_inner().into_std().await;
     let documents_count = tokio::task::spawn_blocking(move || {
         let documents_count = match format {
             PayloadType::Json => read_json(&read_file, &mut update_file)?,
             PayloadType::Csv { delimiter } => read_csv(&read_file, &mut update_file, delimiter)?,
-            PayloadType::Ndjson => read_ndjson(&read_file, &mut update_file)?,
+            /// TODO do not copy all the content
+            PayloadType::Ndjson => std::io::copy(&mut read_file, &mut update_file).unwrap(),
         };
         // we NEED to persist the file here because we moved the `udpate_file` in another task.
         update_file.persist()?;
diff --git a/milli/src/update/new/indexer/document_operation.rs b/milli/src/update/new/indexer/document_operation.rs
index c30665f17..93e051aa2 100644
--- a/milli/src/update/new/indexer/document_operation.rs
+++ b/milli/src/update/new/indexer/document_operation.rs
@@ -1,31 +1,26 @@
 use std::borrow::Cow;
 use std::collections::{BTreeMap, HashMap};
-use std::fs::File;
-use std::io::Cursor;
 use std::sync::Arc;
 
 use heed::types::Bytes;
 use heed::RoTxn;
-use memmap2::Mmap;
 use rayon::iter::{IntoParallelIterator, ParallelIterator};
 
 use super::super::document_change::DocumentChange;
 use super::super::items_pool::ItemsPool;
 use super::DocumentChanges;
-use crate::documents::{
-    obkv_to_object, DocumentIdExtractionError, DocumentsBatchReader, PrimaryKey,
-};
+use crate::documents::PrimaryKey;
 use crate::update::new::{Deletion, Insertion, KvReaderFieldId, KvWriterFieldId, Update};
 use crate::update::{AvailableIds, IndexDocumentsMethod};
 use crate::{DocumentId, Error, FieldsIdsMap, Index, Result, UserError};
 
-pub struct DocumentOperation {
-    operations: Vec<Payload>,
+pub struct DocumentOperation<'pl> {
+    operations: Vec<Payload<'pl>>,
     index_documents_method: IndexDocumentsMethod,
 }
 
-pub enum Payload {
-    Addition(File),
+pub enum Payload<'pl> {
+    Addition(&'pl [u8]),
     Deletion(Vec<String>),
 }
 
@@ -35,36 +30,30 @@ pub struct PayloadStats {
 }
 
 #[derive(Clone)]
-enum InnerDocOp {
-    Addition(DocumentOffset),
+enum InnerDocOp<'pl> {
+    Addition(DocumentOffset<'pl>),
     Deletion,
 }
 
 /// Represents an offset where a document lives
 /// in an mmapped grenad reader file.
 #[derive(Clone)]
-pub struct DocumentOffset {
-    /// The mmapped grenad reader file.
-    pub content: Arc<Mmap>, // grenad::Reader
-    /// The offset of the document in the file.
-    pub offset: u32,
+pub struct DocumentOffset<'pl> {
+    /// The mmapped payload files.
+    pub content: &'pl [u8],
 }
 
-impl DocumentOperation {
+impl<'pl> DocumentOperation<'pl> {
     pub fn new(method: IndexDocumentsMethod) -> Self {
         Self { operations: Default::default(), index_documents_method: method }
     }
 
     /// TODO please give me a type
     /// The payload is expected to be in the grenad format
-    pub fn add_documents(&mut self, payload: File) -> Result<PayloadStats> {
-        let reader = DocumentsBatchReader::from_reader(&payload)?;
-        let bytes = payload.metadata()?.len();
-        let document_count = reader.documents_count() as usize;
-
+    pub fn add_documents(&mut self, payload: &'pl [u8]) -> Result<PayloadStats> {
+        let document_count = memchr::Memchr::new(b'\n', payload).count();
         self.operations.push(Payload::Addition(payload));
-
-        Ok(PayloadStats { bytes, document_count })
+        Ok(PayloadStats { bytes: payload.len() as u64, document_count })
     }
 
     pub fn delete_documents(&mut self, to_delete: Vec<String>) {
@@ -72,7 +61,7 @@ impl DocumentOperation {
     }
 }
 
-impl<'p> DocumentChanges<'p> for DocumentOperation {
+impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> {
     type Parameter = (&'p Index, &'p RoTxn<'p>, &'p PrimaryKey<'p>);
 
     fn document_changes(
@@ -84,48 +73,63 @@
 
         let documents_ids = index.documents_ids(rtxn)?;
         let mut available_docids = AvailableIds::new(&documents_ids);
-        let mut docids_version_offsets = HashMap::<String, _>::new();
+        let mut docids_version_offsets = HashMap::<CowStr<'pl>, _>::new();
 
         for operation in self.operations {
             match operation {
                 Payload::Addition(payload) => {
-                    let content = unsafe { Mmap::map(&payload).map(Arc::new)? };
-                    let cursor = Cursor::new(content.as_ref());
-                    let reader = DocumentsBatchReader::from_reader(cursor)?;
+                    let mut iter =
+                        serde_json::Deserializer::from_slice(payload).into_iter::<TopLevelMap>();
 
-                    let (mut batch_cursor, batch_index) = reader.into_cursor_and_fields_index();
-                    // TODO Fetch all document fields to fill the fields ids map
-                    batch_index.iter().for_each(|(_, name)| {
-                        fields_ids_map.insert(name);
-                    });
+                    /// TODO manage the error
+                    let mut previous_offset = 0;
+                    while let Some(document) = iter.next().transpose().unwrap() {
+                        // TODO Fetch all document fields to fill the fields ids map
+                        document.0.keys().for_each(|key| {
+                            fields_ids_map.insert(key.as_ref());
+                        });
 
-                    let mut offset: u32 = 0;
-                    while let Some(document) = batch_cursor.next_document()? {
+                        // TODO we must manage the TooManyDocumentIds,InvalidDocumentId
+                        // we must manage the unwrap
                         let external_document_id =
-                            match primary_key.document_id(document, &batch_index)? {
-                                Ok(document_id) => Ok(document_id),
-                                Err(DocumentIdExtractionError::InvalidDocumentId(user_error)) => {
-                                    Err(user_error)
-                                }
-                                Err(DocumentIdExtractionError::MissingDocumentId) => {
-                                    Err(UserError::MissingDocumentId {
+                            match get_docid(&document, &[primary_key.name()]).unwrap() {
+                                Some(document_id) => document_id,
+                                None => {
+                                    return Err(UserError::MissingDocumentId {
                                         primary_key: primary_key.name().to_string(),
-                                        document: obkv_to_object(document, &batch_index)?,
-                                    })
+                                        document: todo!(),
+                                        // document: obkv_to_object(document, &batch_index)?,
+                                    }
+                                    .into());
                                 }
-                                Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => {
-                                    Err(UserError::TooManyDocumentIds {
-                                        primary_key: primary_key.name().to_string(),
-                                        document: obkv_to_object(document, &batch_index)?,
-                                    })
-                                }
-                            }?;
+                            };
 
-                        let content = content.clone();
-                        let document_offset = DocumentOffset { content, offset };
-                        let document_operation = InnerDocOp::Addition(document_offset);
+                        // let external_document_id =
+                        //     match primary_key.document_id(document, &batch_index)? {
+                        //         Ok(document_id) => Ok(document_id),
+                        //         Err(DocumentIdExtractionError::InvalidDocumentId(user_error)) => {
+                        //             Err(user_error)
+                        //         }
+                        //         Err(DocumentIdExtractionError::MissingDocumentId) => {
+                        //             Err(UserError::MissingDocumentId {
+                        //                 primary_key: primary_key.name().to_string(),
+                        //                 document: obkv_to_object(document, &batch_index)?,
+                        //             })
+                        //         }
+                        //         Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => {
+                        //             Err(UserError::TooManyDocumentIds {
+                        //                 primary_key: primary_key.name().to_string(),
+                        //                 document: obkv_to_object(document, &batch_index)?,
+                        //             })
+                        //         }
+                        //     }?;
 
-                        match docids_version_offsets.get_mut(&external_document_id) {
+                        let current_offset = iter.byte_offset();
+                        let document_operation = InnerDocOp::Addition(DocumentOffset {
+                            content: &payload[previous_offset..current_offset],
+                        });
+
+                        match docids_version_offsets.get_mut(external_document_id.as_ref()) {
                             None => {
                                 let docid = match index
                                     .external_documents_ids()
@@ -144,12 +148,13 @@
                             }
                             Some((_, offsets)) => offsets.push(document_operation),
                         }
-                        offset += 1;
+
+                        previous_offset = iter.byte_offset();
                     }
                 }
                 Payload::Deletion(to_delete) => {
                     for external_document_id in to_delete {
-                        match docids_version_offsets.get_mut(&external_document_id) {
+                        match docids_version_offsets.get_mut(external_document_id.as_str()) {
                             None => {
                                 let docid = match index
                                     .external_documents_ids()
@@ -162,7 +167,7 @@
                                 };
 
                                 docids_version_offsets.insert(
-                                    external_document_id,
+                                    CowStr(external_document_id.into()),
                                     (docid, vec![InnerDocOp::Deletion]),
                                 );
                             }
@@ -196,7 +201,7 @@
                             index,
                             &fields_ids_map,
                             internal_docid,
-                            external_docid,
+                            external_docid.to_string(), // TODO do not clone
                             &operations,
                         )
                     })
@@ -221,22 +226,20 @@ fn merge_document_for_replacements(
     let current: Option<&KvReaderFieldId> = current.map(Into::into);
 
     match operations.last() {
-        Some(InnerDocOp::Addition(DocumentOffset { content, offset })) => {
-            let reader = DocumentsBatchReader::from_reader(Cursor::new(content.as_ref()))?;
-            let (mut cursor, batch_index) = reader.into_cursor_and_fields_index();
-            let update = cursor.get(*offset)?.expect("must exists");
-
+        Some(InnerDocOp::Addition(DocumentOffset { content })) => {
+            let map: TopLevelMap = serde_json::from_slice(content).unwrap();
             let mut document_entries = Vec::new();
-            update.into_iter().for_each(|(k, v)| {
-                let field_name = batch_index.name(k).unwrap();
-                let id = fields_ids_map.id(field_name).unwrap();
+            for (key, v) in map.0 {
+                let id = fields_ids_map.id(key.as_ref()).unwrap();
                 document_entries.push((id, v));
-            });
+            }
 
             document_entries.sort_unstable_by_key(|(id, _)| *id);
 
             let mut writer = KvWriterFieldId::memory();
-            document_entries.into_iter().for_each(|(id, value)| writer.insert(id, value).unwrap());
+            document_entries
+                .into_iter()
+                .for_each(|(id, value)| writer.insert(id, value.get()).unwrap());
             let new = writer.into_boxed();
 
             match current {
@@ -305,22 +308,18 @@
     }
 
     for operation in operations {
-        let DocumentOffset { content, offset } = match operation {
+        let DocumentOffset { content } = match operation {
             InnerDocOp::Addition(offset) => offset,
             InnerDocOp::Deletion => {
                 unreachable!("Deletion in document operations")
             }
         };
 
-        let reader = DocumentsBatchReader::from_reader(Cursor::new(content.as_ref()))?;
-        let (mut cursor, batch_index) = reader.into_cursor_and_fields_index();
-        let update = cursor.get(*offset)?.expect("must exists");
-
-        update.into_iter().for_each(|(k, v)| {
-            let field_name = batch_index.name(k).unwrap();
-            let id = fields_ids_map.id(field_name).unwrap();
-            document.insert(id, v.to_vec().into());
-        });
+        let map: TopLevelMap = serde_json::from_slice(content).unwrap();
+        for (key, v) in map.0 {
+            let id = fields_ids_map.id(key.as_ref()).unwrap();
+            document.insert(id, v.get().as_bytes().to_vec().into());
+        }
     }
 
     let mut writer = KvWriterFieldId::memory();
@@ -348,9 +347,21 @@ use serde_json::value::RawValue;
 #[derive(Deserialize)]
 pub struct TopLevelMap<'p>(#[serde(borrow)] BTreeMap<CowStr<'p>, &'p RawValue>);
 
-#[derive(Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+#[derive(Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
 pub struct CowStr<'p>(#[serde(borrow)] Cow<'p, str>);
 
+impl CowStr<'_> {
+    fn to_string(&self) -> String {
+        self.0.to_string()
+    }
+}
+
+impl AsRef<str> for CowStr<'_> {
+    fn as_ref(&self) -> &str {
+        self.0.as_ref()
+    }
+}
+
 impl<'doc> Borrow<str> for CowStr<'doc> {
     fn borrow(&self) -> &str {
         self.0.borrow()