feat: Make the OpBuilder work only for PositiveBlob

This commit is contained in:
Clément Renault 2018-11-28 17:12:24 +01:00
parent 9b58ffe2d9
commit 612a8d9d44
No known key found for this signature in database
GPG Key ID: 0151CDAB43460DAE
24 changed files with 658 additions and 1373 deletions

View File

@ -8,13 +8,13 @@ authors = ["Kerollmops <renault.cle@gmail.com>"]
bincode = "1.0"
byteorder = "1.2"
fnv = "1.0"
itertools = "0.7"
lazy_static = "1.1"
linked-hash-map = { version = "0.5", features = ["serde_impl"] }
sdset = "0.3"
serde = "1.0"
serde_derive = "1.0"
unidecode = "0.3"
uuid = { version = "0.7", features = ["serde", "v4"] }
[dependencies.fst]
git = "https://github.com/Kerollmops/fst.git"

View File

@ -1,504 +0,0 @@
use std::collections::BinaryHeap;
use std::rc::Rc;
use std::cmp;
use fst::{Automaton, Streamer};
use fst::automaton::AlwaysMatch;
use sdset::{Set, SetOperation};
use sdset::duo::OpBuilder as SdOpBuilder;
use group_by::GroupBy;
use crate::DocIndex;
use crate::blob::{Blob, Sign};
use crate::vec_read_only::VecReadOnly;
use crate::blob::ops::{OpBuilder, Union, IndexedDocIndexes};
/// Tells whether a group of same-sign blobs is a negative group.
///
/// Only the first blob is inspected; indexing `blobs[0]` panics on an
/// empty group.
fn group_is_negative(blobs: &&[Blob]) -> bool {
    let first = &blobs[0];
    Sign::Negative == first.sign()
}
/// Grouping predicate: `true` when both blobs report the same sign.
fn blob_same_sign(a: &Blob, b: &Blob) -> bool {
    let (left, right) = (a.sign(), b.sign());
    left == right
}
/// Maps a group position to its sign: even positions are positive,
/// odd positions are negative.
fn sign_from_group_index(group: usize) -> Sign {
    match group % 2 {
        0 => Sign::Positive,
        _ => Sign::Negative,
    }
}
/// Streamer that merges groups of same-sign blobs key by key.
pub struct Merge<'b> {
/// The per-group unions together with the heap of pending slots.
heap: GroupHeap<'b>,
/// Output buffer, cleared at the start of every `next` call and borrowed
/// by the item that `next` returns.
outs: Vec<IndexedDocIndexes>,
/// The slot popped for the key currently being returned; kept here so its
/// input bytes can be borrowed by the returned item.
cur_slot: Option<Slot>,
}
impl<'b> Merge<'b> {
pub fn always_match(blobs: &'b [Blob]) -> Self {
Self::with_automatons(vec![AlwaysMatch], blobs)
}
}
impl<'b> Merge<'b> {
    /// Builds a merge over `blobs`, searching every blob with each of the
    /// given automatons.
    ///
    /// Consecutive blobs of the same sign are grouped and each group is turned
    /// into one union. Leading negative groups are skipped, so the first
    /// remaining group (index 0) is a positive one.
    pub fn with_automatons<A>(automatons: Vec<A>, blobs: &'b [Blob]) -> Self
    where A: 'b + Automaton + Clone
    {
        // We can skip blobs that are negative: they didn't remove anything at the start
        let same_sign_runs = GroupBy::new(blobs, blob_same_sign).skip_while(group_is_negative);

        let mut groups = Vec::new();
        for run in same_sign_runs {
            let mut builder = OpBuilder::with_automatons(automatons.clone());
            for blob in run {
                builder.push(blob);
            }
            groups.push(builder.union());
        }

        let mut heap = GroupHeap::new(groups);
        // Prime the heap with the first entry of every group.
        heap.refill();

        Merge { heap, outs: Vec::new(), cur_slot: None }
    }
}
impl<'b, 'a> Streamer<'a> for Merge<'b> {
    type Item = (&'a [u8], &'a [IndexedDocIndexes]);

    /// Returns the next key together with its merged document indexes, one
    /// `IndexedDocIndexes` per automaton that still has results for that key.
    ///
    /// Slots popped from the heap for the same key are combined per automaton:
    /// outputs of positive groups are unioned in, outputs of negative groups
    /// are subtracted. Keys whose merged result is empty are skipped. Returns
    /// `None` once the heap is exhausted.
    fn next(&'a mut self) -> Option<Self::Item> {
        self.outs.clear();

        loop {
            // The slot handed out previously is done: pull new entries from the groups.
            if self.cur_slot.take().is_some() {
                self.heap.refill();
            }

            let slot = match self.heap.pop() {
                None => return None,
                Some(slot) => {
                    // Stored in `self` so that its input can be borrowed by the returned item.
                    self.cur_slot = Some(slot);
                    self.cur_slot.as_ref().unwrap()
                }
            };

            let mut doc_indexes = Vec::new();
            let mut doc_indexes_slots = Vec::with_capacity(self.heap.num_groups());

            // A negative group has nothing to remove from yet, so it contributes nothing.
            let len = match sign_from_group_index(slot.grp_index) {
                Sign::Positive => {
                    doc_indexes.extend_from_slice(&slot.output);
                    slot.output.len()
                },
                Sign::Negative => 0,
            };

            let mut slotidi = SlotIndexedDocIndexes {
                index: slot.aut_index,
                start: 0,
                len: len,
            };

            let mut buffer = Vec::new();
            while let Some(slot2) = self.heap.pop_if_equal(slot.input()) {
                if slotidi.index == slot2.aut_index {
                    // Same automaton: combine the current range with the new output.
                    buffer.clear();
                    buffer.extend(doc_indexes.drain(slotidi.start..));

                    let a = Set::new_unchecked(&buffer);
                    let b = Set::new_unchecked(&slot2.output);
                    match sign_from_group_index(slot2.grp_index) {
                        Sign::Positive => SdOpBuilder::new(a, b).union().extend_vec(&mut doc_indexes),
                        Sign::Negative => SdOpBuilder::new(a, b).difference().extend_vec(&mut doc_indexes),
                    }
                    slotidi.len = doc_indexes.len() - slotidi.start;
                } else {
                    // A new automaton starts: close the previous range...
                    if slotidi.len != 0 {
                        doc_indexes_slots.push(slotidi);
                    }

                    // ...and open a new one. The output must land in `doc_indexes`
                    // (not in the scratch `buffer`) so that `start`/`len` describe
                    // data that is really there; the sign is handled like for the
                    // very first slot of the key.
                    let start = doc_indexes.len();
                    let len = match sign_from_group_index(slot2.grp_index) {
                        Sign::Positive => {
                            doc_indexes.extend_from_slice(&slot2.output);
                            slot2.output.len()
                        },
                        Sign::Negative => 0,
                    };
                    slotidi = SlotIndexedDocIndexes {
                        index: slot2.aut_index,
                        start: start,
                        len: len,
                    };
                }
            }

            if slotidi.len != 0 {
                doc_indexes_slots.push(slotidi);
            }

            // Every output shares the same backing storage and only views its own range.
            let read_only = VecReadOnly::new(doc_indexes);
            self.outs.reserve(doc_indexes_slots.len());
            for slot in doc_indexes_slots {
                let indexes = IndexedDocIndexes {
                    index: slot.index,
                    doc_indexes: read_only.range(slot.start, slot.len),
                };
                self.outs.push(indexes);
            }

            if !self.outs.is_empty() {
                let slot = self.cur_slot.as_ref().unwrap(); // FIXME
                return Some((slot.input(), &self.outs))
            }
        }
    }
}
/// Describes a range of the shared `doc_indexes` vector that belongs to one automaton.
struct SlotIndexedDocIndexes {
/// Index of the automaton the range belongs to.
index: usize,
/// Offset of the first element of the range.
start: usize,
/// Number of elements in the range.
len: usize,
}
/// One heap entry: the output of one automaton, for one key, coming from one group.
#[derive(Debug, Eq, PartialEq)]
struct Slot {
/// Index of the group (in `GroupHeap::groups`) that produced this entry.
grp_index: usize,
/// Index of the automaton that matched the key.
aut_index: usize,
/// The key bytes, shared between all the slots created for the same group entry.
input: Rc<Vec<u8>>,
/// The document indexes associated with the key.
output: VecReadOnly<DocIndex>,
}
impl Slot {
    /// The key bytes of this slot.
    fn input(&self) -> &[u8] {
        self.input.as_slice()
    }
}
impl PartialOrd for Slot {
    /// Delegates to [`Ord::cmp`], which is the canonical way to keep both
    /// orderings consistent.
    fn partial_cmp(&self, other: &Slot) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Slot {
    /// Orders slots by `(input, aut_index, grp_index, output)`, reversed so
    /// that the max-heap `BinaryHeap` pops the smallest slot first.
    fn cmp(&self, other: &Slot) -> cmp::Ordering {
        let lhs = (&self.input, self.aut_index, self.grp_index, &self.output);
        let rhs = (&other.input, other.aut_index, other.grp_index, &other.output);
        lhs.cmp(&rhs).reverse()
    }
}
/// The group unions and the heap of slots pulled from them.
struct GroupHeap<'b> {
/// One union stream per group of same-sign blobs.
groups: Vec<Union<'b>>,
/// Pending slots; `Slot`'s ordering is reversed so the smallest is popped first.
heap: BinaryHeap<Slot>,
}
impl<'b> GroupHeap<'b> {
    /// Wraps the group unions with an empty heap; call `refill` to prime it.
    fn new(groups: Vec<Union<'b>>) -> GroupHeap<'b> {
        GroupHeap { groups, heap: BinaryHeap::new() }
    }

    /// Number of groups feeding the heap.
    fn num_groups(&self) -> usize {
        self.groups.len()
    }

    /// Removes and returns the slot at the top of the heap, if any.
    fn pop(&mut self) -> Option<Slot> {
        self.heap.pop()
    }

    /// `true` when the slot at the top of the heap has exactly the key `key`.
    fn peek_is_duplicate(&self, key: &[u8]) -> bool {
        match self.heap.peek() {
            Some(top) => *top.input == key,
            None => false,
        }
    }

    /// Pops the top slot only when its key equals `key`.
    fn pop_if_equal(&mut self, key: &[u8]) -> Option<Slot> {
        if !self.peek_is_duplicate(key) {
            return None
        }
        self.pop()
    }

    /// Advances every group by one entry and pushes one slot per automaton
    /// output of that entry. Exhausted groups are ignored.
    fn refill(&mut self) {
        for (grp_index, group) in self.groups.iter_mut().enumerate() {
            let (key, outputs) = match group.next() {
                Some(entry) => entry,
                None => continue,
            };

            // The key is copied once and shared by all the slots of this entry.
            let shared_key = Rc::new(key.to_vec());
            for indexed in outputs {
                self.heap.push(Slot {
                    input: shared_key.clone(),
                    grp_index: grp_index,
                    aut_index: indexed.index,
                    output: indexed.doc_indexes.clone(),
                });
            }
        }
    }
}
// Tests for `Merge` using the `AlwaysMatch` automaton over small hand-built blobs.
#[cfg(test)]
mod tests {
use super::*;
use crate::blob::{PositiveBlobBuilder, NegativeBlobBuilder};
use crate::DocIndex;
// Drains a stream and collects, for every key, the doc indexes of the first
// output only (`indexes[0]`). Panics if a key is not valid UTF-8.
fn get_all<'m, I, S>(stream: I) -> Vec<(String, VecReadOnly<DocIndex>)>
where
I: for<'a> fst::IntoStreamer<'a, Into=S, Item=(&'a [u8], &'a [IndexedDocIndexes])>,
S: 'm + for<'a> fst::Streamer<'a, Item=(&'a [u8], &'a [IndexedDocIndexes])>,
{
let mut result = Vec::new();
let mut stream = stream.into_stream();
while let Some((string, indexes)) = stream.next() {
let string = String::from_utf8(string.to_owned()).unwrap();
result.push((string, indexes[0].doc_indexes.clone()))
}
result
}
// A single positive blob must be streamed back unchanged, keys in order.
#[test]
fn single_positive_blob() {
let doc1 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 0 };
let doc2 = DocIndex{ document_id: 12, attribute: 0, attribute_index: 2 };
let doc3 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 1 };
let doc4 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 2 };
let a = {
let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new());
builder.insert("hell", doc1);
builder.insert("hell", doc2);
builder.insert("hello", doc3);
builder.insert("wor", doc4);
Blob::Positive(builder.build().unwrap())
};
let blobs = &[a];
let merge = Merge::always_match(blobs);
let value = get_all(merge);
assert_eq!(value.len(), 3);
assert_eq!(value[0].0, "hell");
assert_eq!(&*value[0].1, &[doc1, doc2][..]);
assert_eq!(value[1].0, "hello");
assert_eq!(&*value[1].1, &[doc3][..]);
assert_eq!(value[2].0, "wor");
assert_eq!(&*value[2].1, &[doc4][..]);
}
// A lone negative blob is skipped entirely: nothing is streamed.
#[test]
fn single_negative_blob() {
let a = {
let mut builder = NegativeBlobBuilder::new(Vec::new());
builder.insert(1);
builder.insert(2);
builder.insert(3);
builder.insert(4);
Blob::Negative(builder.build().unwrap())
};
let blobs = &[a];
let merge = Merge::always_match(blobs);
let value = get_all(merge);
assert_eq!(value.len(), 0);
}
// Two positive blobs form one group; entries sharing a key are unioned.
#[test]
fn two_positive_blobs() {
let doc1 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 0 };
let doc2 = DocIndex{ document_id: 12, attribute: 0, attribute_index: 2 };
let doc3 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 1 };
let doc4 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 2 };
let a = {
let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new());
builder.insert("hell", doc1);
builder.insert("wor", doc4);
Blob::Positive(builder.build().unwrap())
};
let b = {
let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new());
builder.insert("hell", doc2);
builder.insert("hello", doc3);
Blob::Positive(builder.build().unwrap())
};
let blobs = &[a, b];
let merge = Merge::always_match(blobs);
let value = get_all(merge);
assert_eq!(value.len(), 3);
assert_eq!(value[0].0, "hell");
assert_eq!(&*value[0].1, &[doc1, doc2][..]);
assert_eq!(value[1].0, "hello");
assert_eq!(&*value[1].1, &[doc3][..]);
assert_eq!(value[2].0, "wor");
assert_eq!(&*value[2].1, &[doc4][..]);
}
// A positive blob followed by a negative one: the expectations drop doc2 and doc3.
// NOTE(review): the ids given to the negative builder (2, 3) do not equal the
// `document_id` of any fixture here (0 or 12) — confirm what these ids refer to.
#[test]
fn one_positive_one_negative_blobs() {
let doc1 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 0 };
let doc2 = DocIndex{ document_id: 12, attribute: 0, attribute_index: 2 };
let doc3 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 1 };
let doc4 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 2 };
let a = {
let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new());
builder.insert("hell", doc1);
builder.insert("hell", doc2);
builder.insert("hello", doc3);
builder.insert("wor", doc4);
Blob::Positive(builder.build().unwrap())
};
let b = {
let mut builder = NegativeBlobBuilder::new(Vec::new());
builder.insert(2);
builder.insert(3);
Blob::Negative(builder.build().unwrap())
};
let blobs = &[a, b];
let merge = Merge::always_match(blobs);
let value = get_all(merge);
assert_eq!(value.len(), 2);
assert_eq!(value[0].0, "hell");
assert_eq!(&*value[0].1, &[doc1][..]);
assert_eq!(value[1].0, "wor");
assert_eq!(&*value[1].1, &[doc4][..]);
}
// Four single-blob groups alternating positive / negative / positive / negative.
#[test]
fn alternate_positive_negative_blobs() {
let doc1 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 0 };
let doc2 = DocIndex{ document_id: 12, attribute: 0, attribute_index: 2 };
let doc3 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 1 };
let doc4 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 2 };
let a = {
let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new());
builder.insert("hell", doc1);
builder.insert("hell", doc2);
builder.insert("hello", doc3);
Blob::Positive(builder.build().unwrap())
};
let b = {
let mut builder = NegativeBlobBuilder::new(Vec::new());
builder.insert(1);
builder.insert(4);
Blob::Negative(builder.build().unwrap())
};
let c = {
let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new());
builder.insert("hell", doc1);
builder.insert("wor", doc4);
Blob::Positive(builder.build().unwrap())
};
let d = {
let mut builder = NegativeBlobBuilder::new(Vec::new());
builder.insert(1);
Blob::Negative(builder.build().unwrap())
};
let blobs = &[a, b, c, d];
let merge = Merge::always_match(blobs);
let value = get_all(merge);
assert_eq!(value.len(), 3);
assert_eq!(value[0].0, "hell");
assert_eq!(&*value[0].1, &[doc2][..]);
assert_eq!(value[1].0, "hello");
assert_eq!(&*value[1].1, &[doc3][..]);
assert_eq!(value[2].0, "wor");
assert_eq!(&*value[2].1, &[doc4][..]);
}
// Two groups of two blobs each: two positives followed by two negatives.
#[test]
fn alternate_multiple_positive_negative_blobs() {
let doc1 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 0 };
let doc2 = DocIndex{ document_id: 12, attribute: 0, attribute_index: 2 };
let doc3 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 1 };
let doc4 = DocIndex{ document_id: 0, attribute: 0, attribute_index: 2 };
let a = {
let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new());
builder.insert("hell", doc1);
builder.insert("hell", doc2);
builder.insert("hello", doc3);
Blob::Positive(builder.build().unwrap())
};
let b = {
let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new());
builder.insert("hell", doc1);
builder.insert("wor", doc4);
Blob::Positive(builder.build().unwrap())
};
let c = {
let mut builder = NegativeBlobBuilder::new(Vec::new());
builder.insert(1);
builder.insert(4);
Blob::Negative(builder.build().unwrap())
};
let d = {
let mut builder = NegativeBlobBuilder::new(Vec::new());
builder.insert(1);
Blob::Negative(builder.build().unwrap())
};
let blobs = &[a, b, c, d];
let merge = Merge::always_match(blobs);
let value = get_all(merge);
assert_eq!(value.len(), 2);
assert_eq!(value[0].0, "hell");
assert_eq!(&*value[0].1, &[doc2][..]);
assert_eq!(value[1].0, "hello");
assert_eq!(&*value[1].1, &[doc3][..]);
}
}

View File

@ -1,31 +1,30 @@
mod merge;
pub mod ops;
mod ops_indexed_value;
mod positive_blob;
mod negative_blob;
mod ops;
pub mod positive;
pub mod negative;
pub use self::merge::Merge;
pub use self::positive_blob::{PositiveBlob, PositiveBlobBuilder};
pub use self::negative_blob::{NegativeBlob, NegativeBlobBuilder};
pub use self::positive::{PositiveBlob, RawPositiveBlobBuilder, PositiveBlobBuilder};
pub use self::negative::{NegativeBlob, NegativeBlobBuilder};
pub use self::ops::OpBuilder;
use std::error::Error;
use std::io::{Write, Read};
use std::{io, fmt, mem};
use std::fmt;
use uuid::Uuid;
use rocksdb::rocksdb::{DB, Snapshot};
use serde::ser::{Serialize, Serializer, SerializeTuple};
use serde::de::{self, Deserialize, Deserializer, SeqAccess, Visitor};
use crate::index::identifier::Identifier;
use crate::data::DocIndexes;
/// A blob is either a positive one or a negative one; `sign()` reports which.
pub enum Blob {
/// Wraps a `PositiveBlob`.
Positive(PositiveBlob),
/// Wraps a `NegativeBlob`.
Negative(NegativeBlob),
}
impl Blob {
pub fn is_negative(&self) -> bool {
self.sign() == Sign::Negative
}
pub fn is_positive(&self) -> bool {
self.sign() == Sign::Positive
}
pub fn sign(&self) -> Sign {
match self {
Blob::Positive(_) => Sign::Positive,

View File

@ -37,6 +37,12 @@ impl NegativeBlob {
}
}
/// Exposes the document ids held by the blob as a plain slice.
impl AsRef<[DocumentId]> for NegativeBlob {
fn as_ref(&self) -> &[DocumentId] {
self.as_ids().doc_ids()
}
}
impl Serialize for NegativeBlob {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
self.doc_ids.serialize(serializer)

5
src/blob/negative/mod.rs Normal file
View File

@ -0,0 +1,5 @@
mod blob;
mod ops;
pub use self::blob::{NegativeBlob, NegativeBlobBuilder};
pub use self::ops::OpBuilder;

73
src/blob/negative/ops.rs Normal file
View File

@ -0,0 +1,73 @@
use sdset::multi::OpBuilder as SdOpBuilder;
use sdset::Set;
use crate::blob::NegativeBlob;
use crate::data::DocIds;
use crate::DocumentId;
/// Collects negative blobs and builds a set operation over their document ids.
pub struct OpBuilder<'a> {
/// The underlying `sdset` multi-set builder the blobs are pushed into.
inner: SdOpBuilder<'a, DocumentId>,
}
/// Do a set operation on multiple negative blobs.
impl<'a> OpBuilder<'a> {
    /// Creates an empty builder.
    pub fn new() -> Self {
        Self { inner: SdOpBuilder::new() }
    }

    /// Creates an empty builder with room for `cap` blobs.
    pub fn with_capacity(cap: usize) -> Self {
        Self { inner: SdOpBuilder::with_capacity(cap) }
    }

    /// Chaining variant of [`push`](OpBuilder::push).
    pub fn add(mut self, blob: &'a NegativeBlob) -> Self {
        self.push(blob);
        self
    }

    /// Registers the document ids of `blob` as one more operand.
    ///
    /// The ids are wrapped with `Set::new_unchecked`, i.e. no check is done
    /// here on the slice handed to `sdset`.
    pub fn push(&mut self, blob: &'a NegativeBlob) {
        let set = Set::new_unchecked(blob.as_ref());
        self.inner.push(set);
    }

    /// Consumes the builder into a union of all the pushed blobs.
    pub fn union(self) -> Union<'a> {
        Union::new(self.inner.union())
    }

    /// Consumes the builder into an intersection of all the pushed blobs.
    pub fn intersection(self) -> Intersection<'a> {
        Intersection::new(self.inner.intersection())
    }

    /// Consumes the builder into a difference of all the pushed blobs.
    pub fn difference(self) -> Difference<'a> {
        Difference::new(self.inner.difference())
    }

    /// Consumes the builder into a symmetric difference of all the pushed blobs.
    pub fn symmetric_difference(self) -> SymmetricDifference<'a> {
        SymmetricDifference::new(self.inner.symmetric_difference())
    }
}

/// An empty builder, equivalent to `OpBuilder::new()`.
impl<'a> Default for OpBuilder<'a> {
    fn default() -> Self {
        Self::new()
    }
}
// Generates a public wrapper struct named `$name` around the `sdset::multi`
// operation of the same name, with a private constructor and a conversion
// into a `NegativeBlob`.
// Note: `$operation` is accepted by the pattern but not used in the expansion.
macro_rules! logical_operation {
(struct $name:ident, $operation:ident) => {
pub struct $name<'a> {
op: sdset::multi::$name<'a, DocumentId>,
}
impl<'a> $name<'a> {
fn new(op: sdset::multi::$name<'a, DocumentId>) -> Self {
$name { op }
}
// Runs the set operation and wraps the resulting ids into a new blob.
pub fn into_negative_blob(self) -> NegativeBlob {
let document_ids = sdset::SetOperation::into_set_buf(self.op);
let doc_ids = DocIds::from_document_ids(document_ids.into_vec());
NegativeBlob::from_raw(doc_ids)
}
}
}}
logical_operation!(struct Union, union);
logical_operation!(struct Intersection, intersection);
logical_operation!(struct Difference, difference);
logical_operation!(struct SymmetricDifference, symmetric_difference);

View File

@ -1,334 +1,109 @@
use std::collections::BTreeMap;
use std::error::Error;
use fst::{map, Streamer, Automaton};
use fst::automaton::AlwaysMatch;
use sdset::multi::OpBuilder as SdOpBuilder;
use sdset::{SetOperation, Set};
use fst::{IntoStreamer, Streamer};
use group_by::GroupBy;
use itertools::{Itertools, Either};
use sdset::duo::DifferenceByKey;
use sdset::{Set, SetOperation};
use crate::blob::ops_indexed_value::{
OpIndexedValueBuilder, UnionIndexedValue,
};
use crate::blob::Blob;
use crate::data::DocIndexes;
use crate::vec_read_only::VecReadOnly;
use crate::DocIndex;
use crate::blob::{Blob, Sign, PositiveBlob, RawPositiveBlobBuilder, NegativeBlob};
use crate::blob::{positive, negative};
pub struct OpBuilder<'m, A: Automaton> {
// the operation on the maps is always an union.
maps: OpIndexedValueBuilder<'m>,
automatons: Vec<A>,
indexes: Vec<&'m DocIndexes>,
/// Grouping predicate: `true` when both blobs report the same sign.
fn blob_same_sign(a: &Blob, b: &Blob) -> bool {
    let (left, right) = (a.sign(), b.sign());
    left == right
}
impl<'m> OpBuilder<'m, AlwaysMatch> {
pub fn new() -> Self {
Self {
maps: OpIndexedValueBuilder::new(),
automatons: vec![AlwaysMatch],
indexes: Vec::new(),
}
}
}
/// Do a set operation on multiple maps with the same automatons.
impl<'m, A: 'm + Automaton> OpBuilder<'m, A> {
pub fn with_automatons(automatons: Vec<A>) -> Self {
Self {
maps: OpIndexedValueBuilder::new(),
automatons: automatons,
indexes: Vec::new(),
}
}
pub fn add(mut self, blob: &'m Blob) -> Self
where A: Clone
{
self.push(blob);
self
}
pub fn push(&mut self, blob: &'m Blob)
where A: Clone
{
fn unwrap_positive(blob: &Blob) -> &PositiveBlob {
match blob {
Blob::Positive(blob) => {
let mut op = map::OpBuilder::new();
for automaton in self.automatons.iter().cloned() {
let stream = blob.as_map().search(automaton);
op.push(stream);
Blob::Positive(blob) => blob,
Blob::Negative(_) => panic!("called `Blob::unwrap_positive()` on a `Negative` value"),
}
}
let stream = op.union();
let indexes = blob.as_indexes();
fn unwrap_negative(blob: &Blob) -> &NegativeBlob {
match blob {
Blob::Negative(blob) => blob,
Blob::Positive(_) => panic!("called `Blob::unwrap_negative()` on a `Positive` value"),
}
}
self.maps.push(stream);
self.indexes.push(indexes);
/// Accumulates owned blobs, in push order, before they are merged.
pub struct OpBuilder {
/// The blobs pushed so far.
blobs: Vec<Blob>,
}
impl OpBuilder {
pub fn new() -> OpBuilder {
OpBuilder { blobs: Vec::new() }
}
pub fn with_capacity(cap: usize) -> OpBuilder {
OpBuilder { blobs: Vec::with_capacity(cap) }
}
pub fn push(&mut self, blob: Blob) {
if self.blobs.is_empty() && blob.is_negative() { return }
self.blobs.push(blob);
}
pub fn merge(self) -> Result<PositiveBlob, Box<Error>> {
let groups = GroupBy::new(&self.blobs, blob_same_sign);
let (positives, negatives): (Vec<_>, Vec<_>) = groups.partition_map(|blobs| {
match blobs[0].sign() {
Sign::Positive => {
let mut op_builder = positive::OpBuilder::with_capacity(blobs.len());
for blob in blobs {
op_builder.push(unwrap_positive(blob));
}
let mut stream = op_builder.union().into_stream();
let mut builder = RawPositiveBlobBuilder::memory();
while let Some((input, doc_indexes)) = stream.next() {
// FIXME empty doc_indexes must be handled by OpBuilder
if !doc_indexes.is_empty() {
builder.insert(input, doc_indexes).unwrap();
}
}
let (map, doc_indexes) = builder.into_inner().unwrap();
let blob = PositiveBlob::from_bytes(map, doc_indexes).unwrap();
Either::Left(blob)
},
Blob::Negative(blob) => {
unimplemented!()
Sign::Negative => {
let mut op_builder = negative::OpBuilder::with_capacity(blobs.len());
for blob in blobs {
op_builder.push(unwrap_negative(blob));
}
let blob = op_builder.union().into_negative_blob();
Either::Right(blob)
},
}
}
});
pub fn union(self) -> Union<'m> {
Union::new(self.maps, self.indexes, self.automatons.len())
}
let mut zipped = positives.into_iter().zip(negatives);
let mut buffer = Vec::new();
pub fn intersection(self) -> Intersection<'m> {
Intersection::new(self.maps, self.indexes, self.automatons.len())
}
zipped.try_fold(PositiveBlob::default(), |base, (positive, negative)| {
let mut builder = RawPositiveBlobBuilder::memory();
let doc_ids = Set::new_unchecked(negative.as_ref());
pub fn difference(self) -> Difference<'m> {
Difference::new(self.maps, self.indexes, self.automatons.len())
}
let op_builder = positive::OpBuilder::new().add(&base).add(&positive);
let mut stream = op_builder.union().into_stream();
pub fn symmetric_difference(self) -> SymmetricDifference<'m> {
SymmetricDifference::new(self.maps, self.indexes, self.automatons.len())
while let Some((input, doc_indexes)) = stream.next() {
let doc_indexes = Set::new_unchecked(doc_indexes);
let op = DifferenceByKey::new(doc_indexes, doc_ids, |x| x.document_id, |x| *x);
buffer.clear();
op.extend_vec(&mut buffer);
if !buffer.is_empty() {
builder.insert(input, &buffer)?;
}
}
/// The document indexes produced for one automaton.
#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq, Hash)]
pub struct IndexedDocIndexes {
/// Index of the automaton that produced these document indexes.
pub index: usize,
/// The document indexes themselves, as a read-only view.
pub doc_indexes: VecReadOnly<DocIndex>,
}
/// Describes a range of the shared `doc_indexes` vector that belongs to one automaton.
struct SlotIndexedDocIndexes {
/// Index of the automaton the range belongs to.
index: usize,
/// Offset of the first element of the range.
start: usize,
/// Number of elements in the range.
len: usize,
}
// Generates a streamer struct named `$name` that walks the union of all the
// maps and, for every key, applies the `sdset` multi operation `$operation`
// to the document indexes found for each automaton.
macro_rules! logical_operation {
(struct $name:ident, $operation:ident) => {
pub struct $name<'m> {
maps: UnionIndexedValue<'m>,
indexes: Vec<&'m DocIndexes>,
number_automatons: usize,
outs: Vec<IndexedDocIndexes>,
}
impl<'m> $name<'m> {
fn new(maps: OpIndexedValueBuilder<'m>, indexes: Vec<&'m DocIndexes>, number_automatons: usize) -> Self {
$name {
// whatever the operation, the keys are always walked as a union
maps: maps.union(),
indexes: indexes,
number_automatons: number_automatons,
outs: Vec::new(),
}
}
}
impl<'m, 'a> fst::Streamer<'a> for $name<'m> {
type Item = (&'a [u8], &'a [IndexedDocIndexes]);
fn next(&'a mut self) -> Option<Self::Item> {
match self.maps.next() {
Some((input, ivalues)) => {
self.outs.clear();
// one map per automaton: reader index -> value found in that reader
let mut builders = vec![BTreeMap::new(); self.number_automatons];
for iv in ivalues {
let builder = &mut builders[iv.aut_index];
builder.insert(iv.rdr_index, iv.value);
}
let mut doc_indexes = Vec::new();
let mut doc_indexes_slots = Vec::with_capacity(builders.len());
for (aut_index, values) in builders.into_iter().enumerate() {
let mut builder = SdOpBuilder::with_capacity(values.len());
for (rdr_index, value) in values {
let indexes = self.indexes[rdr_index].get(value).expect("could not find indexes");
let indexes = Set::new_unchecked(indexes);
builder.push(indexes);
}
// the result of the operation is appended to the shared vector,
// only its range is remembered
let start = doc_indexes.len();
builder.$operation().extend_vec(&mut doc_indexes);
let len = doc_indexes.len() - start;
if len != 0 {
let slot = SlotIndexedDocIndexes {
index: aut_index,
start: start,
len: len,
};
doc_indexes_slots.push(slot);
}
}
let read_only = VecReadOnly::new(doc_indexes);
self.outs.reserve(doc_indexes_slots.len());
for slot in doc_indexes_slots {
let indexes = IndexedDocIndexes {
index: slot.index,
doc_indexes: read_only.range(slot.start, slot.len),
};
self.outs.push(indexes);
}
// NOTE(review): returning `None` here ends the stream as soon as one key
// has an empty result, even if `maps` still has keys left — confirm this
// is intended rather than skipping to the next key.
if self.outs.is_empty() { return None }
Some((input, &self.outs))
},
None => None,
}
}
}
}}
logical_operation!(struct Union, union);
logical_operation!(struct Intersection, intersection);
logical_operation!(struct Difference, difference);
logical_operation!(struct SymmetricDifference, symmetric_difference);
#[cfg(test)]
mod tests {
use super::*;
use crate::blob::PositiveBlobBuilder;
fn get_exact_key<'m, I, S>(stream: I, key: &[u8]) -> Option<VecReadOnly<DocIndex>>
where
I: for<'a> fst::IntoStreamer<'a, Into=S, Item=(&'a [u8], &'a [IndexedDocIndexes])>,
S: 'm + for<'a> fst::Streamer<'a, Item=(&'a [u8], &'a [IndexedDocIndexes])>,
{
let mut stream = stream.into_stream();
while let Some((string, indexes)) = stream.next() {
if string == key {
return Some(indexes[0].doc_indexes.clone())
}
}
None
}
#[test]
fn union_two_blobs() {
let doc1 = DocIndex { document_id: 12, attribute: 1, attribute_index: 22 };
let doc2 = DocIndex { document_id: 31, attribute: 0, attribute_index: 1 };
let meta1 = {
let mapw = Vec::new();
let indexesw = Vec::new();
let mut builder = PositiveBlobBuilder::new(mapw, indexesw);
builder.insert("chameau", doc1);
Blob::Positive(builder.build().unwrap())
};
let meta2 = {
let mapw = Vec::new();
let indexesw = Vec::new();
let mut builder = PositiveBlobBuilder::new(mapw, indexesw);
builder.insert("chameau", doc2);
Blob::Positive(builder.build().unwrap())
};
let metas = OpBuilder::new().add(&meta1).add(&meta2).union();
let value = get_exact_key(metas, b"chameau");
assert_eq!(&*value.unwrap(), &[doc1, doc2][..]);
}
#[test]
fn intersection_two_blobs() {
let doc1 = DocIndex { document_id: 31, attribute: 0, attribute_index: 1 };
let doc2 = DocIndex { document_id: 31, attribute: 0, attribute_index: 1 };
let meta1 = {
let mapw = Vec::new();
let indexesw = Vec::new();
let mut builder = PositiveBlobBuilder::new(mapw, indexesw);
builder.insert("chameau", doc1);
Blob::Positive(builder.build().unwrap())
};
let meta2 = {
let mapw = Vec::new();
let indexesw = Vec::new();
let mut builder = PositiveBlobBuilder::new(mapw, indexesw);
builder.insert("chameau", doc2);
Blob::Positive(builder.build().unwrap())
};
let metas = OpBuilder::new().add(&meta1).add(&meta2).intersection();
let value = get_exact_key(metas, b"chameau");
assert_eq!(&*value.unwrap(), &[doc1][..]);
}
#[test]
fn difference_two_blobs() {
let doc1 = DocIndex { document_id: 12, attribute: 1, attribute_index: 22 };
let doc2 = DocIndex { document_id: 31, attribute: 0, attribute_index: 1 };
let doc3 = DocIndex { document_id: 31, attribute: 0, attribute_index: 1 };
let meta1 = {
let mapw = Vec::new();
let indexesw = Vec::new();
let mut builder = PositiveBlobBuilder::new(mapw, indexesw);
builder.insert("chameau", doc1);
builder.insert("chameau", doc2);
Blob::Positive(builder.build().unwrap())
};
let meta2 = {
let mapw = Vec::new();
let indexesw = Vec::new();
let mut builder = PositiveBlobBuilder::new(mapw, indexesw);
builder.insert("chameau", doc3);
Blob::Positive(builder.build().unwrap())
};
let metas = OpBuilder::new().add(&meta1).add(&meta2).difference();
let value = get_exact_key(metas, b"chameau");
assert_eq!(&*value.unwrap(), &[doc1][..]);
}
#[test]
fn symmetric_difference_two_blobs() {
let doc1 = DocIndex { document_id: 12, attribute: 1, attribute_index: 22 };
let doc2 = DocIndex { document_id: 31, attribute: 0, attribute_index: 1 };
let doc3 = DocIndex { document_id: 32, attribute: 0, attribute_index: 1 };
let doc4 = DocIndex { document_id: 34, attribute: 12, attribute_index: 1 };
let meta1 = {
let mapw = Vec::new();
let indexesw = Vec::new();
let mut builder = PositiveBlobBuilder::new(mapw, indexesw);
builder.insert("chameau", doc1);
builder.insert("chameau", doc2);
builder.insert("chameau", doc3);
Blob::Positive(builder.build().unwrap())
};
let meta2 = {
let mapw = Vec::new();
let indexesw = Vec::new();
let mut builder = PositiveBlobBuilder::new(mapw, indexesw);
builder.insert("chameau", doc2);
builder.insert("chameau", doc3);
builder.insert("chameau", doc4);
Blob::Positive(builder.build().unwrap())
};
let metas = OpBuilder::new().add(&meta1).add(&meta2).symmetric_difference();
let value = get_exact_key(metas, b"chameau");
assert_eq!(&*value.unwrap(), &[doc1, doc4][..]);
let (map, doc_indexes) = builder.into_inner()?;
PositiveBlob::from_bytes(map, doc_indexes)
})
}
}

View File

@ -1,203 +0,0 @@
use std::collections::BinaryHeap;
use std::rc::Rc;
use std::cmp;
use fst::raw::{self, Output};
use fst::{self, IntoStreamer, Streamer};
/// A boxed streamer yielding, for every key, the indexed values matched for it.
type BoxedStream<'f> = Box<for<'a> Streamer<'a, Item=(&'a [u8], &'a [raw::IndexedValue])> + 'f>;
/// Collects indexed-value streams before combining them.
pub struct OpIndexedValueBuilder<'f> {
/// The streams pushed so far, in push order.
streams: Vec<BoxedStream<'f>>,
}
impl<'f> OpIndexedValueBuilder<'f> {
pub fn new() -> Self {
Self { streams: Vec::new() }
}
pub fn push<I, S>(&mut self, stream: I)
where
I: for<'a> IntoStreamer<'a, Into=S, Item=(&'a [u8], &'a [raw::IndexedValue])>,
S: 'f + for<'a> Streamer<'a, Item=(&'a [u8], &'a [raw::IndexedValue])>,
{
self.streams.push(Box::new(stream.into_stream()));
}
pub fn union(self) -> UnionIndexedValue<'f> {
UnionIndexedValue {
heap: StreamIndexedValueHeap::new(self.streams),
outs: Vec::new(),
cur_slot: None,
}
}
}
/// Streamer over the union of several indexed-value streams.
pub struct UnionIndexedValue<'f> {
/// The streams and the heap of slots pulled from them.
heap: StreamIndexedValueHeap<'f>,
/// Output buffer, cleared on every `next` call and borrowed by the returned item.
outs: Vec<IndexedValue>,
/// The slot popped for the key currently being returned; its stream is
/// refilled at the start of the following `next` call.
cur_slot: Option<SlotIndexedValue>,
}
impl<'f> UnionIndexedValue<'f> {
/// Number of underlying streams (not the number of remaining items).
pub fn len(&self) -> usize {
self.heap.num_slots()
}
}
impl<'a, 'm> fst::Streamer<'a> for UnionIndexedValue<'m> {
type Item = (&'a [u8], &'a [IndexedValue]);
/// Returns the next key along with one `IndexedValue` per slot popped for
/// that key, or `None` once the heap is empty.
fn next(&'a mut self) -> Option<Self::Item> {
// The slot returned by the previous call is recycled: its stream is
// advanced and the new entries are pushed on the heap.
if let Some(slot) = self.cur_slot.take() {
self.heap.refill(slot);
}
let slot = match self.heap.pop() {
None => return None,
Some(slot) => {
// stored in `self` so that its input can be borrowed by the returned item
self.cur_slot = Some(slot);
self.cur_slot.as_mut().unwrap()
}
};
self.outs.clear();
self.outs.push(slot.indexed_value());
// Gather every other slot carrying the same key; their streams are
// refilled right away since these slots are not kept.
while let Some(slot2) = self.heap.pop_if_equal(slot.input()) {
self.outs.push(slot2.indexed_value());
self.heap.refill(slot2);
}
Some((slot.input(), &self.outs))
}
}
/// The source streams and the heap of slots pulled from them.
struct StreamIndexedValueHeap<'f> {
/// The source streams; a slot's `rdr_index` is an index into this vector.
rdrs: Vec<BoxedStream<'f>>,
/// Pending slots; `SlotIndexedValue`'s ordering is reversed so the smallest is popped first.
heap: BinaryHeap<SlotIndexedValue>,
}
impl<'f> StreamIndexedValueHeap<'f> {
    /// Takes ownership of the streams and primes the heap with the first
    /// entry of each of them.
    fn new(streams: Vec<BoxedStream<'f>>) -> StreamIndexedValueHeap<'f> {
        let num_streams = streams.len();
        let mut this = StreamIndexedValueHeap {
            rdrs: streams,
            heap: BinaryHeap::new(),
        };
        for rdr_index in 0..num_streams {
            this.refill(SlotIndexedValue::new(rdr_index));
        }
        this
    }

    /// Removes and returns the slot at the top of the heap, if any.
    fn pop(&mut self) -> Option<SlotIndexedValue> {
        self.heap.pop()
    }

    /// `true` when the slot at the top of the heap has exactly the key `key`.
    fn peek_is_duplicate(&self, key: &[u8]) -> bool {
        match self.heap.peek() {
            Some(top) => top.input() == key,
            None => false,
        }
    }

    /// Pops the top slot only when its key equals `key`.
    fn pop_if_equal(&mut self, key: &[u8]) -> Option<SlotIndexedValue> {
        if !self.peek_is_duplicate(key) {
            return None
        }
        self.pop()
    }

    /// Pops the top slot only when its key is lower than or equal to `key`.
    fn pop_if_le(&mut self, key: &[u8]) -> Option<SlotIndexedValue> {
        let top_is_le = match self.heap.peek() {
            Some(top) => top.input() <= key,
            None => false,
        };
        if top_is_le { self.pop() } else { None }
    }

    /// Number of source streams.
    fn num_slots(&self) -> usize {
        self.rdrs.len()
    }

    /// Advances the stream `slot` came from and pushes one copy of the slot
    /// per indexed value of the new entry. The slot is dropped when its
    /// stream is exhausted.
    fn refill(&mut self, mut slot: SlotIndexedValue) {
        let (input, ivalues) = match self.rdrs[slot.rdr_index].next() {
            Some(entry) => entry,
            None => return,
        };

        slot.set_input(input);
        for ivalue in ivalues {
            slot.set_aut_index(ivalue.index);
            slot.set_output(ivalue.value);
            self.heap.push(slot.clone());
        }
    }
}
/// One heap entry: the value matched by one automaton, for one key, in one stream.
#[derive(Debug, Clone)]
struct SlotIndexedValue {
/// Index of the stream (in `StreamIndexedValueHeap::rdrs`) this slot comes from.
rdr_index: usize,
/// Index of the automaton that matched the key.
aut_index: usize,
/// The key bytes, shared between the clones of this slot.
input: Rc<Vec<u8>>,
/// The value associated with the key.
output: Output,
}
/// The public view of a slot: where a value comes from and the value itself.
#[derive(Debug)]
pub struct IndexedValue {
/// Index of the stream the value comes from.
pub rdr_index: usize,
/// Index of the automaton that matched.
pub aut_index: usize,
/// The raw value associated with the key.
pub value: u64,
}
impl PartialEq for SlotIndexedValue {
    /// Two slots are equal when key, stream index, automaton index and
    /// output are all equal.
    fn eq(&self, other: &Self) -> bool {
        self.input == other.input
            && self.rdr_index == other.rdr_index
            && self.aut_index == other.aut_index
            && self.output == other.output
    }
}

impl Eq for SlotIndexedValue {}
impl PartialOrd for SlotIndexedValue {
    /// Delegates to [`Ord::cmp`], which is the canonical way to keep both
    /// orderings consistent.
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for SlotIndexedValue {
    /// Orders slots by `(input, rdr_index, aut_index, output)`, reversed so
    /// that the max-heap `BinaryHeap` pops the smallest slot first.
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        let lhs = (&self.input, self.rdr_index, self.aut_index, self.output);
        let rhs = (&other.input, other.rdr_index, other.aut_index, other.output);
        lhs.cmp(&rhs).reverse()
    }
}
impl SlotIndexedValue {
    /// Creates an empty slot bound to the stream `rdr_index`, with a
    /// pre-allocated (64 bytes) key buffer.
    fn new(rdr_index: usize) -> SlotIndexedValue {
        let input = Rc::new(Vec::with_capacity(64));
        SlotIndexedValue {
            rdr_index,
            aut_index: 0,
            input,
            output: Output::zero(),
        }
    }

    /// Snapshot of the slot as a public `IndexedValue`.
    fn indexed_value(&self) -> IndexedValue {
        let value = self.output.value();
        IndexedValue {
            rdr_index: self.rdr_index,
            aut_index: self.aut_index,
            value,
        }
    }

    /// The key bytes of this slot.
    fn input(&self) -> &[u8] {
        self.input.as_slice()
    }

    /// Sets the index of the automaton that matched.
    fn set_aut_index(&mut self, aut_index: usize) {
        self.aut_index = aut_index;
    }

    /// Replaces the key bytes with `input`; nothing is touched when the key
    /// is already equal.
    fn set_input(&mut self, input: &[u8]) {
        if *self.input == input {
            return
        }
        // `make_mut` gives unique access to the buffer, cloning it first if
        // it is still shared with other slots.
        let key = Rc::make_mut(&mut self.input);
        key.clear();
        key.extend(input);
    }

    /// Sets the value associated with the key.
    fn set_output(&mut self, output: u64) {
        self.output = Output::new(output);
    }
}

View File

@ -3,10 +3,10 @@ use std::io::Write;
use std::path::Path;
use std::error::Error;
use fst::{Map, MapBuilder};
use fst::{map, Map, Streamer, IntoStreamer};
use crate::DocIndex;
use crate::data::{DocIndexes, DocIndexesBuilder};
use crate::data::{DocIndexes, RawDocIndexesBuilder, DocIndexesBuilder};
use serde::ser::{Serialize, Serializer, SerializeTuple};
use serde::de::{self, Deserialize, Deserializer, SeqAccess, Visitor};
@ -53,6 +53,40 @@ impl PositiveBlob {
}
}
/// Lets a borrowed `PositiveBlob` be streamed as `(key, document indexes)` pairs.
impl<'m, 'a> IntoStreamer<'a> for &'m PositiveBlob {
type Item = (&'a [u8], &'a [DocIndex]);
/// The type of the stream to be constructed.
type Into = PositiveBlobStream<'m>;
/// Construct a stream from `Self`.
fn into_stream(self) -> Self::Into {
PositiveBlobStream {
// streams the keys of the map along with their associated value
map_stream: self.map.into_stream(),
doc_indexes: &self.indexes,
}
}
}
/// Stream over the entries of a `PositiveBlob`.
pub struct PositiveBlobStream<'m> {
/// Stream over the keys of the blob's map and their associated value.
map_stream: map::Stream<'m>,
/// Where the document indexes are looked up, using the value found in the map.
doc_indexes: &'m DocIndexes,
}
impl<'m, 'a> Streamer<'a> for PositiveBlobStream<'m> {
    type Item = (&'a [u8], &'a [DocIndex]);

    /// Yields the next key with the document indexes stored for it, or
    /// `None` when the map stream is exhausted.
    ///
    /// Panics if the value found in the map has no matching document indexes.
    fn next(&'a mut self) -> Option<Self::Item> {
        let (input, index) = self.map_stream.next()?;
        let doc_indexes = self.doc_indexes
            .get(index)
            .expect("BUG: could not find document indexes");
        Some((input, doc_indexes))
    }
}
impl Serialize for PositiveBlob {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let mut tuple = serializer.serialize_tuple(2)?;
@ -99,6 +133,50 @@ impl<'de> Deserialize<'de> for PositiveBlob {
}
}
/// Builder writing a positive blob as two parallel outputs: an fst map
/// and the document indexes the map values refer to.
///
/// Every inserted key is associated with a counter that starts at 0 and
/// grows by one per successful insertion.
pub struct RawPositiveBlobBuilder<W, X> {
    // builder of the key -> counter map
    map: fst::MapBuilder<W>,
    // builder receiving one group of document indexes per inserted key
    indexes: RawDocIndexesBuilder<X>,
    // counter value that the next inserted key will be mapped to
    value: u64,
}

impl RawPositiveBlobBuilder<Vec<u8>, Vec<u8>> {
    /// Creates a builder whose two outputs are in-memory buffers.
    pub fn memory() -> Self {
        let map = fst::MapBuilder::memory();
        let indexes = RawDocIndexesBuilder::memory();

        RawPositiveBlobBuilder { map, indexes, value: 0 }
    }
}

impl<W: Write, X: Write> RawPositiveBlobBuilder<W, X> {
    /// Creates a builder writing the map into `map` and the document
    /// indexes into `indexes`.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `fst::MapBuilder::new`.
    pub fn new(map: W, indexes: X) -> Result<Self, Box<Error>> {
        let map = fst::MapBuilder::new(map)?;
        let indexes = RawDocIndexesBuilder::new(indexes);

        Ok(RawPositiveBlobBuilder { map, indexes, value: 0 })
    }

    /// Inserts `key` together with its `doc_indexes`.
    ///
    /// The key is written to the map first, then the document indexes are
    /// written, and only then is the counter advanced.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by either underlying builder.
    // FIXME what if one write doesn't work but the other do ?
    // (a failure of the second write leaves the key already present in the map)
    pub fn insert(&mut self, key: &[u8], doc_indexes: &[DocIndex]) -> Result<(), Box<Error>> {
        let current = self.value;
        self.map.insert(key, current)?;
        self.indexes.insert(doc_indexes)?;
        self.value = current + 1;
        Ok(())
    }

    /// Finishes both outputs and discards the writers.
    pub fn finish(self) -> Result<(), Box<Error>> {
        let writers = self.into_inner()?;
        drop(writers);
        Ok(())
    }

    /// Finishes both outputs and hands back the `(map, indexes)` writers.
    ///
    /// The map is finished before the document indexes.
    pub fn into_inner(self) -> Result<(W, X), Box<Error>> {
        let RawPositiveBlobBuilder { map, indexes, .. } = self;

        let map = map.into_inner()?;
        let indexes = indexes.into_inner()?;
        Ok((map, indexes))
    }
}
pub struct PositiveBlobBuilder<W, X> {
map: W,
indexes: DocIndexesBuilder<X>,
@ -122,7 +200,7 @@ impl<W: Write, X: Write> PositiveBlobBuilder<W, X> {
// of the input is the same as the machine that is reading it.
let map = {
let mut keys_builder = MapBuilder::new(self.map)?;
let mut keys_builder = fst::MapBuilder::new(self.map)?;
let keys = self.indexes.keys().map(|(s, v)| (s, *v));
keys_builder.extend_iter(keys)?;
keys_builder.into_inner()?

5
src/blob/positive/mod.rs Normal file
View File

@ -0,0 +1,5 @@
mod blob;
mod ops;
pub use self::blob::{PositiveBlob, RawPositiveBlobBuilder, PositiveBlobBuilder};
pub use self::ops::OpBuilder;

128
src/blob/positive/ops.rs Normal file
View File

@ -0,0 +1,128 @@
use sdset::multi::OpBuilder as SdOpBuilder;
use sdset::{SetOperation, Set};
use crate::blob::PositiveBlob;
use crate::data::DocIndexes;
use crate::DocIndex;
/// Do a set operation on multiple positive blobs.
///
/// Blobs are registered with `add`/`push`, then one of the terminal
/// methods turns the builder into the matching stream.
pub struct OpBuilder<'m> {
    // the operation on the maps is always an union.
    map_op: fst::map::OpBuilder<'m>,
    // document indexes of the registered blobs, in registration order
    indexes: Vec<&'m DocIndexes>,
}

impl<'m> OpBuilder<'m> {
    /// Creates a builder with no registered blob.
    pub fn new() -> Self {
        let map_op = fst::map::OpBuilder::new();
        let indexes = Vec::new();
        Self { map_op, indexes }
    }

    /// Creates a builder with room for the document indexes of `cap` blobs.
    pub fn with_capacity(cap: usize) -> Self {
        // TODO patch fst to add with_capacity
        let map_op = fst::map::OpBuilder::new();
        let indexes = Vec::with_capacity(cap);
        Self { map_op, indexes }
    }

    /// Registers `blob` and returns the builder, allowing calls to be chained.
    pub fn add(mut self, blob: &'m PositiveBlob) -> Self {
        self.push(blob);
        self
    }

    /// Registers `blob`: its map joins the map union and its document
    /// indexes are remembered at the same position.
    pub fn push(&mut self, blob: &'m PositiveBlob) {
        let map = blob.as_map();
        self.map_op.push(map);

        let indexes = blob.as_indexes();
        self.indexes.push(indexes);
    }

    /// Streams, for every key of the map union, the union of its document indexes.
    pub fn union(self) -> Union<'m> {
        let OpBuilder { map_op, indexes } = self;
        Union::new(map_op.union(), indexes)
    }

    /// Streams, for every key of the map union, the intersection of its document indexes.
    pub fn intersection(self) -> Intersection<'m> {
        let OpBuilder { map_op, indexes } = self;
        Intersection::new(map_op.union(), indexes)
    }

    /// Streams, for every key of the map union, the difference of its document indexes.
    pub fn difference(self) -> Difference<'m> {
        let OpBuilder { map_op, indexes } = self;
        Difference::new(map_op.union(), indexes)
    }

    /// Streams, for every key of the map union, the symmetric difference of
    /// its document indexes.
    pub fn symmetric_difference(self) -> SymmetricDifference<'m> {
        let OpBuilder { map_op, indexes } = self;
        SymmetricDifference::new(map_op.union(), indexes)
    }
}
/// Generates a stream type `$name` that walks the union of several fst maps
/// and, for every key, applies the sdset `$operation` to the document
/// indexes the key designates in each blob.
///
/// Keys whose resulting set is empty are skipped; the stream only ends when
/// the underlying map union is exhausted.
macro_rules! logical_operation {
    (struct $name:ident, $operation:ident) => {

pub struct $name<'m> {
    // union of the maps of every registered blob
    stream: fst::map::Union<'m>,
    // document indexes of every registered blob, indexed like the maps
    indexes: Vec<&'m DocIndexes>,
    // copy of the key of the item currently yielded
    key: Vec<u8>,
    // result of the set operation for the item currently yielded
    outs: Vec<DocIndex>,
}

impl<'m> $name<'m> {
    fn new(stream: fst::map::Union<'m>, indexes: Vec<&'m DocIndexes>) -> Self {
        $name {
            stream: stream,
            indexes: indexes,
            key: Vec::new(),
            outs: Vec::new(),
        }
    }
}

impl<'m, 'a> fst::Streamer<'a> for $name<'m> {
    type Item = (&'a [u8], &'a [DocIndex]);

    /// Returns the next key whose set operation result is not empty,
    /// along with that result, or `None` once every key has been visited.
    ///
    /// # Panics
    ///
    /// Panics if a value yielded by a map has no matching entry in the
    /// document indexes of the same blob.
    fn next(&'a mut self) -> Option<Self::Item> {
        loop {
            {
                // The borrow of `self.stream` is kept inside this scope: the
                // key is copied into `self.key` instead of being returned
                // directly, otherwise the borrow checker refuses to call
                // `next` again on the following iteration.
                let (input, ivalues) = fst::Streamer::next(&mut self.stream)?;

                self.outs.clear();

                let mut builder = SdOpBuilder::with_capacity(ivalues.len());
                for ivalue in ivalues {
                    let indexes = self.indexes[ivalue.index]
                        .get(ivalue.value)
                        .expect("BUG: could not find document indexes");
                    let set = Set::new_unchecked(indexes);
                    builder.push(set);
                }

                builder.$operation().extend_vec(&mut self.outs);

                if !self.outs.is_empty() {
                    self.key.clear();
                    self.key.extend_from_slice(input);
                }
            }

            // An empty result only means this key must be skipped,
            // it must not terminate the whole stream.
            if !self.outs.is_empty() {
                return Some((self.key.as_slice(), self.outs.as_slice()));
            }
        }
    }
}
    }
}

logical_operation!(struct Union, union);
logical_operation!(struct Intersection, intersection);
logical_operation!(struct Difference, difference);
logical_operation!(struct SymmetricDifference, symmetric_difference);

View File

@ -28,7 +28,7 @@ impl DocIds {
// FIXME check if modulo DocumentId
let len = vec.len();
let data = Data::Shared {
vec: Arc::new(vec),
bytes: Arc::new(vec),
offset: 0,
len: len
};

View File

@ -2,7 +2,6 @@ use std::collections::btree_map::{BTreeMap, Iter, Entry};
use std::slice::from_raw_parts;
use std::io::{self, Write};
use std::path::Path;
use std::ops::Deref;
use std::sync::Arc;
use std::mem;
@ -28,38 +27,28 @@ pub struct DocIndexes {
impl DocIndexes {
pub unsafe fn from_path<P: AsRef<Path>>(path: P) -> io::Result<Self> {
let mmap = MmapReadOnly::open_path(path)?;
let ranges_len_offset = mmap.as_slice().len() - mem::size_of::<u64>();
let ranges_len = (&mmap.as_slice()[ranges_len_offset..]).read_u64::<LittleEndian>()?;
let ranges_len = ranges_len as usize * mem::size_of::<Range>();
let ranges_offset = ranges_len_offset - ranges_len;
let ranges = Data::Mmap(mmap.range(ranges_offset, ranges_len));
let indexes = Data::Mmap(mmap.range(0, ranges_offset));
Ok(DocIndexes { ranges, indexes })
DocIndexes::from_data(Data::Mmap(mmap))
}
pub fn from_bytes(vec: Vec<u8>) -> io::Result<Self> {
let vec = Arc::new(vec);
let len = vec.len();
DocIndexes::from_shared_bytes(Arc::new(vec), 0, len)
}
let ranges_len_offset = vec.len() - mem::size_of::<u64>();
let ranges_len = (&vec[ranges_len_offset..]).read_u64::<LittleEndian>()?;
pub fn from_shared_bytes(bytes: Arc<Vec<u8>>, offset: usize, len: usize) -> io::Result<Self> {
let data = Data::Shared { bytes, offset, len };
DocIndexes::from_data(data)
}
fn from_data(data: Data) -> io::Result<Self> {
let ranges_len_offset = data.len() - mem::size_of::<u64>();
let ranges_len = (&data[ranges_len_offset..]).read_u64::<LittleEndian>()?;
let ranges_len = ranges_len as usize * mem::size_of::<Range>();
let ranges_offset = ranges_len_offset - ranges_len;
let ranges = Data::Shared {
vec: vec.clone(),
offset: ranges_offset,
len: ranges_len,
};
let ranges = data.range(ranges_offset, ranges_len);
let indexes = Data::Shared {
vec: vec,
offset: 0,
len: ranges_offset,
};
let indexes = data.range(0, ranges_offset);
Ok(DocIndexes { ranges, indexes })
}

View File

@ -12,17 +12,33 @@ pub use self::doc_indexes::{DocIndexes, DocIndexesBuilder, RawDocIndexesBuilder}
#[derive(Clone)]
enum Data {
Shared {
vec: Arc<Vec<u8>>,
bytes: Arc<Vec<u8>>,
offset: usize,
len: usize,
},
Mmap(MmapReadOnly),
}
impl Data {
    /// Returns a view over the `l` bytes starting `off` bytes into this data.
    ///
    /// For shared data the underlying buffer is not copied, only its
    /// reference count is bumped; for mmap data the call is forwarded to
    /// the mmap's own `range`.
    ///
    /// # Panics
    ///
    /// For shared data, panics if `off + l` overflows or exceeds the
    /// length of this data.
    pub fn range(&self, off: usize, l: usize) -> Data {
        match self {
            Data::Shared { bytes, offset, len } => {
                // `off + l` may wrap around in release builds, which would
                // let an out-of-bounds range slip through the check.
                let in_bounds = off.checked_add(l).map_or(false, |end| end <= *len);
                assert!(in_bounds);

                Data::Shared {
                    bytes: bytes.clone(),
                    offset: offset + off,
                    len: l,
                }
            },
            Data::Mmap(mmap) => Data::Mmap(mmap.range(off, l)),
        }
    }
}
impl Default for Data {
fn default() -> Data {
Data::Shared {
vec: Arc::default(),
bytes: Arc::default(),
offset: 0,
len: 0,
}
@ -40,8 +56,8 @@ impl Deref for Data {
impl AsRef<[u8]> for Data {
fn as_ref(&self) -> &[u8] {
match self {
Data::Shared { vec, offset, len } => {
&vec[*offset..offset + len]
Data::Shared { bytes, offset, len } => {
&bytes[*offset..offset + len]
},
Data::Mmap(m) => m.as_slice(),
}

View File

@ -1,64 +0,0 @@
use std::io::Write;
use byteorder::{NetworkEndian, WriteBytesExt};
use crate::index::schema::SchemaAttr;
use crate::DocumentId;
/// Entry point for building the byte keys used in the database.
pub struct Identifier {
    inner: Vec<u8>,
}

impl Identifier {
    /// Starts a key with the `data` prefix.
    pub fn data() -> Data {
        let mut inner: Vec<u8> = Vec::new();
        // writing into a `Vec<u8>` cannot fail, the result is ignored on purpose
        let _ = inner.write_all(b"data");
        Data { inner }
    }

    /// Starts a key with the `docu-` prefix followed by `id` encoded
    /// in network (big-endian) byte order.
    pub fn document(id: DocumentId) -> Document {
        let mut inner: Vec<u8> = Vec::new();
        // writing into a `Vec<u8>` cannot fail, the results are ignored on purpose
        let _ = inner.write_all(b"docu-");
        let _ = inner.write_u64::<NetworkEndian>(id);
        Document { inner }
    }
}
/// Key under construction, extended one `-`-separated segment at a time.
pub struct Data {
    inner: Vec<u8>,
}

impl Data {
    /// Appends the `-index` segment to the key.
    pub fn index(mut self) -> Self {
        // writing into a `Vec<u8>` cannot fail, the result is ignored on purpose
        let _ = self.inner.write_all(b"-index");
        self
    }

    /// Appends the `-schema` segment to the key.
    pub fn schema(mut self) -> Self {
        // writing into a `Vec<u8>` cannot fail, the result is ignored on purpose
        let _ = self.inner.write_all(b"-schema");
        self
    }

    /// Consumes the builder and returns the bytes of the key.
    pub fn build(self) -> Vec<u8> {
        let Data { inner } = self;
        inner
    }
}
pub struct Document {
inner: Vec<u8>,
}
impl Document {
pub fn attribute(mut self, attr: SchemaAttr) -> Self {
let _ = self.inner.write(b"-");
let _ = self.inner.write_u32::<NetworkEndian>(attr.as_u32());
self
}
pub fn build(self) -> Vec<u8> {
self.inner
}
}

View File

@ -1,4 +1,3 @@
pub mod identifier;
pub mod schema;
pub mod update;
@ -15,13 +14,10 @@ use ::rocksdb::{rocksdb, rocksdb_options};
use ::rocksdb::merge_operator::MergeOperands;
use crate::DocIndex;
use crate::automaton;
use crate::rank::Document;
use crate::index::schema::Schema;
use crate::index::update::Update;
use crate::tokenizer::TokenizerBuilder;
use crate::index::identifier::Identifier;
use crate::rank::{criterion, Config, RankedStream};
use crate::rank::QueryBuilder;
use crate::data::{DocIds, DocIndexes, RawDocIndexesBuilder};
use crate::blob::{PositiveBlob, NegativeBlob, Blob};
@ -188,8 +184,7 @@ impl Index {
let mut schema_bytes = Vec::new();
schema.write_to(&mut schema_bytes)?;
let data_key = Identifier::data().schema().build();
database.put(&data_key, &schema_bytes)?;
database.put(b"data-schema", &schema_bytes)?;
Ok(Self { database })
}
@ -205,8 +200,7 @@ impl Index {
let database = rocksdb::DB::open_cf(opts, &path, vec![("default", cf_opts)])?;
let data_key = Identifier::data().schema().build();
let _schema = match database.get(&data_key)? {
let _schema = match database.get(b"data-schema")? {
Some(value) => Schema::read_from(&*value)?,
None => return Err(String::from("Database does not contain a schema").into()),
};
@ -228,8 +222,7 @@ impl Index {
}
pub fn schema(&self) -> Result<Schema, Box<Error>> {
let data_key = Identifier::data().schema().build();
let bytes = self.database.get(&data_key)?.expect("data-schema entry not found");
let bytes = self.database.get(b"data-schema")?.expect("data-schema entry not found");
Ok(Schema::read_from(&*bytes).expect("Invalid schema"))
}
@ -237,26 +230,10 @@ impl Index {
// this snapshot will allow consistent reads for the whole search operation
let snapshot = self.database.snapshot();
let index_key = Identifier::data().index().build();
let blob = match snapshot.get(&index_key)? {
Some(value) => bincode::deserialize(&value)?,
None => PositiveBlob::default(),
};
let builder = QueryBuilder::new(snapshot)?;
let documents = builder.query(query, 0..20);
let mut automatons = Vec::new();
for query in query.split_whitespace().map(str::to_lowercase) {
let lev = automaton::build_prefix_dfa(&query);
automatons.push(lev);
}
let config = Config {
blob: blob,
automatons: automatons,
criteria: criterion::default(),
distinct: ((), 1),
};
Ok(RankedStream::new(config).retrieve_documents(0..20))
Ok(documents)
}
}

View File

@ -1,13 +1,11 @@
use std::path::PathBuf;
use std::error::Error;
use crate::blob::{BlobName, Sign};
mod negative_update;
mod positive_update;
pub use self::negative_update::{NegativeUpdateBuilder};
pub use self::positive_update::{PositiveUpdateBuilder, NewState};
pub use self::negative_update::NegativeUpdateBuilder;
pub struct Update {
path: PathBuf,
@ -21,14 +19,4 @@ impl Update {
pub fn into_path_buf(self) -> PathBuf {
self.path
}
pub fn info(&self) -> UpdateInfo {
unimplemented!()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct UpdateInfo {
pub sign: Sign,
pub id: BlobName,
}

View File

@ -3,10 +3,8 @@ use std::error::Error;
use ::rocksdb::rocksdb_options;
use crate::blob::BlobInfo;
use crate::index::update::Update;
use crate::index::identifier::Identifier;
use crate::data::{DocIds, DocIdsBuilder};
use crate::data::DocIdsBuilder;
use crate::DocumentId;
pub struct NegativeUpdateBuilder {
@ -27,34 +25,34 @@ impl NegativeUpdateBuilder {
}
pub fn build(self) -> Result<Update, Box<Error>> {
let blob_info = BlobInfo::new_negative();
let env_options = rocksdb_options::EnvOptions::new();
let column_family_options = rocksdb_options::ColumnFamilyOptions::new();
let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options);
file_writer.open(&self.path.to_string_lossy())?;
// write the doc ids
let blob_key = Identifier::blob(blob_info.name).document_ids().build();
let blob_doc_ids = self.doc_ids.into_inner()?;
file_writer.put(&blob_key, &blob_doc_ids)?;
// // write the doc ids
// let blob_key = Identifier::blob(blob_info.name).document_ids().build();
// let blob_doc_ids = self.doc_ids.into_inner()?;
// file_writer.put(&blob_key, &blob_doc_ids)?;
{
// write the blob name to be merged
let mut buffer = Vec::new();
blob_info.write_into(&mut buffer);
let data_key = Identifier::data().blobs_order().build();
file_writer.merge(&data_key, &buffer)?;
}
// {
// // write the blob name to be merged
// let mut buffer = Vec::new();
// blob_info.write_into(&mut buffer);
// let data_key = Identifier::data().blobs_order().build();
// file_writer.merge(&data_key, &buffer)?;
// }
let blob_doc_ids = DocIds::from_bytes(blob_doc_ids)?;
for id in blob_doc_ids.doc_ids().iter().cloned() {
let start = Identifier::document(id).build();
let end = Identifier::document(id + 1).build();
file_writer.delete_range(&start, &end)?;
}
// let blob_doc_ids = DocIds::from_bytes(blob_doc_ids)?;
// for id in blob_doc_ids.doc_ids().iter().cloned() {
// let start = Identifier::document(id).build();
// let end = Identifier::document(id + 1).build();
// file_writer.delete_range(&start, &end)?;
// }
file_writer.finish()?;
Update::open(self.path)
// file_writer.finish()?;
// Update::open(self.path)
unimplemented!()
}
}

View File

@ -5,11 +5,9 @@ use std::error::Error;
use ::rocksdb::rocksdb_options;
use crate::index::update::Update;
use crate::index::identifier::Identifier;
use crate::index::schema::{SchemaProps, Schema, SchemaAttr};
use crate::tokenizer::TokenizerBuilder;
use crate::blob::PositiveBlobBuilder;
use crate::{DocIndex, DocumentId};
use crate::DocumentId;
pub enum NewState {
Updated {
@ -51,69 +49,69 @@ impl<B> PositiveUpdateBuilder<B>
where B: TokenizerBuilder
{
pub fn build(self) -> Result<Update, Box<Error>> {
let blob_info = BlobInfo::new_positive();
let env_options = rocksdb_options::EnvOptions::new();
let column_family_options = rocksdb_options::ColumnFamilyOptions::new();
let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options);
file_writer.open(&self.path.to_string_lossy())?;
let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new());
for ((document_id, field), state) in &self.new_states {
let value = match state {
NewState::Updated { value, props } if props.is_indexed() => value,
_ => continue,
};
// let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new());
// for ((document_id, field), state) in &self.new_states {
// let value = match state {
// NewState::Updated { value, props } if props.is_indexed() => value,
// _ => continue,
// };
for (index, word) in self.tokenizer_builder.build(value) {
let doc_index = DocIndex {
document_id: *document_id,
attribute: field.as_u32() as u8,
attribute_index: index as u32,
};
// insert the exact representation
let word_lower = word.to_lowercase();
// for (index, word) in self.tokenizer_builder.build(value) {
// let doc_index = DocIndex {
// document_id: *document_id,
// attribute: field.as_u32() as u8,
// attribute_index: index as u32,
// };
// // insert the exact representation
// let word_lower = word.to_lowercase();
// and the unidecoded lowercased version
let word_unidecoded = unidecode::unidecode(word).to_lowercase();
if word_lower != word_unidecoded {
builder.insert(word_unidecoded, doc_index);
}
// // and the unidecoded lowercased version
// let word_unidecoded = unidecode::unidecode(word).to_lowercase();
// if word_lower != word_unidecoded {
// builder.insert(word_unidecoded, doc_index);
// }
builder.insert(word_lower, doc_index);
}
}
let (blob_fst_map, blob_doc_idx) = builder.into_inner()?;
// write the doc-idx
let blob_key = Identifier::blob(blob_info.name).document_indexes().build();
file_writer.put(&blob_key, &blob_doc_idx)?;
// write the fst
let blob_key = Identifier::blob(blob_info.name).fst_map().build();
file_writer.put(&blob_key, &blob_fst_map)?;
{
// write the blob name to be merged
let mut buffer = Vec::new();
blob_info.write_into(&mut buffer);
let data_key = Identifier::data().blobs_order().build();
file_writer.merge(&data_key, &buffer)?;
}
// write all the documents fields updates
for ((id, attr), state) in self.new_states {
let key = Identifier::document(id).attribute(attr).build();
match state {
NewState::Updated { value, props } => if props.is_stored() {
file_writer.put(&key, value.as_bytes())?
},
NewState::Removed => file_writer.delete(&key)?,
}
}
file_writer.finish()?;
Update::open(self.path)
// builder.insert(word_lower, doc_index);
// }
// }
// let (blob_fst_map, blob_doc_idx) = builder.into_inner()?;
// // write the doc-idx
// let blob_key = Identifier::blob(blob_info.name).document_indexes().build();
// file_writer.put(&blob_key, &blob_doc_idx)?;
// // write the fst
// let blob_key = Identifier::blob(blob_info.name).fst_map().build();
// file_writer.put(&blob_key, &blob_fst_map)?;
// {
// // write the blob name to be merged
// let mut buffer = Vec::new();
// blob_info.write_into(&mut buffer);
// let data_key = Identifier::data().blobs_order().build();
// file_writer.merge(&data_key, &buffer)?;
// }
// // write all the documents fields updates
// for ((id, attr), state) in self.new_states {
// let key = Identifier::document(id).attribute(attr).build();
// match state {
// NewState::Updated { value, props } => if props.is_stored() {
// file_writer.put(&key, value.as_bytes())?
// },
// NewState::Removed => file_writer.delete(&key)?,
// }
// }
// file_writer.finish()?;
// Update::open(self.path)
unimplemented!()
}
}

View File

@ -6,7 +6,7 @@
pub mod automaton;
pub mod blob;
pub mod data;
pub mod database;
pub mod retrieve;
pub mod index;
pub mod rank;
pub mod tokenizer;

65
src/rank/distinct_map.rs Normal file
View File

@ -0,0 +1,65 @@
use std::collections::HashMap;
use std::hash::Hash;
/// Counts accepted entries while allowing each key to be accepted
/// at most `limit` times.
pub struct DistinctMap<K> {
    // number of times each key has been accepted so far
    inner: HashMap<K, usize>,
    // maximum number of acceptances per key
    limit: usize,
    // total number of accepted entries, with or without key
    len: usize,
}

impl<K: Hash + Eq> DistinctMap<K> {
    /// Creates an empty map accepting every key up to `limit` times.
    pub fn new(limit: usize) -> Self {
        DistinctMap { inner: HashMap::new(), limit, len: 0 }
    }

    /// Submits `key` and tells whether it has been accepted.
    ///
    /// A key is refused once it has already been accepted `limit` times;
    /// refused keys do not change the length.
    pub fn digest(&mut self, key: K) -> bool {
        let count = self.inner.entry(key).or_insert(0);
        if *count >= self.limit {
            return false;
        }

        *count += 1;
        self.len += 1;
        true
    }

    /// Accepts an entry that has no key; this always succeeds.
    pub fn accept_without_key(&mut self) -> bool {
        self.len += 1;
        true
    }

    /// Returns the total number of accepted entries.
    pub fn len(&self) -> usize {
        self.len
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that every key is accepted at most `limit` times and that
    /// the length only counts accepted entries.
    #[test]
    fn easy_distinct_map() {
        let mut map = DistinctMap::new(2);
        let keys = [1, 1, 1, 2, 3, 4, 5, 6, 6, 6, 6, 6];
        keys.iter().for_each(|x| {
            map.digest(x);
        });
        // 1 and 6 are capped to two acceptances each, 2, 3, 4 and 5 appear once
        assert_eq!(map.len(), 8);

        let mut map = DistinctMap::new(2);

        assert!(map.digest(1));
        assert!(map.digest(1));
        assert!(!map.digest(1));
        assert!(!map.digest(1));

        assert!(map.digest(2));
        assert!(map.digest(3));
        assert!(map.digest(2));
        assert!(!map.digest(2));

        assert_eq!(map.len(), 5);
    }
}

View File

@ -1,9 +1,10 @@
pub mod criterion;
mod ranked_stream;
mod distinct_map;
use crate::{Match, DocumentId};
pub use self::ranked_stream::{Config, RankedStream};
pub use self::ranked_stream::{QueryBuilder, DistinctQueryBuilder};
#[inline]
fn match_query_index(a: &Match, b: &Match) -> bool {

View File

@ -1,10 +1,7 @@
use std::ops::{Deref, Range, RangeBounds};
use std::collections::HashMap;
use std::ops::{Deref, Range};
use std::{mem, vec, str};
use std::ops::Bound::*;
use std::error::Error;
use std::hash::Hash;
use std::rc::Rc;
use fnv::FnvHashMap;
use fst::Streamer;
@ -13,12 +10,11 @@ use ::rocksdb::rocksdb::{DB, Snapshot};
use crate::automaton::{self, DfaExt, AutomatonExt};
use crate::rank::criterion::{self, Criterion};
use crate::blob::{PositiveBlob, Merge};
use crate::blob::ops::Union;
use crate::rank::distinct_map::DistinctMap;
use crate::blob::PositiveBlob;
use crate::{Match, DocumentId};
use crate::database::Retrieve;
use crate::retrieve::Retrieve;
use crate::rank::Document;
use crate::index::Index;
fn clamp_range<T: Copy + Ord>(range: Range<T>, big: Range<T>) -> Range<T> {
Range {
@ -63,9 +59,7 @@ where T: Deref<Target=DB>,
pub fn with_distinct<F>(self, function: F, size: usize) -> DistinctQueryBuilder<T, F, C> {
DistinctQueryBuilder {
snapshot: self.snapshot,
blob: self.blob,
criteria: self.criteria,
inner: self,
function: function,
size: size
}
@ -73,16 +67,28 @@ where T: Deref<Target=DB>,
fn query_all(&self, query: &str) -> Vec<Document> {
let automatons = split_whitespace_automatons(query);
let mut stream: Union = unimplemented!();
let mut stream = {
let mut op_builder = fst::map::OpBuilder::new();
for automaton in &automatons {
let stream = self.blob.as_map().search(automaton);
op_builder.push(stream);
}
op_builder.union()
};
let mut matches = FnvHashMap::default();
while let Some((string, indexed_values)) = stream.next() {
while let Some((input, indexed_values)) = stream.next() {
for iv in indexed_values {
let automaton = &automatons[iv.index];
let distance = automaton.eval(string).to_u8();
let is_exact = distance == 0 && string.len() == automaton.query_len();
let distance = automaton.eval(input).to_u8();
let is_exact = distance == 0 && input.len() == automaton.query_len();
for doc_index in iv.doc_indexes.as_slice() {
let doc_indexes = self.blob.as_indexes();
let doc_indexes = doc_indexes.get(iv.value).expect("BUG: could not find document indexes");
for doc_index in doc_indexes {
let match_ = Match {
query_index: iv.index as u32,
distance: distance,
@ -103,11 +109,11 @@ impl<T, C> QueryBuilder<T, C>
where T: Deref<Target=DB>,
C: Criterion,
{
pub fn query(&self, query: &str, range: impl RangeBounds<usize>) -> Vec<Document> {
pub fn query(&self, query: &str, range: Range<usize>) -> Vec<Document> {
let mut documents = self.query_all(query);
let mut groups = vec![documents.as_mut_slice()];
for criterion in self.criteria {
for criterion in &self.criteria {
let tmp_groups = mem::replace(&mut groups, Vec::new());
for group in tmp_groups {
@ -118,127 +124,58 @@ where T: Deref<Target=DB>,
}
}
// let range = clamp_range(range, 0..documents.len());
let range: Range<usize> = unimplemented!();
let range = clamp_range(range, 0..documents.len());
documents[range].to_vec()
}
}
pub struct DistinctQueryBuilder<T: Deref<Target=DB>, F, C> {
snapshot: Snapshot<T>,
blob: PositiveBlob,
criteria: Vec<C>,
inner: QueryBuilder<T, C>,
function: F,
size: usize,
}
// pub struct Schema;
// pub struct DocDatabase;
// where F: Fn(&Schema, &DocDatabase) -> Option<K>,
// K: Hash + Eq,
pub struct DocDatabase;
impl<T: Deref<Target=DB>, F, C> DistinctQueryBuilder<T, F, C>
impl<T: Deref<Target=DB>, F, K, C> DistinctQueryBuilder<T, F, C>
where T: Deref<Target=DB>,
F: Fn(DocumentId, &DocDatabase) -> Option<K>,
K: Hash + Eq,
C: Criterion,
{
pub fn query(&self, query: &str, range: impl RangeBounds<usize>) -> Vec<Document> {
// let mut documents = self.retrieve_all_documents();
// let mut groups = vec![documents.as_mut_slice()];
pub fn query(&self, query: &str, range: Range<usize>) -> Vec<Document> {
let mut documents = self.inner.query_all(query);
let mut groups = vec![documents.as_mut_slice()];
// for criterion in self.criteria {
// let tmp_groups = mem::replace(&mut groups, Vec::new());
for criterion in &self.inner.criteria {
let tmp_groups = mem::replace(&mut groups, Vec::new());
// for group in tmp_groups {
// group.sort_unstable_by(|a, b| criterion.evaluate(a, b));
// for group in GroupByMut::new(group, |a, b| criterion.eq(a, b)) {
// groups.push(group);
// }
// }
// }
// let mut out_documents = Vec::with_capacity(range.len());
// let (distinct, limit) = self.distinct;
// let mut seen = DistinctMap::new(limit);
// for document in documents {
// let accepted = match distinct(&document.id) {
// Some(key) => seen.digest(key),
// None => seen.accept_without_key(),
// };
// if accepted {
// if seen.len() == range.end { break }
// if seen.len() >= range.start {
// out_documents.push(document);
// }
// }
// }
// out_documents
unimplemented!()
for group in tmp_groups {
group.sort_unstable_by(|a, b| criterion.evaluate(a, b));
for group in GroupByMut::new(group, |a, b| criterion.eq(a, b)) {
groups.push(group);
}
}
}
pub struct DistinctMap<K> {
inner: HashMap<K, usize>,
limit: usize,
len: usize,
}
let doc_database = DocDatabase;
let mut out_documents = Vec::with_capacity(range.len());
let mut seen = DistinctMap::new(self.size);
impl<K: Hash + Eq> DistinctMap<K> {
pub fn new(limit: usize) -> Self {
DistinctMap {
inner: HashMap::new(),
limit: limit,
len: 0,
for document in documents {
let accepted = match (self.function)(document.id, &doc_database) {
Some(key) => seen.digest(key),
None => seen.accept_without_key(),
};
if accepted {
if seen.len() == range.end { break }
if seen.len() >= range.start {
out_documents.push(document);
}
}
}
pub fn digest(&mut self, key: K) -> bool {
let seen = self.inner.entry(key).or_insert(0);
if *seen < self.limit {
*seen += 1;
self.len += 1;
true
} else {
false
}
}
pub fn accept_without_key(&mut self) -> bool {
self.len += 1;
true
}
pub fn len(&self) -> usize {
self.len
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn easy_distinct_map() {
let mut map = DistinctMap::new(2);
for x in &[1, 1, 1, 2, 3, 4, 5, 6, 6, 6, 6, 6] {
map.digest(x);
}
assert_eq!(map.len(), 8);
let mut map = DistinctMap::new(2);
assert_eq!(map.digest(1), true);
assert_eq!(map.digest(1), true);
assert_eq!(map.digest(1), false);
assert_eq!(map.digest(1), false);
assert_eq!(map.digest(2), true);
assert_eq!(map.digest(3), true);
assert_eq!(map.digest(2), true);
assert_eq!(map.digest(2), false);
assert_eq!(map.len(), 5);
out_documents
}
}

View File

@ -1,16 +1,32 @@
use std::error::Error;
use std::ops::Deref;
use ::rocksdb::rocksdb::{DB, Snapshot};
use ::rocksdb::rocksdb::{DB, Snapshot, DBVector};
use crate::index::schema::Schema;
use crate::index::schema::{Schema, SchemaAttr};
use crate::blob::PositiveBlob;
use crate::DocumentId;
/// Pairs a borrowed retriever with a `Schema`.
///
/// Presumably meant to give access to stored documents and their
/// attributes; both accessors are still stubs — TODO confirm intent.
pub struct DocDatabase<'a, R: ?Sized> {
// borrowed value the database was created from
retrieve: &'a R,
// schema held alongside the retriever
schema: Schema,
}
impl<'a, R> DocDatabase<'a, R> {
/// Not implemented yet: calling this always panics via `unimplemented!()`.
///
/// NOTE(review): the signature suggests it should return the document
/// `id` as a `D`, or `None` when it is absent — confirm once implemented.
pub fn get_document<D>(&self, id: DocumentId) -> Result<Option<D>, Box<Error>> {
// TODO: implement the lookup of document `id`
unimplemented!()
}
/// Not implemented yet: calling this always panics via `unimplemented!()`.
///
/// NOTE(review): the signature suggests it should return the raw stored
/// bytes of attribute `attr` of document `id` — confirm once implemented.
pub fn get_document_attribute(&self, id: DocumentId, attr: SchemaAttr) -> Result<DBVector, Box<Error>> {
unimplemented!()
}
}
pub trait Retrieve {
fn schema(&self) -> Result<Option<Schema>, Box<Error>>;
fn data_index(&self) -> Result<PositiveBlob, Box<Error>>;
fn get_documents<D>(&self, ids: &[DocumentId]) -> Result<Vec<D>, Box<Error>>;
fn doc_database(&self) -> Result<DocDatabase<Self>, Box<Error>>;
}
impl<T> Retrieve for Snapshot<T>
@ -30,13 +46,15 @@ where T: Deref<Target=DB>,
}
}
fn get_documents<D>(&self, ids: &[DocumentId]) -> Result<Vec<D>, Box<Error>> {
if ids.is_empty() { return Ok(Vec::new()) }
fn doc_database(&self) -> Result<DocDatabase<Self>, Box<Error>> {
let schema = match self.schema()? {
Some(schema) => schema,
None => return Err(String::from("BUG: could not find schema").into()),
};
unimplemented!()
Ok(DocDatabase {
retrieve: self,
schema: schema,
})
}
}