mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-04 20:37:15 +02:00
chore: Update the module hierarchy
This commit is contained in:
parent
2c3d71dd8f
commit
8bee31078d
26 changed files with 33 additions and 296 deletions
110
src/database/blob/mod.rs
Normal file
110
src/database/blob/mod.rs
Normal file
|
@ -0,0 +1,110 @@
|
|||
mod ops;
|
||||
pub mod positive;
|
||||
pub mod negative;
|
||||
|
||||
pub use self::positive::{PositiveBlob, PositiveBlobBuilder};
|
||||
pub use self::negative::NegativeBlob;
|
||||
pub use self::ops::OpBuilder;
|
||||
|
||||
use std::fmt;
|
||||
|
||||
use serde_derive::{Serialize, Deserialize};
|
||||
use serde::ser::{Serialize, Serializer, SerializeTuple};
|
||||
use serde::de::{self, Deserialize, Deserializer, SeqAccess, Visitor};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Blob {
|
||||
Positive(PositiveBlob),
|
||||
Negative(NegativeBlob),
|
||||
}
|
||||
|
||||
impl Blob {
|
||||
pub fn is_negative(&self) -> bool {
|
||||
self.sign() == Sign::Negative
|
||||
}
|
||||
|
||||
pub fn is_positive(&self) -> bool {
|
||||
self.sign() == Sign::Positive
|
||||
}
|
||||
|
||||
pub fn sign(&self) -> Sign {
|
||||
match self {
|
||||
Blob::Positive(_) => Sign::Positive,
|
||||
Blob::Negative(_) => Sign::Negative,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for Blob {
|
||||
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
|
||||
match self {
|
||||
Blob::Positive(blob) => {
|
||||
let mut tuple = serializer.serialize_tuple(2)?;
|
||||
tuple.serialize_element(&Sign::Positive)?;
|
||||
tuple.serialize_element(&blob)?;
|
||||
tuple.end()
|
||||
},
|
||||
Blob::Negative(blob) => {
|
||||
let mut tuple = serializer.serialize_tuple(2)?;
|
||||
tuple.serialize_element(&Sign::Negative)?;
|
||||
tuple.serialize_element(&blob)?;
|
||||
tuple.end()
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for Blob {
|
||||
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Blob, D::Error> {
|
||||
struct TupleVisitor;
|
||||
|
||||
impl<'de> Visitor<'de> for TupleVisitor {
|
||||
type Value = Blob;
|
||||
|
||||
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
|
||||
formatter.write_str("a Blob struct")
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn visit_seq<A: SeqAccess<'de>>(self, mut seq: A) -> Result<Self::Value, A::Error> {
|
||||
let sign = match seq.next_element()? {
|
||||
Some(value) => value,
|
||||
None => return Err(de::Error::invalid_length(0, &self)),
|
||||
};
|
||||
match sign {
|
||||
Sign::Positive => {
|
||||
let blob = match seq.next_element()? {
|
||||
Some(value) => value,
|
||||
None => return Err(de::Error::invalid_length(1, &self)),
|
||||
};
|
||||
Ok(Blob::Positive(blob))
|
||||
},
|
||||
Sign::Negative => {
|
||||
let blob = match seq.next_element()? {
|
||||
Some(value) => value,
|
||||
None => return Err(de::Error::invalid_length(1, &self)),
|
||||
};
|
||||
Ok(Blob::Negative(blob))
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
deserializer.deserialize_tuple(2, TupleVisitor)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||
pub enum Sign {
|
||||
Positive,
|
||||
Negative,
|
||||
}
|
||||
|
||||
impl Sign {
|
||||
pub fn invert(self) -> Sign {
|
||||
match self {
|
||||
Sign::Positive => Sign::Negative,
|
||||
Sign::Negative => Sign::Positive,
|
||||
}
|
||||
}
|
||||
}
|
66
src/database/blob/negative/blob.rs
Normal file
66
src/database/blob/negative/blob.rs
Normal file
|
@ -0,0 +1,66 @@
|
|||
use std::error::Error;
|
||||
use std::path::Path;
|
||||
use std::fmt;
|
||||
|
||||
use serde::de::{self, Deserialize, Deserializer};
|
||||
use serde::ser::{Serialize, Serializer};
|
||||
use crate::data::DocIds;
|
||||
use crate::DocumentId;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct NegativeBlob {
|
||||
doc_ids: DocIds,
|
||||
}
|
||||
|
||||
impl NegativeBlob {
|
||||
pub unsafe fn from_path<P>(doc_ids: P) -> Result<Self, Box<Error>>
|
||||
where P: AsRef<Path>,
|
||||
{
|
||||
let doc_ids = DocIds::from_path(doc_ids)?;
|
||||
Ok(NegativeBlob { doc_ids })
|
||||
}
|
||||
|
||||
pub fn from_bytes(doc_ids: Vec<u8>) -> Result<Self, Box<Error>> {
|
||||
let doc_ids = DocIds::from_bytes(doc_ids)?;
|
||||
Ok(NegativeBlob { doc_ids })
|
||||
}
|
||||
|
||||
pub fn from_raw(doc_ids: DocIds) -> Self {
|
||||
NegativeBlob { doc_ids }
|
||||
}
|
||||
|
||||
pub fn as_ids(&self) -> &DocIds {
|
||||
&self.doc_ids
|
||||
}
|
||||
|
||||
pub fn into_doc_ids(self) -> DocIds {
|
||||
self.doc_ids
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<[DocumentId]> for NegativeBlob {
    /// Exposes the document ids of the blob as a slice.
    fn as_ref(&self) -> &[DocumentId] {
        self.doc_ids.doc_ids()
    }
}
|
||||
|
||||
impl fmt::Debug for NegativeBlob {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "NegativeBlob(")?;
|
||||
f.debug_list().entries(self.as_ref()).finish()?;
|
||||
write!(f, ")")
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for NegativeBlob {
|
||||
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
|
||||
self.doc_ids.serialize(serializer)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for NegativeBlob {
|
||||
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<NegativeBlob, D::Error> {
|
||||
let bytes = Vec::deserialize(deserializer)?;
|
||||
NegativeBlob::from_bytes(bytes).map_err(de::Error::custom)
|
||||
}
|
||||
}
|
5
src/database/blob/negative/mod.rs
Normal file
5
src/database/blob/negative/mod.rs
Normal file
|
@ -0,0 +1,5 @@
|
|||
mod blob;
|
||||
mod ops;
|
||||
|
||||
pub use self::blob::NegativeBlob;
|
||||
pub use self::ops::OpBuilder;
|
73
src/database/blob/negative/ops.rs
Normal file
73
src/database/blob/negative/ops.rs
Normal file
|
@ -0,0 +1,73 @@
|
|||
use sdset::multi::OpBuilder as SdOpBuilder;
|
||||
use sdset::Set;
|
||||
|
||||
use crate::database::blob::NegativeBlob;
|
||||
use crate::data::DocIds;
|
||||
use crate::DocumentId;
|
||||
|
||||
pub struct OpBuilder<'a> {
|
||||
inner: SdOpBuilder<'a, DocumentId>,
|
||||
}
|
||||
|
||||
/// Do a set operation on multiple negative blobs.
|
||||
impl<'a> OpBuilder<'a> {
|
||||
pub fn new() -> Self {
|
||||
Self { inner: SdOpBuilder::new() }
|
||||
}
|
||||
|
||||
pub fn with_capacity(cap: usize) -> Self {
|
||||
Self { inner: SdOpBuilder::with_capacity(cap) }
|
||||
}
|
||||
|
||||
pub fn add(mut self, blob: &'a NegativeBlob) -> Self {
|
||||
self.push(blob);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn push(&mut self, blob: &'a NegativeBlob) {
|
||||
let set = Set::new_unchecked(blob.as_ref());
|
||||
self.inner.push(set);
|
||||
}
|
||||
|
||||
pub fn union(self) -> Union<'a> {
|
||||
Union::new(self.inner.union())
|
||||
}
|
||||
|
||||
pub fn intersection(self) -> Intersection<'a> {
|
||||
Intersection::new(self.inner.intersection())
|
||||
}
|
||||
|
||||
pub fn difference(self) -> Difference<'a> {
|
||||
Difference::new(self.inner.difference())
|
||||
}
|
||||
|
||||
pub fn symmetric_difference(self) -> SymmetricDifference<'a> {
|
||||
SymmetricDifference::new(self.inner.symmetric_difference())
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! logical_operation {
|
||||
(struct $name:ident, $operation:ident) => {
|
||||
|
||||
pub struct $name<'a> {
|
||||
op: sdset::multi::$name<'a, DocumentId>,
|
||||
}
|
||||
|
||||
impl<'a> $name<'a> {
|
||||
fn new(op: sdset::multi::$name<'a, DocumentId>) -> Self {
|
||||
$name { op }
|
||||
}
|
||||
|
||||
pub fn into_negative_blob(self) -> NegativeBlob {
|
||||
let document_ids = sdset::SetOperation::into_set_buf(self.op);
|
||||
let doc_ids = DocIds::from_document_ids(document_ids.into_vec());
|
||||
NegativeBlob::from_raw(doc_ids)
|
||||
}
|
||||
}
|
||||
|
||||
}}
|
||||
|
||||
logical_operation!(struct Union, union);
|
||||
logical_operation!(struct Intersection, intersection);
|
||||
logical_operation!(struct Difference, difference);
|
||||
logical_operation!(struct SymmetricDifference, symmetric_difference);
|
111
src/database/blob/ops.rs
Normal file
111
src/database/blob/ops.rs
Normal file
|
@ -0,0 +1,111 @@
|
|||
use std::error::Error;
|
||||
|
||||
use fst::{IntoStreamer, Streamer};
|
||||
use group_by::GroupBy;
|
||||
use sdset::duo::DifferenceByKey;
|
||||
use sdset::{Set, SetOperation};
|
||||
|
||||
use crate::database::blob::{Blob, Sign, PositiveBlob, PositiveBlobBuilder, NegativeBlob};
|
||||
use crate::database::blob::{positive, negative};
|
||||
|
||||
fn blob_same_sign(a: &Blob, b: &Blob) -> bool {
|
||||
a.sign() == b.sign()
|
||||
}
|
||||
|
||||
fn unwrap_positive(blob: &Blob) -> &PositiveBlob {
|
||||
match blob {
|
||||
Blob::Positive(blob) => blob,
|
||||
Blob::Negative(_) => panic!("called `unwrap_positive()` on a `Negative` value"),
|
||||
}
|
||||
}
|
||||
|
||||
fn unwrap_negative(blob: &Blob) -> &NegativeBlob {
|
||||
match blob {
|
||||
Blob::Negative(blob) => blob,
|
||||
Blob::Positive(_) => panic!("called `unwrap_negative()` on a `Positive` value"),
|
||||
}
|
||||
}
|
||||
|
||||
pub struct OpBuilder {
|
||||
blobs: Vec<Blob>,
|
||||
}
|
||||
|
||||
impl OpBuilder {
|
||||
pub fn new() -> OpBuilder {
|
||||
OpBuilder { blobs: Vec::new() }
|
||||
}
|
||||
|
||||
pub fn with_capacity(cap: usize) -> OpBuilder {
|
||||
OpBuilder { blobs: Vec::with_capacity(cap) }
|
||||
}
|
||||
|
||||
pub fn push(&mut self, blob: Blob) {
|
||||
if self.blobs.is_empty() && blob.is_negative() { return }
|
||||
self.blobs.push(blob);
|
||||
}
|
||||
|
||||
pub fn merge(self) -> Result<PositiveBlob, Box<Error>> {
|
||||
let groups = GroupBy::new(&self.blobs, blob_same_sign);
|
||||
let mut aggregated = Vec::new();
|
||||
|
||||
for blobs in groups {
|
||||
match blobs[0].sign() {
|
||||
Sign::Positive => {
|
||||
let mut op_builder = positive::OpBuilder::with_capacity(blobs.len());
|
||||
for blob in blobs {
|
||||
op_builder.push(unwrap_positive(blob));
|
||||
}
|
||||
|
||||
let mut stream = op_builder.union().into_stream();
|
||||
let mut builder = PositiveBlobBuilder::memory();
|
||||
while let Some((input, doc_indexes)) = stream.next() {
|
||||
// FIXME empty doc_indexes must be handled by OpBuilder
|
||||
if !doc_indexes.is_empty() {
|
||||
builder.insert(input, doc_indexes).unwrap();
|
||||
}
|
||||
}
|
||||
let (map, doc_indexes) = builder.into_inner().unwrap();
|
||||
let blob = PositiveBlob::from_bytes(map, doc_indexes).unwrap();
|
||||
aggregated.push(Blob::Positive(blob));
|
||||
},
|
||||
Sign::Negative => {
|
||||
let mut op_builder = negative::OpBuilder::with_capacity(blobs.len());
|
||||
for blob in blobs {
|
||||
op_builder.push(unwrap_negative(blob));
|
||||
}
|
||||
let blob = op_builder.union().into_negative_blob();
|
||||
aggregated.push(Blob::Negative(blob));
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
let mut buffer = Vec::new();
|
||||
aggregated.chunks(2).try_fold(PositiveBlob::default(), |base, slice| {
|
||||
let negative = NegativeBlob::default();
|
||||
let (positive, negative) = match slice {
|
||||
[a, b] => (unwrap_positive(a), unwrap_negative(b)),
|
||||
[a] => (unwrap_positive(a), &negative),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
let mut builder = PositiveBlobBuilder::memory();
|
||||
let doc_ids = Set::new_unchecked(negative.as_ref());
|
||||
|
||||
let op_builder = positive::OpBuilder::new().add(&base).add(&positive);
|
||||
let mut stream = op_builder.union().into_stream();
|
||||
while let Some((input, doc_indexes)) = stream.next() {
|
||||
let doc_indexes = Set::new_unchecked(doc_indexes);
|
||||
let op = DifferenceByKey::new(doc_indexes, doc_ids, |x| x.document_id, |x| *x);
|
||||
|
||||
buffer.clear();
|
||||
op.extend_vec(&mut buffer);
|
||||
if !buffer.is_empty() {
|
||||
builder.insert(input, &buffer)?;
|
||||
}
|
||||
}
|
||||
|
||||
let (map, doc_indexes) = builder.into_inner()?;
|
||||
PositiveBlob::from_bytes(map, doc_indexes)
|
||||
})
|
||||
}
|
||||
}
|
253
src/database/blob/positive/blob.rs
Normal file
253
src/database/blob/positive/blob.rs
Normal file
|
@ -0,0 +1,253 @@
|
|||
use std::fmt;
|
||||
use std::io::Write;
|
||||
use std::path::Path;
|
||||
use std::error::Error;
|
||||
|
||||
use fst::{map, Map, Streamer, IntoStreamer};
|
||||
|
||||
use crate::DocIndex;
|
||||
use crate::data::{DocIndexes, DocIndexesBuilder};
|
||||
use serde::ser::{Serialize, Serializer, SerializeTuple};
|
||||
use serde::de::{self, Deserialize, Deserializer, SeqAccess, Visitor};
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct PositiveBlob {
|
||||
map: Map,
|
||||
indexes: DocIndexes,
|
||||
}
|
||||
|
||||
impl PositiveBlob {
|
||||
pub unsafe fn from_paths<P, Q>(map: P, indexes: Q) -> Result<Self, Box<Error>>
|
||||
where P: AsRef<Path>,
|
||||
Q: AsRef<Path>,
|
||||
{
|
||||
let map = Map::from_path(map)?;
|
||||
let indexes = DocIndexes::from_path(indexes)?;
|
||||
Ok(PositiveBlob { map, indexes })
|
||||
}
|
||||
|
||||
pub fn from_bytes(map: Vec<u8>, indexes: Vec<u8>) -> Result<Self, Box<Error>> {
|
||||
let map = Map::from_bytes(map)?;
|
||||
let indexes = DocIndexes::from_bytes(indexes)?;
|
||||
Ok(PositiveBlob { map, indexes })
|
||||
}
|
||||
|
||||
pub fn from_raw(map: Map, indexes: DocIndexes) -> Self {
|
||||
PositiveBlob { map, indexes }
|
||||
}
|
||||
|
||||
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Option<&[DocIndex]> {
|
||||
self.map.get(key).map(|index| &self.indexes[index as usize])
|
||||
}
|
||||
|
||||
pub fn as_map(&self) -> &Map {
|
||||
&self.map
|
||||
}
|
||||
|
||||
pub fn as_indexes(&self) -> &DocIndexes {
|
||||
&self.indexes
|
||||
}
|
||||
|
||||
pub fn explode(self) -> (Map, DocIndexes) {
|
||||
(self.map, self.indexes)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for PositiveBlob {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "PositiveBlob([")?;
|
||||
let mut stream = self.into_stream();
|
||||
let mut first = true;
|
||||
while let Some((k, v)) = stream.next() {
|
||||
if !first {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
first = false;
|
||||
write!(f, "({}, {:?})", String::from_utf8_lossy(k), v)?;
|
||||
}
|
||||
write!(f, "])")
|
||||
}
|
||||
}
|
||||
|
||||
impl<'m, 'a> IntoStreamer<'a> for &'m PositiveBlob {
|
||||
type Item = (&'a [u8], &'a [DocIndex]);
|
||||
/// The type of the stream to be constructed.
|
||||
type Into = PositiveBlobStream<'m>;
|
||||
|
||||
/// Construct a stream from `Self`.
|
||||
fn into_stream(self) -> Self::Into {
|
||||
PositiveBlobStream {
|
||||
map_stream: self.map.into_stream(),
|
||||
doc_indexes: &self.indexes,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PositiveBlobStream<'m> {
|
||||
map_stream: map::Stream<'m>,
|
||||
doc_indexes: &'m DocIndexes,
|
||||
}
|
||||
|
||||
impl<'m, 'a> Streamer<'a> for PositiveBlobStream<'m> {
|
||||
type Item = (&'a [u8], &'a [DocIndex]);
|
||||
|
||||
fn next(&'a mut self) -> Option<Self::Item> {
|
||||
match self.map_stream.next() {
|
||||
Some((input, index)) => {
|
||||
let doc_indexes = &self.doc_indexes[index as usize];
|
||||
Some((input, doc_indexes))
|
||||
},
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for PositiveBlob {
|
||||
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
|
||||
let mut tuple = serializer.serialize_tuple(2)?;
|
||||
tuple.serialize_element(&self.map.as_fst().to_vec())?;
|
||||
tuple.serialize_element(&self.indexes.to_vec())?;
|
||||
tuple.end()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for PositiveBlob {
|
||||
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<PositiveBlob, D::Error> {
|
||||
struct TupleVisitor;
|
||||
|
||||
impl<'de> Visitor<'de> for TupleVisitor {
|
||||
type Value = PositiveBlob;
|
||||
|
||||
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
|
||||
formatter.write_str("a PositiveBlob struct")
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn visit_seq<A: SeqAccess<'de>>(self, mut seq: A) -> Result<Self::Value, A::Error> {
|
||||
let map = match seq.next_element()? {
|
||||
Some(bytes) => match Map::from_bytes(bytes) {
|
||||
Ok(value) => value,
|
||||
Err(err) => return Err(de::Error::custom(err)),
|
||||
},
|
||||
None => return Err(de::Error::invalid_length(0, &self)),
|
||||
};
|
||||
|
||||
let indexes = match seq.next_element()? {
|
||||
Some(bytes) => match DocIndexes::from_bytes(bytes) {
|
||||
Ok(value) => value,
|
||||
Err(err) => return Err(de::Error::custom(err)),
|
||||
},
|
||||
None => return Err(de::Error::invalid_length(1, &self)),
|
||||
};
|
||||
|
||||
Ok(PositiveBlob { map, indexes })
|
||||
}
|
||||
}
|
||||
|
||||
deserializer.deserialize_tuple(2, TupleVisitor)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PositiveBlobBuilder<W, X> {
|
||||
map: fst::MapBuilder<W>,
|
||||
indexes: DocIndexesBuilder<X>,
|
||||
value: u64,
|
||||
}
|
||||
|
||||
impl PositiveBlobBuilder<Vec<u8>, Vec<u8>> {
|
||||
pub fn memory() -> Self {
|
||||
PositiveBlobBuilder {
|
||||
map: fst::MapBuilder::memory(),
|
||||
indexes: DocIndexesBuilder::memory(),
|
||||
value: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<W: Write, X: Write> PositiveBlobBuilder<W, X> {
|
||||
pub fn new(map: W, indexes: X) -> Result<Self, Box<Error>> {
|
||||
Ok(PositiveBlobBuilder {
|
||||
map: fst::MapBuilder::new(map)?,
|
||||
indexes: DocIndexesBuilder::new(indexes),
|
||||
value: 0,
|
||||
})
|
||||
}
|
||||
|
||||
/// If a key is inserted that is less than or equal to any previous key added,
|
||||
/// then an error is returned. Similarly, if there was a problem writing
|
||||
/// to the underlying writer, an error is returned.
|
||||
// FIXME what if one write doesn't work but the other do ?
|
||||
pub fn insert<K>(&mut self, key: K, doc_indexes: &[DocIndex]) -> Result<(), Box<Error>>
|
||||
where K: AsRef<[u8]>,
|
||||
{
|
||||
self.map.insert(key, self.value)?;
|
||||
self.indexes.insert(doc_indexes)?;
|
||||
self.value += 1;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn finish(self) -> Result<(), Box<Error>> {
|
||||
self.into_inner().map(drop)
|
||||
}
|
||||
|
||||
pub fn into_inner(self) -> Result<(W, X), Box<Error>> {
|
||||
let map = self.map.into_inner()?;
|
||||
let indexes = self.indexes.into_inner()?;
|
||||
Ok((map, indexes))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use std::error::Error;

    /// The three doc indexes used by every test of this module.
    fn sample_doc_indexes() -> (DocIndex, DocIndex, DocIndex) {
        let a = DocIndex { document_id: 0, attribute: 3, attribute_index: 11 };
        let b = DocIndex { document_id: 1, attribute: 4, attribute_index: 21 };
        let c = DocIndex { document_id: 2, attribute: 8, attribute_index: 2 };
        (a, b, c)
    }

    /// Builds an in-memory blob with the keys "aaa", "aab" and "aac".
    fn sample_blob(a: DocIndex, b: DocIndex, c: DocIndex) -> Result<PositiveBlob, Box<dyn Error>> {
        let mut builder = PositiveBlobBuilder::memory();

        builder.insert("aaa", &[a])?;
        builder.insert("aab", &[a, b, c])?;
        builder.insert("aac", &[a, c])?;

        let (map_bytes, indexes_bytes) = builder.into_inner()?;
        PositiveBlob::from_bytes(map_bytes, indexes_bytes)
    }

    /// Checks that `blob` answers lookups like the blob built by `sample_blob`.
    fn assert_sample_lookups(blob: &PositiveBlob, a: DocIndex, b: DocIndex, c: DocIndex) {
        assert_eq!(blob.get("aaa"), Some(&[a][..]));
        assert_eq!(blob.get("aab"), Some(&[a, b, c][..]));
        assert_eq!(blob.get("aac"), Some(&[a, c][..]));
        assert_eq!(blob.get("aad"), None);
    }

    #[test]
    fn serialize_deserialize() -> Result<(), Box<dyn Error>> {
        let (a, b, c) = sample_doc_indexes();
        let positive_blob = sample_blob(a, b, c)?;

        assert_sample_lookups(&positive_blob, a, b, c);

        Ok(())
    }

    #[test]
    fn serde_serialize_deserialize() -> Result<(), Box<dyn Error>> {
        let (a, b, c) = sample_doc_indexes();
        let positive_blob = sample_blob(a, b, c)?;

        // Round-trip through bincode before checking the lookups.
        let bytes = bincode::serialize(&positive_blob)?;
        let positive_blob: PositiveBlob = bincode::deserialize(&bytes)?;

        assert_sample_lookups(&positive_blob, a, b, c);

        Ok(())
    }
}
|
5
src/database/blob/positive/mod.rs
Normal file
5
src/database/blob/positive/mod.rs
Normal file
|
@ -0,0 +1,5 @@
|
|||
mod blob;
|
||||
mod ops;
|
||||
|
||||
pub use self::blob::{PositiveBlob, PositiveBlobBuilder};
|
||||
pub use self::ops::OpBuilder;
|
128
src/database/blob/positive/ops.rs
Normal file
128
src/database/blob/positive/ops.rs
Normal file
|
@ -0,0 +1,128 @@
|
|||
use sdset::multi::OpBuilder as SdOpBuilder;
|
||||
use sdset::{SetOperation, Set};
|
||||
|
||||
use crate::database::blob::PositiveBlob;
|
||||
use crate::data::DocIndexes;
|
||||
use crate::DocIndex;
|
||||
|
||||
pub struct OpBuilder<'m> {
|
||||
// the operation on the maps is always an union.
|
||||
map_op: fst::map::OpBuilder<'m>,
|
||||
indexes: Vec<&'m DocIndexes>,
|
||||
}
|
||||
|
||||
/// Do a set operation on multiple positive blobs.
|
||||
impl<'m> OpBuilder<'m> {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
map_op: fst::map::OpBuilder::new(),
|
||||
indexes: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_capacity(cap: usize) -> Self {
|
||||
Self {
|
||||
map_op: fst::map::OpBuilder::new(), // TODO patch fst to add with_capacity
|
||||
indexes: Vec::with_capacity(cap),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add(mut self, blob: &'m PositiveBlob) -> Self {
|
||||
self.push(blob);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn push(&mut self, blob: &'m PositiveBlob) {
|
||||
self.map_op.push(blob.as_map());
|
||||
self.indexes.push(blob.as_indexes());
|
||||
}
|
||||
|
||||
pub fn union(self) -> Union<'m> {
|
||||
Union::new(self.map_op.union(), self.indexes)
|
||||
}
|
||||
|
||||
pub fn intersection(self) -> Intersection<'m> {
|
||||
Intersection::new(self.map_op.union(), self.indexes)
|
||||
}
|
||||
|
||||
pub fn difference(self) -> Difference<'m> {
|
||||
Difference::new(self.map_op.union(), self.indexes)
|
||||
}
|
||||
|
||||
pub fn symmetric_difference(self) -> SymmetricDifference<'m> {
|
||||
SymmetricDifference::new(self.map_op.union(), self.indexes)
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! logical_operation {
|
||||
(struct $name:ident, $operation:ident) => {
|
||||
|
||||
pub struct $name<'m> {
|
||||
stream: fst::map::Union<'m>,
|
||||
indexes: Vec<&'m DocIndexes>,
|
||||
outs: Vec<DocIndex>,
|
||||
}
|
||||
|
||||
impl<'m> $name<'m> {
|
||||
fn new(stream: fst::map::Union<'m>, indexes: Vec<&'m DocIndexes>) -> Self {
|
||||
$name {
|
||||
stream: stream,
|
||||
indexes: indexes,
|
||||
outs: Vec::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'m, 'a> fst::Streamer<'a> for $name<'m> {
|
||||
type Item = (&'a [u8], &'a [DocIndex]);
|
||||
|
||||
fn next(&'a mut self) -> Option<Self::Item> {
|
||||
// loop {
|
||||
// let (input, ivalues) = match self.stream.next() {
|
||||
// Some(value) => value,
|
||||
// None => return None,
|
||||
// };
|
||||
|
||||
// self.outs.clear();
|
||||
|
||||
// let mut builder = SdOpBuilder::with_capacity(ivalues.len());
|
||||
// for ivalue in ivalues {
|
||||
// let indexes = self.indexes[ivalue.index];
|
||||
// let indexes = indexes.get(ivalue.value).expect("BUG: could not find document indexes");
|
||||
// let set = Set::new_unchecked(indexes);
|
||||
// builder.push(set);
|
||||
// }
|
||||
|
||||
// builder.$operation().extend_vec(&mut self.outs);
|
||||
|
||||
// if self.outs.is_empty() { continue }
|
||||
// return Some((input, &self.outs))
|
||||
// }
|
||||
|
||||
// FIXME make the above code compile
|
||||
match self.stream.next() {
|
||||
Some((input, ivalues)) => {
|
||||
self.outs.clear();
|
||||
|
||||
let mut builder = SdOpBuilder::with_capacity(ivalues.len());
|
||||
for ivalue in ivalues {
|
||||
let doc_indexes = &self.indexes[ivalue.index][ivalue.value as usize];
|
||||
let set = Set::new_unchecked(doc_indexes);
|
||||
builder.push(set);
|
||||
}
|
||||
|
||||
builder.$operation().extend_vec(&mut self.outs);
|
||||
|
||||
if self.outs.is_empty() { return None }
|
||||
return Some((input, &self.outs))
|
||||
},
|
||||
None => None
|
||||
}
|
||||
}
|
||||
}
|
||||
}}
|
||||
|
||||
logical_operation!(struct Union, union);
|
||||
logical_operation!(struct Intersection, intersection);
|
||||
logical_operation!(struct Difference, difference);
|
||||
logical_operation!(struct SymmetricDifference, symmetric_difference);
|
|
@ -5,11 +5,9 @@ use rocksdb::rocksdb::{DB, DBVector, Snapshot, SeekKey};
|
|||
use rocksdb::rocksdb_options::ReadOptions;
|
||||
use serde::de::DeserializeOwned;
|
||||
|
||||
use crate::database::deserializer::{Deserializer, DeserializerError};
|
||||
use crate::database::{DATA_INDEX, DATA_SCHEMA};
|
||||
use crate::blob::positive::PositiveBlob;
|
||||
use crate::index::schema::Schema;
|
||||
use crate::database::{retrieve_data_schema, DocumentKey, DocumentKeyAttr};
|
||||
use crate::database::deserializer::Deserializer;
|
||||
use crate::database::schema::Schema;
|
||||
use crate::DocumentId;
|
||||
|
||||
pub struct DatabaseView<'a> {
|
||||
|
|
|
@ -8,7 +8,7 @@ use serde::de::value::MapDeserializer;
|
|||
use serde::de::{self, Visitor, IntoDeserializer};
|
||||
|
||||
use crate::database::document_key::{DocumentKey, DocumentKeyAttr};
|
||||
use crate::index::schema::Schema;
|
||||
use crate::database::schema::Schema;
|
||||
use crate::DocumentId;
|
||||
|
||||
pub struct Deserializer<'a> {
|
||||
|
|
|
@ -4,7 +4,7 @@ use std::fmt;
|
|||
|
||||
use byteorder::{NativeEndian, WriteBytesExt, ReadBytesExt};
|
||||
|
||||
use crate::index::schema::SchemaAttr;
|
||||
use crate::database::schema::SchemaAttr;
|
||||
use crate::DocumentId;
|
||||
|
||||
const DOC_KEY_LEN: usize = 4 + size_of::<u64>();
|
||||
|
|
|
@ -7,13 +7,16 @@ use rocksdb::rocksdb_options::{DBOptions, IngestExternalFileOptions, ColumnFamil
|
|||
use rocksdb::{DB, DBVector, MergeOperands, SeekKey};
|
||||
use rocksdb::rocksdb::{Writable, Snapshot};
|
||||
|
||||
pub use crate::database::document_key::{DocumentKey, DocumentKeyAttr};
|
||||
pub use crate::database::database_view::DatabaseView;
|
||||
use crate::index::update::Update;
|
||||
use crate::index::schema::Schema;
|
||||
use crate::blob::positive::PositiveBlob;
|
||||
use crate::blob::{self, Blob};
|
||||
pub use self::document_key::{DocumentKey, DocumentKeyAttr};
|
||||
pub use self::database_view::DatabaseView;
|
||||
use self::blob::positive::PositiveBlob;
|
||||
use self::update::Update;
|
||||
use self::schema::Schema;
|
||||
use self::blob::Blob;
|
||||
|
||||
pub mod blob;
|
||||
pub mod schema;
|
||||
pub mod update;
|
||||
mod document_key;
|
||||
mod database_view;
|
||||
mod deserializer;
|
||||
|
@ -163,14 +166,13 @@ fn merge_indexes(key: &[u8], existing_value: Option<&[u8]>, operands: &mut Merge
|
|||
mod tests {
|
||||
use super::*;
|
||||
use std::error::Error;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use serde_derive::{Serialize, Deserialize};
|
||||
use tempfile::tempdir;
|
||||
|
||||
use crate::tokenizer::DefaultBuilder;
|
||||
use crate::index::update::PositiveUpdateBuilder;
|
||||
use crate::index::schema::{Schema, SchemaBuilder, STORED, INDEXED};
|
||||
use crate::database::update::PositiveUpdateBuilder;
|
||||
use crate::database::schema::{SchemaBuilder, STORED, INDEXED};
|
||||
|
||||
#[test]
|
||||
fn ingest_update_file() -> Result<(), Box<Error>> {
|
||||
|
|
162
src/database/schema.rs
Normal file
162
src/database/schema.rs
Normal file
|
@ -0,0 +1,162 @@
|
|||
use std::collections::{HashMap, BTreeMap};
|
||||
use std::io::{Read, Write};
|
||||
use std::path::Path;
|
||||
use std::ops::BitOr;
|
||||
use std::fs::File;
|
||||
use std::fmt;
|
||||
|
||||
use serde_derive::{Serialize, Deserialize};
|
||||
use linked_hash_map::LinkedHashMap;
|
||||
|
||||
pub const STORED: SchemaProps = SchemaProps { stored: true, indexed: false };
|
||||
pub const INDEXED: SchemaProps = SchemaProps { stored: false, indexed: true };
|
||||
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct SchemaProps {
|
||||
stored: bool,
|
||||
indexed: bool,
|
||||
}
|
||||
|
||||
impl SchemaProps {
|
||||
pub fn is_stored(&self) -> bool {
|
||||
self.stored
|
||||
}
|
||||
|
||||
pub fn is_indexed(&self) -> bool {
|
||||
self.indexed
|
||||
}
|
||||
}
|
||||
|
||||
impl BitOr for SchemaProps {
|
||||
type Output = Self;
|
||||
|
||||
fn bitor(self, other: Self) -> Self::Output {
|
||||
SchemaProps {
|
||||
stored: self.stored | other.stored,
|
||||
indexed: self.indexed | other.indexed,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SchemaBuilder {
|
||||
attrs: LinkedHashMap<String, SchemaProps>,
|
||||
}
|
||||
|
||||
impl SchemaBuilder {
|
||||
pub fn new() -> SchemaBuilder {
|
||||
SchemaBuilder { attrs: LinkedHashMap::new() }
|
||||
}
|
||||
|
||||
pub fn new_attribute<S: Into<String>>(&mut self, name: S, props: SchemaProps) -> SchemaAttr {
|
||||
let len = self.attrs.len();
|
||||
if self.attrs.insert(name.into(), props).is_some() {
|
||||
panic!("Field already inserted.")
|
||||
}
|
||||
SchemaAttr(len as u32)
|
||||
}
|
||||
|
||||
pub fn build(self) -> Schema {
|
||||
let mut attrs = HashMap::new();
|
||||
let mut props = Vec::new();
|
||||
|
||||
for (i, (name, prop)) in self.attrs.into_iter().enumerate() {
|
||||
attrs.insert(name, SchemaAttr(i as u32));
|
||||
props.push(prop);
|
||||
}
|
||||
|
||||
Schema { attrs, props }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct Schema {
|
||||
attrs: HashMap<String, SchemaAttr>,
|
||||
props: Vec<SchemaProps>,
|
||||
}
|
||||
|
||||
impl Schema {
|
||||
pub fn open<P: AsRef<Path>>(path: P) -> bincode::Result<Schema> {
|
||||
let file = File::open(path)?;
|
||||
Schema::read_from(file)
|
||||
}
|
||||
|
||||
pub fn read_from<R: Read>(reader: R) -> bincode::Result<Schema> {
|
||||
let attrs = bincode::deserialize_from(reader)?;
|
||||
let builder = SchemaBuilder { attrs };
|
||||
Ok(builder.build())
|
||||
}
|
||||
|
||||
pub fn write_to<W: Write>(&self, writer: W) -> bincode::Result<()> {
|
||||
let mut ordered = BTreeMap::new();
|
||||
for (name, field) in &self.attrs {
|
||||
let index = field.as_u32();
|
||||
let props = self.props[index as usize];
|
||||
ordered.insert(index, (name, props));
|
||||
}
|
||||
|
||||
let mut attrs = LinkedHashMap::with_capacity(ordered.len());
|
||||
for (_, (name, props)) in ordered {
|
||||
attrs.insert(name, props);
|
||||
}
|
||||
|
||||
bincode::serialize_into(writer, &attrs)
|
||||
}
|
||||
|
||||
pub fn props(&self, attr: SchemaAttr) -> SchemaProps {
|
||||
self.props[attr.as_u32() as usize]
|
||||
}
|
||||
|
||||
pub fn attribute<S: AsRef<str>>(&self, name: S) -> Option<SchemaAttr> {
|
||||
self.attrs.get(name.as_ref()).cloned()
|
||||
}
|
||||
|
||||
pub fn attribute_name(&self, attr: SchemaAttr) -> &str {
|
||||
// FIXME complexity is insane !
|
||||
for (key, &value) in &self.attrs {
|
||||
if value == attr { return &key }
|
||||
}
|
||||
panic!("schema attribute name not found for {:?}", attr)
|
||||
}
|
||||
}
|
||||
|
||||
/// The number identifying an attribute inside a schema.
#[derive(Debug, Copy, Clone, PartialOrd, Ord, PartialEq, Eq)]
pub struct SchemaAttr(u32);

impl SchemaAttr {
    /// Wraps `value` in a `SchemaAttr`.
    pub fn new(value: u32) -> SchemaAttr {
        SchemaAttr(value)
    }

    /// Returns the wrapped number.
    pub fn as_u32(&self) -> u32 {
        let SchemaAttr(value) = *self;
        value
    }
}

impl fmt::Display for SchemaAttr {
    /// Formats the attribute exactly like the wrapped number.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// A schema written with `write_to` must read back equal with `read_from`.
    #[test]
    fn serialize_deserialize() -> bincode::Result<()> {
        let schema = {
            let mut builder = SchemaBuilder::new();
            builder.new_attribute("alphabet", STORED);
            builder.new_attribute("beta", STORED | INDEXED);
            builder.new_attribute("gamma", INDEXED);
            builder.build()
        };

        let mut bytes = Vec::new();
        schema.write_to(&mut bytes)?;

        let decoded = Schema::read_from(bytes.as_slice())?;
        assert_eq!(schema, decoded);

        Ok(())
    }
}
|
35
src/database/update/mod.rs
Normal file
35
src/database/update/mod.rs
Normal file
|
@ -0,0 +1,35 @@
|
|||
use std::path::PathBuf;
|
||||
use std::error::Error;
|
||||
|
||||
mod negative;
|
||||
mod positive;
|
||||
|
||||
pub use self::positive::{PositiveUpdateBuilder, NewState};
|
||||
pub use self::negative::NegativeUpdateBuilder;
|
||||
|
||||
pub struct Update {
|
||||
path: PathBuf,
|
||||
can_be_moved: bool,
|
||||
}
|
||||
|
||||
impl Update {
|
||||
pub fn open<P: Into<PathBuf>>(path: P) -> Result<Update, Box<Error>> {
|
||||
Ok(Update { path: path.into(), can_be_moved: false })
|
||||
}
|
||||
|
||||
pub fn open_and_move<P: Into<PathBuf>>(path: P) -> Result<Update, Box<Error>> {
|
||||
Ok(Update { path: path.into(), can_be_moved: true })
|
||||
}
|
||||
|
||||
pub fn set_move(&mut self, can_be_moved: bool) {
|
||||
self.can_be_moved = can_be_moved
|
||||
}
|
||||
|
||||
pub fn can_be_moved(&self) -> bool {
|
||||
self.can_be_moved
|
||||
}
|
||||
|
||||
pub fn into_path_buf(self) -> PathBuf {
|
||||
self.path
|
||||
}
|
||||
}
|
4
src/database/update/negative/mod.rs
Normal file
4
src/database/update/negative/mod.rs
Normal file
|
@ -0,0 +1,4 @@
|
|||
mod update;
|
||||
mod unordered_builder;
|
||||
|
||||
pub use self::update::NegativeUpdateBuilder;
|
37
src/database/update/negative/unordered_builder.rs
Normal file
37
src/database/update/negative/unordered_builder.rs
Normal file
|
@ -0,0 +1,37 @@
|
|||
use std::collections::BTreeSet;
|
||||
use std::io;
|
||||
|
||||
use byteorder::{NativeEndian, WriteBytesExt};
|
||||
|
||||
use crate::DocumentId;
|
||||
|
||||
pub struct UnorderedNegativeBlobBuilder<W> {
|
||||
doc_ids: BTreeSet<DocumentId>, // TODO: prefer a linked-list
|
||||
wrt: W,
|
||||
}
|
||||
|
||||
impl UnorderedNegativeBlobBuilder<Vec<u8>> {
|
||||
pub fn memory() -> Self {
|
||||
UnorderedNegativeBlobBuilder::new(Vec::new())
|
||||
}
|
||||
}
|
||||
|
||||
impl<W: io::Write> UnorderedNegativeBlobBuilder<W> {
|
||||
pub fn new(wrt: W) -> Self {
|
||||
Self {
|
||||
doc_ids: BTreeSet::new(),
|
||||
wrt: wrt,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, doc: DocumentId) -> bool {
|
||||
self.doc_ids.insert(doc)
|
||||
}
|
||||
|
||||
pub fn into_inner(mut self) -> io::Result<W> {
|
||||
for id in self.doc_ids {
|
||||
self.wrt.write_u64::<NativeEndian>(id)?;
|
||||
}
|
||||
Ok(self.wrt)
|
||||
}
|
||||
}
|
60
src/database/update/negative/update.rs
Normal file
60
src/database/update/negative/update.rs
Normal file
|
@ -0,0 +1,60 @@
|
|||
use std::path::PathBuf;
|
||||
use std::error::Error;
|
||||
|
||||
use ::rocksdb::rocksdb_options;
|
||||
|
||||
use crate::database::update::negative::unordered_builder::UnorderedNegativeBlobBuilder;
|
||||
use crate::database::blob::{Blob, NegativeBlob};
|
||||
use crate::database::update::Update;
|
||||
use crate::database::DocumentKey;
|
||||
use crate::database::DATA_INDEX;
|
||||
use crate::DocumentId;
|
||||
|
||||
pub struct NegativeUpdateBuilder {
|
||||
path: PathBuf,
|
||||
doc_ids: UnorderedNegativeBlobBuilder<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl NegativeUpdateBuilder {
|
||||
pub fn new<P: Into<PathBuf>>(path: P) -> NegativeUpdateBuilder {
|
||||
NegativeUpdateBuilder {
|
||||
path: path.into(),
|
||||
doc_ids: UnorderedNegativeBlobBuilder::memory(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remove(&mut self, id: DocumentId) -> bool {
|
||||
self.doc_ids.insert(id)
|
||||
}
|
||||
|
||||
pub fn build(self) -> Result<Update, Box<Error>> {
|
||||
let env_options = rocksdb_options::EnvOptions::new();
|
||||
let column_family_options = rocksdb_options::ColumnFamilyOptions::new();
|
||||
let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options);
|
||||
file_writer.open(&self.path.to_string_lossy())?;
|
||||
|
||||
let bytes = self.doc_ids.into_inner()?;
|
||||
let negative_blob = NegativeBlob::from_bytes(bytes)?;
|
||||
let blob = Blob::Negative(negative_blob);
|
||||
|
||||
// write the data-index aka negative blob
|
||||
let bytes = bincode::serialize(&blob)?;
|
||||
file_writer.merge(DATA_INDEX, &bytes)?;
|
||||
|
||||
// FIXME remove this ugly thing !
|
||||
// let Blob::Negative(negative_blob) = blob;
|
||||
let negative_blob = match blob {
|
||||
Blob::Negative(blob) => blob,
|
||||
Blob::Positive(_) => unreachable!(),
|
||||
};
|
||||
|
||||
for &document_id in negative_blob.as_ref() {
|
||||
let start = DocumentKey::new(document_id);
|
||||
let end = DocumentKey::new(document_id + 1);
|
||||
file_writer.delete_range(start.as_ref(), end.as_ref())?;
|
||||
}
|
||||
|
||||
file_writer.finish()?;
|
||||
Update::open(self.path)
|
||||
}
|
||||
}
|
4
src/database/update/positive/mod.rs
Normal file
4
src/database/update/positive/mod.rs
Normal file
|
@ -0,0 +1,4 @@
|
|||
mod update;
|
||||
mod unordered_builder;
|
||||
|
||||
pub use self::update::{PositiveUpdateBuilder, NewState};
|
45
src/database/update/positive/unordered_builder.rs
Normal file
45
src/database/update/positive/unordered_builder.rs
Normal file
|
@ -0,0 +1,45 @@
|
|||
use std::collections::BTreeMap;
|
||||
use std::error::Error;
|
||||
use std::io::Write;
|
||||
|
||||
use crate::database::blob::positive::PositiveBlobBuilder;
|
||||
use crate::DocIndex;
|
||||
|
||||
pub struct UnorderedPositiveBlobBuilder<W, X> {
|
||||
builder: PositiveBlobBuilder<W, X>,
|
||||
map: BTreeMap<Vec<u8>, Vec<DocIndex>>,
|
||||
}
|
||||
|
||||
impl UnorderedPositiveBlobBuilder<Vec<u8>, Vec<u8>> {
|
||||
pub fn memory() -> Self {
|
||||
Self {
|
||||
builder: PositiveBlobBuilder::memory(),
|
||||
map: BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<W: Write, X: Write> UnorderedPositiveBlobBuilder<W, X> {
|
||||
pub fn new(map_wtr: W, doc_wtr: X) -> Result<Self, Box<Error>> {
|
||||
Ok(UnorderedPositiveBlobBuilder {
|
||||
builder: PositiveBlobBuilder::new(map_wtr, doc_wtr)?,
|
||||
map: BTreeMap::new(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn insert<K: Into<Vec<u8>>>(&mut self, input: K, doc_index: DocIndex) {
|
||||
self.map.entry(input.into()).or_insert_with(Vec::new).push(doc_index);
|
||||
}
|
||||
|
||||
pub fn finish(self) -> Result<(), Box<Error>> {
|
||||
self.into_inner().map(drop)
|
||||
}
|
||||
|
||||
pub fn into_inner(mut self) -> Result<(W, X), Box<Error>> {
|
||||
for (key, mut doc_indexes) in self.map {
|
||||
doc_indexes.sort_unstable();
|
||||
self.builder.insert(&key, &doc_indexes)?;
|
||||
}
|
||||
self.builder.into_inner()
|
||||
}
|
||||
}
|
515
src/database/update/positive/update.rs
Normal file
515
src/database/update/positive/update.rs
Normal file
|
@ -0,0 +1,515 @@
|
|||
use std::collections::BTreeMap;
|
||||
use std::path::PathBuf;
|
||||
use std::error::Error;
|
||||
use std::fmt;
|
||||
|
||||
use ::rocksdb::rocksdb_options;
|
||||
use serde::ser::{self, Serialize};
|
||||
|
||||
use crate::database::update::positive::unordered_builder::UnorderedPositiveBlobBuilder;
|
||||
use crate::database::blob::positive::PositiveBlob;
|
||||
use crate::database::schema::{Schema, SchemaAttr};
|
||||
use crate::tokenizer::TokenizerBuilder;
|
||||
use crate::database::DocumentKeyAttr;
|
||||
use crate::database::update::Update;
|
||||
use crate::{DocumentId, DocIndex};
|
||||
use crate::database::DATA_INDEX;
|
||||
use crate::database::blob::Blob;
|
||||
|
||||
/// Pending change for one field of one document.
pub enum NewState {
    /// The field receives a new value, already serialized to bytes.
    Updated {
        value: Vec<u8>,
    },
    /// The field is deleted.
    Removed,
}
|
||||
|
||||
pub struct PositiveUpdateBuilder<B> {
|
||||
path: PathBuf,
|
||||
schema: Schema,
|
||||
tokenizer_builder: B,
|
||||
builder: UnorderedPositiveBlobBuilder<Vec<u8>, Vec<u8>>,
|
||||
new_states: BTreeMap<(DocumentId, SchemaAttr), NewState>,
|
||||
}
|
||||
|
||||
impl<B> PositiveUpdateBuilder<B> {
|
||||
pub fn new<P: Into<PathBuf>>(path: P, schema: Schema, tokenizer_builder: B) -> PositiveUpdateBuilder<B> {
|
||||
PositiveUpdateBuilder {
|
||||
path: path.into(),
|
||||
schema: schema,
|
||||
tokenizer_builder: tokenizer_builder,
|
||||
builder: UnorderedPositiveBlobBuilder::memory(),
|
||||
new_states: BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update<T: Serialize>(&mut self, id: DocumentId, document: &T) -> Result<(), Box<Error>>
|
||||
where B: TokenizerBuilder
|
||||
{
|
||||
let serializer = Serializer {
|
||||
schema: &self.schema,
|
||||
document_id: id,
|
||||
tokenizer_builder: &self.tokenizer_builder,
|
||||
builder: &mut self.builder,
|
||||
new_states: &mut self.new_states
|
||||
};
|
||||
|
||||
Ok(ser::Serialize::serialize(document, serializer)?)
|
||||
}
|
||||
|
||||
// TODO value must be a field that can be indexed
|
||||
pub fn update_field(&mut self, id: DocumentId, field: SchemaAttr, value: String) {
|
||||
let value = bincode::serialize(&value).unwrap();
|
||||
self.new_states.insert((id, field), NewState::Updated { value });
|
||||
}
|
||||
|
||||
pub fn remove_field(&mut self, id: DocumentId, field: SchemaAttr) {
|
||||
self.new_states.insert((id, field), NewState::Removed);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum SerializerError {
|
||||
SchemaDontMatch { attribute: String },
|
||||
UnserializableType { name: &'static str },
|
||||
Custom(String),
|
||||
}
|
||||
|
||||
impl ser::Error for SerializerError {
|
||||
fn custom<T: fmt::Display>(msg: T) -> Self {
|
||||
SerializerError::Custom(msg.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for SerializerError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match self {
|
||||
SerializerError::SchemaDontMatch { attribute } => {
|
||||
write!(f, "serialized document try to specify the \
|
||||
{:?} attribute that is not known by the schema", attribute)
|
||||
},
|
||||
SerializerError::UnserializableType { name } => {
|
||||
write!(f, "Only struct and map types are considered valid documents and
|
||||
can be serialized, not {} types directly.", name)
|
||||
},
|
||||
SerializerError::Custom(s) => f.write_str(&s),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Error for SerializerError {}
|
||||
|
||||
struct Serializer<'a, B> {
|
||||
schema: &'a Schema,
|
||||
tokenizer_builder: &'a B,
|
||||
document_id: DocumentId,
|
||||
builder: &'a mut UnorderedPositiveBlobBuilder<Vec<u8>, Vec<u8>>,
|
||||
new_states: &'a mut BTreeMap<(DocumentId, SchemaAttr), NewState>,
|
||||
}
|
||||
|
||||
/// Generates `serde::Serializer` methods that reject a primitive type.
///
/// Each `type => method` pair expands to a method named `method` that takes
/// a value of `type` and always fails with
/// `SerializerError::UnserializableType`, naming the rejected type.
macro_rules! forward_to_unserializable_type {
    ($($ty:ident => $se_method:ident,)*) => {
        $(
            fn $se_method(self, _v: $ty) -> Result<Self::Ok, Self::Error> {
                // `stringify!` is required here: macro variables are not
                // substituted inside string literals, so `"$ty"` would be
                // reported verbatim instead of the actual type name.
                Err(SerializerError::UnserializableType { name: stringify!($ty) })
            }
        )*
    }
}
|
||||
|
||||
impl<'a, B> ser::Serializer for Serializer<'a, B>
|
||||
where B: TokenizerBuilder
|
||||
{
|
||||
type Ok = ();
|
||||
type Error = SerializerError;
|
||||
type SerializeSeq = ser::Impossible<Self::Ok, Self::Error>;
|
||||
type SerializeTuple = ser::Impossible<Self::Ok, Self::Error>;
|
||||
type SerializeTupleStruct = ser::Impossible<Self::Ok, Self::Error>;
|
||||
type SerializeTupleVariant = ser::Impossible<Self::Ok, Self::Error>;
|
||||
type SerializeMap = ser::Impossible<Self::Ok, Self::Error>;
|
||||
type SerializeStruct = StructSerializer<'a, B>;
|
||||
type SerializeStructVariant = ser::Impossible<Self::Ok, Self::Error>;
|
||||
|
||||
forward_to_unserializable_type! {
|
||||
bool => serialize_bool,
|
||||
char => serialize_char,
|
||||
|
||||
i8 => serialize_i8,
|
||||
i16 => serialize_i16,
|
||||
i32 => serialize_i32,
|
||||
i64 => serialize_i64,
|
||||
|
||||
u8 => serialize_u8,
|
||||
u16 => serialize_u16,
|
||||
u32 => serialize_u32,
|
||||
u64 => serialize_u64,
|
||||
|
||||
f32 => serialize_f32,
|
||||
f64 => serialize_f64,
|
||||
}
|
||||
|
||||
fn serialize_str(self, v: &str) -> Result<Self::Ok, Self::Error> {
|
||||
Err(SerializerError::UnserializableType { name: "str" })
|
||||
}
|
||||
|
||||
fn serialize_bytes(self, v: &[u8]) -> Result<Self::Ok, Self::Error> {
|
||||
Err(SerializerError::UnserializableType { name: "&[u8]" })
|
||||
}
|
||||
|
||||
fn serialize_none(self) -> Result<Self::Ok, Self::Error> {
|
||||
Err(SerializerError::UnserializableType { name: "Option" })
|
||||
}
|
||||
|
||||
fn serialize_some<T: ?Sized>(self, _value: &T) -> Result<Self::Ok, Self::Error>
|
||||
where T: Serialize,
|
||||
{
|
||||
Err(SerializerError::UnserializableType { name: "Option" })
|
||||
}
|
||||
|
||||
fn serialize_unit(self) -> Result<Self::Ok, Self::Error> {
|
||||
Err(SerializerError::UnserializableType { name: "()" })
|
||||
}
|
||||
|
||||
fn serialize_unit_struct(self, _name: &'static str) -> Result<Self::Ok, Self::Error> {
|
||||
Err(SerializerError::UnserializableType { name: "unit struct" })
|
||||
}
|
||||
|
||||
fn serialize_unit_variant(
|
||||
self,
|
||||
_name: &'static str,
|
||||
_variant_index: u32,
|
||||
_variant: &'static str
|
||||
) -> Result<Self::Ok, Self::Error>
|
||||
{
|
||||
Err(SerializerError::UnserializableType { name: "unit variant" })
|
||||
}
|
||||
|
||||
fn serialize_newtype_struct<T: ?Sized>(
|
||||
self,
|
||||
_name: &'static str,
|
||||
value: &T
|
||||
) -> Result<Self::Ok, Self::Error>
|
||||
where T: Serialize,
|
||||
{
|
||||
value.serialize(self)
|
||||
}
|
||||
|
||||
fn serialize_newtype_variant<T: ?Sized>(
|
||||
self,
|
||||
_name: &'static str,
|
||||
_variant_index: u32,
|
||||
_variant: &'static str,
|
||||
_value: &T
|
||||
) -> Result<Self::Ok, Self::Error>
|
||||
where T: Serialize,
|
||||
{
|
||||
Err(SerializerError::UnserializableType { name: "newtype variant" })
|
||||
}
|
||||
|
||||
fn serialize_seq(self, _len: Option<usize>) -> Result<Self::SerializeSeq, Self::Error> {
|
||||
Err(SerializerError::UnserializableType { name: "sequence" })
|
||||
}
|
||||
|
||||
fn serialize_tuple(self, _len: usize) -> Result<Self::SerializeTuple, Self::Error> {
|
||||
Err(SerializerError::UnserializableType { name: "tuple" })
|
||||
}
|
||||
|
||||
fn serialize_tuple_struct(
|
||||
self,
|
||||
_name: &'static str,
|
||||
_len: usize
|
||||
) -> Result<Self::SerializeTupleStruct, Self::Error>
|
||||
{
|
||||
Err(SerializerError::UnserializableType { name: "tuple struct" })
|
||||
}
|
||||
|
||||
fn serialize_tuple_variant(
|
||||
self,
|
||||
_name: &'static str,
|
||||
_variant_index: u32,
|
||||
_variant: &'static str,
|
||||
_len: usize
|
||||
) -> Result<Self::SerializeTupleVariant, Self::Error>
|
||||
{
|
||||
Err(SerializerError::UnserializableType { name: "tuple variant" })
|
||||
}
|
||||
|
||||
fn serialize_map(self, _len: Option<usize>) -> Result<Self::SerializeMap, Self::Error> {
|
||||
// Ok(MapSerializer {
|
||||
// schema: self.schema,
|
||||
// document_id: self.document_id,
|
||||
// new_states: self.new_states,
|
||||
// })
|
||||
Err(SerializerError::UnserializableType { name: "map" })
|
||||
}
|
||||
|
||||
fn serialize_struct(
|
||||
self,
|
||||
_name: &'static str,
|
||||
_len: usize
|
||||
) -> Result<Self::SerializeStruct, Self::Error>
|
||||
{
|
||||
Ok(StructSerializer {
|
||||
schema: self.schema,
|
||||
tokenizer_builder: self.tokenizer_builder,
|
||||
document_id: self.document_id,
|
||||
builder: self.builder,
|
||||
new_states: self.new_states,
|
||||
})
|
||||
}
|
||||
|
||||
fn serialize_struct_variant(
|
||||
self,
|
||||
_name: &'static str,
|
||||
_variant_index: u32,
|
||||
_variant: &'static str,
|
||||
_len: usize
|
||||
) -> Result<Self::SerializeStructVariant, Self::Error>
|
||||
{
|
||||
Err(SerializerError::UnserializableType { name: "struct variant" })
|
||||
}
|
||||
}
|
||||
|
||||
struct StructSerializer<'a, B> {
|
||||
schema: &'a Schema,
|
||||
tokenizer_builder: &'a B,
|
||||
document_id: DocumentId,
|
||||
builder: &'a mut UnorderedPositiveBlobBuilder<Vec<u8>, Vec<u8>>,
|
||||
new_states: &'a mut BTreeMap<(DocumentId, SchemaAttr), NewState>,
|
||||
}
|
||||
|
||||
impl<'a, B> ser::SerializeStruct for StructSerializer<'a, B>
|
||||
where B: TokenizerBuilder
|
||||
{
|
||||
type Ok = ();
|
||||
type Error = SerializerError;
|
||||
|
||||
fn serialize_field<T: ?Sized>(
|
||||
&mut self,
|
||||
key: &'static str,
|
||||
value: &T
|
||||
) -> Result<(), Self::Error>
|
||||
where T: Serialize,
|
||||
{
|
||||
match self.schema.attribute(key) {
|
||||
Some(attr) => {
|
||||
let props = self.schema.props(attr);
|
||||
if props.is_stored() {
|
||||
let value = bincode::serialize(value).unwrap();
|
||||
self.new_states.insert((self.document_id, attr), NewState::Updated { value });
|
||||
}
|
||||
if props.is_indexed() {
|
||||
let serializer = IndexerSerializer {
|
||||
builder: self.builder,
|
||||
tokenizer_builder: self.tokenizer_builder,
|
||||
document_id: self.document_id,
|
||||
attribute: attr,
|
||||
};
|
||||
value.serialize(serializer)?;
|
||||
}
|
||||
Ok(())
|
||||
},
|
||||
None => Err(SerializerError::SchemaDontMatch { attribute: key.to_owned() }),
|
||||
}
|
||||
}
|
||||
|
||||
fn end(self) -> Result<Self::Ok, Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct IndexerSerializer<'a, B> {
|
||||
tokenizer_builder: &'a B,
|
||||
builder: &'a mut UnorderedPositiveBlobBuilder<Vec<u8>, Vec<u8>>,
|
||||
document_id: DocumentId,
|
||||
attribute: SchemaAttr,
|
||||
}
|
||||
|
||||
impl<'a, B> ser::Serializer for IndexerSerializer<'a, B>
|
||||
where B: TokenizerBuilder
|
||||
{
|
||||
type Ok = ();
|
||||
type Error = SerializerError;
|
||||
type SerializeSeq = ser::Impossible<Self::Ok, Self::Error>;
|
||||
type SerializeTuple = ser::Impossible<Self::Ok, Self::Error>;
|
||||
type SerializeTupleStruct = ser::Impossible<Self::Ok, Self::Error>;
|
||||
type SerializeTupleVariant = ser::Impossible<Self::Ok, Self::Error>;
|
||||
type SerializeMap = ser::Impossible<Self::Ok, Self::Error>;
|
||||
type SerializeStruct = ser::Impossible<Self::Ok, Self::Error>;
|
||||
type SerializeStructVariant = ser::Impossible<Self::Ok, Self::Error>;
|
||||
|
||||
forward_to_unserializable_type! {
|
||||
bool => serialize_bool,
|
||||
char => serialize_char,
|
||||
|
||||
i8 => serialize_i8,
|
||||
i16 => serialize_i16,
|
||||
i32 => serialize_i32,
|
||||
i64 => serialize_i64,
|
||||
|
||||
u8 => serialize_u8,
|
||||
u16 => serialize_u16,
|
||||
u32 => serialize_u32,
|
||||
u64 => serialize_u64,
|
||||
|
||||
f32 => serialize_f32,
|
||||
f64 => serialize_f64,
|
||||
}
|
||||
|
||||
fn serialize_str(self, v: &str) -> Result<Self::Ok, Self::Error> {
|
||||
for (index, word) in self.tokenizer_builder.build(v) {
|
||||
let doc_index = DocIndex {
|
||||
document_id: self.document_id,
|
||||
attribute: self.attribute.as_u32() as u8,
|
||||
attribute_index: index as u32,
|
||||
};
|
||||
|
||||
// insert the exact representation
|
||||
let word_lower = word.to_lowercase();
|
||||
|
||||
// and the unidecoded lowercased version
|
||||
let word_unidecoded = unidecode::unidecode(word).to_lowercase();
|
||||
if word_lower != word_unidecoded {
|
||||
self.builder.insert(word_unidecoded, doc_index);
|
||||
}
|
||||
|
||||
self.builder.insert(word_lower, doc_index);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn serialize_bytes(self, v: &[u8]) -> Result<Self::Ok, Self::Error> {
|
||||
Err(SerializerError::UnserializableType { name: "&[u8]" })
|
||||
}
|
||||
|
||||
fn serialize_none(self) -> Result<Self::Ok, Self::Error> {
|
||||
Err(SerializerError::UnserializableType { name: "Option" })
|
||||
}
|
||||
|
||||
fn serialize_some<T: ?Sized>(self, _value: &T) -> Result<Self::Ok, Self::Error>
|
||||
where T: Serialize,
|
||||
{
|
||||
Err(SerializerError::UnserializableType { name: "Option" })
|
||||
}
|
||||
|
||||
fn serialize_unit(self) -> Result<Self::Ok, Self::Error> {
|
||||
Err(SerializerError::UnserializableType { name: "()" })
|
||||
}
|
||||
|
||||
fn serialize_unit_struct(self, _name: &'static str) -> Result<Self::Ok, Self::Error> {
|
||||
Err(SerializerError::UnserializableType { name: "unit struct" })
|
||||
}
|
||||
|
||||
fn serialize_unit_variant(
|
||||
self,
|
||||
_name: &'static str,
|
||||
_variant_index: u32,
|
||||
_variant: &'static str
|
||||
) -> Result<Self::Ok, Self::Error>
|
||||
{
|
||||
Err(SerializerError::UnserializableType { name: "unit variant" })
|
||||
}
|
||||
|
||||
fn serialize_newtype_struct<T: ?Sized>(
|
||||
self,
|
||||
_name: &'static str,
|
||||
value: &T
|
||||
) -> Result<Self::Ok, Self::Error>
|
||||
where T: Serialize,
|
||||
{
|
||||
value.serialize(self)
|
||||
}
|
||||
|
||||
fn serialize_newtype_variant<T: ?Sized>(
|
||||
self,
|
||||
_name: &'static str,
|
||||
_variant_index: u32,
|
||||
_variant: &'static str,
|
||||
_value: &T
|
||||
) -> Result<Self::Ok, Self::Error>
|
||||
where T: Serialize,
|
||||
{
|
||||
Err(SerializerError::UnserializableType { name: "newtype variant" })
|
||||
}
|
||||
|
||||
fn serialize_seq(self, _len: Option<usize>) -> Result<Self::SerializeSeq, Self::Error> {
|
||||
Err(SerializerError::UnserializableType { name: "seq" })
|
||||
}
|
||||
|
||||
fn serialize_tuple(self, _len: usize) -> Result<Self::SerializeTuple, Self::Error> {
|
||||
Err(SerializerError::UnserializableType { name: "tuple" })
|
||||
}
|
||||
|
||||
fn serialize_tuple_struct(
|
||||
self,
|
||||
_name: &'static str,
|
||||
_len: usize
|
||||
) -> Result<Self::SerializeTupleStruct, Self::Error>
|
||||
{
|
||||
Err(SerializerError::UnserializableType { name: "tuple struct" })
|
||||
}
|
||||
|
||||
fn serialize_tuple_variant(
|
||||
self,
|
||||
_name: &'static str,
|
||||
_variant_index: u32,
|
||||
_variant: &'static str,
|
||||
_len: usize
|
||||
) -> Result<Self::SerializeTupleVariant, Self::Error>
|
||||
{
|
||||
Err(SerializerError::UnserializableType { name: "tuple variant" })
|
||||
}
|
||||
|
||||
fn serialize_map(self, _len: Option<usize>) -> Result<Self::SerializeMap, Self::Error> {
|
||||
Err(SerializerError::UnserializableType { name: "map" })
|
||||
}
|
||||
|
||||
fn serialize_struct(
|
||||
self,
|
||||
_name: &'static str,
|
||||
_len: usize
|
||||
) -> Result<Self::SerializeStruct, Self::Error>
|
||||
{
|
||||
Err(SerializerError::UnserializableType { name: "struct" })
|
||||
}
|
||||
|
||||
fn serialize_struct_variant(
|
||||
self,
|
||||
_name: &'static str,
|
||||
_variant_index: u32,
|
||||
_variant: &'static str,
|
||||
_len: usize
|
||||
) -> Result<Self::SerializeStructVariant, Self::Error>
|
||||
{
|
||||
Err(SerializerError::UnserializableType { name: "struct variant" })
|
||||
}
|
||||
}
|
||||
|
||||
impl<B> PositiveUpdateBuilder<B> {
|
||||
pub fn build(self) -> Result<Update, Box<Error>> {
|
||||
let env_options = rocksdb_options::EnvOptions::new();
|
||||
let column_family_options = rocksdb_options::ColumnFamilyOptions::new();
|
||||
let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options);
|
||||
file_writer.open(&self.path.to_string_lossy())?;
|
||||
|
||||
let (blob_fst_map, blob_doc_idx) = self.builder.into_inner()?;
|
||||
let positive_blob = PositiveBlob::from_bytes(blob_fst_map, blob_doc_idx)?;
|
||||
let blob = Blob::Positive(positive_blob);
|
||||
|
||||
// write the data-index aka positive blob
|
||||
let bytes = bincode::serialize(&blob)?;
|
||||
file_writer.merge(DATA_INDEX, &bytes)?;
|
||||
|
||||
// write all the documents fields updates
|
||||
for ((id, attr), state) in self.new_states {
|
||||
let key = DocumentKeyAttr::new(id, attr);
|
||||
let props = self.schema.props(attr);
|
||||
match state {
|
||||
NewState::Updated { value } => if props.is_stored() {
|
||||
file_writer.put(key.as_ref(), &value)?
|
||||
},
|
||||
NewState::Removed => file_writer.delete(key.as_ref())?,
|
||||
}
|
||||
}
|
||||
|
||||
file_writer.finish()?;
|
||||
Update::open(self.path)
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue