feat: Introduce the PositiveUpdate

This commit is contained in:
Clément Renault 2018-11-30 14:31:46 +01:00
parent 8b2a8da8fa
commit 2719f1ad3b
No known key found for this signature in database
GPG Key ID: 0151CDAB43460DAE
14 changed files with 251 additions and 204 deletions

View File

@ -3,7 +3,7 @@ pub mod positive;
pub mod negative; pub mod negative;
pub use self::positive::{PositiveBlob, PositiveBlobBuilder}; pub use self::positive::{PositiveBlob, PositiveBlobBuilder};
pub use self::negative::{NegativeBlob, NegativeBlobBuilder}; pub use self::negative::NegativeBlob;
pub use self::ops::OpBuilder; pub use self::ops::OpBuilder;
use std::fmt; use std::fmt;

View File

@ -1,11 +1,10 @@
use std::io::Write;
use std::path::Path; use std::path::Path;
use std::error::Error; use std::error::Error;
use crate::DocumentId;
use crate::data::{DocIds, DocIdsBuilder};
use serde::ser::{Serialize, Serializer};
use serde::de::{self, Deserialize, Deserializer}; use serde::de::{self, Deserialize, Deserializer};
use serde::ser::{Serialize, Serializer};
use crate::data::DocIds;
use crate::DocumentId;
pub struct NegativeBlob { pub struct NegativeBlob {
doc_ids: DocIds, doc_ids: DocIds,
@ -55,33 +54,3 @@ impl<'de> Deserialize<'de> for NegativeBlob {
NegativeBlob::from_bytes(bytes).map_err(de::Error::custom) NegativeBlob::from_bytes(bytes).map_err(de::Error::custom)
} }
} }
pub struct NegativeBlobBuilder<W> {
doc_ids: DocIdsBuilder<W>,
}
impl<W: Write> NegativeBlobBuilder<W> {
pub fn new(wrt: W) -> Self {
Self { doc_ids: DocIdsBuilder::new(wrt) }
}
pub fn insert(&mut self, doc: DocumentId) -> bool {
self.doc_ids.insert(doc)
}
pub fn finish(self) -> Result<(), Box<Error>> {
self.into_inner().map(|_| ())
}
pub fn into_inner(self) -> Result<W, Box<Error>> {
// FIXME insert a magic number that indicates if the endianess
// of the input is the same as the machine that is reading it.
Ok(self.doc_ids.into_inner()?)
}
}
impl NegativeBlobBuilder<Vec<u8>> {
pub fn build(self) -> Result<NegativeBlob, Box<Error>> {
self.into_inner().and_then(|ids| NegativeBlob::from_bytes(ids))
}
}

View File

@ -1,5 +1,5 @@
mod blob; mod blob;
mod ops; mod ops;
pub use self::blob::{NegativeBlob, NegativeBlobBuilder}; pub use self::blob::NegativeBlob;
pub use self::ops::OpBuilder; pub use self::ops::OpBuilder;

View File

@ -1,11 +1,9 @@
use std::collections::BTreeSet;
use std::slice::from_raw_parts; use std::slice::from_raw_parts;
use std::error::Error; use std::error::Error;
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use std::{io, mem}; use std::{io, mem};
use byteorder::{NativeEndian, WriteBytesExt};
use fst::raw::MmapReadOnly; use fst::raw::MmapReadOnly;
use serde::ser::{Serialize, Serializer}; use serde::ser::{Serialize, Serializer};
@ -57,28 +55,3 @@ impl Serialize for DocIds {
self.data.as_ref().serialize(serializer) self.data.as_ref().serialize(serializer)
} }
} }
pub struct DocIdsBuilder<W> {
doc_ids: BTreeSet<DocumentId>, // TODO: prefer a linked-list
wrt: W,
}
impl<W: io::Write> DocIdsBuilder<W> {
pub fn new(wrt: W) -> Self {
Self {
doc_ids: BTreeSet::new(),
wrt: wrt,
}
}
pub fn insert(&mut self, doc: DocumentId) -> bool {
self.doc_ids.insert(doc)
}
pub fn into_inner(mut self) -> io::Result<W> {
for id in self.doc_ids {
self.wrt.write_u64::<NativeEndian>(id)?;
}
Ok(self.wrt)
}
}

View File

@ -6,7 +6,7 @@ use std::sync::Arc;
use fst::raw::MmapReadOnly; use fst::raw::MmapReadOnly;
pub use self::doc_ids::{DocIds, DocIdsBuilder}; pub use self::doc_ids::DocIds;
pub use self::doc_indexes::{DocIndexes, DocIndexesBuilder, RawDocIndexesBuilder}; pub use self::doc_indexes::{DocIndexes, DocIndexesBuilder, RawDocIndexesBuilder};
#[derive(Clone)] #[derive(Clone)]

BIN
src/index/update/.DS_Store vendored Normal file

Binary file not shown.

View File

@ -1,11 +1,20 @@
use std::io::{Cursor, Write};
use std::path::PathBuf; use std::path::PathBuf;
use std::error::Error; use std::error::Error;
mod negative_update; use byteorder::{NetworkEndian, WriteBytesExt};
mod positive_update;
pub use self::positive_update::{PositiveUpdateBuilder, NewState}; use crate::index::schema::SchemaAttr;
pub use self::negative_update::NegativeUpdateBuilder; use crate::DocumentId;
mod negative;
mod positive;
pub use self::positive::{PositiveUpdateBuilder, NewState};
pub use self::negative::NegativeUpdateBuilder;
const DOC_KEY_LEN: usize = 4 + std::mem::size_of::<u64>();
const DOC_KEY_ATTR_LEN: usize = DOC_KEY_LEN + 1 + std::mem::size_of::<u32>();
pub struct Update { pub struct Update {
path: PathBuf, path: PathBuf,
@ -20,3 +29,27 @@ impl Update {
self.path self.path
} }
} }
// "doc-{ID_8_BYTES}"
fn raw_document_key(id: DocumentId) -> [u8; DOC_KEY_LEN] {
let mut key = [0; DOC_KEY_LEN];
let mut wtr = Cursor::new(&mut key[..]);
wtr.write_all(b"doc-").unwrap();
wtr.write_u64::<NetworkEndian>(id).unwrap();
key
}
// "doc-{ID_8_BYTES}-{ATTR_4_BYTES}"
fn raw_document_key_attr(id: DocumentId, attr: SchemaAttr) -> [u8; DOC_KEY_ATTR_LEN] {
let mut key = [0; DOC_KEY_ATTR_LEN];
let raw_key = raw_document_key(id);
let mut wtr = Cursor::new(&mut key[..]);
wtr.write_all(&raw_key).unwrap();
wtr.write_all(b"-").unwrap();
wtr.write_u32::<NetworkEndian>(attr.as_u32()).unwrap();
key
}

View File

@ -0,0 +1,4 @@
mod update;
mod unordered_builder;
pub use self::update::NegativeUpdateBuilder;

View File

@ -0,0 +1,37 @@
use std::collections::BTreeSet;
use std::io;
use byteorder::{NativeEndian, WriteBytesExt};
use crate::DocumentId;
/// Collects document ids in any insertion order and serializes them,
/// sorted ascending and deduplicated (via the `BTreeSet`), as
/// native-endian u64 values into the wrapped writer.
pub struct UnorderedNegativeBlobBuilder<W> {
    doc_ids: BTreeSet<DocumentId>, // TODO: prefer a linked-list
    wrt: W,
}

impl UnorderedNegativeBlobBuilder<Vec<u8>> {
    /// Creates a builder that serializes into an in-memory buffer.
    pub fn memory() -> Self {
        UnorderedNegativeBlobBuilder::new(Vec::new())
    }
}

impl<W: io::Write> UnorderedNegativeBlobBuilder<W> {
    /// Creates a builder that serializes into `wrt`.
    pub fn new(wrt: W) -> Self {
        Self {
            doc_ids: BTreeSet::new(),
            wrt,
        }
    }

    /// Registers a document id.
    ///
    /// Returns `true` when the id was not already present.
    pub fn insert(&mut self, doc: DocumentId) -> bool {
        self.doc_ids.insert(doc)
    }

    /// Writes every collected id (in ascending order, as `BTreeSet`
    /// iteration is sorted) and returns the underlying writer.
    pub fn into_inner(mut self) -> io::Result<W> {
        for id in self.doc_ids {
            self.wrt.write_u64::<NativeEndian>(id)?;
        }
        Ok(self.wrt)
    }
}

View File

@ -1,37 +1,24 @@
use std::path::PathBuf; use std::path::PathBuf;
use std::error::Error; use std::error::Error;
use std::io::{Cursor, Write};
use byteorder::{NetworkEndian, WriteBytesExt};
use ::rocksdb::rocksdb_options; use ::rocksdb::rocksdb_options;
use crate::data::{DocIds, DocIdsBuilder}; use crate::index::update::negative::unordered_builder::UnorderedNegativeBlobBuilder;
use crate::index::update::{Update, raw_document_key};
use crate::blob::{Blob, NegativeBlob}; use crate::blob::{Blob, NegativeBlob};
use crate::index::update::Update;
use crate::index::DATA_INDEX; use crate::index::DATA_INDEX;
use crate::DocumentId; use crate::DocumentId;
const DOC_KEY_LEN: usize = 4 + std::mem::size_of::<u64>();
// "doc-ID_8_BYTES"
fn raw_document_key(id: DocumentId) -> [u8; DOC_KEY_LEN] {
let mut key = [0; DOC_KEY_LEN];
let mut rdr = Cursor::new(&mut key[..]);
rdr.write_all(b"doc-").unwrap();
rdr.write_u64::<NetworkEndian>(id).unwrap();
key
}
pub struct NegativeUpdateBuilder { pub struct NegativeUpdateBuilder {
path: PathBuf, path: PathBuf,
doc_ids: DocIdsBuilder<Vec<u8>>, doc_ids: UnorderedNegativeBlobBuilder<Vec<u8>>,
} }
impl NegativeUpdateBuilder { impl NegativeUpdateBuilder {
pub fn new<P: Into<PathBuf>>(path: P) -> NegativeUpdateBuilder { pub fn new<P: Into<PathBuf>>(path: P) -> NegativeUpdateBuilder {
NegativeUpdateBuilder { NegativeUpdateBuilder {
path: path.into(), path: path.into(),
doc_ids: DocIdsBuilder::new(Vec::new()), doc_ids: UnorderedNegativeBlobBuilder::memory(),
} }
} }
@ -45,10 +32,11 @@ impl NegativeUpdateBuilder {
let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options); let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options);
file_writer.open(&self.path.to_string_lossy())?; file_writer.open(&self.path.to_string_lossy())?;
// write the data-index aka negative blob
let bytes = self.doc_ids.into_inner()?; let bytes = self.doc_ids.into_inner()?;
let doc_ids = DocIds::from_bytes(bytes)?; let negative_blob = NegativeBlob::from_bytes(bytes)?;
let blob = Blob::Negative(NegativeBlob::from_raw(doc_ids)); let blob = Blob::Negative(negative_blob);
// write the data-index aka negative blob
let bytes = bincode::serialize(&blob)?; let bytes = bincode::serialize(&blob)?;
file_writer.merge(DATA_INDEX, &bytes)?; file_writer.merge(DATA_INDEX, &bytes)?;

View File

@ -0,0 +1,4 @@
mod update;
mod unordered_builder;
pub use self::update::{PositiveUpdateBuilder, NewState};

View File

@ -0,0 +1,45 @@
use std::collections::BTreeMap;
use std::error::Error;
use std::io::Write;
use crate::blob::positive::PositiveBlobBuilder;
use crate::DocIndex;
/// Buffers `(word bytes, DocIndex)` pairs so they can be inserted in any
/// order, then feeds them — keys in sorted order, indexes sorted per key —
/// into the underlying `PositiveBlobBuilder`.
pub struct UnorderedPositiveBlobBuilder<W, X> {
    builder: PositiveBlobBuilder<W, X>,
    map: BTreeMap<Vec<u8>, Vec<DocIndex>>,
}

impl UnorderedPositiveBlobBuilder<Vec<u8>, Vec<u8>> {
    /// Creates a builder that serializes into two in-memory buffers.
    pub fn memory() -> Self {
        Self {
            builder: PositiveBlobBuilder::memory(),
            map: BTreeMap::new(),
        }
    }
}

impl<W: Write, X: Write> UnorderedPositiveBlobBuilder<W, X> {
    /// Creates a builder writing the map into `map_wtr` and the document
    /// indexes into `doc_wtr`.
    pub fn new(map_wtr: W, doc_wtr: X) -> Result<Self, Box<Error>> {
        let builder = PositiveBlobBuilder::new(map_wtr, doc_wtr)?;
        Ok(UnorderedPositiveBlobBuilder {
            builder,
            map: BTreeMap::new(),
        })
    }

    /// Queues one document index under the given word bytes.
    pub fn insert<K: Into<Vec<u8>>>(&mut self, input: K, doc_index: DocIndex) {
        self.map.entry(input.into()).or_default().push(doc_index);
    }

    /// Flushes everything into the inner builder, discarding the writers.
    pub fn finish(self) -> Result<(), Box<Error>> {
        self.into_inner().map(|_| ())
    }

    /// Flushes everything into the inner builder and returns both writers.
    pub fn into_inner(mut self) -> Result<(W, X), Box<Error>> {
        for (word, mut indexes) in self.map {
            indexes.sort_unstable();
            self.builder.insert(&word, &indexes)?;
        }
        self.builder.into_inner()
    }
}

View File

@ -0,0 +1,110 @@
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::error::Error;
use ::rocksdb::rocksdb_options;
use crate::index::update::positive::unordered_builder::UnorderedPositiveBlobBuilder;
use crate::index::schema::{SchemaProps, Schema, SchemaAttr};
use crate::index::update::{Update, raw_document_key_attr};
use crate::blob::positive::PositiveBlob;
use crate::tokenizer::TokenizerBuilder;
use crate::{DocumentId, DocIndex};
use crate::index::DATA_INDEX;
use crate::blob::Blob;
/// The pending change of one document attribute inside a positive update.
pub enum NewState {
    /// The attribute receives `value`; `props` decide later whether it is
    /// tokenized/indexed and whether the raw value is stored.
    Updated {
        value: String,
        props: SchemaProps,
    },
    /// The attribute is deleted from the document.
    Removed,
}
/// Accumulates per-(document, attribute) changes and serializes them
/// into an SST file at `path` when built.
pub struct PositiveUpdateBuilder<B> {
    // destination path of the SST file produced by `build`
    path: PathBuf,
    // used to look up the properties of each updated attribute
    schema: Schema,
    // builds the tokenizer applied to indexed values
    tokenizer_builder: B,
    // BTreeMap keeps the states ordered by (document id, attribute)
    new_states: BTreeMap<(DocumentId, SchemaAttr), NewState>,
}
impl<B> PositiveUpdateBuilder<B> {
    /// Creates a builder that will write its update to `path`.
    pub fn new<P: Into<PathBuf>>(path: P, schema: Schema, tokenizer_builder: B) -> PositiveUpdateBuilder<B> {
        PositiveUpdateBuilder {
            path: path.into(),
            schema,
            tokenizer_builder,
            new_states: BTreeMap::new(),
        }
    }

    // TODO value must be a field that can be indexed
    /// Records the replacement of `field` of document `id` by `value`,
    /// capturing the field's schema properties at call time.
    pub fn update_field(&mut self, id: DocumentId, field: SchemaAttr, value: String) {
        let state = NewState::Updated { value, props: self.schema.props(field) };
        self.new_states.insert((id, field), state);
    }

    /// Records the removal of `field` from document `id`.
    pub fn remove_field(&mut self, id: DocumentId, field: SchemaAttr) {
        self.new_states.insert((id, field), NewState::Removed);
    }
}
impl<B> PositiveUpdateBuilder<B>
where B: TokenizerBuilder
{
    /// Consumes the builder, writes an SST file at `self.path` containing
    /// the merged positive blob plus every stored field value, and returns
    /// an `Update` handle pointing at that file.
    pub fn build(self) -> Result<Update, Box<Error>> {
        let env_options = rocksdb_options::EnvOptions::new();
        let column_family_options = rocksdb_options::ColumnFamilyOptions::new();
        let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options);
        file_writer.open(&self.path.to_string_lossy())?;

        // first pass (by reference): tokenize every indexed value
        let mut builder = UnorderedPositiveBlobBuilder::memory();
        for ((document_id, attr), state) in &self.new_states {
            // only updated attributes whose schema props mark them indexed
            let value = match state {
                NewState::Updated { value, props } if props.is_indexed() => value,
                _ => continue,
            };

            for (index, word) in self.tokenizer_builder.build(value) {
                let doc_index = DocIndex {
                    document_id: *document_id,
                    // NOTE(review): `as u8` truncates — assumes attribute
                    // numbers fit in 8 bits; confirm against the schema.
                    attribute: attr.as_u32() as u8,
                    attribute_index: index as u32,
                };
                // insert the exact representation
                let word_lower = word.to_lowercase();

                // and the unidecoded lowercased version
                let word_unidecoded = unidecode::unidecode(word).to_lowercase();
                if word_lower != word_unidecoded {
                    builder.insert(word_unidecoded, doc_index);
                }
                builder.insert(word_lower, doc_index);
            }
        }
        let (blob_fst_map, blob_doc_idx) = builder.into_inner()?;
        let positive_blob = PositiveBlob::from_bytes(blob_fst_map, blob_doc_idx)?;
        let blob = Blob::Positive(positive_blob);

        // write the data-index aka positive blob
        let bytes = bincode::serialize(&blob)?;
        file_writer.merge(DATA_INDEX, &bytes)?;

        // write all the documents fields updates
        // second pass (by value): persist or delete each stored field
        for ((id, attr), state) in self.new_states {
            let key = raw_document_key_attr(id, attr);
            match state {
                NewState::Updated { value, props } => if props.is_stored() {
                    file_writer.put(&key, value.as_bytes())?
                },
                NewState::Removed => file_writer.delete(&key)?,
            }
        }

        file_writer.finish()?;
        Update::open(self.path)
    }
}

View File

@ -1,116 +0,0 @@
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::error::Error;
use ::rocksdb::rocksdb_options;
use crate::index::update::Update;
use crate::index::schema::{SchemaProps, Schema, SchemaAttr};
use crate::tokenizer::TokenizerBuilder;
use crate::DocumentId;
pub enum NewState {
Updated {
value: String,
props: SchemaProps,
},
Removed,
}
pub struct PositiveUpdateBuilder<B> {
path: PathBuf,
schema: Schema,
tokenizer_builder: B,
new_states: BTreeMap<(DocumentId, SchemaAttr), NewState>,
}
impl<B> PositiveUpdateBuilder<B> {
pub fn new<P: Into<PathBuf>>(path: P, schema: Schema, tokenizer_builder: B) -> PositiveUpdateBuilder<B> {
PositiveUpdateBuilder {
path: path.into(),
schema: schema,
tokenizer_builder: tokenizer_builder,
new_states: BTreeMap::new(),
}
}
// TODO value must be a field that can be indexed
pub fn update_field(&mut self, id: DocumentId, field: SchemaAttr, value: String) {
let state = NewState::Updated { value, props: self.schema.props(field) };
self.new_states.insert((id, field), state);
}
pub fn remove_field(&mut self, id: DocumentId, field: SchemaAttr) {
self.new_states.insert((id, field), NewState::Removed);
}
}
impl<B> PositiveUpdateBuilder<B>
where B: TokenizerBuilder
{
pub fn build(self) -> Result<Update, Box<Error>> {
let env_options = rocksdb_options::EnvOptions::new();
let column_family_options = rocksdb_options::ColumnFamilyOptions::new();
let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options);
file_writer.open(&self.path.to_string_lossy())?;
// let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new());
// for ((document_id, field), state) in &self.new_states {
// let value = match state {
// NewState::Updated { value, props } if props.is_indexed() => value,
// _ => continue,
// };
// for (index, word) in self.tokenizer_builder.build(value) {
// let doc_index = DocIndex {
// document_id: *document_id,
// attribute: field.as_u32() as u8,
// attribute_index: index as u32,
// };
// // insert the exact representation
// let word_lower = word.to_lowercase();
// // and the unidecoded lowercased version
// let word_unidecoded = unidecode::unidecode(word).to_lowercase();
// if word_lower != word_unidecoded {
// builder.insert(word_unidecoded, doc_index);
// }
// builder.insert(word_lower, doc_index);
// }
// }
// let (blob_fst_map, blob_doc_idx) = builder.into_inner()?;
// // write the doc-idx
// let blob_key = Identifier::blob(blob_info.name).document_indexes().build();
// file_writer.put(&blob_key, &blob_doc_idx)?;
// // write the fst
// let blob_key = Identifier::blob(blob_info.name).fst_map().build();
// file_writer.put(&blob_key, &blob_fst_map)?;
// {
// // write the blob name to be merged
// let mut buffer = Vec::new();
// blob_info.write_into(&mut buffer);
// let data_key = Identifier::data().blobs_order().build();
// file_writer.merge(&data_key, &buffer)?;
// }
// // write all the documents fields updates
// for ((id, attr), state) in self.new_states {
// let key = Identifier::document(id).attribute(attr).build();
// match state {
// NewState::Updated { value, props } => if props.is_stored() {
// file_writer.put(&key, value.as_bytes())?
// },
// NewState::Removed => file_writer.delete(&key)?,
// }
// }
// file_writer.finish()?;
// Update::open(self.path)
unimplemented!()
}
}