diff --git a/benchmarks/benches/utils.rs b/benchmarks/benches/utils.rs index 630e17943..51178b43b 100644 --- a/benchmarks/benches/utils.rs +++ b/benchmarks/benches/utils.rs @@ -164,11 +164,8 @@ fn documents_from_jsonl(reader: impl BufRead) -> anyhow::Result> { fn documents_from_json(reader: impl BufRead) -> anyhow::Result> { let mut documents = DocumentsBatchBuilder::new(Vec::new()); - let list: Vec = serde_json::from_reader(reader)?; - for object in list { - documents.append_json_object(&object)?; - } + documents.append_json_array(reader)?; documents.into_inner().map_err(Into::into) } diff --git a/cli/src/main.rs b/cli/src/main.rs index 0d197af17..35fef95c6 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -337,11 +337,8 @@ fn documents_from_jsonl(reader: impl Read) -> Result> { fn documents_from_json(reader: impl Read) -> Result> { let mut documents = DocumentsBatchBuilder::new(Vec::new()); - let list: Vec = serde_json::from_reader(reader)?; - for object in list { - documents.append_json_object(&object)?; - } + documents.append_json_array(reader)?; documents.into_inner().map_err(Into::into) } diff --git a/http-ui/src/main.rs b/http-ui/src/main.rs index 117aa31e8..83fce9a9c 100644 --- a/http-ui/src/main.rs +++ b/http-ui/src/main.rs @@ -1042,11 +1042,8 @@ fn documents_from_jsonl(reader: impl Read) -> anyhow::Result> { fn documents_from_json(reader: impl Read) -> anyhow::Result> { let mut documents = DocumentsBatchBuilder::new(Vec::new()); - let list: Vec = serde_json::from_reader(reader)?; - for object in list { - documents.append_json_object(&object)?; - } + documents.append_json_array(reader)?; documents.into_inner().map_err(Into::into) } diff --git a/milli/src/documents/builder.rs b/milli/src/documents/builder.rs index 589e52269..bb9d6aa68 100644 --- a/milli/src/documents/builder.rs +++ b/milli/src/documents/builder.rs @@ -1,9 +1,11 @@ use std::io::{self, Write}; use grenad::{CompressionType, WriterBuilder}; +use serde::de::Deserializer; use 
serde_json::{to_writer, Value}; use super::{DocumentsBatchIndex, Error, DOCUMENTS_BATCH_INDEX_KEY}; +use crate::documents::serde_impl::DocumentVisitor; use crate::Object; /// The `DocumentsBatchBuilder` provides a way to build a documents batch in the intermediary @@ -78,6 +80,13 @@ impl<W: Write> DocumentsBatchBuilder<W> { Ok(()) } + /// Appends a new JSON array of objects into the batch and updates the `DocumentsBatchIndex` accordingly. + pub fn append_json_array<R: io::Read>(&mut self, reader: R) -> Result<(), Error> { + let mut de = serde_json::Deserializer::from_reader(reader); + let mut visitor = DocumentVisitor::new(self); + de.deserialize_any(&mut visitor)? + } + /// Appends a new CSV file into the batch and updates the `DocumentsBatchIndex` accordingly. pub fn append_csv<R: io::Read>(&mut self, mut reader: csv::Reader<R>) -> Result<(), Error> { // Make sure that we insert the fields ids in order as the obkv writer has this requirement. diff --git a/milli/src/documents/mod.rs b/milli/src/documents/mod.rs index 43bfc1c20..c5ff7a120 100644 --- a/milli/src/documents/mod.rs +++ b/milli/src/documents/mod.rs @@ -1,6 +1,7 @@ mod builder; mod enriched; mod reader; +mod serde_impl; use std::fmt::{self, Debug}; use std::io; diff --git a/milli/src/documents/serde_impl.rs b/milli/src/documents/serde_impl.rs new file mode 100644 index 000000000..d4abdc844 --- /dev/null +++ b/milli/src/documents/serde_impl.rs @@ -0,0 +1,76 @@ +use std::fmt; +use std::io::Write; + +use serde::de::{DeserializeSeed, MapAccess, SeqAccess, Visitor}; + +use super::Error; +use crate::documents::DocumentsBatchBuilder; +use crate::Object; + +macro_rules!
tri { + ($e:expr) => { + match $e { + Ok(r) => r, + Err(e) => return Ok(Err(e.into())), + } + }; +} + +pub struct DocumentVisitor<'a, W> { + inner: &'a mut DocumentsBatchBuilder<W>, + object: Object, +} + +impl<'a, W> DocumentVisitor<'a, W> { + pub fn new(inner: &'a mut DocumentsBatchBuilder<W>) -> Self { + DocumentVisitor { inner, object: Object::new() } + } +} + +impl<'a, 'de, W: Write> Visitor<'de> for &mut DocumentVisitor<'a, W> { + /// This Visitor value is nothing, since it writes the value to a file. + type Value = Result<(), Error>; + + fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error> + where + A: SeqAccess<'de>, + { + while let Some(v) = seq.next_element_seed(&mut *self)? { + tri!(v) + } + + Ok(Ok(())) + } + + fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error> + where + A: MapAccess<'de>, + { + self.object.clear(); + while let Some((key, value)) = map.next_entry()? { + self.object.insert(key, value); + } + + tri!(self.inner.append_json_object(&self.object)); + + Ok(Ok(())) + } + + fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "a documents, or a sequence of documents.") + } +} + +impl<'a, 'de, W> DeserializeSeed<'de> for &mut DocumentVisitor<'a, W> +where + W: Write, +{ + type Value = Result<(), Error>; + + fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error> + where + D: serde::Deserializer<'de>, + { + deserializer.deserialize_map(self) + } +}