diff --git a/src/data/doc_ids.rs b/src/data/doc_ids.rs index 9c4ea6474..ff951bb35 100644 --- a/src/data/doc_ids.rs +++ b/src/data/doc_ids.rs @@ -1,12 +1,15 @@ -use std::io::{self, Cursor, BufRead}; use std::slice::from_raw_parts; use std::mem::size_of; +use std::error::Error; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use sdset::Set; -use crate::DocumentId; +use crate::shared_data_cursor::{SharedDataCursor, FromSharedDataCursor}; +use crate::write_to_bytes::WriteToBytes; use crate::data::SharedData; +use crate::DocumentId; + use super::into_u8_slice; #[derive(Default, Clone)] @@ -19,21 +22,6 @@ impl DocIds { DocIds(data) } - pub fn from_cursor(cursor: &mut Cursor) -> io::Result { - let len = cursor.read_u64::()? as usize; - let offset = cursor.position() as usize; - let doc_ids = cursor.get_ref().range(offset, len); - cursor.consume(len); - - Ok(DocIds(doc_ids)) - } - - pub fn write_to_bytes(&self, bytes: &mut Vec) { - let len = self.0.len() as u64; - bytes.write_u64::(len).unwrap(); - bytes.extend_from_slice(&self.0); - } - pub fn is_empty(&self) -> bool { self.0.is_empty() } @@ -52,3 +40,22 @@ impl AsRef> for DocIds { Set::new_unchecked(slice) } } + +impl FromSharedDataCursor for DocIds { + type Error = Box; + + fn from_shared_data_cursor(cursor: &mut SharedDataCursor) -> Result { + let len = cursor.read_u64::()? as usize; + let data = cursor.extract(len); + + Ok(DocIds(data)) + } +} + +impl WriteToBytes for DocIds { + fn write_to_bytes(&self, bytes: &mut Vec) { + let len = self.0.len() as u64; + bytes.write_u64::(len).unwrap(); + bytes.extend_from_slice(&self.0); + } +} diff --git a/src/data/doc_indexes.rs b/src/data/doc_indexes.rs index 17d43fd9a..2bb946745 100644 --- a/src/data/doc_indexes.rs +++ b/src/data/doc_indexes.rs @@ -1,14 +1,16 @@ -use std::io::{self, Write, Cursor, BufRead}; +use std::io::{self, Write}; use std::slice::from_raw_parts; use std::mem::size_of; use std::ops::Index; -use std::sync::Arc; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use sdset::Set; -use crate::DocIndex; +use crate::shared_data_cursor::{SharedDataCursor, FromSharedDataCursor}; +use crate::write_to_bytes::WriteToBytes; use crate::data::SharedData; +use crate::DocIndex; + use super::into_u8_slice; #[derive(Debug)] @@ -25,38 +27,6 @@ pub struct DocIndexes { } impl DocIndexes { - pub fn from_bytes(bytes: Vec) -> io::Result { - let len = bytes.len(); - let bytes = Arc::from(bytes); - let data = SharedData::new(bytes, 0, len); - let mut cursor = Cursor::new(data); - DocIndexes::from_cursor(&mut cursor) - } - - pub fn from_cursor(cursor: &mut Cursor) -> io::Result { - let len = cursor.read_u64::()? as usize; - let offset = cursor.position() as usize; - let ranges = cursor.get_ref().range(offset, len); - cursor.consume(len); - - let len = cursor.read_u64::()? as usize; - let offset = cursor.position() as usize; - let indexes = cursor.get_ref().range(offset, len); - cursor.consume(len); - - Ok(DocIndexes { ranges, indexes }) - } - - pub fn write_to_bytes(&self, bytes: &mut Vec) { - let ranges_len = self.ranges.len() as u64; - let _ = bytes.write_u64::(ranges_len); - bytes.extend_from_slice(&self.ranges); - - let indexes_len = self.indexes.len() as u64; - let _ = bytes.write_u64::(indexes_len); - bytes.extend_from_slice(&self.indexes); - } - pub fn get(&self, index: usize) -> Option<&Set> { self.ranges().get(index).map(|Range { start, end }| { let start = *start as usize; @@ -92,6 +62,32 @@ impl Index for DocIndexes { } } +impl FromSharedDataCursor for DocIndexes { + type Error = io::Error; + + fn from_shared_data_cursor(cursor: &mut SharedDataCursor) -> Result { + let len = cursor.read_u64::()? as usize; + let ranges = cursor.extract(len); + + let len = cursor.read_u64::()? as usize; + let indexes = cursor.extract(len); + + Ok(DocIndexes { ranges, indexes }) + } +} + +impl WriteToBytes for DocIndexes { + fn write_to_bytes(&self, bytes: &mut Vec) { + let ranges_len = self.ranges.len() as u64; + let _ = bytes.write_u64::(ranges_len); + bytes.extend_from_slice(&self.ranges); + + let indexes_len = self.indexes.len() as u64; + let _ = bytes.write_u64::(indexes_len); + bytes.extend_from_slice(&self.indexes); + } +} + pub struct DocIndexesBuilder { ranges: Vec, indexes: Vec, diff --git a/src/database/index.rs b/src/database/index.rs index 696042d09..44a76d359 100644 --- a/src/database/index.rs +++ b/src/database/index.rs @@ -1,6 +1,4 @@ -use std::io::{Cursor, BufRead}; use std::error::Error; -use std::sync::Arc; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use fst::{map, Map, IntoStreamer, Streamer}; @@ -8,7 +6,9 @@ use fst::raw::Fst; use sdset::duo::{Union, DifferenceByKey}; use sdset::{Set, SetOperation}; -use crate::data::{SharedData, DocIndexes, DocIndexesBuilder}; +use crate::shared_data_cursor::{SharedDataCursor, FromSharedDataCursor}; +use crate::write_to_bytes::WriteToBytes; +use crate::data::{DocIndexes, DocIndexesBuilder}; use crate::{DocumentId, DocIndex}; #[derive(Default)] @@ -18,46 +18,6 @@ pub struct Index { } impl Index { - pub fn from_bytes(bytes: Vec) -> Result> { - let len = bytes.len(); - Index::from_shared_bytes(Arc::from(bytes), 0, len) - } - - pub fn from_shared_bytes( - bytes: Arc>, - offset: usize, - len: usize, - ) -> Result> - { - let data = SharedData::new(bytes, offset, len); - let mut cursor = Cursor::new(data); - - Index::from_cursor(&mut cursor) - } - - pub fn from_cursor(cursor: &mut Cursor) -> Result> { - let len = cursor.read_u64::()? as usize; - let offset = cursor.position() as usize; - let data = cursor.get_ref().range(offset, len); - - let fst = Fst::from_shared_bytes(data.bytes, data.offset, data.len)?; - let map = Map::from(fst); - cursor.consume(len); - - let indexes = DocIndexes::from_cursor(cursor)?; - - Ok(Index { map, indexes}) - } - - pub fn write_to_bytes(&self, bytes: &mut Vec) { - let slice = self.map.as_fst().as_bytes(); - let len = slice.len() as u64; - let _ = bytes.write_u64::(len); - bytes.extend_from_slice(slice); - - self.indexes.write_to_bytes(bytes); - } - pub fn remove_documents(&self, documents: &Set) -> Index { let mut buffer = Vec::new(); let mut builder = IndexBuilder::new(); @@ -116,6 +76,33 @@ impl Index { } } +impl FromSharedDataCursor for Index { + type Error = Box; + + fn from_shared_data_cursor(cursor: &mut SharedDataCursor) -> Result { + let len = cursor.read_u64::()? as usize; + let data = cursor.extract(len); + + let fst = Fst::from_shared_bytes(data.bytes, data.offset, data.len)?; + let map = Map::from(fst); + + let indexes = DocIndexes::from_shared_data_cursor(cursor)?; + + Ok(Index { map, indexes}) + } +} + +impl WriteToBytes for Index { + fn write_to_bytes(&self, bytes: &mut Vec) { + let slice = self.map.as_fst().as_bytes(); + let len = slice.len() as u64; + let _ = bytes.write_u64::(len); + bytes.extend_from_slice(slice); + + self.indexes.write_to_bytes(bytes); + } +} + impl<'m, 'a> IntoStreamer<'a> for &'m Index { type Item = (&'a [u8], &'a Set); type Into = Stream<'m>; diff --git a/src/database/mod.rs b/src/database/mod.rs index b92a2af79..c23e5000f 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -17,6 +17,9 @@ use lockfree::map::Map; use hashbrown::HashMap; use log::{info, error, warn}; +use crate::shared_data_cursor::FromSharedDataCursor; +use crate::write_to_bytes::WriteToBytes; + pub use self::document_key::{DocumentKey, DocumentKeyAttr}; pub use self::view::{DatabaseView, DocumentIter}; pub use self::update::Update; @@ -51,7 +54,7 @@ where D: Deref fn retrieve_data_index(snapshot: &Snapshot) -> Result> where D: Deref { - use self::update::ReadIndexEvent::*; + use self::update::ReadIndexEvent::{self, *}; let (elapsed, vector) = elapsed::measure_time(|| snapshot.get(DATA_INDEX)); info!("loading index from kv-store took {}", elapsed); @@ -63,7 +66,7 @@ where D: Deref info!("index size is {}B", size); let (elapsed, result) = elapsed::measure_time(|| { - match bincode::deserialize(&bytes)? { + match ReadIndexEvent::from_bytes(bytes.to_vec())? { RemovedDocuments(_) => unreachable!("BUG: Must not extract a RemovedDocuments"), UpdatedDocuments(index) => Ok(index), } @@ -80,7 +83,7 @@ where D: Deref fn retrieve_data_ranked_map(snapshot: &Snapshot) -> Result> where D: Deref, { - use self::update::ReadRankedMapEvent::*; + use self::update::ReadRankedMapEvent::{self, *}; let (elapsed, vector) = elapsed::measure_time(|| snapshot.get(DATA_RANKED_MAP)); info!("loading ranked map from kv-store took {}", elapsed); @@ -92,7 +95,7 @@ where D: Deref, info!("ranked map size is {}B", size); let (elapsed, result) = elapsed::measure_time(|| { - match bincode::deserialize(&bytes)? { + match ReadRankedMapEvent::from_bytes(bytes.to_vec())? { RemovedDocuments(_) => unreachable!("BUG: Must not extract a RemovedDocuments"), UpdatedDocuments(ranked_map) => Ok(ranked_map), } @@ -102,40 +105,38 @@ where D: Deref, result }, - None => Ok(HashMap::new()), + None => Ok(RankedMap::new()), } } fn merge_indexes(existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec { - use self::update::ReadIndexEvent::*; + use self::update::ReadIndexEvent::{self, *}; use self::update::WriteIndexEvent; let mut index = Index::default(); for bytes in existing.into_iter().chain(operands) { - match bincode::deserialize(bytes).unwrap() { - RemovedDocuments(d) => index = index.remove_documents(&d), + match ReadIndexEvent::from_bytes(bytes.to_vec()).unwrap() { + RemovedDocuments(d) => index = index.remove_documents(d.as_ref()), UpdatedDocuments(i) => index = index.union(&i), } } - let event = WriteIndexEvent::UpdatedDocuments(&index); - bincode::serialize(&event).unwrap() + WriteIndexEvent::UpdatedDocuments(&index).into_bytes() } fn merge_ranked_maps(existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec { - use self::update::ReadRankedMapEvent::*; + use self::update::ReadRankedMapEvent::{self, *}; use self::update::WriteRankedMapEvent; let mut ranked_map = RankedMap::default(); for bytes in existing.into_iter().chain(operands) { - match bincode::deserialize(bytes).unwrap() { - RemovedDocuments(d) => ranked_map.retain(|(k, _), _| !d.contains(k)), + match ReadRankedMapEvent::from_bytes(bytes.to_vec()).unwrap() { + RemovedDocuments(d) => ranked_map.retain(|(k, _), _| !d.as_ref().contains(k)), UpdatedDocuments(i) => ranked_map.extend(i), } } - let event = WriteRankedMapEvent::UpdatedDocuments(&ranked_map); - bincode::serialize(&event).unwrap() + WriteRankedMapEvent::UpdatedDocuments(&ranked_map).into_bytes() } fn merge_operator(key: &[u8], existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec { diff --git a/src/database/update/index_event.rs b/src/database/update/index_event.rs index fb8af1bd5..74c6242e7 100644 --- a/src/database/update/index_event.rs +++ b/src/database/update/index_event.rs @@ -1,17 +1,45 @@ -use sdset::{Set, SetBuf}; -use serde_derive::{Serialize, Deserialize}; +use std::error::Error; +use byteorder::{ReadBytesExt, WriteBytesExt}; + +use crate::shared_data_cursor::{SharedDataCursor, FromSharedDataCursor}; +use crate::write_to_bytes::WriteToBytes; use crate::database::Index; -use crate::DocumentId; +use crate::data::DocIds; -#[derive(Serialize)] pub enum WriteIndexEvent<'a> { - RemovedDocuments(&'a Set), + RemovedDocuments(&'a DocIds), UpdatedDocuments(&'a Index), } -#[derive(Deserialize)] +impl<'a> WriteToBytes for WriteIndexEvent<'a> { + fn write_to_bytes(&self, bytes: &mut Vec) { + match self { + WriteIndexEvent::RemovedDocuments(doc_ids) => { + let _ = bytes.write_u8(0); + doc_ids.write_to_bytes(bytes); + }, + WriteIndexEvent::UpdatedDocuments(index) => { + let _ = bytes.write_u8(1); + index.write_to_bytes(bytes); + } + } + } +} + pub enum ReadIndexEvent { - RemovedDocuments(SetBuf), + RemovedDocuments(DocIds), UpdatedDocuments(Index), } + +impl FromSharedDataCursor for ReadIndexEvent { + type Error = Box; + + fn from_shared_data_cursor(cursor: &mut SharedDataCursor) -> Result { + match cursor.read_u8()? { + 0 => DocIds::from_shared_data_cursor(cursor).map(ReadIndexEvent::RemovedDocuments), + 1 => Index::from_shared_data_cursor(cursor).map(ReadIndexEvent::UpdatedDocuments), + _ => unreachable!(), + } + } +} diff --git a/src/database/update/mod.rs b/src/database/update/mod.rs index ba32dc718..548fb8bc2 100644 --- a/src/database/update/mod.rs +++ b/src/database/update/mod.rs @@ -10,11 +10,13 @@ use crate::database::document_key::{DocumentKey, DocumentKeyAttr}; use crate::database::serde::serializer::Serializer; use crate::database::serde::SerializerError; use crate::database::schema::SchemaAttr; -use crate::tokenizer::TokenizerBuilder; use crate::database::schema::Schema; use crate::database::index::IndexBuilder; use crate::database::{DATA_INDEX, DATA_RANKED_MAP}; use crate::database::{RankedMap, Number}; +use crate::tokenizer::TokenizerBuilder; +use crate::write_to_bytes::WriteToBytes; +use crate::data::DocIds; use crate::{DocumentId, DocIndex}; pub use self::index_event::{ReadIndexEvent, WriteIndexEvent}; @@ -119,7 +121,8 @@ impl RawUpdateBuilder { } document_ids.sort_unstable(); - SetBuf::new_unchecked(document_ids) + let setbuf = SetBuf::new_unchecked(document_ids); + DocIds::new(&setbuf) }; // create the Index of all the document updates @@ -140,28 +143,24 @@ impl RawUpdateBuilder { if !removed_documents.is_empty() { // remove the documents using the appropriate IndexEvent - let event = WriteIndexEvent::RemovedDocuments(&removed_documents); - let event_bytes = bincode::serialize(&event).unwrap(); + let event_bytes = WriteIndexEvent::RemovedDocuments(&removed_documents).into_bytes(); self.batch.merge(DATA_INDEX, &event_bytes)?; } // update the documents using the appropriate IndexEvent - let event = WriteIndexEvent::UpdatedDocuments(&index); - let event_bytes = bincode::serialize(&event).unwrap(); + let event_bytes = WriteIndexEvent::UpdatedDocuments(&index).into_bytes(); self.batch.merge(DATA_INDEX, &event_bytes)?; // === ranked map === if !removed_documents.is_empty() { // update the ranked map using the appropriate RankedMapEvent - let event = WriteRankedMapEvent::RemovedDocuments(&removed_documents); - let event_bytes = bincode::serialize(&event).unwrap(); + let event_bytes = WriteRankedMapEvent::RemovedDocuments(&removed_documents).into_bytes(); self.batch.merge(DATA_RANKED_MAP, &event_bytes)?; } // update the documents using the appropriate IndexEvent - let event = WriteRankedMapEvent::UpdatedDocuments(&self.documents_ranked_fields); - let event_bytes = bincode::serialize(&event).unwrap(); + let event_bytes = WriteRankedMapEvent::UpdatedDocuments(&self.documents_ranked_fields).into_bytes(); self.batch.merge(DATA_RANKED_MAP, &event_bytes)?; Ok(self.batch) diff --git a/src/database/update/ranked_map_event.rs b/src/database/update/ranked_map_event.rs index 16bbed71b..2fba12f04 100644 --- a/src/database/update/ranked_map_event.rs +++ b/src/database/update/ranked_map_event.rs @@ -1,17 +1,48 @@ -use sdset::{Set, SetBuf}; -use serde_derive::{Serialize, Deserialize}; +use std::error::Error; +use byteorder::{ReadBytesExt, WriteBytesExt}; + +use crate::shared_data_cursor::{SharedDataCursor, FromSharedDataCursor}; +use crate::write_to_bytes::WriteToBytes; use crate::database::RankedMap; -use crate::DocumentId; +use crate::data::DocIds; -#[derive(Serialize)] pub enum WriteRankedMapEvent<'a> { - RemovedDocuments(&'a Set), + RemovedDocuments(&'a DocIds), UpdatedDocuments(&'a RankedMap), } -#[derive(Deserialize)] +impl<'a> WriteToBytes for WriteRankedMapEvent<'a> { + fn write_to_bytes(&self, bytes: &mut Vec) { + match self { + WriteRankedMapEvent::RemovedDocuments(doc_ids) => { + let _ = bytes.write_u8(0); + doc_ids.write_to_bytes(bytes); + }, + WriteRankedMapEvent::UpdatedDocuments(ranked_map) => { + let _ = bytes.write_u8(1); + bincode::serialize_into(bytes, ranked_map).unwrap() + } + } + } +} + pub enum ReadRankedMapEvent { - RemovedDocuments(SetBuf), + RemovedDocuments(DocIds), UpdatedDocuments(RankedMap), } + +impl FromSharedDataCursor for ReadRankedMapEvent { + type Error = Box; + + fn from_shared_data_cursor(cursor: &mut SharedDataCursor) -> Result { + match cursor.read_u8()? { + 0 => DocIds::from_shared_data_cursor(cursor).map(ReadRankedMapEvent::RemovedDocuments), + 1 => { + let ranked_map = bincode::deserialize_from(cursor)?; + Ok(ReadRankedMapEvent::UpdatedDocuments(ranked_map)) + }, + _ => unreachable!(), + } + } +} diff --git a/src/shared_data_cursor.rs b/src/shared_data_cursor.rs index 785d1b2cf..00d36884a 100644 --- a/src/shared_data_cursor.rs +++ b/src/shared_data_cursor.rs @@ -45,7 +45,12 @@ impl BufRead for SharedDataCursor { } pub trait FromSharedDataCursor: Sized { - type Err; + type Error; - fn from_shared_data_cursor(data: &mut SharedDataCursor) -> Result; + fn from_shared_data_cursor(cursor: &mut SharedDataCursor) -> Result; + + fn from_bytes(bytes: Vec) -> Result { + let mut cursor = SharedDataCursor::from_bytes(bytes); + Self::from_shared_data_cursor(&mut cursor) + } }