From cbb0aaa2179d99c1d02502d0539e700fc8b0cb7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 14 Feb 2019 20:22:25 +0100 Subject: [PATCH 1/9] feat: Introduce the Index structure along with the Events types --- Cargo.toml | 7 +- src/data/doc_indexes.rs | 2 +- src/data/mod.rs | 4 +- src/database/{index/positive.rs => index.rs} | 120 +++++++++++-------- src/database/index/mod.rs | 82 ------------- src/database/index/negative.rs | 43 ------- src/database/mod.rs | 90 +++++++++----- src/database/schema.rs | 3 +- src/database/update/index_event.rs | 17 +++ src/database/{update.rs => update/mod.rs} | 77 +++++++----- src/database/update/ranked_map_event.rs | 17 +++ src/rank/query_builder.rs | 4 +- 12 files changed, 221 insertions(+), 245 deletions(-) rename src/database/{index/positive.rs => index.rs} (57%) delete mode 100644 src/database/index/mod.rs delete mode 100644 src/database/index/negative.rs create mode 100644 src/database/update/index_event.rs rename src/database/{update.rs => update/mod.rs} (73%) create mode 100644 src/database/update/ranked_map_event.rs diff --git a/Cargo.toml b/Cargo.toml index 0b4d6c092..45413edd0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,24 +5,25 @@ version = "0.3.0" authors = ["Kerollmops "] [dependencies] +arc-swap = "0.3" bincode = "1.0" byteorder = "1.2" -arc-swap = "0.3" elapsed = "0.1" fst = "0.3" hashbrown = { version = "0.1", features = ["serde"] } lazy_static = "1.1" levenshtein_automata = { version = "0.1", features = ["fst_automaton"] } linked-hash-map = { version = "0.5", features = ["serde_impl"] } +lockfree = "0.5" log = "0.4" +rayon = "1.0" sdset = "0.3" serde = "1.0" serde_derive = "1.0" serde_json = { version = "1.0", features = ["preserve_order"] } +size_format = "1.0" slice-group-by = "0.2" unidecode = "0.3" -rayon = "1.0" -lockfree = "0.5.1" [dependencies.toml] git = "https://github.com/Kerollmops/toml-rs.git" diff --git a/src/data/doc_indexes.rs b/src/data/doc_indexes.rs index 67106a948..17d43fd9a 100644 --- a/src/data/doc_indexes.rs +++ b/src/data/doc_indexes.rs @@ -26,8 +26,8 @@ pub struct DocIndexes { impl DocIndexes { pub fn from_bytes(bytes: Vec) -> io::Result { - let bytes = Arc::new(bytes); let len = bytes.len(); + let bytes = Arc::from(bytes); let data = SharedData::new(bytes, 0, len); let mut cursor = Cursor::new(data); DocIndexes::from_cursor(&mut cursor) diff --git a/src/data/mod.rs b/src/data/mod.rs index 0e0b0e2c4..328d66604 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -9,7 +9,7 @@ use std::sync::Arc; pub use self::doc_ids::DocIds; pub use self::doc_indexes::{DocIndexes, DocIndexesBuilder}; -#[derive(Default, Clone)] +#[derive(Clone, Default)] pub struct SharedData { pub bytes: Arc>, pub offset: usize, @@ -19,7 +19,7 @@ pub struct SharedData { impl SharedData { pub fn from_bytes(vec: Vec) -> SharedData { let len = vec.len(); - let bytes = Arc::new(vec); + let bytes = Arc::from(vec); SharedData::new(bytes, 0, len) } diff --git a/src/database/index/positive.rs b/src/database/index.rs similarity index 57% rename from src/database/index/positive.rs rename to src/database/index.rs index d6c3bf3d5..696042d09 100644 --- a/src/database/index/positive.rs +++ b/src/database/index.rs @@ -1,28 +1,41 @@ -use std::io::{Write, BufRead, Cursor}; +use std::io::{Cursor, BufRead}; use std::error::Error; +use std::sync::Arc; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; -use fst::{map, Map, Streamer, IntoStreamer}; -use sdset::{Set, SetOperation}; -use sdset::duo::Union; +use fst::{map, Map, 
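// [annotation, not part of the patch] In the DocIndexes::from_bytes hunk
// above, reading `bytes.len()` before `Arc::from(bytes)` is required
// because the Vec is moved into the Arc. A minimal sketch of the pattern:
//
//     let bytes: Vec<u8> = vec![1, 2, 3];
//     let len = bytes.len();          // borrow the length first...
//     let bytes = Arc::from(bytes);   // ...then move the Vec into the Arc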
IntoStreamer, Streamer}; use fst::raw::Fst; +use sdset::duo::{Union, DifferenceByKey}; +use sdset::{Set, SetOperation}; -use crate::data::{DocIndexes, DocIndexesBuilder}; -use crate::data::SharedData; -use crate::DocIndex; +use crate::data::{SharedData, DocIndexes, DocIndexesBuilder}; +use crate::{DocumentId, DocIndex}; #[derive(Default)] -pub struct Positive { - map: Map, - indexes: DocIndexes, +pub struct Index { + pub map: Map, + pub indexes: DocIndexes, } -impl Positive { - pub fn new(map: Map, indexes: DocIndexes) -> Positive { - Positive { map, indexes } +impl Index { + pub fn from_bytes(bytes: Vec) -> Result> { + let len = bytes.len(); + Index::from_shared_bytes(Arc::from(bytes), 0, len) } - pub fn from_cursor(cursor: &mut Cursor) -> Result> { + pub fn from_shared_bytes( + bytes: Arc>, + offset: usize, + len: usize, + ) -> Result> + { + let data = SharedData::new(bytes, offset, len); + let mut cursor = Cursor::new(data); + + Index::from_cursor(&mut cursor) + } + + pub fn from_cursor(cursor: &mut Cursor) -> Result> { let len = cursor.read_u64::()? as usize; let offset = cursor.position() as usize; let data = cursor.get_ref().range(offset, len); @@ -33,7 +46,7 @@ impl Positive { let indexes = DocIndexes::from_cursor(cursor)?; - Ok(Positive { map, indexes}) + Ok(Index { map, indexes}) } pub fn write_to_bytes(&self, bytes: &mut Vec) { @@ -45,16 +58,28 @@ impl Positive { self.indexes.write_to_bytes(bytes); } - pub fn map(&self) -> &Map { - &self.map + pub fn remove_documents(&self, documents: &Set) -> Index { + let mut buffer = Vec::new(); + let mut builder = IndexBuilder::new(); + let mut stream = self.into_stream(); + + while let Some((key, indexes)) = stream.next() { + buffer.clear(); + + let op = DifferenceByKey::new(indexes, documents, |x| x.document_id, |x| *x); + op.extend_vec(&mut buffer); + + if !buffer.is_empty() { + let indexes = Set::new_unchecked(&buffer); + builder.insert(key, indexes).unwrap(); + } + } + + builder.build() } - pub fn indexes(&self) -> &DocIndexes { - &self.indexes - } - - pub fn union(&self, other: &Positive) -> Result> { - let mut builder = PositiveBuilder::memory(); + pub fn union(&self, other: &Index) -> Index { + let mut builder = IndexBuilder::new(); let mut stream = map::OpBuilder::new().add(&self.map).add(&other.map).union(); let mut buffer = Vec::new(); @@ -63,19 +88,19 @@ impl Positive { match ivalues { [a, b] => { let indexes = if a.index == 0 { &self.indexes } else { &other.indexes }; - let indexes = indexes.get(a.value as usize).ok_or(format!("index not found"))?; + let indexes = &indexes[a.value as usize]; let a = Set::new_unchecked(indexes); let indexes = if b.index == 0 { &self.indexes } else { &other.indexes }; - let indexes = indexes.get(b.value as usize).ok_or(format!("index not found"))?; + let indexes = &indexes[b.value as usize]; let b = Set::new_unchecked(indexes); let op = Union::new(a, b); op.extend_vec(&mut buffer); }, - [a] => { - let indexes = if a.index == 0 { &self.indexes } else { &other.indexes }; - let indexes = indexes.get(a.value as usize).ok_or(format!("index not found"))?; + [x] => { + let indexes = if x.index == 0 { &self.indexes } else { &other.indexes }; + let indexes = &indexes[x.value as usize]; buffer.extend_from_slice(indexes) }, _ => continue, @@ -83,23 +108,18 @@ impl Positive { if !buffer.is_empty() { let indexes = Set::new_unchecked(&buffer); - builder.insert(key, indexes)?; + builder.insert(key, indexes).unwrap(); } } - let (map, indexes) = builder.into_inner()?; - let map = Map::from_bytes(map)?; - let 
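// [sketch, assuming sorted inputs as the Set types require] The new
// `remove_documents` method relies on `sdset::duo::DifferenceByKey` to
// drop every DocIndex whose document_id appears in the removed set:
//
//     use sdset::{Set, SetOperation};
//     use sdset::duo::DifferenceByKey;
//     // `postings` is a &Set<DocIndex>, `ids` a &Set<DocumentId>
//     let op = DifferenceByKey::new(postings, ids, |x| x.document_id, |id| *id);
//     let mut kept = Vec::new();
//     op.extend_vec(&mut kept); // postings whose id is not in `ids`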
indexes = DocIndexes::from_bytes(indexes)?; - Ok(Positive { map, indexes }) + builder.build() } } -impl<'m, 'a> IntoStreamer<'a> for &'m Positive { +impl<'m, 'a> IntoStreamer<'a> for &'m Index { type Item = (&'a [u8], &'a Set); - /// The type of the stream to be constructed. type Into = Stream<'m>; - /// Construct a stream from `Self`. fn into_stream(self) -> Self::Into { Stream { map_stream: self.map.into_stream(), @@ -128,28 +148,26 @@ impl<'m, 'a> Streamer<'a> for Stream<'m> { } } -pub struct PositiveBuilder { - map: fst::MapBuilder, - indexes: DocIndexesBuilder, +pub struct IndexBuilder { + map: fst::MapBuilder>, + indexes: DocIndexesBuilder>, value: u64, } -impl PositiveBuilder, Vec> { - pub fn memory() -> Self { - PositiveBuilder { +impl IndexBuilder { + pub fn new() -> Self { + IndexBuilder { map: fst::MapBuilder::memory(), indexes: DocIndexesBuilder::memory(), value: 0, } } -} -impl PositiveBuilder { /// If a key is inserted that is less than or equal to any previous key added, /// then an error is returned. Similarly, if there was a problem writing /// to the underlying writer, an error is returned. // FIXME what if one write doesn't work but the other do ? - pub fn insert(&mut self, key: K, indexes: &Set) -> Result<(), Box> + pub fn insert(&mut self, key: K, indexes: &Set) -> fst::Result<()> where K: AsRef<[u8]>, { self.map.insert(key, self.value)?; @@ -158,9 +176,13 @@ impl PositiveBuilder { Ok(()) } - pub fn into_inner(self) -> Result<(W, X), Box> { - let map = self.map.into_inner()?; - let indexes = self.indexes.into_inner()?; - Ok((map, indexes)) + pub fn build(self) -> Index { + let map = self.map.into_inner().unwrap(); + let indexes = self.indexes.into_inner().unwrap(); + + let map = Map::from_bytes(map).unwrap(); + let indexes = DocIndexes::from_bytes(indexes).unwrap(); + + Index { map, indexes } } } diff --git a/src/database/index/mod.rs b/src/database/index/mod.rs deleted file mode 100644 index f9964f1f5..000000000 --- a/src/database/index/mod.rs +++ /dev/null @@ -1,82 +0,0 @@ -mod negative; -mod positive; - -pub(crate) use self::negative::Negative; -pub(crate) use self::positive::{Positive, PositiveBuilder}; - -use std::error::Error; -use std::io::Cursor; -use std::sync::Arc; - -use fst::{IntoStreamer, Streamer}; -use sdset::duo::DifferenceByKey; -use sdset::{Set, SetOperation}; -use fst::Map; - -use crate::data::{SharedData, DocIndexes}; - -#[derive(Default)] -pub struct Index { - pub(crate) negative: Negative, - pub(crate) positive: Positive, -} - -impl Index { - pub fn from_bytes(bytes: Vec) -> Result> { - let len = bytes.len(); - Index::from_shared_bytes(Arc::new(bytes), 0, len) - } - - pub fn from_shared_bytes( - bytes: Arc>, - offset: usize, - len: usize, - ) -> Result> - { - let data = SharedData::new(bytes, offset, len); - let mut cursor = Cursor::new(data); - - let negative = Negative::from_cursor(&mut cursor)?; - let positive = Positive::from_cursor(&mut cursor)?; - Ok(Index { negative, positive }) - } - - pub fn write_to_bytes(&self, bytes: &mut Vec) { - self.negative.write_to_bytes(bytes); - self.positive.write_to_bytes(bytes); - } - - pub fn merge(&self, other: &Index) -> Result> { - if other.negative.is_empty() { - let negative = Negative::default(); - let positive = self.positive.union(&other.positive)?; - return Ok(Index { negative, positive }) - } - - let mut buffer = Vec::new(); - let mut builder = PositiveBuilder::memory(); - let mut stream = self.positive.into_stream(); - while let Some((key, indexes)) = stream.next() { - let op = 
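// [sketch, hypothetical usage, not taken from the patch] Because
// IndexBuilder wraps an fst::MapBuilder, keys must be inserted in strictly
// increasing byte order or `insert` returns an error:
//
//     let mut builder = IndexBuilder::new();
//     builder.insert("apple", apple_indexes).unwrap();   // &Set<DocIndex>
//     builder.insert("banana", banana_indexes).unwrap(); // ok: "banana" > "apple"
//     let index: Index = builder.build();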
DifferenceByKey::new(indexes, &other.negative, |x| x.document_id, |x| *x); - - buffer.clear(); - op.extend_vec(&mut buffer); - - if !buffer.is_empty() { - let indexes = Set::new_unchecked(&buffer); - builder.insert(key, indexes)?; - } - } - - let positive = { - let (map, indexes) = builder.into_inner()?; - let map = Map::from_bytes(map)?; - let indexes = DocIndexes::from_bytes(indexes)?; - Positive::new(map, indexes) - }; - - let negative = Negative::default(); - let positive = positive.union(&other.positive)?; - Ok(Index { negative, positive }) - } -} diff --git a/src/database/index/negative.rs b/src/database/index/negative.rs deleted file mode 100644 index 822c99d20..000000000 --- a/src/database/index/negative.rs +++ /dev/null @@ -1,43 +0,0 @@ -use std::error::Error; -use std::io::Cursor; -use std::ops::Deref; - -use sdset::Set; -use byteorder::{LittleEndian, WriteBytesExt}; - -use crate::data::SharedData; -use crate::data::DocIds; -use crate::DocumentId; - -#[derive(Default)] -pub struct Negative(DocIds); - -impl Negative { - pub fn new(doc_ids: DocIds) -> Negative { - Negative(doc_ids) - } - - pub fn from_cursor(cursor: &mut Cursor) -> Result> { - let doc_ids = DocIds::from_cursor(cursor)?; - Ok(Negative(doc_ids)) - } - - pub fn write_to_bytes(&self, bytes: &mut Vec) { - let slice = self.0.as_bytes(); - let len = slice.len() as u64; - let _ = bytes.write_u64::(len); - bytes.extend_from_slice(slice); - } - - pub fn is_empty(&self) -> bool { - self.0.is_empty() - } -} - -impl Deref for Negative { - type Target = Set; - - fn deref(&self) -> &Self::Target { - self.0.as_ref() - } -} diff --git a/src/database/mod.rs b/src/database/mod.rs index 791634b2d..b92a2af79 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -11,6 +11,7 @@ use std::ops::{Deref, DerefMut}; use rocksdb::rocksdb_options::{DBOptions, ColumnFamilyOptions}; use rocksdb::rocksdb::{Writable, Snapshot}; use rocksdb::{DB, MergeOperands}; +use size_format::SizeFormatterBinary; use arc_swap::ArcSwap; use lockfree::map::Map; use hashbrown::HashMap; @@ -50,64 +51,91 @@ where D: Deref fn retrieve_data_index(snapshot: &Snapshot) -> Result> where D: Deref { + use self::update::ReadIndexEvent::*; + let (elapsed, vector) = elapsed::measure_time(|| snapshot.get(DATA_INDEX)); info!("loading index from kv-store took {}", elapsed); - let index = match vector? { + match vector? { Some(vector) => { let bytes = vector.as_ref().to_vec(); - info!("index size if {} MiB", bytes.len() / 1024 / 1024); + let size = SizeFormatterBinary::new(bytes.len() as u64); + info!("index size is {}B", size); + + let (elapsed, result) = elapsed::measure_time(|| { + match bincode::deserialize(&bytes)? { + RemovedDocuments(_) => unreachable!("BUG: Must not extract a RemovedDocuments"), + UpdatedDocuments(index) => Ok(index), + } + }); - let (elapsed, index) = elapsed::measure_time(|| Index::from_bytes(bytes)); info!("loading index from bytes took {}", elapsed); - index? + result }, - None => Index::default(), - }; - - Ok(index) + None => Ok(Index::default()), + } } fn retrieve_data_ranked_map(snapshot: &Snapshot) -> Result> where D: Deref, { - match snapshot.get(DATA_RANKED_MAP)? { - Some(vector) => Ok(bincode::deserialize(&*vector)?), + use self::update::ReadRankedMapEvent::*; + + let (elapsed, vector) = elapsed::measure_time(|| snapshot.get(DATA_RANKED_MAP)); + info!("loading ranked map from kv-store took {}", elapsed); + + match vector? 
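// [annotation] A plain snapshot `get` returns the fully merged value, so
// by the time retrieve_data_index deserializes it, the merge operator
// defined below has already folded every RemovedDocuments event into a
// single UpdatedDocuments one, hence the unreachable! arm. Conceptually
// the full-merge does (pseudocode):
//
//     state = events.fold(Index::default(), apply_event);
//     emit UpdatedDocuments(state);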
{ + Some(vector) => { + let bytes = vector.as_ref().to_vec(); + let size = SizeFormatterBinary::new(bytes.len() as u64); + info!("ranked map size is {}B", size); + + let (elapsed, result) = elapsed::measure_time(|| { + match bincode::deserialize(&bytes)? { + RemovedDocuments(_) => unreachable!("BUG: Must not extract a RemovedDocuments"), + UpdatedDocuments(ranked_map) => Ok(ranked_map), + } + }); + + info!("loading ranked map from bytes took {}", elapsed); + + result + }, None => Ok(HashMap::new()), } } fn merge_indexes(existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec { - let mut index: Option = None; - for bytes in existing.into_iter().chain(operands) { - let operand = Index::from_bytes(bytes.to_vec()).unwrap(); - let merged = match index { - Some(ref index) => index.merge(&operand).unwrap(), - None => operand, - }; + use self::update::ReadIndexEvent::*; + use self::update::WriteIndexEvent; - index.replace(merged); + let mut index = Index::default(); + for bytes in existing.into_iter().chain(operands) { + match bincode::deserialize(bytes).unwrap() { + RemovedDocuments(d) => index = index.remove_documents(&d), + UpdatedDocuments(i) => index = index.union(&i), + } } - let index = index.unwrap_or_default(); - let mut bytes = Vec::new(); - index.write_to_bytes(&mut bytes); - bytes + let event = WriteIndexEvent::UpdatedDocuments(&index); + bincode::serialize(&event).unwrap() } fn merge_ranked_maps(existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec { - let mut ranked_map: Option = None; + use self::update::ReadRankedMapEvent::*; + use self::update::WriteRankedMapEvent; + + let mut ranked_map = RankedMap::default(); for bytes in existing.into_iter().chain(operands) { - let operand: RankedMap = bincode::deserialize(bytes).unwrap(); - match ranked_map { - Some(ref mut ranked_map) => ranked_map.extend(operand), - None => { ranked_map.replace(operand); }, - }; + match bincode::deserialize(bytes).unwrap() { + RemovedDocuments(d) => ranked_map.retain(|(k, _), _| !d.contains(k)), + UpdatedDocuments(i) => ranked_map.extend(i), + } } - let ranked_map = ranked_map.unwrap_or_default(); - bincode::serialize(&ranked_map).unwrap() + let event = WriteRankedMapEvent::UpdatedDocuments(&ranked_map); + bincode::serialize(&event).unwrap() } fn merge_operator(key: &[u8], existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec { @@ -247,7 +275,7 @@ impl Drop for DatabaseIndex { fn drop(&mut self) { if self.must_die.load(Ordering::Relaxed) { if let Err(err) = fs::remove_dir_all(&self.path) { - error!("Impossible to remove mdb when Database id dropped; {}", err); + error!("Impossible to remove mdb when Database is dropped; {}", err); } } } diff --git a/src/database/schema.rs b/src/database/schema.rs index 3a8878ee3..7ae4289cf 100644 --- a/src/database/schema.rs +++ b/src/database/schema.rs @@ -7,7 +7,6 @@ use std::sync::Arc; use serde_derive::{Serialize, Deserialize}; use linked_hash_map::LinkedHashMap; -use serde::Serialize; use crate::database::serde::find_id::FindDocumentIdSerializer; use crate::database::serde::SerializerError; @@ -168,7 +167,7 @@ impl Schema { } pub fn document_id(&self, document: T) -> Result - where T: Serialize, + where T: serde::Serialize, { let id_attribute_name = &self.inner.identifier; let serializer = FindDocumentIdSerializer { id_attribute_name }; diff --git a/src/database/update/index_event.rs b/src/database/update/index_event.rs new file mode 100644 index 000000000..fb8af1bd5 --- /dev/null +++ b/src/database/update/index_event.rs @@ -0,0 +1,17 @@ +use 
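// [annotation] Note the write/read asymmetry of the event pair defined
// below: WriteIndexEvent borrows its payloads so serializing an update
// never clones the index, while ReadIndexEvent owns them after
// deserialization. With the serde derives of this first patch the round
// trip is simply:
//
//     let bytes = bincode::serialize(&WriteIndexEvent::UpdatedDocuments(&index)).unwrap();
//     let event: ReadIndexEvent = bincode::deserialize(&bytes).unwrap();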
sdset::{Set, SetBuf}; +use serde_derive::{Serialize, Deserialize}; + +use crate::database::Index; +use crate::DocumentId; + +#[derive(Serialize)] +pub enum WriteIndexEvent<'a> { + RemovedDocuments(&'a Set), + UpdatedDocuments(&'a Index), +} + +#[derive(Deserialize)] +pub enum ReadIndexEvent { + RemovedDocuments(SetBuf), + UpdatedDocuments(Index), +} diff --git a/src/database/update.rs b/src/database/update/mod.rs similarity index 73% rename from src/database/update.rs rename to src/database/update/mod.rs index e37576e6d..ba32dc718 100644 --- a/src/database/update.rs +++ b/src/database/update/mod.rs @@ -3,23 +3,26 @@ use std::error::Error; use rocksdb::rocksdb::{Writable, WriteBatch}; use hashbrown::hash_map::HashMap; +use sdset::{Set, SetBuf}; use serde::Serialize; -use fst::map::Map; -use sdset::Set; -use crate::database::index::{Positive, PositiveBuilder, Negative}; use crate::database::document_key::{DocumentKey, DocumentKeyAttr}; use crate::database::serde::serializer::Serializer; use crate::database::serde::SerializerError; use crate::database::schema::SchemaAttr; use crate::tokenizer::TokenizerBuilder; -use crate::data::{DocIds, DocIndexes}; use crate::database::schema::Schema; -use crate::database::index::Index; +use crate::database::index::IndexBuilder; use crate::database::{DATA_INDEX, DATA_RANKED_MAP}; use crate::database::{RankedMap, Number}; use crate::{DocumentId, DocIndex}; +pub use self::index_event::{ReadIndexEvent, WriteIndexEvent}; +pub use self::ranked_map_event::{ReadRankedMapEvent, WriteRankedMapEvent}; + +mod index_event; +mod ranked_map_event; + pub type Token = Vec; // TODO could be replaced by a SmallVec pub struct Update { @@ -106,46 +109,60 @@ impl RawUpdateBuilder { } pub fn build(self) -> Result> { - let negative = { - let mut removed_document_ids = Vec::new(); + // create the list of all the removed documents + let removed_documents = { + let mut document_ids = Vec::new(); for (id, update_type) in self.documents_update { if update_type == Deleted { - removed_document_ids.push(id); + document_ids.push(id); } } - removed_document_ids.sort_unstable(); - let removed_document_ids = Set::new_unchecked(&removed_document_ids); - let doc_ids = DocIds::new(removed_document_ids); - - Negative::new(doc_ids) + document_ids.sort_unstable(); + SetBuf::new_unchecked(document_ids) }; - let positive = { - let mut positive_builder = PositiveBuilder::memory(); - + // create the Index of all the document updates + let index = { + let mut builder = IndexBuilder::new(); for (key, mut indexes) in self.indexed_words { indexes.sort_unstable(); let indexes = Set::new_unchecked(&indexes); - positive_builder.insert(key, indexes)?; + builder.insert(key, indexes).unwrap(); } - - let (map, indexes) = positive_builder.into_inner()?; - let map = Map::from_bytes(map)?; - let indexes = DocIndexes::from_bytes(indexes)?; - - Positive::new(map, indexes) + builder.build() }; - let index = Index { negative, positive }; + // WARN: removed documents must absolutely + // be merged *before* document updates - // write the data-index - let mut bytes_index = Vec::new(); - index.write_to_bytes(&mut bytes_index); - self.batch.merge(DATA_INDEX, &bytes_index)?; + // === index === - let bytes_ranked_map = bincode::serialize(&self.documents_ranked_fields).unwrap(); - self.batch.merge(DATA_RANKED_MAP, &bytes_ranked_map)?; + if !removed_documents.is_empty() { + // remove the documents using the appropriate IndexEvent + let event = WriteIndexEvent::RemovedDocuments(&removed_documents); + let event_bytes = 
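// [annotation] RocksDB applies merge operands in the order they were
// written, so merging RemovedDocuments before UpdatedDocuments (the WARN
// above) guarantees that a document deleted and re-added in the same
// batch survives. Sketch of the fold later performed by merge_indexes:
//
//     let mut index = Index::default();
//     index = index.remove_documents(&removed); // operand 1
//     index = index.union(&updated);            // operand 2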
bincode::serialize(&event).unwrap(); + self.batch.merge(DATA_INDEX, &event_bytes)?; + } + + // update the documents using the appropriate IndexEvent + let event = WriteIndexEvent::UpdatedDocuments(&index); + let event_bytes = bincode::serialize(&event).unwrap(); + self.batch.merge(DATA_INDEX, &event_bytes)?; + + // === ranked map === + + if !removed_documents.is_empty() { + // update the ranked map using the appropriate RankedMapEvent + let event = WriteRankedMapEvent::RemovedDocuments(&removed_documents); + let event_bytes = bincode::serialize(&event).unwrap(); + self.batch.merge(DATA_RANKED_MAP, &event_bytes)?; + } + + // update the documents using the appropriate IndexEvent + let event = WriteRankedMapEvent::UpdatedDocuments(&self.documents_ranked_fields); + let event_bytes = bincode::serialize(&event).unwrap(); + self.batch.merge(DATA_RANKED_MAP, &event_bytes)?; Ok(self.batch) } diff --git a/src/database/update/ranked_map_event.rs b/src/database/update/ranked_map_event.rs new file mode 100644 index 000000000..16bbed71b --- /dev/null +++ b/src/database/update/ranked_map_event.rs @@ -0,0 +1,17 @@ +use sdset::{Set, SetBuf}; +use serde_derive::{Serialize, Deserialize}; + +use crate::database::RankedMap; +use crate::DocumentId; + +#[derive(Serialize)] +pub enum WriteRankedMapEvent<'a> { + RemovedDocuments(&'a Set), + UpdatedDocuments(&'a RankedMap), +} + +#[derive(Deserialize)] +pub enum ReadRankedMapEvent { + RemovedDocuments(SetBuf), + UpdatedDocuments(RankedMap), +} diff --git a/src/rank/query_builder.rs b/src/rank/query_builder.rs index c7fe7b528..7bdaa109d 100644 --- a/src/rank/query_builder.rs +++ b/src/rank/query_builder.rs @@ -89,7 +89,7 @@ where D: Deref, let mut stream = { let mut op_builder = fst::map::OpBuilder::new(); for automaton in &automatons { - let stream = self.view.index().positive.map().search(automaton); + let stream = self.view.index().map.search(automaton); op_builder.push(stream); } op_builder.union() @@ -103,7 +103,7 @@ where D: Deref, let distance = automaton.eval(input).to_u8(); let is_exact = distance == 0 && input.len() == automaton.query_len(); - let doc_indexes = &self.view.index().positive.indexes(); + let doc_indexes = &self.view.index().indexes; let doc_indexes = &doc_indexes[iv.value as usize]; for doc_index in doc_indexes { From c4e70d04759f2737d83b07c9c6216962901ce3b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Sun, 17 Feb 2019 16:29:11 +0100 Subject: [PATCH 2/9] feat: Introduce the SharedDataCursor type --- src/shared_data_cursor.rs | 45 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 src/shared_data_cursor.rs diff --git a/src/shared_data_cursor.rs b/src/shared_data_cursor.rs new file mode 100644 index 000000000..c7eeee5c9 --- /dev/null +++ b/src/shared_data_cursor.rs @@ -0,0 +1,45 @@ +use std::io::{self, Read, Cursor, BufRead}; +use std::sync::Arc; +use crate::data::SharedData; + +pub struct SharedDataCursor(Cursor); + +impl SharedDataCursor { + pub fn from_bytes(bytes: Vec) -> SharedDataCursor { + let len = bytes.len(); + let bytes = Arc::new(bytes); + + SharedDataCursor::from_shared_bytes(bytes, 0, len) + } + + pub fn from_shared_bytes(bytes: Arc>, offset: usize, len: usize) -> SharedDataCursor { + let data = SharedData::new(bytes, offset, len); + let cursor = Cursor::new(data); + + SharedDataCursor(cursor) + } + + pub fn extract(&mut self, amt: usize) -> SharedData { + let offset = self.0.position() as usize; + let extracted = self.0.get_ref().range(offset, amt); + 
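// [annotation] `range` copied no bytes and did not advance the cursor;
// consuming `amt` here moves the position past the extracted slice so
// the next read starts right after it.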
self.0.consume(amt); + + extracted + } +} + +impl Read for SharedDataCursor { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.0.read(buf) + } +} + +impl BufRead for SharedDataCursor { + fn fill_buf(&mut self) -> io::Result<&[u8]> { + self.0.fill_buf() + } + + fn consume(&mut self, amt: usize) { + self.0.consume(amt) + } +} From 9e7261a48fc8dd5d0b693be92bbd5c73a1e4f197 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Sun, 17 Feb 2019 16:29:49 +0100 Subject: [PATCH 3/9] feat: Introduce the FromSharedDataCursor trait --- src/shared_data_cursor.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/shared_data_cursor.rs b/src/shared_data_cursor.rs index c7eeee5c9..785d1b2cf 100644 --- a/src/shared_data_cursor.rs +++ b/src/shared_data_cursor.rs @@ -43,3 +43,9 @@ impl BufRead for SharedDataCursor { self.0.consume(amt) } } + +pub trait FromSharedDataCursor: Sized { + type Err; + + fn from_shared_data_cursor(data: &mut SharedDataCursor) -> Result; +} From 8014857ebfdff9f6d6ab105dc271057c10e3b24d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Sun, 17 Feb 2019 16:32:43 +0100 Subject: [PATCH 4/9] feat: Introduce the WriteToBytes trait --- src/lib.rs | 2 ++ src/write_to_bytes.rs | 9 +++++++++ 2 files changed, 11 insertions(+) create mode 100644 src/write_to_bytes.rs diff --git a/src/lib.rs b/src/lib.rs index 9c0641090..a111b5049 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,6 +6,8 @@ pub mod data; pub mod rank; pub mod tokenizer; mod common_words; +mod shared_data_cursor; +mod write_to_bytes; use serde_derive::{Serialize, Deserialize}; diff --git a/src/write_to_bytes.rs b/src/write_to_bytes.rs new file mode 100644 index 000000000..4837eb6a3 --- /dev/null +++ b/src/write_to_bytes.rs @@ -0,0 +1,9 @@ +pub trait WriteToBytes { + fn write_to_bytes(&self, bytes: &mut Vec); + + fn into_bytes(&self) -> Vec { + let mut bytes = Vec::new(); + self.write_to_bytes(&mut bytes); + bytes + } +} From a8df438814cc71735d76ce1a26cd840754a3dce0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Sun, 17 Feb 2019 16:33:42 +0100 Subject: [PATCH 5/9] feat: Implement WriteToBytes/FromSharedDataCursor --- src/data/doc_ids.rs | 41 ++++++++------ src/data/doc_indexes.rs | 66 +++++++++++----------- src/database/index.rs | 73 ++++++++++--------------- src/database/mod.rs | 31 ++++++----- src/database/update/index_event.rs | 42 +++++++++++--- src/database/update/mod.rs | 19 +++---- src/database/update/ranked_map_event.rs | 45 ++++++++++++--- src/shared_data_cursor.rs | 9 ++- 8 files changed, 190 insertions(+), 136 deletions(-) diff --git a/src/data/doc_ids.rs b/src/data/doc_ids.rs index 9c4ea6474..ff951bb35 100644 --- a/src/data/doc_ids.rs +++ b/src/data/doc_ids.rs @@ -1,12 +1,15 @@ -use std::io::{self, Cursor, BufRead}; use std::slice::from_raw_parts; use std::mem::size_of; +use std::error::Error; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use sdset::Set; -use crate::DocumentId; +use crate::shared_data_cursor::{SharedDataCursor, FromSharedDataCursor}; +use crate::write_to_bytes::WriteToBytes; use crate::data::SharedData; +use crate::DocumentId; + use super::into_u8_slice; #[derive(Default, Clone)] @@ -19,21 +22,6 @@ impl DocIds { DocIds(data) } - pub fn from_cursor(cursor: &mut Cursor) -> io::Result { - let len = cursor.read_u64::()? 
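// [sketch, hypothetical round trip once this patch lands] WriteToBytes
// and FromSharedDataCursor are meant to be symmetric:
//
//     let bytes = doc_ids.into_bytes();         // WriteToBytes default method
//     let doc_ids = DocIds::from_bytes(bytes)?; // FromSharedDataCursor default method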
as usize; - let offset = cursor.position() as usize; - let doc_ids = cursor.get_ref().range(offset, len); - cursor.consume(len); - - Ok(DocIds(doc_ids)) - } - - pub fn write_to_bytes(&self, bytes: &mut Vec) { - let len = self.0.len() as u64; - bytes.write_u64::(len).unwrap(); - bytes.extend_from_slice(&self.0); - } - pub fn is_empty(&self) -> bool { self.0.is_empty() } @@ -52,3 +40,22 @@ impl AsRef> for DocIds { Set::new_unchecked(slice) } } + +impl FromSharedDataCursor for DocIds { + type Error = Box; + + fn from_shared_data_cursor(cursor: &mut SharedDataCursor) -> Result { + let len = cursor.read_u64::()? as usize; + let data = cursor.extract(len); + + Ok(DocIds(data)) + } +} + +impl WriteToBytes for DocIds { + fn write_to_bytes(&self, bytes: &mut Vec) { + let len = self.0.len() as u64; + bytes.write_u64::(len).unwrap(); + bytes.extend_from_slice(&self.0); + } +} diff --git a/src/data/doc_indexes.rs b/src/data/doc_indexes.rs index 17d43fd9a..2bb946745 100644 --- a/src/data/doc_indexes.rs +++ b/src/data/doc_indexes.rs @@ -1,14 +1,16 @@ -use std::io::{self, Write, Cursor, BufRead}; +use std::io::{self, Write}; use std::slice::from_raw_parts; use std::mem::size_of; use std::ops::Index; -use std::sync::Arc; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use sdset::Set; -use crate::DocIndex; +use crate::shared_data_cursor::{SharedDataCursor, FromSharedDataCursor}; +use crate::write_to_bytes::WriteToBytes; use crate::data::SharedData; +use crate::DocIndex; + use super::into_u8_slice; #[derive(Debug)] @@ -25,38 +27,6 @@ pub struct DocIndexes { } impl DocIndexes { - pub fn from_bytes(bytes: Vec) -> io::Result { - let len = bytes.len(); - let bytes = Arc::from(bytes); - let data = SharedData::new(bytes, 0, len); - let mut cursor = Cursor::new(data); - DocIndexes::from_cursor(&mut cursor) - } - - pub fn from_cursor(cursor: &mut Cursor) -> io::Result { - let len = cursor.read_u64::()? as usize; - let offset = cursor.position() as usize; - let ranges = cursor.get_ref().range(offset, len); - cursor.consume(len); - - let len = cursor.read_u64::()? as usize; - let offset = cursor.position() as usize; - let indexes = cursor.get_ref().range(offset, len); - cursor.consume(len); - - Ok(DocIndexes { ranges, indexes }) - } - - pub fn write_to_bytes(&self, bytes: &mut Vec) { - let ranges_len = self.ranges.len() as u64; - let _ = bytes.write_u64::(ranges_len); - bytes.extend_from_slice(&self.ranges); - - let indexes_len = self.indexes.len() as u64; - let _ = bytes.write_u64::(indexes_len); - bytes.extend_from_slice(&self.indexes); - } - pub fn get(&self, index: usize) -> Option<&Set> { self.ranges().get(index).map(|Range { start, end }| { let start = *start as usize; @@ -92,6 +62,32 @@ impl Index for DocIndexes { } } +impl FromSharedDataCursor for DocIndexes { + type Error = io::Error; + + fn from_shared_data_cursor(cursor: &mut SharedDataCursor) -> Result { + let len = cursor.read_u64::()? as usize; + let ranges = cursor.extract(len); + + let len = cursor.read_u64::()? 
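// [annotation] The on-disk layout is length-prefixed and little-endian,
// mirroring write_to_bytes below:
//
//     [ranges_len: u64][ranges bytes][indexes_len: u64][indexes bytes]
//
// which is what lets `extract(len)` hand back zero-copy SharedData slices.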
as usize; + let indexes = cursor.extract(len); + + Ok(DocIndexes { ranges, indexes }) + } +} + +impl WriteToBytes for DocIndexes { + fn write_to_bytes(&self, bytes: &mut Vec) { + let ranges_len = self.ranges.len() as u64; + let _ = bytes.write_u64::(ranges_len); + bytes.extend_from_slice(&self.ranges); + + let indexes_len = self.indexes.len() as u64; + let _ = bytes.write_u64::(indexes_len); + bytes.extend_from_slice(&self.indexes); + } +} + pub struct DocIndexesBuilder { ranges: Vec, indexes: Vec, diff --git a/src/database/index.rs b/src/database/index.rs index 696042d09..44a76d359 100644 --- a/src/database/index.rs +++ b/src/database/index.rs @@ -1,6 +1,4 @@ -use std::io::{Cursor, BufRead}; use std::error::Error; -use std::sync::Arc; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use fst::{map, Map, IntoStreamer, Streamer}; @@ -8,7 +6,9 @@ use fst::raw::Fst; use sdset::duo::{Union, DifferenceByKey}; use sdset::{Set, SetOperation}; -use crate::data::{SharedData, DocIndexes, DocIndexesBuilder}; +use crate::shared_data_cursor::{SharedDataCursor, FromSharedDataCursor}; +use crate::write_to_bytes::WriteToBytes; +use crate::data::{DocIndexes, DocIndexesBuilder}; use crate::{DocumentId, DocIndex}; #[derive(Default)] @@ -18,46 +18,6 @@ pub struct Index { } impl Index { - pub fn from_bytes(bytes: Vec) -> Result> { - let len = bytes.len(); - Index::from_shared_bytes(Arc::from(bytes), 0, len) - } - - pub fn from_shared_bytes( - bytes: Arc>, - offset: usize, - len: usize, - ) -> Result> - { - let data = SharedData::new(bytes, offset, len); - let mut cursor = Cursor::new(data); - - Index::from_cursor(&mut cursor) - } - - pub fn from_cursor(cursor: &mut Cursor) -> Result> { - let len = cursor.read_u64::()? as usize; - let offset = cursor.position() as usize; - let data = cursor.get_ref().range(offset, len); - - let fst = Fst::from_shared_bytes(data.bytes, data.offset, data.len)?; - let map = Map::from(fst); - cursor.consume(len); - - let indexes = DocIndexes::from_cursor(cursor)?; - - Ok(Index { map, indexes}) - } - - pub fn write_to_bytes(&self, bytes: &mut Vec) { - let slice = self.map.as_fst().as_bytes(); - let len = slice.len() as u64; - let _ = bytes.write_u64::(len); - bytes.extend_from_slice(slice); - - self.indexes.write_to_bytes(bytes); - } - pub fn remove_documents(&self, documents: &Set) -> Index { let mut buffer = Vec::new(); let mut builder = IndexBuilder::new(); @@ -116,6 +76,33 @@ impl Index { } } +impl FromSharedDataCursor for Index { + type Error = Box; + + fn from_shared_data_cursor(cursor: &mut SharedDataCursor) -> Result { + let len = cursor.read_u64::()? 
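// [annotation] `Fst::from_shared_bytes(data.bytes, data.offset, data.len)`
// builds the fst directly over the shared Arc buffer, so the Map and the
// DocIndexes deserialized here all reference one reference-counted
// allocation and no bytes are copied.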
as usize; + let data = cursor.extract(len); + + let fst = Fst::from_shared_bytes(data.bytes, data.offset, data.len)?; + let map = Map::from(fst); + + let indexes = DocIndexes::from_shared_data_cursor(cursor)?; + + Ok(Index { map, indexes}) + } +} + +impl WriteToBytes for Index { + fn write_to_bytes(&self, bytes: &mut Vec) { + let slice = self.map.as_fst().as_bytes(); + let len = slice.len() as u64; + let _ = bytes.write_u64::(len); + bytes.extend_from_slice(slice); + + self.indexes.write_to_bytes(bytes); + } +} + impl<'m, 'a> IntoStreamer<'a> for &'m Index { type Item = (&'a [u8], &'a Set); type Into = Stream<'m>; diff --git a/src/database/mod.rs b/src/database/mod.rs index b92a2af79..c23e5000f 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -17,6 +17,9 @@ use lockfree::map::Map; use hashbrown::HashMap; use log::{info, error, warn}; +use crate::shared_data_cursor::FromSharedDataCursor; +use crate::write_to_bytes::WriteToBytes; + pub use self::document_key::{DocumentKey, DocumentKeyAttr}; pub use self::view::{DatabaseView, DocumentIter}; pub use self::update::Update; @@ -51,7 +54,7 @@ where D: Deref fn retrieve_data_index(snapshot: &Snapshot) -> Result> where D: Deref { - use self::update::ReadIndexEvent::*; + use self::update::ReadIndexEvent::{self, *}; let (elapsed, vector) = elapsed::measure_time(|| snapshot.get(DATA_INDEX)); info!("loading index from kv-store took {}", elapsed); @@ -63,7 +66,7 @@ where D: Deref info!("index size is {}B", size); let (elapsed, result) = elapsed::measure_time(|| { - match bincode::deserialize(&bytes)? { + match ReadIndexEvent::from_bytes(bytes.to_vec())? { RemovedDocuments(_) => unreachable!("BUG: Must not extract a RemovedDocuments"), UpdatedDocuments(index) => Ok(index), } @@ -80,7 +83,7 @@ where D: Deref fn retrieve_data_ranked_map(snapshot: &Snapshot) -> Result> where D: Deref, { - use self::update::ReadRankedMapEvent::*; + use self::update::ReadRankedMapEvent::{self, *}; let (elapsed, vector) = elapsed::measure_time(|| snapshot.get(DATA_RANKED_MAP)); info!("loading ranked map from kv-store took {}", elapsed); @@ -92,7 +95,7 @@ where D: Deref, info!("ranked map size is {}B", size); let (elapsed, result) = elapsed::measure_time(|| { - match bincode::deserialize(&bytes)? { + match ReadRankedMapEvent::from_bytes(bytes.to_vec())? 
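// [annotation] from_bytes takes an owned Vec<u8>, so the extra
// `bytes.to_vec()` here clones the buffer a second time; PATCH 8 below
// later removes that copy by passing `bytes` by value.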
{ RemovedDocuments(_) => unreachable!("BUG: Must not extract a RemovedDocuments"), UpdatedDocuments(ranked_map) => Ok(ranked_map), } @@ -102,40 +105,38 @@ where D: Deref, result }, - None => Ok(HashMap::new()), + None => Ok(RankedMap::new()), } } fn merge_indexes(existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec { - use self::update::ReadIndexEvent::*; + use self::update::ReadIndexEvent::{self, *}; use self::update::WriteIndexEvent; let mut index = Index::default(); for bytes in existing.into_iter().chain(operands) { - match bincode::deserialize(bytes).unwrap() { - RemovedDocuments(d) => index = index.remove_documents(&d), + match ReadIndexEvent::from_bytes(bytes.to_vec()).unwrap() { + RemovedDocuments(d) => index = index.remove_documents(d.as_ref()), UpdatedDocuments(i) => index = index.union(&i), } } - let event = WriteIndexEvent::UpdatedDocuments(&index); - bincode::serialize(&event).unwrap() + WriteIndexEvent::UpdatedDocuments(&index).into_bytes() } fn merge_ranked_maps(existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec { - use self::update::ReadRankedMapEvent::*; + use self::update::ReadRankedMapEvent::{self, *}; use self::update::WriteRankedMapEvent; let mut ranked_map = RankedMap::default(); for bytes in existing.into_iter().chain(operands) { - match bincode::deserialize(bytes).unwrap() { - RemovedDocuments(d) => ranked_map.retain(|(k, _), _| !d.contains(k)), + match ReadRankedMapEvent::from_bytes(bytes.to_vec()).unwrap() { + RemovedDocuments(d) => ranked_map.retain(|(k, _), _| !d.as_ref().contains(k)), UpdatedDocuments(i) => ranked_map.extend(i), } } - let event = WriteRankedMapEvent::UpdatedDocuments(&ranked_map); - bincode::serialize(&event).unwrap() + WriteRankedMapEvent::UpdatedDocuments(&ranked_map).into_bytes() } fn merge_operator(key: &[u8], existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec { diff --git a/src/database/update/index_event.rs b/src/database/update/index_event.rs index fb8af1bd5..74c6242e7 100644 --- a/src/database/update/index_event.rs +++ b/src/database/update/index_event.rs @@ -1,17 +1,45 @@ -use sdset::{Set, SetBuf}; -use serde_derive::{Serialize, Deserialize}; +use std::error::Error; +use byteorder::{ReadBytesExt, WriteBytesExt}; + +use crate::shared_data_cursor::{SharedDataCursor, FromSharedDataCursor}; +use crate::write_to_bytes::WriteToBytes; use crate::database::Index; -use crate::DocumentId; +use crate::data::DocIds; -#[derive(Serialize)] pub enum WriteIndexEvent<'a> { - RemovedDocuments(&'a Set), + RemovedDocuments(&'a DocIds), UpdatedDocuments(&'a Index), } -#[derive(Deserialize)] +impl<'a> WriteToBytes for WriteIndexEvent<'a> { + fn write_to_bytes(&self, bytes: &mut Vec) { + match self { + WriteIndexEvent::RemovedDocuments(doc_ids) => { + let _ = bytes.write_u8(0); + doc_ids.write_to_bytes(bytes); + }, + WriteIndexEvent::UpdatedDocuments(index) => { + let _ = bytes.write_u8(1); + index.write_to_bytes(bytes); + } + } + } +} + pub enum ReadIndexEvent { - RemovedDocuments(SetBuf), + RemovedDocuments(DocIds), UpdatedDocuments(Index), } + +impl FromSharedDataCursor for ReadIndexEvent { + type Error = Box; + + fn from_shared_data_cursor(cursor: &mut SharedDataCursor) -> Result { + match cursor.read_u8()? 
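// [annotation] The events are now framed by hand: one tag byte
// (0 = RemovedDocuments, 1 = UpdatedDocuments) followed by the payload in
// its own binary format, replacing the serde derives of PATCH 1:
//
//     [tag: u8][payload written by WriteToBytes]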
{ + 0 => DocIds::from_shared_data_cursor(cursor).map(ReadIndexEvent::RemovedDocuments), + 1 => Index::from_shared_data_cursor(cursor).map(ReadIndexEvent::UpdatedDocuments), + _ => unreachable!(), + } + } +} diff --git a/src/database/update/mod.rs b/src/database/update/mod.rs index ba32dc718..548fb8bc2 100644 --- a/src/database/update/mod.rs +++ b/src/database/update/mod.rs @@ -10,11 +10,13 @@ use crate::database::document_key::{DocumentKey, DocumentKeyAttr}; use crate::database::serde::serializer::Serializer; use crate::database::serde::SerializerError; use crate::database::schema::SchemaAttr; -use crate::tokenizer::TokenizerBuilder; use crate::database::schema::Schema; use crate::database::index::IndexBuilder; use crate::database::{DATA_INDEX, DATA_RANKED_MAP}; use crate::database::{RankedMap, Number}; +use crate::tokenizer::TokenizerBuilder; +use crate::write_to_bytes::WriteToBytes; +use crate::data::DocIds; use crate::{DocumentId, DocIndex}; pub use self::index_event::{ReadIndexEvent, WriteIndexEvent}; @@ -119,7 +121,8 @@ impl RawUpdateBuilder { } document_ids.sort_unstable(); - SetBuf::new_unchecked(document_ids) + let setbuf = SetBuf::new_unchecked(document_ids); + DocIds::new(&setbuf) }; // create the Index of all the document updates @@ -140,28 +143,24 @@ impl RawUpdateBuilder { if !removed_documents.is_empty() { // remove the documents using the appropriate IndexEvent - let event = WriteIndexEvent::RemovedDocuments(&removed_documents); - let event_bytes = bincode::serialize(&event).unwrap(); + let event_bytes = WriteIndexEvent::RemovedDocuments(&removed_documents).into_bytes(); self.batch.merge(DATA_INDEX, &event_bytes)?; } // update the documents using the appropriate IndexEvent - let event = WriteIndexEvent::UpdatedDocuments(&index); - let event_bytes = bincode::serialize(&event).unwrap(); + let event_bytes = WriteIndexEvent::UpdatedDocuments(&index).into_bytes(); self.batch.merge(DATA_INDEX, &event_bytes)?; // === ranked map === if !removed_documents.is_empty() { // update the ranked map using the appropriate RankedMapEvent - let event = WriteRankedMapEvent::RemovedDocuments(&removed_documents); - let event_bytes = bincode::serialize(&event).unwrap(); + let event_bytes = WriteRankedMapEvent::RemovedDocuments(&removed_documents).into_bytes(); self.batch.merge(DATA_RANKED_MAP, &event_bytes)?; } // update the documents using the appropriate IndexEvent - let event = WriteRankedMapEvent::UpdatedDocuments(&self.documents_ranked_fields); - let event_bytes = bincode::serialize(&event).unwrap(); + let event_bytes = WriteRankedMapEvent::UpdatedDocuments(&self.documents_ranked_fields).into_bytes(); self.batch.merge(DATA_RANKED_MAP, &event_bytes)?; Ok(self.batch) diff --git a/src/database/update/ranked_map_event.rs b/src/database/update/ranked_map_event.rs index 16bbed71b..2fba12f04 100644 --- a/src/database/update/ranked_map_event.rs +++ b/src/database/update/ranked_map_event.rs @@ -1,17 +1,48 @@ -use sdset::{Set, SetBuf}; -use serde_derive::{Serialize, Deserialize}; +use std::error::Error; +use byteorder::{ReadBytesExt, WriteBytesExt}; + +use crate::shared_data_cursor::{SharedDataCursor, FromSharedDataCursor}; +use crate::write_to_bytes::WriteToBytes; use crate::database::RankedMap; -use crate::DocumentId; +use crate::data::DocIds; -#[derive(Serialize)] pub enum WriteRankedMapEvent<'a> { - RemovedDocuments(&'a Set), + RemovedDocuments(&'a DocIds), UpdatedDocuments(&'a RankedMap), } -#[derive(Deserialize)] +impl<'a> WriteToBytes for WriteRankedMapEvent<'a> { + fn write_to_bytes(&self, 
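// [annotation] RankedMap has no hand-rolled wire format, so the
// UpdatedDocuments arm below writes the tag byte and then delegates to
// bincode::serialize_into, while RemovedDocuments reuses
// DocIds::write_to_bytes; from_shared_data_cursor mirrors this on read.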
bytes: &mut Vec) { + match self { + WriteRankedMapEvent::RemovedDocuments(doc_ids) => { + let _ = bytes.write_u8(0); + doc_ids.write_to_bytes(bytes); + }, + WriteRankedMapEvent::UpdatedDocuments(ranked_map) => { + let _ = bytes.write_u8(1); + bincode::serialize_into(bytes, ranked_map).unwrap() + } + } + } +} + pub enum ReadRankedMapEvent { - RemovedDocuments(SetBuf), + RemovedDocuments(DocIds), UpdatedDocuments(RankedMap), } + +impl FromSharedDataCursor for ReadRankedMapEvent { + type Error = Box; + + fn from_shared_data_cursor(cursor: &mut SharedDataCursor) -> Result { + match cursor.read_u8()? { + 0 => DocIds::from_shared_data_cursor(cursor).map(ReadRankedMapEvent::RemovedDocuments), + 1 => { + let ranked_map = bincode::deserialize_from(cursor)?; + Ok(ReadRankedMapEvent::UpdatedDocuments(ranked_map)) + }, + _ => unreachable!(), + } + } +} diff --git a/src/shared_data_cursor.rs b/src/shared_data_cursor.rs index 785d1b2cf..00d36884a 100644 --- a/src/shared_data_cursor.rs +++ b/src/shared_data_cursor.rs @@ -45,7 +45,12 @@ impl BufRead for SharedDataCursor { } pub trait FromSharedDataCursor: Sized { - type Err; + type Error; - fn from_shared_data_cursor(data: &mut SharedDataCursor) -> Result; + fn from_shared_data_cursor(cursor: &mut SharedDataCursor) -> Result; + + fn from_bytes(bytes: Vec) -> Result { + let mut cursor = SharedDataCursor::from_bytes(bytes); + Self::from_shared_data_cursor(&mut cursor) + } } From 6393b0cbc0616c7bcc402d0185fa5a0a96c49d57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Sat, 16 Feb 2019 16:52:56 +0100 Subject: [PATCH 6/9] feat: Prefer binary to exponential search --- src/database/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/database/mod.rs b/src/database/mod.rs index c23e5000f..7577254fc 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -131,7 +131,7 @@ fn merge_ranked_maps(existing: Option<&[u8]>, operands: &mut MergeOperands) -> V let mut ranked_map = RankedMap::default(); for bytes in existing.into_iter().chain(operands) { match ReadRankedMapEvent::from_bytes(bytes.to_vec()).unwrap() { - RemovedDocuments(d) => ranked_map.retain(|(k, _), _| !d.as_ref().contains(k)), + RemovedDocuments(d) => ranked_map.retain(|(k, _), _| !d.as_ref().binary_search(k).is_ok()), UpdatedDocuments(i) => ranked_map.extend(i), } } From bddb37e44fc5f4046280e8eadcfac4ff4a2ab131 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Sat, 16 Feb 2019 18:56:26 +0100 Subject: [PATCH 7/9] feat: Move SharedData to its own module --- src/data/mod.rs | 44 ++----------------------------------- src/data/shared_data.rs | 48 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 42 deletions(-) create mode 100644 src/data/shared_data.rs diff --git a/src/data/mod.rs b/src/data/mod.rs index 328d66604..f076b08e9 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -1,5 +1,6 @@ mod doc_ids; mod doc_indexes; +mod shared_data; use std::slice::from_raw_parts; use std::mem::size_of; @@ -8,48 +9,7 @@ use std::sync::Arc; pub use self::doc_ids::DocIds; pub use self::doc_indexes::{DocIndexes, DocIndexesBuilder}; - -#[derive(Clone, Default)] -pub struct SharedData { - pub bytes: Arc>, - pub offset: usize, - pub len: usize, -} - -impl SharedData { - pub fn from_bytes(vec: Vec) -> SharedData { - let len = vec.len(); - let bytes = Arc::from(vec); - SharedData::new(bytes, 0, len) - } - - pub fn new(bytes: Arc>, offset: usize, len: usize) -> SharedData { - SharedData { bytes, offset, len } - } - - pub fn 
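// [annotation] Two notes on the hunks above and below: in PATCH 6, DocIds
// wraps a sorted Set<DocumentId>, so membership moves from the linear
// `contains` to `binary_search(k).is_ok()`, an O(log n) lookup. And
// `range` below asserts `offset + len <= self.len` before cloning the
// Arc, so sub-slices stay bounds-checked and zero-copy:
//
//     let sub = data.range(8, 16); // shares the Arc, copies no bytes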
range(&self, offset: usize, len: usize) -> SharedData { - assert!(offset + len <= self.len); - SharedData { - bytes: self.bytes.clone(), - offset: self.offset + offset, - len: len, - } - } -} - -impl Deref for SharedData { - type Target = [u8]; - - fn deref(&self) -> &Self::Target { - self.as_ref() - } -} - -impl AsRef<[u8]> for SharedData { - fn as_ref(&self) -> &[u8] { - &self.bytes[self.offset..self.offset + self.len] - } -} +pub use self::shared_data::SharedData; unsafe fn into_u8_slice(slice: &[T]) -> &[u8] { let ptr = slice.as_ptr() as *const u8; diff --git a/src/data/shared_data.rs b/src/data/shared_data.rs new file mode 100644 index 000000000..100f837f7 --- /dev/null +++ b/src/data/shared_data.rs @@ -0,0 +1,48 @@ +use std::sync::Arc; +use std::ops::Deref; + +#[derive(Default, Clone)] +pub struct SharedData { + pub bytes: Arc>, + pub offset: usize, + pub len: usize, +} + +impl SharedData { + pub fn from_bytes(vec: Vec) -> SharedData { + let len = vec.len(); + let bytes = Arc::from(vec); + SharedData::new(bytes, 0, len) + } + + pub fn new(bytes: Arc>, offset: usize, len: usize) -> SharedData { + SharedData { bytes, offset, len } + } + + pub fn as_slice(&self) -> &[u8] { + &self.bytes[self.offset..self.offset + self.len] + } + + pub fn range(&self, offset: usize, len: usize) -> SharedData { + assert!(offset + len <= self.len); + SharedData { + bytes: self.bytes.clone(), + offset: self.offset + offset, + len: len, + } + } +} + +impl Deref for SharedData { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + self.as_slice() + } +} + +impl AsRef<[u8]> for SharedData { + fn as_ref(&self) -> &[u8] { + self.as_slice() + } +} From 264fffa8260a45ff36ffd57f4e4ff73e81d993f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Sat, 16 Feb 2019 20:44:16 +0100 Subject: [PATCH 8/9] feat: Replace the elapsed dependency by std::time::Instant --- Cargo.toml | 1 - examples/create-database.rs | 8 ++--- examples/query-database.rs | 17 ++++++----- src/data/mod.rs | 2 -- src/database/mod.rs | 59 +++++++++++++++++++------------------ src/rank/query_builder.rs | 26 ++++++++-------- 6 files changed, 56 insertions(+), 57 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 45413edd0..98e7bab24 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,6 @@ authors = ["Kerollmops "] arc-swap = "0.3" bincode = "1.0" byteorder = "1.2" -elapsed = "0.1" fst = "0.3" hashbrown = { version = "0.1", features = ["serde"] } lazy_static = "1.1" diff --git a/examples/create-database.rs b/examples/create-database.rs index ee5324919..37e252e1a 100644 --- a/examples/create-database.rs +++ b/examples/create-database.rs @@ -4,6 +4,7 @@ static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; use std::collections::{HashMap, HashSet}; use std::io::{self, BufRead, BufReader}; use std::path::{Path, PathBuf}; +use std::time::Instant; use std::error::Error; use std::borrow::Cow; use std::fs::File; @@ -124,14 +125,13 @@ fn main() -> Result<(), Box> { None => HashSet::new(), }; - let (elapsed, result) = elapsed::measure_time(|| { - index(schema, &opt.database_path, &opt.csv_data_path, opt.update_group_size, &stop_words) - }); + let start = Instant::now(); + let result = index(schema, &opt.database_path, &opt.csv_data_path, opt.update_group_size, &stop_words); if let Err(e) = result { return Err(e.into()) } - println!("database created in {} at: {:?}", elapsed, opt.database_path); + println!("database created in {:.2?} at: {:?}", start.elapsed(), opt.database_path); Ok(()) } diff --git 
a/examples/query-database.rs b/examples/query-database.rs index 72e990960..f21b038dc 100644 --- a/examples/query-database.rs +++ b/examples/query-database.rs @@ -4,6 +4,7 @@ static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; use std::collections::btree_map::{BTreeMap, Entry}; use std::iter::FromIterator; use std::io::{self, Write}; +use std::time::Instant; use std::path::PathBuf; use std::error::Error; @@ -102,9 +103,9 @@ fn main() -> Result<(), Box> { let _ = env_logger::init(); let opt = Opt::from_args(); - let (elapsed, result) = elapsed::measure_time(|| Database::open(&opt.database_path)); - let database = result?; - println!("database prepared for you in {}", elapsed); + let start = Instant::now(); + let database = Database::open(&opt.database_path)?; + println!("database prepared for you in {:.2?}", start.elapsed()); let mut buffer = String::new(); let input = io::stdin(); @@ -119,10 +120,10 @@ fn main() -> Result<(), Box> { let view = database.view("default")?; let schema = view.schema(); - let (elapsed, documents) = elapsed::measure_time(|| { - let builder = view.query_builder().unwrap(); - builder.query(query, 0..opt.number_results) - }); + let start = Instant::now(); + + let builder = view.query_builder().unwrap(); + let documents = builder.query(query, 0..opt.number_results); let number_of_documents = documents.len(); for doc in documents { @@ -160,7 +161,7 @@ fn main() -> Result<(), Box> { println!(); } - eprintln!("===== Found {} results in {} =====", number_of_documents, elapsed); + eprintln!("===== Found {} results in {:.2?} =====", number_of_documents, start.elapsed()); buffer.clear(); } diff --git a/src/data/mod.rs b/src/data/mod.rs index f076b08e9..895f553a6 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -4,8 +4,6 @@ mod shared_data; use std::slice::from_raw_parts; use std::mem::size_of; -use std::ops::Deref; -use std::sync::Arc; pub use self::doc_ids::DocIds; pub use self::doc_indexes::{DocIndexes, DocIndexesBuilder}; diff --git a/src/database/mod.rs b/src/database/mod.rs index 7577254fc..8bca09f2e 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -1,8 +1,7 @@ -use crate::DocumentId; -use crate::database::schema::SchemaAttr; -use std::sync::Arc; +use std::time::Instant; use std::error::Error; use std::ffi::OsStr; +use std::sync::Arc; use std::fs; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, Ordering}; @@ -17,8 +16,10 @@ use lockfree::map::Map; use hashbrown::HashMap; use log::{info, error, warn}; +use crate::database::schema::SchemaAttr; use crate::shared_data_cursor::FromSharedDataCursor; use crate::write_to_bytes::WriteToBytes; +use crate::DocumentId; pub use self::document_key::{DocumentKey, DocumentKeyAttr}; pub use self::view::{DatabaseView, DocumentIter}; @@ -56,25 +57,25 @@ where D: Deref { use self::update::ReadIndexEvent::{self, *}; - let (elapsed, vector) = elapsed::measure_time(|| snapshot.get(DATA_INDEX)); - info!("loading index from kv-store took {}", elapsed); + let start = Instant::now(); + let vector = snapshot.get(DATA_INDEX)?; + info!("loading index from kv-store took {:.2?}", start.elapsed()); - match vector? { + match vector { Some(vector) => { + let start = Instant::now(); + let bytes = vector.as_ref().to_vec(); - let size = SizeFormatterBinary::new(bytes.len() as u64); - info!("index size is {}B", size); + info!("index size is {}B", SizeFormatterBinary::new(bytes.len() as u64)); - let (elapsed, result) = elapsed::measure_time(|| { - match ReadIndexEvent::from_bytes(bytes.to_vec())? 
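// [annotation] PATCH 8 replaces the `elapsed` crate with std timing
// everywhere; the recurring pattern (names hypothetical) is:
//
//     use std::time::Instant;
//     let start = Instant::now();
//     let value = do_work();
//     info!("took {:.2?}", start.elapsed());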
{ - RemovedDocuments(_) => unreachable!("BUG: Must not extract a RemovedDocuments"), - UpdatedDocuments(index) => Ok(index), - } - }); + let index = match ReadIndexEvent::from_bytes(bytes)? { + RemovedDocuments(_) => panic!("BUG: RemovedDocument event retrieved"), + UpdatedDocuments(index) => index, + }; - info!("loading index from bytes took {}", elapsed); + info!("loading index from bytes took {:.2?}", start.elapsed()); - result + Ok(index) }, None => Ok(Index::default()), } @@ -85,25 +86,25 @@ where D: Deref, { use self::update::ReadRankedMapEvent::{self, *}; - let (elapsed, vector) = elapsed::measure_time(|| snapshot.get(DATA_RANKED_MAP)); - info!("loading ranked map from kv-store took {}", elapsed); + let start = Instant::now(); + let vector = snapshot.get(DATA_RANKED_MAP)?; + info!("loading ranked map from kv-store took {:.2?}", start.elapsed()); - match vector? { + match vector { Some(vector) => { + let start = Instant::now(); + let bytes = vector.as_ref().to_vec(); - let size = SizeFormatterBinary::new(bytes.len() as u64); - info!("ranked map size is {}B", size); + info!("ranked map size is {}B", SizeFormatterBinary::new(bytes.len() as u64)); - let (elapsed, result) = elapsed::measure_time(|| { - match ReadRankedMapEvent::from_bytes(bytes.to_vec())? { - RemovedDocuments(_) => unreachable!("BUG: Must not extract a RemovedDocuments"), - UpdatedDocuments(ranked_map) => Ok(ranked_map), - } - }); + let ranked_map = match ReadRankedMapEvent::from_bytes(bytes)? { + RemovedDocuments(_) => panic!("BUG: RemovedDocument event retrieved"), + UpdatedDocuments(ranked_map) => ranked_map, + }; - info!("loading ranked map from bytes took {}", elapsed); + info!("loading ranked map from bytes took {:.2?}", start.elapsed()); - result + Ok(ranked_map) }, None => Ok(RankedMap::new()), } diff --git a/src/rank/query_builder.rs b/src/rank/query_builder.rs index 7bdaa109d..2a8414978 100644 --- a/src/rank/query_builder.rs +++ b/src/rank/query_builder.rs @@ -1,12 +1,12 @@ use std::{cmp, mem, vec, str, char}; use std::ops::{Deref, Range}; +use std::time::Instant; use std::error::Error; use std::hash::Hash; use std::rc::Rc; use rayon::slice::ParallelSliceMut; use slice_group_by::GroupByMut; -use elapsed::measure_time; use hashbrown::HashMap; use fst::Streamer; use rocksdb::DB; @@ -143,8 +143,9 @@ where D: Deref, return builder.query(query, range); } - let (elapsed, mut documents) = measure_time(|| self.query_all(query)); - info!("query_all took {}", elapsed); + let start = Instant::now(); + let mut documents = self.query_all(query); + info!("query_all took {:.2?}", start.elapsed()); let mut groups = vec![documents.as_mut_slice()]; @@ -163,10 +164,9 @@ where D: Deref, continue; } - let (elapsed, _) = measure_time(|| { - group.par_sort_unstable_by(|a, b| criterion.evaluate(a, b)); - }); - info!("criterion {} sort took {}", ci, elapsed); + let start = Instant::now(); + group.par_sort_unstable_by(|a, b| criterion.evaluate(a, b)); + info!("criterion {} sort took {:.2?}", ci, start.elapsed()); for group in group.binary_group_by_mut(|a, b| criterion.eq(a, b)) { documents_seen += group.len(); @@ -214,8 +214,9 @@ where D: Deref, K: Hash + Eq, { pub fn query(self, query: &str, range: Range) -> Vec { - let (elapsed, mut documents) = measure_time(|| self.inner.query_all(query)); - info!("query_all took {}", elapsed); + let start = Instant::now(); + let mut documents = self.inner.query_all(query); + info!("query_all took {:.2?}", start.elapsed()); let mut groups = vec![documents.as_mut_slice()]; let mut key_cache = 
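// [annotation] The same Instant pattern applies inside the query
// builders: each criterion sort is timed individually, e.g.
// `info!("criterion {} sort took {:.2?}", ci, start.elapsed())`, instead
// of wrapping the sort in the old measure_time closure.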
HashMap::new(); @@ -244,10 +245,9 @@ where D: Deref, continue; } - let (elapsed, _) = measure_time(|| { - group.par_sort_unstable_by(|a, b| criterion.evaluate(a, b)); - }); - info!("criterion {} sort took {}", ci, elapsed); + let start = Instant::now(); + group.par_sort_unstable_by(|a, b| criterion.evaluate(a, b)); + info!("criterion {} sort took {:.2?}", ci, start.elapsed()); for group in group.binary_group_by_mut(|a, b| criterion.eq(a, b)) { // we must compute the real distinguished len of this sub-group From a0c4ec0be0ff019cbefb49ef4e2c387ecc0b356e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Mon, 18 Feb 2019 18:01:40 +0100 Subject: [PATCH 9/9] feat: Introduce the updated_documents methods --- src/database/mod.rs | 18 ++++++------------ src/database/update/index_event.rs | 10 ++++++++++ src/database/update/ranked_map_event.rs | 10 ++++++++++ 3 files changed, 26 insertions(+), 12 deletions(-) diff --git a/src/database/mod.rs b/src/database/mod.rs index 8bca09f2e..5d526dcd7 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -21,6 +21,8 @@ use crate::shared_data_cursor::FromSharedDataCursor; use crate::write_to_bytes::WriteToBytes; use crate::DocumentId; +use self::update::{ReadIndexEvent, ReadRankedMapEvent}; + pub use self::document_key::{DocumentKey, DocumentKeyAttr}; pub use self::view::{DatabaseView, DocumentIter}; pub use self::update::Update; @@ -55,8 +57,6 @@ where D: Deref fn retrieve_data_index(snapshot: &Snapshot) -> Result> where D: Deref { - use self::update::ReadIndexEvent::{self, *}; - let start = Instant::now(); let vector = snapshot.get(DATA_INDEX)?; info!("loading index from kv-store took {:.2?}", start.elapsed()); @@ -68,10 +68,8 @@ where D: Deref let bytes = vector.as_ref().to_vec(); info!("index size is {}B", SizeFormatterBinary::new(bytes.len() as u64)); - let index = match ReadIndexEvent::from_bytes(bytes)? { - RemovedDocuments(_) => panic!("BUG: RemovedDocument event retrieved"), - UpdatedDocuments(index) => index, - }; + let event = ReadIndexEvent::from_bytes(bytes)?; + let index = event.updated_documents().expect("BUG: invalid event deserialized"); info!("loading index from bytes took {:.2?}", start.elapsed()); @@ -84,8 +82,6 @@ where D: Deref fn retrieve_data_ranked_map(snapshot: &Snapshot) -> Result> where D: Deref, { - use self::update::ReadRankedMapEvent::{self, *}; - let start = Instant::now(); let vector = snapshot.get(DATA_RANKED_MAP)?; info!("loading ranked map from kv-store took {:.2?}", start.elapsed()); @@ -97,10 +93,8 @@ where D: Deref, let bytes = vector.as_ref().to_vec(); info!("ranked map size is {}B", SizeFormatterBinary::new(bytes.len() as u64)); - let ranked_map = match ReadRankedMapEvent::from_bytes(bytes)? 
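// [annotation] PATCH 9 moves the event-to-payload match into the enums
// themselves; callers shrink to:
//
//     let event = ReadIndexEvent::from_bytes(bytes)?;
//     let index = event.updated_documents().expect("BUG: invalid event deserialized");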
{
-                RemovedDocuments(_) => panic!("BUG: RemovedDocument event retrieved"),
-                UpdatedDocuments(ranked_map) => ranked_map,
-            };
+            let event = ReadRankedMapEvent::from_bytes(bytes)?;
+            let ranked_map = event.updated_documents().expect("BUG: invalid event deserialized");
 
             info!("loading ranked map from bytes took {:.2?}", start.elapsed());
 
diff --git a/src/database/update/index_event.rs b/src/database/update/index_event.rs
index 74c6242e7..cd006aa3c 100644
--- a/src/database/update/index_event.rs
+++ b/src/database/update/index_event.rs
@@ -32,6 +32,16 @@ pub enum ReadIndexEvent {
     UpdatedDocuments(Index),
 }
 
+impl ReadIndexEvent {
+    pub fn updated_documents(self) -> Option<Index> {
+        use ReadIndexEvent::*;
+        match self {
+            RemovedDocuments(_) => None,
+            UpdatedDocuments(index) => Some(index),
+        }
+    }
+}
+
 impl FromSharedDataCursor for ReadIndexEvent {
     type Error = Box<Error>;
diff --git a/src/database/update/ranked_map_event.rs b/src/database/update/ranked_map_event.rs
index 2fba12f04..5a51f8799 100644
--- a/src/database/update/ranked_map_event.rs
+++ b/src/database/update/ranked_map_event.rs
@@ -32,6 +32,16 @@ pub enum ReadRankedMapEvent {
     UpdatedDocuments(RankedMap),
 }
 
+impl ReadRankedMapEvent {
+    pub fn updated_documents(self) -> Option<RankedMap> {
+        use ReadRankedMapEvent::*;
+        match self {
+            RemovedDocuments(_) => None,
+            UpdatedDocuments(ranked_map) => Some(ranked_map),
+        }
+    }
+}
+
 impl FromSharedDataCursor for ReadRankedMapEvent {
     type Error = Box<Error>;
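Reviewer appendix (a sketch under the assumptions above, not part of the
series): after PATCH 9 the life cycle of an index update is roughly this,
with `batch`, `snapshot_value`, and `index` standing in for the real values:

    use crate::database::update::{WriteIndexEvent, ReadIndexEvent};
    use crate::shared_data_cursor::FromSharedDataCursor;
    use crate::write_to_bytes::WriteToBytes;

    // write side: the update batch merges tagged events into DATA_INDEX
    let bytes = WriteIndexEvent::UpdatedDocuments(&index).into_bytes();
    batch.merge(DATA_INDEX, &bytes)?;

    // merge side: RocksDB folds all pending events into one state
    // (merge_indexes) and re-emits a single UpdatedDocuments event

    // read side: a snapshot get therefore sees only the folded event
    let event = ReadIndexEvent::from_bytes(snapshot_value.to_vec())?;
    let index = event.updated_documents().expect("only UpdatedDocuments survive a merge");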