feat: Implement WriteToBytes/FromSharedDataCursor

Clément Renault 2019-02-17 16:33:42 +01:00
parent 8014857ebf
commit a8df438814
No known key found for this signature in database
GPG Key ID: 0151CDAB43460DAE
8 changed files with 190 additions and 136 deletions

View File

@@ -1,12 +1,15 @@
-use std::io::{self, Cursor, BufRead};
 use std::slice::from_raw_parts;
 use std::mem::size_of;
+use std::error::Error;
 
 use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
 use sdset::Set;
 
-use crate::DocumentId;
+use crate::shared_data_cursor::{SharedDataCursor, FromSharedDataCursor};
+use crate::write_to_bytes::WriteToBytes;
+
 use crate::data::SharedData;
+use crate::DocumentId;
 use super::into_u8_slice;
 
 #[derive(Default, Clone)]
@@ -19,21 +22,6 @@ impl DocIds {
         DocIds(data)
     }
 
-    pub fn from_cursor(cursor: &mut Cursor<SharedData>) -> io::Result<DocIds> {
-        let len = cursor.read_u64::<LittleEndian>()? as usize;
-        let offset = cursor.position() as usize;
-        let doc_ids = cursor.get_ref().range(offset, len);
-        cursor.consume(len);
-
-        Ok(DocIds(doc_ids))
-    }
-
-    pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
-        let len = self.0.len() as u64;
-        bytes.write_u64::<LittleEndian>(len).unwrap();
-        bytes.extend_from_slice(&self.0);
-    }
-
     pub fn is_empty(&self) -> bool {
         self.0.is_empty()
     }
@@ -52,3 +40,22 @@ impl AsRef<Set<DocumentId>> for DocIds {
         Set::new_unchecked(slice)
     }
 }
+
+impl FromSharedDataCursor for DocIds {
+    type Error = Box<Error>;
+
+    fn from_shared_data_cursor(cursor: &mut SharedDataCursor) -> Result<DocIds, Self::Error> {
+        let len = cursor.read_u64::<LittleEndian>()? as usize;
+        let data = cursor.extract(len);
+
+        Ok(DocIds(data))
+    }
+}
+
+impl WriteToBytes for DocIds {
+    fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
+        let len = self.0.len() as u64;
+        bytes.write_u64::<LittleEndian>(len).unwrap();
+        bytes.extend_from_slice(&self.0);
+    }
+}
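Note: the WriteToBytes trait itself lives in the write_to_bytes module, which is not part of this diff. Judging from the impls above and the into_bytes() calls later in the commit, it is presumably close to this sketch (the provided into_bytes method is inferred from those call sites, not shown in the diff):

pub trait WriteToBytes {
    fn write_to_bytes(&self, bytes: &mut Vec<u8>);

    // inferred: call sites below use `event.into_bytes()` to get an owned buffer
    fn into_bytes(&self) -> Vec<u8> {
        let mut bytes = Vec::new();
        self.write_to_bytes(&mut bytes);
        bytes
    }
}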

View File

@@ -1,14 +1,16 @@
-use std::io::{self, Write, Cursor, BufRead};
+use std::io::{self, Write};
 use std::slice::from_raw_parts;
 use std::mem::size_of;
 use std::ops::Index;
 use std::sync::Arc;
 
 use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
 use sdset::Set;
 
-use crate::DocIndex;
+use crate::shared_data_cursor::{SharedDataCursor, FromSharedDataCursor};
+use crate::write_to_bytes::WriteToBytes;
 use crate::data::SharedData;
+use crate::DocIndex;
 use super::into_u8_slice;
 
 #[derive(Debug)]
@@ -25,38 +27,6 @@ pub struct DocIndexes {
 }
 
 impl DocIndexes {
-    pub fn from_bytes(bytes: Vec<u8>) -> io::Result<DocIndexes> {
-        let len = bytes.len();
-        let bytes = Arc::from(bytes);
-        let data = SharedData::new(bytes, 0, len);
-        let mut cursor = Cursor::new(data);
-        DocIndexes::from_cursor(&mut cursor)
-    }
-
-    pub fn from_cursor(cursor: &mut Cursor<SharedData>) -> io::Result<DocIndexes> {
-        let len = cursor.read_u64::<LittleEndian>()? as usize;
-        let offset = cursor.position() as usize;
-        let ranges = cursor.get_ref().range(offset, len);
-        cursor.consume(len);
-
-        let len = cursor.read_u64::<LittleEndian>()? as usize;
-        let offset = cursor.position() as usize;
-        let indexes = cursor.get_ref().range(offset, len);
-        cursor.consume(len);
-
-        Ok(DocIndexes { ranges, indexes })
-    }
-
-    pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
-        let ranges_len = self.ranges.len() as u64;
-        let _ = bytes.write_u64::<LittleEndian>(ranges_len);
-        bytes.extend_from_slice(&self.ranges);
-
-        let indexes_len = self.indexes.len() as u64;
-        let _ = bytes.write_u64::<LittleEndian>(indexes_len);
-        bytes.extend_from_slice(&self.indexes);
-    }
-
     pub fn get(&self, index: usize) -> Option<&Set<DocIndex>> {
         self.ranges().get(index).map(|Range { start, end }| {
             let start = *start as usize;
@@ -92,6 +62,32 @@ impl Index<usize> for DocIndexes {
     }
 }
 
+impl FromSharedDataCursor for DocIndexes {
+    type Error = io::Error;
+
+    fn from_shared_data_cursor(cursor: &mut SharedDataCursor) -> Result<DocIndexes, Self::Error> {
+        let len = cursor.read_u64::<LittleEndian>()? as usize;
+        let ranges = cursor.extract(len);
+
+        let len = cursor.read_u64::<LittleEndian>()? as usize;
+        let indexes = cursor.extract(len);
+
+        Ok(DocIndexes { ranges, indexes })
+    }
+}
+
+impl WriteToBytes for DocIndexes {
+    fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
+        let ranges_len = self.ranges.len() as u64;
+        let _ = bytes.write_u64::<LittleEndian>(ranges_len);
+        bytes.extend_from_slice(&self.ranges);
+
+        let indexes_len = self.indexes.len() as u64;
+        let _ = bytes.write_u64::<LittleEndian>(indexes_len);
+        bytes.extend_from_slice(&self.indexes);
+    }
+}
+
 pub struct DocIndexesBuilder<W> {
     ranges: Vec<Range>,
     indexes: Vec<DocIndex>,
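Note: a round-trip sketch using these two impls; from_bytes is the provided method added to FromSharedDataCursor in the last file of this commit, and into_bytes is the assumed WriteToBytes helper sketched earlier:

fn doc_indexes_round_trip(indexes: &DocIndexes) -> std::io::Result<DocIndexes> {
    // layout: [ranges_len: u64 LE][ranges][indexes_len: u64 LE][indexes]
    let bytes = indexes.into_bytes();
    DocIndexes::from_bytes(bytes)
}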

View File

@@ -1,6 +1,4 @@
-use std::io::{Cursor, BufRead};
 use std::error::Error;
-use std::sync::Arc;
 
 use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
 use fst::{map, Map, IntoStreamer, Streamer};
@@ -8,7 +6,9 @@ use fst::raw::Fst;
 use sdset::duo::{Union, DifferenceByKey};
 use sdset::{Set, SetOperation};
 
-use crate::data::{SharedData, DocIndexes, DocIndexesBuilder};
+use crate::shared_data_cursor::{SharedDataCursor, FromSharedDataCursor};
+use crate::write_to_bytes::WriteToBytes;
+use crate::data::{DocIndexes, DocIndexesBuilder};
 use crate::{DocumentId, DocIndex};
 
 #[derive(Default)]
@@ -18,46 +18,6 @@ pub struct Index {
 }
 
 impl Index {
-    pub fn from_bytes(bytes: Vec<u8>) -> Result<Index, Box<Error>> {
-        let len = bytes.len();
-        Index::from_shared_bytes(Arc::from(bytes), 0, len)
-    }
-
-    pub fn from_shared_bytes(
-        bytes: Arc<Vec<u8>>,
-        offset: usize,
-        len: usize,
-    ) -> Result<Index, Box<Error>>
-    {
-        let data = SharedData::new(bytes, offset, len);
-        let mut cursor = Cursor::new(data);
-
-        Index::from_cursor(&mut cursor)
-    }
-
-    pub fn from_cursor(cursor: &mut Cursor<SharedData>) -> Result<Index, Box<Error>> {
-        let len = cursor.read_u64::<LittleEndian>()? as usize;
-        let offset = cursor.position() as usize;
-        let data = cursor.get_ref().range(offset, len);
-
-        let fst = Fst::from_shared_bytes(data.bytes, data.offset, data.len)?;
-        let map = Map::from(fst);
-        cursor.consume(len);
-
-        let indexes = DocIndexes::from_cursor(cursor)?;
-
-        Ok(Index { map, indexes})
-    }
-
-    pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
-        let slice = self.map.as_fst().as_bytes();
-        let len = slice.len() as u64;
-        let _ = bytes.write_u64::<LittleEndian>(len);
-        bytes.extend_from_slice(slice);
-
-        self.indexes.write_to_bytes(bytes);
-    }
-
     pub fn remove_documents(&self, documents: &Set<DocumentId>) -> Index {
         let mut buffer = Vec::new();
         let mut builder = IndexBuilder::new();
@@ -116,6 +76,33 @@ impl Index {
     }
 }
 
+impl FromSharedDataCursor for Index {
+    type Error = Box<Error>;
+
+    fn from_shared_data_cursor(cursor: &mut SharedDataCursor) -> Result<Index, Self::Error> {
+        let len = cursor.read_u64::<LittleEndian>()? as usize;
+        let data = cursor.extract(len);
+
+        let fst = Fst::from_shared_bytes(data.bytes, data.offset, data.len)?;
+        let map = Map::from(fst);
+
+        let indexes = DocIndexes::from_shared_data_cursor(cursor)?;
+
+        Ok(Index { map, indexes})
+    }
+}
+
+impl WriteToBytes for Index {
+    fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
+        let slice = self.map.as_fst().as_bytes();
+        let len = slice.len() as u64;
+        let _ = bytes.write_u64::<LittleEndian>(len);
+        bytes.extend_from_slice(slice);
+
+        self.indexes.write_to_bytes(bytes);
+    }
+}
+
 impl<'m, 'a> IntoStreamer<'a> for &'m Index {
     type Item = (&'a [u8], &'a Set<DocIndex>);
     type Into = Stream<'m>;
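Note: DocIds, DocIndexes and Index now all implement both traits, so serialization code can be written once; a minimal generic sketch under the same into_bytes assumption:

fn round_trip<T>(value: &T) -> Result<T, T::Error>
where T: WriteToBytes + FromSharedDataCursor
{
    // for Index the bytes are: [fst_len: u64 LE][fst bytes][DocIndexes bytes]
    T::from_bytes(value.into_bytes())
}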

View File

@@ -17,6 +17,9 @@ use lockfree::map::Map;
 use hashbrown::HashMap;
 use log::{info, error, warn};
 
+use crate::shared_data_cursor::FromSharedDataCursor;
+use crate::write_to_bytes::WriteToBytes;
+
 pub use self::document_key::{DocumentKey, DocumentKeyAttr};
 pub use self::view::{DatabaseView, DocumentIter};
 pub use self::update::Update;
@@ -51,7 +54,7 @@ where D: Deref<Target=DB>
 fn retrieve_data_index<D>(snapshot: &Snapshot<D>) -> Result<Index, Box<Error>>
 where D: Deref<Target=DB>
 {
-    use self::update::ReadIndexEvent::*;
+    use self::update::ReadIndexEvent::{self, *};
 
     let (elapsed, vector) = elapsed::measure_time(|| snapshot.get(DATA_INDEX));
     info!("loading index from kv-store took {}", elapsed);
@@ -63,7 +66,7 @@ where D: Deref<Target=DB>
             info!("index size is {}B", size);
 
             let (elapsed, result) = elapsed::measure_time(|| {
-                match bincode::deserialize(&bytes)? {
+                match ReadIndexEvent::from_bytes(bytes.to_vec())? {
                     RemovedDocuments(_) => unreachable!("BUG: Must not extract a RemovedDocuments"),
                     UpdatedDocuments(index) => Ok(index),
                 }
@@ -80,7 +83,7 @@ where D: Deref<Target=DB>
 fn retrieve_data_ranked_map<D>(snapshot: &Snapshot<D>) -> Result<RankedMap, Box<Error>>
 where D: Deref<Target=DB>,
 {
-    use self::update::ReadRankedMapEvent::*;
+    use self::update::ReadRankedMapEvent::{self, *};
 
     let (elapsed, vector) = elapsed::measure_time(|| snapshot.get(DATA_RANKED_MAP));
     info!("loading ranked map from kv-store took {}", elapsed);
@@ -92,7 +95,7 @@ where D: Deref<Target=DB>,
             info!("ranked map size is {}B", size);
 
             let (elapsed, result) = elapsed::measure_time(|| {
-                match bincode::deserialize(&bytes)? {
+                match ReadRankedMapEvent::from_bytes(bytes.to_vec())? {
                     RemovedDocuments(_) => unreachable!("BUG: Must not extract a RemovedDocuments"),
                     UpdatedDocuments(ranked_map) => Ok(ranked_map),
                 }
@@ -102,40 +105,38 @@ where D: Deref<Target=DB>,
 
             result
         },
-        None => Ok(HashMap::new()),
+        None => Ok(RankedMap::new()),
     }
 }
 
 fn merge_indexes(existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
-    use self::update::ReadIndexEvent::*;
+    use self::update::ReadIndexEvent::{self, *};
     use self::update::WriteIndexEvent;
 
     let mut index = Index::default();
    for bytes in existing.into_iter().chain(operands) {
-        match bincode::deserialize(bytes).unwrap() {
-            RemovedDocuments(d) => index = index.remove_documents(&d),
+        match ReadIndexEvent::from_bytes(bytes.to_vec()).unwrap() {
+            RemovedDocuments(d) => index = index.remove_documents(d.as_ref()),
             UpdatedDocuments(i) => index = index.union(&i),
         }
     }
 
-    let event = WriteIndexEvent::UpdatedDocuments(&index);
-    bincode::serialize(&event).unwrap()
+    WriteIndexEvent::UpdatedDocuments(&index).into_bytes()
 }
 
 fn merge_ranked_maps(existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
-    use self::update::ReadRankedMapEvent::*;
+    use self::update::ReadRankedMapEvent::{self, *};
     use self::update::WriteRankedMapEvent;
 
     let mut ranked_map = RankedMap::default();
     for bytes in existing.into_iter().chain(operands) {
-        match bincode::deserialize(bytes).unwrap() {
-            RemovedDocuments(d) => ranked_map.retain(|(k, _), _| !d.contains(k)),
+        match ReadRankedMapEvent::from_bytes(bytes.to_vec()).unwrap() {
+            RemovedDocuments(d) => ranked_map.retain(|(k, _), _| !d.as_ref().contains(k)),
             UpdatedDocuments(i) => ranked_map.extend(i),
         }
     }
 
-    let event = WriteRankedMapEvent::UpdatedDocuments(&ranked_map);
-    bincode::serialize(&event).unwrap()
+    WriteRankedMapEvent::UpdatedDocuments(&ranked_map).into_bytes()
 }
 
 fn merge_operator(key: &[u8], existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
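Note: the retrieve and merge functions above all share one pattern: encode through WriteIndexEvent::into_bytes, decode through ReadIndexEvent::from_bytes. Condensed as a sketch (same unreachable! caveat as retrieve_data_index):

fn encode_index_update(index: &Index) -> Vec<u8> {
    WriteIndexEvent::UpdatedDocuments(index).into_bytes()
}

fn decode_index_update(bytes: &[u8]) -> Result<Index, Box<Error>> {
    // from_bytes takes ownership of a Vec<u8>, hence the to_vec() copies above
    match ReadIndexEvent::from_bytes(bytes.to_vec())? {
        ReadIndexEvent::UpdatedDocuments(index) => Ok(index),
        ReadIndexEvent::RemovedDocuments(_) => unreachable!("BUG: Must not extract a RemovedDocuments"),
    }
}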

View File

@@ -1,17 +1,45 @@
-use sdset::{Set, SetBuf};
-use serde_derive::{Serialize, Deserialize};
+use std::error::Error;
+
+use byteorder::{ReadBytesExt, WriteBytesExt};
 
+use crate::shared_data_cursor::{SharedDataCursor, FromSharedDataCursor};
+use crate::write_to_bytes::WriteToBytes;
 use crate::database::Index;
-use crate::DocumentId;
+use crate::data::DocIds;
 
-#[derive(Serialize)]
 pub enum WriteIndexEvent<'a> {
-    RemovedDocuments(&'a Set<DocumentId>),
+    RemovedDocuments(&'a DocIds),
     UpdatedDocuments(&'a Index),
 }
 
-#[derive(Deserialize)]
+impl<'a> WriteToBytes for WriteIndexEvent<'a> {
+    fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
+        match self {
+            WriteIndexEvent::RemovedDocuments(doc_ids) => {
+                let _ = bytes.write_u8(0);
+                doc_ids.write_to_bytes(bytes);
+            },
+            WriteIndexEvent::UpdatedDocuments(index) => {
+                let _ = bytes.write_u8(1);
+                index.write_to_bytes(bytes);
+            }
+        }
+    }
+}
+
 pub enum ReadIndexEvent {
-    RemovedDocuments(SetBuf<DocumentId>),
+    RemovedDocuments(DocIds),
     UpdatedDocuments(Index),
 }
+
+impl FromSharedDataCursor for ReadIndexEvent {
+    type Error = Box<Error>;
+
+    fn from_shared_data_cursor(cursor: &mut SharedDataCursor) -> Result<Self, Self::Error> {
+        match cursor.read_u8()? {
+            0 => DocIds::from_shared_data_cursor(cursor).map(ReadIndexEvent::RemovedDocuments),
+            1 => Index::from_shared_data_cursor(cursor).map(ReadIndexEvent::UpdatedDocuments),
+            _ => unreachable!(),
+        }
+    }
+}
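Note: the wire format these two impls agree on is a one-byte tag followed by the variant's own encoding:

// 0x00 [doc_ids_len: u64 LE][DocumentId bytes]          RemovedDocuments (DocIds)
// 0x01 [fst_len: u64 LE][fst bytes][DocIndexes bytes]   UpdatedDocuments (Index)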

View File

@@ -10,11 +10,13 @@ use crate::database::document_key::{DocumentKey, DocumentKeyAttr};
 use crate::database::serde::serializer::Serializer;
 use crate::database::serde::SerializerError;
 use crate::database::schema::SchemaAttr;
-use crate::tokenizer::TokenizerBuilder;
 use crate::database::schema::Schema;
 use crate::database::index::IndexBuilder;
 use crate::database::{DATA_INDEX, DATA_RANKED_MAP};
 use crate::database::{RankedMap, Number};
+use crate::tokenizer::TokenizerBuilder;
+use crate::write_to_bytes::WriteToBytes;
+use crate::data::DocIds;
 use crate::{DocumentId, DocIndex};
 
 pub use self::index_event::{ReadIndexEvent, WriteIndexEvent};
@@ -119,7 +121,8 @@ impl RawUpdateBuilder {
             }
 
             document_ids.sort_unstable();
-            SetBuf::new_unchecked(document_ids)
+            let setbuf = SetBuf::new_unchecked(document_ids);
+            DocIds::new(&setbuf)
         };
 
         // create the Index of all the document updates
@@ -140,28 +143,24 @@
 
         if !removed_documents.is_empty() {
             // remove the documents using the appropriate IndexEvent
-            let event = WriteIndexEvent::RemovedDocuments(&removed_documents);
-            let event_bytes = bincode::serialize(&event).unwrap();
+            let event_bytes = WriteIndexEvent::RemovedDocuments(&removed_documents).into_bytes();
             self.batch.merge(DATA_INDEX, &event_bytes)?;
         }
 
         // update the documents using the appropriate IndexEvent
-        let event = WriteIndexEvent::UpdatedDocuments(&index);
-        let event_bytes = bincode::serialize(&event).unwrap();
+        let event_bytes = WriteIndexEvent::UpdatedDocuments(&index).into_bytes();
         self.batch.merge(DATA_INDEX, &event_bytes)?;
 
         // === ranked map ===
 
         if !removed_documents.is_empty() {
             // update the ranked map using the appropriate RankedMapEvent
-            let event = WriteRankedMapEvent::RemovedDocuments(&removed_documents);
-            let event_bytes = bincode::serialize(&event).unwrap();
+            let event_bytes = WriteRankedMapEvent::RemovedDocuments(&removed_documents).into_bytes();
             self.batch.merge(DATA_RANKED_MAP, &event_bytes)?;
         }
 
         // update the documents using the appropriate IndexEvent
-        let event = WriteRankedMapEvent::UpdatedDocuments(&self.documents_ranked_fields);
-        let event_bytes = bincode::serialize(&event).unwrap();
+        let event_bytes = WriteRankedMapEvent::UpdatedDocuments(&self.documents_ranked_fields).into_bytes();
         self.batch.merge(DATA_RANKED_MAP, &event_bytes)?;
 
         Ok(self.batch)
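Note: DocIds::new is not shown in this diff; the code above assumes it copies a Set<DocumentId> into an owned DocIds buffer. The removed-documents construction, condensed as a sketch (SetBuf::new_unchecked trusts the caller that the Vec is sorted and duplicate-free):

fn removed_ids(mut document_ids: Vec<DocumentId>) -> DocIds {
    document_ids.sort_unstable();
    let setbuf = SetBuf::new_unchecked(document_ids);
    DocIds::new(&setbuf)
}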

View File

@@ -1,17 +1,48 @@
-use sdset::{Set, SetBuf};
-use serde_derive::{Serialize, Deserialize};
+use std::error::Error;
+
+use byteorder::{ReadBytesExt, WriteBytesExt};
 
+use crate::shared_data_cursor::{SharedDataCursor, FromSharedDataCursor};
+use crate::write_to_bytes::WriteToBytes;
 use crate::database::RankedMap;
-use crate::DocumentId;
+use crate::data::DocIds;
 
-#[derive(Serialize)]
 pub enum WriteRankedMapEvent<'a> {
-    RemovedDocuments(&'a Set<DocumentId>),
+    RemovedDocuments(&'a DocIds),
     UpdatedDocuments(&'a RankedMap),
 }
 
-#[derive(Deserialize)]
+impl<'a> WriteToBytes for WriteRankedMapEvent<'a> {
+    fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
+        match self {
+            WriteRankedMapEvent::RemovedDocuments(doc_ids) => {
+                let _ = bytes.write_u8(0);
+                doc_ids.write_to_bytes(bytes);
+            },
+            WriteRankedMapEvent::UpdatedDocuments(ranked_map) => {
+                let _ = bytes.write_u8(1);
+                bincode::serialize_into(bytes, ranked_map).unwrap()
+            }
+        }
+    }
+}
+
 pub enum ReadRankedMapEvent {
-    RemovedDocuments(SetBuf<DocumentId>),
+    RemovedDocuments(DocIds),
     UpdatedDocuments(RankedMap),
 }
+
+impl FromSharedDataCursor for ReadRankedMapEvent {
+    type Error = Box<Error>;
+
+    fn from_shared_data_cursor(cursor: &mut SharedDataCursor) -> Result<Self, Self::Error> {
+        match cursor.read_u8()? {
+            0 => DocIds::from_shared_data_cursor(cursor).map(ReadRankedMapEvent::RemovedDocuments),
+            1 => {
+                let ranked_map = bincode::deserialize_from(cursor)?;
+                Ok(ReadRankedMapEvent::UpdatedDocuments(ranked_map))
+            },
+            _ => unreachable!(),
+        }
+    }
+}
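Note: unlike the index event, the UpdatedDocuments payload here still goes through bincode, wrapped in the same tag-byte framing. Handing the cursor straight to bincode works because SharedDataCursor implements BufRead (see the last file of this commit) and bincode::deserialize_from accepts any io::Read; the bound involved, as a sketch:

fn decode_ranked_map_payload<R: std::io::Read>(reader: R) -> bincode::Result<RankedMap> {
    bincode::deserialize_from(reader)
}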

View File

@@ -45,7 +45,12 @@ impl BufRead for SharedDataCursor {
 }
 
 pub trait FromSharedDataCursor: Sized {
-    type Err;
+    type Error;
 
-    fn from_shared_data_cursor(data: &mut SharedDataCursor) -> Result<Self, Self::Err>;
+    fn from_shared_data_cursor(cursor: &mut SharedDataCursor) -> Result<Self, Self::Error>;
+
+    fn from_bytes(bytes: Vec<u8>) -> Result<Self, Self::Error> {
+        let mut cursor = SharedDataCursor::from_bytes(bytes);
+        Self::from_shared_data_cursor(&mut cursor)
+    }
 }
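Note: renaming the associated type from Err to Error and adding the provided from_bytes method is what lets every call site in this commit replace bincode::deserialize(&bytes) with Type::from_bytes(bytes.to_vec()). A usage sketch, assuming SharedDataCursor::from_bytes (called above) wraps the vector in a SharedData spanning its full length:

fn load_index(bytes: Vec<u8>) -> Result<Index, Box<Error>> {
    Index::from_bytes(bytes) // provided method, delegates to from_shared_data_cursor
}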