From af791db23d55c81e7de8f63cc2765e29775e1ee0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 29 Nov 2018 14:54:17 +0100 Subject: [PATCH] feat: Use the blob::OpBuilder to merge "data-index" blobs --- src/blob/ops.rs | 9 +-- src/index/mod.rs | 149 +++++------------------------------------------ 2 files changed, 17 insertions(+), 141 deletions(-) diff --git a/src/blob/ops.rs b/src/blob/ops.rs index 4ab0ef564..90f91547f 100644 --- a/src/blob/ops.rs +++ b/src/blob/ops.rs @@ -16,14 +16,14 @@ fn blob_same_sign(a: &Blob, b: &Blob) -> bool { fn unwrap_positive(blob: &Blob) -> &PositiveBlob { match blob { Blob::Positive(blob) => blob, - Blob::Negative(_) => panic!("called `Blob::unwrap_positive()` on a `Negative` value"), + Blob::Negative(_) => panic!("called `unwrap_positive()` on a `Negative` value"), } } fn unwrap_negative(blob: &Blob) -> &NegativeBlob { match blob { Blob::Negative(blob) => blob, - Blob::Positive(_) => panic!("called `Blob::unwrap_negative()` on a `Positive` value"), + Blob::Positive(_) => panic!("called `unwrap_negative()` on a `Positive` value"), } } @@ -57,14 +57,12 @@ impl OpBuilder { let mut stream = op_builder.union().into_stream(); let mut builder = RawPositiveBlobBuilder::memory(); - while let Some((input, doc_indexes)) = stream.next() { // FIXME empty doc_indexes must be handled by OpBuilder if !doc_indexes.is_empty() { builder.insert(input, doc_indexes).unwrap(); } } - let (map, doc_indexes) = builder.into_inner().unwrap(); let blob = PositiveBlob::from_bytes(map, doc_indexes).unwrap(); Either::Left(blob) @@ -74,7 +72,6 @@ impl OpBuilder { for blob in blobs { op_builder.push(unwrap_negative(blob)); } - let blob = op_builder.union().into_negative_blob(); Either::Right(blob) }, @@ -83,14 +80,12 @@ impl OpBuilder { let mut zipped = positives.into_iter().zip(negatives); let mut buffer = Vec::new(); - zipped.try_fold(PositiveBlob::default(), |base, (positive, negative)| { let mut builder = RawPositiveBlobBuilder::memory(); let doc_ids = Set::new_unchecked(negative.as_ref()); let op_builder = positive::OpBuilder::new().add(&base).add(&positive); let mut stream = op_builder.union().into_stream(); - while let Some((input, doc_indexes)) = stream.next() { let doc_indexes = Set::new_unchecked(doc_indexes); let op = DifferenceByKey::new(doc_indexes, doc_ids, |x| x.document_id, |x| *x); diff --git a/src/index/mod.rs b/src/index/mod.rs index 5fa036900..3a96d83e0 100644 --- a/src/index/mod.rs +++ b/src/index/mod.rs @@ -4,156 +4,37 @@ pub mod update; use std::error::Error; use std::path::Path; -use fst::map::{Map, MapBuilder, OpBuilder}; -use fst::{IntoStreamer, Streamer}; -use sdset::duo::Union as SdUnion; -use sdset::duo::DifferenceByKey; -use sdset::{Set, SetOperation}; use ::rocksdb::rocksdb::Writable; use ::rocksdb::{rocksdb, rocksdb_options}; use ::rocksdb::merge_operator::MergeOperands; -use crate::DocIndex; use crate::rank::Document; use crate::index::schema::Schema; use crate::index::update::Update; use crate::rank::QueryBuilder; -use crate::data::{DocIds, DocIndexes, RawDocIndexesBuilder}; -use crate::blob::{PositiveBlob, NegativeBlob, Blob}; - -fn union_positives(a: &PositiveBlob, b: &PositiveBlob) -> Result> { - let (a_map, a_indexes) = (a.as_map(), a.as_indexes()); - let (b_map, b_indexes) = (b.as_map(), b.as_indexes()); - - let mut map_builder = MapBuilder::memory(); - let mut indexes_builder = RawDocIndexesBuilder::memory(); - - let op_builder = OpBuilder::new().add(a_map).add(b_map); - let mut stream = op_builder.union(); - let mut i = 0; - - while let Some((key, indexed)) = stream.next() { - let doc_idx: Vec = match indexed { - [a, b] => { - let a_doc_idx = a_indexes.get(a.value).expect("BUG: could not find document indexes"); - let b_doc_idx = b_indexes.get(b.value).expect("BUG: could not find document indexes"); - - let a_doc_idx = Set::new_unchecked(a_doc_idx); - let b_doc_idx = Set::new_unchecked(b_doc_idx); - - let sd_union = SdUnion::new(a_doc_idx, b_doc_idx); - sd_union.into_set_buf().into_vec() - }, - [a] => { - let indexes = if a.index == 0 { a_indexes } else { b_indexes }; - let doc_idx = indexes.get(a.value).expect("BUG: could not find document indexes"); - doc_idx.to_vec() - }, - _ => unreachable!(), - }; - - if !doc_idx.is_empty() { - map_builder.insert(key, i)?; - indexes_builder.insert(&doc_idx)?; - i += 1; - } - } - - let inner = map_builder.into_inner()?; - let map = Map::from_bytes(inner)?; - - let inner = indexes_builder.into_inner()?; - let indexes = DocIndexes::from_bytes(inner)?; - - Ok(PositiveBlob::from_raw(map, indexes)) -} - -fn union_negatives(a: &NegativeBlob, b: &NegativeBlob) -> NegativeBlob { - let a_doc_ids = a.as_ids().doc_ids(); - let b_doc_ids = b.as_ids().doc_ids(); - - let a_doc_ids = Set::new_unchecked(a_doc_ids); - let b_doc_ids = Set::new_unchecked(b_doc_ids); - - let sd_union = SdUnion::new(a_doc_ids, b_doc_ids); - let doc_ids = sd_union.into_set_buf().into_vec(); - let doc_ids = DocIds::from_document_ids(doc_ids); - - NegativeBlob::from_raw(doc_ids) -} - -fn merge_positive_negative(pos: &PositiveBlob, neg: &NegativeBlob) -> Result> { - let (map, indexes) = (pos.as_map(), pos.as_indexes()); - let doc_ids = neg.as_ids().doc_ids(); - - let doc_ids = Set::new_unchecked(doc_ids); - - let mut map_builder = MapBuilder::memory(); - let mut indexes_builder = RawDocIndexesBuilder::memory(); - - let mut stream = map.into_stream(); - let mut i = 0; - - while let Some((key, index)) = stream.next() { - let doc_idx = indexes.get(index).expect("BUG: could not find document indexes"); - let doc_idx = Set::new_unchecked(doc_idx); - - let diff = DifferenceByKey::new(doc_idx, doc_ids, |&d| d.document_id, |id| *id); - let doc_idx: Vec = diff.into_set_buf().into_vec(); - - map_builder.insert(key, i)?; - indexes_builder.insert(&doc_idx)?; - i += 1; - } - - let inner = map_builder.into_inner()?; - let map = Map::from_bytes(inner)?; - - let inner = indexes_builder.into_inner()?; - let indexes = DocIndexes::from_bytes(inner)?; - - Ok(PositiveBlob::from_raw(map, indexes)) -} - -#[derive(Default)] -struct Merge { - blob: PositiveBlob, -} - -impl Merge { - fn new(blob: PositiveBlob) -> Merge { - Merge { blob } - } - - fn merge(&mut self, blob: Blob) { - self.blob = match blob { - Blob::Positive(blob) => union_positives(&self.blob, &blob).unwrap(), - Blob::Negative(blob) => merge_positive_negative(&self.blob, &blob).unwrap(), - }; - } - - fn build(self) -> PositiveBlob { - self.blob - } -} +use crate::blob::{self, Blob}; fn merge_indexes(key: &[u8], existing_value: Option<&[u8]>, operands: &mut MergeOperands) -> Vec { if key != b"data-index" { panic!("The merge operator only supports \"data-index\" merging") } - let mut merge = match existing_value { - Some(existing_value) => { - let blob = bincode::deserialize(existing_value).expect("BUG: could not deserialize data-index"); - Merge::new(blob) - }, - None => Merge::default(), + let capacity = { + let remaining = operands.size_hint().0; + let already_exist = usize::from(existing_value.is_some()); + remaining + already_exist }; - for bytes in operands { - let blob = bincode::deserialize(bytes).expect("BUG: could not deserialize blobs"); - merge.merge(blob); + let mut op = blob::OpBuilder::with_capacity(capacity); + if let Some(existing_value) = existing_value { + let blob = bincode::deserialize(existing_value).expect("BUG: could not deserialize data-index"); + op.push(Blob::Positive(blob)); } - let blob = merge.build(); + for bytes in operands { + let blob = bincode::deserialize(bytes).expect("BUG: could not deserialize blob"); + op.push(blob); + } + + let blob = op.merge().expect("BUG: could no merge blobs"); bincode::serialize(&blob).expect("BUG: could not serialize merged blob") }