mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-23 05:14:27 +01:00
optimize document deserialization
This commit is contained in:
parent
b6af84eb77
commit
8d70b01714
@ -1,9 +1,13 @@
|
|||||||
|
use std::collections::BTreeMap;
|
||||||
use std::io;
|
use std::io;
|
||||||
|
|
||||||
use byteorder::{BigEndian, WriteBytesExt};
|
use byteorder::{BigEndian, WriteBytesExt};
|
||||||
use serde::ser::Serialize;
|
use serde::Deserializer;
|
||||||
|
use serde_json::Value;
|
||||||
|
|
||||||
use super::serde::DocumentSerializer;
|
use crate::FieldId;
|
||||||
|
|
||||||
|
use super::serde::DocumentVisitor;
|
||||||
use super::{ByteCounter, DocumentsBatchIndex, DocumentsMetadata, Error};
|
use super::{ByteCounter, DocumentsBatchIndex, DocumentsMetadata, Error};
|
||||||
|
|
||||||
/// The `DocumentsBatchBuilder` provides a way to build a documents batch in the intermediary
|
/// The `DocumentsBatchBuilder` provides a way to build a documents batch in the intermediary
|
||||||
@ -24,7 +28,12 @@ use super::{ByteCounter, DocumentsBatchIndex, DocumentsMetadata, Error};
|
|||||||
/// builder.finish().unwrap();
|
/// builder.finish().unwrap();
|
||||||
/// ```
|
/// ```
|
||||||
pub struct DocumentBatchBuilder<W> {
|
pub struct DocumentBatchBuilder<W> {
|
||||||
serializer: DocumentSerializer<W>,
|
inner: ByteCounter<W>,
|
||||||
|
index: DocumentsBatchIndex,
|
||||||
|
obkv_buffer: Vec<u8>,
|
||||||
|
value_buffer: Vec<u8>,
|
||||||
|
values: BTreeMap<FieldId, Value>,
|
||||||
|
count: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<W: io::Write + io::Seek> DocumentBatchBuilder<W> {
|
impl<W: io::Write + io::Seek> DocumentBatchBuilder<W> {
|
||||||
@ -34,27 +43,33 @@ impl<W: io::Write + io::Seek> DocumentBatchBuilder<W> {
|
|||||||
// add space to write the offset of the metadata at the end of the writer
|
// add space to write the offset of the metadata at the end of the writer
|
||||||
writer.write_u64::<BigEndian>(0)?;
|
writer.write_u64::<BigEndian>(0)?;
|
||||||
|
|
||||||
let serializer =
|
let this = Self {
|
||||||
DocumentSerializer { writer, buffer: Vec::new(), index, count: 0, allow_seq: true };
|
inner: writer,
|
||||||
|
index,
|
||||||
|
obkv_buffer: Vec::new(),
|
||||||
|
value_buffer: Vec::new(),
|
||||||
|
values: BTreeMap::new(),
|
||||||
|
count: 0,
|
||||||
|
};
|
||||||
|
|
||||||
Ok(Self { serializer })
|
Ok(this)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the number of documents that have been written to the builder.
|
/// Returns the number of documents that have been written to the builder.
|
||||||
pub fn len(&self) -> usize {
|
pub fn len(&self) -> usize {
|
||||||
self.serializer.count
|
self.count
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This method must be called after the document addition is terminated. It will put the
|
/// This method must be called after the document addition is terminated. It will put the
|
||||||
/// metadata at the end of the file, and write the metadata offset at the beginning on the
|
/// metadata at the end of the file, and write the metadata offset at the beginning on the
|
||||||
/// file.
|
/// file.
|
||||||
pub fn finish(self) -> Result<(), Error> {
|
pub fn finish(self) -> Result<(), Error> {
|
||||||
let DocumentSerializer {
|
let Self {
|
||||||
writer: ByteCounter { mut writer, count: offset },
|
inner: ByteCounter { mut writer, count: offset },
|
||||||
index,
|
index,
|
||||||
count,
|
count,
|
||||||
..
|
..
|
||||||
} = self.serializer;
|
} = self;
|
||||||
|
|
||||||
let meta = DocumentsMetadata { count, index };
|
let meta = DocumentsMetadata { count, index };
|
||||||
|
|
||||||
@ -68,13 +83,106 @@ impl<W: io::Write + io::Seek> DocumentBatchBuilder<W> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Adds documents to the builder.
|
|
||||||
///
|
/// Extends the builder with json documents from a reader.
|
||||||
/// The internal index is updated with the fields found
|
pub fn extend_from_json<R: io::Read>(&mut self, reader: R) -> Result<(), Error> {
|
||||||
/// in the documents. Document must either be a map or a sequences of map, anything else will
|
let mut de = serde_json::Deserializer::from_reader(reader);
|
||||||
/// fail.
|
|
||||||
pub fn add_documents<T: Serialize>(&mut self, document: T) -> Result<(), Error> {
|
let mut visitor = DocumentVisitor {
|
||||||
document.serialize(&mut self.serializer)?;
|
inner: &mut self.inner,
|
||||||
|
index: &mut self.index,
|
||||||
|
obkv_buffer: &mut self.obkv_buffer,
|
||||||
|
value_buffer: &mut self.value_buffer,
|
||||||
|
values: &mut self.values,
|
||||||
|
count: &mut self.count,
|
||||||
|
};
|
||||||
|
|
||||||
|
de.deserialize_any(&mut visitor).unwrap();
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
use std::io::Cursor;
|
||||||
|
|
||||||
|
use crate::documents::DocumentBatchReader;
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn add_single_documents_json() {
|
||||||
|
let mut cursor = Cursor::new(Vec::new());
|
||||||
|
let mut builder = DocumentBatchBuilder::new(&mut cursor).unwrap();
|
||||||
|
|
||||||
|
let json = serde_json::json!({
|
||||||
|
"id": 1,
|
||||||
|
"field": "hello!",
|
||||||
|
});
|
||||||
|
|
||||||
|
builder.extend_from_json(Cursor::new(serde_json::to_vec(&json).unwrap())).unwrap();
|
||||||
|
|
||||||
|
let json = serde_json::json!({
|
||||||
|
"blabla": false,
|
||||||
|
"field": "hello!",
|
||||||
|
"id": 1,
|
||||||
|
});
|
||||||
|
|
||||||
|
builder.extend_from_json(Cursor::new(serde_json::to_vec(&json).unwrap())).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(builder.len(), 2);
|
||||||
|
|
||||||
|
builder.finish().unwrap();
|
||||||
|
|
||||||
|
cursor.set_position(0);
|
||||||
|
|
||||||
|
let mut reader = DocumentBatchReader::from_reader(cursor).unwrap();
|
||||||
|
|
||||||
|
let (index, document) = reader.next_document_with_index().unwrap().unwrap();
|
||||||
|
assert_eq!(index.len(), 3);
|
||||||
|
assert_eq!(document.iter().count(), 2);
|
||||||
|
|
||||||
|
let (index, document) = reader.next_document_with_index().unwrap().unwrap();
|
||||||
|
assert_eq!(index.len(), 3);
|
||||||
|
assert_eq!(document.iter().count(), 3);
|
||||||
|
|
||||||
|
assert!(reader.next_document_with_index().unwrap().is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn add_documents_seq_json() {
|
||||||
|
let mut cursor = Cursor::new(Vec::new());
|
||||||
|
let mut builder = DocumentBatchBuilder::new(&mut cursor).unwrap();
|
||||||
|
|
||||||
|
let json = serde_json::json!([{
|
||||||
|
"id": 1,
|
||||||
|
"field": "hello!",
|
||||||
|
},{
|
||||||
|
"blabla": false,
|
||||||
|
"field": "hello!",
|
||||||
|
"id": 1,
|
||||||
|
}
|
||||||
|
]);
|
||||||
|
|
||||||
|
builder.extend_from_json(Cursor::new(serde_json::to_vec(&json).unwrap())).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(builder.len(), 2);
|
||||||
|
|
||||||
|
builder.finish().unwrap();
|
||||||
|
|
||||||
|
cursor.set_position(0);
|
||||||
|
|
||||||
|
let mut reader = DocumentBatchReader::from_reader(cursor).unwrap();
|
||||||
|
|
||||||
|
let (index, document) = reader.next_document_with_index().unwrap().unwrap();
|
||||||
|
assert_eq!(index.len(), 3);
|
||||||
|
assert_eq!(document.iter().count(), 2);
|
||||||
|
|
||||||
|
let (index, document) = reader.next_document_with_index().unwrap().unwrap();
|
||||||
|
assert_eq!(index.len(), 3);
|
||||||
|
assert_eq!(document.iter().count(), 3);
|
||||||
|
|
||||||
|
assert!(reader.next_document_with_index().unwrap().is_none());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -92,7 +92,8 @@ macro_rules! documents {
|
|||||||
let documents = serde_json::json!($data);
|
let documents = serde_json::json!($data);
|
||||||
let mut writer = std::io::Cursor::new(Vec::new());
|
let mut writer = std::io::Cursor::new(Vec::new());
|
||||||
let mut builder = crate::documents::DocumentBatchBuilder::new(&mut writer).unwrap();
|
let mut builder = crate::documents::DocumentBatchBuilder::new(&mut writer).unwrap();
|
||||||
builder.add_documents(documents).unwrap();
|
let documents = serde_json::to_vec(&documents).unwrap();
|
||||||
|
builder.extend_from_json(std::io::Cursor::new(documents)).unwrap();
|
||||||
builder.finish().unwrap();
|
builder.finish().unwrap();
|
||||||
|
|
||||||
writer.set_position(0);
|
writer.set_position(0);
|
||||||
@ -124,7 +125,8 @@ mod test {
|
|||||||
|
|
||||||
let mut builder = DocumentBatchBuilder::new(&mut cursor).unwrap();
|
let mut builder = DocumentBatchBuilder::new(&mut cursor).unwrap();
|
||||||
|
|
||||||
builder.add_documents(json).unwrap();
|
todo!();
|
||||||
|
//builder.add_documents(json).unwrap();
|
||||||
|
|
||||||
builder.finish().unwrap();
|
builder.finish().unwrap();
|
||||||
|
|
||||||
@ -153,8 +155,9 @@ mod test {
|
|||||||
|
|
||||||
let mut builder = DocumentBatchBuilder::new(&mut cursor).unwrap();
|
let mut builder = DocumentBatchBuilder::new(&mut cursor).unwrap();
|
||||||
|
|
||||||
builder.add_documents(doc1).unwrap();
|
todo!();
|
||||||
builder.add_documents(doc2).unwrap();
|
//builder.add_documents(doc1).unwrap();
|
||||||
|
//builder.add_documents(doc2).unwrap();
|
||||||
|
|
||||||
builder.finish().unwrap();
|
builder.finish().unwrap();
|
||||||
|
|
||||||
@ -182,7 +185,8 @@ mod test {
|
|||||||
|
|
||||||
let mut builder = DocumentBatchBuilder::new(&mut cursor).unwrap();
|
let mut builder = DocumentBatchBuilder::new(&mut cursor).unwrap();
|
||||||
|
|
||||||
builder.add_documents(docs).unwrap();
|
todo!();
|
||||||
|
//builder.add_documents(docs).unwrap();
|
||||||
|
|
||||||
builder.finish().unwrap();
|
builder.finish().unwrap();
|
||||||
|
|
||||||
@ -210,11 +214,13 @@ mod test {
|
|||||||
{ "tata": "hello" },
|
{ "tata": "hello" },
|
||||||
]]);
|
]]);
|
||||||
|
|
||||||
assert!(builder.add_documents(docs).is_err());
|
todo!();
|
||||||
|
//assert!(builder.add_documents(docs).is_err());
|
||||||
|
|
||||||
let docs = json!("hello");
|
let docs = json!("hello");
|
||||||
|
|
||||||
assert!(builder.add_documents(docs).is_err());
|
todo!();
|
||||||
|
//assert!(builder.add_documents(docs).is_err());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -1,312 +1,42 @@
|
|||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
use std::convert::TryInto;
|
|
||||||
use std::io::Cursor;
|
use std::io::Cursor;
|
||||||
use std::{fmt, io};
|
use std::io::Write;
|
||||||
|
use std::fmt;
|
||||||
|
|
||||||
use byteorder::{BigEndian, WriteBytesExt};
|
use byteorder::WriteBytesExt;
|
||||||
use obkv::KvWriter;
|
use serde::Deserialize;
|
||||||
use serde::ser::{Impossible, Serialize, SerializeMap, SerializeSeq, Serializer};
|
use serde::de::DeserializeSeed;
|
||||||
|
use serde::de::MapAccess;
|
||||||
|
use serde::de::SeqAccess;
|
||||||
|
use serde::de::Visitor;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
use super::{ByteCounter, DocumentsBatchIndex, Error};
|
use super::{ByteCounter, DocumentsBatchIndex};
|
||||||
use crate::FieldId;
|
use crate::FieldId;
|
||||||
|
|
||||||
pub struct DocumentSerializer<W> {
|
struct FieldIdResolver<'a>(&'a mut DocumentsBatchIndex);
|
||||||
pub writer: ByteCounter<W>,
|
|
||||||
pub buffer: Vec<u8>,
|
|
||||||
pub index: DocumentsBatchIndex,
|
|
||||||
pub count: usize,
|
|
||||||
pub allow_seq: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a, W: io::Write> Serializer for &'a mut DocumentSerializer<W> {
|
impl<'a, 'de> DeserializeSeed<'de> for FieldIdResolver<'a> {
|
||||||
type Ok = ();
|
type Value = FieldId;
|
||||||
|
|
||||||
type Error = Error;
|
fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
|
||||||
|
|
||||||
type SerializeSeq = SeqSerializer<'a, W>;
|
|
||||||
type SerializeTuple = Impossible<(), Self::Error>;
|
|
||||||
type SerializeTupleStruct = Impossible<(), Self::Error>;
|
|
||||||
type SerializeTupleVariant = Impossible<(), Self::Error>;
|
|
||||||
type SerializeMap = MapSerializer<'a, &'a mut ByteCounter<W>>;
|
|
||||||
type SerializeStruct = Impossible<(), Self::Error>;
|
|
||||||
type SerializeStructVariant = Impossible<(), Self::Error>;
|
|
||||||
fn serialize_map(self, _len: Option<usize>) -> Result<Self::SerializeMap, Self::Error> {
|
|
||||||
self.buffer.clear();
|
|
||||||
let cursor = io::Cursor::new(&mut self.buffer);
|
|
||||||
self.count += 1;
|
|
||||||
let map_serializer = MapSerializer {
|
|
||||||
map: KvWriter::new(cursor),
|
|
||||||
index: &mut self.index,
|
|
||||||
writer: &mut self.writer,
|
|
||||||
mapped_documents: BTreeMap::new(),
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(map_serializer)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_seq(self, _len: Option<usize>) -> Result<Self::SerializeSeq, Self::Error> {
|
|
||||||
if self.allow_seq {
|
|
||||||
// Only allow sequence of documents of depth 1.
|
|
||||||
self.allow_seq = false;
|
|
||||||
Ok(SeqSerializer { serializer: self })
|
|
||||||
} else {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_bool(self, _v: bool) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_i8(self, _v: i8) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_i16(self, _v: i16) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_i32(self, _v: i32) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_i64(self, _v: i64) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_u8(self, _v: u8) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_u16(self, _v: u16) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_u32(self, _v: u32) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_u64(self, _v: u64) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_f32(self, _v: f32) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_f64(self, _v: f64) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_char(self, _v: char) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_str(self, _v: &str) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_bytes(self, _v: &[u8]) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_none(self) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_some<T: ?Sized>(self, _value: &T) -> Result<Self::Ok, Self::Error>
|
|
||||||
where
|
where
|
||||||
T: Serialize,
|
D: serde::Deserializer<'de> {
|
||||||
{
|
deserializer.deserialize_str(self)
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn serialize_unit(self) -> Result<Self::Ok, Self::Error> {
|
impl<'a, 'de> Visitor<'de> for FieldIdResolver<'a> {
|
||||||
Err(Error::InvalidDocumentFormat)
|
type Value = FieldId;
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_unit_struct(self, _name: &'static str) -> Result<Self::Ok, Self::Error> {
|
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_unit_variant(
|
|
||||||
self,
|
|
||||||
_name: &'static str,
|
|
||||||
_variant_index: u32,
|
|
||||||
_variant: &'static str,
|
|
||||||
) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_newtype_struct<T: ?Sized>(
|
|
||||||
self,
|
|
||||||
_name: &'static str,
|
|
||||||
_value: &T,
|
|
||||||
) -> Result<Self::Ok, Self::Error>
|
|
||||||
where
|
where
|
||||||
T: Serialize,
|
E: serde::de::Error, {
|
||||||
{
|
let field_id = match self.0.get_by_right(v) {
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_newtype_variant<T: ?Sized>(
|
|
||||||
self,
|
|
||||||
_name: &'static str,
|
|
||||||
_variant_index: u32,
|
|
||||||
_variant: &'static str,
|
|
||||||
_value: &T,
|
|
||||||
) -> Result<Self::Ok, Self::Error>
|
|
||||||
where
|
|
||||||
T: Serialize,
|
|
||||||
{
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_tuple(self, _len: usize) -> Result<Self::SerializeTuple, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_tuple_struct(
|
|
||||||
self,
|
|
||||||
_name: &'static str,
|
|
||||||
_len: usize,
|
|
||||||
) -> Result<Self::SerializeTupleStruct, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_tuple_variant(
|
|
||||||
self,
|
|
||||||
_name: &'static str,
|
|
||||||
_variant_index: u32,
|
|
||||||
_variant: &'static str,
|
|
||||||
_len: usize,
|
|
||||||
) -> Result<Self::SerializeTupleVariant, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_struct(
|
|
||||||
self,
|
|
||||||
_name: &'static str,
|
|
||||||
_len: usize,
|
|
||||||
) -> Result<Self::SerializeStruct, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_struct_variant(
|
|
||||||
self,
|
|
||||||
_name: &'static str,
|
|
||||||
_variant_index: u32,
|
|
||||||
_variant: &'static str,
|
|
||||||
_len: usize,
|
|
||||||
) -> Result<Self::SerializeStructVariant, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct SeqSerializer<'a, W> {
|
|
||||||
serializer: &'a mut DocumentSerializer<W>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a, W: io::Write> SerializeSeq for SeqSerializer<'a, W> {
|
|
||||||
type Ok = ();
|
|
||||||
type Error = Error;
|
|
||||||
|
|
||||||
fn serialize_element<T: ?Sized>(&mut self, value: &T) -> Result<(), Self::Error>
|
|
||||||
where
|
|
||||||
T: Serialize,
|
|
||||||
{
|
|
||||||
value.serialize(&mut *self.serializer)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn end(self) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct MapSerializer<'a, W> {
|
|
||||||
map: KvWriter<io::Cursor<&'a mut Vec<u8>>, FieldId>,
|
|
||||||
index: &'a mut DocumentsBatchIndex,
|
|
||||||
writer: W,
|
|
||||||
mapped_documents: BTreeMap<FieldId, Value>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// This implementation of SerializeMap uses serilialize_entry instead of seriliaze_key and
|
|
||||||
/// serialize_value, therefore these to methods remain unimplemented.
|
|
||||||
impl<'a, W: io::Write> SerializeMap for MapSerializer<'a, W> {
|
|
||||||
type Ok = ();
|
|
||||||
type Error = Error;
|
|
||||||
|
|
||||||
fn serialize_key<T: ?Sized + Serialize>(&mut self, _key: &T) -> Result<(), Self::Error> {
|
|
||||||
unreachable!()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_value<T: ?Sized>(&mut self, _value: &T) -> Result<(), Self::Error> {
|
|
||||||
unreachable!()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn end(mut self) -> Result<Self::Ok, Self::Error> {
|
|
||||||
let mut buf = Vec::new();
|
|
||||||
for (key, value) in self.mapped_documents {
|
|
||||||
buf.clear();
|
|
||||||
let mut cursor = Cursor::new(&mut buf);
|
|
||||||
serde_json::to_writer(&mut cursor, &value).map_err(Error::JsonError)?;
|
|
||||||
self.map.insert(key, cursor.into_inner()).map_err(Error::Io)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
let data = self.map.into_inner().map_err(Error::Io)?.into_inner();
|
|
||||||
let data_len: u32 = data.len().try_into().map_err(|_| Error::DocumentTooLarge)?;
|
|
||||||
|
|
||||||
self.writer.write_u32::<BigEndian>(data_len).map_err(Error::Io)?;
|
|
||||||
self.writer.write_all(&data).map_err(Error::Io)?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_entry<K: ?Sized, V: ?Sized>(
|
|
||||||
&mut self,
|
|
||||||
key: &K,
|
|
||||||
value: &V,
|
|
||||||
) -> Result<(), Self::Error>
|
|
||||||
where
|
|
||||||
K: Serialize,
|
|
||||||
V: Serialize,
|
|
||||||
{
|
|
||||||
let field_serializer = FieldSerializer { index: &mut self.index };
|
|
||||||
let field_id: FieldId = key.serialize(field_serializer)?;
|
|
||||||
|
|
||||||
let value = serde_json::to_value(value).map_err(Error::JsonError)?;
|
|
||||||
|
|
||||||
self.mapped_documents.insert(field_id, value);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct FieldSerializer<'a> {
|
|
||||||
index: &'a mut DocumentsBatchIndex,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> serde::Serializer for FieldSerializer<'a> {
|
|
||||||
type Ok = FieldId;
|
|
||||||
|
|
||||||
type Error = Error;
|
|
||||||
|
|
||||||
type SerializeSeq = Impossible<FieldId, Self::Error>;
|
|
||||||
type SerializeTuple = Impossible<FieldId, Self::Error>;
|
|
||||||
type SerializeTupleStruct = Impossible<FieldId, Self::Error>;
|
|
||||||
type SerializeTupleVariant = Impossible<FieldId, Self::Error>;
|
|
||||||
type SerializeMap = Impossible<FieldId, Self::Error>;
|
|
||||||
type SerializeStruct = Impossible<FieldId, Self::Error>;
|
|
||||||
type SerializeStructVariant = Impossible<FieldId, Self::Error>;
|
|
||||||
|
|
||||||
fn serialize_str(self, ws: &str) -> Result<Self::Ok, Self::Error> {
|
|
||||||
let field_id = match self.index.get_by_right(ws) {
|
|
||||||
Some(field_id) => *field_id,
|
Some(field_id) => *field_id,
|
||||||
None => {
|
None => {
|
||||||
let field_id = self.index.len() as FieldId;
|
let field_id = self.0.len() as FieldId;
|
||||||
self.index.insert(field_id, ws.to_string());
|
self.0.insert(field_id, v.to_string());
|
||||||
field_id
|
field_id
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -314,161 +44,85 @@ impl<'a> serde::Serializer for FieldSerializer<'a> {
|
|||||||
Ok(field_id)
|
Ok(field_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn serialize_bool(self, _v: bool) -> Result<Self::Ok, Self::Error> {
|
fn expecting(&self, _formatter: &mut fmt::Formatter) -> fmt::Result {
|
||||||
Err(Error::InvalidDocumentFormat)
|
todo!()
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_i8(self, _v: i8) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_i16(self, _v: i16) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_i32(self, _v: i32) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_i64(self, _v: i64) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_u8(self, _v: u8) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_u16(self, _v: u16) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_u32(self, _v: u32) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_u64(self, _v: u64) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_f32(self, _v: f32) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_f64(self, _v: f64) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_char(self, _v: char) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_bytes(self, _v: &[u8]) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_none(self) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_some<T: ?Sized>(self, _value: &T) -> Result<Self::Ok, Self::Error>
|
|
||||||
where
|
|
||||||
T: Serialize,
|
|
||||||
{
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_unit(self) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_unit_struct(self, _name: &'static str) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_unit_variant(
|
|
||||||
self,
|
|
||||||
_name: &'static str,
|
|
||||||
_variant_index: u32,
|
|
||||||
_variant: &'static str,
|
|
||||||
) -> Result<Self::Ok, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_newtype_struct<T: ?Sized>(
|
|
||||||
self,
|
|
||||||
_name: &'static str,
|
|
||||||
_value: &T,
|
|
||||||
) -> Result<Self::Ok, Self::Error>
|
|
||||||
where
|
|
||||||
T: Serialize,
|
|
||||||
{
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_newtype_variant<T: ?Sized>(
|
|
||||||
self,
|
|
||||||
_name: &'static str,
|
|
||||||
_variant_index: u32,
|
|
||||||
_variant: &'static str,
|
|
||||||
_value: &T,
|
|
||||||
) -> Result<Self::Ok, Self::Error>
|
|
||||||
where
|
|
||||||
T: Serialize,
|
|
||||||
{
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_seq(self, _len: Option<usize>) -> Result<Self::SerializeSeq, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_tuple(self, _len: usize) -> Result<Self::SerializeTuple, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_tuple_struct(
|
|
||||||
self,
|
|
||||||
_name: &'static str,
|
|
||||||
_len: usize,
|
|
||||||
) -> Result<Self::SerializeTupleStruct, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_tuple_variant(
|
|
||||||
self,
|
|
||||||
_name: &'static str,
|
|
||||||
_variant_index: u32,
|
|
||||||
_variant: &'static str,
|
|
||||||
_len: usize,
|
|
||||||
) -> Result<Self::SerializeTupleVariant, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_map(self, _len: Option<usize>) -> Result<Self::SerializeMap, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_struct(
|
|
||||||
self,
|
|
||||||
_name: &'static str,
|
|
||||||
_len: usize,
|
|
||||||
) -> Result<Self::SerializeStruct, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_struct_variant(
|
|
||||||
self,
|
|
||||||
_name: &'static str,
|
|
||||||
_variant_index: u32,
|
|
||||||
_variant: &'static str,
|
|
||||||
_len: usize,
|
|
||||||
) -> Result<Self::SerializeStructVariant, Self::Error> {
|
|
||||||
Err(Error::InvalidDocumentFormat)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl serde::ser::Error for Error {
|
struct ValueDeserializer;
|
||||||
fn custom<T: fmt::Display>(msg: T) -> Self {
|
|
||||||
Error::Custom(msg.to_string())
|
impl<'de> DeserializeSeed<'de> for ValueDeserializer {
|
||||||
|
type Value = serde_json::Value;
|
||||||
|
|
||||||
|
fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
|
||||||
|
where
|
||||||
|
D: serde::Deserializer<'de> {
|
||||||
|
serde_json::Value::deserialize(deserializer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct DocumentVisitor<'a, W> {
|
||||||
|
pub inner: &'a mut ByteCounter<W>,
|
||||||
|
pub index: &'a mut DocumentsBatchIndex,
|
||||||
|
pub obkv_buffer: &'a mut Vec<u8>,
|
||||||
|
pub value_buffer: &'a mut Vec<u8>,
|
||||||
|
pub values: &'a mut BTreeMap<FieldId, Value>,
|
||||||
|
pub count: &'a mut usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, 'de, W: Write> Visitor<'de> for &mut DocumentVisitor<'a, W> {
|
||||||
|
/// This Visitor value is nothing, since it write the value to a file.
|
||||||
|
type Value = ();
|
||||||
|
|
||||||
|
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
|
||||||
|
where
|
||||||
|
A: SeqAccess<'de>,
|
||||||
|
{
|
||||||
|
while let Some(_) = seq.next_element_seed(&mut *self)? { }
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
|
||||||
|
where
|
||||||
|
A: MapAccess<'de>,
|
||||||
|
{
|
||||||
|
while let Some((key, value)) = map.next_entry_seed(FieldIdResolver(&mut *self.index), ValueDeserializer).unwrap() {
|
||||||
|
self.values.insert(key, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.obkv_buffer.clear();
|
||||||
|
let mut obkv = obkv::KvWriter::new(Cursor::new(&mut *self.obkv_buffer));
|
||||||
|
for (key, value) in self.values.iter() {
|
||||||
|
self.value_buffer.clear();
|
||||||
|
// This is guaranteed to work
|
||||||
|
serde_json::to_writer(Cursor::new(&mut *self.value_buffer), value).unwrap();
|
||||||
|
obkv.insert(*key, &self.value_buffer).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
let reader = obkv.into_inner().unwrap().into_inner();
|
||||||
|
|
||||||
|
self.inner.write_u32::<byteorder::BigEndian>(reader.len() as u32).unwrap();
|
||||||
|
self.inner.write_all(reader).unwrap();
|
||||||
|
|
||||||
|
*self.count += 1;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
write!(f, "a documents, or a sequence of documents.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, 'de, W> DeserializeSeed<'de> for &mut DocumentVisitor<'a, W>
|
||||||
|
where W: Write,
|
||||||
|
{
|
||||||
|
type Value = ();
|
||||||
|
|
||||||
|
fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
|
||||||
|
where
|
||||||
|
D: serde::Deserializer<'de> {
|
||||||
|
deserializer.deserialize_map(self)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -68,7 +68,8 @@ mod test {
|
|||||||
"txts": sample_txts[..(rng.gen_range(0..3))],
|
"txts": sample_txts[..(rng.gen_range(0..3))],
|
||||||
"cat-ints": sample_ints[..(rng.gen_range(0..3))],
|
"cat-ints": sample_ints[..(rng.gen_range(0..3))],
|
||||||
});
|
});
|
||||||
builder.add_documents(doc).unwrap();
|
todo!()
|
||||||
|
//builder.add_documents(doc).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
builder.finish().unwrap();
|
builder.finish().unwrap();
|
||||||
|
@ -877,7 +877,8 @@ mod tests {
|
|||||||
let mut cursor = Cursor::new(Vec::new());
|
let mut cursor = Cursor::new(Vec::new());
|
||||||
|
|
||||||
let mut builder = DocumentBatchBuilder::new(&mut cursor).unwrap();
|
let mut builder = DocumentBatchBuilder::new(&mut cursor).unwrap();
|
||||||
builder.add_documents(big_object).unwrap();
|
todo!();
|
||||||
|
//builder.add_documents(big_object).unwrap();
|
||||||
builder.finish().unwrap();
|
builder.finish().unwrap();
|
||||||
cursor.set_position(0);
|
cursor.set_position(0);
|
||||||
let content = DocumentBatchReader::from_reader(cursor).unwrap();
|
let content = DocumentBatchReader::from_reader(cursor).unwrap();
|
||||||
|
@ -61,9 +61,10 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
|
|||||||
let mut cursor = Cursor::new(Vec::new());
|
let mut cursor = Cursor::new(Vec::new());
|
||||||
let mut documents_builder = DocumentBatchBuilder::new(&mut cursor).unwrap();
|
let mut documents_builder = DocumentBatchBuilder::new(&mut cursor).unwrap();
|
||||||
let reader = Cursor::new(CONTENT.as_bytes());
|
let reader = Cursor::new(CONTENT.as_bytes());
|
||||||
for doc in serde_json::Deserializer::from_reader(reader).into_iter::<serde_json::Value>() {
|
todo!();
|
||||||
documents_builder.add_documents(doc.unwrap()).unwrap();
|
//for doc in serde_json::Deserializer::from_reader(reader).into_iter::<serde_json::Value>() {
|
||||||
}
|
//documents_builder.add_documents(doc.unwrap()).unwrap();
|
||||||
|
//}
|
||||||
documents_builder.finish().unwrap();
|
documents_builder.finish().unwrap();
|
||||||
|
|
||||||
cursor.set_position(0);
|
cursor.set_position(0);
|
||||||
|
@ -409,7 +409,8 @@ fn criteria_ascdesc() {
|
|||||||
"age": age,
|
"age": age,
|
||||||
});
|
});
|
||||||
|
|
||||||
batch_builder.add_documents(json).unwrap();
|
todo!();
|
||||||
|
//batch_builder.add_documents(json).unwrap();
|
||||||
});
|
});
|
||||||
|
|
||||||
batch_builder.finish().unwrap();
|
batch_builder.finish().unwrap();
|
||||||
|
Loading…
Reference in New Issue
Block a user