2019-02-05 14:48:55 +01:00
|
|
|
use std::collections::{HashSet, BTreeMap};
|
|
|
|
use std::error::Error;
|
|
|
|
|
2019-02-06 18:03:41 +01:00
|
|
|
use rocksdb::rocksdb::{Writable, WriteBatch};
|
2019-02-05 14:48:55 +01:00
|
|
|
use hashbrown::hash_map::HashMap;
|
2019-02-14 20:22:25 +01:00
|
|
|
use sdset::{Set, SetBuf};
|
2019-02-05 14:48:55 +01:00
|
|
|
use serde::Serialize;
|
2019-02-24 19:44:24 +01:00
|
|
|
use meilidb_core::write_to_bytes::WriteToBytes;
|
|
|
|
use meilidb_core::data::DocIds;
|
|
|
|
use meilidb_core::{IndexBuilder, DocumentId, DocIndex};
|
2019-02-25 18:24:46 +01:00
|
|
|
use meilidb_tokenizer::TokenizerBuilder;
|
2019-02-05 14:48:55 +01:00
|
|
|
|
|
|
|
use crate::database::document_key::{DocumentKey, DocumentKeyAttr};
|
|
|
|
use crate::database::serde::serializer::Serializer;
|
|
|
|
use crate::database::serde::SerializerError;
|
|
|
|
use crate::database::schema::SchemaAttr;
|
|
|
|
use crate::database::schema::Schema;
|
2019-02-08 15:17:42 +01:00
|
|
|
use crate::database::{DATA_INDEX, DATA_RANKED_MAP};
|
2019-02-11 16:58:44 +01:00
|
|
|
use crate::database::{RankedMap, Number};
|
2019-02-05 14:48:55 +01:00
|
|
|
|
2019-02-14 20:22:25 +01:00
|
|
|
pub use self::index_event::{ReadIndexEvent, WriteIndexEvent};
|
|
|
|
pub use self::ranked_map_event::{ReadRankedMapEvent, WriteRankedMapEvent};
|
|
|
|
|
|
|
|
mod index_event;
|
|
|
|
mod ranked_map_event;
|
|
|
|
|
2019-02-05 14:48:55 +01:00
|
|
|
/// A single tokenized word, stored as its raw bytes.
pub type Token = Vec<u8>; // TODO could be replaced by a SmallVec
|
|
|
|
|
2019-02-06 18:03:41 +01:00
|
|
|
/// A schema-aware update: serializes whole documents through the `Schema`
/// and forwards the resulting low-level operations to a `RawUpdateBuilder`.
pub struct Update {
    // schema used to extract document ids and locate attributes
    schema: Schema,
    // accumulates the raw index / ranked-map / WriteBatch operations
    raw_builder: RawUpdateBuilder,
}
|
|
|
|
|
2019-02-06 18:03:41 +01:00
|
|
|
impl Update {
|
|
|
|
pub(crate) fn new(schema: Schema) -> Update {
|
|
|
|
Update { schema, raw_builder: RawUpdateBuilder::new() }
|
2019-02-05 14:48:55 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
pub fn update_document<T, B>(
|
|
|
|
&mut self,
|
|
|
|
document: T,
|
|
|
|
tokenizer_builder: &B,
|
|
|
|
stop_words: &HashSet<String>,
|
|
|
|
) -> Result<DocumentId, SerializerError>
|
|
|
|
where T: Serialize,
|
|
|
|
B: TokenizerBuilder,
|
|
|
|
{
|
|
|
|
let document_id = self.schema.document_id(&document)?;
|
|
|
|
|
|
|
|
let serializer = Serializer {
|
|
|
|
schema: &self.schema,
|
|
|
|
document_id: document_id,
|
|
|
|
tokenizer_builder: tokenizer_builder,
|
|
|
|
update: &mut self.raw_builder.document_update(document_id)?,
|
|
|
|
stop_words: stop_words,
|
|
|
|
};
|
|
|
|
|
|
|
|
document.serialize(serializer)?;
|
|
|
|
|
|
|
|
Ok(document_id)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn remove_document<T>(&mut self, document: T) -> Result<DocumentId, SerializerError>
|
|
|
|
where T: Serialize,
|
|
|
|
{
|
|
|
|
let document_id = self.schema.document_id(&document)?;
|
|
|
|
self.raw_builder.document_update(document_id)?.remove()?;
|
|
|
|
Ok(document_id)
|
|
|
|
}
|
|
|
|
|
2019-02-06 18:03:41 +01:00
|
|
|
pub(crate) fn build(self) -> Result<WriteBatch, Box<Error>> {
|
|
|
|
self.raw_builder.build()
|
2019-02-05 14:48:55 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Kind of operation recorded for a document inside one update:
/// a document cannot be both updated and deleted in the same update.
#[derive(Copy, Clone, PartialEq, Eq)]
enum UpdateType {
    Updated,
    Deleted,
}
|
|
|
|
|
|
|
|
use UpdateType::{Updated, Deleted};
|
|
|
|
|
|
|
|
/// Accumulates the low-level operations of a single update and
/// finally turns them into one RocksDB `WriteBatch`.
pub struct RawUpdateBuilder {
    // operation (update or delete) recorded for each touched document
    documents_update: HashMap<DocumentId, UpdateType>,
    // ranked attribute values collected for the updated documents
    documents_ranked_fields: RankedMap,
    // doc-indexes gathered per token; BTreeMap keeps tokens ordered
    // for the index builder insertions below
    indexed_words: BTreeMap<Token, Vec<DocIndex>>,
    // raw RocksDB batch the update operations are written into
    batch: WriteBatch,
}
|
|
|
|
|
|
|
|
impl RawUpdateBuilder {
|
|
|
|
pub fn new() -> RawUpdateBuilder {
|
|
|
|
RawUpdateBuilder {
|
|
|
|
documents_update: HashMap::new(),
|
2019-02-08 15:17:42 +01:00
|
|
|
documents_ranked_fields: HashMap::new(),
|
2019-02-05 14:48:55 +01:00
|
|
|
indexed_words: BTreeMap::new(),
|
|
|
|
batch: WriteBatch::new(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn document_update(&mut self, document_id: DocumentId) -> Result<DocumentUpdate, SerializerError> {
|
|
|
|
use serde::ser::Error;
|
|
|
|
|
|
|
|
match self.documents_update.get(&document_id) {
|
|
|
|
Some(Deleted) | None => Ok(DocumentUpdate { document_id, inner: self }),
|
|
|
|
Some(Updated) => Err(SerializerError::custom(
|
|
|
|
"This document has already been removed and cannot be updated in the same update"
|
|
|
|
)),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn build(self) -> Result<WriteBatch, Box<Error>> {
|
2019-02-14 20:22:25 +01:00
|
|
|
// create the list of all the removed documents
|
|
|
|
let removed_documents = {
|
|
|
|
let mut document_ids = Vec::new();
|
2019-02-05 14:48:55 +01:00
|
|
|
for (id, update_type) in self.documents_update {
|
|
|
|
if update_type == Deleted {
|
2019-02-14 20:22:25 +01:00
|
|
|
document_ids.push(id);
|
2019-02-05 14:48:55 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-02-14 20:22:25 +01:00
|
|
|
document_ids.sort_unstable();
|
2019-02-17 16:33:42 +01:00
|
|
|
let setbuf = SetBuf::new_unchecked(document_ids);
|
|
|
|
DocIds::new(&setbuf)
|
2019-02-05 14:48:55 +01:00
|
|
|
};
|
|
|
|
|
2019-02-14 20:22:25 +01:00
|
|
|
// create the Index of all the document updates
|
|
|
|
let index = {
|
|
|
|
let mut builder = IndexBuilder::new();
|
2019-02-05 14:48:55 +01:00
|
|
|
for (key, mut indexes) in self.indexed_words {
|
|
|
|
indexes.sort_unstable();
|
|
|
|
let indexes = Set::new_unchecked(&indexes);
|
2019-02-14 20:22:25 +01:00
|
|
|
builder.insert(key, indexes).unwrap();
|
2019-02-05 14:48:55 +01:00
|
|
|
}
|
2019-02-14 20:22:25 +01:00
|
|
|
builder.build()
|
|
|
|
};
|
2019-02-05 14:48:55 +01:00
|
|
|
|
2019-02-14 20:22:25 +01:00
|
|
|
// WARN: removed documents must absolutely
|
|
|
|
// be merged *before* document updates
|
2019-02-05 14:48:55 +01:00
|
|
|
|
2019-02-14 20:22:25 +01:00
|
|
|
// === index ===
|
|
|
|
|
|
|
|
if !removed_documents.is_empty() {
|
|
|
|
// remove the documents using the appropriate IndexEvent
|
2019-02-17 16:33:42 +01:00
|
|
|
let event_bytes = WriteIndexEvent::RemovedDocuments(&removed_documents).into_bytes();
|
2019-02-14 20:22:25 +01:00
|
|
|
self.batch.merge(DATA_INDEX, &event_bytes)?;
|
|
|
|
}
|
2019-02-05 14:48:55 +01:00
|
|
|
|
2019-02-14 20:22:25 +01:00
|
|
|
// update the documents using the appropriate IndexEvent
|
2019-02-17 16:33:42 +01:00
|
|
|
let event_bytes = WriteIndexEvent::UpdatedDocuments(&index).into_bytes();
|
2019-02-14 20:22:25 +01:00
|
|
|
self.batch.merge(DATA_INDEX, &event_bytes)?;
|
2019-02-05 14:48:55 +01:00
|
|
|
|
2019-02-14 20:22:25 +01:00
|
|
|
// === ranked map ===
|
|
|
|
|
|
|
|
if !removed_documents.is_empty() {
|
|
|
|
// update the ranked map using the appropriate RankedMapEvent
|
2019-02-17 16:33:42 +01:00
|
|
|
let event_bytes = WriteRankedMapEvent::RemovedDocuments(&removed_documents).into_bytes();
|
2019-02-14 20:22:25 +01:00
|
|
|
self.batch.merge(DATA_RANKED_MAP, &event_bytes)?;
|
|
|
|
}
|
2019-02-08 15:17:42 +01:00
|
|
|
|
2019-02-14 20:22:25 +01:00
|
|
|
// update the documents using the appropriate IndexEvent
|
2019-02-17 16:33:42 +01:00
|
|
|
let event_bytes = WriteRankedMapEvent::UpdatedDocuments(&self.documents_ranked_fields).into_bytes();
|
2019-02-14 20:22:25 +01:00
|
|
|
self.batch.merge(DATA_RANKED_MAP, &event_bytes)?;
|
2019-02-05 14:48:55 +01:00
|
|
|
|
|
|
|
Ok(self.batch)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// A per-document handle on a `RawUpdateBuilder`, used to record the
/// attribute values, doc-indexes and removal of a single document.
pub struct DocumentUpdate<'a> {
    // id of the document this handle operates on
    document_id: DocumentId,
    // builder the recorded operations are stored into
    inner: &'a mut RawUpdateBuilder,
}
|
|
|
|
|
|
|
|
impl<'a> DocumentUpdate<'a> {
|
|
|
|
pub fn remove(&mut self) -> Result<(), SerializerError> {
|
|
|
|
use serde::ser::Error;
|
|
|
|
|
|
|
|
if let Updated = self.inner.documents_update.entry(self.document_id).or_insert(Deleted) {
|
|
|
|
return Err(SerializerError::custom(
|
|
|
|
"This document has already been updated and cannot be removed in the same update"
|
|
|
|
));
|
|
|
|
}
|
|
|
|
|
|
|
|
let start = DocumentKey::new(self.document_id).with_attribute_min();
|
|
|
|
let end = DocumentKey::new(self.document_id).with_attribute_max(); // FIXME max + 1
|
|
|
|
self.inner.batch.delete_range(start.as_ref(), end.as_ref())?;
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn insert_attribute_value(&mut self, attr: SchemaAttr, value: &[u8]) -> Result<(), SerializerError> {
|
|
|
|
use serde::ser::Error;
|
|
|
|
|
|
|
|
if let Deleted = self.inner.documents_update.entry(self.document_id).or_insert(Updated) {
|
|
|
|
return Err(SerializerError::custom(
|
|
|
|
"This document has already been deleted and cannot be updated in the same update"
|
|
|
|
));
|
|
|
|
}
|
|
|
|
|
|
|
|
let key = DocumentKeyAttr::new(self.document_id, attr);
|
|
|
|
self.inner.batch.put(key.as_ref(), &value)?;
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn insert_doc_index(&mut self, token: Token, doc_index: DocIndex) -> Result<(), SerializerError> {
|
|
|
|
use serde::ser::Error;
|
|
|
|
|
|
|
|
if let Deleted = self.inner.documents_update.entry(self.document_id).or_insert(Updated) {
|
|
|
|
return Err(SerializerError::custom(
|
|
|
|
"This document has already been deleted and cannot be updated in the same update"
|
|
|
|
));
|
|
|
|
}
|
|
|
|
|
|
|
|
self.inner.indexed_words.entry(token).or_insert_with(Vec::new).push(doc_index);
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
2019-02-08 15:17:42 +01:00
|
|
|
|
|
|
|
pub fn register_ranked_attribute(
|
|
|
|
&mut self,
|
|
|
|
attr: SchemaAttr,
|
2019-02-11 16:58:44 +01:00
|
|
|
number: Number,
|
2019-02-08 15:17:42 +01:00
|
|
|
) -> Result<(), SerializerError>
|
|
|
|
{
|
|
|
|
use serde::ser::Error;
|
|
|
|
|
|
|
|
if let Deleted = self.inner.documents_update.entry(self.document_id).or_insert(Updated) {
|
|
|
|
return Err(SerializerError::custom(
|
|
|
|
"This document has already been deleted, ranked attributes cannot be added in the same update"
|
|
|
|
));
|
|
|
|
}
|
|
|
|
|
2019-02-11 16:58:44 +01:00
|
|
|
self.inner.documents_ranked_fields.insert((self.document_id, attr), number);
|
2019-02-08 15:17:42 +01:00
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
2019-02-05 14:48:55 +01:00
|
|
|
}
|