Move crates under a sub folder to clean up the code

This commit is contained in:
Clément Renault 2024-10-21 08:18:43 +02:00
parent 30f3c30389
commit 9c1e54a2c8
No known key found for this signature in database
GPG key ID: F250A4C4E3AE5F5F
1062 changed files with 19 additions and 20 deletions

View file

@ -0,0 +1,600 @@
use std::io::{self, Write};
use grenad::{CompressionType, WriterBuilder};
use serde::de::Deserializer;
use serde_json::{to_writer, Value};
use super::{DocumentsBatchIndex, Error, DOCUMENTS_BATCH_INDEX_KEY};
use crate::documents::serde_impl::DocumentVisitor;
use crate::Object;
/// The `DocumentsBatchBuilder` provides a way to build a documents batch in the intermediary
/// format used by milli.
///
/// The writer used by the `DocumentsBatchBuilder` can be read using a `DocumentsBatchReader`
/// to iterate over the documents.
///
/// ## example:
/// ```
/// use serde_json::json;
/// use milli::documents::DocumentsBatchBuilder;
///
/// let json = json!({ "id": 1, "name": "foo" });
///
/// let mut builder = DocumentsBatchBuilder::new(Vec::new());
/// builder.append_json_object(json.as_object().unwrap()).unwrap();
/// let _vector = builder.into_inner().unwrap();
/// ```
pub struct DocumentsBatchBuilder<W> {
/// The inner grenad writer, the last value must always be the `DocumentsBatchIndex`.
writer: grenad::Writer<W>,
/// A map that creates the relation between field ids and field names.
fields_index: DocumentsBatchIndex,
/// The number of documents that were added to this builder,
/// it doesn't take the primary key of the documents into account at this point.
documents_count: u32,
/// A buffer to store a temporary obkv buffer and avoid reallocating.
obkv_buffer: Vec<u8>,
/// A buffer to serialize the values and avoid reallocating,
/// serialized values are stored in an obkv.
value_buffer: Vec<u8>,
}
impl<W: Write> DocumentsBatchBuilder<W> {
pub fn new(writer: W) -> DocumentsBatchBuilder<W> {
DocumentsBatchBuilder {
writer: WriterBuilder::new().compression_type(CompressionType::None).build(writer),
fields_index: DocumentsBatchIndex::default(),
documents_count: 0,
obkv_buffer: Vec::new(),
value_buffer: Vec::new(),
}
}
/// Returns the number of documents inserted into this builder.
pub fn documents_count(&self) -> u32 {
self.documents_count
}
/// Appends a new JSON object into the batch and updates the `DocumentsBatchIndex` accordingly.
pub fn append_json_object(&mut self, object: &Object) -> io::Result<()> {
// Make sure that we insert the fields ids in order as the obkv writer has this requirement.
let mut fields_ids: Vec<_> = object.keys().map(|k| self.fields_index.insert(k)).collect();
fields_ids.sort_unstable();
self.obkv_buffer.clear();
let mut writer = obkv::KvWriter::new(&mut self.obkv_buffer);
for field_id in fields_ids {
let key = self.fields_index.name(field_id).unwrap();
self.value_buffer.clear();
to_writer(&mut self.value_buffer, &object[key])?;
writer.insert(field_id, &self.value_buffer)?;
}
let internal_id = self.documents_count.to_be_bytes();
let document_bytes = writer.into_inner()?;
self.writer.insert(internal_id, &document_bytes)?;
self.documents_count += 1;
Ok(())
}
/// Appends a new JSON array of objects into the batch and updates the `DocumentsBatchIndex` accordingly.
pub fn append_json_array<R: io::Read>(&mut self, reader: R) -> Result<(), Error> {
let mut de = serde_json::Deserializer::from_reader(reader);
let mut visitor = DocumentVisitor::new(self);
de.deserialize_any(&mut visitor)?
}
/// Appends a new CSV file into the batch and updates the `DocumentsBatchIndex` accordingly.
pub fn append_csv<R: io::Read>(&mut self, mut reader: csv::Reader<R>) -> Result<(), Error> {
// Make sure that we insert the fields ids in order as the obkv writer has this requirement.
let mut typed_fields_ids: Vec<_> = reader
.headers()?
.into_iter()
.map(parse_csv_header)
.map(|(k, t)| (self.fields_index.insert(k), t))
.enumerate()
.collect();
// Make sure that we insert the fields ids in order as the obkv writer has this requirement.
typed_fields_ids.sort_unstable_by_key(|(_, (fid, _))| *fid);
let mut record = csv::StringRecord::new();
let mut line = 0;
while reader.read_record(&mut record)? {
// We increment here and not at the end of the while loop to take
// the header offset into account.
line += 1;
self.obkv_buffer.clear();
let mut writer = obkv::KvWriter::new(&mut self.obkv_buffer);
for (i, (field_id, type_)) in typed_fields_ids.iter() {
self.value_buffer.clear();
let value = &record[*i];
let trimmed_value = value.trim();
match type_ {
AllowedType::Number => {
if trimmed_value.is_empty() {
to_writer(&mut self.value_buffer, &Value::Null)?;
} else if let Ok(integer) = trimmed_value.parse::<i64>() {
to_writer(&mut self.value_buffer, &integer)?;
} else {
match trimmed_value.parse::<f64>() {
Ok(float) => {
to_writer(&mut self.value_buffer, &float)?;
}
Err(error) => {
return Err(Error::ParseFloat {
error,
line,
value: value.to_string(),
});
}
}
}
}
AllowedType::Boolean => {
if trimmed_value.is_empty() {
to_writer(&mut self.value_buffer, &Value::Null)?;
} else {
match trimmed_value.parse::<bool>() {
Ok(bool) => {
to_writer(&mut self.value_buffer, &bool)?;
}
Err(error) => {
return Err(Error::ParseBool {
error,
line,
value: value.to_string(),
});
}
}
}
}
AllowedType::String => {
if value.is_empty() {
to_writer(&mut self.value_buffer, &Value::Null)?;
} else {
to_writer(&mut self.value_buffer, value)?;
}
}
}
// We insert into the obkv writer the value buffer that has been filled just above.
writer.insert(*field_id, &self.value_buffer)?;
}
let internal_id = self.documents_count.to_be_bytes();
let document_bytes = writer.into_inner()?;
self.writer.insert(internal_id, &document_bytes)?;
self.documents_count += 1;
}
Ok(())
}
/// Flushes the content on disk and stores the final version of the `DocumentsBatchIndex`.
pub fn into_inner(mut self) -> io::Result<W> {
let DocumentsBatchBuilder { mut writer, fields_index, .. } = self;
// We serialize and insert the `DocumentsBatchIndex` as the last key of the grenad writer.
self.value_buffer.clear();
to_writer(&mut self.value_buffer, &fields_index)?;
writer.insert(DOCUMENTS_BATCH_INDEX_KEY, &self.value_buffer)?;
writer.into_inner()
}
}
#[derive(Debug)]
enum AllowedType {
String,
Boolean,
Number,
}
fn parse_csv_header(header: &str) -> (&str, AllowedType) {
// if there are several separators we only split on the last one.
match header.rsplit_once(':') {
Some((field_name, field_type)) => match field_type {
"string" => (field_name, AllowedType::String),
"boolean" => (field_name, AllowedType::Boolean),
"number" => (field_name, AllowedType::Number),
// if the pattern isn't recognized, we keep the whole field.
_otherwise => (header, AllowedType::String),
},
None => (header, AllowedType::String),
}
}
#[cfg(test)]
mod test {
use std::io::Cursor;
use serde_json::json;
use super::*;
use crate::documents::{obkv_to_object, DocumentsBatchReader};
#[test]
fn add_single_documents_json() {
let json = serde_json::json!({
"id": 1,
"field": "hello!",
});
let mut builder = DocumentsBatchBuilder::new(Vec::new());
builder.append_json_object(json.as_object().unwrap()).unwrap();
let json = serde_json::json!({
"blabla": false,
"field": "hello!",
"id": 1,
});
builder.append_json_object(json.as_object().unwrap()).unwrap();
assert_eq!(builder.documents_count(), 2);
let vector = builder.into_inner().unwrap();
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
.unwrap()
.into_cursor_and_fields_index();
assert_eq!(index.len(), 3);
let document = cursor.next_document().unwrap().unwrap();
assert_eq!(document.iter().count(), 2);
let document = cursor.next_document().unwrap().unwrap();
assert_eq!(document.iter().count(), 3);
assert!(cursor.next_document().unwrap().is_none());
}
#[test]
fn add_documents_csv() {
let csv_content = "id:number,field:string\n1,hello!\n2,blabla";
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
let mut builder = DocumentsBatchBuilder::new(Vec::new());
builder.append_csv(csv).unwrap();
assert_eq!(builder.documents_count(), 2);
let vector = builder.into_inner().unwrap();
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
.unwrap()
.into_cursor_and_fields_index();
assert_eq!(index.len(), 2);
let document = cursor.next_document().unwrap().unwrap();
assert_eq!(document.iter().count(), 2);
let document = cursor.next_document().unwrap().unwrap();
assert_eq!(document.iter().count(), 2);
assert!(cursor.next_document().unwrap().is_none());
}
#[test]
fn simple_csv_document() {
let csv_content = r#"city,country,pop
"Boston","United States","4628910""#;
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
let mut builder = DocumentsBatchBuilder::new(Vec::new());
builder.append_csv(csv).unwrap();
let vector = builder.into_inner().unwrap();
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
.unwrap()
.into_cursor_and_fields_index();
let doc = cursor.next_document().unwrap().unwrap();
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
assert_eq!(
val,
json!({
"city": "Boston",
"country": "United States",
"pop": "4628910",
})
);
assert!(cursor.next_document().unwrap().is_none());
}
#[test]
fn coma_in_field() {
let csv_content = r#"city,country,pop
"Boston","United, States","4628910""#;
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
let mut builder = DocumentsBatchBuilder::new(Vec::new());
builder.append_csv(csv).unwrap();
let vector = builder.into_inner().unwrap();
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
.unwrap()
.into_cursor_and_fields_index();
let doc = cursor.next_document().unwrap().unwrap();
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
assert_eq!(
val,
json!({
"city": "Boston",
"country": "United, States",
"pop": "4628910",
})
);
}
#[test]
fn quote_in_field() {
let csv_content = r#"city,country,pop
"Boston","United"" States","4628910""#;
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
let mut builder = DocumentsBatchBuilder::new(Vec::new());
builder.append_csv(csv).unwrap();
let vector = builder.into_inner().unwrap();
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
.unwrap()
.into_cursor_and_fields_index();
let doc = cursor.next_document().unwrap().unwrap();
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
assert_eq!(
val,
json!({
"city": "Boston",
"country": "United\" States",
"pop": "4628910",
})
);
}
#[test]
fn integer_in_field() {
let csv_content = r#"city,country,pop:number
"Boston","United States","4628910""#;
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
let mut builder = DocumentsBatchBuilder::new(Vec::new());
builder.append_csv(csv).unwrap();
let vector = builder.into_inner().unwrap();
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
.unwrap()
.into_cursor_and_fields_index();
let doc = cursor.next_document().unwrap().unwrap();
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
assert_eq!(
val,
json!({
"city": "Boston",
"country": "United States",
"pop": 4628910,
})
);
}
#[test]
fn integer_as_id() {
let csv_content = r#""id:number","title:string","comment:string"
"1239","Pride and Prejudice","A great book""#;
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
let mut builder = DocumentsBatchBuilder::new(Vec::new());
builder.append_csv(csv).unwrap();
let vector = builder.into_inner().unwrap();
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
.unwrap()
.into_cursor_and_fields_index();
let doc = cursor.next_document().unwrap().unwrap();
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
assert_eq!(
val,
json!({
"id": 1239,
"title": "Pride and Prejudice",
"comment": "A great book",
})
);
}
#[test]
fn float_in_field() {
let csv_content = r#"city,country,pop:number
"Boston","United States","4628910.01""#;
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
let mut builder = DocumentsBatchBuilder::new(Vec::new());
builder.append_csv(csv).unwrap();
let vector = builder.into_inner().unwrap();
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
.unwrap()
.into_cursor_and_fields_index();
let doc = cursor.next_document().unwrap().unwrap();
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
assert_eq!(
val,
json!({
"city": "Boston",
"country": "United States",
"pop": 4628910.01,
})
);
}
#[test]
fn several_colon_in_header() {
let csv_content = r#"city:love:string,country:state,pop
"Boston","United States","4628910""#;
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
let mut builder = DocumentsBatchBuilder::new(Vec::new());
builder.append_csv(csv).unwrap();
let vector = builder.into_inner().unwrap();
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
.unwrap()
.into_cursor_and_fields_index();
let doc = cursor.next_document().unwrap().unwrap();
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
assert_eq!(
val,
json!({
"city:love": "Boston",
"country:state": "United States",
"pop": "4628910",
})
);
}
#[test]
fn ending_by_colon_in_header() {
let csv_content = r#"city:,country,pop
"Boston","United States","4628910""#;
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
let mut builder = DocumentsBatchBuilder::new(Vec::new());
builder.append_csv(csv).unwrap();
let vector = builder.into_inner().unwrap();
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
.unwrap()
.into_cursor_and_fields_index();
let doc = cursor.next_document().unwrap().unwrap();
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
assert_eq!(
val,
json!({
"city:": "Boston",
"country": "United States",
"pop": "4628910",
})
);
}
#[test]
fn starting_by_colon_in_header() {
let csv_content = r#":city,country,pop
"Boston","United States","4628910""#;
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
let mut builder = DocumentsBatchBuilder::new(Vec::new());
builder.append_csv(csv).unwrap();
let vector = builder.into_inner().unwrap();
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
.unwrap()
.into_cursor_and_fields_index();
let doc = cursor.next_document().unwrap().unwrap();
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
assert_eq!(
val,
json!({
":city": "Boston",
"country": "United States",
"pop": "4628910",
})
);
}
#[ignore]
#[test]
fn starting_by_colon_in_header2() {
let csv_content = r#":string,country,pop
"Boston","United States","4628910""#;
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
let mut builder = DocumentsBatchBuilder::new(Vec::new());
builder.append_csv(csv).unwrap();
let vector = builder.into_inner().unwrap();
let (mut cursor, _) = DocumentsBatchReader::from_reader(Cursor::new(vector))
.unwrap()
.into_cursor_and_fields_index();
assert!(cursor.next_document().is_err());
}
#[test]
fn double_colon_in_header() {
let csv_content = r#"city::string,country,pop
"Boston","United States","4628910""#;
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
let mut builder = DocumentsBatchBuilder::new(Vec::new());
builder.append_csv(csv).unwrap();
let vector = builder.into_inner().unwrap();
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
.unwrap()
.into_cursor_and_fields_index();
let doc = cursor.next_document().unwrap().unwrap();
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
assert_eq!(
val,
json!({
"city:": "Boston",
"country": "United States",
"pop": "4628910",
})
);
}
#[test]
fn bad_type_in_header() {
let csv_content = r#"city,country:number,pop
"Boston","United States","4628910""#;
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
let mut builder = DocumentsBatchBuilder::new(Vec::new());
assert!(builder.append_csv(csv).is_err());
}
#[test]
fn bad_column_count1() {
let csv_content = r#"city,country,pop
"Boston","United States","4628910", "too much
let csv = csv::Reader::from_reader(Cursor::new(csv_content"#;
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
let mut builder = DocumentsBatchBuilder::new(Vec::new());
assert!(builder.append_csv(csv).is_err());
}
#[test]
fn bad_column_count2() {
let csv_content = r#"city,country,pop
"Boston","United States""#;
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
let mut builder = DocumentsBatchBuilder::new(Vec::new());
assert!(builder.append_csv(csv).is_err());
}
}

View file

@ -0,0 +1,110 @@
use std::fs::File;
use std::io::BufReader;
use std::{io, str};
use obkv::KvReader;
use super::{
DocumentsBatchCursor, DocumentsBatchCursorError, DocumentsBatchIndex, DocumentsBatchReader,
Error,
};
use crate::update::DocumentId;
use crate::FieldId;
/// The `EnrichedDocumentsBatchReader` provides a way to iterate over documents that have
/// been created with a `DocumentsBatchWriter` and, for the enriched data,
/// a simple `grenad::Reader<File>`.
///
/// The documents are returned in the form of `obkv::Reader` where each field is identified with a
/// `FieldId`. The mapping between the field ids and the field names is done thanks to the index.
pub struct EnrichedDocumentsBatchReader<R> {
documents: DocumentsBatchReader<R>,
primary_key: String,
external_ids: grenad::ReaderCursor<BufReader<File>>,
}
impl<R: io::Read + io::Seek> EnrichedDocumentsBatchReader<R> {
pub fn new(
documents: DocumentsBatchReader<R>,
primary_key: String,
external_ids: grenad::Reader<BufReader<File>>,
) -> Result<Self, Error> {
if documents.documents_count() as u64 == external_ids.len() {
Ok(EnrichedDocumentsBatchReader {
documents,
primary_key,
external_ids: external_ids.into_cursor()?,
})
} else {
Err(Error::InvalidEnrichedData)
}
}
pub fn documents_count(&self) -> u32 {
self.documents.documents_count()
}
pub fn primary_key(&self) -> &str {
&self.primary_key
}
pub fn is_empty(&self) -> bool {
self.documents.is_empty()
}
pub fn documents_batch_index(&self) -> &DocumentsBatchIndex {
self.documents.documents_batch_index()
}
/// This method returns a forward cursor over the enriched documents.
pub fn into_cursor_and_fields_index(
self,
) -> (EnrichedDocumentsBatchCursor<R>, DocumentsBatchIndex) {
let EnrichedDocumentsBatchReader { documents, primary_key, mut external_ids } = self;
let (documents, fields_index) = documents.into_cursor_and_fields_index();
external_ids.reset();
(EnrichedDocumentsBatchCursor { documents, primary_key, external_ids }, fields_index)
}
}
#[derive(Debug, Clone)]
pub struct EnrichedDocument<'a> {
pub document: KvReader<'a, FieldId>,
pub document_id: DocumentId,
}
pub struct EnrichedDocumentsBatchCursor<R> {
documents: DocumentsBatchCursor<R>,
primary_key: String,
external_ids: grenad::ReaderCursor<BufReader<File>>,
}
impl<R> EnrichedDocumentsBatchCursor<R> {
pub fn primary_key(&self) -> &str {
&self.primary_key
}
/// Resets the cursor to be able to read from the start again.
pub fn reset(&mut self) {
self.documents.reset();
self.external_ids.reset();
}
}
impl<R: io::Read + io::Seek> EnrichedDocumentsBatchCursor<R> {
/// Returns the next document, starting from the first one. Subsequent calls to
/// `next_document` advance the document reader until all the documents have been read.
pub fn next_enriched_document(
&mut self,
) -> Result<Option<EnrichedDocument<'_>>, DocumentsBatchCursorError> {
let document = self.documents.next_document()?;
let document_id = match self.external_ids.move_on_next()? {
Some((_, bytes)) => serde_json::from_slice(bytes).map(Some)?,
None => None,
};
match document.zip(document_id) {
Some((document, document_id)) => Ok(Some(EnrichedDocument { document, document_id })),
None => Ok(None),
}
}
}

View file

@ -0,0 +1,273 @@
mod builder;
mod enriched;
mod primary_key;
mod reader;
mod serde_impl;
use std::fmt::Debug;
use std::io;
use std::str::Utf8Error;
use bimap::BiHashMap;
pub use builder::DocumentsBatchBuilder;
pub use enriched::{EnrichedDocument, EnrichedDocumentsBatchCursor, EnrichedDocumentsBatchReader};
use obkv::KvReader;
pub use primary_key::{
validate_document_id_value, DocumentIdExtractionError, FieldIdMapper, PrimaryKey,
DEFAULT_PRIMARY_KEY,
};
pub use reader::{DocumentsBatchCursor, DocumentsBatchCursorError, DocumentsBatchReader};
use serde::{Deserialize, Serialize};
use crate::error::{FieldIdMapMissingEntry, InternalError};
use crate::{FieldId, Object, Result};
/// The key that is used to store the `DocumentsBatchIndex` datastructure,
/// it is the absolute last key of the list.
const DOCUMENTS_BATCH_INDEX_KEY: [u8; 8] = u64::MAX.to_be_bytes();
/// Helper function to convert an obkv reader into a JSON object.
pub fn obkv_to_object(obkv: &KvReader<'_, FieldId>, index: &DocumentsBatchIndex) -> Result<Object> {
obkv.iter()
.map(|(field_id, value)| {
let field_name = index
.name(field_id)
.ok_or(FieldIdMapMissingEntry::FieldId { field_id, process: "obkv_to_object" })?;
let value = serde_json::from_slice(value).map_err(InternalError::SerdeJson)?;
Ok((field_name.to_string(), value))
})
.collect()
}
/// A bidirectional map that links field ids to their name in a document batch.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct DocumentsBatchIndex(pub BiHashMap<FieldId, String>);
impl DocumentsBatchIndex {
/// Insert the field in the map, or return it's field id if it doesn't already exists.
pub fn insert(&mut self, field: &str) -> FieldId {
match self.0.get_by_right(field) {
Some(field_id) => *field_id,
None => {
let field_id = self.0.len() as FieldId;
self.0.insert(field_id, field.to_string());
field_id
}
}
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn iter(&self) -> bimap::hash::Iter<'_, FieldId, String> {
self.0.iter()
}
pub fn name(&self, id: FieldId) -> Option<&str> {
self.0.get_by_left(&id).map(AsRef::as_ref)
}
pub fn id(&self, name: &str) -> Option<FieldId> {
self.0.get_by_right(name).cloned()
}
pub fn recreate_json(&self, document: &obkv::KvReaderU16<'_>) -> Result<Object> {
let mut map = Object::new();
for (k, v) in document.iter() {
// TODO: TAMO: update the error type
let key =
self.0.get_by_left(&k).ok_or(crate::error::InternalError::DatabaseClosing)?.clone();
let value = serde_json::from_slice::<serde_json::Value>(v)
.map_err(crate::error::InternalError::SerdeJson)?;
map.insert(key, value);
}
Ok(map)
}
}
impl FieldIdMapper for DocumentsBatchIndex {
fn id(&self, name: &str) -> Option<FieldId> {
self.id(name)
}
}
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Error parsing number {value:?} at line {line}: {error}")]
ParseFloat { error: std::num::ParseFloatError, line: usize, value: String },
#[error("Error parsing boolean {value:?} at line {line}: {error}")]
ParseBool { error: std::str::ParseBoolError, line: usize, value: String },
#[error("Invalid document addition format, missing the documents batch index.")]
InvalidDocumentFormat,
#[error("Invalid enriched data.")]
InvalidEnrichedData,
#[error(transparent)]
InvalidUtf8(#[from] Utf8Error),
#[error(transparent)]
Csv(#[from] csv::Error),
#[error(transparent)]
Json(#[from] serde_json::Error),
#[error(transparent)]
Serialize(serde_json::Error),
#[error(transparent)]
Grenad(#[from] grenad::Error),
#[error(transparent)]
Io(#[from] io::Error),
}
pub fn objects_from_json_value(json: serde_json::Value) -> Vec<crate::Object> {
let documents = match json {
object @ serde_json::Value::Object(_) => vec![object],
serde_json::Value::Array(objects) => objects,
invalid => {
panic!("an array of objects must be specified, {:#?} is not an array", invalid)
}
};
let mut objects = vec![];
for document in documents {
let object = match document {
serde_json::Value::Object(object) => object,
invalid => panic!("an object must be specified, {:#?} is not an object", invalid),
};
objects.push(object);
}
objects
}
/// Macro used to generate documents, with the same syntax as `serde_json::json`
#[cfg(test)]
macro_rules! documents {
($data:tt) => {{
let documents = serde_json::json!($data);
let documents = $crate::documents::objects_from_json_value(documents);
$crate::documents::documents_batch_reader_from_objects(documents)
}};
}
pub fn documents_batch_reader_from_objects(
objects: impl IntoIterator<Item = Object>,
) -> DocumentsBatchReader<std::io::Cursor<Vec<u8>>> {
let mut builder = DocumentsBatchBuilder::new(Vec::new());
for object in objects {
builder.append_json_object(&object).unwrap();
}
let vector = builder.into_inner().unwrap();
DocumentsBatchReader::from_reader(std::io::Cursor::new(vector)).unwrap()
}
#[cfg(test)]
mod test {
use std::io::Cursor;
use serde_json::{json, Value};
use super::*;
#[test]
fn create_documents_no_errors() {
let value = json!({
"number": 1,
"string": "this is a field",
"array": ["an", "array"],
"object": {
"key": "value",
},
"bool": true
});
let mut builder = DocumentsBatchBuilder::new(Vec::new());
builder.append_json_object(value.as_object().unwrap()).unwrap();
let vector = builder.into_inner().unwrap();
let (mut documents, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
.unwrap()
.into_cursor_and_fields_index();
assert_eq!(index.iter().count(), 5);
let reader = documents.next_document().unwrap().unwrap();
assert_eq!(reader.iter().count(), 5);
assert!(documents.next_document().unwrap().is_none());
}
#[test]
fn test_add_multiple_documents() {
let doc1 = json!({
"bool": true,
});
let doc2 = json!({
"toto": false,
});
let mut builder = DocumentsBatchBuilder::new(Vec::new());
builder.append_json_object(doc1.as_object().unwrap()).unwrap();
builder.append_json_object(doc2.as_object().unwrap()).unwrap();
let vector = builder.into_inner().unwrap();
let (mut documents, index) = DocumentsBatchReader::from_reader(io::Cursor::new(vector))
.unwrap()
.into_cursor_and_fields_index();
assert_eq!(index.iter().count(), 2);
let reader = documents.next_document().unwrap().unwrap();
assert_eq!(reader.iter().count(), 1);
assert!(documents.next_document().unwrap().is_some());
assert!(documents.next_document().unwrap().is_none());
}
#[test]
fn test_nested() {
let docs_reader = documents!([{
"hello": {
"toto": ["hello"]
}
}]);
let (mut cursor, _) = docs_reader.into_cursor_and_fields_index();
let doc = cursor.next_document().unwrap().unwrap();
let nested: Value = serde_json::from_slice(doc.get(0).unwrap()).unwrap();
assert_eq!(nested, json!({ "toto": ["hello"] }));
}
#[test]
fn out_of_order_json_fields() {
let _documents = documents!([
{"id": 1,"b": 0},
{"id": 2,"a": 0,"b": 0},
]);
}
#[test]
fn csv_types_dont_panic() {
let csv1_content =
"id:number,b:boolean,c,d:number\n1,,,\n2,true,doggo,2\n3,false,the best doggo,-2\n4,,\"Hello, World!\",2.5";
let csv1 = csv::Reader::from_reader(Cursor::new(csv1_content));
let mut builder = DocumentsBatchBuilder::new(Vec::new());
builder.append_csv(csv1).unwrap();
let vector = builder.into_inner().unwrap();
DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap();
}
#[test]
fn out_of_order_csv_fields() {
let csv1_content = "id:number,b\n1,0";
let csv1 = csv::Reader::from_reader(Cursor::new(csv1_content));
let csv2_content = "id:number,a,b\n2,0,0";
let csv2 = csv::Reader::from_reader(Cursor::new(csv2_content));
let mut builder = DocumentsBatchBuilder::new(Vec::new());
builder.append_csv(csv1).unwrap();
builder.append_csv(csv2).unwrap();
let vector = builder.into_inner().unwrap();
DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap();
}
}

View file

@ -0,0 +1,174 @@
use std::iter;
use std::result::Result as StdResult;
use serde_json::Value;
use crate::{FieldId, InternalError, Object, Result, UserError};
/// The symbol used to define levels in a nested primary key.
const PRIMARY_KEY_SPLIT_SYMBOL: char = '.';
/// The default primary that is used when not specified.
pub const DEFAULT_PRIMARY_KEY: &str = "id";
/// Trait for objects that can map the name of a field to its [`FieldId`].
pub trait FieldIdMapper {
/// Attempts to map the passed name to its [`FieldId`].
///
/// `None` if the field with this name was not found.
fn id(&self, name: &str) -> Option<FieldId>;
}
/// A type that represent the type of primary key that has been set
/// for this index, a classic flat one or a nested one.
#[derive(Debug, Clone, Copy)]
pub enum PrimaryKey<'a> {
Flat { name: &'a str, field_id: FieldId },
Nested { name: &'a str },
}
pub enum DocumentIdExtractionError {
InvalidDocumentId(UserError),
MissingDocumentId,
TooManyDocumentIds(usize),
}
impl<'a> PrimaryKey<'a> {
pub fn new(path: &'a str, fields: &impl FieldIdMapper) -> Option<Self> {
Some(if path.contains(PRIMARY_KEY_SPLIT_SYMBOL) {
Self::Nested { name: path }
} else {
let field_id = fields.id(path)?;
Self::Flat { name: path, field_id }
})
}
pub fn name(&self) -> &str {
match self {
PrimaryKey::Flat { name, .. } => name,
PrimaryKey::Nested { name } => name,
}
}
pub fn document_id(
&self,
document: &obkv::KvReader<'_, FieldId>,
fields: &impl FieldIdMapper,
) -> Result<StdResult<String, DocumentIdExtractionError>> {
match self {
PrimaryKey::Flat { name: _, field_id } => match document.get(*field_id) {
Some(document_id_bytes) => {
let document_id = serde_json::from_slice(document_id_bytes)
.map_err(InternalError::SerdeJson)?;
match validate_document_id_value(document_id) {
Ok(document_id) => Ok(Ok(document_id)),
Err(user_error) => {
Ok(Err(DocumentIdExtractionError::InvalidDocumentId(user_error)))
}
}
}
None => Ok(Err(DocumentIdExtractionError::MissingDocumentId)),
},
nested @ PrimaryKey::Nested { .. } => {
let mut matching_documents_ids = Vec::new();
for (first_level_name, right) in nested.possible_level_names() {
if let Some(field_id) = fields.id(first_level_name) {
if let Some(value_bytes) = document.get(field_id) {
let object = serde_json::from_slice(value_bytes)
.map_err(InternalError::SerdeJson)?;
fetch_matching_values(object, right, &mut matching_documents_ids);
if matching_documents_ids.len() >= 2 {
return Ok(Err(DocumentIdExtractionError::TooManyDocumentIds(
matching_documents_ids.len(),
)));
}
}
}
}
match matching_documents_ids.pop() {
Some(document_id) => match validate_document_id_value(document_id) {
Ok(document_id) => Ok(Ok(document_id)),
Err(user_error) => {
Ok(Err(DocumentIdExtractionError::InvalidDocumentId(user_error)))
}
},
None => Ok(Err(DocumentIdExtractionError::MissingDocumentId)),
}
}
}
}
/// Returns an `Iterator` that gives all the possible fields names the primary key
/// can have depending of the first level name and depth of the objects.
pub fn possible_level_names(&self) -> impl Iterator<Item = (&str, &str)> + '_ {
let name = self.name();
name.match_indices(PRIMARY_KEY_SPLIT_SYMBOL)
.map(move |(i, _)| (&name[..i], &name[i + PRIMARY_KEY_SPLIT_SYMBOL.len_utf8()..]))
.chain(iter::once((name, "")))
}
}
fn fetch_matching_values(value: Value, selector: &str, output: &mut Vec<Value>) {
match value {
Value::Object(object) => fetch_matching_values_in_object(object, selector, "", output),
otherwise => output.push(otherwise),
}
}
fn fetch_matching_values_in_object(
object: Object,
selector: &str,
base_key: &str,
output: &mut Vec<Value>,
) {
for (key, value) in object {
let base_key = if base_key.is_empty() {
key.to_string()
} else {
format!("{}{}{}", base_key, PRIMARY_KEY_SPLIT_SYMBOL, key)
};
if starts_with(selector, &base_key) {
match value {
Value::Object(object) => {
fetch_matching_values_in_object(object, selector, &base_key, output)
}
value => output.push(value),
}
}
}
}
fn starts_with(selector: &str, key: &str) -> bool {
selector.strip_prefix(key).map_or(false, |tail| {
tail.chars().next().map(|c| c == PRIMARY_KEY_SPLIT_SYMBOL).unwrap_or(true)
})
}
// FIXME: move to a DocumentId struct
fn validate_document_id(document_id: &str) -> Option<&str> {
if document_id.is_empty()
|| document_id.len() > 512
|| !document_id.chars().all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
{
None
} else {
Some(document_id)
}
}
pub fn validate_document_id_value(document_id: Value) -> StdResult<String, UserError> {
match document_id {
Value::String(string) => match validate_document_id(&string) {
Some(s) if s.len() == string.len() => Ok(string),
Some(s) => Ok(s.to_string()),
None => Err(UserError::InvalidDocumentId { document_id: Value::String(string) }),
},
// a `u64` or `i64` cannot be more than 512 bytes once converted to a string
Value::Number(number) if !number.is_f64() => Ok(number.to_string()),
content => Err(UserError::InvalidDocumentId { document_id: content }),
}
}

View file

@ -0,0 +1,117 @@
use std::convert::TryInto;
use std::{error, fmt, io};
use obkv::KvReader;
use super::{DocumentsBatchIndex, Error, DOCUMENTS_BATCH_INDEX_KEY};
use crate::FieldId;
/// The `DocumentsBatchReader` provides a way to iterate over documents that have been created with
/// a `DocumentsBatchWriter`.
///
/// The documents are returned in the form of `obkv::Reader` where each field is identified with a
/// `FieldId`. The mapping between the field ids and the field names is done thanks to the index.
pub struct DocumentsBatchReader<R> {
cursor: grenad::ReaderCursor<R>,
fields_index: DocumentsBatchIndex,
}
impl<R: io::Read + io::Seek> DocumentsBatchReader<R> {
pub fn new(cursor: DocumentsBatchCursor<R>, fields_index: DocumentsBatchIndex) -> Self {
Self { cursor: cursor.cursor, fields_index }
}
/// Construct a `DocumentsReader` from a reader.
///
/// It first retrieves the index, then moves to the first document. Use the `into_cursor`
/// method to iterator over the documents, from the first to the last.
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")]
pub fn from_reader(reader: R) -> Result<Self, Error> {
let reader = grenad::Reader::new(reader)?;
let mut cursor = reader.into_cursor()?;
let fields_index = match cursor.move_on_key_equal_to(DOCUMENTS_BATCH_INDEX_KEY)? {
Some((_, value)) => serde_json::from_slice(value).map_err(Error::Serialize)?,
None => return Err(Error::InvalidDocumentFormat),
};
Ok(DocumentsBatchReader { cursor, fields_index })
}
pub fn documents_count(&self) -> u32 {
self.cursor.len().saturating_sub(1).try_into().expect("Invalid number of documents")
}
pub fn is_empty(&self) -> bool {
self.cursor.len().saturating_sub(1) == 0
}
pub fn documents_batch_index(&self) -> &DocumentsBatchIndex {
&self.fields_index
}
/// This method returns a forward cursor over the documents.
pub fn into_cursor_and_fields_index(self) -> (DocumentsBatchCursor<R>, DocumentsBatchIndex) {
let DocumentsBatchReader { cursor, fields_index } = self;
let mut cursor = DocumentsBatchCursor { cursor };
cursor.reset();
(cursor, fields_index)
}
}
/// A forward cursor over the documents in a `DocumentsBatchReader`.
pub struct DocumentsBatchCursor<R> {
cursor: grenad::ReaderCursor<R>,
}
impl<R> DocumentsBatchCursor<R> {
/// Resets the cursor to be able to read from the start again.
pub fn reset(&mut self) {
self.cursor.reset();
}
}
impl<R: io::Read + io::Seek> DocumentsBatchCursor<R> {
/// Returns the next document, starting from the first one. Subsequent calls to
/// `next_document` advance the document reader until all the documents have been read.
pub fn next_document(
&mut self,
) -> Result<Option<KvReader<'_, FieldId>>, DocumentsBatchCursorError> {
match self.cursor.move_on_next()? {
Some((key, value)) if key != DOCUMENTS_BATCH_INDEX_KEY => {
Ok(Some(KvReader::new(value)))
}
_otherwise => Ok(None),
}
}
}
/// The possible error thrown by the `DocumentsBatchCursor` when iterating on the documents.
#[derive(Debug)]
pub enum DocumentsBatchCursorError {
Grenad(grenad::Error),
SerdeJson(serde_json::Error),
}
impl From<grenad::Error> for DocumentsBatchCursorError {
fn from(error: grenad::Error) -> DocumentsBatchCursorError {
DocumentsBatchCursorError::Grenad(error)
}
}
impl From<serde_json::Error> for DocumentsBatchCursorError {
fn from(error: serde_json::Error) -> DocumentsBatchCursorError {
DocumentsBatchCursorError::SerdeJson(error)
}
}
impl error::Error for DocumentsBatchCursorError {}
impl fmt::Display for DocumentsBatchCursorError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
DocumentsBatchCursorError::Grenad(e) => e.fmt(f),
DocumentsBatchCursorError::SerdeJson(e) => e.fmt(f),
}
}
}

View file

@ -0,0 +1,76 @@
use std::fmt;
use std::io::Write;
use serde::de::{DeserializeSeed, MapAccess, SeqAccess, Visitor};
use super::Error;
use crate::documents::DocumentsBatchBuilder;
use crate::Object;
macro_rules! tri {
($e:expr) => {
match $e {
Ok(r) => r,
Err(e) => return Ok(Err(e.into())),
}
};
}
pub struct DocumentVisitor<'a, W> {
inner: &'a mut DocumentsBatchBuilder<W>,
object: Object,
}
impl<'a, W> DocumentVisitor<'a, W> {
pub fn new(inner: &'a mut DocumentsBatchBuilder<W>) -> Self {
DocumentVisitor { inner, object: Object::new() }
}
}
impl<'a, 'de, W: Write> Visitor<'de> for &mut DocumentVisitor<'a, W> {
/// This Visitor value is nothing, since it write the value to a file.
type Value = Result<(), Error>;
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: SeqAccess<'de>,
{
while let Some(v) = seq.next_element_seed(&mut *self)? {
tri!(v)
}
Ok(Ok(()))
}
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: MapAccess<'de>,
{
self.object.clear();
while let Some((key, value)) = map.next_entry()? {
self.object.insert(key, value);
}
tri!(self.inner.append_json_object(&self.object));
Ok(Ok(()))
}
fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "a documents, or a sequence of documents.")
}
}
impl<'a, 'de, W> DeserializeSeed<'de> for &mut DocumentVisitor<'a, W>
where
W: Write,
{
type Value = Result<(), Error>;
fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
where
D: serde::Deserializer<'de>,
{
deserializer.deserialize_map(self)
}
}