Use raw JSON to read the payloads

This commit is contained in:
Clément Renault 2024-09-05 20:08:23 +02:00
parent 8412be4a7d
commit 72c6a21a30
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
5 changed files with 131 additions and 94 deletions

1
Cargo.lock generated
View File

@ -2570,6 +2570,7 @@ dependencies = [
"meili-snap",
"meilisearch-auth",
"meilisearch-types",
"memmap2",
"page_size",
"rayon",
"roaring",

View File

@ -29,6 +29,7 @@ serde_json = { version = "1.0.120", features = ["preserve_order"] }
synchronoise = "1.0.1"
tempfile = "3.10.1"
thiserror = "1.0.61"
memmap2 = "0.9.4"
time = { version = "0.3.36", features = [
"serde-well-known",
"formatting",

View File

@ -18,6 +18,7 @@ one indexing operation.
*/
use std::collections::{BTreeSet, HashSet};
use std::env::VarError;
use std::ffi::OsStr;
use std::fmt;
use std::fs::{self, File};
@ -26,7 +27,7 @@ use std::io::BufWriter;
use dump::IndexMetadata;
use meilisearch_types::error::Code;
use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey};
use meilisearch_types::milli::heed::CompactionOption;
use meilisearch_types::milli::update::new::indexer::{self, guess_primary_key, DocumentChanges};
use meilisearch_types::milli::update::{
@ -1294,19 +1295,30 @@ impl IndexScheduler {
_ => None,
})
.unwrap();
let content_file = self.file_store.get_update(*first_addition_uuid)?;
let reader =
DocumentsBatchReader::from_reader(content_file).map_err(milli::Error::from)?;
let (cursor, documents_batch_index) = reader.into_cursor_and_fields_index();
let primary_key =
guess_primary_key(&rtxn, index, cursor, &documents_batch_index)?.unwrap();
// let content_file = self.file_store.get_update(*first_addition_uuid)?;
// let reader =
// DocumentsBatchReader::from_reader(content_file).map_err(milli::Error::from)?;
// let (cursor, documents_batch_index) = reader.into_cursor_and_fields_index();
// let primary_key =
// guess_primary_key(&rtxn, index, cursor, &documents_batch_index)?.unwrap();
let mut content_files = Vec::new();
for operation in &operations {
if let DocumentOperation::Add(content_uuid) = operation {
let content_file = self.file_store.get_update(*content_uuid)?;
let mmap = unsafe { memmap2::Mmap::map(&content_file)? };
content_files.push(mmap);
}
}
let mut content_files_iter = content_files.iter();
let mut indexer = indexer::DocumentOperation::new(method);
for (operation, task) in operations.into_iter().zip(tasks.iter_mut()) {
match operation {
DocumentOperation::Add(content_uuid) => {
let content_file = self.file_store.get_update(content_uuid)?;
let stats = indexer.add_documents(content_file)?;
DocumentOperation::Add(_content_uuid) => {
let mmap = content_files_iter.next().unwrap();
let stats = indexer.add_documents(&mmap)?;
// builder = builder.with_embedders(embedders.clone());
let received_documents =
@ -1357,6 +1369,17 @@ impl IndexScheduler {
// let pool = indexer_config.thread_pool.unwrap();
let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
// let fields_ids_map = RwLock::new(fields_ids_map);
/// TODO correctly guess the primary key in a NDJSON
let pk = match std::env::var("MEILI_PRIMARY_KEY") {
Ok(pk) => pk,
Err(VarError::NotPresent) => "id".to_string(),
Err(e) => panic!("primary key error: {e}"),
};
fields_ids_map.insert(&pk);
let primary_key = PrimaryKey::new(&pk, &fields_ids_map).unwrap();
let param = (index, &rtxn, &primary_key);
let document_changes = indexer.document_changes(&mut fields_ids_map, param)?;
/// TODO pass/write the FieldsIdsMap

View File

@ -459,12 +459,13 @@ async fn document_addition(
return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e))));
}
let read_file = buffer.into_inner().into_std().await;
let mut read_file = buffer.into_inner().into_std().await;
let documents_count = tokio::task::spawn_blocking(move || {
let documents_count = match format {
PayloadType::Json => read_json(&read_file, &mut update_file)?,
PayloadType::Csv { delimiter } => read_csv(&read_file, &mut update_file, delimiter)?,
PayloadType::Ndjson => read_ndjson(&read_file, &mut update_file)?,
/// TODO do not copy all the content
PayloadType::Ndjson => std::io::copy(&mut read_file, &mut update_file).unwrap(),
};
// we NEED to persist the file here because we moved the `udpate_file` in another task.
update_file.persist()?;

View File

@ -1,31 +1,26 @@
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
use std::fs::File;
use std::io::Cursor;
use std::sync::Arc;
use heed::types::Bytes;
use heed::RoTxn;
use memmap2::Mmap;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use super::super::document_change::DocumentChange;
use super::super::items_pool::ItemsPool;
use super::DocumentChanges;
use crate::documents::{
obkv_to_object, DocumentIdExtractionError, DocumentsBatchReader, PrimaryKey,
};
use crate::documents::PrimaryKey;
use crate::update::new::{Deletion, Insertion, KvReaderFieldId, KvWriterFieldId, Update};
use crate::update::{AvailableIds, IndexDocumentsMethod};
use crate::{DocumentId, Error, FieldsIdsMap, Index, Result, UserError};
pub struct DocumentOperation {
operations: Vec<Payload>,
pub struct DocumentOperation<'pl> {
operations: Vec<Payload<'pl>>,
index_documents_method: IndexDocumentsMethod,
}
pub enum Payload {
Addition(File),
pub enum Payload<'pl> {
Addition(&'pl [u8]),
Deletion(Vec<String>),
}
@ -35,36 +30,30 @@ pub struct PayloadStats {
}
#[derive(Clone)]
enum InnerDocOp {
Addition(DocumentOffset),
enum InnerDocOp<'pl> {
Addition(DocumentOffset<'pl>),
Deletion,
}
/// Represents an offset where a document lives
/// in an mmapped grenad reader file.
#[derive(Clone)]
pub struct DocumentOffset {
/// The mmapped grenad reader file.
pub content: Arc<Mmap>, // grenad::Reader
/// The offset of the document in the file.
pub offset: u32,
pub struct DocumentOffset<'pl> {
/// The mmapped payload files.
pub content: &'pl [u8],
}
impl DocumentOperation {
impl<'pl> DocumentOperation<'pl> {
pub fn new(method: IndexDocumentsMethod) -> Self {
Self { operations: Default::default(), index_documents_method: method }
}
/// TODO please give me a type
/// The payload is expected to be in the grenad format
pub fn add_documents(&mut self, payload: File) -> Result<PayloadStats> {
let reader = DocumentsBatchReader::from_reader(&payload)?;
let bytes = payload.metadata()?.len();
let document_count = reader.documents_count() as usize;
pub fn add_documents(&mut self, payload: &'pl [u8]) -> Result<PayloadStats> {
let document_count = memchr::Memchr::new(b'\n', payload).count();
self.operations.push(Payload::Addition(payload));
Ok(PayloadStats { bytes, document_count })
Ok(PayloadStats { bytes: payload.len() as u64, document_count })
}
pub fn delete_documents(&mut self, to_delete: Vec<String>) {
@ -72,7 +61,7 @@ impl DocumentOperation {
}
}
impl<'p> DocumentChanges<'p> for DocumentOperation {
impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> {
type Parameter = (&'p Index, &'p RoTxn<'p>, &'p PrimaryKey<'p>);
fn document_changes(
@ -84,48 +73,63 @@ impl<'p> DocumentChanges<'p> for DocumentOperation {
let documents_ids = index.documents_ids(rtxn)?;
let mut available_docids = AvailableIds::new(&documents_ids);
let mut docids_version_offsets = HashMap::<String, _>::new();
let mut docids_version_offsets = HashMap::<CowStr<'pl>, _>::new();
for operation in self.operations {
match operation {
Payload::Addition(payload) => {
let content = unsafe { Mmap::map(&payload).map(Arc::new)? };
let cursor = Cursor::new(content.as_ref());
let reader = DocumentsBatchReader::from_reader(cursor)?;
let mut iter =
serde_json::Deserializer::from_slice(payload).into_iter::<TopLevelMap>();
let (mut batch_cursor, batch_index) = reader.into_cursor_and_fields_index();
// TODO Fetch all document fields to fill the fields ids map
batch_index.iter().for_each(|(_, name)| {
fields_ids_map.insert(name);
});
/// TODO manage the error
let mut previous_offset = 0;
while let Some(document) = iter.next().transpose().unwrap() {
// TODO Fetch all document fields to fill the fields ids map
document.0.keys().for_each(|key| {
fields_ids_map.insert(key.as_ref());
});
let mut offset: u32 = 0;
while let Some(document) = batch_cursor.next_document()? {
// TODO we must manage the TooManyDocumentIds,InvalidDocumentId
// we must manage the unwrap
let external_document_id =
match primary_key.document_id(document, &batch_index)? {
Ok(document_id) => Ok(document_id),
Err(DocumentIdExtractionError::InvalidDocumentId(user_error)) => {
Err(user_error)
}
Err(DocumentIdExtractionError::MissingDocumentId) => {
Err(UserError::MissingDocumentId {
match get_docid(&document, &[primary_key.name()]).unwrap() {
Some(document_id) => document_id,
None => {
return Err(UserError::MissingDocumentId {
primary_key: primary_key.name().to_string(),
document: obkv_to_object(document, &batch_index)?,
})
document: todo!(),
// document: obkv_to_object(document, &batch_index)?,
}
.into());
}
Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => {
Err(UserError::TooManyDocumentIds {
primary_key: primary_key.name().to_string(),
document: obkv_to_object(document, &batch_index)?,
})
}
}?;
};
let content = content.clone();
let document_offset = DocumentOffset { content, offset };
let document_operation = InnerDocOp::Addition(document_offset);
// let external_document_id =
// match primary_key.document_id(document, &batch_index)? {
// Ok(document_id) => Ok(document_id),
// Err(DocumentIdExtractionError::InvalidDocumentId(user_error)) => {
// Err(user_error)
// }
// Err(DocumentIdExtractionError::MissingDocumentId) => {
// Err(UserError::MissingDocumentId {
// primary_key: primary_key.name().to_string(),
// document: obkv_to_object(document, &batch_index)?,
// })
// }
// Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => {
// Err(UserError::TooManyDocumentIds {
// primary_key: primary_key.name().to_string(),
// document: obkv_to_object(document, &batch_index)?,
// })
// }
// }?;
match docids_version_offsets.get_mut(&external_document_id) {
let current_offset = iter.byte_offset();
let document_operation = InnerDocOp::Addition(DocumentOffset {
content: &payload[previous_offset..current_offset],
});
match docids_version_offsets.get_mut(external_document_id.as_ref()) {
None => {
let docid = match index
.external_documents_ids()
@ -144,12 +148,13 @@ impl<'p> DocumentChanges<'p> for DocumentOperation {
}
Some((_, offsets)) => offsets.push(document_operation),
}
offset += 1;
previous_offset = iter.byte_offset();
}
}
Payload::Deletion(to_delete) => {
for external_document_id in to_delete {
match docids_version_offsets.get_mut(&external_document_id) {
match docids_version_offsets.get_mut(external_document_id.as_str()) {
None => {
let docid = match index
.external_documents_ids()
@ -162,7 +167,7 @@ impl<'p> DocumentChanges<'p> for DocumentOperation {
};
docids_version_offsets.insert(
external_document_id,
CowStr(external_document_id.into()),
(docid, vec![InnerDocOp::Deletion]),
);
}
@ -196,7 +201,7 @@ impl<'p> DocumentChanges<'p> for DocumentOperation {
index,
&fields_ids_map,
internal_docid,
external_docid,
external_docid.to_string(), // TODO do not clone
&operations,
)
})
@ -221,22 +226,20 @@ fn merge_document_for_replacements(
let current: Option<&KvReaderFieldId> = current.map(Into::into);
match operations.last() {
Some(InnerDocOp::Addition(DocumentOffset { content, offset })) => {
let reader = DocumentsBatchReader::from_reader(Cursor::new(content.as_ref()))?;
let (mut cursor, batch_index) = reader.into_cursor_and_fields_index();
let update = cursor.get(*offset)?.expect("must exists");
Some(InnerDocOp::Addition(DocumentOffset { content })) => {
let map: TopLevelMap = serde_json::from_slice(content).unwrap();
let mut document_entries = Vec::new();
update.into_iter().for_each(|(k, v)| {
let field_name = batch_index.name(k).unwrap();
let id = fields_ids_map.id(field_name).unwrap();
for (key, v) in map.0 {
let id = fields_ids_map.id(key.as_ref()).unwrap();
document_entries.push((id, v));
});
}
document_entries.sort_unstable_by_key(|(id, _)| *id);
let mut writer = KvWriterFieldId::memory();
document_entries.into_iter().for_each(|(id, value)| writer.insert(id, value).unwrap());
document_entries
.into_iter()
.for_each(|(id, value)| writer.insert(id, value.get()).unwrap());
let new = writer.into_boxed();
match current {
@ -305,22 +308,18 @@ fn merge_document_for_updates(
}
for operation in operations {
let DocumentOffset { content, offset } = match operation {
let DocumentOffset { content } = match operation {
InnerDocOp::Addition(offset) => offset,
InnerDocOp::Deletion => {
unreachable!("Deletion in document operations")
}
};
let reader = DocumentsBatchReader::from_reader(Cursor::new(content.as_ref()))?;
let (mut cursor, batch_index) = reader.into_cursor_and_fields_index();
let update = cursor.get(*offset)?.expect("must exists");
update.into_iter().for_each(|(k, v)| {
let field_name = batch_index.name(k).unwrap();
let id = fields_ids_map.id(field_name).unwrap();
document.insert(id, v.to_vec().into());
});
let map: TopLevelMap = serde_json::from_slice(content).unwrap();
for (key, v) in map.0 {
let id = fields_ids_map.id(key.as_ref()).unwrap();
document.insert(id, v.get().as_bytes().to_vec().into());
}
}
let mut writer = KvWriterFieldId::memory();
@ -348,9 +347,21 @@ use serde_json::value::RawValue;
#[derive(Deserialize)]
pub struct TopLevelMap<'p>(#[serde(borrow)] BTreeMap<CowStr<'p>, &'p RawValue>);
#[derive(Deserialize, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
pub struct CowStr<'p>(#[serde(borrow)] Cow<'p, str>);
impl CowStr<'_> {
fn to_string(&self) -> String {
self.0.to_string()
}
}
impl AsRef<str> for CowStr<'_> {
fn as_ref(&self) -> &str {
self.0.as_ref()
}
}
impl<'doc> Borrow<str> for CowStr<'doc> {
fn borrow(&self) -> &str {
self.0.borrow()