feat: Implemented a basic deserialiazation

This commit is contained in:
Clément Renault 2018-12-03 22:26:24 +01:00
parent 2a35d72fe2
commit b2cec98805
No known key found for this signature in database
GPG key ID: 0151CDAB43460DAE
9 changed files with 655 additions and 60 deletions

View file

@ -1,13 +1,15 @@
use std::error::Error;
use std::marker;
use std::{fmt, marker};
use rocksdb::rocksdb::{DB, Snapshot};
use rocksdb::rocksdb::{DB, DBVector, Snapshot, SeekKey};
use rocksdb::rocksdb_options::ReadOptions;
use serde::de::DeserializeOwned;
use crate::index::schema::Schema;
use crate::blob::positive::PositiveBlob;
use crate::database::deserializer::{Deserializer, DeserializerError};
use crate::database::{DATA_INDEX, DATA_SCHEMA};
use crate::blob::positive::PositiveBlob;
use crate::index::schema::Schema;
use crate::database::{DocumentKey, DocumentKeyAttr};
use crate::DocumentId;
// FIXME Do not panic!
@ -40,6 +42,10 @@ impl<'a> DatabaseView<'a> {
self.snapshot
}
pub fn get(&self, key: &[u8]) -> Result<Option<DBVector>, Box<Error>> {
Ok(self.snapshot.get(key)?)
}
// TODO create an enum error type
pub fn retrieve_document<D>(&self, id: DocumentId) -> Result<D, Box<Error>>
where D: DeserializeOwned
@ -60,6 +66,36 @@ impl<'a> DatabaseView<'a> {
}
}
impl<'a> fmt::Debug for DatabaseView<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut options = ReadOptions::new();
let lower = DocumentKey::new(0);
options.set_iterate_lower_bound(lower.as_ref());
let mut iter = self.snapshot.iter_opt(options);
iter.seek(SeekKey::Start);
let iter = iter.map(|(key, _)| DocumentKeyAttr::from_bytes(&key));
if f.alternate() {
writeln!(f, "DatabaseView(")?;
} else {
write!(f, "DatabaseView(")?;
}
self.schema.fmt(f)?;
if f.alternate() {
writeln!(f, ",")?;
} else {
write!(f, ", ")?;
}
f.debug_list().entries(iter).finish()?;
write!(f, ")")
}
}
// TODO this is just an iter::Map !!!
pub struct DocumentIter<'a, D, I> {
database_view: &'a DatabaseView<'a>,

View file

@ -1,11 +1,11 @@
use std::error::Error;
use std::fmt;
use rocksdb::rocksdb::{DB, Snapshot};
use rocksdb::rocksdb::{DB, Snapshot, SeekKey};
use rocksdb::rocksdb_options::ReadOptions;
use serde::de::value::MapDeserializer;
use serde::forward_to_deserialize_any;
use serde::de::Visitor;
use serde::de::value::MapDeserializer;
use serde::de::{self, Visitor, IntoDeserializer};
use crate::database::document_key::{DocumentKey, DocumentKeyAttr};
use crate::index::schema::Schema;
@ -23,7 +23,7 @@ impl<'a> Deserializer<'a> {
}
}
impl<'de, 'a, 'b> serde::de::Deserializer<'de> for &'b mut Deserializer<'a> {
impl<'de, 'a, 'b> de::Deserializer<'de> for &'b mut Deserializer<'a> {
type Error = DeserializerError;
fn deserialize_any<V>(self, visitor: V) -> Result<V::Value, Self::Error>
@ -35,8 +35,7 @@ impl<'de, 'a, 'b> serde::de::Deserializer<'de> for &'b mut Deserializer<'a> {
forward_to_deserialize_any! {
bool u8 u16 u32 u64 i8 i16 i32 i64 f32 f64 char str string unit seq
bytes byte_buf unit_struct tuple_struct
identifier tuple ignored_any option newtype_struct enum
struct
identifier tuple ignored_any option newtype_struct enum struct
}
fn deserialize_map<V>(self, visitor: V) -> Result<V::Value, Self::Error>
@ -48,14 +47,20 @@ impl<'de, 'a, 'b> serde::de::Deserializer<'de> for &'b mut Deserializer<'a> {
options.set_iterate_lower_bound(lower.as_ref());
options.set_iterate_upper_bound(upper.as_ref());
let mut db_iter = self.snapshot.iter_opt(options);
let iter = db_iter.map(|(key, value)| {
let mut iter = self.snapshot.iter_opt(options);
iter.seek(SeekKey::Start);
if iter.kv().is_none() {
// FIXME return an error
}
let iter = iter.map(|(key, value)| {
// retrieve the schema attribute name
// from the schema attribute number
let document_key_attr = DocumentKeyAttr::from_bytes(&key);
let schema_attr = document_key_attr.attribute();
let attribute_name = self.schema.attribute_name(schema_attr);
(attribute_name, value)
(attribute_name, Value(value))
});
let map_deserializer = MapDeserializer::new(iter);
@ -63,12 +68,101 @@ impl<'de, 'a, 'b> serde::de::Deserializer<'de> for &'b mut Deserializer<'a> {
}
}
struct Value(Vec<u8>);
impl<'de> IntoDeserializer<'de, DeserializerError> for Value {
type Deserializer = Self;
fn into_deserializer(self) -> Self::Deserializer {
self
}
}
macro_rules! forward_to_bincode_values {
($($ty:ident => $de_method:ident,)*) => {
$(
fn $de_method<V>(self, visitor: V) -> Result<V::Value, Self::Error>
where V: de::Visitor<'de>
{
match bincode::deserialize::<$ty>(&self.0) {
Ok(val) => val.into_deserializer().$de_method(visitor),
Err(e) => Err(de::Error::custom(e)),
}
}
)*
}
}
impl<'de, 'a> de::Deserializer<'de> for Value {
type Error = DeserializerError;
fn deserialize_any<V>(self, visitor: V) -> Result<V::Value, Self::Error>
where V: Visitor<'de>
{
self.0.into_deserializer().deserialize_any(visitor)
}
fn deserialize_str<V>(self, visitor: V) -> Result<V::Value, Self::Error>
where V: Visitor<'de>
{
self.deserialize_string(visitor)
}
fn deserialize_string<V>(self, visitor: V) -> Result<V::Value, Self::Error>
where V: Visitor<'de>
{
match bincode::deserialize::<String>(&self.0) {
Ok(val) => val.into_deserializer().deserialize_string(visitor),
Err(e) => Err(de::Error::custom(e)),
}
}
fn deserialize_bytes<V>(self, visitor: V) -> Result<V::Value, Self::Error>
where V: Visitor<'de>
{
self.deserialize_byte_buf(visitor)
}
fn deserialize_byte_buf<V>(self, visitor: V) -> Result<V::Value, Self::Error>
where V: Visitor<'de>
{
match bincode::deserialize::<Vec<u8>>(&self.0) {
Ok(val) => val.into_deserializer().deserialize_byte_buf(visitor),
Err(e) => Err(de::Error::custom(e)),
}
}
forward_to_bincode_values! {
char => deserialize_char,
bool => deserialize_bool,
u8 => deserialize_u8,
u16 => deserialize_u16,
u32 => deserialize_u32,
u64 => deserialize_u64,
i8 => deserialize_i8,
i16 => deserialize_i16,
i32 => deserialize_i32,
i64 => deserialize_i64,
f32 => deserialize_f32,
f64 => deserialize_f64,
}
forward_to_deserialize_any! {
unit seq map
unit_struct tuple_struct
identifier tuple ignored_any option newtype_struct enum struct
}
}
#[derive(Debug)]
pub enum DeserializerError {
Custom(String),
}
impl serde::de::Error for DeserializerError {
impl de::Error for DeserializerError {
fn custom<T: fmt::Display>(msg: T) -> Self {
DeserializerError::Custom(msg.to_string())
}

View file

@ -1,5 +1,6 @@
use std::io::{Cursor, Read, Write};
use std::mem::size_of;
use std::fmt;
use byteorder::{NativeEndian, WriteBytesExt, ReadBytesExt};
@ -48,6 +49,14 @@ impl AsRef<[u8]> for DocumentKey {
}
}
impl fmt::Debug for DocumentKey {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("DocumentKey")
.field("document_id", &self.document_id())
.finish()
}
}
#[derive(Copy, Clone)]
pub struct DocumentKeyAttr([u8; DOC_KEY_ATTR_LEN]);
@ -94,3 +103,12 @@ impl AsRef<[u8]> for DocumentKeyAttr {
&self.0
}
}
impl fmt::Debug for DocumentKeyAttr {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("DocumentKeyAttr")
.field("document_id", &self.document_id())
.field("attribute", &self.attribute().as_u32())
.finish()
}
}

View file

@ -1,11 +1,13 @@
use std::error::Error;
use std::path::Path;
use std::fmt;
use rocksdb::rocksdb_options::{DBOptions, IngestExternalFileOptions, ColumnFamilyOptions};
use rocksdb::{DB, MergeOperands};
use rocksdb::{DB, DBVector, MergeOperands, SeekKey};
use rocksdb::rocksdb::Writable;
pub use crate::database::database_view::DatabaseView;
pub use crate::database::document_key::{DocumentKey, DocumentKeyAttr};
use crate::index::update::Update;
use crate::index::schema::Schema;
use crate::blob::{self, Blob};
@ -30,6 +32,7 @@ impl Database {
let path = path.to_string_lossy();
let mut opts = DBOptions::new();
opts.create_if_missing(true);
// opts.error_if_exists(true); // FIXME pull request that
let mut cf_opts = ColumnFamilyOptions::new();
cf_opts.add_merge_operator("data-index merge operator", merge_indexes);
@ -80,14 +83,40 @@ impl Database {
Ok(())
}
pub fn get(&self, key: &[u8]) -> Result<Option<DBVector>, Box<Error>> {
Ok(self.0.get(key)?)
}
pub fn flush(&self) -> Result<(), Box<Error>> {
Ok(self.0.flush(true)?)
}
pub fn view(&self) -> Result<DatabaseView, Box<Error>> {
let snapshot = self.0.snapshot();
DatabaseView::new(snapshot)
}
}
impl fmt::Debug for Database {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Database([")?;
let mut iter = self.0.iter();
iter.seek(SeekKey::Start);
let mut first = true;
for (key, value) in &mut iter {
if !first { write!(f, ", ")?; }
first = false;
let key = String::from_utf8_lossy(&key);
write!(f, "{:?}", key)?;
}
write!(f, "])")
}
}
fn merge_indexes(key: &[u8], existing_value: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
if key != DATA_INDEX { panic!("The merge operator only supports \"data-index\" merging") }
if key != DATA_INDEX {
panic!("The merge operator only supports \"data-index\" merging")
}
let capacity = {
let remaining = operands.size_hint().0;
@ -109,3 +138,90 @@ fn merge_indexes(key: &[u8], existing_value: Option<&[u8]>, operands: &mut Merge
let blob = op.merge().expect("BUG: could not merge blobs");
bincode::serialize(&blob).expect("BUG: could not serialize merged blob")
}
#[cfg(test)]
mod tests {
use super::*;
use std::error::Error;
use std::path::PathBuf;
use serde_derive::{Serialize, Deserialize};
use tempfile::tempdir;
use crate::tokenizer::DefaultBuilder;
use crate::index::update::PositiveUpdateBuilder;
use crate::index::schema::{Schema, SchemaBuilder, STORED, INDEXED};
#[test]
fn ingest_update_file() -> Result<(), Box<Error>> {
let dir = tempdir()?;
let rocksdb_path = dir.path().join("rocksdb.rdb");
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
struct SimpleDoc {
title: String,
description: String,
}
let title;
let description;
let schema = {
let mut builder = SchemaBuilder::new();
title = builder.new_attribute("title", STORED | INDEXED);
description = builder.new_attribute("description", STORED | INDEXED);
builder.build()
};
let database = Database::create(&rocksdb_path, schema.clone())?;
let tokenizer_builder = DefaultBuilder::new();
let update_path = dir.path().join("update.sst");
let doc0 = SimpleDoc {
title: String::from("I am a title"),
description: String::from("I am a description"),
};
let doc1 = SimpleDoc {
title: String::from("I am the second title"),
description: String::from("I am the second description"),
};
let mut update = {
let mut builder = PositiveUpdateBuilder::new(update_path, schema, tokenizer_builder);
// builder.update_field(0, title, doc0.title.clone());
// builder.update_field(0, description, doc0.description.clone());
// builder.update_field(1, title, doc1.title.clone());
// builder.update_field(1, description, doc1.description.clone());
builder.update(0, &doc0).unwrap();
builder.update(1, &doc1).unwrap();
builder.build()?
};
update.set_move(true);
database.ingest_update_file(update)?;
let view = database.view()?;
println!("{:?}", view);
#[derive(Deserialize, Debug, Clone, PartialEq, Eq)]
struct DeSimpleDoc {
title: char,
}
let de_doc0: DeSimpleDoc = view.retrieve_document(0)?;
let de_doc1: DeSimpleDoc = view.retrieve_document(1)?;
println!("{:?}", de_doc0);
println!("{:?}", de_doc1);
// assert_eq!(doc0, de_doc0);
// assert_eq!(doc1, de_doc1);
Ok(dir.close()?)
}
}