feat: Improve the deserialization time of a Blob

This commit is contained in:
Clément Renault 2018-12-30 13:22:02 +01:00
parent a18401f47e
commit 6bd779f9ae
No known key found for this signature in database
GPG key ID: 0151CDAB43460DAE
11 changed files with 198 additions and 227 deletions

View file

@ -6,11 +6,11 @@ pub use self::positive::{PositiveBlob, PositiveBlobBuilder};
pub use self::negative::NegativeBlob;
pub use self::ops::OpBuilder;
use std::fmt;
use std::io::{Cursor, BufRead};
use std::error::Error;
use std::sync::Arc;
use serde_derive::{Serialize, Deserialize};
use serde::ser::{Serialize, Serializer, SerializeTuple};
use serde::de::{self, Deserialize, Deserializer, SeqAccess, Visitor};
use byteorder::{ReadBytesExt, WriteBytesExt};
#[derive(Debug)]
pub enum Blob {
@ -33,68 +33,41 @@ impl Blob {
Blob::Negative(_) => Sign::Negative,
}
}
}
impl Serialize for Blob {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
pub fn from_shared_bytes(bytes: Arc<Vec<u8>>, offset: usize, len: usize) -> Result<Blob, Box<Error>> {
let mut cursor = Cursor::new(&bytes.as_slice()[..len]);
cursor.consume(offset);
let byte = cursor.read_u8()?;
let blob = match Sign::from_byte(byte)? {
Sign::Positive => {
let offset = cursor.position() as usize;
let len = len - offset;
let blob = PositiveBlob::from_shared_bytes(bytes, offset, len)?;
Blob::Positive(blob)
},
Sign::Negative => {
let offset = cursor.position() as usize;
let len = len - offset;
let blob = NegativeBlob::from_shared_bytes(bytes, offset, len)?;
Blob::Negative(blob)
},
};
Ok(blob)
}
pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
let sign = self.sign();
sign.write_to_bytes(bytes);
match self {
Blob::Positive(blob) => {
let mut tuple = serializer.serialize_tuple(2)?;
tuple.serialize_element(&Sign::Positive)?;
tuple.serialize_element(&blob)?;
tuple.end()
},
Blob::Negative(blob) => {
let mut tuple = serializer.serialize_tuple(2)?;
tuple.serialize_element(&Sign::Negative)?;
tuple.serialize_element(&blob)?;
tuple.end()
},
Blob::Positive(b) => b.write_to_bytes(bytes),
Blob::Negative(b) => b.write_to_bytes(bytes),
}
}
}
impl<'de> Deserialize<'de> for Blob {
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Blob, D::Error> {
struct TupleVisitor;
impl<'de> Visitor<'de> for TupleVisitor {
type Value = Blob;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a Blob struct")
}
#[inline]
fn visit_seq<A: SeqAccess<'de>>(self, mut seq: A) -> Result<Self::Value, A::Error> {
let sign = match seq.next_element()? {
Some(value) => value,
None => return Err(de::Error::invalid_length(0, &self)),
};
match sign {
Sign::Positive => {
let blob = match seq.next_element()? {
Some(value) => value,
None => return Err(de::Error::invalid_length(1, &self)),
};
Ok(Blob::Positive(blob))
},
Sign::Negative => {
let blob = match seq.next_element()? {
Some(value) => value,
None => return Err(de::Error::invalid_length(1, &self)),
};
Ok(Blob::Negative(blob))
},
}
}
}
deserializer.deserialize_tuple(2, TupleVisitor)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Sign {
Positive,
Negative,
@ -107,4 +80,19 @@ impl Sign {
Sign::Negative => Sign::Positive,
}
}
pub fn from_byte(byte: u8) -> Result<Sign, Box<Error>> {
match byte {
0 => Ok(Sign::Positive),
1 => Ok(Sign::Negative),
b => Err(format!("Invalid sign byte {:?}", b).into()),
}
}
pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
match self {
Sign::Positive => bytes.write_u8(0).unwrap(),
Sign::Negative => bytes.write_u8(1).unwrap(),
}
}
}

View file

@ -1,10 +1,11 @@
use std::io::{Cursor, BufRead};
use std::error::Error;
use std::path::Path;
use std::sync::Arc;
use std::fmt;
use sdset::Set;
use serde::de::{self, Deserialize, Deserializer};
use serde::ser::{Serialize, Serializer};
use byteorder::{LittleEndian, ReadBytesExt};
use crate::data::DocIds;
use crate::DocumentId;
@ -14,18 +15,26 @@ pub struct NegativeBlob {
}
impl NegativeBlob {
pub unsafe fn from_path<P>(doc_ids: P) -> Result<Self, Box<Error>>
where P: AsRef<Path>,
{
let doc_ids = DocIds::from_path(doc_ids)?;
Ok(NegativeBlob { doc_ids })
}
pub fn from_bytes(doc_ids: Vec<u8>) -> Result<Self, Box<Error>> {
let doc_ids = DocIds::from_bytes(doc_ids)?;
Ok(NegativeBlob { doc_ids })
}
pub fn from_shared_bytes(bytes: Arc<Vec<u8>>, offset: usize, len: usize) -> Result<Self, Box<Error>> {
let mut cursor = Cursor::new(&bytes.as_slice()[..len]);
cursor.consume(offset);
let len = cursor.read_u64::<LittleEndian>()? as usize;
let offset = cursor.position() as usize;
let doc_ids = DocIds::from_shared_bytes(bytes, offset, len)?;
Ok(NegativeBlob::from_raw(doc_ids))
}
pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
self.doc_ids.write_to_bytes(bytes)
}
pub fn from_raw(doc_ids: DocIds) -> Self {
NegativeBlob { doc_ids }
}
@ -52,16 +61,3 @@ impl fmt::Debug for NegativeBlob {
write!(f, ")")
}
}
impl Serialize for NegativeBlob {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
self.doc_ids.serialize(serializer)
}
}
impl<'de> Deserialize<'de> for NegativeBlob {
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<NegativeBlob, D::Error> {
let bytes = Vec::deserialize(deserializer)?;
NegativeBlob::from_bytes(bytes).map_err(de::Error::custom)
}
}

View file

@ -60,7 +60,7 @@ impl<'a> $name<'a> {
pub fn into_negative_blob(self) -> NegativeBlob {
let document_ids = sdset::SetOperation::into_set_buf(self.op);
let doc_ids = DocIds::from_document_ids(document_ids.into_vec());
let doc_ids = DocIds::from_raw(document_ids.into_vec());
NegativeBlob::from_raw(doc_ids)
}
}

View file

@ -1,15 +1,16 @@
use std::fmt;
use std::io::Write;
use std::path::Path;
use std::io::{Write, Cursor, BufRead};
use std::convert::From;
use std::error::Error;
use std::sync::Arc;
use std::fmt;
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use fst::{map, Map, Streamer, IntoStreamer};
use fst::raw::Fst;
use sdset::Set;
use crate::DocIndex;
use crate::data::{DocIndexes, DocIndexesBuilder};
use serde::ser::{Serialize, Serializer, SerializeTuple};
use serde::de::{self, Deserialize, Deserializer, SeqAccess, Visitor};
#[derive(Default)]
pub struct PositiveBlob {
@ -18,15 +19,6 @@ pub struct PositiveBlob {
}
impl PositiveBlob {
pub unsafe fn from_paths<P, Q>(map: P, indexes: Q) -> Result<Self, Box<Error>>
where P: AsRef<Path>,
Q: AsRef<Path>,
{
let map = Map::from_path(map)?;
let indexes = DocIndexes::from_path(indexes)?;
Ok(PositiveBlob { map, indexes })
}
pub fn from_bytes(map: Vec<u8>, indexes: Vec<u8>) -> Result<Self, Box<Error>> {
let map = Map::from_bytes(map)?;
let indexes = DocIndexes::from_bytes(indexes)?;
@ -37,6 +29,33 @@ impl PositiveBlob {
PositiveBlob { map, indexes }
}
pub fn from_shared_bytes(bytes: Arc<Vec<u8>>, offset: usize, len: usize) -> Result<Self, Box<Error>> {
let mut cursor = Cursor::new(&bytes.as_slice()[..len]);
cursor.consume(offset);
let map_len = cursor.read_u64::<LittleEndian>()? as usize;
let offset = cursor.position() as usize;
let map = Map::from(Fst::from_shared_bytes(bytes.clone(), offset, map_len)?);
cursor.consume(map_len);
let doc_len = cursor.read_u64::<LittleEndian>()? as usize;
let offset = cursor.position() as usize;
let doc_indexes = DocIndexes::from_shared_bytes(bytes, offset, doc_len)?;
Ok(PositiveBlob::from_raw(map, doc_indexes))
}
pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
let map_bytes = self.map.as_fst().as_bytes();
bytes.write_u64::<LittleEndian>(map_bytes.len() as u64).unwrap();
bytes.extend_from_slice(&map_bytes);
let doc_indexes_vec = self.indexes.to_vec(); // FIXME patch to have a as_slice() function
bytes.write_u64::<LittleEndian>(doc_indexes_vec.len() as u64).unwrap();
bytes.extend_from_slice(&doc_indexes_vec);
}
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Option<&[DocIndex]> {
self.map.get(key).map(|index| &self.indexes[index as usize])
}
@ -103,52 +122,6 @@ impl<'m, 'a> Streamer<'a> for PositiveBlobStream<'m> {
}
}
impl Serialize for PositiveBlob {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let mut tuple = serializer.serialize_tuple(2)?;
tuple.serialize_element(&self.map.as_fst().to_vec())?;
tuple.serialize_element(&self.indexes.to_vec())?;
tuple.end()
}
}
impl<'de> Deserialize<'de> for PositiveBlob {
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<PositiveBlob, D::Error> {
struct TupleVisitor;
impl<'de> Visitor<'de> for TupleVisitor {
type Value = PositiveBlob;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a PositiveBlob struct")
}
#[inline]
fn visit_seq<A: SeqAccess<'de>>(self, mut seq: A) -> Result<Self::Value, A::Error> {
let map = match seq.next_element()? {
Some(bytes) => match Map::from_bytes(bytes) {
Ok(value) => value,
Err(err) => return Err(de::Error::custom(err)),
},
None => return Err(de::Error::invalid_length(0, &self)),
};
let indexes = match seq.next_element()? {
Some(bytes) => match DocIndexes::from_bytes(bytes) {
Ok(value) => value,
Err(err) => return Err(de::Error::custom(err)),
},
None => return Err(de::Error::invalid_length(1, &self)),
};
Ok(PositiveBlob { map, indexes })
}
}
deserializer.deserialize_tuple(2, TupleVisitor)
}
}
pub struct PositiveBlobBuilder<W, X> {
map: fst::MapBuilder<W>,
indexes: DocIndexesBuilder<X>,
@ -207,6 +180,29 @@ mod tests {
use crate::DocumentId;
#[test]
fn create_query() -> Result<(), Box<Error>> {
let a = DocIndex { document_id: DocumentId(0), attribute: Attribute::new(3, 11), word_area: WordArea::new(30, 4) };
let b = DocIndex { document_id: DocumentId(1), attribute: Attribute::new(4, 21), word_area: WordArea::new(35, 6) };
let c = DocIndex { document_id: DocumentId(2), attribute: Attribute::new(8, 2), word_area: WordArea::new(89, 6) };
let mut builder = PositiveBlobBuilder::memory();
builder.insert("aaa", Set::new(&[a])?)?;
builder.insert("aab", Set::new(&[a, b, c])?)?;
builder.insert("aac", Set::new(&[a, c])?)?;
let (map_bytes, indexes_bytes) = builder.into_inner()?;
let positive_blob = PositiveBlob::from_bytes(map_bytes, indexes_bytes)?;
assert_eq!(positive_blob.get("aaa"), Some(&[a][..]));
assert_eq!(positive_blob.get("aab"), Some(&[a, b, c][..]));
assert_eq!(positive_blob.get("aac"), Some(&[a, c][..]));
assert_eq!(positive_blob.get("aad"), None);
Ok(())
}
#[test]
fn serialize_deserialize() -> Result<(), Box<Error>> {
let a = DocIndex {
@ -269,9 +265,6 @@ mod tests {
let (map_bytes, indexes_bytes) = builder.into_inner()?;
let positive_blob = PositiveBlob::from_bytes(map_bytes, indexes_bytes)?;
let bytes = bincode::serialize(&positive_blob)?;
let positive_blob: PositiveBlob = bincode::deserialize(&bytes)?;
assert_eq!(positive_blob.get("aaa"), Some(&[a][..]));
assert_eq!(positive_blob.get("aab"), Some(&[a, b, c][..]));
assert_eq!(positive_blob.get("aac"), Some(&[a, c][..]));

View file

@ -7,9 +7,9 @@ use rocksdb::rocksdb::{Writable, Snapshot};
use rocksdb::{DB, DBVector, MergeOperands};
use crossbeam::atomic::ArcCell;
use crate::database::blob::{self, Blob, PositiveBlob};
use crate::database::{DatabaseView, Update, Schema};
use crate::database::{DATA_INDEX, DATA_SCHEMA};
use crate::database::blob::{self, Blob};
pub struct Database {
// DB is under a Mutex to sync update ingestions and separate DB update locking
@ -136,18 +136,31 @@ fn merge_indexes(key: &[u8], existing_value: Option<&[u8]>, operands: &mut Merge
};
let mut op = blob::OpBuilder::with_capacity(capacity);
if let Some(existing_value) = existing_value {
let blob = bincode::deserialize(existing_value).expect("BUG: could not deserialize data-index");
if let Some(bytes) = existing_value {
let bytes_len = bytes.len();
let bytes = Arc::new(bytes.to_vec());
let blob = match PositiveBlob::from_shared_bytes(bytes, 0, bytes_len) {
Ok(blob) => blob,
Err(e) => panic!("BUG: could not deserialize data-index due to {}", e),
};
op.push(Blob::Positive(blob));
}
for bytes in operands {
let blob = bincode::deserialize(bytes).expect("BUG: could not deserialize blob");
let bytes_len = bytes.len();
let bytes = Arc::new(bytes.to_vec());
let blob = match Blob::from_shared_bytes(bytes, 0, bytes_len) {
Ok(blob) => blob,
Err(e) => panic!("BUG: could not deserialize blob due to {}", e),
};
op.push(blob);
}
let blob = op.merge().expect("BUG: could not merge blobs");
bincode::serialize(&blob).expect("BUG: could not serialize merged blob")
let mut bytes = Vec::new();
blob.write_to_bytes(&mut bytes);
bytes
}
#[cfg(test)]
@ -158,9 +171,9 @@ mod tests {
use serde_derive::{Serialize, Deserialize};
use tempfile::tempdir;
use crate::tokenizer::DefaultBuilder;
use crate::database::update::PositiveUpdateBuilder;
use crate::database::schema::{SchemaBuilder, STORED, INDEXED};
use crate::database::update::PositiveUpdateBuilder;
use crate::tokenizer::DefaultBuilder;
#[test]
fn ingest_update_file() -> Result<(), Box<Error>> {

View file

@ -2,6 +2,7 @@ use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::error::Error;
use std::ops::Deref;
use std::sync::Arc;
use rocksdb::rocksdb::{DB, Snapshot};
@ -55,7 +56,11 @@ fn retrieve_data_index<D>(snapshot: &Snapshot<D>) -> Result<PositiveBlob, Box<Er
where D: Deref<Target=DB>
{
match snapshot.get(DATA_INDEX)? {
Some(vector) => Ok(bincode::deserialize(&*vector)?),
Some(vector) => {
let bytes_len = vector.as_ref().len();
let bytes = Arc::new(vector.as_ref().to_vec());
Ok(PositiveBlob::from_shared_bytes(bytes, 0, bytes_len)?)
},
None => Ok(PositiveBlob::default()),
}
}

View file

@ -38,7 +38,8 @@ impl NegativeUpdateBuilder {
let blob = Blob::Negative(negative_blob);
// write the data-index aka negative blob
let bytes = bincode::serialize(&blob)?;
let mut bytes = Vec::new();
blob.write_to_bytes(&mut bytes);
file_writer.merge(DATA_INDEX, &bytes)?;
// FIXME remove this ugly thing !

View file

@ -485,7 +485,8 @@ impl<B> PositiveUpdateBuilder<B> {
let blob = Blob::Positive(positive_blob);
// write the data-index aka positive blob
let bytes = bincode::serialize(&blob)?;
let mut bytes = Vec::new();
blob.write_to_bytes(&mut bytes);
file_writer.merge(DATA_INDEX, &bytes)?;
// write all the documents fields updates