diff --git a/src/database/mod.rs b/src/database/mod.rs
index 701a7e23b..ddc7e128d 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -1,3 +1,5 @@
+use crate::DocumentId;
+use crate::database::schema::SchemaAttr;
 use std::sync::Arc;
 use std::error::Error;
 use std::ffi::OsStr;
@@ -12,6 +14,7 @@ use rocksdb::rocksdb::{Writable, Snapshot};
 use rocksdb::rocksdb_options::{DBOptions, ColumnFamilyOptions};
 use rocksdb::{DB, MergeOperands};
 use lockfree::map::Map;
+use hashbrown::HashMap;
 
 pub use self::document_key::{DocumentKey, DocumentKeyAttr};
 pub use self::view::{DatabaseView, DocumentIter};
@@ -20,8 +23,9 @@ pub use self::serde::SerializerError;
 pub use self::schema::Schema;
 pub use self::index::Index;
 
-const DATA_INDEX: &[u8] = b"data-index";
-const DATA_SCHEMA: &[u8] = b"data-schema";
+const DATA_INDEX:      &[u8] = b"data-index";
+const DATA_RANKED_MAP: &[u8] = b"data-ranked-map";
+const DATA_SCHEMA:     &[u8] = b"data-schema";
 
 pub mod schema;
 pub(crate) mod index;
@@ -61,9 +65,17 @@ where D: Deref<Target=DB>
     Ok(index)
 }
 
-fn merge_indexes(key: &[u8], existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
-    assert_eq!(key, DATA_INDEX, "The merge operator only supports \"data-index\" merging");
+fn retrieve_data_ranked_map<D>(snapshot: &Snapshot<D>)
+    -> Result<HashMap<(DocumentId, SchemaAttr), i64>, Box<Error>>
+where D: Deref<Target=DB>
+{
+    match snapshot.get(DATA_RANKED_MAP)? {
+        Some(vector) => Ok(bincode::deserialize(&*vector)?),
+        None => Ok(HashMap::new()),
+    }
+}
 
+fn merge_indexes(existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
     let mut index: Option<Index> = None;
     for bytes in existing.into_iter().chain(operands) {
         let operand = Index::from_bytes(bytes.to_vec()).unwrap();
@@ -81,6 +93,28 @@ fn merge_indexes(key: &[u8], existing: Option<&[u8]>, operands: &mut MergeOperan
     bytes
 }
 
+fn merge_ranked_maps(existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
+    let mut ranked_map: Option<HashMap<(DocumentId, SchemaAttr), i64>> = None;
+    for bytes in existing.into_iter().chain(operands) {
+        let operand: HashMap<(DocumentId, SchemaAttr), i64> = bincode::deserialize(bytes).unwrap();
+        match ranked_map {
+            Some(ref mut ranked_map) => ranked_map.extend(operand),
+            None => { ranked_map.replace(operand); },
+        };
+    }
+
+    let ranked_map = ranked_map.unwrap_or_default();
+    bincode::serialize(&ranked_map).unwrap()
+}
+
+fn merge_operator(key: &[u8], existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
+    match key {
+        DATA_INDEX => merge_indexes(existing, operands),
+        DATA_RANKED_MAP => merge_ranked_maps(existing, operands),
+        key => panic!("The merge operator does not support merging {:?}", key),
+    }
+}
+
 pub struct IndexUpdate {
     index: String,
     update: Update,
@@ -103,14 +137,14 @@ impl DerefMut for IndexUpdate {
 struct DatabaseIndex {
     db: Arc<DB>,
 
-    // This view is updated each time the DB ingests an update
+    // This view is updated each time the DB ingests an update.
     view: ArcCell<DatabaseView<Arc<DB>>>,
 
-    // This path is the path to the mdb folder stored on disk
+    // The path of the mdb folder stored on disk.
     path: PathBuf,
 
     // must_die false by default, must be set as true when the Index is dropped.
-    // It's used to erase the folder saved on disk when the user request to delete an index
+    // It is used to erase the folder saved on disk when the user requests to delete an index.
     must_die: AtomicBool,
 }
 
@@ -128,7 +162,7 @@ impl DatabaseIndex {
         // opts.error_if_exists(true); // FIXME pull request that
 
         let mut cf_opts = ColumnFamilyOptions::new();
-        cf_opts.add_merge_operator("data-index merge operator", merge_indexes);
+        cf_opts.add_merge_operator("data merge operator", merge_operator);
 
         let db = DB::open_cf(opts, &path_lossy, vec![("default", cf_opts)])?;
 
@@ -156,7 +190,7 @@ impl DatabaseIndex {
         opts.create_if_missing(false);
 
         let mut cf_opts = ColumnFamilyOptions::new();
-        cf_opts.add_merge_operator("data-index merge operator", merge_indexes);
+        cf_opts.add_merge_operator("data merge operator", merge_operator);
 
         let db = DB::open_cf(opts, &path_lossy, vec![("default", cf_opts)])?;
 
diff --git a/src/database/schema.rs b/src/database/schema.rs
index 5b4b48731..3a8878ee3 100644
--- a/src/database/schema.rs
+++ b/src/database/schema.rs
@@ -13,8 +13,9 @@ use crate::database::serde::find_id::FindDocumentIdSerializer;
 use crate::database::serde::SerializerError;
 use crate::DocumentId;
 
-pub const STORED: SchemaProps = SchemaProps { stored: true, indexed: false };
-pub const INDEXED: SchemaProps = SchemaProps { stored: false, indexed: true };
+pub const STORED: SchemaProps = SchemaProps { stored: true, indexed: false, ranked: false };
+pub const INDEXED: SchemaProps = SchemaProps { stored: false, indexed: true, ranked: false };
+pub const RANKED: SchemaProps = SchemaProps { stored: false, indexed: false, ranked: true };
 
 #[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct SchemaProps {
@@ -23,6 +24,9 @@ pub struct SchemaProps {
 
     #[serde(default)]
     indexed: bool,
+
+    #[serde(default)]
+    ranked: bool,
 }
 
 impl SchemaProps {
@@ -33,6 +37,10 @@ impl SchemaProps {
     pub fn is_indexed(self) -> bool {
         self.indexed
     }
+
+    pub fn is_ranked(self) -> bool {
+        self.ranked
+    }
 }
 
 impl BitOr for SchemaProps {
@@ -42,6 +50,7 @@ impl BitOr for SchemaProps {
         SchemaProps {
             stored: self.stored | other.stored,
             indexed: self.indexed | other.indexed,
+            ranked: self.ranked | other.ranked,
         }
     }
 }
@@ -185,7 +194,8 @@ impl Schema {
     }
 }
 
-#[derive(Debug, Copy, Clone, PartialOrd, Ord, PartialEq, Eq)]
+#[derive(Serialize, Deserialize)]
+#[derive(Debug, Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Hash)]
 pub struct SchemaAttr(pub(crate) u16);
 
 impl SchemaAttr {
diff --git a/src/database/serde/mod.rs b/src/database/serde/mod.rs
index 2f9415c25..50a3c619e 100644
--- a/src/database/serde/mod.rs
+++ b/src/database/serde/mod.rs
@@ -17,6 +17,7 @@ macro_rules! forward_to_unserializable_type {
forward_to_unserializable_type { pub mod find_id; pub mod key_to_string; +pub mod value_to_i64; pub mod serializer; pub mod indexer_serializer; pub mod deserializer; diff --git a/src/database/serde/serializer.rs b/src/database/serde/serializer.rs index d516be609..bc8b4d1ab 100644 --- a/src/database/serde/serializer.rs +++ b/src/database/serde/serializer.rs @@ -5,6 +5,7 @@ use serde::ser; use crate::database::serde::indexer_serializer::IndexerSerializer; use crate::database::serde::key_to_string::KeyToStringSerializer; +use crate::database::serde::value_to_i64::ValueToI64Serializer; use crate::database::update::DocumentUpdate; use crate::database::serde::SerializerError; use crate::tokenizer::TokenizerBuilder; @@ -155,8 +156,8 @@ where B: TokenizerBuilder { Ok(StructSerializer { schema: self.schema, - update: self.update, document_id: self.document_id, + update: self.update, tokenizer_builder: self.tokenizer_builder, stop_words: self.stop_words, }) @@ -229,6 +230,10 @@ where B: TokenizerBuilder }; value.serialize(serializer)?; } + if props.is_ranked() { + let integer = value.serialize(ValueToI64Serializer)?; + self.update.register_ranked_attribute(attr, integer)?; + } } Ok(()) @@ -276,6 +281,10 @@ where B: TokenizerBuilder }; value.serialize(serializer)?; } + if props.is_ranked() { + let integer = value.serialize(ValueToI64Serializer)?; + self.update.register_ranked_attribute(attr, integer)?; + } } Ok(()) diff --git a/src/database/serde/value_to_i64.rs b/src/database/serde/value_to_i64.rs new file mode 100644 index 000000000..9c046d391 --- /dev/null +++ b/src/database/serde/value_to_i64.rs @@ -0,0 +1,169 @@ +use serde::Serialize; +use serde::{ser, ser::Error}; + +use crate::database::serde::SerializerError; + +pub struct ValueToI64Serializer; + +impl ser::Serializer for ValueToI64Serializer { + type Ok = i64; + type Error = SerializerError; + type SerializeSeq = ser::Impossible; + type SerializeTuple = ser::Impossible; + type SerializeTupleStruct = ser::Impossible; + type SerializeTupleVariant = ser::Impossible; + type SerializeMap = ser::Impossible; + type SerializeStruct = ser::Impossible; + type SerializeStructVariant = ser::Impossible; + + forward_to_unserializable_type! 
+        bool => serialize_bool,
+        char => serialize_char,
+
+        f32 => serialize_f32,
+        f64 => serialize_f64,
+    }
+
+    fn serialize_i8(self, value: i8) -> Result<Self::Ok, Self::Error> {
+        Ok(i64::from(value))
+    }
+
+    fn serialize_i16(self, value: i16) -> Result<Self::Ok, Self::Error> {
+        Ok(i64::from(value))
+    }
+
+    fn serialize_i32(self, value: i32) -> Result<Self::Ok, Self::Error> {
+        Ok(i64::from(value))
+    }
+
+    fn serialize_i64(self, value: i64) -> Result<Self::Ok, Self::Error> {
+        Ok(i64::from(value))
+    }
+
+    fn serialize_u8(self, value: u8) -> Result<Self::Ok, Self::Error> {
+        Ok(i64::from(value))
+    }
+
+    fn serialize_u16(self, value: u16) -> Result<Self::Ok, Self::Error> {
+        Ok(i64::from(value))
+    }
+
+    fn serialize_u32(self, value: u32) -> Result<Self::Ok, Self::Error> {
+        Ok(i64::from(value))
+    }
+
+    fn serialize_u64(self, value: u64) -> Result<Self::Ok, Self::Error> {
+        // Ok(i64::from(value))
+        unimplemented!()
+    }
+
+    fn serialize_str(self, value: &str) -> Result<Self::Ok, Self::Error> {
+        i64::from_str_radix(value, 10).map_err(SerializerError::custom)
+    }
+
+    fn serialize_bytes(self, _v: &[u8]) -> Result<Self::Ok, Self::Error> {
+        Err(SerializerError::UnserializableType { name: "&[u8]" })
+    }
+
+    fn serialize_none(self) -> Result<Self::Ok, Self::Error> {
+        Err(SerializerError::UnserializableType { name: "Option" })
+    }
+
+    fn serialize_some<T: ?Sized>(self, _value: &T) -> Result<Self::Ok, Self::Error>
+    where T: Serialize,
+    {
+        Err(SerializerError::UnserializableType { name: "Option" })
+    }
+
+    fn serialize_unit(self) -> Result<Self::Ok, Self::Error> {
+        Err(SerializerError::UnserializableType { name: "()" })
+    }
+
+    fn serialize_unit_struct(self, _name: &'static str) -> Result<Self::Ok, Self::Error> {
+        Err(SerializerError::UnserializableType { name: "unit struct" })
+    }
+
+    fn serialize_unit_variant(
+        self,
+        _name: &'static str,
+        _variant_index: u32,
+        _variant: &'static str
+    ) -> Result<Self::Ok, Self::Error>
+    {
+        Err(SerializerError::UnserializableType { name: "unit variant" })
+    }
+
+    fn serialize_newtype_struct<T: ?Sized>(
+        self,
+        _name: &'static str,
+        value: &T
+    ) -> Result<Self::Ok, Self::Error>
+    where T: Serialize,
+    {
+        value.serialize(self)
+    }
+
+    fn serialize_newtype_variant<T: ?Sized>(
+        self,
+        _name: &'static str,
+        _variant_index: u32,
+        _variant: &'static str,
+        _value: &T
+    ) -> Result<Self::Ok, Self::Error>
+    where T: Serialize,
+    {
+        Err(SerializerError::UnserializableType { name: "newtype variant" })
+    }
+
+    fn serialize_seq(self, _len: Option<usize>) -> Result<Self::SerializeSeq, Self::Error> {
+        Err(SerializerError::UnserializableType { name: "sequence" })
+    }
+
+    fn serialize_tuple(self, _len: usize) -> Result<Self::SerializeTuple, Self::Error> {
+        Err(SerializerError::UnserializableType { name: "tuple" })
+    }
+
+    fn serialize_tuple_struct(
+        self,
+        _name: &'static str,
+        _len: usize
+    ) -> Result<Self::SerializeTupleStruct, Self::Error>
+    {
+        Err(SerializerError::UnserializableType { name: "tuple struct" })
+    }
+
+    fn serialize_tuple_variant(
+        self,
+        _name: &'static str,
+        _variant_index: u32,
+        _variant: &'static str,
+        _len: usize
+    ) -> Result<Self::SerializeTupleVariant, Self::Error>
+    {
+        Err(SerializerError::UnserializableType { name: "tuple variant" })
+    }
+
+    fn serialize_map(self, _len: Option<usize>) -> Result<Self::SerializeMap, Self::Error> {
+        Err(SerializerError::UnserializableType { name: "map" })
+    }
+
+    fn serialize_struct(
+        self,
+        _name: &'static str,
+        _len: usize
+    ) -> Result<Self::SerializeStruct, Self::Error>
+    {
+        Err(SerializerError::UnserializableType { name: "struct" })
+    }
+
+    fn serialize_struct_variant(
+        self,
+        _name: &'static str,
+        _variant_index: u32,
+        _variant: &'static str,
+        _len: usize
+    ) -> Result<Self::SerializeStructVariant, Self::Error>
+    {
+        Err(SerializerError::UnserializableType { name: "struct variant" })
+    }
+}
diff --git a/src/database/update.rs b/src/database/update.rs
index 616c070e5..0c165550d 100644
--- a/src/database/update.rs
+++ b/src/database/update.rs
@@ -17,7 +17,7 @@ use crate::data::{DocIds, DocIndexes};
 use crate::database::schema::Schema;
 use crate::database::index::Index;
 use crate::{DocumentId, DocIndex};
-use crate::database::DATA_INDEX;
+use crate::database::{DATA_INDEX, DATA_RANKED_MAP};
 
 pub type Token = Vec<u8>; // TODO could be replaced by a SmallVec
 
@@ -78,6 +78,7 @@ use UpdateType::{Updated, Deleted};
 
 pub struct RawUpdateBuilder {
     documents_update: HashMap<DocumentId, UpdateType>,
+    documents_ranked_fields: HashMap<(DocumentId, SchemaAttr), i64>,
     indexed_words: BTreeMap<Token, Vec<DocIndex>>,
     batch: WriteBatch,
 }
@@ -86,6 +87,7 @@ impl RawUpdateBuilder {
     pub fn new() -> RawUpdateBuilder {
         RawUpdateBuilder {
             documents_update: HashMap::new(),
+            documents_ranked_fields: HashMap::new(),
             indexed_words: BTreeMap::new(),
             batch: WriteBatch::new(),
         }
@@ -137,9 +139,12 @@ impl RawUpdateBuilder {
         let index = Index { negative, positive };
 
         // write the data-index
-        let mut bytes = Vec::new();
-        index.write_to_bytes(&mut bytes);
-        self.batch.merge(DATA_INDEX, &bytes)?;
+        let mut bytes_index = Vec::new();
+        index.write_to_bytes(&mut bytes_index);
+        self.batch.merge(DATA_INDEX, &bytes_index)?;
+
+        let bytes_ranked_map = bincode::serialize(&self.documents_ranked_fields).unwrap();
+        self.batch.merge(DATA_RANKED_MAP, &bytes_ranked_map)?;
 
         Ok(self.batch)
     }
@@ -195,4 +200,23 @@ impl<'a> DocumentUpdate<'a> {
 
         Ok(())
     }
+
+    pub fn register_ranked_attribute(
+        &mut self,
+        attr: SchemaAttr,
+        integer: i64,
+    ) -> Result<(), SerializerError>
+    {
+        use serde::ser::Error;
+
+        if let Deleted = self.inner.documents_update.entry(self.document_id).or_insert(Updated) {
+            return Err(SerializerError::custom(
+                "This document has already been deleted, ranked attributes cannot be added in the same update"
+            ));
+        }
+
+        self.inner.documents_ranked_fields.insert((self.document_id, attr), integer);
+
+        Ok(())
+    }
 }
diff --git a/src/database/view.rs b/src/database/view.rs
index b9144a281..6f04ac4b1 100644
--- a/src/database/view.rs
+++ b/src/database/view.rs
@@ -1,3 +1,4 @@
+use hashbrown::HashMap;
 use std::error::Error;
 use std::path::Path;
 use std::ops::Deref;
@@ -7,12 +8,13 @@ use rocksdb::rocksdb_options::{ReadOptions, EnvOptions, ColumnFamilyOptions};
 use rocksdb::rocksdb::{DB, DBVector, Snapshot, SeekKey, SstFileWriter};
 use serde::de::DeserializeOwned;
 
-use crate::database::{DocumentKey, DocumentKeyAttr};
-use crate::database::{retrieve_data_schema, retrieve_data_index};
+use crate::database::{retrieve_data_schema, retrieve_data_index, retrieve_data_ranked_map};
 use crate::database::serde::deserializer::Deserializer;
+use crate::database::{DocumentKey, DocumentKeyAttr};
+use crate::rank::{QueryBuilder, FilterFunc};
+use crate::database::schema::SchemaAttr;
 use crate::database::schema::Schema;
 use crate::database::index::Index;
-use crate::rank::{QueryBuilder, FilterFunc};
 use crate::DocumentId;
 
 pub struct DatabaseView<D>
@@ -20,6 +22,7 @@ where D: Deref<Target=DB>
 {
     snapshot: Snapshot<D>,
     index: Index,
+    ranked_map: HashMap<(DocumentId, SchemaAttr), i64>,
     schema: Schema,
 }
 
@@ -29,7 +32,8 @@ where D: Deref<Target=DB>
     pub fn new(snapshot: Snapshot<D>) -> Result<DatabaseView<D>, Box<Error>> {
         let schema = retrieve_data_schema(&snapshot)?;
         let index = retrieve_data_index(&snapshot)?;
-        Ok(DatabaseView { snapshot, index, schema })
+        let ranked_map = retrieve_data_ranked_map(&snapshot)?;
+        Ok(DatabaseView { snapshot, index, ranked_map, schema })
     }
 
     pub fn schema(&self) -> &Schema {
@@ -40,6 +44,10 @@ where D: Deref<Target=DB>
         &self.index
     }
 
+    pub fn ranked_map(&self) -> &HashMap<(DocumentId, SchemaAttr), i64> {
+        &self.ranked_map
+    }
+
     pub fn into_snapshot(self) -> Snapshot<D> {
         self.snapshot
     }
diff --git a/src/lib.rs b/src/lib.rs
index bfa0b3cd9..9c0641090 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -7,6 +7,8 @@ pub mod rank;
 pub mod tokenizer;
 mod common_words;
 
+use serde_derive::{Serialize, Deserialize};
+
 pub use rocksdb;
 
 pub use self::tokenizer::Tokenizer;
@@ -16,6 +18,7 @@ pub use self::common_words::CommonWords;
 ///
 /// It is used to inform the database the document you want to deserialize.
 /// Helpful for custom ranking.
+#[derive(Serialize, Deserialize)]
 #[derive(Debug, Copy, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)]
 pub struct DocumentId(u64);
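
How the pieces introduced by this patch fit together: an attribute whose SchemaProps carry the new RANKED flag has its value converted to an i64 by ValueToI64Serializer during document serialization, registered through DocumentUpdate::register_ranked_attribute, merged into the key-value store under the "data-ranked-map" key, and exposed on DatabaseView::ranked_map() as a HashMap<(DocumentId, SchemaAttr), i64>. The helper below is not part of the patch; it is a minimal sketch, written as if it lived inside the crate, of how a custom ranking function might read that map.

use hashbrown::HashMap;

use crate::DocumentId;
use crate::database::schema::SchemaAttr;

// Hypothetical helper (not in the patch): look up the ranked value registered
// for one (document, attribute) pair, for example from the map returned by
// `DatabaseView::ranked_map()`. A missing entry simply means no ranked value
// was stored for that document/attribute.
fn ranked_value(
    ranked_map: &HashMap<(DocumentId, SchemaAttr), i64>,
    id: DocumentId,
    attr: SchemaAttr,
) -> Option<i64> {
    ranked_map.get(&(id, attr)).cloned()
}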