2022-06-14 16:03:48 +02:00
|
|
|
use std::convert::TryInto;
|
2022-06-21 14:41:19 +02:00
|
|
|
use std::{error, fmt, io};
|
2021-08-31 11:44:15 +02:00
|
|
|
|
|
|
|
use obkv::KvReader;
|
|
|
|
|
2022-06-14 16:03:48 +02:00
|
|
|
use super::{DocumentsBatchIndex, Error, DOCUMENTS_BATCH_INDEX_KEY};
|
2021-08-31 11:44:15 +02:00
|
|
|
use crate::FieldId;
|
|
|
|
|
|
|
|
/// The `DocumentsBatchReader` provides a way to iterate over documents that have been created with
|
|
|
|
/// a `DocumentsBatchWriter`.
|
|
|
|
///
|
|
|
|
/// The documents are returned in the form of `obkv::Reader` where each field is identified with a
|
|
|
|
/// `FieldId`. The mapping between the field ids and the field names is done thanks to the index.
|
2022-06-14 16:03:48 +02:00
|
|
|
pub struct DocumentsBatchReader<R> {
|
|
|
|
cursor: grenad::ReaderCursor<R>,
|
|
|
|
fields_index: DocumentsBatchIndex,
|
2021-08-31 11:44:15 +02:00
|
|
|
}
|
|
|
|
|
2022-06-14 16:03:48 +02:00
|
|
|
impl<R: io::Read + io::Seek> DocumentsBatchReader<R> {
|
2022-07-18 16:08:01 +02:00
|
|
|
pub fn new(cursor: DocumentsBatchCursor<R>, fields_index: DocumentsBatchIndex) -> Self {
|
|
|
|
Self { cursor: cursor.cursor, fields_index }
|
|
|
|
}
|
|
|
|
|
2021-08-31 11:44:15 +02:00
|
|
|
/// Construct a `DocumentsReader` from a reader.
|
|
|
|
///
|
2022-06-14 16:03:48 +02:00
|
|
|
/// It first retrieves the index, then moves to the first document. Use the `into_cursor`
|
|
|
|
/// method to iterator over the documents, from the first to the last.
|
2024-01-23 09:42:48 +01:00
|
|
|
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")]
|
2022-06-14 16:03:48 +02:00
|
|
|
pub fn from_reader(reader: R) -> Result<Self, Error> {
|
|
|
|
let reader = grenad::Reader::new(reader)?;
|
|
|
|
let mut cursor = reader.into_cursor()?;
|
2021-08-31 11:44:15 +02:00
|
|
|
|
2022-06-14 16:03:48 +02:00
|
|
|
let fields_index = match cursor.move_on_key_equal_to(DOCUMENTS_BATCH_INDEX_KEY)? {
|
|
|
|
Some((_, value)) => serde_json::from_slice(value).map_err(Error::Serialize)?,
|
|
|
|
None => return Err(Error::InvalidDocumentFormat),
|
|
|
|
};
|
2021-08-31 11:44:15 +02:00
|
|
|
|
2022-06-14 16:03:48 +02:00
|
|
|
Ok(DocumentsBatchReader { cursor, fields_index })
|
|
|
|
}
|
2021-08-31 11:44:15 +02:00
|
|
|
|
2022-06-14 16:03:48 +02:00
|
|
|
pub fn documents_count(&self) -> u32 {
|
|
|
|
self.cursor.len().saturating_sub(1).try_into().expect("Invalid number of documents")
|
|
|
|
}
|
2021-08-31 11:44:15 +02:00
|
|
|
|
2022-06-14 16:03:48 +02:00
|
|
|
pub fn is_empty(&self) -> bool {
|
|
|
|
self.cursor.len().saturating_sub(1) == 0
|
2021-08-31 11:44:15 +02:00
|
|
|
}
|
|
|
|
|
2022-06-14 16:03:48 +02:00
|
|
|
pub fn documents_batch_index(&self) -> &DocumentsBatchIndex {
|
|
|
|
&self.fields_index
|
|
|
|
}
|
2021-08-31 11:44:15 +02:00
|
|
|
|
2022-06-14 16:03:48 +02:00
|
|
|
/// This method returns a forward cursor over the documents.
|
2022-07-18 16:08:01 +02:00
|
|
|
pub fn into_cursor_and_fields_index(self) -> (DocumentsBatchCursor<R>, DocumentsBatchIndex) {
|
2022-06-14 16:03:48 +02:00
|
|
|
let DocumentsBatchReader { cursor, fields_index } = self;
|
2022-07-18 16:08:01 +02:00
|
|
|
let mut cursor = DocumentsBatchCursor { cursor };
|
2022-06-14 16:03:48 +02:00
|
|
|
cursor.reset();
|
2022-07-18 16:08:01 +02:00
|
|
|
(cursor, fields_index)
|
2021-08-31 11:44:15 +02:00
|
|
|
}
|
2022-06-14 16:03:48 +02:00
|
|
|
}
|
2021-08-31 11:44:15 +02:00
|
|
|
|
2022-06-14 16:03:48 +02:00
|
|
|
/// A forward cursor over the documents in a `DocumentsBatchReader`.
|
|
|
|
pub struct DocumentsBatchCursor<R> {
|
|
|
|
cursor: grenad::ReaderCursor<R>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<R> DocumentsBatchCursor<R> {
|
|
|
|
/// Resets the cursor to be able to read from the start again.
|
|
|
|
pub fn reset(&mut self) {
|
|
|
|
self.cursor.reset();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<R: io::Read + io::Seek> DocumentsBatchCursor<R> {
|
|
|
|
/// Returns the next document, starting from the first one. Subsequent calls to
|
|
|
|
/// `next_document` advance the document reader until all the documents have been read.
|
2022-06-16 12:03:43 +02:00
|
|
|
pub fn next_document(
|
|
|
|
&mut self,
|
|
|
|
) -> Result<Option<KvReader<FieldId>>, DocumentsBatchCursorError> {
|
2022-06-14 16:03:48 +02:00
|
|
|
match self.cursor.move_on_next()? {
|
|
|
|
Some((key, value)) if key != DOCUMENTS_BATCH_INDEX_KEY => {
|
|
|
|
Ok(Some(KvReader::new(value)))
|
|
|
|
}
|
|
|
|
_otherwise => Ok(None),
|
|
|
|
}
|
2021-08-31 11:44:15 +02:00
|
|
|
}
|
|
|
|
}
|
2022-06-16 12:03:43 +02:00
|
|
|
|
|
|
|
/// The possible error thrown by the `DocumentsBatchCursor` when iterating on the documents.
|
|
|
|
#[derive(Debug)]
|
2022-06-20 13:48:02 +02:00
|
|
|
pub enum DocumentsBatchCursorError {
|
|
|
|
Grenad(grenad::Error),
|
2022-06-21 14:41:19 +02:00
|
|
|
SerdeJson(serde_json::Error),
|
2022-06-16 12:03:43 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
impl From<grenad::Error> for DocumentsBatchCursorError {
|
|
|
|
fn from(error: grenad::Error) -> DocumentsBatchCursorError {
|
2022-06-20 13:48:02 +02:00
|
|
|
DocumentsBatchCursorError::Grenad(error)
|
2022-06-16 12:03:43 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-06-21 14:41:19 +02:00
|
|
|
impl From<serde_json::Error> for DocumentsBatchCursorError {
|
|
|
|
fn from(error: serde_json::Error) -> DocumentsBatchCursorError {
|
|
|
|
DocumentsBatchCursorError::SerdeJson(error)
|
2022-06-16 12:03:43 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// No trait method is overridden here: the default implementations of `error::Error` are
// used as-is.
impl error::Error for DocumentsBatchCursorError {}
|
|
|
|
|
|
|
|
impl fmt::Display for DocumentsBatchCursorError {
|
|
|
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
2022-06-20 13:48:02 +02:00
|
|
|
match self {
|
|
|
|
DocumentsBatchCursorError::Grenad(e) => e.fmt(f),
|
2022-06-21 14:41:19 +02:00
|
|
|
DocumentsBatchCursorError::SerdeJson(e) => e.fmt(f),
|
2022-06-20 13:48:02 +02:00
|
|
|
}
|
2022-06-16 12:03:43 +02:00
|
|
|
}
|
|
|
|
}
|