mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-11 14:04:31 +01:00
Make the Transform read from an EnrichedDocumentsBatchReader
This commit is contained in:
parent
ea852200bb
commit
6a0a0ae94f
103
milli/src/documents/enriched.rs
Normal file
103
milli/src/documents/enriched.rs
Normal file
@ -0,0 +1,103 @@
|
||||
use std::fs::File;
|
||||
use std::{io, str};
|
||||
|
||||
use obkv::KvReader;
|
||||
|
||||
use super::{
|
||||
DocumentsBatchCursor, DocumentsBatchCursorError, DocumentsBatchIndex, DocumentsBatchReader,
|
||||
Error,
|
||||
};
|
||||
use crate::FieldId;
|
||||
|
||||
/// The `EnrichedDocumentsBatchReader` provides a way to iterate over documents that have
|
||||
/// been created with a `DocumentsBatchWriter` and, for the enriched data,
|
||||
/// a simple `grenad::Reader<File>`.
|
||||
///
|
||||
/// The documents are returned in the form of `obkv::Reader` where each field is identified with a
|
||||
/// `FieldId`. The mapping between the field ids and the field names is done thanks to the index.
|
||||
pub struct EnrichedDocumentsBatchReader<R> {
|
||||
documents: DocumentsBatchReader<R>,
|
||||
external_ids: grenad::ReaderCursor<File>,
|
||||
}
|
||||
|
||||
impl<R: io::Read + io::Seek> EnrichedDocumentsBatchReader<R> {
|
||||
pub fn new(
|
||||
documents: DocumentsBatchReader<R>,
|
||||
external_ids: grenad::Reader<File>,
|
||||
) -> Result<Self, Error> {
|
||||
if documents.documents_count() as u64 == external_ids.len() {
|
||||
Ok(EnrichedDocumentsBatchReader {
|
||||
documents,
|
||||
external_ids: external_ids.into_cursor()?,
|
||||
})
|
||||
} else {
|
||||
Err(Error::InvalidEnrichedData)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn documents_count(&self) -> u32 {
|
||||
self.documents.documents_count()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.documents.is_empty()
|
||||
}
|
||||
|
||||
pub fn documents_batch_index(&self) -> &DocumentsBatchIndex {
|
||||
self.documents.documents_batch_index()
|
||||
}
|
||||
|
||||
/// This method returns a forward cursor over the enriched documents.
|
||||
pub fn into_cursor(self) -> EnrichedDocumentsBatchCursor<R> {
|
||||
let EnrichedDocumentsBatchReader { documents, mut external_ids } = self;
|
||||
external_ids.reset();
|
||||
EnrichedDocumentsBatchCursor { documents: documents.into_cursor(), external_ids }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct EnrichedDocument<'a> {
|
||||
pub document: KvReader<'a, FieldId>,
|
||||
pub external_id: &'a str,
|
||||
}
|
||||
|
||||
pub struct EnrichedDocumentsBatchCursor<R> {
|
||||
documents: DocumentsBatchCursor<R>,
|
||||
external_ids: grenad::ReaderCursor<File>,
|
||||
}
|
||||
|
||||
impl<R> EnrichedDocumentsBatchCursor<R> {
|
||||
pub fn into_reader(self) -> EnrichedDocumentsBatchReader<R> {
|
||||
let EnrichedDocumentsBatchCursor { documents, external_ids } = self;
|
||||
EnrichedDocumentsBatchReader { documents: documents.into_reader(), external_ids }
|
||||
}
|
||||
|
||||
pub fn documents_batch_index(&self) -> &DocumentsBatchIndex {
|
||||
self.documents.documents_batch_index()
|
||||
}
|
||||
|
||||
/// Resets the cursor to be able to read from the start again.
|
||||
pub fn reset(&mut self) {
|
||||
self.documents.reset();
|
||||
self.external_ids.reset();
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: io::Read + io::Seek> EnrichedDocumentsBatchCursor<R> {
|
||||
/// Returns the next document, starting from the first one. Subsequent calls to
|
||||
/// `next_document` advance the document reader until all the documents have been read.
|
||||
pub fn next_enriched_document(
|
||||
&mut self,
|
||||
) -> Result<Option<EnrichedDocument>, DocumentsBatchCursorError> {
|
||||
let document = self.documents.next_document()?;
|
||||
let external_id = match self.external_ids.move_on_next()? {
|
||||
Some((_, bytes)) => Some(str::from_utf8(bytes)?),
|
||||
None => None,
|
||||
};
|
||||
|
||||
match document.zip(external_id) {
|
||||
Some((document, external_id)) => Ok(Some(EnrichedDocument { document, external_id })),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
}
|
@ -1,11 +1,14 @@
|
||||
mod builder;
|
||||
mod enriched;
|
||||
mod reader;
|
||||
|
||||
use std::fmt::{self, Debug};
|
||||
use std::io;
|
||||
use std::str::Utf8Error;
|
||||
|
||||
use bimap::BiHashMap;
|
||||
pub use builder::DocumentsBatchBuilder;
|
||||
pub use enriched::{EnrichedDocument, EnrichedDocumentsBatchCursor, EnrichedDocumentsBatchReader};
|
||||
use obkv::KvReader;
|
||||
pub use reader::{DocumentsBatchCursor, DocumentsBatchCursorError, DocumentsBatchReader};
|
||||
use serde::{Deserialize, Serialize};
|
||||
@ -87,6 +90,8 @@ impl DocumentsBatchIndex {
|
||||
pub enum Error {
|
||||
ParseFloat { error: std::num::ParseFloatError, line: usize, value: String },
|
||||
InvalidDocumentFormat,
|
||||
InvalidEnrichedData,
|
||||
InvalidUtf8(Utf8Error),
|
||||
Csv(csv::Error),
|
||||
Json(serde_json::Error),
|
||||
Serialize(serde_json::Error),
|
||||
@ -118,6 +123,12 @@ impl From<grenad::Error> for Error {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Utf8Error> for Error {
|
||||
fn from(other: Utf8Error) -> Self {
|
||||
Self::InvalidUtf8(other)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for Error {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
@ -127,6 +138,8 @@ impl fmt::Display for Error {
|
||||
Error::InvalidDocumentFormat => {
|
||||
f.write_str("Invalid document addition format, missing the documents batch index.")
|
||||
}
|
||||
Error::InvalidEnrichedData => f.write_str("Invalid enriched data."),
|
||||
Error::InvalidUtf8(e) => write!(f, "{}", e),
|
||||
Error::Io(e) => write!(f, "{}", e),
|
||||
Error::Serialize(e) => write!(f, "{}", e),
|
||||
Error::Grenad(e) => write!(f, "{}", e),
|
||||
|
@ -1,5 +1,5 @@
|
||||
use std::convert::TryInto;
|
||||
use std::{error, fmt, io};
|
||||
use std::{error, fmt, io, str};
|
||||
|
||||
use obkv::KvReader;
|
||||
|
||||
@ -93,19 +93,20 @@ impl<R: io::Read + io::Seek> DocumentsBatchCursor<R> {
|
||||
|
||||
/// The possible error thrown by the `DocumentsBatchCursor` when iterating on the documents.
|
||||
#[derive(Debug)]
|
||||
pub struct DocumentsBatchCursorError {
|
||||
inner: grenad::Error,
|
||||
pub enum DocumentsBatchCursorError {
|
||||
Grenad(grenad::Error),
|
||||
Utf8(str::Utf8Error),
|
||||
}
|
||||
|
||||
impl From<grenad::Error> for DocumentsBatchCursorError {
|
||||
fn from(error: grenad::Error) -> DocumentsBatchCursorError {
|
||||
DocumentsBatchCursorError { inner: error }
|
||||
DocumentsBatchCursorError::Grenad(error)
|
||||
}
|
||||
}
|
||||
|
||||
impl Into<grenad::Error> for DocumentsBatchCursorError {
|
||||
fn into(self) -> grenad::Error {
|
||||
self.inner
|
||||
impl From<str::Utf8Error> for DocumentsBatchCursorError {
|
||||
fn from(error: str::Utf8Error) -> DocumentsBatchCursorError {
|
||||
DocumentsBatchCursorError::Utf8(error)
|
||||
}
|
||||
}
|
||||
|
||||
@ -113,6 +114,9 @@ impl error::Error for DocumentsBatchCursorError {}
|
||||
|
||||
impl fmt::Display for DocumentsBatchCursorError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
self.inner.fmt(f)
|
||||
match self {
|
||||
DocumentsBatchCursorError::Grenad(e) => e.fmt(f),
|
||||
DocumentsBatchCursorError::Utf8(e) => e.fmt(f),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -7,7 +7,7 @@ use rayon::ThreadPoolBuildError;
|
||||
use serde_json::Value;
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::documents::DocumentsBatchCursorError;
|
||||
use crate::documents::{self, DocumentsBatchCursorError};
|
||||
use crate::{CriterionError, DocumentId, FieldId, Object, SortError};
|
||||
|
||||
pub fn is_reserved_keyword(keyword: &str) -> bool {
|
||||
@ -36,6 +36,8 @@ pub enum InternalError {
|
||||
FieldIdMappingMissingEntry { key: FieldId },
|
||||
#[error(transparent)]
|
||||
Fst(#[from] fst::Error),
|
||||
#[error(transparent)]
|
||||
DocumentsError(#[from] documents::Error),
|
||||
#[error("Invalid compression type have been specified to grenad.")]
|
||||
GrenadInvalidCompressionType,
|
||||
#[error("Invalid grenad file with an invalid version format.")]
|
||||
@ -185,6 +187,7 @@ macro_rules! error_from_sub_error {
|
||||
error_from_sub_error! {
|
||||
FieldIdMapMissingEntry => InternalError,
|
||||
fst::Error => InternalError,
|
||||
documents::Error => InternalError,
|
||||
str::Utf8Error => InternalError,
|
||||
ThreadPoolBuildError => InternalError,
|
||||
SerializationError => InternalError,
|
||||
@ -212,7 +215,10 @@ where
|
||||
|
||||
impl From<DocumentsBatchCursorError> for Error {
|
||||
fn from(error: DocumentsBatchCursorError) -> Error {
|
||||
Error::from(Into::<grenad::Error>::into(error))
|
||||
match error {
|
||||
DocumentsBatchCursorError::Grenad(e) => Error::from(e),
|
||||
DocumentsBatchCursorError::Utf8(e) => Error::from(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -27,7 +27,7 @@ pub use self::helpers::{
|
||||
};
|
||||
use self::helpers::{grenad_obkv_into_chunks, GrenadParameters};
|
||||
pub use self::transform::{Transform, TransformOutput};
|
||||
use self::validate::validate_documents_batch;
|
||||
use self::validate::validate_and_enrich_documents_batch;
|
||||
pub use self::validate::{
|
||||
extract_float_from_value, validate_document_id, validate_document_id_value,
|
||||
validate_geo_from_json,
|
||||
@ -141,7 +141,7 @@ where
|
||||
// We check for user errors in this validator and if there is one, we can return
|
||||
// the `IndexDocument` struct as it is valid to send more documents into it.
|
||||
// However, if there is an internal error we throw it away!
|
||||
let reader = match validate_documents_batch(
|
||||
let enriched_documents_reader = match validate_and_enrich_documents_batch(
|
||||
self.wtxn,
|
||||
self.index,
|
||||
self.config.autogenerate_docids,
|
||||
@ -155,7 +155,7 @@ where
|
||||
.transform
|
||||
.as_mut()
|
||||
.expect("Invalid document addition state")
|
||||
.read_documents(reader, self.wtxn, &self.progress)?
|
||||
.read_documents(enriched_documents_reader, self.wtxn, &self.progress)?
|
||||
as u64;
|
||||
|
||||
self.added_documents += indexed_documents;
|
||||
|
@ -14,7 +14,7 @@ use smartstring::SmartString;
|
||||
|
||||
use super::helpers::{create_sorter, create_writer, keep_latest_obkv, merge_obkvs, MergeFn};
|
||||
use super::{IndexDocumentsMethod, IndexerConfig};
|
||||
use crate::documents::{DocumentsBatchIndex, DocumentsBatchReader};
|
||||
use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader};
|
||||
use crate::error::{Error, InternalError, UserError};
|
||||
use crate::index::db_name;
|
||||
use crate::update::index_documents::validate_document_id_value;
|
||||
@ -153,7 +153,7 @@ impl<'a, 'i> Transform<'a, 'i> {
|
||||
|
||||
pub fn read_documents<R, F>(
|
||||
&mut self,
|
||||
reader: DocumentsBatchReader<R>,
|
||||
reader: EnrichedDocumentsBatchReader<R>,
|
||||
wtxn: &mut heed::RwTxn,
|
||||
progress_callback: F,
|
||||
) -> Result<usize>
|
||||
@ -189,7 +189,9 @@ impl<'a, 'i> Transform<'a, 'i> {
|
||||
let mut external_id_buffer = Vec::new();
|
||||
let mut field_buffer: Vec<(u16, Cow<[u8]>)> = Vec::new();
|
||||
let addition_index = cursor.documents_batch_index().clone();
|
||||
while let Some(document) = cursor.next_document()? {
|
||||
while let Some(enriched_document) = cursor.next_enriched_document()? {
|
||||
let EnrichedDocument { document, external_id } = enriched_document;
|
||||
|
||||
let mut field_buffer_cache = drop_and_reuse(field_buffer);
|
||||
if self.indexer_settings.log_every_n.map_or(false, |len| documents_count % len == 0) {
|
||||
progress_callback(UpdateIndexingStep::RemapDocumentAddition {
|
||||
|
@ -4,27 +4,28 @@ use std::result::Result as StdResult;
|
||||
|
||||
use serde_json::Value;
|
||||
|
||||
use crate::documents::{DocumentsBatchIndex, DocumentsBatchReader};
|
||||
use crate::documents::{DocumentsBatchIndex, DocumentsBatchReader, EnrichedDocumentsBatchReader};
|
||||
use crate::error::{GeoError, InternalError, UserError};
|
||||
use crate::update::index_documents::obkv_to_object;
|
||||
use crate::update::index_documents::{obkv_to_object, writer_into_reader};
|
||||
use crate::{FieldId, Index, Object, Result};
|
||||
|
||||
/// The symbol used to define levels in a nested primary key.
|
||||
const PRIMARY_KEY_SPLIT_SYMBOL: char = '.';
|
||||
|
||||
/// This function validates a documents by checking that:
|
||||
/// This function validates and enriches the documents by checking that:
|
||||
/// - we can infer a primary key,
|
||||
/// - all the documents id exist and,
|
||||
/// - all the documents id exist and are extracted,
|
||||
/// - the validity of them but also,
|
||||
/// - the validity of the `_geo` field depending on the settings.
|
||||
pub fn validate_documents_batch<R: Read + Seek>(
|
||||
pub fn validate_and_enrich_documents_batch<R: Read + Seek>(
|
||||
rtxn: &heed::RoTxn,
|
||||
index: &Index,
|
||||
autogenerate_docids: bool,
|
||||
reader: DocumentsBatchReader<R>,
|
||||
) -> Result<StdResult<DocumentsBatchReader<R>, UserError>> {
|
||||
) -> Result<StdResult<EnrichedDocumentsBatchReader<R>, UserError>> {
|
||||
let mut cursor = reader.into_cursor();
|
||||
let mut documents_batch_index = cursor.documents_batch_index().clone();
|
||||
let mut external_ids = tempfile::tempfile().map(grenad::Writer::new)?;
|
||||
|
||||
// The primary key *field id* that has already been set for this index or the one
|
||||
// we will guess by searching for the first key that contains "id" as a substring.
|
||||
@ -82,6 +83,8 @@ pub fn validate_documents_batch<R: Read + Seek>(
|
||||
Err(user_error) => return Ok(Err(user_error)),
|
||||
};
|
||||
|
||||
external_ids.insert(count.to_be_bytes(), &document_id)?;
|
||||
|
||||
if let Some(geo_value) = geo_field_id.and_then(|fid| document.get(fid)) {
|
||||
if let Err(user_error) = validate_geo_from_json(Value::from(document_id), geo_value)? {
|
||||
return Ok(Err(UserError::from(user_error)));
|
||||
@ -90,7 +93,10 @@ pub fn validate_documents_batch<R: Read + Seek>(
|
||||
count += 1;
|
||||
}
|
||||
|
||||
Ok(Ok(cursor.into_reader()))
|
||||
let external_ids = writer_into_reader(external_ids)?;
|
||||
let reader = EnrichedDocumentsBatchReader::new(cursor.into_reader(), external_ids)?;
|
||||
|
||||
Ok(Ok(reader))
|
||||
}
|
||||
|
||||
/// Retrieve the document id after validating it, returning a `UserError`
|
||||
@ -100,7 +106,7 @@ fn fetch_document_id(
|
||||
documents_batch_index: &DocumentsBatchIndex,
|
||||
primary_key: PrimaryKey,
|
||||
autogenerate_docids: bool,
|
||||
count: usize,
|
||||
count: u32,
|
||||
) -> Result<StdResult<String, UserError>> {
|
||||
match primary_key {
|
||||
PrimaryKey::Flat { name: primary_key, field_id: primary_key_id } => {
|
||||
|
Loading…
x
Reference in New Issue
Block a user