feat: Use the blob::OpBuilder to merge "data-index" blobs

This commit is contained in:
Clément Renault 2018-11-29 14:54:17 +01:00
parent 612a8d9d44
commit af791db23d
No known key found for this signature in database
GPG Key ID: 0151CDAB43460DAE
2 changed files with 17 additions and 141 deletions

View File

@ -16,14 +16,14 @@ fn blob_same_sign(a: &Blob, b: &Blob) -> bool {
/// Borrows the inner `PositiveBlob` of a `Blob::Positive`.
///
/// # Panics
/// Panics when given a `Blob::Negative`.
// NOTE(review): every line of this span carries both columns of a side-by-side
// diff (old text, then new text); only the panic message differs between them.
fn unwrap_positive(blob: &Blob) -> &PositiveBlob { fn unwrap_positive(blob: &Blob) -> &PositiveBlob {
match blob { match blob {
Blob::Positive(blob) => blob, Blob::Positive(blob) => blob,
Blob::Negative(_) => panic!("called `Blob::unwrap_positive()` on a `Negative` value"), Blob::Negative(_) => panic!("called `unwrap_positive()` on a `Negative` value"),
} }
} }
/// Borrows the inner `NegativeBlob` of a `Blob::Negative`.
///
/// # Panics
/// Panics when given a `Blob::Positive`.
// NOTE(review): every line of this span carries both columns of a side-by-side
// diff (old text, then new text); only the panic message differs between them.
fn unwrap_negative(blob: &Blob) -> &NegativeBlob { fn unwrap_negative(blob: &Blob) -> &NegativeBlob {
match blob { match blob {
Blob::Negative(blob) => blob, Blob::Negative(blob) => blob,
Blob::Positive(_) => panic!("called `Blob::unwrap_negative()` on a `Positive` value"), Blob::Positive(_) => panic!("called `unwrap_negative()` on a `Positive` value"),
} }
} }
@ -57,14 +57,12 @@ impl OpBuilder {
let mut stream = op_builder.union().into_stream(); let mut stream = op_builder.union().into_stream();
let mut builder = RawPositiveBlobBuilder::memory(); let mut builder = RawPositiveBlobBuilder::memory();
while let Some((input, doc_indexes)) = stream.next() { while let Some((input, doc_indexes)) = stream.next() {
// FIXME empty doc_indexes must be handled by OpBuilder // FIXME empty doc_indexes must be handled by OpBuilder
if !doc_indexes.is_empty() { if !doc_indexes.is_empty() {
builder.insert(input, doc_indexes).unwrap(); builder.insert(input, doc_indexes).unwrap();
} }
} }
let (map, doc_indexes) = builder.into_inner().unwrap(); let (map, doc_indexes) = builder.into_inner().unwrap();
let blob = PositiveBlob::from_bytes(map, doc_indexes).unwrap(); let blob = PositiveBlob::from_bytes(map, doc_indexes).unwrap();
Either::Left(blob) Either::Left(blob)
@ -74,7 +72,6 @@ impl OpBuilder {
for blob in blobs { for blob in blobs {
op_builder.push(unwrap_negative(blob)); op_builder.push(unwrap_negative(blob));
} }
let blob = op_builder.union().into_negative_blob(); let blob = op_builder.union().into_negative_blob();
Either::Right(blob) Either::Right(blob)
}, },
@ -83,14 +80,12 @@ impl OpBuilder {
let mut zipped = positives.into_iter().zip(negatives); let mut zipped = positives.into_iter().zip(negatives);
let mut buffer = Vec::new(); let mut buffer = Vec::new();
zipped.try_fold(PositiveBlob::default(), |base, (positive, negative)| { zipped.try_fold(PositiveBlob::default(), |base, (positive, negative)| {
let mut builder = RawPositiveBlobBuilder::memory(); let mut builder = RawPositiveBlobBuilder::memory();
let doc_ids = Set::new_unchecked(negative.as_ref()); let doc_ids = Set::new_unchecked(negative.as_ref());
let op_builder = positive::OpBuilder::new().add(&base).add(&positive); let op_builder = positive::OpBuilder::new().add(&base).add(&positive);
let mut stream = op_builder.union().into_stream(); let mut stream = op_builder.union().into_stream();
while let Some((input, doc_indexes)) = stream.next() { while let Some((input, doc_indexes)) = stream.next() {
let doc_indexes = Set::new_unchecked(doc_indexes); let doc_indexes = Set::new_unchecked(doc_indexes);
let op = DifferenceByKey::new(doc_indexes, doc_ids, |x| x.document_id, |x| *x); let op = DifferenceByKey::new(doc_indexes, doc_ids, |x| x.document_id, |x| *x);

View File

@ -4,156 +4,37 @@ pub mod update;
use std::error::Error; use std::error::Error;
use std::path::Path; use std::path::Path;
use fst::map::{Map, MapBuilder, OpBuilder};
use fst::{IntoStreamer, Streamer};
use sdset::duo::Union as SdUnion;
use sdset::duo::DifferenceByKey;
use sdset::{Set, SetOperation};
use ::rocksdb::rocksdb::Writable; use ::rocksdb::rocksdb::Writable;
use ::rocksdb::{rocksdb, rocksdb_options}; use ::rocksdb::{rocksdb, rocksdb_options};
use ::rocksdb::merge_operator::MergeOperands; use ::rocksdb::merge_operator::MergeOperands;
use crate::DocIndex;
use crate::rank::Document; use crate::rank::Document;
use crate::index::schema::Schema; use crate::index::schema::Schema;
use crate::index::update::Update; use crate::index::update::Update;
use crate::rank::QueryBuilder; use crate::rank::QueryBuilder;
use crate::data::{DocIds, DocIndexes, RawDocIndexesBuilder}; use crate::blob::{self, Blob};
use crate::blob::{PositiveBlob, NegativeBlob, Blob};
/// Builds the union of two positive blobs.
///
/// The union of both fst maps is streamed key by key. A key present in a
/// single blob keeps that blob's document indexes; a key present in both gets
/// the set union of the two lists. Keys whose resulting list is empty are
/// dropped. Each kept key is stored in the new map with a counter value that
/// starts at zero and grows by one per kept key, and its list is appended to
/// the new document-indexes builder in the same order.
///
/// # Errors
/// Forwards any error raised by the map builder, the document-indexes
/// builder, or by rebuilding `Map` / `DocIndexes` from the produced bytes.
///
/// # Panics
/// Panics if a value read from one of the maps has no entry in the matching
/// document indexes, or if the union stream yields a key backed by neither
/// one nor two sources.
fn union_positives(a: &PositiveBlob, b: &PositiveBlob) -> Result<PositiveBlob, Box<Error>> {
    let lhs_map = a.as_map();
    let lhs_indexes = a.as_indexes();
    let rhs_map = b.as_map();
    let rhs_indexes = b.as_indexes();

    let mut fst_builder = MapBuilder::memory();
    let mut postings_builder = RawDocIndexesBuilder::memory();

    let mut merged = OpBuilder::new().add(lhs_map).add(rhs_map).union();
    let mut next_slot = 0;

    while let Some((word, sources)) = merged.next() {
        let postings: Vec<DocIndex> = match sources {
            // Key found in exactly one map: `index` tells which input it came from.
            [only] => {
                let owner = if only.index == 0 { lhs_indexes } else { rhs_indexes };
                owner.get(only.value).expect("BUG: could not find document indexes").to_vec()
            },
            // Key found in both maps: merge the two lists.
            // NOTE(review): assumes the first entry belongs to `a` and the second to `b` — confirm
            // against the ordering guaranteed by the fst union stream.
            [left, right] => {
                let left_docs = lhs_indexes.get(left.value).expect("BUG: could not find document indexes");
                let right_docs = rhs_indexes.get(right.value).expect("BUG: could not find document indexes");
                let left_set = Set::new_unchecked(left_docs);
                let right_set = Set::new_unchecked(right_docs);
                SdUnion::new(left_set, right_set).into_set_buf().into_vec()
            },
            _ => unreachable!(),
        };

        // A key without any document index is not written to the output.
        if postings.is_empty() {
            continue;
        }

        fst_builder.insert(word, next_slot)?;
        postings_builder.insert(&postings)?;
        next_slot += 1;
    }

    let map = Map::from_bytes(fst_builder.into_inner()?)?;
    let indexes = DocIndexes::from_bytes(postings_builder.into_inner()?)?;

    Ok(PositiveBlob::from_raw(map, indexes))
}
/// Returns a negative blob whose document ids are the set union of the ids
/// held by `a` and `b`.
fn union_negatives(a: &NegativeBlob, b: &NegativeBlob) -> NegativeBlob {
    // NOTE(review): `new_unchecked` skips validation; presumably both id slices
    // are already sorted and deduplicated — confirm against `DocIds`.
    let lhs = Set::new_unchecked(a.as_ids().doc_ids());
    let rhs = Set::new_unchecked(b.as_ids().doc_ids());

    let merged_ids = SdUnion::new(lhs, rhs).into_set_buf().into_vec();

    NegativeBlob::from_raw(DocIds::from_document_ids(merged_ids))
}
/// Removes from a positive blob every document index whose `document_id`
/// appears in the negative blob.
///
/// Each key of `pos` is streamed in order; its document indexes are filtered
/// with a difference-by-key against the ids of `neg`. Keys left without any
/// document index are dropped from the result, matching what
/// `union_positives` does for empty lists. Each kept key is stored with a
/// counter value that starts at zero and grows by one per kept key.
///
/// # Errors
/// Forwards any error raised by the map builder, the document-indexes
/// builder, or by rebuilding `Map` / `DocIndexes` from the produced bytes.
///
/// # Panics
/// Panics if a value read from the map has no entry in the document indexes.
fn merge_positive_negative(pos: &PositiveBlob, neg: &NegativeBlob) -> Result<PositiveBlob, Box<Error>> {
    let (map, indexes) = (pos.as_map(), pos.as_indexes());
    let doc_ids = neg.as_ids().doc_ids();
    let doc_ids = Set::new_unchecked(doc_ids);

    let mut map_builder = MapBuilder::memory();
    let mut indexes_builder = RawDocIndexesBuilder::memory();

    let mut stream = map.into_stream();
    let mut i = 0;
    while let Some((key, index)) = stream.next() {
        let doc_idx = indexes.get(index).expect("BUG: could not find document indexes");
        let doc_idx = Set::new_unchecked(doc_idx);

        let diff = DifferenceByKey::new(doc_idx, doc_ids, |&d| d.document_id, |id| *id);
        let doc_idx: Vec<DocIndex> = diff.into_set_buf().into_vec();

        // Every document of this key was removed: do not keep a key that
        // points to an empty list, and do not consume a counter value for it.
        if doc_idx.is_empty() {
            continue;
        }

        map_builder.insert(key, i)?;
        indexes_builder.insert(&doc_idx)?;
        i += 1;
    }

    let inner = map_builder.into_inner()?;
    let map = Map::from_bytes(inner)?;

    let inner = indexes_builder.into_inner()?;
    let indexes = DocIndexes::from_bytes(inner)?;

    Ok(PositiveBlob::from_raw(map, indexes))
}
/// Accumulator that folds a sequence of blobs into one positive blob.
#[derive(Default)]
struct Merge {
    /// Result of every blob merged so far.
    blob: PositiveBlob,
}

impl Merge {
    /// Starts the accumulation from an already existing positive blob.
    fn new(blob: PositiveBlob) -> Self {
        Self { blob }
    }

    /// Applies one more blob on top of the accumulated one.
    ///
    /// A positive blob is unioned with the current state; a negative blob has
    /// its document ids removed from the current state.
    ///
    /// # Panics
    /// Panics if the underlying merge function returns an error.
    fn merge(&mut self, blob: Blob) {
        let merged = match blob {
            Blob::Negative(removed) => merge_positive_negative(&self.blob, &removed),
            Blob::Positive(added) => union_positives(&self.blob, &added),
        };
        self.blob = merged.unwrap();
    }

    /// Consumes the accumulator and hands back the merged blob.
    fn build(self) -> PositiveBlob {
        let Self { blob } = self;
        blob
    }
}
/// Merge callback for the `"data-index"` key; receives the optional existing
/// value and the pending `MergeOperands`.
///
/// The existing value (when present) and every operand are decoded with
/// bincode, combined into a single blob, and that blob is returned
/// bincode-serialized.
///
/// # Panics
/// Panics when `key` is not `b"data-index"`, or when any deserialization,
/// merge, or serialization step fails.
// NOTE(review): every line of this span carries both columns of a side-by-side
// diff. The left column is the removed implementation, which folds blobs one
// by one through `Merge`; the right column is the added implementation, which
// pushes all blobs into a `blob::OpBuilder` and merges once.
// NOTE(review): the `op.merge()` expect message reads "could no merge blobs" —
// likely a typo for "could not merge blobs"; confirm before changing the string.
fn merge_indexes(key: &[u8], existing_value: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> { fn merge_indexes(key: &[u8], existing_value: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
if key != b"data-index" { panic!("The merge operator only supports \"data-index\" merging") } if key != b"data-index" { panic!("The merge operator only supports \"data-index\" merging") }
let mut merge = match existing_value { let capacity = {
Some(existing_value) => { let remaining = operands.size_hint().0;
let blob = bincode::deserialize(existing_value).expect("BUG: could not deserialize data-index"); let already_exist = usize::from(existing_value.is_some());
Merge::new(blob) remaining + already_exist
},
None => Merge::default(),
}; };
for bytes in operands { let mut op = blob::OpBuilder::with_capacity(capacity);
let blob = bincode::deserialize(bytes).expect("BUG: could not deserialize blobs"); if let Some(existing_value) = existing_value {
merge.merge(blob); let blob = bincode::deserialize(existing_value).expect("BUG: could not deserialize data-index");
op.push(Blob::Positive(blob));
} }
let blob = merge.build(); for bytes in operands {
let blob = bincode::deserialize(bytes).expect("BUG: could not deserialize blob");
op.push(blob);
}
let blob = op.merge().expect("BUG: could no merge blobs");
bincode::serialize(&blob).expect("BUG: could not serialize merged blob") bincode::serialize(&blob).expect("BUG: could not serialize merged blob")
} }