feat: Change updates to be handled using the RocksDB WriteBatch feature

Clément Renault 2019-02-05 14:48:55 +01:00
parent 0d2daf27f2
commit 1bfd51d6e9
GPG key ID: 0151CDAB43460DAE
12 changed files with 289 additions and 840 deletions
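In short, this commit drops the SST-file based UpdateBuilder / ingest_update_file flow and replaces it with an Update handle backed by a RocksDB WriteBatch that is built in memory and applied atomically on commit. Below is a minimal caller-side sketch assembled from the updated tests further down; the SimpleDoc type, its fields and the insert_one wrapper are illustrative assumptions, not part of the diff.

use std::collections::HashSet;
use std::error::Error;

use serde_derive::{Serialize, Deserialize};

// crate paths as used by the in-tree tests below
use crate::database::Database;
use crate::tokenizer::DefaultBuilder;
use crate::DocumentId;

#[derive(Serialize, Deserialize)]
struct SimpleDoc {
    id: u64,
    timestamp: u64,
}

// `database` is assumed to have been opened beforehand, e.g. with
// `Database::create(path, &schema)?` and a schema matching `SimpleDoc`.
fn insert_one(database: &Database, doc: &SimpleDoc) -> Result<DocumentId, Box<Error>> {
    let stop_words = HashSet::new();
    let tokenizer_builder = DefaultBuilder::new();

    // `update()` hands out an in-memory builder tied to the database;
    // nothing reaches RocksDB until `commit()`.
    let mut builder = database.update()?;
    let docid = builder.update_document(doc, &tokenizer_builder, &stop_words)?;

    // `commit()` writes the whole WriteBatch atomically and returns a fresh view
    let view = builder.commit()?;
    let _stored: SimpleDoc = view.document_by_id(docid)?;

    Ok(docid)
}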

View file

@ -38,6 +38,10 @@ impl DocumentKey {
DocumentKeyAttr::new(self.document_id(), attr)
}
pub fn with_attribute_min(&self) -> DocumentKeyAttr {
DocumentKeyAttr::new(self.document_id(), SchemaAttr::min())
}
pub fn with_attribute_max(&self) -> DocumentKeyAttr {
DocumentKeyAttr::new(self.document_id(), SchemaAttr::max())
}

View file

@ -1,17 +1,17 @@
use std::sync::{Arc, Mutex};
use std::error::Error;
use std::ops::Deref;
use std::path::Path;
use std::ops::Deref;
use std::sync::Arc;
use rocksdb::rocksdb_options::{DBOptions, IngestExternalFileOptions, ColumnFamilyOptions};
use rocksdb::rocksdb_options::{DBOptions, ColumnFamilyOptions};
use rocksdb::rocksdb::{Writable, Snapshot};
use rocksdb::{DB, DBVector, MergeOperands};
use rocksdb::{DB, MergeOperands};
use crossbeam::atomic::ArcCell;
use log::info;
pub use self::document_key::{DocumentKey, DocumentKeyAttr};
pub use self::view::{DatabaseView, DocumentIter};
pub use self::update::{Update, UpdateBuilder};
pub use self::update::Update;
pub use self::serde::SerializerError;
pub use self::schema::Schema;
pub use self::index::Index;
@ -78,11 +78,7 @@ fn merge_indexes(key: &[u8], existing: Option<&[u8]>, operands: &mut MergeOperan
}
pub struct Database {
// DB is under a Mutex to sync update ingestions and separate DB update locking
// and DatabaseView acquiring locking in other words:
// "Block readers the minimum possible amount of time"
db: Mutex<Arc<DB>>,
db: Arc<DB>,
// This view is updated each time the DB ingests an update
view: ArcCell<DatabaseView<Arc<DB>>>,
}
@ -113,7 +109,7 @@ impl Database {
let snapshot = Snapshot::new(db.clone());
let view = ArcCell::new(Arc::new(DatabaseView::new(snapshot)?));
Ok(Database { db: Mutex::new(db), view })
Ok(Database { db, view })
}
pub fn open<P: AsRef<Path>>(path: P) -> Result<Database, Box<Error>> {
@ -137,49 +133,16 @@ impl Database {
let snapshot = Snapshot::new(db.clone());
let view = ArcCell::new(Arc::new(DatabaseView::new(snapshot)?));
Ok(Database { db: Mutex::new(db), view })
Ok(Database { db, view })
}
pub fn ingest_update_file(&self, update: Update) -> Result<Arc<DatabaseView<Arc<DB>>>, Box<Error>> {
let snapshot = {
// We must have a mutex here to ensure that update ingestions and compactions
// are done atomically and in the right order.
// This way update ingestions will block other update ingestions without blocking view
// creations while doing the "data-index" compaction
let db = match self.db.lock() {
Ok(db) => db,
Err(e) => return Err(e.to_string().into()),
};
let path = update.path().to_string_lossy();
let options = IngestExternalFileOptions::new();
// options.move_files(move_update);
let (elapsed, result) = elapsed::measure_time(|| {
let cf_handle = db.cf_handle("default").expect("\"default\" column family not found");
db.ingest_external_file_optimized(&cf_handle, &options, &[&path])
});
let _ = result?;
info!("ingesting update file took {}", elapsed);
Snapshot::new(db.clone())
pub fn update(&self) -> Result<Update, Box<Error>> {
let schema = match self.db.get(DATA_SCHEMA)? {
Some(value) => Schema::read_from_bin(&*value)?,
None => panic!("Database does not contain a schema"),
};
let view = Arc::new(DatabaseView::new(snapshot)?);
self.view.set(view.clone());
Ok(view)
}
pub fn get(&self, key: &[u8]) -> Result<Option<DBVector>, Box<Error>> {
self.view().get(key)
}
pub fn flush(&self) -> Result<(), Box<Error>> {
match self.db.lock() {
Ok(db) => Ok(db.flush(true)?),
Err(e) => Err(e.to_string().into()),
}
Ok(Update::new(self, schema))
}
pub fn view(&self) -> Arc<DatabaseView<Arc<DB>>> {
@ -193,20 +156,18 @@ mod tests {
use std::error::Error;
use serde_derive::{Serialize, Deserialize};
use tempfile::tempdir;
use crate::database::schema::{SchemaBuilder, STORED, INDEXED};
use crate::database::update::UpdateBuilder;
use crate::tokenizer::DefaultBuilder;
use super::*;
#[test]
fn ingest_one_update_file() -> Result<(), Box<Error>> {
let dir = tempdir()?;
fn ingest_one_easy_update() -> Result<(), Box<Error>> {
let dir = tempfile::tempdir()?;
let stop_words = HashSet::new();
let rocksdb_path = dir.path().join("rocksdb.rdb");
let meilidb_path = dir.path().join("meilidb.mdb");
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
struct SimpleDoc {
@ -225,9 +186,7 @@ mod tests {
builder.build()
};
let database = Database::create(&rocksdb_path, &schema)?;
let update_path = dir.path().join("update.sst");
let database = Database::create(&meilidb_path, &schema)?;
let doc0 = SimpleDoc {
id: 0,
@ -242,20 +201,13 @@ mod tests {
timestamp: 7654321,
};
let docid0;
let docid1;
let update = {
let tokenizer_builder = DefaultBuilder::new();
let mut builder = UpdateBuilder::new(update_path, schema);
let tokenizer_builder = DefaultBuilder::new();
let mut builder = database.update()?;
docid0 = builder.update_document(&doc0, &tokenizer_builder, &stop_words)?;
docid1 = builder.update_document(&doc1, &tokenizer_builder, &stop_words)?;
let docid0 = builder.update_document(&doc0, &tokenizer_builder, &stop_words)?;
let docid1 = builder.update_document(&doc1, &tokenizer_builder, &stop_words)?;
builder.build()?
};
database.ingest_update_file(update)?;
let view = database.view();
let view = builder.commit()?;
let de_doc0: SimpleDoc = view.document_by_id(docid0)?;
let de_doc1: SimpleDoc = view.document_by_id(docid1)?;
@ -267,11 +219,11 @@ mod tests {
}
#[test]
fn ingest_two_update_files() -> Result<(), Box<Error>> {
let dir = tempdir()?;
fn ingest_two_easy_updates() -> Result<(), Box<Error>> {
let dir = tempfile::tempdir()?;
let stop_words = HashSet::new();
let rocksdb_path = dir.path().join("rocksdb.rdb");
let meilidb_path = dir.path().join("meilidb.mdb");
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
struct SimpleDoc {
@ -290,7 +242,7 @@ mod tests {
builder.build()
};
let database = Database::create(&rocksdb_path, &schema)?;
let database = Database::create(&meilidb_path, &schema)?;
let doc0 = SimpleDoc {
id: 0,
@ -317,36 +269,17 @@ mod tests {
timestamp: 7654321,
};
let docid0;
let docid1;
let update1 = {
let tokenizer_builder = DefaultBuilder::new();
let update_path = dir.path().join("update-000.sst");
let mut builder = UpdateBuilder::new(update_path, schema.clone());
let tokenizer_builder = DefaultBuilder::new();
docid0 = builder.update_document(&doc0, &tokenizer_builder, &stop_words)?;
docid1 = builder.update_document(&doc1, &tokenizer_builder, &stop_words)?;
let mut builder = database.update()?;
let docid0 = builder.update_document(&doc0, &tokenizer_builder, &stop_words)?;
let docid1 = builder.update_document(&doc1, &tokenizer_builder, &stop_words)?;
builder.commit()?;
builder.build()?
};
let docid2;
let docid3;
let update2 = {
let tokenizer_builder = DefaultBuilder::new();
let update_path = dir.path().join("update-001.sst");
let mut builder = UpdateBuilder::new(update_path, schema);
docid2 = builder.update_document(&doc2, &tokenizer_builder, &stop_words)?;
docid3 = builder.update_document(&doc3, &tokenizer_builder, &stop_words)?;
builder.build()?
};
database.ingest_update_file(update1)?;
database.ingest_update_file(update2)?;
let view = database.view();
let mut builder = database.update()?;
let docid2 = builder.update_document(&doc2, &tokenizer_builder, &stop_words)?;
let docid3 = builder.update_document(&doc3, &tokenizer_builder, &stop_words)?;
let view = builder.commit()?;
let de_doc0: SimpleDoc = view.document_by_id(docid0)?;
let de_doc1: SimpleDoc = view.document_by_id(docid1)?;
@ -380,7 +313,6 @@ mod bench {
use rand::seq::SliceRandom;
use crate::tokenizer::DefaultBuilder;
use crate::database::update::UpdateBuilder;
use crate::database::schema::*;
use super::*;
@ -425,9 +357,8 @@ mod bench {
description: String,
}
let path = dir.path().join("update-000.sst");
let tokenizer_builder = DefaultBuilder;
let mut builder = UpdateBuilder::new(path, schema);
let mut builder = database.update()?;
let mut rng = XorShiftRng::seed_from_u64(42);
for i in 0..300 {
@ -439,8 +370,7 @@ mod bench {
builder.update_document(&document, &tokenizer_builder, &stop_words)?;
}
let update = builder.build()?;
database.ingest_update_file(update)?;
builder.commit()?;
drop(database);
@ -472,9 +402,8 @@ mod bench {
description: String,
}
let path = dir.path().join("update-000.sst");
let tokenizer_builder = DefaultBuilder;
let mut builder = UpdateBuilder::new(path, schema);
let mut builder = database.update()?;
let mut rng = XorShiftRng::seed_from_u64(42);
for i in 0..3000 {
@ -486,8 +415,7 @@ mod bench {
builder.update_document(&document, &tokenizer_builder, &stop_words)?;
}
let update = builder.build()?;
database.ingest_update_file(update)?;
builder.commit()?;
drop(database);
@ -520,9 +448,8 @@ mod bench {
description: String,
}
let path = dir.path().join("update-000.sst");
let tokenizer_builder = DefaultBuilder;
let mut builder = UpdateBuilder::new(path, schema);
let mut builder = database.update()?;
let mut rng = XorShiftRng::seed_from_u64(42);
for i in 0..30_000 {
@ -534,8 +461,7 @@ mod bench {
builder.update_document(&document, &tokenizer_builder, &stop_words)?;
}
let update = builder.build()?;
database.ingest_update_file(update)?;
builder.commit()?;
drop(database);
@ -567,9 +493,8 @@ mod bench {
description: String,
}
let path = dir.path().join("update-000.sst");
let tokenizer_builder = DefaultBuilder;
let mut builder = UpdateBuilder::new(path, schema);
let mut builder = database.update()?;
let mut rng = XorShiftRng::seed_from_u64(42);
for i in 0..300 {
@ -581,8 +506,7 @@ mod bench {
builder.update_document(&document, &tokenizer_builder, &stop_words)?;
}
let update = builder.build()?;
let view = database.ingest_update_file(update)?;
let view = builder.commit()?;
bench.iter(|| {
for q in &["a", "b", "c", "d", "e"] {
@ -614,9 +538,8 @@ mod bench {
description: String,
}
let path = dir.path().join("update-000.sst");
let tokenizer_builder = DefaultBuilder;
let mut builder = UpdateBuilder::new(path, schema);
let mut builder = database.update()?;
let mut rng = XorShiftRng::seed_from_u64(42);
for i in 0..3000 {
@ -628,8 +551,7 @@ mod bench {
builder.update_document(&document, &tokenizer_builder, &stop_words)?;
}
let update = builder.build()?;
let view = database.ingest_update_file(update)?;
let view = builder.commit()?;
bench.iter(|| {
for q in &["a", "b", "c", "d", "e"] {
@ -662,9 +584,8 @@ mod bench {
description: String,
}
let path = dir.path().join("update-000.sst");
let tokenizer_builder = DefaultBuilder;
let mut builder = UpdateBuilder::new(path, schema);
let mut builder = database.update()?;
let mut rng = XorShiftRng::seed_from_u64(42);
for i in 0..30_000 {
@ -676,8 +597,7 @@ mod bench {
builder.update_document(&document, &tokenizer_builder, &stop_words)?;
}
let update = builder.build()?;
let view = database.ingest_update_file(update)?;
let view = builder.commit()?;
bench.iter(|| {
for q in &["a", "b", "c", "d", "e"] {

View file

@ -10,15 +10,15 @@ use crate::tokenizer::TokenizerBuilder;
use crate::tokenizer::Token;
use crate::{DocumentId, DocIndex};
pub struct IndexerSerializer<'a, B> {
pub struct IndexerSerializer<'a, 'b, B> {
pub tokenizer_builder: &'a B,
pub update: &'a mut DocumentUpdate,
pub update: &'a mut DocumentUpdate<'b>,
pub document_id: DocumentId,
pub attribute: SchemaAttr,
pub stop_words: &'a HashSet<String>,
}
impl<'a, B> ser::Serializer for IndexerSerializer<'a, B>
impl<'a, 'b, B> ser::Serializer for IndexerSerializer<'a, 'b, B>
where B: TokenizerBuilder
{
type Ok = ();
@ -71,14 +71,14 @@ where B: TokenizerBuilder
let char_length = length;
let doc_index = DocIndex { document_id, attribute, word_index, char_index, char_length };
self.update.insert_doc_index(word_unidecoded.into_bytes(), doc_index);
self.update.insert_doc_index(word_unidecoded.into_bytes(), doc_index)?;
}
let char_index = char_index as u32;
let char_length = length;
let doc_index = DocIndex { document_id, attribute, word_index, char_index, char_length };
self.update.insert_doc_index(word_lower.into_bytes(), doc_index);
self.update.insert_doc_index(word_lower.into_bytes(), doc_index)?;
}
Ok(())
}

View file

@ -56,3 +56,9 @@ impl fmt::Display for SerializerError {
}
impl Error for SerializerError {}
impl From<String> for SerializerError {
fn from(value: String) -> SerializerError {
SerializerError::Custom(value)
}
}
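This From<String> conversion appears to be what lets the new WriteBatch code use `?` on RocksDB calls inside functions that return SerializerError: the rust-rocksdb fork used here reports errors as plain String (my assumption), as in DocumentUpdate::insert_attribute_value in the new src/database/update.rs below. A hedged sketch of the pattern; put_raw is illustrative, not part of the commit.

use rocksdb::rocksdb::{Writable, WriteBatch};

// assumes the fork's `Writable::put` returns `Result<(), String>`
fn put_raw(batch: &WriteBatch, key: &[u8], value: &[u8]) -> Result<(), SerializerError> {
    // the String error converts into SerializerError::Custom through the new From impl
    batch.put(key, value)?;
    Ok(())
}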

View file

@ -11,15 +11,15 @@ use crate::tokenizer::TokenizerBuilder;
use crate::database::schema::Schema;
use crate::DocumentId;
pub struct Serializer<'a, B> {
pub struct Serializer<'a, 'b, B> {
pub schema: &'a Schema,
pub update: &'a mut DocumentUpdate,
pub update: &'a mut DocumentUpdate<'b>,
pub document_id: DocumentId,
pub tokenizer_builder: &'a B,
pub stop_words: &'a HashSet<String>,
}
impl<'a, B> ser::Serializer for Serializer<'a, B>
impl<'a, 'b, B> ser::Serializer for Serializer<'a, 'b, B>
where B: TokenizerBuilder
{
type Ok = ();
@ -28,8 +28,8 @@ where B: TokenizerBuilder
type SerializeTuple = ser::Impossible<Self::Ok, Self::Error>;
type SerializeTupleStruct = ser::Impossible<Self::Ok, Self::Error>;
type SerializeTupleVariant = ser::Impossible<Self::Ok, Self::Error>;
type SerializeMap = MapSerializer<'a, B>;
type SerializeStruct = StructSerializer<'a, B>;
type SerializeMap = MapSerializer<'a, 'b, B>;
type SerializeStruct = StructSerializer<'a, 'b, B>;
type SerializeStructVariant = ser::Impossible<Self::Ok, Self::Error>;
forward_to_unserializable_type! {
@ -174,16 +174,16 @@ where B: TokenizerBuilder
}
}
pub struct MapSerializer<'a, B> {
pub struct MapSerializer<'a, 'b, B> {
pub schema: &'a Schema,
pub document_id: DocumentId,
pub update: &'a mut DocumentUpdate,
pub update: &'a mut DocumentUpdate<'b>,
pub tokenizer_builder: &'a B,
pub stop_words: &'a HashSet<String>,
pub current_key_name: Option<String>,
}
impl<'a, B> ser::SerializeMap for MapSerializer<'a, B>
impl<'a, 'b, B> ser::SerializeMap for MapSerializer<'a, 'b, B>
where B: TokenizerBuilder
{
type Ok = ();
@ -207,7 +207,7 @@ where B: TokenizerBuilder
fn serialize_entry<K: ?Sized, V: ?Sized>(
&mut self,
key: &K,
value: &V
value: &V,
) -> Result<(), Self::Error>
where K: Serialize, V: Serialize,
{
@ -217,7 +217,7 @@ where B: TokenizerBuilder
let props = self.schema.props(attr);
if props.is_stored() {
let value = bincode::serialize(value).unwrap();
self.update.insert_attribute_value(attr, value);
self.update.insert_attribute_value(attr, &value)?;
}
if props.is_indexed() {
let serializer = IndexerSerializer {
@ -239,15 +239,15 @@ where B: TokenizerBuilder
}
}
pub struct StructSerializer<'a, B> {
pub struct StructSerializer<'a, 'b, B> {
pub schema: &'a Schema,
pub document_id: DocumentId,
pub update: &'a mut DocumentUpdate,
pub update: &'a mut DocumentUpdate<'b>,
pub tokenizer_builder: &'a B,
pub stop_words: &'a HashSet<String>,
}
impl<'a, B> ser::SerializeStruct for StructSerializer<'a, B>
impl<'a, 'b, B> ser::SerializeStruct for StructSerializer<'a, 'b, B>
where B: TokenizerBuilder
{
type Ok = ();
@ -264,7 +264,7 @@ where B: TokenizerBuilder
let props = self.schema.props(attr);
if props.is_stored() {
let value = bincode::serialize(value).unwrap();
self.update.insert_attribute_value(attr, value);
self.update.insert_attribute_value(attr, &value)?;
}
if props.is_indexed() {
let serializer = IndexerSerializer {

src/database/update.rs (new file, 209 lines)
View file

@ -0,0 +1,209 @@
use std::collections::{HashSet, BTreeMap};
use std::error::Error;
use std::sync::Arc;
use rocksdb::rocksdb::{DB, Writable, Snapshot, WriteBatch};
use hashbrown::hash_map::HashMap;
use serde::Serialize;
use fst::map::Map;
use sdset::Set;
use crate::database::{DATA_INDEX, Database, DatabaseView};
use crate::database::index::{Positive, PositiveBuilder, Negative};
use crate::database::document_key::{DocumentKey, DocumentKeyAttr};
use crate::database::serde::serializer::Serializer;
use crate::database::serde::SerializerError;
use crate::database::schema::SchemaAttr;
use crate::tokenizer::TokenizerBuilder;
use crate::data::{DocIds, DocIndexes};
use crate::database::schema::Schema;
use crate::{DocumentId, DocIndex};
use crate::database::index::Index;
pub type Token = Vec<u8>; // TODO could be replaced by a SmallVec
pub struct Update<'a> {
database: &'a Database,
schema: Schema,
raw_builder: RawUpdateBuilder,
}
impl<'a> Update<'a> {
pub(crate) fn new(database: &'a Database, schema: Schema) -> Update<'a> {
Update { database, schema, raw_builder: RawUpdateBuilder::new() }
}
pub fn update_document<T, B>(
&mut self,
document: T,
tokenizer_builder: &B,
stop_words: &HashSet<String>,
) -> Result<DocumentId, SerializerError>
where T: Serialize,
B: TokenizerBuilder,
{
let document_id = self.schema.document_id(&document)?;
let serializer = Serializer {
schema: &self.schema,
document_id: document_id,
tokenizer_builder: tokenizer_builder,
update: &mut self.raw_builder.document_update(document_id)?,
stop_words: stop_words,
};
document.serialize(serializer)?;
Ok(document_id)
}
pub fn remove_document<T>(&mut self, document: T) -> Result<DocumentId, SerializerError>
where T: Serialize,
{
let document_id = self.schema.document_id(&document)?;
self.raw_builder.document_update(document_id)?.remove()?;
Ok(document_id)
}
pub fn commit(self) -> Result<Arc<DatabaseView<Arc<DB>>>, Box<Error>> {
let batch = self.raw_builder.build()?;
self.database.db.write(batch)?;
let snapshot = Snapshot::new(self.database.db.clone());
let view = Arc::new(DatabaseView::new(snapshot)?);
self.database.view.set(view.clone());
Ok(view)
}
pub fn abort(self) { }
}
#[derive(Copy, Clone, PartialEq, Eq)]
enum UpdateType {
Updated,
Deleted,
}
use UpdateType::{Updated, Deleted};
pub struct RawUpdateBuilder {
documents_update: HashMap<DocumentId, UpdateType>,
indexed_words: BTreeMap<Token, Vec<DocIndex>>,
batch: WriteBatch,
}
impl RawUpdateBuilder {
pub fn new() -> RawUpdateBuilder {
RawUpdateBuilder {
documents_update: HashMap::new(),
indexed_words: BTreeMap::new(),
batch: WriteBatch::new(),
}
}
pub fn document_update(&mut self, document_id: DocumentId) -> Result<DocumentUpdate, SerializerError> {
use serde::ser::Error;
match self.documents_update.get(&document_id) {
Some(Updated) | None => Ok(DocumentUpdate { document_id, inner: self }),
Some(Deleted) => Err(SerializerError::custom(
"This document has already been removed and cannot be updated in the same update"
)),
}
}
pub fn build(self) -> Result<WriteBatch, Box<Error>> {
let negative = {
let mut removed_document_ids = Vec::new();
for (id, update_type) in self.documents_update {
if update_type == Deleted {
removed_document_ids.push(id);
}
}
removed_document_ids.sort_unstable();
let removed_document_ids = Set::new_unchecked(&removed_document_ids);
let doc_ids = DocIds::new(removed_document_ids);
Negative::new(doc_ids)
};
let positive = {
let mut positive_builder = PositiveBuilder::memory();
for (key, mut indexes) in self.indexed_words {
indexes.sort_unstable();
let indexes = Set::new_unchecked(&indexes);
positive_builder.insert(key, indexes)?;
}
let (map, indexes) = positive_builder.into_inner()?;
let map = Map::from_bytes(map)?;
let indexes = DocIndexes::from_bytes(indexes)?;
Positive::new(map, indexes)
};
let index = Index { negative, positive };
// write the data-index
let mut bytes = Vec::new();
index.write_to_bytes(&mut bytes);
self.batch.merge(DATA_INDEX, &bytes)?;
Ok(self.batch)
}
}
pub struct DocumentUpdate<'a> {
document_id: DocumentId,
inner: &'a mut RawUpdateBuilder,
}
impl<'a> DocumentUpdate<'a> {
pub fn remove(&mut self) -> Result<(), SerializerError> {
use serde::ser::Error;
if let Updated = self.inner.documents_update.entry(self.document_id).or_insert(Deleted) {
return Err(SerializerError::custom(
"This document has already been updated and cannot be removed in the same update"
));
}
let start = DocumentKey::new(self.document_id).with_attribute_min();
let end = DocumentKey::new(self.document_id).with_attribute_max(); // FIXME max + 1
self.inner.batch.delete_range(start.as_ref(), end.as_ref())?;
Ok(())
}
pub fn insert_attribute_value(&mut self, attr: SchemaAttr, value: &[u8]) -> Result<(), SerializerError> {
use serde::ser::Error;
if let Deleted = self.inner.documents_update.entry(self.document_id).or_insert(Updated) {
return Err(SerializerError::custom(
"This document has already been deleted and cannot be updated in the same update"
));
}
let key = DocumentKeyAttr::new(self.document_id, attr);
self.inner.batch.put(key.as_ref(), &value)?;
Ok(())
}
pub fn insert_doc_index(&mut self, token: Token, doc_index: DocIndex) -> Result<(), SerializerError> {
use serde::ser::Error;
if let Deleted = self.inner.documents_update.entry(self.document_id).or_insert(Updated) {
return Err(SerializerError::custom(
"This document has already been deleted and cannot be updated in the same update"
));
}
self.inner.indexed_words.entry(token).or_insert_with(Vec::new).push(doc_index);
Ok(())
}
}
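The DocumentUpdate guards above mean one batch may mix updates and removals of different documents, while updating and removing the same document inside a single batch is rejected with a SerializerError. A small usage sketch, reusing the illustrative SimpleDoc and imports from the sketch at the top of this page; the wrapper function is an assumption, not part of the diff.

// replace one document and drop another in a single atomic WriteBatch
fn replace_and_remove(
    database: &Database,
    new_doc: &SimpleDoc,
    obsolete_doc: &SimpleDoc,
) -> Result<(), Box<Error>> {
    let stop_words = HashSet::new();
    let tokenizer_builder = DefaultBuilder::new();

    let mut builder = database.update()?;
    builder.update_document(new_doc, &tokenizer_builder, &stop_words)?;
    // only the identifier of `obsolete_doc` matters here: the schema extracts
    // the document id and the whole key range of that document is deleted
    builder.remove_document(obsolete_doc)?;

    // both operations land in the same WriteBatch and become visible together
    builder.commit()?;
    Ok(())
}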

View file

@ -1,64 +0,0 @@
use std::collections::HashSet;
use std::path::PathBuf;
use std::error::Error;
use serde::Serialize;
use crate::database::serde::serializer::Serializer;
use crate::database::serde::SerializerError;
use crate::tokenizer::TokenizerBuilder;
use crate::database::Schema;
use crate::DocumentId;
use super::{Update, RawUpdateBuilder};
pub struct UpdateBuilder {
schema: Schema,
raw_builder: RawUpdateBuilder,
}
impl UpdateBuilder {
pub fn new(path: PathBuf, schema: Schema) -> UpdateBuilder {
UpdateBuilder {
schema: schema,
raw_builder: RawUpdateBuilder::new(path),
}
}
pub fn update_document<T, B>(
&mut self,
document: T,
tokenizer_builder: &B,
stop_words: &HashSet<String>,
) -> Result<DocumentId, SerializerError>
where T: Serialize,
B: TokenizerBuilder,
{
let document_id = self.schema.document_id(&document)?;
let update = self.raw_builder.document_update(document_id);
let serializer = Serializer {
schema: &self.schema,
document_id: document_id,
tokenizer_builder: tokenizer_builder,
update: update,
stop_words: stop_words,
};
document.serialize(serializer)?;
Ok(document_id)
}
pub fn remove_document<T>(&mut self, document: T) -> Result<DocumentId, SerializerError>
where T: Serialize,
{
let document_id = self.schema.document_id(&document)?;
self.raw_builder.document_update(document_id).remove();
Ok(document_id)
}
pub fn build(self) -> Result<Update, Box<Error>> {
self.raw_builder.build()
}
}

View file

@ -1,17 +0,0 @@
use std::path::{Path, PathBuf};
mod builder;
mod raw_builder;
pub use self::builder::UpdateBuilder;
pub use self::raw_builder::{RawUpdateBuilder, DocumentUpdate};
pub struct Update {
sst_file: PathBuf,
}
impl Update {
pub fn path(&self) -> &Path {
&self.sst_file
}
}

View file

@ -1,170 +0,0 @@
use std::collections::btree_map::{BTreeMap, Entry};
use std::path::PathBuf;
use std::error::Error;
use rocksdb::rocksdb_options;
use hashbrown::HashMap;
use fst::map::Map;
use sdset::Set;
use log::warn;
use crate::database::index::{Index, Positive, PositiveBuilder, Negative};
use crate::database::{DATA_INDEX, DocumentKeyAttr};
use crate::database::schema::SchemaAttr;
use crate::data::{DocIds, DocIndexes};
use crate::{DocumentId, DocIndex};
use super::Update;
type Token = Vec<u8>; // TODO could be replaced by a SmallVec
type Value = Vec<u8>;
pub struct RawUpdateBuilder {
sst_file: PathBuf,
document_updates: BTreeMap<DocumentId, DocumentUpdate>,
}
pub struct DocumentUpdate {
cleared: bool,
words_indexes: HashMap<Token, Vec<DocIndex>>,
attributes: BTreeMap<SchemaAttr, Value>,
}
impl DocumentUpdate {
pub fn new() -> DocumentUpdate {
DocumentUpdate {
cleared: false,
words_indexes: HashMap::new(),
attributes: BTreeMap::new(),
}
}
pub fn remove(&mut self) {
self.cleared = true;
self.clear();
}
pub fn clear(&mut self) {
self.words_indexes.clear();
self.attributes.clear();
}
pub fn insert_attribute_value(&mut self, attr: SchemaAttr, value: Vec<u8>) {
self.attributes.insert(attr, value);
}
pub fn insert_doc_index(&mut self, token: Vec<u8>, doc_index: DocIndex) {
self.words_indexes.entry(token).or_insert_with(Vec::new).push(doc_index)
}
}
impl RawUpdateBuilder {
pub fn new(path: PathBuf) -> RawUpdateBuilder {
RawUpdateBuilder {
sst_file: path,
document_updates: BTreeMap::new(),
}
}
pub fn document_update(&mut self, document_id: DocumentId) -> &mut DocumentUpdate {
match self.document_updates.entry(document_id) {
Entry::Occupied(mut occupied) => {
warn!("Already updated document {:?}, clearing it", document_id);
occupied.get_mut().clear();
occupied.into_mut()
},
Entry::Vacant(vacant) => vacant.insert(DocumentUpdate::new()),
}
}
pub fn build(mut self) -> Result<Update, Box<Error>> {
let mut removed_document_ids = Vec::new();
let mut words_indexes = BTreeMap::new();
for (&id, update) in self.document_updates.iter_mut() {
if update.cleared { removed_document_ids.push(id) }
for (token, indexes) in &update.words_indexes {
words_indexes.entry(token).or_insert_with(Vec::new).extend_from_slice(indexes)
}
}
let negative = {
let removed_document_ids = Set::new_unchecked(&removed_document_ids);
let doc_ids = DocIds::new(removed_document_ids);
Negative::new(doc_ids)
};
let positive = {
let mut positive_builder = PositiveBuilder::memory();
for (key, mut indexes) in words_indexes {
indexes.sort_unstable();
let indexes = Set::new_unchecked(&indexes);
positive_builder.insert(key, indexes)?;
}
let (map, indexes) = positive_builder.into_inner()?;
let map = Map::from_bytes(map)?;
let indexes = DocIndexes::from_bytes(indexes)?;
Positive::new(map, indexes)
};
let index = Index { negative, positive };
let env_options = rocksdb_options::EnvOptions::new();
let column_family_options = rocksdb_options::ColumnFamilyOptions::new();
let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options);
file_writer.open(&self.sst_file.to_string_lossy())?;
// write the data-index
let mut bytes = Vec::new();
index.write_to_bytes(&mut bytes);
file_writer.merge(DATA_INDEX, &bytes)?;
// write all the documents attributes updates
for (id, update) in self.document_updates {
let mut last_attr: Option<SchemaAttr> = None;
for (attr, value) in update.attributes {
if update.cleared {
// if there is no last attribute, remove from the first attribute
let start_attr = match last_attr {
Some(attr) => attr.next(),
None => Some(SchemaAttr::min())
};
let start = start_attr.map(|a| DocumentKeyAttr::new(id, a));
let end = attr.prev().map(|a| DocumentKeyAttr::new(id, a));
// delete_range between (last_attr + 1) and (attr - 1)
if let (Some(start), Some(end)) = (start, end) {
file_writer.delete_range(start.as_ref(), end.as_ref())?;
}
}
let key = DocumentKeyAttr::new(id, attr);
file_writer.put(key.as_ref(), &value)?;
last_attr = Some(attr);
}
if update.cleared {
// if there is no last attribute, remove from the first attribute
let start_attr = match last_attr {
Some(attr) => attr.next(),
None => Some(SchemaAttr::min())
};
let start = start_attr.map(|a| DocumentKeyAttr::new(id, a));
let end = DocumentKeyAttr::with_attribute_max(id);
// delete_range between (last_attr + 1) and attr_max
if let Some(start) = start {
file_writer.delete_range(start.as_ref(), end.as_ref())?;
}
}
}
file_writer.finish()?;
Ok(Update { sst_file: self.sst_file })
}
}