Merge pull request #61 from Kerollmops/update-remove-kv-attributes

UpdateBuilder handles document attributes deletion
Clément Renault 2019-01-02 18:20:14 +01:00 committed by GitHub
commit e6d3840f12
8 changed files with 156 additions and 73 deletions

View File

@@ -2,7 +2,7 @@ use std::io::{Cursor, Read, Write};
use std::mem::size_of;
use std::fmt;
use byteorder::{NativeEndian, WriteBytesExt, ReadBytesExt};
use byteorder::{BigEndian, WriteBytesExt, ReadBytesExt};
use crate::database::schema::SchemaAttr;
use crate::DocumentId;
@@ -19,7 +19,7 @@ impl DocumentKey {
let mut wtr = Cursor::new(&mut buffer[..]);
wtr.write_all(b"doc-").unwrap();
wtr.write_u64::<NativeEndian>(id.0).unwrap();
wtr.write_u64::<BigEndian>(id.0).unwrap();
DocumentKey(buffer)
}
@@ -43,7 +43,7 @@ impl DocumentKey {
}
pub fn document_id(&self) -> DocumentId {
let id = (&self.0[4..]).read_u64::<NativeEndian>().unwrap();
let id = (&self.0[4..]).read_u64::<BigEndian>().unwrap();
DocumentId(id)
}
}
@@ -73,11 +73,19 @@ impl DocumentKeyAttr {
let mut wtr = Cursor::new(&mut buffer[..]);
wtr.write_all(&raw_key).unwrap();
wtr.write_all(b"-").unwrap();
wtr.write_u16::<NativeEndian>(attr.0).unwrap();
wtr.write_u16::<BigEndian>(attr.0).unwrap();
DocumentKeyAttr(buffer)
}
pub fn with_attribute_min(id: DocumentId) -> DocumentKeyAttr {
DocumentKeyAttr::new(id, SchemaAttr::min())
}
pub fn with_attribute_max(id: DocumentId) -> DocumentKeyAttr {
DocumentKeyAttr::new(id, SchemaAttr::max())
}
pub fn from_bytes(mut bytes: &[u8]) -> DocumentKeyAttr {
assert!(bytes.len() >= DOC_KEY_ATTR_LEN);
assert_eq!(&bytes[..4], b"doc-");
@@ -89,13 +97,13 @@ impl DocumentKeyAttr {
}
pub fn document_id(&self) -> DocumentId {
let id = (&self.0[4..]).read_u64::<NativeEndian>().unwrap();
let id = (&self.0[4..]).read_u64::<BigEndian>().unwrap();
DocumentId(id)
}
pub fn attribute(&self) -> SchemaAttr {
let offset = 4 + size_of::<u64>() + 1;
let value = (&self.0[offset..]).read_u16::<NativeEndian>().unwrap();
let value = (&self.0[offset..]).read_u16::<BigEndian>().unwrap();
SchemaAttr::new(value)
}
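
NativeEndian is replaced by BigEndian so that the byte-wise key comparison (RocksDB's default comparator) agrees with the numeric order of document ids and attribute numbers, which the delete_range bookkeeping introduced in this change relies on. A minimal sketch of the difference, assuming only the byteorder crate:

    use byteorder::{BigEndian, LittleEndian, WriteBytesExt};

    fn main() {
        let (small, large) = (1u64, 256u64);

        // big-endian bytes sort exactly like the integers they encode
        let mut small_be = Vec::new();
        small_be.write_u64::<BigEndian>(small).unwrap();
        let mut large_be = Vec::new();
        large_be.write_u64::<BigEndian>(large).unwrap();
        assert!(small_be < large_be);

        // little-endian (the native order on most machines) does not
        let mut small_le = Vec::new();
        small_le.write_u64::<LittleEndian>(small).unwrap();
        let mut large_le = Vec::new();
        large_le.write_u64::<LittleEndian>(large).unwrap();
        assert!(small_le > large_le);
    }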

View File

@@ -176,6 +176,18 @@ impl SchemaAttr {
SchemaAttr(value)
}
pub fn min() -> SchemaAttr {
SchemaAttr(0)
}
pub fn next(self) -> Option<SchemaAttr> {
self.0.checked_add(1).map(SchemaAttr)
}
pub fn prev(self) -> Option<SchemaAttr> {
self.0.checked_sub(1).map(SchemaAttr)
}
pub fn max() -> SchemaAttr {
SchemaAttr(u16::MAX)
}
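
The new min(), max(), next() and prev() helpers give the update builder the attribute bounds it needs when turning a cleared document into delete_range calls. A small sketch of their behaviour, assuming SchemaAttr implements PartialEq and Debug (it is a thin u16 wrapper):

    #[test]
    fn schema_attr_bounds() {
        let first = SchemaAttr::min();   // SchemaAttr(0)
        let last = SchemaAttr::max();    // SchemaAttr(u16::MAX)
        assert_eq!(first.prev(), None);  // nothing before the minimum
        assert_eq!(last.next(), None);   // nothing after the maximum
        assert_eq!(SchemaAttr::new(3).next(), Some(SchemaAttr::new(4)));
        assert_eq!(SchemaAttr::new(3).prev(), Some(SchemaAttr::new(2)));
    }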

View File

@@ -1,6 +1,6 @@
use crate::database::update::RawUpdateBuilder;
use crate::database::schema::SchemaAttr;
use crate::database::update::DocumentUpdate;
use crate::database::serde::SerializerError;
use crate::database::schema::SchemaAttr;
use crate::tokenizer::TokenizerBuilder;
use crate::tokenizer::Token;
use crate::{DocumentId, DocIndex, Attribute, WordArea};
@@ -10,7 +10,7 @@ use serde::ser;
pub struct IndexerSerializer<'a, B> {
pub tokenizer_builder: &'a B,
pub builder: &'a mut RawUpdateBuilder,
pub update: &'a mut DocumentUpdate,
pub document_id: DocumentId,
pub attribute: SchemaAttr,
}
@@ -72,10 +72,10 @@ where B: TokenizerBuilder
// and the unidecoded lowercased version
let word_unidecoded = unidecode::unidecode(word).to_lowercase();
if word_lower != word_unidecoded {
self.builder.insert_doc_index(word_unidecoded.into_bytes(), doc_index);
self.update.insert_doc_index(word_unidecoded.into_bytes(), doc_index);
}
self.builder.insert_doc_index(word_lower.into_bytes(), doc_index);
self.update.insert_doc_index(word_lower.into_bytes(), doc_index);
}
Ok(())
}
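
The indexing behaviour itself is unchanged: every word is inserted under its lowercased form and, when it differs, under its unidecoded lowercased form, so accent-free queries still match; the pairs simply accumulate in the per-document DocumentUpdate now instead of directly in the RawUpdateBuilder. A tiny illustration of the two keys one word can produce:

    // illustrative input, not taken from the diff
    let word_lower = "Déjà".to_lowercase();                            // "déjà"
    let word_unidecoded = unidecode::unidecode("Déjà").to_lowercase(); // "deja"
    assert_ne!(word_lower, word_unidecoded); // both keys receive the same DocIndex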

View File

@@ -3,8 +3,7 @@ use serde::ser;
use crate::database::serde::indexer_serializer::IndexerSerializer;
use crate::database::serde::key_to_string::KeyToStringSerializer;
use crate::database::document_key::DocumentKeyAttr;
use crate::database::update::RawUpdateBuilder;
use crate::database::update::DocumentUpdate;
use crate::database::serde::SerializerError;
use crate::tokenizer::TokenizerBuilder;
use crate::database::schema::Schema;
@@ -12,9 +11,9 @@ use crate::DocumentId;
pub struct Serializer<'a, B> {
pub schema: &'a Schema,
pub update: &'a mut DocumentUpdate,
pub document_id: DocumentId,
pub tokenizer_builder: &'a B,
pub builder: &'a mut RawUpdateBuilder,
}
impl<'a, B> ser::Serializer for Serializer<'a, B>
@@ -137,10 +136,10 @@ where B: TokenizerBuilder
fn serialize_map(self, _len: Option<usize>) -> Result<Self::SerializeMap, Self::Error> {
Ok(MapSerializer {
schema: self.schema,
tokenizer_builder: self.tokenizer_builder,
document_id: self.document_id,
update: self.update,
tokenizer_builder: self.tokenizer_builder,
current_key_name: None,
builder: self.builder,
})
}
@@ -152,9 +151,9 @@ where B: TokenizerBuilder
{
Ok(StructSerializer {
schema: self.schema,
tokenizer_builder: self.tokenizer_builder,
update: self.update,
document_id: self.document_id,
builder: self.builder,
tokenizer_builder: self.tokenizer_builder,
})
}
@@ -172,10 +171,10 @@ where B: TokenizerBuilder
pub struct MapSerializer<'a, B> {
pub schema: &'a Schema,
pub tokenizer_builder: &'a B,
pub document_id: DocumentId,
pub update: &'a mut DocumentUpdate,
pub tokenizer_builder: &'a B,
pub current_key_name: Option<String>,
pub builder: &'a mut RawUpdateBuilder,
}
impl<'a, B> ser::SerializeMap for MapSerializer<'a, B>
@@ -212,12 +211,11 @@ where B: TokenizerBuilder
let props = self.schema.props(attr);
if props.is_stored() {
let value = bincode::serialize(value).unwrap();
let key = DocumentKeyAttr::new(self.document_id, attr);
self.builder.insert_attribute_value(key, value);
self.update.insert_attribute_value(attr, value);
}
if props.is_indexed() {
let serializer = IndexerSerializer {
builder: self.builder,
update: self.update,
tokenizer_builder: self.tokenizer_builder,
document_id: self.document_id,
attribute: attr,
@@ -237,8 +235,8 @@ where B: TokenizerBuilder
pub struct StructSerializer<'a, B> {
pub schema: &'a Schema,
pub document_id: DocumentId,
pub update: &'a mut DocumentUpdate,
pub tokenizer_builder: &'a B,
pub builder: &'a mut RawUpdateBuilder,
}
impl<'a, B> ser::SerializeStruct for StructSerializer<'a, B>
@@ -258,12 +256,11 @@ where B: TokenizerBuilder
let props = self.schema.props(attr);
if props.is_stored() {
let value = bincode::serialize(value).unwrap();
let key = DocumentKeyAttr::new(self.document_id, attr);
self.builder.insert_attribute_value(key, value);
self.update.insert_attribute_value(attr, value);
}
if props.is_indexed() {
let serializer = IndexerSerializer {
builder: self.builder,
update: self.update,
tokenizer_builder: self.tokenizer_builder,
document_id: self.document_id,
attribute: attr,
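
Both the map and the struct serializer now route a field the same way: stored attributes are bincode-encoded into the DocumentUpdate, and indexed ones go through IndexerSerializer into that same update. A condensed sketch using only calls visible in this diff (the attribute number and value are illustrative):

    use crate::database::schema::{Schema, SchemaAttr};
    use crate::database::update::DocumentUpdate;

    fn store_one_attribute(schema: &Schema, update: &mut DocumentUpdate) {
        let attr = SchemaAttr::new(0); // illustrative attribute number
        let props = schema.props(attr);
        if props.is_stored() {
            // stored values are keyed by attribute inside the per-document update
            let value = bincode::serialize("hello").unwrap();
            update.insert_attribute_value(attr, value);
        }
        if props.is_indexed() {
            // indexed values follow the IndexerSerializer path and reach the
            // same DocumentUpdate through insert_doc_index
        }
    }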

View File

@@ -33,12 +33,13 @@ impl UpdateBuilder {
B: TokenizerBuilder,
{
let document_id = self.schema.document_id(&document)?;
let update = self.raw_builder.document_update(document_id);
let serializer = Serializer {
schema: &self.schema,
document_id: document_id,
tokenizer_builder: tokenizer_builder,
builder: &mut self.raw_builder,
update: update,
};
document.serialize(serializer)?;
@@ -50,7 +51,7 @@ impl UpdateBuilder {
where T: Serialize,
{
let document_id = self.schema.document_id(&document)?;
self.raw_builder.remove_document(document_id);
self.raw_builder.document_update(document_id).remove();
Ok(document_id)
}
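
remove_document no longer records the id in a separate set: it marks the document's pending update as cleared, so a removal also discards anything queued for that document earlier in the same batch. A minimal sketch of that interaction (the attribute and value are illustrative):

    use crate::database::schema::SchemaAttr;
    use crate::database::update::RawUpdateBuilder;
    use crate::DocumentId;

    fn update_then_remove(builder: &mut RawUpdateBuilder, id: DocumentId) {
        let update = builder.document_update(id); // same DocumentUpdate both times
        update.insert_attribute_value(SchemaAttr::new(0), vec![1, 2, 3]);
        update.remove(); // cleared = true; the queued attribute and word indexes are dropped
    }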

View File

@@ -4,7 +4,7 @@ mod builder;
mod raw_builder;
pub use self::builder::UpdateBuilder;
pub use self::raw_builder::RawUpdateBuilder;
pub use self::raw_builder::{RawUpdateBuilder, DocumentUpdate};
pub struct Update {
sst_file: PathBuf,

View File

@@ -1,13 +1,15 @@
use std::collections::{BTreeMap, BTreeSet};
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::error::Error;
use rocksdb::rocksdb_options;
use hashbrown::HashMap;
use fst::map::Map;
use sdset::Set;
use crate::database::index::{Index, Positive, PositiveBuilder, Negative};
use crate::database::{DATA_INDEX, DocumentKeyAttr};
use crate::database::schema::SchemaAttr;
use crate::data::{DocIds, DocIndexes};
use crate::{DocumentId, DocIndex};
use super::Update;
@@ -17,60 +19,86 @@ type Value = Vec<u8>;
pub struct RawUpdateBuilder {
sst_file: PathBuf,
removed_documents: BTreeSet<DocumentId>,
words_indexes: BTreeMap<Token, Vec<DocIndex>>,
keys_values: BTreeMap<DocumentKeyAttr, Value>,
document_updates: BTreeMap<DocumentId, DocumentUpdate>,
}
pub struct DocumentUpdate {
cleared: bool,
words_indexes: HashMap<Token, Vec<DocIndex>>,
attributes: BTreeMap<SchemaAttr, Value>,
}
impl DocumentUpdate {
pub fn new() -> DocumentUpdate {
DocumentUpdate {
cleared: false,
words_indexes: HashMap::new(),
attributes: BTreeMap::new(),
}
}
pub fn remove(&mut self) {
self.cleared = true;
self.words_indexes.clear();
self.attributes.clear();
}
pub fn insert_attribute_value(&mut self, attr: SchemaAttr, value: Vec<u8>) {
self.attributes.insert(attr, value);
}
pub fn insert_doc_index(&mut self, token: Vec<u8>, doc_index: DocIndex) {
self.words_indexes.entry(token).or_insert_with(Vec::new).push(doc_index)
}
}
impl RawUpdateBuilder {
pub fn new(path: PathBuf) -> RawUpdateBuilder {
RawUpdateBuilder {
sst_file: path,
removed_documents: BTreeSet::new(),
words_indexes: BTreeMap::new(),
keys_values: BTreeMap::new(),
document_updates: BTreeMap::new(),
}
}
pub fn insert_doc_index(&mut self, token: Vec<u8>, doc_index: DocIndex) {
self.words_indexes.entry(token).or_insert_with(Vec::new).push(doc_index)
pub fn document_update(&mut self, document_id: DocumentId) -> &mut DocumentUpdate {
self.document_updates.entry(document_id).or_insert_with(DocumentUpdate::new)
}
pub fn insert_attribute_value(&mut self, key_attr: DocumentKeyAttr, value: Vec<u8>) -> Option<Vec<u8>> {
self.keys_values.insert(key_attr, value)
}
pub fn build(mut self) -> Result<Update, Box<Error>> {
let mut removed_document_ids = Vec::new();
let mut words_indexes = BTreeMap::new();
pub fn remove_document(&mut self, id: DocumentId) {
self.removed_documents.insert(id);
}
for (&id, update) in self.document_updates.iter_mut() {
if update.cleared { removed_document_ids.push(id) }
pub fn build(self) -> Result<Update, Box<Error>> {
let tree = {
let negative = {
let documents_ids: Vec<_> = self.removed_documents.into_iter().collect();
let documents_ids = Set::new_unchecked(&documents_ids);
let doc_ids = DocIds::new(documents_ids);
Negative::new(doc_ids)
};
for (token, indexes) in &update.words_indexes {
words_indexes.entry(token).or_insert_with(Vec::new).extend_from_slice(indexes)
}
}
let positive = {
let mut builder = PositiveBuilder::memory();
for (key, mut indexes) in self.words_indexes {
indexes.sort_unstable();
let indexes = Set::new_unchecked(&indexes);
builder.insert(key, indexes)?;
}
let (map, indexes) = builder.into_inner()?;
let map = Map::from_bytes(map)?;
let indexes = DocIndexes::from_bytes(indexes)?;
Positive::new(map, indexes)
};
Index { negative, positive }
let negative = {
let removed_document_ids = Set::new_unchecked(&removed_document_ids);
let doc_ids = DocIds::new(removed_document_ids);
Negative::new(doc_ids)
};
let positive = {
let mut positive_builder = PositiveBuilder::memory();
for (key, mut indexes) in words_indexes {
indexes.sort_unstable();
let indexes = Set::new_unchecked(&indexes);
positive_builder.insert(key, indexes)?;
}
let (map, indexes) = positive_builder.into_inner()?;
let map = Map::from_bytes(map)?;
let indexes = DocIndexes::from_bytes(indexes)?;
Positive::new(map, indexes)
};
let index = Index { negative, positive };
let env_options = rocksdb_options::EnvOptions::new();
let column_family_options = rocksdb_options::ColumnFamilyOptions::new();
let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options);
@@ -78,12 +106,49 @@ impl RawUpdateBuilder {
// write the data-index
let mut bytes = Vec::new();
tree.write_to_bytes(&mut bytes);
index.write_to_bytes(&mut bytes);
file_writer.merge(DATA_INDEX, &bytes)?;
// write all the documents attributes updates
for (key, value) in self.keys_values {
file_writer.put(key.as_ref(), &value)?;
for (id, update) in self.document_updates {
let mut last_attr: Option<SchemaAttr> = None;
for (attr, value) in update.attributes {
if update.cleared {
// if there is no last attribute, remove from the first attribute
let start_attr = match last_attr {
Some(attr) => attr.next(),
None => Some(SchemaAttr::min())
};
let start = start_attr.map(|a| DocumentKeyAttr::new(id, a));
let end = attr.prev().map(|a| DocumentKeyAttr::new(id, a));
// delete_range between (last_attr + 1) and (attr - 1)
if let (Some(start), Some(end)) = (start, end) {
file_writer.delete_range(start.as_ref(), end.as_ref())?;
}
}
let key = DocumentKeyAttr::new(id, attr);
file_writer.put(key.as_ref(), &value)?;
last_attr = Some(attr);
}
if update.cleared {
// if there is no last attribute, remove from the first attribute
let start_attr = match last_attr {
Some(attr) => attr.next(),
None => Some(SchemaAttr::min())
};
let start = start_attr.map(|a| DocumentKeyAttr::new(id, a));
let end = DocumentKeyAttr::with_attribute_max(id);
// delete_range between (last_attr + 1) and attr_max
if let Some(start) = start {
file_writer.delete_range(start.as_ref(), end.as_ref())?;
}
}
}
file_writer.finish()?;
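
The loop above interleaves put and delete_range so that a cleared document keeps only the attributes of its new version: every gap between two surviving attributes is range-deleted, and a final range runs from the last written attribute up to SchemaAttr::max(). A standalone sketch of the same bookkeeping on plain u16 attribute numbers (illustrative, not taken from the diff):

    // returns the inclusive (start, end) attribute ranges deleted for one
    // cleared document, given the sorted attribute numbers it still stores
    fn ranges_to_delete(kept: &[u16]) -> Vec<(u16, u16)> {
        let mut ranges = Vec::new();
        let mut last: Option<u16> = None;
        for &attr in kept {
            // gap between the previously written attribute and this one
            let start = match last { Some(a) => a.checked_add(1), None => Some(0) };
            let end = attr.checked_sub(1);
            if let (Some(start), Some(end)) = (start, end) {
                ranges.push((start, end));
            }
            last = Some(attr);
        }
        // everything after the last written attribute, up to the maximum
        let start = match last { Some(a) => a.checked_add(1), None => Some(0) };
        if let Some(start) = start {
            ranges.push((start, u16::MAX));
        }
        ranges
    }

    // ranges_to_delete(&[2, 5]) == vec![(0, 1), (3, 4), (6, u16::MAX)]
    // ranges_to_delete(&[])     == vec![(0, u16::MAX)]  // plain document removal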

View File

@@ -232,11 +232,11 @@ where D: Deref<Target=DB>,
// we must compute the real distinguished len of this sub-group
for document in group.iter() {
let filter_accepted = match &self.inner.filter {
None => true,
Some(filter) => {
let entry = filter_map.entry(document.id);
*entry.or_insert_with(|| (filter)(document.id, view))
},
None => true,
};
if filter_accepted {