mirror of https://github.com/meilisearch/MeiliSearch
synced 2024-11-30 00:34:26 +01:00

Merge pull request #46 from Kerollmops/schema-considers-id
Schema considers document ids

This commit is contained in: e3bfb866e5
@@ -1,16 +1,13 @@
-use std::collections::hash_map::DefaultHasher;
 use std::path::{Path, PathBuf};
-use std::hash::{Hash, Hasher};
 use std::error::Error;
 
 use serde_derive::{Serialize, Deserialize};
 use structopt::StructOpt;
 
 use meilidb::database::schema::{Schema, SchemaBuilder, STORED, INDEXED};
-use meilidb::database::update::PositiveUpdateBuilder;
+use meilidb::database::PositiveUpdateBuilder;
 use meilidb::tokenizer::DefaultBuilder;
 use meilidb::database::Database;
-use meilidb::DocumentId;
 
 #[derive(Debug, StructOpt)]
 pub struct Opt {

@@ -31,14 +28,8 @@ struct Document<'a> {
     image: &'a str,
 }
 
-fn calculate_hash<T: Hash>(t: &T) -> u64 {
-    let mut s = DefaultHasher::new();
-    t.hash(&mut s);
-    s.finish()
-}
-
 fn create_schema() -> Schema {
-    let mut schema = SchemaBuilder::new();
+    let mut schema = SchemaBuilder::with_identifier("id");
     schema.new_attribute("id", STORED);
     schema.new_attribute("title", STORED | INDEXED);
     schema.new_attribute("description", STORED | INDEXED);

@@ -68,8 +59,7 @@ fn index(schema: Schema, database_path: &Path, csv_data_path: &Path) -> Result<D
             }
         };
 
-        let document_id = DocumentId(calculate_hash(&document.id));
-        update.update(document_id, &document).unwrap();
+        update.update(&document).unwrap();
    }
 
     let mut update = update.build()?;

@@ -88,7 +88,7 @@ fn main() -> Result<(), Box<Error>> {
 
     let number_of_documents = documents.len();
     for doc in documents {
-        match view.retrieve_document::<Document>(doc.id) {
+        match view.document_by_id::<Document>(doc.id) {
             Ok(document) => {
 
                 print!("title: ");
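The calling convention before and after this change, condensed from the hunks above (`document` is the CSV record being indexed; both forms appear verbatim in the diff):

    // Before: the caller hashed the identifier field and passed the DocumentId explicitly.
    let document_id = DocumentId(calculate_hash(&document.id));
    update.update(document_id, &document).unwrap();

    // After: the schema knows its identifier attribute ("id") and derives the id itself.
    update.update(&document).unwrap();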
src/database/database.rs (new file, 233 lines)
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard};
use std::error::Error;
use std::path::Path;

use rocksdb::rocksdb_options::{DBOptions, IngestExternalFileOptions, ColumnFamilyOptions};
use rocksdb::rocksdb::{Writable, Snapshot};
use rocksdb::{DB, DBVector, MergeOperands};

use crate::database::{DatabaseView, Update, Schema};
use crate::database::{DATA_INDEX, DATA_SCHEMA};
use crate::database::blob::{self, Blob};

pub struct Database {
    // DB is under a Mutex to sync update ingestions and separate DB update locking
    // and DatabaseView acquiring locking in other words:
    // "Block readers the minimum possible amount of time"
    db: Mutex<Arc<DB>>,

    // This view is updated each time the DB ingests an update
    view: RwLock<DatabaseView<Arc<DB>>>,
}

impl Database {
    pub fn create<P: AsRef<Path>>(path: P, schema: Schema) -> Result<Database, Box<Error>> {
        let path = path.as_ref();
        if path.exists() {
            return Err(format!("File already exists at path: {}, cannot create database.",
                                path.display()).into())
        }

        let path = path.to_string_lossy();
        let mut opts = DBOptions::new();
        opts.create_if_missing(true);
        // opts.error_if_exists(true); // FIXME pull request that

        let mut cf_opts = ColumnFamilyOptions::new();
        cf_opts.add_merge_operator("data-index merge operator", merge_indexes);

        let db = DB::open_cf(opts, &path, vec![("default", cf_opts)])?;

        let mut schema_bytes = Vec::new();
        schema.write_to(&mut schema_bytes)?;
        db.put(DATA_SCHEMA, &schema_bytes)?;

        let db = Arc::new(db);
        let snapshot = Snapshot::new(db.clone());
        let view = RwLock::new(DatabaseView::new(snapshot)?);

        Ok(Database { db: Mutex::new(db), view })
    }

    pub fn open<P: AsRef<Path>>(path: P) -> Result<Database, Box<Error>> {
        let path = path.as_ref().to_string_lossy();

        let mut opts = DBOptions::new();
        opts.create_if_missing(false);

        let mut cf_opts = ColumnFamilyOptions::new();
        cf_opts.add_merge_operator("data-index merge operator", merge_indexes);

        let db = DB::open_cf(opts, &path, vec![("default", cf_opts)])?;

        // FIXME create a generic function to do that !
        let _schema = match db.get(DATA_SCHEMA)? {
            Some(value) => Schema::read_from(&*value)?,
            None => return Err(String::from("Database does not contain a schema").into()),
        };

        let db = Arc::new(db);
        let snapshot = Snapshot::new(db.clone());
        let view = RwLock::new(DatabaseView::new(snapshot)?);

        Ok(Database { db: Mutex::new(db), view })
    }

    pub fn ingest_update_file(&self, update: Update) -> Result<(), Box<Error>> {
        let snapshot = {
            // We must have a mutex here to ensure that update ingestions and compactions
            // are done atomatically and in the right order.
            // This way update ingestions will block other update ingestions without blocking view
            // creations while doing the "data-index" compaction
            let db = match self.db.lock() {
                Ok(db) => db,
                Err(e) => return Err(e.to_string().into()),
            };

            let move_update = update.can_be_moved();
            let path = update.into_path_buf();
            let path = path.to_string_lossy();

            let mut options = IngestExternalFileOptions::new();
            options.move_files(move_update);

            let cf_handle = db.cf_handle("default").expect("\"default\" column family not found");
            db.ingest_external_file_optimized(&cf_handle, &options, &[&path])?;

            // Compacting to trigger the merge operator only one time
            // while ingesting the update and not each time searching
            db.compact_range(Some(DATA_INDEX), Some(DATA_INDEX));

            Snapshot::new(db.clone())
        };

        // Here we will block the view creation for the minimum amount of time:
        // updating the DatabaseView itself with the new database snapshot
        let view = DatabaseView::new(snapshot)?;
        match self.view.write() {
            Ok(mut lock) => *lock = view,
            Err(e) => return Err(e.to_string().into()),
        }

        Ok(())
    }

    pub fn get(&self, key: &[u8]) -> Result<Option<DBVector>, Box<Error>> {
        self.view().get(key)
    }

    pub fn flush(&self) -> Result<(), Box<Error>> {
        match self.db.lock() {
            Ok(db) => Ok(db.flush(true)?),
            Err(e) => Err(e.to_string().into()),
        }
    }

    pub fn view(&self) -> RwLockReadGuard<DatabaseView<Arc<DB>>> {
        self.view.read().unwrap()
    }
}

fn merge_indexes(key: &[u8], existing_value: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
    if key != DATA_INDEX {
        panic!("The merge operator only supports \"data-index\" merging")
    }

    let capacity = {
        let remaining = operands.size_hint().0;
        let already_exist = usize::from(existing_value.is_some());
        remaining + already_exist
    };

    let mut op = blob::OpBuilder::with_capacity(capacity);
    if let Some(existing_value) = existing_value {
        let blob = bincode::deserialize(existing_value).expect("BUG: could not deserialize data-index");
        op.push(Blob::Positive(blob));
    }

    for bytes in operands {
        let blob = bincode::deserialize(bytes).expect("BUG: could not deserialize blob");
        op.push(blob);
    }

    let blob = op.merge().expect("BUG: could not merge blobs");
    bincode::serialize(&blob).expect("BUG: could not serialize merged blob")
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::error::Error;

    use serde_derive::{Serialize, Deserialize};
    use tempfile::tempdir;

    use crate::tokenizer::DefaultBuilder;
    use crate::database::update::PositiveUpdateBuilder;
    use crate::database::schema::{SchemaBuilder, STORED, INDEXED};

    #[test]
    fn ingest_update_file() -> Result<(), Box<Error>> {
        let dir = tempdir()?;

        let rocksdb_path = dir.path().join("rocksdb.rdb");

        #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
        struct SimpleDoc {
            id: u64,
            title: String,
            description: String,
            timestamp: u64,
        }

        let schema = {
            let mut builder = SchemaBuilder::with_identifier("id");
            builder.new_attribute("id", STORED);
            builder.new_attribute("title", STORED | INDEXED);
            builder.new_attribute("description", STORED | INDEXED);
            builder.new_attribute("timestamp", STORED);
            builder.build()
        };

        let database = Database::create(&rocksdb_path, schema.clone())?;
        let tokenizer_builder = DefaultBuilder::new();

        let update_path = dir.path().join("update.sst");

        let doc0 = SimpleDoc {
            id: 0,
            title: String::from("I am a title"),
            description: String::from("I am a description"),
            timestamp: 1234567,
        };
        let doc1 = SimpleDoc {
            id: 1,
            title: String::from("I am the second title"),
            description: String::from("I am the second description"),
            timestamp: 7654321,
        };

        let docid0;
        let docid1;
        let mut update = {
            let mut builder = PositiveUpdateBuilder::new(update_path, schema, tokenizer_builder);

            docid0 = builder.update(&doc0).unwrap();
            docid1 = builder.update(&doc1).unwrap();

            builder.build()?
        };

        update.set_move(true);
        database.ingest_update_file(update)?;
        let view = database.view();

        let de_doc0: SimpleDoc = view.document_by_id(docid0)?;
        let de_doc1: SimpleDoc = view.document_by_id(docid1)?;

        assert_eq!(doc0, de_doc0);
        assert_eq!(doc1, de_doc1);

        Ok(dir.close()?)
    }
}
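A minimal end-to-end sketch of the new Database type, condensed from the ingest_update_file test above; the paths are placeholders and `doc` is assumed to be a serde-serializable struct of type Doc matching the schema:

    let schema = {
        let mut builder = SchemaBuilder::with_identifier("id");
        builder.new_attribute("id", STORED);
        builder.new_attribute("title", STORED | INDEXED);
        builder.build()
    };

    let database = Database::create("/tmp/example.rdb", schema.clone())?;

    let mut builder = PositiveUpdateBuilder::new("/tmp/update.sst", schema, DefaultBuilder::new());
    let docid = builder.update(&doc)?; // the DocumentId is derived from the "id" field and returned

    let mut update = builder.build()?;
    update.set_move(true);
    database.ingest_update_file(update)?; // ingests the SST file, compacts "data-index" once,
                                          // then swaps a fresh snapshot into the shared view

    let retrieved: Doc = database.view().document_by_id(docid)?;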
@@ -75,15 +75,14 @@ where D: Deref<Target=DB>
         QueryBuilder::new(self)
     }
 
-    // TODO create an enum error type
-    pub fn retrieve_document<T>(&self, id: DocumentId) -> Result<T, Box<Error>>
+    pub fn document_by_id<T>(&self, id: DocumentId) -> Result<T, Box<Error>>
     where T: DeserializeOwned
     {
         let mut deserializer = Deserializer::new(&self.snapshot, &self.schema, id);
         Ok(T::deserialize(&mut deserializer)?)
     }
 
-    pub fn retrieve_documents<T, I>(&self, ids: I) -> DocumentIter<D, T, I::IntoIter>
+    pub fn documents_by_id<T, I>(&self, ids: I) -> DocumentIter<D, T, I::IntoIter>
     where T: DeserializeOwned,
           I: IntoIterator<Item=DocumentId>,
     {

@@ -149,7 +148,7 @@ where D: Deref<Target=DB>,
 
     fn next(&mut self) -> Option<Self::Item> {
         match self.document_ids.next() {
-            Some(id) => Some(self.database_view.retrieve_document(id)),
+            Some(id) => Some(self.database_view.document_by_id(id)),
             None => None
         }
     }

@@ -168,7 +167,7 @@ where D: Deref<Target=DB>,
 {
     fn next_back(&mut self) -> Option<Self::Item> {
         match self.document_ids.next_back() {
-            Some(id) => Some(self.database_view.retrieve_document(id)),
+            Some(id) => Some(self.database_view.document_by_id(id)),
             None => None
         }
     }
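For callers, the renamed accessors read as follows (a sketch; `view`, `SimpleDoc`, and the ids are assumed from the test shown earlier):

    // was retrieve_document
    let doc: SimpleDoc = view.document_by_id(docid0)?;

    // was retrieve_documents; DocumentIter yields one Result per requested id
    for doc in view.documents_by_id::<SimpleDoc, _>(vec![docid0, docid1]) {
        println!("{:?}", doc?);
    }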
@@ -1,30 +1,48 @@
-use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard};
+use std::collections::hash_map::DefaultHasher;
+use std::hash::{Hash, Hasher};
 use std::error::Error;
-use std::path::Path;
 use std::ops::Deref;
 
-use rocksdb::rocksdb_options::{DBOptions, IngestExternalFileOptions, ColumnFamilyOptions};
-use rocksdb::rocksdb::{Writable, Snapshot};
-use rocksdb::{DB, DBVector, MergeOperands};
+use rocksdb::rocksdb::{DB, Snapshot};
 
+pub use self::update::{
+    Update, PositiveUpdateBuilder, NewState,
+    SerializerError, NegativeUpdateBuilder
+};
 pub use self::document_key::{DocumentKey, DocumentKeyAttr};
 pub use self::database_view::{DatabaseView, DocumentIter};
+pub use self::database::Database;
+pub use self::schema::Schema;
 use self::blob::positive::PositiveBlob;
-use self::update::Update;
-use self::schema::Schema;
-use self::blob::Blob;
-
-pub mod blob;
-pub mod schema;
-pub mod update;
-mod document_key;
-mod database_view;
-mod deserializer;
 
 const DATA_INDEX: &[u8] = b"data-index";
 const DATA_SCHEMA: &[u8] = b"data-schema";
 
-pub fn retrieve_data_schema<D>(snapshot: &Snapshot<D>) -> Result<Schema, Box<Error>>
+macro_rules! forward_to_unserializable_type {
+    ($($ty:ident => $se_method:ident,)*) => {
+        $(
+            fn $se_method(self, _v: $ty) -> Result<Self::Ok, Self::Error> {
+                Err(SerializerError::UnserializableType { name: "$ty" })
+            }
+        )*
+    }
+}
+
+pub mod blob;
+pub mod schema;
+mod update;
+mod database;
+mod document_key;
+mod database_view;
+mod deserializer;
+
+fn calculate_hash<T: Hash>(t: &T) -> u64 {
+    let mut s = DefaultHasher::new();
+    t.hash(&mut s);
+    s.finish()
+}
+
+fn retrieve_data_schema<D>(snapshot: &Snapshot<D>) -> Result<Schema, Box<Error>>
 where D: Deref<Target=DB>
 {
     match snapshot.get(DATA_SCHEMA)? {

@@ -33,7 +51,7 @@ where D: Deref<Target=DB>
     }
 }
 
-pub fn retrieve_data_index<D>(snapshot: &Snapshot<D>) -> Result<PositiveBlob, Box<Error>>
+fn retrieve_data_index<D>(snapshot: &Snapshot<D>) -> Result<PositiveBlob, Box<Error>>
 where D: Deref<Target=DB>
 {
     match snapshot.get(DATA_INDEX)? {
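A note on the forward_to_unserializable_type! macro added above: expanded for one entry it produces the method below. Because "$ty" is an ordinary string literal, and Rust macro metavariables are not substituted inside string literals, every generated error reports the literal name $ty rather than the concrete type name:

    // Hand-expanded form of: forward_to_unserializable_type! { bool => serialize_bool, }
    fn serialize_bool(self, _v: bool) -> Result<Self::Ok, Self::Error> {
        Err(SerializerError::UnserializableType { name: "$ty" })
    }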
@@ -41,220 +59,3 @@ where D: Deref<Target=DB>
         None => Ok(PositiveBlob::default()),
     }
 }
-
-pub struct Database {
-    // DB is under a Mutex to sync update ingestions and separate DB update locking
-    // and DatabaseView acquiring locking in other words:
-    // "Block readers the minimum possible amount of time"
-    db: Mutex<Arc<DB>>,
-
-    // This view is updated each time the DB ingests an update
-    view: RwLock<DatabaseView<Arc<DB>>>,
-}
-
-impl Database {
-    pub fn create<P: AsRef<Path>>(path: P, schema: Schema) -> Result<Database, Box<Error>> {
-        let path = path.as_ref();
-        if path.exists() {
-            return Err(format!("File already exists at path: {}, cannot create database.",
-                                path.display()).into())
-        }
-
-        let path = path.to_string_lossy();
-        let mut opts = DBOptions::new();
-        opts.create_if_missing(true);
-        // opts.error_if_exists(true); // FIXME pull request that
-
-        let mut cf_opts = ColumnFamilyOptions::new();
-        cf_opts.add_merge_operator("data-index merge operator", merge_indexes);
-
-        let db = DB::open_cf(opts, &path, vec![("default", cf_opts)])?;
-
-        let mut schema_bytes = Vec::new();
-        schema.write_to(&mut schema_bytes)?;
-        db.put(DATA_SCHEMA, &schema_bytes)?;
-
-        let db = Arc::new(db);
-        let snapshot = Snapshot::new(db.clone());
-        let view = RwLock::new(DatabaseView::new(snapshot)?);
-
-        Ok(Database { db: Mutex::new(db), view })
-    }
-
-    pub fn open<P: AsRef<Path>>(path: P) -> Result<Database, Box<Error>> {
-        let path = path.as_ref().to_string_lossy();
-
-        let mut opts = DBOptions::new();
-        opts.create_if_missing(false);
-
-        let mut cf_opts = ColumnFamilyOptions::new();
-        cf_opts.add_merge_operator("data-index merge operator", merge_indexes);
-
-        let db = DB::open_cf(opts, &path, vec![("default", cf_opts)])?;
-
-        // FIXME create a generic function to do that !
-        let _schema = match db.get(DATA_SCHEMA)? {
-            Some(value) => Schema::read_from(&*value)?,
-            None => return Err(String::from("Database does not contain a schema").into()),
-        };
-
-        let db = Arc::new(db);
-        let snapshot = Snapshot::new(db.clone());
-        let view = RwLock::new(DatabaseView::new(snapshot)?);
-
-        Ok(Database { db: Mutex::new(db), view })
-    }
-
-    pub fn ingest_update_file(&self, update: Update) -> Result<(), Box<Error>> {
-        let snapshot = {
-            // We must have a mutex here to ensure that update ingestions and compactions
-            // are done atomatically and in the right order.
-            // This way update ingestions will block other update ingestions without blocking view
-            // creations while doing the "data-index" compaction
-            let db = match self.db.lock() {
-                Ok(db) => db,
-                Err(e) => return Err(e.to_string().into()),
-            };
-
-            let move_update = update.can_be_moved();
-            let path = update.into_path_buf();
-            let path = path.to_string_lossy();
-
-            let mut options = IngestExternalFileOptions::new();
-            options.move_files(move_update);
-
-            let cf_handle = db.cf_handle("default").expect("\"default\" column family not found");
-            db.ingest_external_file_optimized(&cf_handle, &options, &[&path])?;
-
-            // Compacting to trigger the merge operator only one time
-            // while ingesting the update and not each time searching
-            db.compact_range(Some(DATA_INDEX), Some(DATA_INDEX));
-
-            Snapshot::new(db.clone())
-        };
-
-        // Here we will block the view creation for the minimum amount of time:
-        // updating the DatabaseView itself with the new database snapshot
-        let view = DatabaseView::new(snapshot)?;
-        match self.view.write() {
-            Ok(mut lock) => *lock = view,
-            Err(e) => return Err(e.to_string().into()),
-        }
-
-        Ok(())
-    }
-
-    pub fn get(&self, key: &[u8]) -> Result<Option<DBVector>, Box<Error>> {
-        self.view().get(key)
-    }
-
-    pub fn flush(&self) -> Result<(), Box<Error>> {
-        match self.db.lock() {
-            Ok(db) => Ok(db.flush(true)?),
-            Err(e) => Err(e.to_string().into()),
-        }
-    }
-
-    pub fn view(&self) -> RwLockReadGuard<DatabaseView<Arc<DB>>> {
-        self.view.read().unwrap()
-    }
-}
-
-fn merge_indexes(key: &[u8], existing_value: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
-    if key != DATA_INDEX {
-        panic!("The merge operator only supports \"data-index\" merging")
-    }
-
-    let capacity = {
-        let remaining = operands.size_hint().0;
-        let already_exist = usize::from(existing_value.is_some());
-        remaining + already_exist
-    };
-
-    let mut op = blob::OpBuilder::with_capacity(capacity);
-    if let Some(existing_value) = existing_value {
-        let blob = bincode::deserialize(existing_value).expect("BUG: could not deserialize data-index");
-        op.push(Blob::Positive(blob));
-    }
-
-    for bytes in operands {
-        let blob = bincode::deserialize(bytes).expect("BUG: could not deserialize blob");
-        op.push(blob);
-    }
-
-    let blob = op.merge().expect("BUG: could not merge blobs");
-    bincode::serialize(&blob).expect("BUG: could not serialize merged blob")
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use std::error::Error;
-
-    use serde_derive::{Serialize, Deserialize};
-    use tempfile::tempdir;
-
-    use crate::DocumentId;
-    use crate::tokenizer::DefaultBuilder;
-    use crate::database::update::PositiveUpdateBuilder;
-    use crate::database::schema::{SchemaBuilder, STORED, INDEXED};
-
-    #[test]
-    fn ingest_update_file() -> Result<(), Box<Error>> {
-        let dir = tempdir()?;
-
-        let rocksdb_path = dir.path().join("rocksdb.rdb");
-
-        #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
-        struct SimpleDoc {
-            title: String,
-            description: String,
-            timestamp: u64,
-        }
-
-        let schema = {
-            let mut builder = SchemaBuilder::new();
-            builder.new_attribute("title", STORED | INDEXED);
-            builder.new_attribute("description", STORED | INDEXED);
-            builder.new_attribute("timestamp", STORED);
-            builder.build()
-        };
-
-        let database = Database::create(&rocksdb_path, schema.clone())?;
-        let tokenizer_builder = DefaultBuilder::new();
-
-        let update_path = dir.path().join("update.sst");
-
-        let doc0 = SimpleDoc {
-            title: String::from("I am a title"),
-            description: String::from("I am a description"),
-            timestamp: 1234567,
-        };
-        let doc1 = SimpleDoc {
-            title: String::from("I am the second title"),
-            description: String::from("I am the second description"),
-            timestamp: 7654321,
-        };
-
-        let mut update = {
-            let mut builder = PositiveUpdateBuilder::new(update_path, schema, tokenizer_builder);
-
-            builder.update(DocumentId(0), &doc0).unwrap();
-            builder.update(DocumentId(1), &doc1).unwrap();
-
-            builder.build()?
-        };
-
-        update.set_move(true);
-        database.ingest_update_file(update)?;
-        let view = database.view();
-
-        let de_doc0: SimpleDoc = view.retrieve_document(DocumentId(0))?;
-        let de_doc1: SimpleDoc = view.retrieve_document(DocumentId(1))?;
-
-        assert_eq!(doc0, de_doc0);
-        assert_eq!(doc1, de_doc1);
-
-        Ok(dir.close()?)
-    }
-}
@@ -1,4 +1,6 @@
+use crate::database::update::SerializerError;
 use std::collections::{HashMap, BTreeMap};
+use crate::database::calculate_hash;
 use std::io::{Read, Write};
 use std::{fmt, u16};
 use std::path::Path;

@@ -7,8 +9,11 @@ use std::sync::Arc;
 use std::fs::File;
 
 use serde_derive::{Serialize, Deserialize};
+use serde::ser::{self, Serialize};
 use linked_hash_map::LinkedHashMap;
 
+use crate::DocumentId;
+
 pub const STORED: SchemaProps = SchemaProps { stored: true, indexed: false };
 pub const INDEXED: SchemaProps = SchemaProps { stored: false, indexed: true };

@@ -40,12 +45,16 @@ impl BitOr for SchemaProps {
 }
 
 pub struct SchemaBuilder {
+    identifier: String,
     attrs: LinkedHashMap<String, SchemaProps>,
 }
 
 impl SchemaBuilder {
-    pub fn new() -> SchemaBuilder {
-        SchemaBuilder { attrs: LinkedHashMap::new() }
+    pub fn with_identifier<S: Into<String>>(name: S) -> SchemaBuilder {
+        SchemaBuilder {
+            identifier: name.into(),
+            attrs: LinkedHashMap::new(),
+        }
     }
 
     pub fn new_attribute<S: Into<String>>(&mut self, name: S, props: SchemaProps) -> SchemaAttr {

@@ -65,7 +74,8 @@ impl SchemaBuilder {
             props.push((name, prop));
         }
 
-        Schema { inner: Arc::new(InnerSchema { attrs, props }) }
+        let identifier = self.identifier;
+        Schema { inner: Arc::new(InnerSchema { identifier, attrs, props }) }
     }
 }

@@ -76,6 +86,7 @@ pub struct Schema {
 
 #[derive(Debug, Clone, PartialEq, Eq)]
 struct InnerSchema {
+    identifier: String,
     attrs: HashMap<String, SchemaAttr>,
     props: Vec<(String, SchemaProps)>,
 }

@@ -87,8 +98,8 @@ impl Schema {
     }
 
     pub fn read_from<R: Read>(reader: R) -> bincode::Result<Schema> {
-        let attrs = bincode::deserialize_from(reader)?;
-        let builder = SchemaBuilder { attrs };
+        let (identifier, attrs) = bincode::deserialize_from(reader)?;
+        let builder = SchemaBuilder { identifier, attrs };
         Ok(builder.build())
     }

@@ -99,12 +110,22 @@ impl Schema {
             ordered.insert(attr.0, (name, props));
         }
 
+        let identifier = &self.inner.identifier;
         let mut attrs = LinkedHashMap::with_capacity(ordered.len());
         for (_, (name, props)) in ordered {
             attrs.insert(name, props);
         }
 
-        bincode::serialize_into(writer, &attrs)
+        bincode::serialize_into(writer, &(identifier, attrs))
+    }
+
+    pub fn document_id<T>(&self, document: &T) -> Result<DocumentId, SerializerError>
+    where T: Serialize,
+    {
+        let find_document_id = FindDocumentIdSerializer {
+            id_attribute_name: self.identifier_name(),
+        };
+        document.serialize(find_document_id)
     }
 
     pub fn props(&self, attr: SchemaAttr) -> SchemaProps {

@@ -112,6 +133,10 @@ impl Schema {
         props
     }
 
+    pub fn identifier_name(&self) -> &str {
+        &self.inner.identifier
+    }
+
     pub fn attribute<S: AsRef<str>>(&self, name: S) -> Option<SchemaAttr> {
         self.inner.attrs.get(name.as_ref()).cloned()
     }
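The new document_id method is the entry point the update builder calls; a short sketch of it in isolation (SimpleDoc is a stand-in for any serde-serializable struct whose "id" field matches the schema identifier):

    let schema = {
        let mut builder = SchemaBuilder::with_identifier("id");
        builder.new_attribute("id", STORED);
        builder.build()
    };

    let doc = SimpleDoc { id: 42, title: String::from("hello") };
    let docid = schema.document_id(&doc)?; // bincode-encodes the "id" field, then hashes the bytes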
@@ -141,13 +166,199 @@ impl fmt::Display for SchemaAttr {
     }
 }
 
+struct FindDocumentIdSerializer<'a> {
+    id_attribute_name: &'a str,
+}
+
+impl<'a> ser::Serializer for FindDocumentIdSerializer<'a> {
+    type Ok = DocumentId;
+    type Error = SerializerError;
+    type SerializeSeq = ser::Impossible<Self::Ok, Self::Error>;
+    type SerializeTuple = ser::Impossible<Self::Ok, Self::Error>;
+    type SerializeTupleStruct = ser::Impossible<Self::Ok, Self::Error>;
+    type SerializeTupleVariant = ser::Impossible<Self::Ok, Self::Error>;
+    type SerializeMap = ser::Impossible<Self::Ok, Self::Error>;
+    type SerializeStruct = FindDocumentIdStructSerializer<'a>;
+    type SerializeStructVariant = ser::Impossible<Self::Ok, Self::Error>;
+
+    forward_to_unserializable_type! {
+        bool => serialize_bool,
+        char => serialize_char,
+
+        i8  => serialize_i8,
+        i16 => serialize_i16,
+        i32 => serialize_i32,
+        i64 => serialize_i64,
+
+        u8  => serialize_u8,
+        u16 => serialize_u16,
+        u32 => serialize_u32,
+        u64 => serialize_u64,
+
+        f32 => serialize_f32,
+        f64 => serialize_f64,
+    }
+
+    fn serialize_str(self, _v: &str) -> Result<Self::Ok, Self::Error> {
+        Err(SerializerError::UnserializableType { name: "str" })
+    }
+
+    fn serialize_bytes(self, _v: &[u8]) -> Result<Self::Ok, Self::Error> {
+        Err(SerializerError::UnserializableType { name: "&[u8]" })
+    }
+
+    fn serialize_none(self) -> Result<Self::Ok, Self::Error> {
+        Err(SerializerError::UnserializableType { name: "Option" })
+    }
+
+    fn serialize_some<T: ?Sized>(self, _value: &T) -> Result<Self::Ok, Self::Error>
+    where T: Serialize,
+    {
+        Err(SerializerError::UnserializableType { name: "Option" })
+    }
+
+    fn serialize_unit(self) -> Result<Self::Ok, Self::Error> {
+        Err(SerializerError::UnserializableType { name: "()" })
+    }
+
+    fn serialize_unit_struct(self, _name: &'static str) -> Result<Self::Ok, Self::Error> {
+        Err(SerializerError::UnserializableType { name: "unit struct" })
+    }
+
+    fn serialize_unit_variant(
+        self,
+        _name: &'static str,
+        _variant_index: u32,
+        _variant: &'static str
+    ) -> Result<Self::Ok, Self::Error>
+    {
+        Err(SerializerError::UnserializableType { name: "unit variant" })
+    }
+
+    fn serialize_newtype_struct<T: ?Sized>(
+        self,
+        _name: &'static str,
+        value: &T
+    ) -> Result<Self::Ok, Self::Error>
+    where T: Serialize,
+    {
+        value.serialize(self)
+    }
+
+    fn serialize_newtype_variant<T: ?Sized>(
+        self,
+        _name: &'static str,
+        _variant_index: u32,
+        _variant: &'static str,
+        _value: &T
+    ) -> Result<Self::Ok, Self::Error>
+    where T: Serialize,
+    {
+        Err(SerializerError::UnserializableType { name: "newtype variant" })
+    }
+
+    fn serialize_seq(self, _len: Option<usize>) -> Result<Self::SerializeSeq, Self::Error> {
+        Err(SerializerError::UnserializableType { name: "sequence" })
+    }
+
+    fn serialize_tuple(self, _len: usize) -> Result<Self::SerializeTuple, Self::Error> {
+        Err(SerializerError::UnserializableType { name: "tuple" })
+    }
+
+    fn serialize_tuple_struct(
+        self,
+        _name: &'static str,
+        _len: usize
+    ) -> Result<Self::SerializeTupleStruct, Self::Error>
+    {
+        Err(SerializerError::UnserializableType { name: "tuple struct" })
+    }
+
+    fn serialize_tuple_variant(
+        self,
+        _name: &'static str,
+        _variant_index: u32,
+        _variant: &'static str,
+        _len: usize
+    ) -> Result<Self::SerializeTupleVariant, Self::Error>
+    {
+        Err(SerializerError::UnserializableType { name: "tuple variant" })
+    }
+
+    fn serialize_map(self, _len: Option<usize>) -> Result<Self::SerializeMap, Self::Error> {
+        // Ok(MapSerializer {
+        //     schema: self.schema,
+        //     document_id: self.document_id,
+        //     new_states: self.new_states,
+        // })
+        Err(SerializerError::UnserializableType { name: "map" })
+    }
+
+    fn serialize_struct(
+        self,
+        _name: &'static str,
+        _len: usize
+    ) -> Result<Self::SerializeStruct, Self::Error>
+    {
+        Ok(FindDocumentIdStructSerializer {
+            id_attribute_name: self.id_attribute_name,
+            document_id: None,
+        })
+    }
+
+    fn serialize_struct_variant(
+        self,
+        _name: &'static str,
+        _variant_index: u32,
+        _variant: &'static str,
+        _len: usize
+    ) -> Result<Self::SerializeStructVariant, Self::Error>
+    {
+        Err(SerializerError::UnserializableType { name: "struct variant" })
+    }
+}
+
+struct FindDocumentIdStructSerializer<'a> {
+    id_attribute_name: &'a str,
+    document_id: Option<DocumentId>,
+}
+
+impl<'a> ser::SerializeStruct for FindDocumentIdStructSerializer<'a> {
+    type Ok = DocumentId;
+    type Error = SerializerError;
+
+    fn serialize_field<T: ?Sized>(
+        &mut self,
+        key: &'static str,
+        value: &T
+    ) -> Result<(), Self::Error>
+    where T: Serialize,
+    {
+        if self.id_attribute_name == key {
+            // TODO can it be possible to have multiple ids?
+            let id = bincode::serialize(value).unwrap();
+            let hash = calculate_hash(&id);
+            self.document_id = Some(DocumentId(hash));
+        }
+
+        Ok(())
+    }
+
+    fn end(self) -> Result<Self::Ok, Self::Error> {
+        match self.document_id {
+            Some(document_id) => Ok(document_id),
+            None => Err(SerializerError::DocumentIdNotFound)
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
 
     #[test]
     fn serialize_deserialize() -> bincode::Result<()> {
-        let mut builder = SchemaBuilder::new();
+        let mut builder = SchemaBuilder::with_identifier("id");
         builder.new_attribute("alphabet", STORED);
         builder.new_attribute("beta", STORED | INDEXED);
         builder.new_attribute("gamma", INDEXED);
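What the id-finding serializer computes can be reproduced standalone. This runnable sketch (assuming only the bincode crate as a dependency) mirrors serialize_field plus calculate_hash, and shows that equal identifier values yield the same DocumentId within a process:

    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    fn calculate_hash<T: Hash>(t: &T) -> u64 {
        let mut s = DefaultHasher::new();
        t.hash(&mut s);
        s.finish()
    }

    fn main() {
        // The serializer bincode-encodes the identifier field, then hashes those bytes.
        let id_bytes = bincode::serialize(&42u64).unwrap();
        let docid = calculate_hash(&id_bytes);
        assert_eq!(docid, calculate_hash(&bincode::serialize(&42u64).unwrap()));
        println!("DocumentId({})", docid);
    }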
@@ -4,7 +4,7 @@ use std::error::Error;
 mod negative;
 mod positive;
 
-pub use self::positive::{PositiveUpdateBuilder, NewState};
+pub use self::positive::{PositiveUpdateBuilder, NewState, SerializerError};
 pub use self::negative::NegativeUpdateBuilder;
 
 pub struct Update {

@@ -1,4 +1,4 @@
 mod update;
 mod unordered_builder;
 
-pub use self::update::{PositiveUpdateBuilder, NewState};
+pub use self::update::{PositiveUpdateBuilder, NewState, SerializerError};
@@ -40,18 +40,21 @@ impl<B> PositiveUpdateBuilder<B> {
         }
     }
 
-    pub fn update<T: Serialize>(&mut self, id: DocumentId, document: &T) -> Result<(), Box<Error>>
+    pub fn update<T: Serialize>(&mut self, document: &T) -> Result<DocumentId, SerializerError>
     where B: TokenizerBuilder
     {
+        let document_id = self.schema.document_id(document)?;
+
         let serializer = Serializer {
             schema: &self.schema,
-            document_id: id,
             tokenizer_builder: &self.tokenizer_builder,
+            document_id: document_id,
             builder: &mut self.builder,
             new_states: &mut self.new_states
         };
+        document.serialize(serializer)?;
 
-        Ok(ser::Serialize::serialize(document, serializer)?)
+        Ok(document_id)
     }
 
     // TODO value must be a field that can be indexed
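Since the id is now derived inside update, a document lacking the schema's identifier field fails before anything is written into the update (a sketch; NoIdDoc is a hypothetical serializable struct with no "id" field):

    let err = builder.update(&NoIdDoc { title: String::from("orphan") }).unwrap_err();
    // Per the Display impl below: "serialized document does not have an id according to the schema"
    println!("{}", err);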
@@ -67,7 +70,7 @@ impl<B> PositiveUpdateBuilder<B> {
 
 #[derive(Debug)]
 pub enum SerializerError {
-    SchemaDontMatch { attribute: String },
+    DocumentIdNotFound,
     UnserializableType { name: &'static str },
     Custom(String),
 }
@@ -81,10 +84,9 @@ impl ser::Error for SerializerError {
 impl fmt::Display for SerializerError {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         match self {
-            SerializerError::SchemaDontMatch { attribute } => {
-                write!(f, "serialized document try to specify the \
-                           {:?} attribute that is not known by the schema", attribute)
-            },
+            SerializerError::DocumentIdNotFound => {
+                write!(f, "serialized document does not have an id according to the schema")
+            }
             SerializerError::UnserializableType { name } => {
                 write!(f, "Only struct and map types are considered valid documents and
                            can be serialized, not {} types directly.", name)
@@ -104,16 +106,6 @@ struct Serializer<'a, B> {
     new_states: &'a mut BTreeMap<DocumentKeyAttr, NewState>,
 }
 
-macro_rules! forward_to_unserializable_type {
-    ($($ty:ident => $se_method:ident,)*) => {
-        $(
-            fn $se_method(self, _v: $ty) -> Result<Self::Ok, Self::Error> {
-                Err(SerializerError::UnserializableType { name: "$ty" })
-            }
-        )*
-    }
-}
-
 impl<'a, B> ser::Serializer for Serializer<'a, B>
 where B: TokenizerBuilder
 {
@@ -288,27 +280,25 @@ where B: TokenizerBuilder
     ) -> Result<(), Self::Error>
     where T: Serialize,
     {
-        match self.schema.attribute(key) {
-            Some(attr) => {
-                let props = self.schema.props(attr);
-                if props.is_stored() {
-                    let value = bincode::serialize(value).unwrap();
-                    let key = DocumentKeyAttr::new(self.document_id, attr);
-                    self.new_states.insert(key, NewState::Updated { value });
-                }
-                if props.is_indexed() {
-                    let serializer = IndexerSerializer {
-                        builder: self.builder,
-                        tokenizer_builder: self.tokenizer_builder,
-                        document_id: self.document_id,
-                        attribute: attr,
-                    };
-                    value.serialize(serializer)?;
-                }
-                Ok(())
-            },
-            None => Err(SerializerError::SchemaDontMatch { attribute: key.to_owned() }),
+        if let Some(attr) = self.schema.attribute(key) {
+            let props = self.schema.props(attr);
+            if props.is_stored() {
+                let value = bincode::serialize(value).unwrap();
+                let key = DocumentKeyAttr::new(self.document_id, attr);
+                self.new_states.insert(key, NewState::Updated { value });
+            }
+            if props.is_indexed() {
+                let serializer = IndexerSerializer {
+                    builder: self.builder,
+                    tokenizer_builder: self.tokenizer_builder,
+                    document_id: self.document_id,
+                    attribute: attr,
+                };
+                value.serialize(serializer)?;
+            }
         }
+
+        Ok(())
     }
 
     fn end(self) -> Result<Self::Ok, Self::Error> {
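One behavioral consequence of dropping SchemaDontMatch, visible in the hunk above: a field the schema does not declare is now silently skipped instead of failing the whole document.

    // Before: builder.update(&doc) returned SchemaDontMatch { attribute: "extra" }.
    // After: the unknown "extra" field is neither stored nor indexed, and the update succeeds.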
@@ -18,7 +18,7 @@ pub use self::common_words::CommonWords;
 /// It is used to inform the database the document you want to deserialize.
 /// Helpful for custom ranking.
 #[derive(Debug, Copy, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)]
-pub struct DocumentId(pub u64);
+pub struct DocumentId(u64);
 
 /// Represent an attribute number along with the word index
 /// according to the tokenizer used.
|
@ -62,12 +62,12 @@ where D: Deref<Target=DB>,
|
|||||||
T: DeserializeOwned + Ord,
|
T: DeserializeOwned + Ord,
|
||||||
{
|
{
|
||||||
fn evaluate(&self, lhs: &Document, rhs: &Document, view: &DatabaseView<D>) -> Ordering {
|
fn evaluate(&self, lhs: &Document, rhs: &Document, view: &DatabaseView<D>) -> Ordering {
|
||||||
let lhs = match view.retrieve_document::<T>(lhs.id) {
|
let lhs = match view.document_by_id::<T>(lhs.id) {
|
||||||
Ok(doc) => Some(doc),
|
Ok(doc) => Some(doc),
|
||||||
Err(e) => { eprintln!("{}", e); None },
|
Err(e) => { eprintln!("{}", e); None },
|
||||||
};
|
};
|
||||||
|
|
||||||
let rhs = match view.retrieve_document::<T>(rhs.id) {
|
let rhs = match view.document_by_id::<T>(rhs.id) {
|
||||||
Ok(doc) => Some(doc),
|
Ok(doc) => Some(doc),
|
||||||
Err(e) => { eprintln!("{}", e); None },
|
Err(e) => { eprintln!("{}", e); None },
|
||||||
};
|
};
|
||||||
|