From 95dc6fe904268580af343e0765c66d5b9a47a763 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?=
Date: Wed, 2 Jan 2019 15:07:46 +0100
Subject: [PATCH 1/2] feat: Rework the UpdateBuilder struct

---
 src/database/document_key.rs             |   8 ++
 src/database/schema.rs                   |  12 ++
 src/database/serde/indexer_serializer.rs |  10 +-
 src/database/serde/serializer.rs         |  29 ++---
 src/database/update/builder.rs           |   5 +-
 src/database/update/mod.rs               |   2 +-
 src/database/update/raw_builder.rs       | 149 ++++++++++++++++-------
 src/rank/query_builder.rs                |   2 +-
 8 files changed, 150 insertions(+), 67 deletions(-)
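
Note: updates are now grouped per document. Each document gets its own
DocumentUpdate (word indexes, stored attributes, and a `cleared` flag)
obtained through RawUpdateBuilder::document_update, instead of every
serializer writing into one shared RawUpdateBuilder. A minimal sketch of
the resulting call flow; the `raw_builder`, `document_id`, `attr`, `value`,
`token_bytes` and `doc_index` bindings are illustrative and not part of
this diff:

    // Serializers now borrow one per-document update...
    let doc_update = raw_builder.document_update(document_id);
    doc_update.insert_attribute_value(attr, value);       // stored attribute
    doc_update.insert_doc_index(token_bytes, doc_index);  // indexed word

    // ...and removing a document is a flag on that same entry:
    raw_builder.document_update(document_id).remove();

    // build() then folds every DocumentUpdate into a single SST file:
    let update = raw_builder.build()?;
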
diff --git a/src/database/document_key.rs b/src/database/document_key.rs
index b0a952a97..f56bda1dd 100644
--- a/src/database/document_key.rs
+++ b/src/database/document_key.rs
@@ -78,6 +78,14 @@ impl DocumentKeyAttr {
         DocumentKeyAttr(buffer)
     }
 
+    pub fn with_attribute_min(id: DocumentId) -> DocumentKeyAttr {
+        DocumentKeyAttr::new(id, SchemaAttr::min())
+    }
+
+    pub fn with_attribute_max(id: DocumentId) -> DocumentKeyAttr {
+        DocumentKeyAttr::new(id, SchemaAttr::max())
+    }
+
     pub fn from_bytes(mut bytes: &[u8]) -> DocumentKeyAttr {
         assert!(bytes.len() >= DOC_KEY_ATTR_LEN);
         assert_eq!(&bytes[..4], b"doc-");
diff --git a/src/database/schema.rs b/src/database/schema.rs
index 6e3deabf5..0363f47a3 100644
--- a/src/database/schema.rs
+++ b/src/database/schema.rs
@@ -176,6 +176,18 @@ impl SchemaAttr {
         SchemaAttr(value)
     }
 
+    pub fn min() -> SchemaAttr {
+        SchemaAttr(0)
+    }
+
+    pub fn next(self) -> Option<SchemaAttr> {
+        self.0.checked_add(1).map(SchemaAttr)
+    }
+
+    pub fn prev(self) -> Option<SchemaAttr> {
+        self.0.checked_sub(1).map(SchemaAttr)
+    }
+
     pub fn max() -> SchemaAttr {
         SchemaAttr(u16::MAX)
     }
diff --git a/src/database/serde/indexer_serializer.rs b/src/database/serde/indexer_serializer.rs
index 0bfb6e44a..1cea2da67 100644
--- a/src/database/serde/indexer_serializer.rs
+++ b/src/database/serde/indexer_serializer.rs
@@ -1,6 +1,6 @@
-use crate::database::update::RawUpdateBuilder;
-use crate::database::schema::SchemaAttr;
+use crate::database::update::DocumentUpdate;
 use crate::database::serde::SerializerError;
+use crate::database::schema::SchemaAttr;
 use crate::tokenizer::TokenizerBuilder;
 use crate::tokenizer::Token;
 use crate::{DocumentId, DocIndex, Attribute, WordArea};
@@ -10,7 +10,7 @@ use serde::ser;
 
 pub struct IndexerSerializer<'a, B> {
     pub tokenizer_builder: &'a B,
-    pub builder: &'a mut RawUpdateBuilder,
+    pub update: &'a mut DocumentUpdate,
     pub document_id: DocumentId,
     pub attribute: SchemaAttr,
 }
@@ -72,10 +72,10 @@ where B: TokenizerBuilder
             // and the unidecoded lowercased version
             let word_unidecoded = unidecode::unidecode(word).to_lowercase();
             if word_lower != word_unidecoded {
-                self.builder.insert_doc_index(word_unidecoded.into_bytes(), doc_index);
+                self.update.insert_doc_index(word_unidecoded.into_bytes(), doc_index);
             }
 
-            self.builder.insert_doc_index(word_lower.into_bytes(), doc_index);
+            self.update.insert_doc_index(word_lower.into_bytes(), doc_index);
         }
         Ok(())
     }
diff --git a/src/database/serde/serializer.rs b/src/database/serde/serializer.rs
index 48c58fd0d..074aba23c 100644
--- a/src/database/serde/serializer.rs
+++ b/src/database/serde/serializer.rs
@@ -3,8 +3,7 @@ use serde::ser;
 
 use crate::database::serde::indexer_serializer::IndexerSerializer;
 use crate::database::serde::key_to_string::KeyToStringSerializer;
-use crate::database::document_key::DocumentKeyAttr;
-use crate::database::update::RawUpdateBuilder;
+use crate::database::update::DocumentUpdate;
 use crate::database::serde::SerializerError;
 use crate::tokenizer::TokenizerBuilder;
 use crate::database::schema::Schema;
@@ -12,9 +11,9 @@ use crate::DocumentId;
 
 pub struct Serializer<'a, B> {
     pub schema: &'a Schema,
+    pub update: &'a mut DocumentUpdate,
     pub document_id: DocumentId,
     pub tokenizer_builder: &'a B,
-    pub builder: &'a mut RawUpdateBuilder,
 }
 
 impl<'a, B> ser::Serializer for Serializer<'a, B>
@@ -137,10 +136,10 @@ where B: TokenizerBuilder
     fn serialize_map(self, _len: Option<usize>) -> Result<Self::SerializeMap, Self::Error> {
         Ok(MapSerializer {
             schema: self.schema,
-            tokenizer_builder: self.tokenizer_builder,
             document_id: self.document_id,
+            update: self.update,
+            tokenizer_builder: self.tokenizer_builder,
             current_key_name: None,
-            builder: self.builder,
         })
     }
 
@@ -152,9 +151,9 @@ where B: TokenizerBuilder
     {
         Ok(StructSerializer {
             schema: self.schema,
-            tokenizer_builder: self.tokenizer_builder,
+            update: self.update,
            document_id: self.document_id,
-            builder: self.builder,
+            tokenizer_builder: self.tokenizer_builder,
         })
     }
 
@@ -172,10 +171,10 @@ where B: TokenizerBuilder
 
 pub struct MapSerializer<'a, B> {
     pub schema: &'a Schema,
-    pub tokenizer_builder: &'a B,
     pub document_id: DocumentId,
+    pub update: &'a mut DocumentUpdate,
+    pub tokenizer_builder: &'a B,
     pub current_key_name: Option<String>,
-    pub builder: &'a mut RawUpdateBuilder,
 }
 
 impl<'a, B> ser::SerializeMap for MapSerializer<'a, B>
@@ -212,12 +211,11 @@ where B: TokenizerBuilder
                 let props = self.schema.props(attr);
                 if props.is_stored() {
                     let value = bincode::serialize(value).unwrap();
-                    let key = DocumentKeyAttr::new(self.document_id, attr);
-                    self.builder.insert_attribute_value(key, value);
+                    self.update.insert_attribute_value(attr, value);
                 }
                 if props.is_indexed() {
                     let serializer = IndexerSerializer {
-                        builder: self.builder,
+                        update: self.update,
                         tokenizer_builder: self.tokenizer_builder,
                         document_id: self.document_id,
                         attribute: attr,
@@ -237,8 +235,8 @@ where B: TokenizerBuilder
 
 pub struct StructSerializer<'a, B> {
     pub schema: &'a Schema,
     pub document_id: DocumentId,
+    pub update: &'a mut DocumentUpdate,
     pub tokenizer_builder: &'a B,
-    pub builder: &'a mut RawUpdateBuilder,
 }
 
 impl<'a, B> ser::SerializeStruct for StructSerializer<'a, B>
@@ -258,12 +256,11 @@ where B: TokenizerBuilder
             let props = self.schema.props(attr);
             if props.is_stored() {
                 let value = bincode::serialize(value).unwrap();
-                let key = DocumentKeyAttr::new(self.document_id, attr);
-                self.builder.insert_attribute_value(key, value);
+                self.update.insert_attribute_value(attr, value);
             }
             if props.is_indexed() {
                 let serializer = IndexerSerializer {
-                    builder: self.builder,
+                    update: self.update,
                     tokenizer_builder: self.tokenizer_builder,
                     document_id: self.document_id,
                     attribute: attr,
diff --git a/src/database/update/builder.rs b/src/database/update/builder.rs
index 344eb84e4..16a805a67 100644
--- a/src/database/update/builder.rs
+++ b/src/database/update/builder.rs
@@ -33,12 +33,13 @@ impl UpdateBuilder {
               B: TokenizerBuilder,
     {
         let document_id = self.schema.document_id(&document)?;
+        let update = self.raw_builder.document_update(document_id);
 
         let serializer = Serializer {
             schema: &self.schema,
             document_id: document_id,
             tokenizer_builder: tokenizer_builder,
-            builder: &mut self.raw_builder,
+            update: update,
         };
 
         document.serialize(serializer)?;
@@ -50,7 +51,7 @@ impl UpdateBuilder {
     where T: Serialize,
     {
         let document_id = self.schema.document_id(&document)?;
-        self.raw_builder.remove_document(document_id);
+        self.raw_builder.document_update(document_id).remove();
 
         Ok(document_id)
     }
diff --git a/src/database/update/mod.rs b/src/database/update/mod.rs
index 3e3eb8cca..721fc549f 100644
--- a/src/database/update/mod.rs
+++ b/src/database/update/mod.rs
@@ -4,7 +4,7 @@ mod builder;
 mod raw_builder;
 
 pub use self::builder::UpdateBuilder;
-pub use self::raw_builder::RawUpdateBuilder;
+pub use self::raw_builder::{RawUpdateBuilder, DocumentUpdate};
 
 pub struct Update {
     sst_file: PathBuf,
diff --git a/src/database/update/raw_builder.rs b/src/database/update/raw_builder.rs
index e7e65a5fc..d116e05fe 100644
--- a/src/database/update/raw_builder.rs
+++ b/src/database/update/raw_builder.rs
@@ -1,13 +1,15 @@
-use std::collections::{BTreeMap, BTreeSet};
+use std::collections::BTreeMap;
 use std::path::PathBuf;
 use std::error::Error;
 
 use rocksdb::rocksdb_options;
+use hashbrown::HashMap;
 use fst::map::Map;
 use sdset::Set;
 
 use crate::database::index::{Index, Positive, PositiveBuilder, Negative};
 use crate::database::{DATA_INDEX, DocumentKeyAttr};
+use crate::database::schema::SchemaAttr;
 use crate::data::{DocIds, DocIndexes};
 use crate::{DocumentId, DocIndex};
 use super::Update;
@@ -17,60 +19,86 @@ type Value = Vec<u8>;
 
 pub struct RawUpdateBuilder {
     sst_file: PathBuf,
-    removed_documents: BTreeSet<DocumentId>,
-    words_indexes: BTreeMap<Vec<u8>, Vec<DocIndex>>,
-    keys_values: BTreeMap<DocumentKeyAttr, Value>,
+    document_updates: BTreeMap<DocumentId, DocumentUpdate>,
+}
+
+pub struct DocumentUpdate {
+    cleared: bool,
+    words_indexes: HashMap<Vec<u8>, Vec<DocIndex>>,
+    attributes: BTreeMap<SchemaAttr, Value>,
+}
+
+impl DocumentUpdate {
+    pub fn new() -> DocumentUpdate {
+        DocumentUpdate {
+            cleared: false,
+            words_indexes: HashMap::new(),
+            attributes: BTreeMap::new(),
+        }
+    }
+
+    pub fn remove(&mut self) {
+        self.cleared = true;
+        self.words_indexes.clear();
+        self.attributes.clear();
+    }
+
+    pub fn insert_attribute_value(&mut self, attr: SchemaAttr, value: Vec<u8>) {
+        self.attributes.insert(attr, value);
+    }
+
+    pub fn insert_doc_index(&mut self, token: Vec<u8>, doc_index: DocIndex) {
+        self.words_indexes.entry(token).or_insert_with(Vec::new).push(doc_index)
+    }
 }
 
 impl RawUpdateBuilder {
     pub fn new(path: PathBuf) -> RawUpdateBuilder {
         RawUpdateBuilder {
             sst_file: path,
-            removed_documents: BTreeSet::new(),
-            words_indexes: BTreeMap::new(),
-            keys_values: BTreeMap::new(),
+            document_updates: BTreeMap::new(),
         }
     }
 
-    pub fn insert_doc_index(&mut self, token: Vec<u8>, doc_index: DocIndex) {
-        self.words_indexes.entry(token).or_insert_with(Vec::new).push(doc_index)
+    pub fn document_update(&mut self, document_id: DocumentId) -> &mut DocumentUpdate {
+        self.document_updates.entry(document_id).or_insert_with(DocumentUpdate::new)
     }
 
-    pub fn insert_attribute_value(&mut self, key_attr: DocumentKeyAttr, value: Vec<u8>) -> Option<Value> {
-        self.keys_values.insert(key_attr, value)
-    }
+    pub fn build(mut self) -> Result<Update, Box<Error>> {
+        let mut removed_document_ids = Vec::new();
+        let mut words_indexes = BTreeMap::new();
 
-    pub fn remove_document(&mut self, id: DocumentId) {
-        self.removed_documents.insert(id);
-    }
+        for (&id, update) in self.document_updates.iter_mut() {
+            if update.cleared { removed_document_ids.push(id) }
 
-    pub fn build(self) -> Result<Update, Box<Error>> {
-        let tree = {
-            let negative = {
-                let documents_ids: Vec<_> = self.removed_documents.into_iter().collect();
-                let documents_ids = Set::new_unchecked(&documents_ids);
-                let doc_ids = DocIds::new(documents_ids);
-                Negative::new(doc_ids)
-            };
+            for (token, indexes) in &update.words_indexes {
+                words_indexes.entry(token).or_insert_with(Vec::new).extend_from_slice(indexes)
+            }
+        }
 
-            let positive = {
-                let mut builder = PositiveBuilder::memory();
-
-                for (key, mut indexes) in self.words_indexes {
-                    indexes.sort_unstable();
-                    let indexes = Set::new_unchecked(&indexes);
-                    builder.insert(key, indexes)?;
-                }
-
-                let (map, indexes) = builder.into_inner()?;
-                let map = Map::from_bytes(map)?;
-                let indexes = DocIndexes::from_bytes(indexes)?;
-                Positive::new(map, indexes)
-            };
-
-            Index { negative, positive }
+        let negative = {
+            let removed_document_ids = Set::new_unchecked(&removed_document_ids);
+            let doc_ids = DocIds::new(removed_document_ids);
+            Negative::new(doc_ids)
         };
 
+        let positive = {
+            let mut positive_builder = PositiveBuilder::memory();
+
+            for (key, mut indexes) in words_indexes {
+                indexes.sort_unstable();
+                let indexes = Set::new_unchecked(&indexes);
+                positive_builder.insert(key, indexes)?;
+            }
+
+            let (map, indexes) = positive_builder.into_inner()?;
+            let map = Map::from_bytes(map)?;
+            let indexes = DocIndexes::from_bytes(indexes)?;
+            Positive::new(map, indexes)
+        };
+
+        let index = Index { negative, positive };
+
         let env_options = rocksdb_options::EnvOptions::new();
         let column_family_options = rocksdb_options::ColumnFamilyOptions::new();
         let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options);
@@ -78,12 +106,49 @@ impl RawUpdateBuilder {
 
         // write the data-index
         let mut bytes = Vec::new();
-        tree.write_to_bytes(&mut bytes);
+        index.write_to_bytes(&mut bytes);
         file_writer.merge(DATA_INDEX, &bytes)?;
 
         // write all the documents attributes updates
-        for (key, value) in self.keys_values {
-            file_writer.put(key.as_ref(), &value)?;
+        for (id, update) in self.document_updates {
+
+            let mut last_attr: Option<SchemaAttr> = None;
+            for (attr, value) in update.attributes {
+
+                if update.cleared {
+                    // if there is no last attribute, remove from the first attribute
+                    let start_attr = match last_attr {
+                        Some(attr) => attr.next(),
+                        None => Some(SchemaAttr::min())
+                    };
+                    let start = start_attr.map(|a| DocumentKeyAttr::new(id, a));
+                    let end = attr.prev().map(|a| DocumentKeyAttr::new(id, a));
+
+                    // delete_range between (last_attr + 1) and (attr - 1)
+                    if let (Some(start), Some(end)) = (start, end) {
+                        file_writer.delete_range(start.as_ref(), end.as_ref())?;
+                    }
+                }
+
+                let key = DocumentKeyAttr::new(id, attr);
+                file_writer.put(key.as_ref(), &value)?;
+                last_attr = Some(attr);
+            }
+
+            if update.cleared {
+                // if there is no last attribute, remove from the first attribute
+                let start_attr = match last_attr {
+                    Some(attr) => attr.next(),
+                    None => Some(SchemaAttr::min())
+                };
+                let start = start_attr.map(|a| DocumentKeyAttr::new(id, a));
+                let end = DocumentKeyAttr::with_attribute_max(id);
+
+                // delete_range between (last_attr + 1) and attr_max
+                if let Some(start) = start {
+                    file_writer.delete_range(start.as_ref(), end.as_ref())?;
+                }
+            }
         }
 
         file_writer.finish()?;
diff --git a/src/rank/query_builder.rs b/src/rank/query_builder.rs
index 5e4ee0d11..4df983f8a 100644
--- a/src/rank/query_builder.rs
+++ b/src/rank/query_builder.rs
@@ -232,11 +232,11 @@ where D: Deref<Target=DB>,
             // we must compute the real distinguished len of this sub-group
             for document in group.iter() {
                 let filter_accepted = match &self.inner.filter {
-                    None => true,
                     Some(filter) => {
                         let entry = filter_map.entry(document.id);
                         *entry.or_insert_with(|| (filter)(document.id, view))
                     },
+                    None => true,
                 };
 
                 if filter_accepted {
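
A worked example of the attribute bookkeeping in RawUpdateBuilder::build()
above, for a cleared document whose new version only sets attributes 2 and
5 (the attribute numbers are illustrative):

    // attr = 2, last_attr = None -> delete [SchemaAttr::min() .. 1]
    // attr = 5, last_attr = 2    -> delete [3 .. 4]
    // after the loop             -> delete [6 .. SchemaAttr::max()]
    //
    // SchemaAttr::next()/prev() compute the gap bounds, and the new
    // DocumentKeyAttr::with_attribute_max() helper builds the upper bound
    // of the trailing range. Every key of the old document version that is
    // not overwritten by a fresh put() falls inside one of these ranges,
    // so a "clear then re-serialize" update leaves no stale attribute
    // behind.
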
From c05fab783a017596f50f50f8edac12a2fee39c6e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?=
Date: Wed, 2 Jan 2019 17:52:18 +0100
Subject: [PATCH 2/2] fix: Write and Read DocumentKeyAttr in big endian

---
 src/database/document_key.rs | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/src/database/document_key.rs b/src/database/document_key.rs
index f56bda1dd..cfcb34ad6 100644
--- a/src/database/document_key.rs
+++ b/src/database/document_key.rs
@@ -2,7 +2,7 @@ use std::io::{Cursor, Read, Write};
 use std::mem::size_of;
 use std::fmt;
 
-use byteorder::{NativeEndian, WriteBytesExt, ReadBytesExt};
+use byteorder::{BigEndian, WriteBytesExt, ReadBytesExt};
 
 use crate::database::schema::SchemaAttr;
 use crate::DocumentId;
@@ -19,7 +19,7 @@ impl DocumentKey {
         let mut wtr = Cursor::new(&mut buffer[..]);
         wtr.write_all(b"doc-").unwrap();
-        wtr.write_u64::<NativeEndian>(id.0).unwrap();
+        wtr.write_u64::<BigEndian>(id.0).unwrap();
 
         DocumentKey(buffer)
     }
@@ -43,7 +43,7 @@ impl DocumentKey {
     }
 
     pub fn document_id(&self) -> DocumentId {
-        let id = (&self.0[4..]).read_u64::<NativeEndian>().unwrap();
+        let id = (&self.0[4..]).read_u64::<BigEndian>().unwrap();
         DocumentId(id)
     }
 }
@@ -73,7 +73,7 @@ impl DocumentKeyAttr {
         let mut wtr = Cursor::new(&mut buffer[..]);
         wtr.write_all(&raw_key).unwrap();
         wtr.write_all(b"-").unwrap();
-        wtr.write_u16::<NativeEndian>(attr.0).unwrap();
+        wtr.write_u16::<BigEndian>(attr.0).unwrap();
 
         DocumentKeyAttr(buffer)
     }
@@ -97,13 +97,13 @@ impl DocumentKeyAttr {
     }
 
     pub fn document_id(&self) -> DocumentId {
-        let id = (&self.0[4..]).read_u64::<NativeEndian>().unwrap();
+        let id = (&self.0[4..]).read_u64::<BigEndian>().unwrap();
         DocumentId(id)
     }
 
     pub fn attribute(&self) -> SchemaAttr {
         let offset = 4 + size_of::<u64>() + 1;
-        let value = (&self.0[offset..]).read_u16::<NativeEndian>().unwrap();
+        let value = (&self.0[offset..]).read_u16::<BigEndian>().unwrap();
         SchemaAttr::new(value)
     }
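
Presumably needed because RocksDB compares keys as raw bytes: the
delete_range calls introduced in the previous patch only remove the
intended (document id, attribute) spans if the byte order of the keys
agrees with the numeric order of their fields. Big-endian encoding has
exactly that property, while the previous NativeEndian encoding (little
endian on x86) does not. A standalone illustration, not part of this
patch:

    // Big-endian byte strings sort like the integers they encode:
    assert!(1u16.to_be_bytes() < 256u16.to_be_bytes()); // [0, 1] < [1, 0]
    // Little-endian byte strings do not:
    assert!(1u16.to_le_bytes() > 256u16.to_le_bytes()); // [1, 0] > [0, 1]
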