diff --git a/src/blob/mod.rs b/src/blob/mod.rs index 76f28fef2..05f9367c4 100644 --- a/src/blob/mod.rs +++ b/src/blob/mod.rs @@ -3,7 +3,7 @@ pub mod positive; pub mod negative; pub use self::positive::{PositiveBlob, PositiveBlobBuilder}; -pub use self::negative::{NegativeBlob, NegativeBlobBuilder}; +pub use self::negative::NegativeBlob; pub use self::ops::OpBuilder; use std::fmt; diff --git a/src/blob/negative/blob.rs b/src/blob/negative/blob.rs index 53e50b021..425be4cf6 100644 --- a/src/blob/negative/blob.rs +++ b/src/blob/negative/blob.rs @@ -1,11 +1,10 @@ -use std::io::Write; use std::path::Path; use std::error::Error; -use crate::DocumentId; -use crate::data::{DocIds, DocIdsBuilder}; -use serde::ser::{Serialize, Serializer}; use serde::de::{self, Deserialize, Deserializer}; +use serde::ser::{Serialize, Serializer}; +use crate::data::DocIds; +use crate::DocumentId; pub struct NegativeBlob { doc_ids: DocIds, @@ -55,33 +54,3 @@ impl<'de> Deserialize<'de> for NegativeBlob { NegativeBlob::from_bytes(bytes).map_err(de::Error::custom) } } - -pub struct NegativeBlobBuilder { - doc_ids: DocIdsBuilder, -} - -impl NegativeBlobBuilder { - pub fn new(wrt: W) -> Self { - Self { doc_ids: DocIdsBuilder::new(wrt) } - } - - pub fn insert(&mut self, doc: DocumentId) -> bool { - self.doc_ids.insert(doc) - } - - pub fn finish(self) -> Result<(), Box> { - self.into_inner().map(|_| ()) - } - - pub fn into_inner(self) -> Result> { - // FIXME insert a magic number that indicates if the endianess - // of the input is the same as the machine that is reading it. - Ok(self.doc_ids.into_inner()?) - } -} - -impl NegativeBlobBuilder> { - pub fn build(self) -> Result> { - self.into_inner().and_then(|ids| NegativeBlob::from_bytes(ids)) - } -} diff --git a/src/blob/negative/mod.rs b/src/blob/negative/mod.rs index 56c9f4ef4..ce0000da0 100644 --- a/src/blob/negative/mod.rs +++ b/src/blob/negative/mod.rs @@ -1,5 +1,5 @@ mod blob; mod ops; -pub use self::blob::{NegativeBlob, NegativeBlobBuilder}; +pub use self::blob::NegativeBlob; pub use self::ops::OpBuilder; diff --git a/src/data/doc_ids.rs b/src/data/doc_ids.rs index f93fe495e..d5650cce6 100644 --- a/src/data/doc_ids.rs +++ b/src/data/doc_ids.rs @@ -1,11 +1,9 @@ -use std::collections::BTreeSet; use std::slice::from_raw_parts; use std::error::Error; use std::path::Path; use std::sync::Arc; use std::{io, mem}; -use byteorder::{NativeEndian, WriteBytesExt}; use fst::raw::MmapReadOnly; use serde::ser::{Serialize, Serializer}; @@ -57,28 +55,3 @@ impl Serialize for DocIds { self.data.as_ref().serialize(serializer) } } - -pub struct DocIdsBuilder { - doc_ids: BTreeSet, // TODO: prefer a linked-list - wrt: W, -} - -impl DocIdsBuilder { - pub fn new(wrt: W) -> Self { - Self { - doc_ids: BTreeSet::new(), - wrt: wrt, - } - } - - pub fn insert(&mut self, doc: DocumentId) -> bool { - self.doc_ids.insert(doc) - } - - pub fn into_inner(mut self) -> io::Result { - for id in self.doc_ids { - self.wrt.write_u64::(id)?; - } - Ok(self.wrt) - } -} diff --git a/src/data/mod.rs b/src/data/mod.rs index 4cd21b65e..bf810de93 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use fst::raw::MmapReadOnly; -pub use self::doc_ids::{DocIds, DocIdsBuilder}; +pub use self::doc_ids::DocIds; pub use self::doc_indexes::{DocIndexes, DocIndexesBuilder, RawDocIndexesBuilder}; #[derive(Clone)] diff --git a/src/index/update/.DS_Store b/src/index/update/.DS_Store new file mode 100644 index 000000000..445e2e61a Binary files /dev/null and b/src/index/update/.DS_Store differ diff --git a/src/index/update/mod.rs b/src/index/update/mod.rs index 3c3601f26..104ec469f 100644 --- a/src/index/update/mod.rs +++ b/src/index/update/mod.rs @@ -1,11 +1,20 @@ +use std::io::{Cursor, Write}; use std::path::PathBuf; use std::error::Error; -mod negative_update; -mod positive_update; +use byteorder::{NetworkEndian, WriteBytesExt}; -pub use self::positive_update::{PositiveUpdateBuilder, NewState}; -pub use self::negative_update::NegativeUpdateBuilder; +use crate::index::schema::SchemaAttr; +use crate::DocumentId; + +mod negative; +mod positive; + +pub use self::positive::{PositiveUpdateBuilder, NewState}; +pub use self::negative::NegativeUpdateBuilder; + +const DOC_KEY_LEN: usize = 4 + std::mem::size_of::(); +const DOC_KEY_ATTR_LEN: usize = DOC_KEY_LEN + 1 + std::mem::size_of::(); pub struct Update { path: PathBuf, @@ -20,3 +29,27 @@ impl Update { self.path } } + +// "doc-{ID_8_BYTES}" +fn raw_document_key(id: DocumentId) -> [u8; DOC_KEY_LEN] { + let mut key = [0; DOC_KEY_LEN]; + + let mut wtr = Cursor::new(&mut key[..]); + wtr.write_all(b"doc-").unwrap(); + wtr.write_u64::(id).unwrap(); + + key +} + +// "doc-{ID_8_BYTES}-{ATTR_4_BYTES}" +fn raw_document_key_attr(id: DocumentId, attr: SchemaAttr) -> [u8; DOC_KEY_ATTR_LEN] { + let mut key = [0; DOC_KEY_ATTR_LEN]; + let raw_key = raw_document_key(id); + + let mut wtr = Cursor::new(&mut key[..]); + wtr.write_all(&raw_key).unwrap(); + wtr.write_all(b"-").unwrap(); + wtr.write_u32::(attr.as_u32()).unwrap(); + + key +} diff --git a/src/index/update/negative/mod.rs b/src/index/update/negative/mod.rs new file mode 100644 index 000000000..bad19c918 --- /dev/null +++ b/src/index/update/negative/mod.rs @@ -0,0 +1,4 @@ +mod update; +mod unordered_builder; + +pub use self::update::NegativeUpdateBuilder; diff --git a/src/index/update/negative/unordered_builder.rs b/src/index/update/negative/unordered_builder.rs new file mode 100644 index 000000000..b73ecd2e3 --- /dev/null +++ b/src/index/update/negative/unordered_builder.rs @@ -0,0 +1,37 @@ +use std::collections::BTreeSet; +use std::io; + +use byteorder::{NativeEndian, WriteBytesExt}; + +use crate::DocumentId; + +pub struct UnorderedNegativeBlobBuilder { + doc_ids: BTreeSet, // TODO: prefer a linked-list + wrt: W, +} + +impl UnorderedNegativeBlobBuilder> { + pub fn memory() -> Self { + UnorderedNegativeBlobBuilder::new(Vec::new()) + } +} + +impl UnorderedNegativeBlobBuilder { + pub fn new(wrt: W) -> Self { + Self { + doc_ids: BTreeSet::new(), + wrt: wrt, + } + } + + pub fn insert(&mut self, doc: DocumentId) -> bool { + self.doc_ids.insert(doc) + } + + pub fn into_inner(mut self) -> io::Result { + for id in self.doc_ids { + self.wrt.write_u64::(id)?; + } + Ok(self.wrt) + } +} diff --git a/src/index/update/negative_update.rs b/src/index/update/negative/update.rs similarity index 70% rename from src/index/update/negative_update.rs rename to src/index/update/negative/update.rs index 342fde98c..54355c6e0 100644 --- a/src/index/update/negative_update.rs +++ b/src/index/update/negative/update.rs @@ -1,37 +1,24 @@ use std::path::PathBuf; use std::error::Error; -use std::io::{Cursor, Write}; -use byteorder::{NetworkEndian, WriteBytesExt}; use ::rocksdb::rocksdb_options; -use crate::data::{DocIds, DocIdsBuilder}; +use crate::index::update::negative::unordered_builder::UnorderedNegativeBlobBuilder; +use crate::index::update::{Update, raw_document_key}; use crate::blob::{Blob, NegativeBlob}; -use crate::index::update::Update; use crate::index::DATA_INDEX; use crate::DocumentId; -const DOC_KEY_LEN: usize = 4 + std::mem::size_of::(); - -// "doc-ID_8_BYTES" -fn raw_document_key(id: DocumentId) -> [u8; DOC_KEY_LEN] { - let mut key = [0; DOC_KEY_LEN]; - let mut rdr = Cursor::new(&mut key[..]); - rdr.write_all(b"doc-").unwrap(); - rdr.write_u64::(id).unwrap(); - key -} - pub struct NegativeUpdateBuilder { path: PathBuf, - doc_ids: DocIdsBuilder>, + doc_ids: UnorderedNegativeBlobBuilder>, } impl NegativeUpdateBuilder { pub fn new>(path: P) -> NegativeUpdateBuilder { NegativeUpdateBuilder { path: path.into(), - doc_ids: DocIdsBuilder::new(Vec::new()), + doc_ids: UnorderedNegativeBlobBuilder::memory(), } } @@ -45,10 +32,11 @@ impl NegativeUpdateBuilder { let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options); file_writer.open(&self.path.to_string_lossy())?; - // write the data-index aka negative blob let bytes = self.doc_ids.into_inner()?; - let doc_ids = DocIds::from_bytes(bytes)?; - let blob = Blob::Negative(NegativeBlob::from_raw(doc_ids)); + let negative_blob = NegativeBlob::from_bytes(bytes)?; + let blob = Blob::Negative(negative_blob); + + // write the data-index aka negative blob let bytes = bincode::serialize(&blob)?; file_writer.merge(DATA_INDEX, &bytes)?; diff --git a/src/index/update/positive/mod.rs b/src/index/update/positive/mod.rs new file mode 100644 index 000000000..e05bd9dff --- /dev/null +++ b/src/index/update/positive/mod.rs @@ -0,0 +1,4 @@ +mod update; +mod unordered_builder; + +pub use self::update::{PositiveUpdateBuilder, NewState}; diff --git a/src/index/update/positive/unordered_builder.rs b/src/index/update/positive/unordered_builder.rs new file mode 100644 index 000000000..5b75fe28d --- /dev/null +++ b/src/index/update/positive/unordered_builder.rs @@ -0,0 +1,45 @@ +use std::collections::BTreeMap; +use std::error::Error; +use std::io::Write; + +use crate::blob::positive::PositiveBlobBuilder; +use crate::DocIndex; + +pub struct UnorderedPositiveBlobBuilder { + builder: PositiveBlobBuilder, + map: BTreeMap, Vec>, +} + +impl UnorderedPositiveBlobBuilder, Vec> { + pub fn memory() -> Self { + Self { + builder: PositiveBlobBuilder::memory(), + map: BTreeMap::new(), + } + } +} + +impl UnorderedPositiveBlobBuilder { + pub fn new(map_wtr: W, doc_wtr: X) -> Result> { + Ok(UnorderedPositiveBlobBuilder { + builder: PositiveBlobBuilder::new(map_wtr, doc_wtr)?, + map: BTreeMap::new(), + }) + } + + pub fn insert>>(&mut self, input: K, doc_index: DocIndex) { + self.map.entry(input.into()).or_insert_with(Vec::new).push(doc_index); + } + + pub fn finish(self) -> Result<(), Box> { + self.into_inner().map(drop) + } + + pub fn into_inner(mut self) -> Result<(W, X), Box> { + for (key, mut doc_indexes) in self.map { + doc_indexes.sort_unstable(); + self.builder.insert(&key, &doc_indexes)?; + } + self.builder.into_inner() + } +} diff --git a/src/index/update/positive/update.rs b/src/index/update/positive/update.rs new file mode 100644 index 000000000..8551867b1 --- /dev/null +++ b/src/index/update/positive/update.rs @@ -0,0 +1,110 @@ +use std::collections::BTreeMap; +use std::path::PathBuf; +use std::error::Error; + +use ::rocksdb::rocksdb_options; + +use crate::index::update::positive::unordered_builder::UnorderedPositiveBlobBuilder; +use crate::index::schema::{SchemaProps, Schema, SchemaAttr}; +use crate::index::update::{Update, raw_document_key_attr}; +use crate::blob::positive::PositiveBlob; +use crate::tokenizer::TokenizerBuilder; +use crate::{DocumentId, DocIndex}; +use crate::index::DATA_INDEX; +use crate::blob::Blob; + +pub enum NewState { + Updated { + value: String, + props: SchemaProps, + }, + Removed, +} + +pub struct PositiveUpdateBuilder { + path: PathBuf, + schema: Schema, + tokenizer_builder: B, + new_states: BTreeMap<(DocumentId, SchemaAttr), NewState>, +} + +impl PositiveUpdateBuilder { + pub fn new>(path: P, schema: Schema, tokenizer_builder: B) -> PositiveUpdateBuilder { + PositiveUpdateBuilder { + path: path.into(), + schema: schema, + tokenizer_builder: tokenizer_builder, + new_states: BTreeMap::new(), + } + } + + // TODO value must be a field that can be indexed + pub fn update_field(&mut self, id: DocumentId, field: SchemaAttr, value: String) { + let state = NewState::Updated { value, props: self.schema.props(field) }; + self.new_states.insert((id, field), state); + } + + pub fn remove_field(&mut self, id: DocumentId, field: SchemaAttr) { + self.new_states.insert((id, field), NewState::Removed); + } +} + +impl PositiveUpdateBuilder +where B: TokenizerBuilder +{ + pub fn build(self) -> Result> { + let env_options = rocksdb_options::EnvOptions::new(); + let column_family_options = rocksdb_options::ColumnFamilyOptions::new(); + let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options); + file_writer.open(&self.path.to_string_lossy())?; + + let mut builder = UnorderedPositiveBlobBuilder::memory(); + for ((document_id, attr), state) in &self.new_states { + let value = match state { + NewState::Updated { value, props } if props.is_indexed() => value, + _ => continue, + }; + + for (index, word) in self.tokenizer_builder.build(value) { + let doc_index = DocIndex { + document_id: *document_id, + attribute: attr.as_u32() as u8, + attribute_index: index as u32, + }; + + // insert the exact representation + let word_lower = word.to_lowercase(); + + // and the unidecoded lowercased version + let word_unidecoded = unidecode::unidecode(word).to_lowercase(); + if word_lower != word_unidecoded { + builder.insert(word_unidecoded, doc_index); + } + + builder.insert(word_lower, doc_index); + } + } + + let (blob_fst_map, blob_doc_idx) = builder.into_inner()?; + let positive_blob = PositiveBlob::from_bytes(blob_fst_map, blob_doc_idx)?; + let blob = Blob::Positive(positive_blob); + + // write the data-index aka positive blob + let bytes = bincode::serialize(&blob)?; + file_writer.merge(DATA_INDEX, &bytes)?; + + // write all the documents fields updates + for ((id, attr), state) in self.new_states { + let key = raw_document_key_attr(id, attr); + match state { + NewState::Updated { value, props } => if props.is_stored() { + file_writer.put(&key, value.as_bytes())? + }, + NewState::Removed => file_writer.delete(&key)?, + } + } + + file_writer.finish()?; + Update::open(self.path) + } +} diff --git a/src/index/update/positive_update.rs b/src/index/update/positive_update.rs deleted file mode 100644 index 1e6d38316..000000000 --- a/src/index/update/positive_update.rs +++ /dev/null @@ -1,116 +0,0 @@ -use std::collections::BTreeMap; -use std::path::PathBuf; -use std::error::Error; - -use ::rocksdb::rocksdb_options; - -use crate::index::update::Update; -use crate::index::schema::{SchemaProps, Schema, SchemaAttr}; -use crate::tokenizer::TokenizerBuilder; -use crate::DocumentId; - -pub enum NewState { - Updated { - value: String, - props: SchemaProps, - }, - Removed, -} - -pub struct PositiveUpdateBuilder { - path: PathBuf, - schema: Schema, - tokenizer_builder: B, - new_states: BTreeMap<(DocumentId, SchemaAttr), NewState>, -} - -impl PositiveUpdateBuilder { - pub fn new>(path: P, schema: Schema, tokenizer_builder: B) -> PositiveUpdateBuilder { - PositiveUpdateBuilder { - path: path.into(), - schema: schema, - tokenizer_builder: tokenizer_builder, - new_states: BTreeMap::new(), - } - } - - // TODO value must be a field that can be indexed - pub fn update_field(&mut self, id: DocumentId, field: SchemaAttr, value: String) { - let state = NewState::Updated { value, props: self.schema.props(field) }; - self.new_states.insert((id, field), state); - } - - pub fn remove_field(&mut self, id: DocumentId, field: SchemaAttr) { - self.new_states.insert((id, field), NewState::Removed); - } -} - -impl PositiveUpdateBuilder -where B: TokenizerBuilder -{ - pub fn build(self) -> Result> { - let env_options = rocksdb_options::EnvOptions::new(); - let column_family_options = rocksdb_options::ColumnFamilyOptions::new(); - let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options); - file_writer.open(&self.path.to_string_lossy())?; - - // let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new()); - // for ((document_id, field), state) in &self.new_states { - // let value = match state { - // NewState::Updated { value, props } if props.is_indexed() => value, - // _ => continue, - // }; - - // for (index, word) in self.tokenizer_builder.build(value) { - // let doc_index = DocIndex { - // document_id: *document_id, - // attribute: field.as_u32() as u8, - // attribute_index: index as u32, - // }; - // // insert the exact representation - // let word_lower = word.to_lowercase(); - - // // and the unidecoded lowercased version - // let word_unidecoded = unidecode::unidecode(word).to_lowercase(); - // if word_lower != word_unidecoded { - // builder.insert(word_unidecoded, doc_index); - // } - - // builder.insert(word_lower, doc_index); - // } - // } - // let (blob_fst_map, blob_doc_idx) = builder.into_inner()?; - - // // write the doc-idx - // let blob_key = Identifier::blob(blob_info.name).document_indexes().build(); - // file_writer.put(&blob_key, &blob_doc_idx)?; - - // // write the fst - // let blob_key = Identifier::blob(blob_info.name).fst_map().build(); - // file_writer.put(&blob_key, &blob_fst_map)?; - - // { - // // write the blob name to be merged - // let mut buffer = Vec::new(); - // blob_info.write_into(&mut buffer); - // let data_key = Identifier::data().blobs_order().build(); - // file_writer.merge(&data_key, &buffer)?; - // } - - // // write all the documents fields updates - // for ((id, attr), state) in self.new_states { - // let key = Identifier::document(id).attribute(attr).build(); - // match state { - // NewState::Updated { value, props } => if props.is_stored() { - // file_writer.put(&key, value.as_bytes())? - // }, - // NewState::Removed => file_writer.delete(&key)?, - // } - // } - - // file_writer.finish()?; - // Update::open(self.path) - - unimplemented!() - } -}