Introduce a function to extend from a JSON array of objects

This commit is contained in:
Kerollmops 2022-07-11 18:38:50 +02:00
parent dc61105554
commit a892a4a79c
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
6 changed files with 89 additions and 12 deletions

View File

@ -164,11 +164,8 @@ fn documents_from_jsonl(reader: impl BufRead) -> anyhow::Result<Vec<u8>> {
fn documents_from_json(reader: impl BufRead) -> anyhow::Result<Vec<u8>> { fn documents_from_json(reader: impl BufRead) -> anyhow::Result<Vec<u8>> {
let mut documents = DocumentsBatchBuilder::new(Vec::new()); let mut documents = DocumentsBatchBuilder::new(Vec::new());
let list: Vec<Object> = serde_json::from_reader(reader)?;
for object in list { documents.append_json_array(reader)?;
documents.append_json_object(&object)?;
}
documents.into_inner().map_err(Into::into) documents.into_inner().map_err(Into::into)
} }

View File

@ -337,11 +337,8 @@ fn documents_from_jsonl(reader: impl Read) -> Result<Vec<u8>> {
fn documents_from_json(reader: impl Read) -> Result<Vec<u8>> { fn documents_from_json(reader: impl Read) -> Result<Vec<u8>> {
let mut documents = DocumentsBatchBuilder::new(Vec::new()); let mut documents = DocumentsBatchBuilder::new(Vec::new());
let list: Vec<Object> = serde_json::from_reader(reader)?;
for object in list { documents.append_json_array(reader)?;
documents.append_json_object(&object)?;
}
documents.into_inner().map_err(Into::into) documents.into_inner().map_err(Into::into)
} }

View File

@ -1042,11 +1042,8 @@ fn documents_from_jsonl(reader: impl Read) -> anyhow::Result<Vec<u8>> {
fn documents_from_json(reader: impl Read) -> anyhow::Result<Vec<u8>> { fn documents_from_json(reader: impl Read) -> anyhow::Result<Vec<u8>> {
let mut documents = DocumentsBatchBuilder::new(Vec::new()); let mut documents = DocumentsBatchBuilder::new(Vec::new());
let list: Vec<Object> = serde_json::from_reader(reader)?;
for object in list { documents.append_json_array(reader)?;
documents.append_json_object(&object)?;
}
documents.into_inner().map_err(Into::into) documents.into_inner().map_err(Into::into)
} }

View File

@ -1,9 +1,11 @@
use std::io::{self, Write}; use std::io::{self, Write};
use grenad::{CompressionType, WriterBuilder}; use grenad::{CompressionType, WriterBuilder};
use serde::de::Deserializer;
use serde_json::{to_writer, Value}; use serde_json::{to_writer, Value};
use super::{DocumentsBatchIndex, Error, DOCUMENTS_BATCH_INDEX_KEY}; use super::{DocumentsBatchIndex, Error, DOCUMENTS_BATCH_INDEX_KEY};
use crate::documents::serde_impl::DocumentVisitor;
use crate::Object; use crate::Object;
/// The `DocumentsBatchBuilder` provides a way to build a documents batch in the intermediary /// The `DocumentsBatchBuilder` provides a way to build a documents batch in the intermediary
@ -78,6 +80,13 @@ impl<W: Write> DocumentsBatchBuilder<W> {
Ok(()) Ok(())
} }
/// Appends a new JSON array of objects into the batch and updates the `DocumentsBatchIndex` accordingly.
pub fn append_json_array<R: io::Read>(&mut self, reader: R) -> Result<(), Error> {
let mut de = serde_json::Deserializer::from_reader(reader);
let mut visitor = DocumentVisitor::new(self);
de.deserialize_any(&mut visitor)?
}
/// Appends a new CSV file into the batch and updates the `DocumentsBatchIndex` accordingly. /// Appends a new CSV file into the batch and updates the `DocumentsBatchIndex` accordingly.
pub fn append_csv<R: io::Read>(&mut self, mut reader: csv::Reader<R>) -> Result<(), Error> { pub fn append_csv<R: io::Read>(&mut self, mut reader: csv::Reader<R>) -> Result<(), Error> {
// Make sure that we insert the fields ids in order as the obkv writer has this requirement. // Make sure that we insert the fields ids in order as the obkv writer has this requirement.

View File

@ -1,6 +1,7 @@
mod builder; mod builder;
mod enriched; mod enriched;
mod reader; mod reader;
mod serde_impl;
use std::fmt::{self, Debug}; use std::fmt::{self, Debug};
use std::io; use std::io;

View File

@ -0,0 +1,76 @@
use std::fmt;
use std::io::Write;
use serde::de::{DeserializeSeed, MapAccess, SeqAccess, Visitor};
use super::Error;
use crate::documents::DocumentsBatchBuilder;
use crate::Object;
macro_rules! tri {
($e:expr) => {
match $e {
Ok(r) => r,
Err(e) => return Ok(Err(e.into())),
}
};
}
pub struct DocumentVisitor<'a, W> {
inner: &'a mut DocumentsBatchBuilder<W>,
object: Object,
}
impl<'a, W> DocumentVisitor<'a, W> {
pub fn new(inner: &'a mut DocumentsBatchBuilder<W>) -> Self {
DocumentVisitor { inner, object: Object::new() }
}
}
impl<'a, 'de, W: Write> Visitor<'de> for &mut DocumentVisitor<'a, W> {
/// This Visitor value is nothing, since it write the value to a file.
type Value = Result<(), Error>;
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: SeqAccess<'de>,
{
while let Some(v) = seq.next_element_seed(&mut *self)? {
tri!(v)
}
Ok(Ok(()))
}
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: MapAccess<'de>,
{
self.object.clear();
while let Some((key, value)) = map.next_entry()? {
self.object.insert(key, value);
}
tri!(self.inner.append_json_object(&self.object));
Ok(Ok(()))
}
fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "a documents, or a sequence of documents.")
}
}
impl<'a, 'de, W> DeserializeSeed<'de> for &mut DocumentVisitor<'a, W>
where
W: Write,
{
type Value = Result<(), Error>;
fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
where
D: serde::Deserializer<'de>,
{
deserializer.deserialize_map(self)
}
}