mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-11 07:28:56 +01:00
210 lines
6.5 KiB
Rust
210 lines
6.5 KiB
Rust
|
use std::collections::{HashSet, BTreeMap};
|
||
|
use std::error::Error;
|
||
|
use std::sync::Arc;
|
||
|
|
||
|
use rocksdb::rocksdb::{DB, Writable, Snapshot, WriteBatch};
|
||
|
use hashbrown::hash_map::HashMap;
|
||
|
use serde::Serialize;
|
||
|
use fst::map::Map;
|
||
|
use sdset::Set;
|
||
|
|
||
|
use crate::database::{DATA_INDEX, Database, DatabaseView};
|
||
|
use crate::database::index::{Positive, PositiveBuilder, Negative};
|
||
|
use crate::database::document_key::{DocumentKey, DocumentKeyAttr};
|
||
|
use crate::database::serde::serializer::Serializer;
|
||
|
use crate::database::serde::SerializerError;
|
||
|
use crate::database::schema::SchemaAttr;
|
||
|
use crate::tokenizer::TokenizerBuilder;
|
||
|
use crate::data::{DocIds, DocIndexes};
|
||
|
use crate::database::schema::Schema;
|
||
|
use crate::{DocumentId, DocIndex};
|
||
|
use crate::database::index::Index;
|
||
|
|
||
|
/// Raw bytes of a token; used as the key type of the `indexed_words` map
/// kept by `RawUpdateBuilder`.
pub type Token = Vec<u8>; // TODO could be replaced by a SmallVec
|
||
|
|
||
|
pub struct Update<'a> {
|
||
|
database: &'a Database,
|
||
|
schema: Schema,
|
||
|
raw_builder: RawUpdateBuilder,
|
||
|
}
|
||
|
|
||
|
impl<'a> Update<'a> {
|
||
|
pub(crate) fn new(database: &'a Database, schema: Schema) -> Update<'a> {
|
||
|
Update { database, schema, raw_builder: RawUpdateBuilder::new() }
|
||
|
}
|
||
|
|
||
|
pub fn update_document<T, B>(
|
||
|
&mut self,
|
||
|
document: T,
|
||
|
tokenizer_builder: &B,
|
||
|
stop_words: &HashSet<String>,
|
||
|
) -> Result<DocumentId, SerializerError>
|
||
|
where T: Serialize,
|
||
|
B: TokenizerBuilder,
|
||
|
{
|
||
|
let document_id = self.schema.document_id(&document)?;
|
||
|
|
||
|
let serializer = Serializer {
|
||
|
schema: &self.schema,
|
||
|
document_id: document_id,
|
||
|
tokenizer_builder: tokenizer_builder,
|
||
|
update: &mut self.raw_builder.document_update(document_id)?,
|
||
|
stop_words: stop_words,
|
||
|
};
|
||
|
|
||
|
document.serialize(serializer)?;
|
||
|
|
||
|
Ok(document_id)
|
||
|
}
|
||
|
|
||
|
pub fn remove_document<T>(&mut self, document: T) -> Result<DocumentId, SerializerError>
|
||
|
where T: Serialize,
|
||
|
{
|
||
|
let document_id = self.schema.document_id(&document)?;
|
||
|
self.raw_builder.document_update(document_id)?.remove()?;
|
||
|
Ok(document_id)
|
||
|
}
|
||
|
|
||
|
pub fn commit(self) -> Result<Arc<DatabaseView<Arc<DB>>>, Box<Error>> {
|
||
|
let batch = self.raw_builder.build()?;
|
||
|
self.database.db.write(batch)?;
|
||
|
|
||
|
let snapshot = Snapshot::new(self.database.db.clone());
|
||
|
let view = Arc::new(DatabaseView::new(snapshot)?);
|
||
|
self.database.view.set(view.clone());
|
||
|
|
||
|
Ok(view)
|
||
|
}
|
||
|
|
||
|
pub fn abort(self) { }
|
||
|
}
|
||
|
|
||
|
/// State recorded for a document id within a single update.
#[derive(Clone, Copy, PartialEq, Eq)]
enum UpdateType {
    /// An attribute value or an index entry was inserted for the document.
    Updated,
    /// The document was removed.
    Deleted,
}
|
||
|
|
||
|
use UpdateType::{Updated, Deleted};
|
||
|
|
||
|
pub struct RawUpdateBuilder {
|
||
|
documents_update: HashMap<DocumentId, UpdateType>,
|
||
|
indexed_words: BTreeMap<Token, Vec<DocIndex>>,
|
||
|
batch: WriteBatch,
|
||
|
}
|
||
|
|
||
|
impl RawUpdateBuilder {
|
||
|
pub fn new() -> RawUpdateBuilder {
|
||
|
RawUpdateBuilder {
|
||
|
documents_update: HashMap::new(),
|
||
|
indexed_words: BTreeMap::new(),
|
||
|
batch: WriteBatch::new(),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
pub fn document_update(&mut self, document_id: DocumentId) -> Result<DocumentUpdate, SerializerError> {
|
||
|
use serde::ser::Error;
|
||
|
|
||
|
match self.documents_update.get(&document_id) {
|
||
|
Some(Deleted) | None => Ok(DocumentUpdate { document_id, inner: self }),
|
||
|
Some(Updated) => Err(SerializerError::custom(
|
||
|
"This document has already been removed and cannot be updated in the same update"
|
||
|
)),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
pub fn build(self) -> Result<WriteBatch, Box<Error>> {
|
||
|
let negative = {
|
||
|
let mut removed_document_ids = Vec::new();
|
||
|
for (id, update_type) in self.documents_update {
|
||
|
if update_type == Deleted {
|
||
|
removed_document_ids.push(id);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
removed_document_ids.sort_unstable();
|
||
|
let removed_document_ids = Set::new_unchecked(&removed_document_ids);
|
||
|
let doc_ids = DocIds::new(removed_document_ids);
|
||
|
|
||
|
Negative::new(doc_ids)
|
||
|
};
|
||
|
|
||
|
let positive = {
|
||
|
let mut positive_builder = PositiveBuilder::memory();
|
||
|
|
||
|
for (key, mut indexes) in self.indexed_words {
|
||
|
indexes.sort_unstable();
|
||
|
let indexes = Set::new_unchecked(&indexes);
|
||
|
positive_builder.insert(key, indexes)?;
|
||
|
}
|
||
|
|
||
|
let (map, indexes) = positive_builder.into_inner()?;
|
||
|
let map = Map::from_bytes(map)?;
|
||
|
let indexes = DocIndexes::from_bytes(indexes)?;
|
||
|
|
||
|
Positive::new(map, indexes)
|
||
|
};
|
||
|
|
||
|
let index = Index { negative, positive };
|
||
|
|
||
|
// write the data-index
|
||
|
let mut bytes = Vec::new();
|
||
|
index.write_to_bytes(&mut bytes);
|
||
|
self.batch.merge(DATA_INDEX, &bytes)?;
|
||
|
|
||
|
Ok(self.batch)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
pub struct DocumentUpdate<'a> {
|
||
|
document_id: DocumentId,
|
||
|
inner: &'a mut RawUpdateBuilder,
|
||
|
}
|
||
|
|
||
|
impl<'a> DocumentUpdate<'a> {
|
||
|
pub fn remove(&mut self) -> Result<(), SerializerError> {
|
||
|
use serde::ser::Error;
|
||
|
|
||
|
if let Updated = self.inner.documents_update.entry(self.document_id).or_insert(Deleted) {
|
||
|
return Err(SerializerError::custom(
|
||
|
"This document has already been updated and cannot be removed in the same update"
|
||
|
));
|
||
|
}
|
||
|
|
||
|
let start = DocumentKey::new(self.document_id).with_attribute_min();
|
||
|
let end = DocumentKey::new(self.document_id).with_attribute_max(); // FIXME max + 1
|
||
|
self.inner.batch.delete_range(start.as_ref(), end.as_ref())?;
|
||
|
|
||
|
Ok(())
|
||
|
}
|
||
|
|
||
|
pub fn insert_attribute_value(&mut self, attr: SchemaAttr, value: &[u8]) -> Result<(), SerializerError> {
|
||
|
use serde::ser::Error;
|
||
|
|
||
|
if let Deleted = self.inner.documents_update.entry(self.document_id).or_insert(Updated) {
|
||
|
return Err(SerializerError::custom(
|
||
|
"This document has already been deleted and cannot be updated in the same update"
|
||
|
));
|
||
|
}
|
||
|
|
||
|
let key = DocumentKeyAttr::new(self.document_id, attr);
|
||
|
self.inner.batch.put(key.as_ref(), &value)?;
|
||
|
|
||
|
Ok(())
|
||
|
}
|
||
|
|
||
|
pub fn insert_doc_index(&mut self, token: Token, doc_index: DocIndex) -> Result<(), SerializerError> {
|
||
|
use serde::ser::Error;
|
||
|
|
||
|
if let Deleted = self.inner.documents_update.entry(self.document_id).or_insert(Updated) {
|
||
|
return Err(SerializerError::custom(
|
||
|
"This document has already been deleted and cannot be updated in the same update"
|
||
|
));
|
||
|
}
|
||
|
|
||
|
self.inner.indexed_words.entry(token).or_insert_with(Vec::new).push(doc_index);
|
||
|
|
||
|
Ok(())
|
||
|
}
|
||
|
}
|