From 612a8d9d44d30af0ffdf8f6d33765b38db4d3d0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 28 Nov 2018 17:12:24 +0100 Subject: [PATCH] feat: Make the OpBuilder work only for PositiveBlob --- Cargo.toml | 2 +- src/blob/merge.rs | 504 ------------------ src/blob/mod.rs | 31 +- .../{negative_blob.rs => negative/blob.rs} | 6 + src/blob/negative/mod.rs | 5 + src/blob/negative/ops.rs | 73 +++ src/blob/ops.rs | 381 +++---------- src/blob/ops_indexed_value.rs | 203 ------- .../{positive_blob.rs => positive/blob.rs} | 84 ++- src/blob/positive/mod.rs | 5 + src/blob/positive/ops.rs | 128 +++++ src/data/doc_ids.rs | 2 +- src/data/doc_indexes.rs | 39 +- src/data/mod.rs | 24 +- src/index/identifier.rs | 64 --- src/index/mod.rs | 37 +- src/index/update/mod.rs | 14 +- src/index/update/negative_update.rs | 46 +- src/index/update/positive_update.rs | 104 ++-- src/lib.rs | 2 +- src/rank/distinct_map.rs | 65 +++ src/rank/mod.rs | 3 +- src/rank/ranked_stream.rs | 179 ++----- src/{database.rs => retrieve.rs} | 30 +- 24 files changed, 658 insertions(+), 1373 deletions(-) delete mode 100644 src/blob/merge.rs rename src/blob/{negative_blob.rs => negative/blob.rs} (94%) create mode 100644 src/blob/negative/mod.rs create mode 100644 src/blob/negative/ops.rs delete mode 100644 src/blob/ops_indexed_value.rs rename src/blob/{positive_blob.rs => positive/blob.rs} (63%) create mode 100644 src/blob/positive/mod.rs create mode 100644 src/blob/positive/ops.rs delete mode 100644 src/index/identifier.rs create mode 100644 src/rank/distinct_map.rs rename src/{database.rs => retrieve.rs} (57%) diff --git a/Cargo.toml b/Cargo.toml index c95d91bee..4e0799aab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,13 +8,13 @@ authors = ["Kerollmops "] bincode = "1.0" byteorder = "1.2" fnv = "1.0" +itertools = "0.7" lazy_static = "1.1" linked-hash-map = { version = "0.5", features = ["serde_impl"] } sdset = "0.3" serde = "1.0" serde_derive = "1.0" unidecode = "0.3" -uuid = { version = "0.7", features = ["serde", "v4"] } [dependencies.fst] git = "https://github.com/Kerollmops/fst.git" diff --git a/src/blob/merge.rs b/src/blob/merge.rs deleted file mode 100644 index c16e62b27..000000000 --- a/src/blob/merge.rs +++ /dev/null @@ -1,504 +0,0 @@ -use std::collections::BinaryHeap; -use std::rc::Rc; -use std::cmp; - -use fst::{Automaton, Streamer}; -use fst::automaton::AlwaysMatch; -use sdset::{Set, SetOperation}; -use sdset::duo::OpBuilder as SdOpBuilder; -use group_by::GroupBy; - -use crate::DocIndex; -use crate::blob::{Blob, Sign}; -use crate::vec_read_only::VecReadOnly; -use crate::blob::ops::{OpBuilder, Union, IndexedDocIndexes}; - -fn group_is_negative(blobs: &&[Blob]) -> bool { - blobs[0].sign() == Sign::Negative -} - -fn blob_same_sign(a: &Blob, b: &Blob) -> bool { - a.sign() == b.sign() -} - -fn sign_from_group_index(group: usize) -> Sign { - if group % 2 == 0 { - Sign::Positive - } else { - Sign::Negative - } -} - -pub struct Merge<'b> { - heap: GroupHeap<'b>, - outs: Vec, - cur_slot: Option, -} - -impl<'b> Merge<'b> { - pub fn always_match(blobs: &'b [Blob]) -> Self { - Self::with_automatons(vec![AlwaysMatch], blobs) - } -} - -impl<'b> Merge<'b> { - pub fn with_automatons(automatons: Vec, blobs: &'b [Blob]) -> Self - where A: 'b + Automaton + Clone - { - let mut groups = Vec::new(); - // We can skip blobs that are negative: they didn't remove anything at the start - for blobs in GroupBy::new(blobs, blob_same_sign).skip_while(group_is_negative) { - let mut builder = OpBuilder::with_automatons(automatons.clone()); - for blob in blobs { - builder.push(blob); - } - groups.push(builder.union()); - } - - let mut heap = GroupHeap::new(groups); - heap.refill(); - - Merge { - heap: heap, - outs: Vec::new(), - cur_slot: None, - } - } -} - -impl<'b, 'a> Streamer<'a> for Merge<'b> { - type Item = (&'a [u8], &'a [IndexedDocIndexes]); - - fn next(&'a mut self) -> Option { - self.outs.clear(); - loop { - if let Some(slot) = self.cur_slot.take() { - self.heap.refill(); - } - let slot = match self.heap.pop() { - None => return None, - Some(slot) => { - self.cur_slot = Some(slot); - self.cur_slot.as_ref().unwrap() - } - }; - - let mut doc_indexes = Vec::new(); - let mut doc_indexes_slots = Vec::with_capacity(self.heap.num_groups()); - - let len = match sign_from_group_index(slot.grp_index) { - Sign::Positive => { - doc_indexes.extend_from_slice(&slot.output); - slot.output.len() - }, - Sign::Negative => 0, - }; - - let mut slotidi = SlotIndexedDocIndexes { - index: slot.aut_index, - start: 0, - len: len, - }; - - let mut buffer = Vec::new(); - while let Some(slot2) = self.heap.pop_if_equal(slot.input()) { - if slotidi.index == slot2.aut_index { - buffer.clear(); - buffer.extend(doc_indexes.drain(slotidi.start..)); - - let a = Set::new_unchecked(&buffer); - let b = Set::new_unchecked(&slot2.output); - match sign_from_group_index(slot2.grp_index) { - Sign::Positive => { SdOpBuilder::new(a, b).union().extend_vec(&mut doc_indexes) }, - Sign::Negative => SdOpBuilder::new(a, b).difference().extend_vec(&mut doc_indexes), - } - slotidi.len = doc_indexes.len() - slotidi.start; - - } else { - if slotidi.len != 0 { - doc_indexes_slots.push(slotidi); - } - slotidi = SlotIndexedDocIndexes { - index: slot2.aut_index, - start: doc_indexes.len(), - len: slot2.output.len(), - }; - buffer.extend_from_slice(&slot2.output); - } - } - - if slotidi.len != 0 { - doc_indexes_slots.push(slotidi); - } - - let read_only = VecReadOnly::new(doc_indexes); - self.outs.reserve(doc_indexes_slots.len()); - for slot in doc_indexes_slots { - let indexes = IndexedDocIndexes { - index: slot.index, - doc_indexes: read_only.range(slot.start, slot.len), - }; - self.outs.push(indexes); - } - - if !self.outs.is_empty() { - let slot = self.cur_slot.as_ref().unwrap(); // FIXME - return Some((slot.input(), &self.outs)) - } - } - } -} - -struct SlotIndexedDocIndexes { - index: usize, - start: usize, - len: usize, -} - -#[derive(Debug, Eq, PartialEq)] -struct Slot { - grp_index: usize, - aut_index: usize, - input: Rc>, - output: VecReadOnly, -} - -impl Slot { - fn input(&self) -> &[u8] { - &self.input - } -} - -impl PartialOrd for Slot { - fn partial_cmp(&self, other: &Slot) -> Option { - (&self.input, self.aut_index, self.grp_index, &self.output) - .partial_cmp(&(&other.input, other.aut_index, other.grp_index, &other.output)) - .map(|ord| ord.reverse()) - } -} - -impl Ord for Slot { - fn cmp(&self, other: &Slot) -> cmp::Ordering { - self.partial_cmp(other).unwrap() - } -} - -struct GroupHeap<'b> { - groups: Vec>, - heap: BinaryHeap, -} - -impl<'b> GroupHeap<'b> { - fn new(groups: Vec>) -> GroupHeap<'b> { - GroupHeap { - groups: groups, - heap: BinaryHeap::new(), - } - } - - fn num_groups(&self) -> usize { - self.groups.len() - } - - fn pop(&mut self) -> Option { - self.heap.pop() - } - - fn peek_is_duplicate(&self, key: &[u8]) -> bool { - self.heap.peek().map(|s| *s.input == key).unwrap_or(false) - } - - fn pop_if_equal(&mut self, key: &[u8]) -> Option { - if self.peek_is_duplicate(key) { self.pop() } else { None } - } - - fn refill(&mut self) { - for (i, group) in self.groups.iter_mut().enumerate() { - if let Some((input, doc_indexes)) = group.next() { - let input = Rc::new(input.to_vec()); - for doc_index in doc_indexes { - let slot = Slot { - input: input.clone(), - grp_index: i, - aut_index: doc_index.index, - output: doc_index.doc_indexes.clone(), - }; - self.heap.push(slot); - } - } - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::blob::{PositiveBlobBuilder, NegativeBlobBuilder}; - use crate::DocIndex; - - fn get_all<'m, I, S>(stream: I) -> Vec<(String, VecReadOnly)> - where - I: for<'a> fst::IntoStreamer<'a, Into=S, Item=(&'a [u8], &'a [IndexedDocIndexes])>, - S: 'm + for<'a> fst::Streamer<'a, Item=(&'a [u8], &'a [IndexedDocIndexes])>, - { - let mut result = Vec::new(); - - let mut stream = stream.into_stream(); - while let Some((string, indexes)) = stream.next() { - let string = String::from_utf8(string.to_owned()).unwrap(); - result.push((string, indexes[0].doc_indexes.clone())) - } - - result - } - - #[test] - fn single_positive_blob() { - let doc1 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 0 }; - let doc2 = DocIndex{ document_id: 12, attribute: 0, attribute_index: 2 }; - let doc3 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 1 }; - let doc4 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 2 }; - - let a = { - let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new()); - - builder.insert("hell", doc1); - builder.insert("hell", doc2); - builder.insert("hello", doc3); - builder.insert("wor", doc4); - - Blob::Positive(builder.build().unwrap()) - }; - - let blobs = &[a]; - let merge = Merge::always_match(blobs); - - let value = get_all(merge); - assert_eq!(value.len(), 3); - - assert_eq!(value[0].0, "hell"); - assert_eq!(&*value[0].1, &[doc1, doc2][..]); - - assert_eq!(value[1].0, "hello"); - assert_eq!(&*value[1].1, &[doc3][..]); - - assert_eq!(value[2].0, "wor"); - assert_eq!(&*value[2].1, &[doc4][..]); - } - - #[test] - fn single_negative_blob() { - let a = { - let mut builder = NegativeBlobBuilder::new(Vec::new()); - - builder.insert(1); - builder.insert(2); - builder.insert(3); - builder.insert(4); - - Blob::Negative(builder.build().unwrap()) - }; - - let blobs = &[a]; - let merge = Merge::always_match(blobs); - - let value = get_all(merge); - assert_eq!(value.len(), 0); - } - - #[test] - fn two_positive_blobs() { - let doc1 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 0 }; - let doc2 = DocIndex{ document_id: 12, attribute: 0, attribute_index: 2 }; - let doc3 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 1 }; - let doc4 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 2 }; - - let a = { - let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new()); - - builder.insert("hell", doc1); - builder.insert("wor", doc4); - - Blob::Positive(builder.build().unwrap()) - }; - - let b = { - let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new()); - - builder.insert("hell", doc2); - builder.insert("hello", doc3); - - Blob::Positive(builder.build().unwrap()) - }; - - let blobs = &[a, b]; - let merge = Merge::always_match(blobs); - - let value = get_all(merge); - assert_eq!(value.len(), 3); - - assert_eq!(value[0].0, "hell"); - assert_eq!(&*value[0].1, &[doc1, doc2][..]); - - assert_eq!(value[1].0, "hello"); - assert_eq!(&*value[1].1, &[doc3][..]); - - assert_eq!(value[2].0, "wor"); - assert_eq!(&*value[2].1, &[doc4][..]); - } - - #[test] - fn one_positive_one_negative_blobs() { - let doc1 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 0 }; - let doc2 = DocIndex{ document_id: 12, attribute: 0, attribute_index: 2 }; - let doc3 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 1 }; - let doc4 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 2 }; - - let a = { - let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new()); - - builder.insert("hell", doc1); - builder.insert("hell", doc2); - builder.insert("hello", doc3); - builder.insert("wor", doc4); - - Blob::Positive(builder.build().unwrap()) - }; - - let b = { - let mut builder = NegativeBlobBuilder::new(Vec::new()); - - builder.insert(2); - builder.insert(3); - - Blob::Negative(builder.build().unwrap()) - }; - - let blobs = &[a, b]; - let merge = Merge::always_match(blobs); - - let value = get_all(merge); - assert_eq!(value.len(), 2); - - assert_eq!(value[0].0, "hell"); - assert_eq!(&*value[0].1, &[doc1][..]); - - assert_eq!(value[1].0, "wor"); - assert_eq!(&*value[1].1, &[doc4][..]); - } - - #[test] - fn alternate_positive_negative_blobs() { - let doc1 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 0 }; - let doc2 = DocIndex{ document_id: 12, attribute: 0, attribute_index: 2 }; - let doc3 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 1 }; - let doc4 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 2 }; - - let a = { - let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new()); - - builder.insert("hell", doc1); - builder.insert("hell", doc2); - builder.insert("hello", doc3); - - Blob::Positive(builder.build().unwrap()) - }; - - let b = { - let mut builder = NegativeBlobBuilder::new(Vec::new()); - - builder.insert(1); - builder.insert(4); - - Blob::Negative(builder.build().unwrap()) - }; - - let c = { - let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new()); - - builder.insert("hell", doc1); - builder.insert("wor", doc4); - - Blob::Positive(builder.build().unwrap()) - }; - - let d = { - let mut builder = NegativeBlobBuilder::new(Vec::new()); - - builder.insert(1); - - Blob::Negative(builder.build().unwrap()) - }; - - let blobs = &[a, b, c, d]; - let merge = Merge::always_match(blobs); - - let value = get_all(merge); - assert_eq!(value.len(), 3); - - assert_eq!(value[0].0, "hell"); - assert_eq!(&*value[0].1, &[doc2][..]); - - assert_eq!(value[1].0, "hello"); - assert_eq!(&*value[1].1, &[doc3][..]); - - assert_eq!(value[2].0, "wor"); - assert_eq!(&*value[2].1, &[doc4][..]); - } - - #[test] - fn alternate_multiple_positive_negative_blobs() { - let doc1 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 0 }; - let doc2 = DocIndex{ document_id: 12, attribute: 0, attribute_index: 2 }; - let doc3 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 1 }; - let doc4 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 2 }; - - let a = { - let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new()); - - builder.insert("hell", doc1); - builder.insert("hell", doc2); - builder.insert("hello", doc3); - - Blob::Positive(builder.build().unwrap()) - }; - - let b = { - let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new()); - - builder.insert("hell", doc1); - builder.insert("wor", doc4); - - Blob::Positive(builder.build().unwrap()) - }; - - let c = { - let mut builder = NegativeBlobBuilder::new(Vec::new()); - - builder.insert(1); - builder.insert(4); - - Blob::Negative(builder.build().unwrap()) - }; - - let d = { - let mut builder = NegativeBlobBuilder::new(Vec::new()); - - builder.insert(1); - - Blob::Negative(builder.build().unwrap()) - }; - - let blobs = &[a, b, c, d]; - let merge = Merge::always_match(blobs); - - let value = get_all(merge); - assert_eq!(value.len(), 2); - - assert_eq!(value[0].0, "hell"); - assert_eq!(&*value[0].1, &[doc2][..]); - - assert_eq!(value[1].0, "hello"); - assert_eq!(&*value[1].1, &[doc3][..]); - } -} diff --git a/src/blob/mod.rs b/src/blob/mod.rs index af0b52625..daafcf1c8 100644 --- a/src/blob/mod.rs +++ b/src/blob/mod.rs @@ -1,31 +1,30 @@ -mod merge; -pub mod ops; -mod ops_indexed_value; -mod positive_blob; -mod negative_blob; +mod ops; +pub mod positive; +pub mod negative; -pub use self::merge::Merge; -pub use self::positive_blob::{PositiveBlob, PositiveBlobBuilder}; -pub use self::negative_blob::{NegativeBlob, NegativeBlobBuilder}; +pub use self::positive::{PositiveBlob, RawPositiveBlobBuilder, PositiveBlobBuilder}; +pub use self::negative::{NegativeBlob, NegativeBlobBuilder}; +pub use self::ops::OpBuilder; -use std::error::Error; -use std::io::{Write, Read}; -use std::{io, fmt, mem}; +use std::fmt; -use uuid::Uuid; -use rocksdb::rocksdb::{DB, Snapshot}; use serde::ser::{Serialize, Serializer, SerializeTuple}; use serde::de::{self, Deserialize, Deserializer, SeqAccess, Visitor}; -use crate::index::identifier::Identifier; -use crate::data::DocIndexes; - pub enum Blob { Positive(PositiveBlob), Negative(NegativeBlob), } impl Blob { + pub fn is_negative(&self) -> bool { + self.sign() == Sign::Negative + } + + pub fn is_positive(&self) -> bool { + self.sign() == Sign::Positive + } + pub fn sign(&self) -> Sign { match self { Blob::Positive(_) => Sign::Positive, diff --git a/src/blob/negative_blob.rs b/src/blob/negative/blob.rs similarity index 94% rename from src/blob/negative_blob.rs rename to src/blob/negative/blob.rs index e81ad2616..53e50b021 100644 --- a/src/blob/negative_blob.rs +++ b/src/blob/negative/blob.rs @@ -37,6 +37,12 @@ impl NegativeBlob { } } +impl AsRef<[DocumentId]> for NegativeBlob { + fn as_ref(&self) -> &[DocumentId] { + self.as_ids().doc_ids() + } +} + impl Serialize for NegativeBlob { fn serialize(&self, serializer: S) -> Result { self.doc_ids.serialize(serializer) diff --git a/src/blob/negative/mod.rs b/src/blob/negative/mod.rs new file mode 100644 index 000000000..56c9f4ef4 --- /dev/null +++ b/src/blob/negative/mod.rs @@ -0,0 +1,5 @@ +mod blob; +mod ops; + +pub use self::blob::{NegativeBlob, NegativeBlobBuilder}; +pub use self::ops::OpBuilder; diff --git a/src/blob/negative/ops.rs b/src/blob/negative/ops.rs new file mode 100644 index 000000000..136f23533 --- /dev/null +++ b/src/blob/negative/ops.rs @@ -0,0 +1,73 @@ +use sdset::multi::OpBuilder as SdOpBuilder; +use sdset::Set; + +use crate::blob::NegativeBlob; +use crate::data::DocIds; +use crate::DocumentId; + +pub struct OpBuilder<'a> { + inner: SdOpBuilder<'a, DocumentId>, +} + +/// Do a set operation on multiple negative blobs. +impl<'a> OpBuilder<'a> { + pub fn new() -> Self { + Self { inner: SdOpBuilder::new() } + } + + pub fn with_capacity(cap: usize) -> Self { + Self { inner: SdOpBuilder::with_capacity(cap) } + } + + pub fn add(mut self, blob: &'a NegativeBlob) -> Self { + self.push(blob); + self + } + + pub fn push(&mut self, blob: &'a NegativeBlob) { + let set = Set::new_unchecked(blob.as_ref()); + self.inner.push(set); + } + + pub fn union(self) -> Union<'a> { + Union::new(self.inner.union()) + } + + pub fn intersection(self) -> Intersection<'a> { + Intersection::new(self.inner.intersection()) + } + + pub fn difference(self) -> Difference<'a> { + Difference::new(self.inner.difference()) + } + + pub fn symmetric_difference(self) -> SymmetricDifference<'a> { + SymmetricDifference::new(self.inner.symmetric_difference()) + } +} + +macro_rules! logical_operation { + (struct $name:ident, $operation:ident) => { + +pub struct $name<'a> { + op: sdset::multi::$name<'a, DocumentId>, +} + +impl<'a> $name<'a> { + fn new(op: sdset::multi::$name<'a, DocumentId>) -> Self { + $name { op } + } + + pub fn into_negative_blob(self) -> NegativeBlob { + let document_ids = sdset::SetOperation::into_set_buf(self.op); + let doc_ids = DocIds::from_document_ids(document_ids.into_vec()); + NegativeBlob::from_raw(doc_ids) + } +} + +}} + +logical_operation!(struct Union, union); +logical_operation!(struct Intersection, intersection); +logical_operation!(struct Difference, difference); +logical_operation!(struct SymmetricDifference, symmetric_difference); diff --git a/src/blob/ops.rs b/src/blob/ops.rs index f4d4fa1da..4ab0ef564 100644 --- a/src/blob/ops.rs +++ b/src/blob/ops.rs @@ -1,334 +1,109 @@ -use std::collections::BTreeMap; +use std::error::Error; -use fst::{map, Streamer, Automaton}; -use fst::automaton::AlwaysMatch; -use sdset::multi::OpBuilder as SdOpBuilder; -use sdset::{SetOperation, Set}; +use fst::{IntoStreamer, Streamer}; +use group_by::GroupBy; +use itertools::{Itertools, Either}; +use sdset::duo::DifferenceByKey; +use sdset::{Set, SetOperation}; -use crate::blob::ops_indexed_value::{ - OpIndexedValueBuilder, UnionIndexedValue, -}; -use crate::blob::Blob; -use crate::data::DocIndexes; -use crate::vec_read_only::VecReadOnly; -use crate::DocIndex; +use crate::blob::{Blob, Sign, PositiveBlob, RawPositiveBlobBuilder, NegativeBlob}; +use crate::blob::{positive, negative}; -pub struct OpBuilder<'m, A: Automaton> { - // the operation on the maps is always an union. - maps: OpIndexedValueBuilder<'m>, - automatons: Vec, - indexes: Vec<&'m DocIndexes>, +fn blob_same_sign(a: &Blob, b: &Blob) -> bool { + a.sign() == b.sign() } -impl<'m> OpBuilder<'m, AlwaysMatch> { - pub fn new() -> Self { - Self { - maps: OpIndexedValueBuilder::new(), - automatons: vec![AlwaysMatch], - indexes: Vec::new(), - } +fn unwrap_positive(blob: &Blob) -> &PositiveBlob { + match blob { + Blob::Positive(blob) => blob, + Blob::Negative(_) => panic!("called `Blob::unwrap_positive()` on a `Negative` value"), } } -/// Do a set operation on multiple maps with the same automatons. -impl<'m, A: 'm + Automaton> OpBuilder<'m, A> { - pub fn with_automatons(automatons: Vec) -> Self { - Self { - maps: OpIndexedValueBuilder::new(), - automatons: automatons, - indexes: Vec::new(), - } - } - - pub fn add(mut self, blob: &'m Blob) -> Self - where A: Clone - { - self.push(blob); - self - } - - pub fn push(&mut self, blob: &'m Blob) - where A: Clone - { - match blob { - Blob::Positive(blob) => { - let mut op = map::OpBuilder::new(); - for automaton in self.automatons.iter().cloned() { - let stream = blob.as_map().search(automaton); - op.push(stream); - } - - let stream = op.union(); - let indexes = blob.as_indexes(); - - self.maps.push(stream); - self.indexes.push(indexes); - }, - Blob::Negative(blob) => { - unimplemented!() - }, - } - } - - pub fn union(self) -> Union<'m> { - Union::new(self.maps, self.indexes, self.automatons.len()) - } - - pub fn intersection(self) -> Intersection<'m> { - Intersection::new(self.maps, self.indexes, self.automatons.len()) - } - - pub fn difference(self) -> Difference<'m> { - Difference::new(self.maps, self.indexes, self.automatons.len()) - } - - pub fn symmetric_difference(self) -> SymmetricDifference<'m> { - SymmetricDifference::new(self.maps, self.indexes, self.automatons.len()) +fn unwrap_negative(blob: &Blob) -> &NegativeBlob { + match blob { + Blob::Negative(blob) => blob, + Blob::Positive(_) => panic!("called `Blob::unwrap_negative()` on a `Positive` value"), } } -#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq, Hash)] -pub struct IndexedDocIndexes { - pub index: usize, - pub doc_indexes: VecReadOnly, +pub struct OpBuilder { + blobs: Vec, } -struct SlotIndexedDocIndexes { - index: usize, - start: usize, - len: usize, -} - -macro_rules! logical_operation { - (struct $name:ident, $operation:ident) => { - -pub struct $name<'m> { - maps: UnionIndexedValue<'m>, - indexes: Vec<&'m DocIndexes>, - number_automatons: usize, - outs: Vec, -} - -impl<'m> $name<'m> { - fn new(maps: OpIndexedValueBuilder<'m>, indexes: Vec<&'m DocIndexes>, number_automatons: usize) -> Self { - $name { - maps: maps.union(), - indexes: indexes, - number_automatons: number_automatons, - outs: Vec::new(), - } +impl OpBuilder { + pub fn new() -> OpBuilder { + OpBuilder { blobs: Vec::new() } } -} -impl<'m, 'a> fst::Streamer<'a> for $name<'m> { - type Item = (&'a [u8], &'a [IndexedDocIndexes]); + pub fn with_capacity(cap: usize) -> OpBuilder { + OpBuilder { blobs: Vec::with_capacity(cap) } + } - fn next(&'a mut self) -> Option { - match self.maps.next() { - Some((input, ivalues)) => { - self.outs.clear(); + pub fn push(&mut self, blob: Blob) { + if self.blobs.is_empty() && blob.is_negative() { return } + self.blobs.push(blob); + } - let mut builders = vec![BTreeMap::new(); self.number_automatons]; - for iv in ivalues { - let builder = &mut builders[iv.aut_index]; - builder.insert(iv.rdr_index, iv.value); - } - - let mut doc_indexes = Vec::new(); - let mut doc_indexes_slots = Vec::with_capacity(builders.len()); - for (aut_index, values) in builders.into_iter().enumerate() { - let mut builder = SdOpBuilder::with_capacity(values.len()); - for (rdr_index, value) in values { - let indexes = self.indexes[rdr_index].get(value).expect("could not find indexes"); - let indexes = Set::new_unchecked(indexes); - builder.push(indexes); + pub fn merge(self) -> Result> { + let groups = GroupBy::new(&self.blobs, blob_same_sign); + let (positives, negatives): (Vec<_>, Vec<_>) = groups.partition_map(|blobs| { + match blobs[0].sign() { + Sign::Positive => { + let mut op_builder = positive::OpBuilder::with_capacity(blobs.len()); + for blob in blobs { + op_builder.push(unwrap_positive(blob)); } - let start = doc_indexes.len(); - builder.$operation().extend_vec(&mut doc_indexes); - let len = doc_indexes.len() - start; - if len != 0 { - let slot = SlotIndexedDocIndexes { - index: aut_index, - start: start, - len: len, - }; - doc_indexes_slots.push(slot); + let mut stream = op_builder.union().into_stream(); + let mut builder = RawPositiveBlobBuilder::memory(); + + while let Some((input, doc_indexes)) = stream.next() { + // FIXME empty doc_indexes must be handled by OpBuilder + if !doc_indexes.is_empty() { + builder.insert(input, doc_indexes).unwrap(); + } } - } - let read_only = VecReadOnly::new(doc_indexes); - self.outs.reserve(doc_indexes_slots.len()); - for slot in doc_indexes_slots { - let indexes = IndexedDocIndexes { - index: slot.index, - doc_indexes: read_only.range(slot.start, slot.len), - }; - self.outs.push(indexes); - } + let (map, doc_indexes) = builder.into_inner().unwrap(); + let blob = PositiveBlob::from_bytes(map, doc_indexes).unwrap(); + Either::Left(blob) + }, + Sign::Negative => { + let mut op_builder = negative::OpBuilder::with_capacity(blobs.len()); + for blob in blobs { + op_builder.push(unwrap_negative(blob)); + } - if self.outs.is_empty() { return None } - Some((input, &self.outs)) - }, - None => None, - } - } -} -}} - -logical_operation!(struct Union, union); -logical_operation!(struct Intersection, intersection); -logical_operation!(struct Difference, difference); -logical_operation!(struct SymmetricDifference, symmetric_difference); - -#[cfg(test)] -mod tests { - use super::*; - use crate::blob::PositiveBlobBuilder; - - fn get_exact_key<'m, I, S>(stream: I, key: &[u8]) -> Option> - where - I: for<'a> fst::IntoStreamer<'a, Into=S, Item=(&'a [u8], &'a [IndexedDocIndexes])>, - S: 'm + for<'a> fst::Streamer<'a, Item=(&'a [u8], &'a [IndexedDocIndexes])>, - { - let mut stream = stream.into_stream(); - while let Some((string, indexes)) = stream.next() { - if string == key { - return Some(indexes[0].doc_indexes.clone()) + let blob = op_builder.union().into_negative_blob(); + Either::Right(blob) + }, } - } - None - } + }); - #[test] - fn union_two_blobs() { - let doc1 = DocIndex { document_id: 12, attribute: 1, attribute_index: 22 }; - let doc2 = DocIndex { document_id: 31, attribute: 0, attribute_index: 1 }; + let mut zipped = positives.into_iter().zip(negatives); + let mut buffer = Vec::new(); - let meta1 = { - let mapw = Vec::new(); - let indexesw = Vec::new(); - let mut builder = PositiveBlobBuilder::new(mapw, indexesw); + zipped.try_fold(PositiveBlob::default(), |base, (positive, negative)| { + let mut builder = RawPositiveBlobBuilder::memory(); + let doc_ids = Set::new_unchecked(negative.as_ref()); - builder.insert("chameau", doc1); + let op_builder = positive::OpBuilder::new().add(&base).add(&positive); + let mut stream = op_builder.union().into_stream(); - Blob::Positive(builder.build().unwrap()) - }; + while let Some((input, doc_indexes)) = stream.next() { + let doc_indexes = Set::new_unchecked(doc_indexes); + let op = DifferenceByKey::new(doc_indexes, doc_ids, |x| x.document_id, |x| *x); - let meta2 = { - let mapw = Vec::new(); - let indexesw = Vec::new(); - let mut builder = PositiveBlobBuilder::new(mapw, indexesw); + buffer.clear(); + op.extend_vec(&mut buffer); + if !buffer.is_empty() { + builder.insert(input, &buffer)?; + } + } - builder.insert("chameau", doc2); - - Blob::Positive(builder.build().unwrap()) - }; - - let metas = OpBuilder::new().add(&meta1).add(&meta2).union(); - let value = get_exact_key(metas, b"chameau"); - - assert_eq!(&*value.unwrap(), &[doc1, doc2][..]); - } - - #[test] - fn intersection_two_blobs() { - let doc1 = DocIndex { document_id: 31, attribute: 0, attribute_index: 1 }; - let doc2 = DocIndex { document_id: 31, attribute: 0, attribute_index: 1 }; - - let meta1 = { - let mapw = Vec::new(); - let indexesw = Vec::new(); - let mut builder = PositiveBlobBuilder::new(mapw, indexesw); - - builder.insert("chameau", doc1); - - Blob::Positive(builder.build().unwrap()) - }; - - let meta2 = { - let mapw = Vec::new(); - let indexesw = Vec::new(); - let mut builder = PositiveBlobBuilder::new(mapw, indexesw); - - builder.insert("chameau", doc2); - - Blob::Positive(builder.build().unwrap()) - }; - - let metas = OpBuilder::new().add(&meta1).add(&meta2).intersection(); - let value = get_exact_key(metas, b"chameau"); - - assert_eq!(&*value.unwrap(), &[doc1][..]); - } - - #[test] - fn difference_two_blobs() { - let doc1 = DocIndex { document_id: 12, attribute: 1, attribute_index: 22 }; - let doc2 = DocIndex { document_id: 31, attribute: 0, attribute_index: 1 }; - let doc3 = DocIndex { document_id: 31, attribute: 0, attribute_index: 1 }; - - let meta1 = { - let mapw = Vec::new(); - let indexesw = Vec::new(); - let mut builder = PositiveBlobBuilder::new(mapw, indexesw); - - builder.insert("chameau", doc1); - builder.insert("chameau", doc2); - - Blob::Positive(builder.build().unwrap()) - }; - - let meta2 = { - let mapw = Vec::new(); - let indexesw = Vec::new(); - let mut builder = PositiveBlobBuilder::new(mapw, indexesw); - - builder.insert("chameau", doc3); - - Blob::Positive(builder.build().unwrap()) - }; - - let metas = OpBuilder::new().add(&meta1).add(&meta2).difference(); - let value = get_exact_key(metas, b"chameau"); - - assert_eq!(&*value.unwrap(), &[doc1][..]); - } - - #[test] - fn symmetric_difference_two_blobs() { - let doc1 = DocIndex { document_id: 12, attribute: 1, attribute_index: 22 }; - let doc2 = DocIndex { document_id: 31, attribute: 0, attribute_index: 1 }; - let doc3 = DocIndex { document_id: 32, attribute: 0, attribute_index: 1 }; - let doc4 = DocIndex { document_id: 34, attribute: 12, attribute_index: 1 }; - - let meta1 = { - let mapw = Vec::new(); - let indexesw = Vec::new(); - let mut builder = PositiveBlobBuilder::new(mapw, indexesw); - - builder.insert("chameau", doc1); - builder.insert("chameau", doc2); - builder.insert("chameau", doc3); - - Blob::Positive(builder.build().unwrap()) - }; - - let meta2 = { - let mapw = Vec::new(); - let indexesw = Vec::new(); - let mut builder = PositiveBlobBuilder::new(mapw, indexesw); - - builder.insert("chameau", doc2); - builder.insert("chameau", doc3); - builder.insert("chameau", doc4); - - Blob::Positive(builder.build().unwrap()) - }; - - let metas = OpBuilder::new().add(&meta1).add(&meta2).symmetric_difference(); - let value = get_exact_key(metas, b"chameau"); - - assert_eq!(&*value.unwrap(), &[doc1, doc4][..]); + let (map, doc_indexes) = builder.into_inner()?; + PositiveBlob::from_bytes(map, doc_indexes) + }) } } diff --git a/src/blob/ops_indexed_value.rs b/src/blob/ops_indexed_value.rs deleted file mode 100644 index 2c557f61c..000000000 --- a/src/blob/ops_indexed_value.rs +++ /dev/null @@ -1,203 +0,0 @@ -use std::collections::BinaryHeap; -use std::rc::Rc; -use std::cmp; -use fst::raw::{self, Output}; -use fst::{self, IntoStreamer, Streamer}; - -type BoxedStream<'f> = Box Streamer<'a, Item=(&'a [u8], &'a [raw::IndexedValue])> + 'f>; - -pub struct OpIndexedValueBuilder<'f> { - streams: Vec>, -} - -impl<'f> OpIndexedValueBuilder<'f> { - pub fn new() -> Self { - Self { streams: Vec::new() } - } - - pub fn push(&mut self, stream: I) - where - I: for<'a> IntoStreamer<'a, Into=S, Item=(&'a [u8], &'a [raw::IndexedValue])>, - S: 'f + for<'a> Streamer<'a, Item=(&'a [u8], &'a [raw::IndexedValue])>, - { - self.streams.push(Box::new(stream.into_stream())); - } - - pub fn union(self) -> UnionIndexedValue<'f> { - UnionIndexedValue { - heap: StreamIndexedValueHeap::new(self.streams), - outs: Vec::new(), - cur_slot: None, - } - } -} - -pub struct UnionIndexedValue<'f> { - heap: StreamIndexedValueHeap<'f>, - outs: Vec, - cur_slot: Option, -} - -impl<'f> UnionIndexedValue<'f> { - pub fn len(&self) -> usize { - self.heap.num_slots() - } -} - -impl<'a, 'm> fst::Streamer<'a> for UnionIndexedValue<'m> { - type Item = (&'a [u8], &'a [IndexedValue]); - - fn next(&'a mut self) -> Option { - if let Some(slot) = self.cur_slot.take() { - self.heap.refill(slot); - } - let slot = match self.heap.pop() { - None => return None, - Some(slot) => { - self.cur_slot = Some(slot); - self.cur_slot.as_mut().unwrap() - } - }; - self.outs.clear(); - self.outs.push(slot.indexed_value()); - while let Some(slot2) = self.heap.pop_if_equal(slot.input()) { - self.outs.push(slot2.indexed_value()); - self.heap.refill(slot2); - } - Some((slot.input(), &self.outs)) - } -} - -struct StreamIndexedValueHeap<'f> { - rdrs: Vec>, - heap: BinaryHeap, -} - -impl<'f> StreamIndexedValueHeap<'f> { - fn new(streams: Vec>) -> StreamIndexedValueHeap<'f> { - let mut u = StreamIndexedValueHeap { - rdrs: streams, - heap: BinaryHeap::new(), - }; - for i in 0..u.rdrs.len() { - u.refill(SlotIndexedValue::new(i)); - } - u - } - - fn pop(&mut self) -> Option { - self.heap.pop() - } - - fn peek_is_duplicate(&self, key: &[u8]) -> bool { - self.heap.peek().map(|s| s.input() == key).unwrap_or(false) - } - - fn pop_if_equal(&mut self, key: &[u8]) -> Option { - if self.peek_is_duplicate(key) { - self.pop() - } else { - None - } - } - - fn pop_if_le(&mut self, key: &[u8]) -> Option { - if self.heap.peek().map(|s| s.input() <= key).unwrap_or(false) { - self.pop() - } else { - None - } - } - - fn num_slots(&self) -> usize { - self.rdrs.len() - } - - fn refill(&mut self, mut slot: SlotIndexedValue) { - if let Some((input, ivalues)) = self.rdrs[slot.rdr_index].next() { - slot.set_input(input); - for values in ivalues { - slot.set_aut_index(values.index); - slot.set_output(values.value); - self.heap.push(slot.clone()); - } - } - } -} - -#[derive(Debug, Clone)] -struct SlotIndexedValue { - rdr_index: usize, - aut_index: usize, - input: Rc>, - output: Output, -} - -#[derive(Debug)] -pub struct IndexedValue { - pub rdr_index: usize, - pub aut_index: usize, - pub value: u64, -} - -impl PartialEq for SlotIndexedValue { - fn eq(&self, other: &Self) -> bool { - (&self.input, self.rdr_index, self.aut_index, self.output) - .eq(&(&other.input, other.rdr_index, other.aut_index, other.output)) - } -} - -impl Eq for SlotIndexedValue { } - -impl PartialOrd for SlotIndexedValue { - fn partial_cmp(&self, other: &Self) -> Option { - (&self.input, self.rdr_index, self.aut_index, self.output) - .partial_cmp(&(&other.input, other.rdr_index, other.aut_index, other.output)) - .map(|ord| ord.reverse()) - } -} - -impl Ord for SlotIndexedValue { - fn cmp(&self, other: &Self) -> cmp::Ordering { - self.partial_cmp(other).unwrap() - } -} - -impl SlotIndexedValue { - fn new(rdr_index: usize) -> SlotIndexedValue { - SlotIndexedValue { - rdr_index: rdr_index, - aut_index: 0, - input: Rc::new(Vec::with_capacity(64)), - output: Output::zero(), - } - } - - fn indexed_value(&self) -> IndexedValue { - IndexedValue { - rdr_index: self.rdr_index, - aut_index: self.aut_index, - value: self.output.value(), - } - } - - fn input(&self) -> &[u8] { - &self.input - } - - fn set_aut_index(&mut self, aut_index: usize) { - self.aut_index = aut_index; - } - - fn set_input(&mut self, input: &[u8]) { - if *self.input != input { - let inner = Rc::make_mut(&mut self.input); - inner.clear(); - inner.extend(input); - } - } - - fn set_output(&mut self, output: u64) { - self.output = Output::new(output); - } -} diff --git a/src/blob/positive_blob.rs b/src/blob/positive/blob.rs similarity index 63% rename from src/blob/positive_blob.rs rename to src/blob/positive/blob.rs index baf71df5f..7cbcf47ab 100644 --- a/src/blob/positive_blob.rs +++ b/src/blob/positive/blob.rs @@ -3,10 +3,10 @@ use std::io::Write; use std::path::Path; use std::error::Error; -use fst::{Map, MapBuilder}; +use fst::{map, Map, Streamer, IntoStreamer}; use crate::DocIndex; -use crate::data::{DocIndexes, DocIndexesBuilder}; +use crate::data::{DocIndexes, RawDocIndexesBuilder, DocIndexesBuilder}; use serde::ser::{Serialize, Serializer, SerializeTuple}; use serde::de::{self, Deserialize, Deserializer, SeqAccess, Visitor}; @@ -53,6 +53,40 @@ impl PositiveBlob { } } +impl<'m, 'a> IntoStreamer<'a> for &'m PositiveBlob { + type Item = (&'a [u8], &'a [DocIndex]); + /// The type of the stream to be constructed. + type Into = PositiveBlobStream<'m>; + + /// Construct a stream from `Self`. + fn into_stream(self) -> Self::Into { + PositiveBlobStream { + map_stream: self.map.into_stream(), + doc_indexes: &self.indexes, + } + } +} + +pub struct PositiveBlobStream<'m> { + map_stream: map::Stream<'m>, + doc_indexes: &'m DocIndexes, +} + +impl<'m, 'a> Streamer<'a> for PositiveBlobStream<'m> { + type Item = (&'a [u8], &'a [DocIndex]); + + fn next(&'a mut self) -> Option { + match self.map_stream.next() { + Some((input, index)) => { + let doc_indexes = self.doc_indexes.get(index); + let doc_indexes = doc_indexes.expect("BUG: could not find document indexes"); + Some((input, doc_indexes)) + }, + None => None, + } + } +} + impl Serialize for PositiveBlob { fn serialize(&self, serializer: S) -> Result { let mut tuple = serializer.serialize_tuple(2)?; @@ -99,6 +133,50 @@ impl<'de> Deserialize<'de> for PositiveBlob { } } +pub struct RawPositiveBlobBuilder { + map: fst::MapBuilder, + indexes: RawDocIndexesBuilder, + value: u64, +} + +impl RawPositiveBlobBuilder, Vec> { + pub fn memory() -> Self { + RawPositiveBlobBuilder { + map: fst::MapBuilder::memory(), + indexes: RawDocIndexesBuilder::memory(), + value: 0, + } + } +} + +impl RawPositiveBlobBuilder { + pub fn new(map: W, indexes: X) -> Result> { + Ok(RawPositiveBlobBuilder { + map: fst::MapBuilder::new(map)?, + indexes: RawDocIndexesBuilder::new(indexes), + value: 0, + }) + } + + // FIXME what if one write doesn't work but the other do ? + pub fn insert(&mut self, key: &[u8], doc_indexes: &[DocIndex]) -> Result<(), Box> { + self.map.insert(key, self.value)?; + self.indexes.insert(doc_indexes)?; + self.value += 1; + Ok(()) + } + + pub fn finish(self) -> Result<(), Box> { + self.into_inner().map(drop) + } + + pub fn into_inner(self) -> Result<(W, X), Box> { + let map = self.map.into_inner()?; + let indexes = self.indexes.into_inner()?; + Ok((map, indexes)) + } +} + pub struct PositiveBlobBuilder { map: W, indexes: DocIndexesBuilder, @@ -122,7 +200,7 @@ impl PositiveBlobBuilder { // of the input is the same as the machine that is reading it. let map = { - let mut keys_builder = MapBuilder::new(self.map)?; + let mut keys_builder = fst::MapBuilder::new(self.map)?; let keys = self.indexes.keys().map(|(s, v)| (s, *v)); keys_builder.extend_iter(keys)?; keys_builder.into_inner()? diff --git a/src/blob/positive/mod.rs b/src/blob/positive/mod.rs new file mode 100644 index 000000000..be895b80b --- /dev/null +++ b/src/blob/positive/mod.rs @@ -0,0 +1,5 @@ +mod blob; +mod ops; + +pub use self::blob::{PositiveBlob, RawPositiveBlobBuilder, PositiveBlobBuilder}; +pub use self::ops::OpBuilder; diff --git a/src/blob/positive/ops.rs b/src/blob/positive/ops.rs new file mode 100644 index 000000000..78ae7adbb --- /dev/null +++ b/src/blob/positive/ops.rs @@ -0,0 +1,128 @@ +use sdset::multi::OpBuilder as SdOpBuilder; +use sdset::{SetOperation, Set}; + +use crate::blob::PositiveBlob; +use crate::data::DocIndexes; +use crate::DocIndex; + +pub struct OpBuilder<'m> { + // the operation on the maps is always an union. + map_op: fst::map::OpBuilder<'m>, + indexes: Vec<&'m DocIndexes>, +} + +/// Do a set operation on multiple positive blobs. +impl<'m> OpBuilder<'m> { + pub fn new() -> Self { + Self { + map_op: fst::map::OpBuilder::new(), + indexes: Vec::new(), + } + } + + pub fn with_capacity(cap: usize) -> Self { + Self { + map_op: fst::map::OpBuilder::new(), // TODO patch fst to add with_capacity + indexes: Vec::with_capacity(cap), + } + } + + pub fn add(mut self, blob: &'m PositiveBlob) -> Self { + self.push(blob); + self + } + + pub fn push(&mut self, blob: &'m PositiveBlob) { + self.map_op.push(blob.as_map()); + self.indexes.push(blob.as_indexes()); + } + + pub fn union(self) -> Union<'m> { + Union::new(self.map_op.union(), self.indexes) + } + + pub fn intersection(self) -> Intersection<'m> { + Intersection::new(self.map_op.union(), self.indexes) + } + + pub fn difference(self) -> Difference<'m> { + Difference::new(self.map_op.union(), self.indexes) + } + + pub fn symmetric_difference(self) -> SymmetricDifference<'m> { + SymmetricDifference::new(self.map_op.union(), self.indexes) + } +} + +macro_rules! logical_operation { + (struct $name:ident, $operation:ident) => { + +pub struct $name<'m> { + stream: fst::map::Union<'m>, + indexes: Vec<&'m DocIndexes>, + outs: Vec, +} + +impl<'m> $name<'m> { + fn new(stream: fst::map::Union<'m>, indexes: Vec<&'m DocIndexes>) -> Self { + $name { + stream: stream, + indexes: indexes, + outs: Vec::new(), + } + } +} + +impl<'m, 'a> fst::Streamer<'a> for $name<'m> { + type Item = (&'a [u8], &'a [DocIndex]); + + fn next(&'a mut self) -> Option { + // loop { + // let (input, ivalues) = match self.stream.next() { + // Some(value) => value, + // None => return None, + // }; + + // self.outs.clear(); + + // let mut builder = SdOpBuilder::with_capacity(ivalues.len()); + // for ivalue in ivalues { + // let indexes = self.indexes[ivalue.index]; + // let indexes = indexes.get(ivalue.value).expect("BUG: could not find document indexes"); + // let set = Set::new_unchecked(indexes); + // builder.push(set); + // } + + // builder.$operation().extend_vec(&mut self.outs); + + // if self.outs.is_empty() { continue } + // return Some((input, &self.outs)) + // } + + // FIXME make the above code compile + match self.stream.next() { + Some((input, ivalues)) => { + self.outs.clear(); + + let mut builder = SdOpBuilder::with_capacity(ivalues.len()); + for ivalue in ivalues { + let indexes = self.indexes[ivalue.index].get(ivalue.value).expect(""); + let set = Set::new_unchecked(indexes); + builder.push(set); + } + + builder.$operation().extend_vec(&mut self.outs); + + if self.outs.is_empty() { return None } + return Some((input, &self.outs)) + }, + None => None + } + } +} +}} + +logical_operation!(struct Union, union); +logical_operation!(struct Intersection, intersection); +logical_operation!(struct Difference, difference); +logical_operation!(struct SymmetricDifference, symmetric_difference); diff --git a/src/data/doc_ids.rs b/src/data/doc_ids.rs index afc476e92..f93fe495e 100644 --- a/src/data/doc_ids.rs +++ b/src/data/doc_ids.rs @@ -28,7 +28,7 @@ impl DocIds { // FIXME check if modulo DocumentId let len = vec.len(); let data = Data::Shared { - vec: Arc::new(vec), + bytes: Arc::new(vec), offset: 0, len: len }; diff --git a/src/data/doc_indexes.rs b/src/data/doc_indexes.rs index 82eb3a86b..28c3bde46 100644 --- a/src/data/doc_indexes.rs +++ b/src/data/doc_indexes.rs @@ -2,7 +2,6 @@ use std::collections::btree_map::{BTreeMap, Iter, Entry}; use std::slice::from_raw_parts; use std::io::{self, Write}; use std::path::Path; -use std::ops::Deref; use std::sync::Arc; use std::mem; @@ -28,38 +27,28 @@ pub struct DocIndexes { impl DocIndexes { pub unsafe fn from_path>(path: P) -> io::Result { let mmap = MmapReadOnly::open_path(path)?; - - let ranges_len_offset = mmap.as_slice().len() - mem::size_of::(); - let ranges_len = (&mmap.as_slice()[ranges_len_offset..]).read_u64::()?; - let ranges_len = ranges_len as usize * mem::size_of::(); - - let ranges_offset = ranges_len_offset - ranges_len; - let ranges = Data::Mmap(mmap.range(ranges_offset, ranges_len)); - - let indexes = Data::Mmap(mmap.range(0, ranges_offset)); - - Ok(DocIndexes { ranges, indexes }) + DocIndexes::from_data(Data::Mmap(mmap)) } pub fn from_bytes(vec: Vec) -> io::Result { - let vec = Arc::new(vec); + let len = vec.len(); + DocIndexes::from_shared_bytes(Arc::new(vec), 0, len) + } - let ranges_len_offset = vec.len() - mem::size_of::(); - let ranges_len = (&vec[ranges_len_offset..]).read_u64::()?; + pub fn from_shared_bytes(bytes: Arc>, offset: usize, len: usize) -> io::Result { + let data = Data::Shared { bytes, offset, len }; + DocIndexes::from_data(data) + } + + fn from_data(data: Data) -> io::Result { + let ranges_len_offset = data.len() - mem::size_of::(); + let ranges_len = (&data[ranges_len_offset..]).read_u64::()?; let ranges_len = ranges_len as usize * mem::size_of::(); let ranges_offset = ranges_len_offset - ranges_len; - let ranges = Data::Shared { - vec: vec.clone(), - offset: ranges_offset, - len: ranges_len, - }; + let ranges = data.range(ranges_offset, ranges_len); - let indexes = Data::Shared { - vec: vec, - offset: 0, - len: ranges_offset, - }; + let indexes = data.range(0, ranges_offset); Ok(DocIndexes { ranges, indexes }) } diff --git a/src/data/mod.rs b/src/data/mod.rs index 8c0329bf4..4cd21b65e 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -12,17 +12,33 @@ pub use self::doc_indexes::{DocIndexes, DocIndexesBuilder, RawDocIndexesBuilder} #[derive(Clone)] enum Data { Shared { - vec: Arc>, + bytes: Arc>, offset: usize, len: usize, }, Mmap(MmapReadOnly), } +impl Data { + pub fn range(&self, off: usize, l: usize) -> Data { + match self { + Data::Shared { bytes, offset, len } => { + assert!(off + l <= *len); + Data::Shared { + bytes: bytes.clone(), + offset: offset + off, + len: l, + } + }, + Data::Mmap(mmap) => Data::Mmap(mmap.range(off, l)), + } + } +} + impl Default for Data { fn default() -> Data { Data::Shared { - vec: Arc::default(), + bytes: Arc::default(), offset: 0, len: 0, } @@ -40,8 +56,8 @@ impl Deref for Data { impl AsRef<[u8]> for Data { fn as_ref(&self) -> &[u8] { match self { - Data::Shared { vec, offset, len } => { - &vec[*offset..offset + len] + Data::Shared { bytes, offset, len } => { + &bytes[*offset..offset + len] }, Data::Mmap(m) => m.as_slice(), } diff --git a/src/index/identifier.rs b/src/index/identifier.rs deleted file mode 100644 index ee36ab314..000000000 --- a/src/index/identifier.rs +++ /dev/null @@ -1,64 +0,0 @@ -use std::io::Write; - -use byteorder::{NetworkEndian, WriteBytesExt}; - -use crate::index::schema::SchemaAttr; -use crate::DocumentId; - -pub struct Identifier { - inner: Vec, -} - -impl Identifier { - pub fn data() -> Data { - let mut inner = Vec::new(); - let _ = inner.write(b"data"); - Data { inner } - } - - pub fn document(id: DocumentId) -> Document { - let mut inner = Vec::new(); - let _ = inner.write(b"docu"); - let _ = inner.write(b"-"); - let _ = inner.write_u64::(id); - Document { inner } - } -} - -pub struct Data { - inner: Vec, -} - -impl Data { - pub fn index(mut self) -> Self { - let _ = self.inner.write(b"-"); - let _ = self.inner.write(b"index"); - self - } - - pub fn schema(mut self) -> Self { - let _ = self.inner.write(b"-"); - let _ = self.inner.write(b"schema"); - self - } - - pub fn build(self) -> Vec { - self.inner - } -} - -pub struct Document { - inner: Vec, -} - -impl Document { - pub fn attribute(mut self, attr: SchemaAttr) -> Self { - let _ = self.inner.write(b"-"); - let _ = self.inner.write_u32::(attr.as_u32()); - self - } - - pub fn build(self) -> Vec { - self.inner - } -} diff --git a/src/index/mod.rs b/src/index/mod.rs index 842d6fb58..5fa036900 100644 --- a/src/index/mod.rs +++ b/src/index/mod.rs @@ -1,4 +1,3 @@ -pub mod identifier; pub mod schema; pub mod update; @@ -15,13 +14,10 @@ use ::rocksdb::{rocksdb, rocksdb_options}; use ::rocksdb::merge_operator::MergeOperands; use crate::DocIndex; -use crate::automaton; use crate::rank::Document; use crate::index::schema::Schema; use crate::index::update::Update; -use crate::tokenizer::TokenizerBuilder; -use crate::index::identifier::Identifier; -use crate::rank::{criterion, Config, RankedStream}; +use crate::rank::QueryBuilder; use crate::data::{DocIds, DocIndexes, RawDocIndexesBuilder}; use crate::blob::{PositiveBlob, NegativeBlob, Blob}; @@ -188,8 +184,7 @@ impl Index { let mut schema_bytes = Vec::new(); schema.write_to(&mut schema_bytes)?; - let data_key = Identifier::data().schema().build(); - database.put(&data_key, &schema_bytes)?; + database.put(b"data-schema", &schema_bytes)?; Ok(Self { database }) } @@ -205,8 +200,7 @@ impl Index { let database = rocksdb::DB::open_cf(opts, &path, vec![("default", cf_opts)])?; - let data_key = Identifier::data().schema().build(); - let _schema = match database.get(&data_key)? { + let _schema = match database.get(b"data-schema")? { Some(value) => Schema::read_from(&*value)?, None => return Err(String::from("Database does not contain a schema").into()), }; @@ -228,8 +222,7 @@ impl Index { } pub fn schema(&self) -> Result> { - let data_key = Identifier::data().schema().build(); - let bytes = self.database.get(&data_key)?.expect("data-schema entry not found"); + let bytes = self.database.get(b"data-schema")?.expect("data-schema entry not found"); Ok(Schema::read_from(&*bytes).expect("Invalid schema")) } @@ -237,26 +230,10 @@ impl Index { // this snapshot will allow consistent reads for the whole search operation let snapshot = self.database.snapshot(); - let index_key = Identifier::data().index().build(); - let blob = match snapshot.get(&index_key)? { - Some(value) => bincode::deserialize(&value)?, - None => PositiveBlob::default(), - }; + let builder = QueryBuilder::new(snapshot)?; + let documents = builder.query(query, 0..20); - let mut automatons = Vec::new(); - for query in query.split_whitespace().map(str::to_lowercase) { - let lev = automaton::build_prefix_dfa(&query); - automatons.push(lev); - } - - let config = Config { - blob: blob, - automatons: automatons, - criteria: criterion::default(), - distinct: ((), 1), - }; - - Ok(RankedStream::new(config).retrieve_documents(0..20)) + Ok(documents) } } diff --git a/src/index/update/mod.rs b/src/index/update/mod.rs index a53c44852..3c3601f26 100644 --- a/src/index/update/mod.rs +++ b/src/index/update/mod.rs @@ -1,13 +1,11 @@ use std::path::PathBuf; use std::error::Error; -use crate::blob::{BlobName, Sign}; - mod negative_update; mod positive_update; -pub use self::negative_update::{NegativeUpdateBuilder}; pub use self::positive_update::{PositiveUpdateBuilder, NewState}; +pub use self::negative_update::NegativeUpdateBuilder; pub struct Update { path: PathBuf, @@ -21,14 +19,4 @@ impl Update { pub fn into_path_buf(self) -> PathBuf { self.path } - - pub fn info(&self) -> UpdateInfo { - unimplemented!() - } -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub struct UpdateInfo { - pub sign: Sign, - pub id: BlobName, } diff --git a/src/index/update/negative_update.rs b/src/index/update/negative_update.rs index ef5d61c05..dc2ea5d7e 100644 --- a/src/index/update/negative_update.rs +++ b/src/index/update/negative_update.rs @@ -3,10 +3,8 @@ use std::error::Error; use ::rocksdb::rocksdb_options; -use crate::blob::BlobInfo; use crate::index::update::Update; -use crate::index::identifier::Identifier; -use crate::data::{DocIds, DocIdsBuilder}; +use crate::data::DocIdsBuilder; use crate::DocumentId; pub struct NegativeUpdateBuilder { @@ -27,34 +25,34 @@ impl NegativeUpdateBuilder { } pub fn build(self) -> Result> { - let blob_info = BlobInfo::new_negative(); - let env_options = rocksdb_options::EnvOptions::new(); let column_family_options = rocksdb_options::ColumnFamilyOptions::new(); let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options); file_writer.open(&self.path.to_string_lossy())?; - // write the doc ids - let blob_key = Identifier::blob(blob_info.name).document_ids().build(); - let blob_doc_ids = self.doc_ids.into_inner()?; - file_writer.put(&blob_key, &blob_doc_ids)?; + // // write the doc ids + // let blob_key = Identifier::blob(blob_info.name).document_ids().build(); + // let blob_doc_ids = self.doc_ids.into_inner()?; + // file_writer.put(&blob_key, &blob_doc_ids)?; - { - // write the blob name to be merged - let mut buffer = Vec::new(); - blob_info.write_into(&mut buffer); - let data_key = Identifier::data().blobs_order().build(); - file_writer.merge(&data_key, &buffer)?; - } + // { + // // write the blob name to be merged + // let mut buffer = Vec::new(); + // blob_info.write_into(&mut buffer); + // let data_key = Identifier::data().blobs_order().build(); + // file_writer.merge(&data_key, &buffer)?; + // } - let blob_doc_ids = DocIds::from_bytes(blob_doc_ids)?; - for id in blob_doc_ids.doc_ids().iter().cloned() { - let start = Identifier::document(id).build(); - let end = Identifier::document(id + 1).build(); - file_writer.delete_range(&start, &end)?; - } + // let blob_doc_ids = DocIds::from_bytes(blob_doc_ids)?; + // for id in blob_doc_ids.doc_ids().iter().cloned() { + // let start = Identifier::document(id).build(); + // let end = Identifier::document(id + 1).build(); + // file_writer.delete_range(&start, &end)?; + // } - file_writer.finish()?; - Update::open(self.path) + // file_writer.finish()?; + // Update::open(self.path) + + unimplemented!() } } diff --git a/src/index/update/positive_update.rs b/src/index/update/positive_update.rs index 2825406c1..b2b219e55 100644 --- a/src/index/update/positive_update.rs +++ b/src/index/update/positive_update.rs @@ -5,11 +5,9 @@ use std::error::Error; use ::rocksdb::rocksdb_options; use crate::index::update::Update; -use crate::index::identifier::Identifier; use crate::index::schema::{SchemaProps, Schema, SchemaAttr}; use crate::tokenizer::TokenizerBuilder; -use crate::blob::PositiveBlobBuilder; -use crate::{DocIndex, DocumentId}; +use crate::DocumentId; pub enum NewState { Updated { @@ -51,69 +49,69 @@ impl PositiveUpdateBuilder where B: TokenizerBuilder { pub fn build(self) -> Result> { - let blob_info = BlobInfo::new_positive(); - let env_options = rocksdb_options::EnvOptions::new(); let column_family_options = rocksdb_options::ColumnFamilyOptions::new(); let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options); file_writer.open(&self.path.to_string_lossy())?; - let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new()); - for ((document_id, field), state) in &self.new_states { - let value = match state { - NewState::Updated { value, props } if props.is_indexed() => value, - _ => continue, - }; + // let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new()); + // for ((document_id, field), state) in &self.new_states { + // let value = match state { + // NewState::Updated { value, props } if props.is_indexed() => value, + // _ => continue, + // }; - for (index, word) in self.tokenizer_builder.build(value) { - let doc_index = DocIndex { - document_id: *document_id, - attribute: field.as_u32() as u8, - attribute_index: index as u32, - }; - // insert the exact representation - let word_lower = word.to_lowercase(); + // for (index, word) in self.tokenizer_builder.build(value) { + // let doc_index = DocIndex { + // document_id: *document_id, + // attribute: field.as_u32() as u8, + // attribute_index: index as u32, + // }; + // // insert the exact representation + // let word_lower = word.to_lowercase(); - // and the unidecoded lowercased version - let word_unidecoded = unidecode::unidecode(word).to_lowercase(); - if word_lower != word_unidecoded { - builder.insert(word_unidecoded, doc_index); - } + // // and the unidecoded lowercased version + // let word_unidecoded = unidecode::unidecode(word).to_lowercase(); + // if word_lower != word_unidecoded { + // builder.insert(word_unidecoded, doc_index); + // } - builder.insert(word_lower, doc_index); - } - } - let (blob_fst_map, blob_doc_idx) = builder.into_inner()?; + // builder.insert(word_lower, doc_index); + // } + // } + // let (blob_fst_map, blob_doc_idx) = builder.into_inner()?; - // write the doc-idx - let blob_key = Identifier::blob(blob_info.name).document_indexes().build(); - file_writer.put(&blob_key, &blob_doc_idx)?; + // // write the doc-idx + // let blob_key = Identifier::blob(blob_info.name).document_indexes().build(); + // file_writer.put(&blob_key, &blob_doc_idx)?; - // write the fst - let blob_key = Identifier::blob(blob_info.name).fst_map().build(); - file_writer.put(&blob_key, &blob_fst_map)?; + // // write the fst + // let blob_key = Identifier::blob(blob_info.name).fst_map().build(); + // file_writer.put(&blob_key, &blob_fst_map)?; - { - // write the blob name to be merged - let mut buffer = Vec::new(); - blob_info.write_into(&mut buffer); - let data_key = Identifier::data().blobs_order().build(); - file_writer.merge(&data_key, &buffer)?; - } + // { + // // write the blob name to be merged + // let mut buffer = Vec::new(); + // blob_info.write_into(&mut buffer); + // let data_key = Identifier::data().blobs_order().build(); + // file_writer.merge(&data_key, &buffer)?; + // } - // write all the documents fields updates - for ((id, attr), state) in self.new_states { - let key = Identifier::document(id).attribute(attr).build(); - match state { - NewState::Updated { value, props } => if props.is_stored() { - file_writer.put(&key, value.as_bytes())? - }, - NewState::Removed => file_writer.delete(&key)?, - } - } + // // write all the documents fields updates + // for ((id, attr), state) in self.new_states { + // let key = Identifier::document(id).attribute(attr).build(); + // match state { + // NewState::Updated { value, props } => if props.is_stored() { + // file_writer.put(&key, value.as_bytes())? + // }, + // NewState::Removed => file_writer.delete(&key)?, + // } + // } - file_writer.finish()?; - Update::open(self.path) + // file_writer.finish()?; + // Update::open(self.path) + + unimplemented!() } } diff --git a/src/lib.rs b/src/lib.rs index 834971a40..ca416204a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,7 +6,7 @@ pub mod automaton; pub mod blob; pub mod data; -pub mod database; +pub mod retrieve; pub mod index; pub mod rank; pub mod tokenizer; diff --git a/src/rank/distinct_map.rs b/src/rank/distinct_map.rs new file mode 100644 index 000000000..39262e7c5 --- /dev/null +++ b/src/rank/distinct_map.rs @@ -0,0 +1,65 @@ +use std::collections::HashMap; +use std::hash::Hash; + +pub struct DistinctMap { + inner: HashMap, + limit: usize, + len: usize, +} + +impl DistinctMap { + pub fn new(limit: usize) -> Self { + DistinctMap { + inner: HashMap::new(), + limit: limit, + len: 0, + } + } + + pub fn digest(&mut self, key: K) -> bool { + let seen = self.inner.entry(key).or_insert(0); + if *seen < self.limit { + *seen += 1; + self.len += 1; + true + } else { + false + } + } + + pub fn accept_without_key(&mut self) -> bool { + self.len += 1; + true + } + + pub fn len(&self) -> usize { + self.len + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn easy_distinct_map() { + let mut map = DistinctMap::new(2); + for x in &[1, 1, 1, 2, 3, 4, 5, 6, 6, 6, 6, 6] { + map.digest(x); + } + assert_eq!(map.len(), 8); + + let mut map = DistinctMap::new(2); + assert_eq!(map.digest(1), true); + assert_eq!(map.digest(1), true); + assert_eq!(map.digest(1), false); + assert_eq!(map.digest(1), false); + + assert_eq!(map.digest(2), true); + assert_eq!(map.digest(3), true); + assert_eq!(map.digest(2), true); + assert_eq!(map.digest(2), false); + + assert_eq!(map.len(), 5); + } +} diff --git a/src/rank/mod.rs b/src/rank/mod.rs index 39edb8976..cee465cac 100644 --- a/src/rank/mod.rs +++ b/src/rank/mod.rs @@ -1,9 +1,10 @@ pub mod criterion; mod ranked_stream; +mod distinct_map; use crate::{Match, DocumentId}; -pub use self::ranked_stream::{Config, RankedStream}; +pub use self::ranked_stream::{QueryBuilder, DistinctQueryBuilder}; #[inline] fn match_query_index(a: &Match, b: &Match) -> bool { diff --git a/src/rank/ranked_stream.rs b/src/rank/ranked_stream.rs index 9288dca8f..b1abc68d9 100644 --- a/src/rank/ranked_stream.rs +++ b/src/rank/ranked_stream.rs @@ -1,10 +1,7 @@ -use std::ops::{Deref, Range, RangeBounds}; -use std::collections::HashMap; +use std::ops::{Deref, Range}; use std::{mem, vec, str}; -use std::ops::Bound::*; use std::error::Error; use std::hash::Hash; -use std::rc::Rc; use fnv::FnvHashMap; use fst::Streamer; @@ -13,12 +10,11 @@ use ::rocksdb::rocksdb::{DB, Snapshot}; use crate::automaton::{self, DfaExt, AutomatonExt}; use crate::rank::criterion::{self, Criterion}; -use crate::blob::{PositiveBlob, Merge}; -use crate::blob::ops::Union; +use crate::rank::distinct_map::DistinctMap; +use crate::blob::PositiveBlob; use crate::{Match, DocumentId}; -use crate::database::Retrieve; +use crate::retrieve::Retrieve; use crate::rank::Document; -use crate::index::Index; fn clamp_range(range: Range, big: Range) -> Range { Range { @@ -63,9 +59,7 @@ where T: Deref, pub fn with_distinct(self, function: F, size: usize) -> DistinctQueryBuilder { DistinctQueryBuilder { - snapshot: self.snapshot, - blob: self.blob, - criteria: self.criteria, + inner: self, function: function, size: size } @@ -73,16 +67,28 @@ where T: Deref, fn query_all(&self, query: &str) -> Vec { let automatons = split_whitespace_automatons(query); - let mut stream: Union = unimplemented!(); + + let mut stream = { + let mut op_builder = fst::map::OpBuilder::new(); + for automaton in &automatons { + let stream = self.blob.as_map().search(automaton); + op_builder.push(stream); + } + op_builder.union() + }; + let mut matches = FnvHashMap::default(); - while let Some((string, indexed_values)) = stream.next() { + while let Some((input, indexed_values)) = stream.next() { for iv in indexed_values { let automaton = &automatons[iv.index]; - let distance = automaton.eval(string).to_u8(); - let is_exact = distance == 0 && string.len() == automaton.query_len(); + let distance = automaton.eval(input).to_u8(); + let is_exact = distance == 0 && input.len() == automaton.query_len(); - for doc_index in iv.doc_indexes.as_slice() { + let doc_indexes = self.blob.as_indexes(); + let doc_indexes = doc_indexes.get(iv.value).expect("BUG: could not find document indexes"); + + for doc_index in doc_indexes { let match_ = Match { query_index: iv.index as u32, distance: distance, @@ -103,11 +109,11 @@ impl QueryBuilder where T: Deref, C: Criterion, { - pub fn query(&self, query: &str, range: impl RangeBounds) -> Vec { + pub fn query(&self, query: &str, range: Range) -> Vec { let mut documents = self.query_all(query); let mut groups = vec![documents.as_mut_slice()]; - for criterion in self.criteria { + for criterion in &self.criteria { let tmp_groups = mem::replace(&mut groups, Vec::new()); for group in tmp_groups { @@ -118,127 +124,58 @@ where T: Deref, } } - // let range = clamp_range(range, 0..documents.len()); - let range: Range = unimplemented!(); + let range = clamp_range(range, 0..documents.len()); documents[range].to_vec() } } pub struct DistinctQueryBuilder, F, C> { - snapshot: Snapshot, - blob: PositiveBlob, - criteria: Vec, + inner: QueryBuilder, function: F, size: usize, } -// pub struct Schema; -// pub struct DocDatabase; -// where F: Fn(&Schema, &DocDatabase) -> Option, -// K: Hash + Eq, +pub struct DocDatabase; -impl, F, C> DistinctQueryBuilder +impl, F, K, C> DistinctQueryBuilder where T: Deref, + F: Fn(DocumentId, &DocDatabase) -> Option, + K: Hash + Eq, C: Criterion, { - pub fn query(&self, query: &str, range: impl RangeBounds) -> Vec { - // let mut documents = self.retrieve_all_documents(); - // let mut groups = vec![documents.as_mut_slice()]; + pub fn query(&self, query: &str, range: Range) -> Vec { + let mut documents = self.inner.query_all(query); + let mut groups = vec![documents.as_mut_slice()]; - // for criterion in self.criteria { - // let tmp_groups = mem::replace(&mut groups, Vec::new()); + for criterion in &self.inner.criteria { + let tmp_groups = mem::replace(&mut groups, Vec::new()); - // for group in tmp_groups { - // group.sort_unstable_by(|a, b| criterion.evaluate(a, b)); - // for group in GroupByMut::new(group, |a, b| criterion.eq(a, b)) { - // groups.push(group); - // } - // } - // } - - // let mut out_documents = Vec::with_capacity(range.len()); - // let (distinct, limit) = self.distinct; - // let mut seen = DistinctMap::new(limit); - - // for document in documents { - // let accepted = match distinct(&document.id) { - // Some(key) => seen.digest(key), - // None => seen.accept_without_key(), - // }; - - // if accepted { - // if seen.len() == range.end { break } - // if seen.len() >= range.start { - // out_documents.push(document); - // } - // } - // } - - // out_documents - - unimplemented!() - } -} - -pub struct DistinctMap { - inner: HashMap, - limit: usize, - len: usize, -} - -impl DistinctMap { - pub fn new(limit: usize) -> Self { - DistinctMap { - inner: HashMap::new(), - limit: limit, - len: 0, + for group in tmp_groups { + group.sort_unstable_by(|a, b| criterion.evaluate(a, b)); + for group in GroupByMut::new(group, |a, b| criterion.eq(a, b)) { + groups.push(group); + } + } } - } - pub fn digest(&mut self, key: K) -> bool { - let seen = self.inner.entry(key).or_insert(0); - if *seen < self.limit { - *seen += 1; - self.len += 1; - true - } else { - false + let doc_database = DocDatabase; + let mut out_documents = Vec::with_capacity(range.len()); + let mut seen = DistinctMap::new(self.size); + + for document in documents { + let accepted = match (self.function)(document.id, &doc_database) { + Some(key) => seen.digest(key), + None => seen.accept_without_key(), + }; + + if accepted { + if seen.len() == range.end { break } + if seen.len() >= range.start { + out_documents.push(document); + } + } } - } - pub fn accept_without_key(&mut self) -> bool { - self.len += 1; - true - } - - pub fn len(&self) -> usize { - self.len - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn easy_distinct_map() { - let mut map = DistinctMap::new(2); - for x in &[1, 1, 1, 2, 3, 4, 5, 6, 6, 6, 6, 6] { - map.digest(x); - } - assert_eq!(map.len(), 8); - - let mut map = DistinctMap::new(2); - assert_eq!(map.digest(1), true); - assert_eq!(map.digest(1), true); - assert_eq!(map.digest(1), false); - assert_eq!(map.digest(1), false); - - assert_eq!(map.digest(2), true); - assert_eq!(map.digest(3), true); - assert_eq!(map.digest(2), true); - assert_eq!(map.digest(2), false); - - assert_eq!(map.len(), 5); + out_documents } } diff --git a/src/database.rs b/src/retrieve.rs similarity index 57% rename from src/database.rs rename to src/retrieve.rs index bcc25661e..d9ba3f4a0 100644 --- a/src/database.rs +++ b/src/retrieve.rs @@ -1,16 +1,32 @@ use std::error::Error; use std::ops::Deref; -use ::rocksdb::rocksdb::{DB, Snapshot}; +use ::rocksdb::rocksdb::{DB, Snapshot, DBVector}; -use crate::index::schema::Schema; +use crate::index::schema::{Schema, SchemaAttr}; use crate::blob::PositiveBlob; use crate::DocumentId; +pub struct DocDatabase<'a, R: ?Sized> { + retrieve: &'a R, + schema: Schema, +} + +impl<'a, R> DocDatabase<'a, R> { + pub fn get_document(&self, id: DocumentId) -> Result, Box> { + // if ids.is_empty() { return Ok(Vec::new()) } + unimplemented!() + } + + pub fn get_document_attribute(&self, id: DocumentId, attr: SchemaAttr) -> Result> { + unimplemented!() + } +} + pub trait Retrieve { fn schema(&self) -> Result, Box>; fn data_index(&self) -> Result>; - fn get_documents(&self, ids: &[DocumentId]) -> Result, Box>; + fn doc_database(&self) -> Result, Box>; } impl Retrieve for Snapshot @@ -30,13 +46,15 @@ where T: Deref, } } - fn get_documents(&self, ids: &[DocumentId]) -> Result, Box> { - if ids.is_empty() { return Ok(Vec::new()) } + fn doc_database(&self) -> Result, Box> { let schema = match self.schema()? { Some(schema) => schema, None => return Err(String::from("BUG: could not find schema").into()), }; - unimplemented!() + Ok(DocDatabase { + retrieve: self, + schema: schema, + }) } }