feat: Add a new ranked attribute to the schema

This commit is contained in:
Clément Renault 2019-02-08 15:17:42 +01:00
parent 78908aa34e
commit 084c3a95b6
No known key found for this signature in database
GPG Key ID: 0151CDAB43460DAE
8 changed files with 279 additions and 21 deletions

View File

@ -1,3 +1,5 @@
use crate::DocumentId;
use crate::database::schema::SchemaAttr;
use std::sync::Arc;
use std::error::Error;
use std::ffi::OsStr;
@ -12,6 +14,7 @@ use rocksdb::rocksdb::{Writable, Snapshot};
use rocksdb::rocksdb_options::{DBOptions, ColumnFamilyOptions};
use rocksdb::{DB, MergeOperands};
use lockfree::map::Map;
use hashbrown::HashMap;
pub use self::document_key::{DocumentKey, DocumentKeyAttr};
pub use self::view::{DatabaseView, DocumentIter};
@ -21,6 +24,7 @@ pub use self::schema::Schema;
pub use self::index::Index;
const DATA_INDEX: &[u8] = b"data-index";
const DATA_RANKED_MAP: &[u8] = b"data-ranked-map";
const DATA_SCHEMA: &[u8] = b"data-schema";
pub mod schema;
@ -61,9 +65,17 @@ where D: Deref<Target=DB>
Ok(index)
}
fn merge_indexes(key: &[u8], existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
assert_eq!(key, DATA_INDEX, "The merge operator only supports \"data-index\" merging");
fn retrieve_data_ranked_map<D>(snapshot: &Snapshot<D>)
-> Result<HashMap<(DocumentId, SchemaAttr), i64>, Box<Error>>
where D: Deref<Target=DB>
{
match snapshot.get(DATA_RANKED_MAP)? {
Some(vector) => Ok(bincode::deserialize(&*vector)?),
None => Ok(HashMap::new()),
}
}
fn merge_indexes(existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
let mut index: Option<Index> = None;
for bytes in existing.into_iter().chain(operands) {
let operand = Index::from_bytes(bytes.to_vec()).unwrap();
@ -81,6 +93,28 @@ fn merge_indexes(key: &[u8], existing: Option<&[u8]>, operands: &mut MergeOperan
bytes
}
fn merge_ranked_maps(existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
let mut ranked_map: Option<HashMap<_, _>> = None;
for bytes in existing.into_iter().chain(operands) {
let operand: HashMap<(DocumentId, SchemaAttr), i64> = bincode::deserialize(bytes).unwrap();
match ranked_map {
Some(ref mut ranked_map) => ranked_map.extend(operand),
None => { ranked_map.replace(operand); },
};
}
let ranked_map = ranked_map.unwrap_or_default();
bincode::serialize(&ranked_map).unwrap()
}
fn merge_operator(key: &[u8], existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
match key {
DATA_INDEX => merge_indexes(existing, operands),
DATA_RANKED_MAP => merge_ranked_maps(existing, operands),
key => panic!("The merge operator does not support merging {:?}", key),
}
}
pub struct IndexUpdate {
index: String,
update: Update,
@ -103,14 +137,14 @@ impl DerefMut for IndexUpdate {
struct DatabaseIndex {
db: Arc<DB>,
// This view is updated each time the DB ingests an update
// This view is updated each time the DB ingests an update.
view: ArcCell<DatabaseView<Arc<DB>>>,
// This path is the path to the mdb folder stored on disk
// The path of the mdb folder stored on disk.
path: PathBuf,
// must_die false by default, must be set as true when the Index is dropped.
// It's used to erase the folder saved on disk when the user request to delete an index
// It is used to erase the folder saved on disk when the user request to delete an index.
must_die: AtomicBool,
}
@ -128,7 +162,7 @@ impl DatabaseIndex {
// opts.error_if_exists(true); // FIXME pull request that
let mut cf_opts = ColumnFamilyOptions::new();
cf_opts.add_merge_operator("data-index merge operator", merge_indexes);
cf_opts.add_merge_operator("data merge operator", merge_operator);
let db = DB::open_cf(opts, &path_lossy, vec![("default", cf_opts)])?;
@ -156,7 +190,7 @@ impl DatabaseIndex {
opts.create_if_missing(false);
let mut cf_opts = ColumnFamilyOptions::new();
cf_opts.add_merge_operator("data-index merge operator", merge_indexes);
cf_opts.add_merge_operator("data merge operator", merge_operator);
let db = DB::open_cf(opts, &path_lossy, vec![("default", cf_opts)])?;

View File

@ -13,8 +13,9 @@ use crate::database::serde::find_id::FindDocumentIdSerializer;
use crate::database::serde::SerializerError;
use crate::DocumentId;
pub const STORED: SchemaProps = SchemaProps { stored: true, indexed: false };
pub const INDEXED: SchemaProps = SchemaProps { stored: false, indexed: true };
pub const STORED: SchemaProps = SchemaProps { stored: true, indexed: false, ranked: false };
pub const INDEXED: SchemaProps = SchemaProps { stored: false, indexed: true, ranked: false };
pub const RANKED: SchemaProps = SchemaProps { stored: false, indexed: false, ranked: true };
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SchemaProps {
@ -23,6 +24,9 @@ pub struct SchemaProps {
#[serde(default)]
indexed: bool,
#[serde(default)]
ranked: bool,
}
impl SchemaProps {
@ -33,6 +37,10 @@ impl SchemaProps {
pub fn is_indexed(self) -> bool {
self.indexed
}
pub fn is_ranked(self) -> bool {
self.ranked
}
}
impl BitOr for SchemaProps {
@ -42,6 +50,7 @@ impl BitOr for SchemaProps {
SchemaProps {
stored: self.stored | other.stored,
indexed: self.indexed | other.indexed,
ranked: self.ranked | other.ranked,
}
}
}
@ -185,7 +194,8 @@ impl Schema {
}
}
#[derive(Debug, Copy, Clone, PartialOrd, Ord, PartialEq, Eq)]
#[derive(Serialize, Deserialize)]
#[derive(Debug, Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Hash)]
pub struct SchemaAttr(pub(crate) u16);
impl SchemaAttr {

View File

@ -17,6 +17,7 @@ macro_rules! forward_to_unserializable_type {
pub mod find_id;
pub mod key_to_string;
pub mod value_to_i64;
pub mod serializer;
pub mod indexer_serializer;
pub mod deserializer;

View File

@ -5,6 +5,7 @@ use serde::ser;
use crate::database::serde::indexer_serializer::IndexerSerializer;
use crate::database::serde::key_to_string::KeyToStringSerializer;
use crate::database::serde::value_to_i64::ValueToI64Serializer;
use crate::database::update::DocumentUpdate;
use crate::database::serde::SerializerError;
use crate::tokenizer::TokenizerBuilder;
@ -155,8 +156,8 @@ where B: TokenizerBuilder
{
Ok(StructSerializer {
schema: self.schema,
update: self.update,
document_id: self.document_id,
update: self.update,
tokenizer_builder: self.tokenizer_builder,
stop_words: self.stop_words,
})
@ -229,6 +230,10 @@ where B: TokenizerBuilder
};
value.serialize(serializer)?;
}
if props.is_ranked() {
let integer = value.serialize(ValueToI64Serializer)?;
self.update.register_ranked_attribute(attr, integer)?;
}
}
Ok(())
@ -276,6 +281,10 @@ where B: TokenizerBuilder
};
value.serialize(serializer)?;
}
if props.is_ranked() {
let integer = value.serialize(ValueToI64Serializer)?;
self.update.register_ranked_attribute(attr, integer)?;
}
}
Ok(())

View File

@ -0,0 +1,169 @@
use serde::Serialize;
use serde::{ser, ser::Error};
use crate::database::serde::SerializerError;
pub struct ValueToI64Serializer;
impl ser::Serializer for ValueToI64Serializer {
type Ok = i64;
type Error = SerializerError;
type SerializeSeq = ser::Impossible<Self::Ok, Self::Error>;
type SerializeTuple = ser::Impossible<Self::Ok, Self::Error>;
type SerializeTupleStruct = ser::Impossible<Self::Ok, Self::Error>;
type SerializeTupleVariant = ser::Impossible<Self::Ok, Self::Error>;
type SerializeMap = ser::Impossible<Self::Ok, Self::Error>;
type SerializeStruct = ser::Impossible<Self::Ok, Self::Error>;
type SerializeStructVariant = ser::Impossible<Self::Ok, Self::Error>;
forward_to_unserializable_type! {
bool => serialize_bool,
char => serialize_char,
f32 => serialize_f32,
f64 => serialize_f64,
}
fn serialize_i8(self, value: i8) -> Result<Self::Ok, Self::Error> {
Ok(i64::from(value))
}
fn serialize_i16(self, value: i16) -> Result<Self::Ok, Self::Error> {
Ok(i64::from(value))
}
fn serialize_i32(self, value: i32) -> Result<Self::Ok, Self::Error> {
Ok(i64::from(value))
}
fn serialize_i64(self, value: i64) -> Result<Self::Ok, Self::Error> {
Ok(i64::from(value))
}
fn serialize_u8(self, value: u8) -> Result<Self::Ok, Self::Error> {
Ok(i64::from(value))
}
fn serialize_u16(self, value: u16) -> Result<Self::Ok, Self::Error> {
Ok(i64::from(value))
}
fn serialize_u32(self, value: u32) -> Result<Self::Ok, Self::Error> {
Ok(i64::from(value))
}
fn serialize_u64(self, value: u64) -> Result<Self::Ok, Self::Error> {
// Ok(i64::from(value))
unimplemented!()
}
fn serialize_str(self, value: &str) -> Result<Self::Ok, Self::Error> {
i64::from_str_radix(value, 10).map_err(SerializerError::custom)
}
fn serialize_bytes(self, _v: &[u8]) -> Result<Self::Ok, Self::Error> {
Err(SerializerError::UnserializableType { name: "&[u8]" })
}
fn serialize_none(self) -> Result<Self::Ok, Self::Error> {
Err(SerializerError::UnserializableType { name: "Option" })
}
fn serialize_some<T: ?Sized>(self, _value: &T) -> Result<Self::Ok, Self::Error>
where T: Serialize,
{
Err(SerializerError::UnserializableType { name: "Option" })
}
fn serialize_unit(self) -> Result<Self::Ok, Self::Error> {
Err(SerializerError::UnserializableType { name: "()" })
}
fn serialize_unit_struct(self, _name: &'static str) -> Result<Self::Ok, Self::Error> {
Err(SerializerError::UnserializableType { name: "unit struct" })
}
fn serialize_unit_variant(
self,
_name: &'static str,
_variant_index: u32,
_variant: &'static str
) -> Result<Self::Ok, Self::Error>
{
Err(SerializerError::UnserializableType { name: "unit variant" })
}
fn serialize_newtype_struct<T: ?Sized>(
self,
_name: &'static str,
value: &T
) -> Result<Self::Ok, Self::Error>
where T: Serialize,
{
value.serialize(self)
}
fn serialize_newtype_variant<T: ?Sized>(
self,
_name: &'static str,
_variant_index: u32,
_variant: &'static str,
_value: &T
) -> Result<Self::Ok, Self::Error>
where T: Serialize,
{
Err(SerializerError::UnserializableType { name: "newtype variant" })
}
fn serialize_seq(self, _len: Option<usize>) -> Result<Self::SerializeSeq, Self::Error> {
Err(SerializerError::UnserializableType { name: "sequence" })
}
fn serialize_tuple(self, _len: usize) -> Result<Self::SerializeTuple, Self::Error> {
Err(SerializerError::UnserializableType { name: "tuple" })
}
fn serialize_tuple_struct(
self,
_name: &'static str,
_len: usize
) -> Result<Self::SerializeTupleStruct, Self::Error>
{
Err(SerializerError::UnserializableType { name: "tuple struct" })
}
fn serialize_tuple_variant(
self,
_name: &'static str,
_variant_index: u32,
_variant: &'static str,
_len: usize
) -> Result<Self::SerializeTupleVariant, Self::Error>
{
Err(SerializerError::UnserializableType { name: "tuple variant" })
}
fn serialize_map(self, _len: Option<usize>) -> Result<Self::SerializeMap, Self::Error> {
Err(SerializerError::UnserializableType { name: "map" })
}
fn serialize_struct(
self,
_name: &'static str,
_len: usize
) -> Result<Self::SerializeStruct, Self::Error>
{
Err(SerializerError::UnserializableType { name: "struct" })
}
fn serialize_struct_variant(
self,
_name: &'static str,
_variant_index: u32,
_variant: &'static str,
_len: usize
) -> Result<Self::SerializeStructVariant, Self::Error>
{
Err(SerializerError::UnserializableType { name: "struct variant" })
}
}

View File

@ -17,7 +17,7 @@ use crate::data::{DocIds, DocIndexes};
use crate::database::schema::Schema;
use crate::database::index::Index;
use crate::{DocumentId, DocIndex};
use crate::database::DATA_INDEX;
use crate::database::{DATA_INDEX, DATA_RANKED_MAP};
pub type Token = Vec<u8>; // TODO could be replaced by a SmallVec
@ -78,6 +78,7 @@ use UpdateType::{Updated, Deleted};
pub struct RawUpdateBuilder {
documents_update: HashMap<DocumentId, UpdateType>,
documents_ranked_fields: HashMap<(DocumentId, SchemaAttr), i64>,
indexed_words: BTreeMap<Token, Vec<DocIndex>>,
batch: WriteBatch,
}
@ -86,6 +87,7 @@ impl RawUpdateBuilder {
pub fn new() -> RawUpdateBuilder {
RawUpdateBuilder {
documents_update: HashMap::new(),
documents_ranked_fields: HashMap::new(),
indexed_words: BTreeMap::new(),
batch: WriteBatch::new(),
}
@ -137,9 +139,12 @@ impl RawUpdateBuilder {
let index = Index { negative, positive };
// write the data-index
let mut bytes = Vec::new();
index.write_to_bytes(&mut bytes);
self.batch.merge(DATA_INDEX, &bytes)?;
let mut bytes_index = Vec::new();
index.write_to_bytes(&mut bytes_index);
self.batch.merge(DATA_INDEX, &bytes_index)?;
let bytes_ranked_map = bincode::serialize(&self.documents_ranked_fields).unwrap();
self.batch.merge(DATA_RANKED_MAP, &bytes_ranked_map)?;
Ok(self.batch)
}
@ -195,4 +200,23 @@ impl<'a> DocumentUpdate<'a> {
Ok(())
}
pub fn register_ranked_attribute(
&mut self,
attr: SchemaAttr,
integer: i64,
) -> Result<(), SerializerError>
{
use serde::ser::Error;
if let Deleted = self.inner.documents_update.entry(self.document_id).or_insert(Updated) {
return Err(SerializerError::custom(
"This document has already been deleted, ranked attributes cannot be added in the same update"
));
}
self.inner.documents_ranked_fields.insert((self.document_id, attr), integer);
Ok(())
}
}

View File

@ -1,3 +1,4 @@
use hashbrown::HashMap;
use std::error::Error;
use std::path::Path;
use std::ops::Deref;
@ -7,12 +8,13 @@ use rocksdb::rocksdb_options::{ReadOptions, EnvOptions, ColumnFamilyOptions};
use rocksdb::rocksdb::{DB, DBVector, Snapshot, SeekKey, SstFileWriter};
use serde::de::DeserializeOwned;
use crate::database::{DocumentKey, DocumentKeyAttr};
use crate::database::{retrieve_data_schema, retrieve_data_index};
use crate::database::{retrieve_data_schema, retrieve_data_index, retrieve_data_ranked_map};
use crate::database::serde::deserializer::Deserializer;
use crate::database::{DocumentKey, DocumentKeyAttr};
use crate::rank::{QueryBuilder, FilterFunc};
use crate::database::schema::SchemaAttr;
use crate::database::schema::Schema;
use crate::database::index::Index;
use crate::rank::{QueryBuilder, FilterFunc};
use crate::DocumentId;
pub struct DatabaseView<D>
@ -20,6 +22,7 @@ where D: Deref<Target=DB>
{
snapshot: Snapshot<D>,
index: Index,
ranked_map: HashMap<(DocumentId, SchemaAttr), i64>,
schema: Schema,
}
@ -29,7 +32,8 @@ where D: Deref<Target=DB>
pub fn new(snapshot: Snapshot<D>) -> Result<DatabaseView<D>, Box<Error>> {
let schema = retrieve_data_schema(&snapshot)?;
let index = retrieve_data_index(&snapshot)?;
Ok(DatabaseView { snapshot, index, schema })
let ranked_map = retrieve_data_ranked_map(&snapshot)?;
Ok(DatabaseView { snapshot, index, ranked_map, schema })
}
pub fn schema(&self) -> &Schema {
@ -40,6 +44,10 @@ where D: Deref<Target=DB>
&self.index
}
pub fn ranked_map(&self) -> &HashMap<(DocumentId, SchemaAttr), i64> {
&self.ranked_map
}
pub fn into_snapshot(self) -> Snapshot<D> {
self.snapshot
}

View File

@ -7,6 +7,8 @@ pub mod rank;
pub mod tokenizer;
mod common_words;
use serde_derive::{Serialize, Deserialize};
pub use rocksdb;
pub use self::tokenizer::Tokenizer;
@ -16,6 +18,7 @@ pub use self::common_words::CommonWords;
///
/// It is used to inform the database the document you want to deserialize.
/// Helpful for custom ranking.
#[derive(Serialize, Deserialize)]
#[derive(Debug, Copy, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)]
pub struct DocumentId(u64);