feat: Introduce the "data-index" entry with merge compaction

This commit is contained in:
Clément Renault 2018-11-26 17:30:19 +01:00
parent 0e856db4e6
commit b636e5fe57
No known key found for this signature in database
GPG Key ID: 0151CDAB43460DAE
12 changed files with 251 additions and 231 deletions

View File

@ -8,10 +8,9 @@ authors = ["Kerollmops <renault.cle@gmail.com>"]
bincode = "1.0"
byteorder = "1.2"
fnv = "1.0"
fs2 = "0.4"
lazy_static = "1.1"
linked-hash-map = { version = "0.5", features = ["serde_impl"] }
sdset = "0.2"
sdset = "0.3"
serde = "1.0"
serde_derive = "1.0"
unidecode = "0.3"

View File

@ -1,17 +1,17 @@
use crate::vec_read_only::VecReadOnly;
use std::collections::BinaryHeap;
use std::{mem, cmp};
use std::rc::Rc;
use std::cmp;
use fst::{Automaton, Streamer};
use fst::automaton::AlwaysMatch;
use sdset::{Set, SetBuf, SetOperation};
use sdset::{Set, SetOperation};
use sdset::duo::OpBuilder as SdOpBuilder;
use group_by::GroupBy;
use crate::blob::{Blob, Sign};
use crate::blob::ops::{OpBuilder, Union, IndexedDocIndexes};
use crate::DocIndex;
use crate::blob::{Blob, Sign};
use crate::vec_read_only::VecReadOnly;
use crate::blob::ops::{OpBuilder, Union, IndexedDocIndexes};
fn group_is_negative(blobs: &&[Blob]) -> bool {
blobs[0].sign() == Sign::Negative

View File

@ -12,7 +12,6 @@ use std::error::Error;
use std::io::{Write, Read};
use std::{io, fmt, mem};
use fst::Map;
use uuid::Uuid;
use rocksdb::rocksdb::{DB, Snapshot};
use serde::ser::{Serialize, Serializer, SerializeTuple};
@ -108,100 +107,3 @@ impl Sign {
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct BlobName(Uuid);
impl BlobName {
pub fn new() -> BlobName {
BlobName(Uuid::new_v4())
}
pub fn as_bytes(&self) -> &[u8; 16] {
self.0.as_bytes()
}
}
impl fmt::Display for BlobName {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_tuple("BlobName")
.field(&self.0.to_hyphenated().to_string())
.finish()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct BlobInfo {
pub sign: Sign,
pub name: BlobName,
}
impl BlobInfo {
pub fn new_positive() -> BlobInfo {
BlobInfo {
sign: Sign::Positive,
name: BlobName::new(),
}
}
pub fn new_negative() -> BlobInfo {
BlobInfo {
sign: Sign::Negative,
name: BlobName::new(),
}
}
pub fn read_from<R: Read>(reader: R) -> bincode::Result<BlobInfo> {
bincode::deserialize_from(reader)
}
pub fn read_from_slice(slice: &[u8]) -> bincode::Result<Vec<BlobInfo>> {
let len = slice.len() / mem::size_of::<BlobInfo>();
let mut blob_infos = Vec::with_capacity(len);
let mut cursor = io::Cursor::new(slice);
while blob_infos.len() != len {
let blob_info = BlobInfo::read_from(&mut cursor)?;
blob_infos.push(blob_info);
}
Ok(blob_infos)
}
pub fn write_into<W: Write>(&self, writer: W) -> bincode::Result<()> {
bincode::serialize_into(writer, self)
}
}
pub fn blobs_from_blob_infos(infos: &[BlobInfo], snapshot: &Snapshot<&DB>) -> Result<Vec<Blob>, Box<Error>> {
let mut blobs = Vec::with_capacity(infos.len());
for info in infos {
let blob = match info.sign {
Sign::Positive => {
let blob_key = Identifier::blob(info.name).fst_map().build();
let map = match snapshot.get(&blob_key)? {
Some(value) => value.to_vec(),
None => return Err(format!("No fst entry found for blob {}", info.name).into()),
};
let blob_key = Identifier::blob(info.name).document_indexes().build();
let doc_idx = match snapshot.get(&blob_key)? {
Some(value) => value.to_vec(),
None => return Err(format!("No doc-idx entry found for blob {}", info.name).into()),
};
PositiveBlob::from_bytes(map, doc_idx).map(Blob::Positive)?
},
Sign::Negative => {
let blob_key = Identifier::blob(info.name).document_ids().build();
let doc_ids = match snapshot.get(&blob_key)? {
Some(value) => value.to_vec(),
None => return Err(format!("No doc-ids entry found for blob {}", info.name).into()),
};
NegativeBlob::from_bytes(doc_ids).map(Blob::Negative)?
},
};
blobs.push(blob);
}
Ok(blobs)
}

View File

@ -1,6 +1,6 @@
use std::io::{Read, Write};
use std::error::Error;
use std::io::Write;
use std::path::Path;
use std::error::Error;
use crate::DocumentId;
use crate::data::{DocIds, DocIdsBuilder};
@ -24,6 +24,10 @@ impl NegativeBlob {
Ok(NegativeBlob { doc_ids })
}
pub fn from_raw(doc_ids: DocIds) -> Self {
NegativeBlob { doc_ids }
}
pub fn as_ids(&self) -> &DocIds {
&self.doc_ids
}

View File

@ -1,7 +1,7 @@
use std::io::{Read, Write};
use std::error::Error;
use std::path::Path;
use std::fmt;
use std::io::Write;
use std::path::Path;
use std::error::Error;
use fst::{Map, MapBuilder};
@ -10,6 +10,7 @@ use crate::data::{DocIndexes, DocIndexesBuilder};
use serde::ser::{Serialize, Serializer, SerializeTuple};
use serde::de::{self, Deserialize, Deserializer, SeqAccess, Visitor};
#[derive(Default)]
pub struct PositiveBlob {
map: Map,
indexes: DocIndexes,
@ -31,6 +32,10 @@ impl PositiveBlob {
Ok(PositiveBlob { map, indexes })
}
pub fn from_raw(map: Map, indexes: DocIndexes) -> Self {
PositiveBlob { map, indexes }
}
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Option<&[DocIndex]> {
self.map.get(key).and_then(|index| self.indexes.get(index))
}
@ -109,7 +114,7 @@ impl<W: Write, X: Write> PositiveBlobBuilder<W, X> {
}
pub fn finish(self) -> Result<(), Box<Error>> {
self.into_inner().map(|_| ())
self.into_inner().map(drop)
}
pub fn into_inner(self) -> Result<(W, X), Box<Error>> {
@ -130,6 +135,10 @@ impl<W: Write, X: Write> PositiveBlobBuilder<W, X> {
}
impl PositiveBlobBuilder<Vec<u8>, Vec<u8>> {
pub fn memory() -> Self {
PositiveBlobBuilder::new(Vec::new(), Vec::new())
}
pub fn build(self) -> Result<PositiveBlob, Box<Error>> {
self.into_inner().and_then(|(m, i)| PositiveBlob::from_bytes(m, i))
}

View File

@ -35,6 +35,10 @@ impl DocIds {
Ok(DocIds { data })
}
pub fn from_document_ids(vec: Vec<DocumentId>) -> Self {
DocIds::from_bytes(unsafe { mem::transmute(vec) }).unwrap()
}
pub fn contains(&self, doc: DocumentId) -> bool {
// FIXME prefer using the sdset::exponential_search function
self.doc_ids().binary_search(&doc).is_ok()

View File

@ -19,7 +19,7 @@ struct Range {
end: u64,
}
#[derive(Clone)]
#[derive(Clone, Default)]
pub struct DocIndexes {
ranges: Data,
indexes: Data,
@ -29,15 +29,14 @@ impl DocIndexes {
pub unsafe fn from_path<P: AsRef<Path>>(path: P) -> io::Result<Self> {
let mmap = MmapReadOnly::open_path(path)?;
let range_len = mmap.as_slice().read_u64::<LittleEndian>()?;
let range_len = range_len as usize * mem::size_of::<Range>();
let ranges_len_offset = mmap.as_slice().len() - mem::size_of::<u64>();
let ranges_len = (&mmap.as_slice()[ranges_len_offset..]).read_u64::<LittleEndian>()?;
let ranges_len = ranges_len as usize * mem::size_of::<Range>();
let offset = mem::size_of::<u64>() as usize;
let ranges = Data::Mmap(mmap.range(offset, range_len));
let ranges_offset = ranges_len_offset - ranges_len;
let ranges = Data::Mmap(mmap.range(ranges_offset, ranges_len));
let len = mmap.len() - range_len - offset;
let offset = offset + range_len;
let indexes = Data::Mmap(mmap.range(offset, len));
let indexes = Data::Mmap(mmap.range(0, ranges_offset));
Ok(DocIndexes { ranges, indexes })
}
@ -45,19 +44,22 @@ impl DocIndexes {
pub fn from_bytes(vec: Vec<u8>) -> io::Result<Self> {
let vec = Arc::new(vec);
let range_len = vec.as_slice().read_u64::<LittleEndian>()?;
let range_len = range_len as usize * mem::size_of::<Range>();
let ranges_len_offset = vec.len() - mem::size_of::<u64>();
let ranges_len = (&vec[ranges_len_offset..]).read_u64::<LittleEndian>()?;
let ranges_len = ranges_len as usize * mem::size_of::<Range>();
let offset = mem::size_of::<u64>() as usize;
let ranges_offset = ranges_len_offset - ranges_len;
let ranges = Data::Shared {
vec: vec.clone(),
offset,
len: range_len
offset: ranges_offset,
len: ranges_len,
};
let len = vec.len() - range_len - offset;
let offset = offset + range_len;
let indexes = Data::Shared { vec, offset, len };
let indexes = Data::Shared {
vec: vec,
offset: 0,
len: ranges_offset,
};
Ok(DocIndexes { ranges, indexes })
}
@ -94,6 +96,53 @@ impl Serialize for DocIndexes {
}
}
pub struct RawDocIndexesBuilder<W> {
ranges: Vec<Range>,
wtr: W,
}
impl RawDocIndexesBuilder<Vec<u8>> {
pub fn memory() -> Self {
RawDocIndexesBuilder::new(Vec::new())
}
}
impl<W: Write> RawDocIndexesBuilder<W> {
pub fn new(wtr: W) -> Self {
RawDocIndexesBuilder {
ranges: Vec::new(),
wtr: wtr,
}
}
pub fn insert(&mut self, indexes: &[DocIndex]) -> io::Result<()> {
let len = indexes.len() as u64;
let start = self.ranges.last().map(|r| r.start).unwrap_or(0);
let range = Range { start, end: start + len };
self.ranges.push(range);
// write the values
let indexes = unsafe { into_u8_slice(indexes) };
self.wtr.write_all(indexes)
}
pub fn finish(self) -> io::Result<()> {
self.into_inner().map(drop)
}
pub fn into_inner(mut self) -> io::Result<W> {
// write the ranges
let ranges = unsafe { into_u8_slice(self.ranges.as_slice()) };
self.wtr.write_all(ranges)?;
// write the length of the ranges
let len = ranges.len() as u64;
self.wtr.write_u64::<LittleEndian>(len)?;
Ok(self.wtr)
}
}
pub struct DocIndexesBuilder<W> {
keys: BTreeMap<String, u64>,
indexes: Vec<Vec<DocIndex>>,
@ -136,29 +185,27 @@ impl<W: Write> DocIndexesBuilder<W> {
}
pub fn finish(self) -> io::Result<()> {
self.into_inner().map(|_| ())
self.into_inner().map(drop)
}
pub fn into_inner(mut self) -> io::Result<W> {
for vec in &mut self.indexes {
vec.sort_unstable();
}
let (ranges, values) = into_sliced_ranges(self.indexes, self.number_docs);
// write values first
let slice = unsafe { into_u8_slice(values.as_slice()) };
self.wtr.write_all(slice)?;
// write ranges after
let slice = unsafe { into_u8_slice(ranges.as_slice()) };
self.wtr.write_all(slice)?;
// write the length of the ranges
let len = ranges.len() as u64;
// TODO check if this is correct
self.wtr.write_u64::<LittleEndian>(len)?;
unsafe {
// write Ranges first
let slice = into_u8_slice(ranges.as_slice());
self.wtr.write_all(slice)?;
// write Values after
let slice = into_u8_slice(values.as_slice());
self.wtr.write_all(slice)?;
}
self.wtr.flush()?;
Ok(self.wtr)

View File

@ -7,7 +7,7 @@ use std::sync::Arc;
use fst::raw::MmapReadOnly;
pub use self::doc_ids::{DocIds, DocIdsBuilder};
pub use self::doc_indexes::{DocIndexes, DocIndexesBuilder};
pub use self::doc_indexes::{DocIndexes, DocIndexesBuilder, RawDocIndexesBuilder};
#[derive(Clone)]
enum Data {
@ -19,6 +19,16 @@ enum Data {
Mmap(MmapReadOnly),
}
impl Default for Data {
fn default() -> Data {
Data::Shared {
vec: Arc::default(),
offset: 0,
len: 0,
}
}
}
impl Deref for Data {
type Target = [u8];

View File

@ -3,7 +3,6 @@ use std::io::Write;
use byteorder::{NetworkEndian, WriteBytesExt};
use crate::index::schema::SchemaAttr;
use crate::blob::BlobName;
use crate::DocumentId;
pub struct Identifier {
@ -17,13 +16,6 @@ impl Identifier {
Data { inner }
}
pub fn blob(name: BlobName) -> Blob {
let mut inner = Vec::new();
let _ = inner.write(b"blob");
let _ = inner.write(name.as_bytes());
Blob { inner }
}
pub fn document(id: DocumentId) -> Document {
let mut inner = Vec::new();
let _ = inner.write(b"docu");
@ -38,9 +30,9 @@ pub struct Data {
}
impl Data {
pub fn blobs_order(mut self) -> Self {
pub fn index(mut self) -> Self {
let _ = self.inner.write(b"-");
let _ = self.inner.write(b"blobs-order");
let _ = self.inner.write(b"index");
self
}
@ -55,34 +47,6 @@ impl Data {
}
}
pub struct Blob {
inner: Vec<u8>,
}
impl Blob {
pub fn document_indexes(mut self) -> Self {
let _ = self.inner.write(b"-");
let _ = self.inner.write(b"doc-idx");
self
}
pub fn document_ids(mut self) -> Self {
let _ = self.inner.write(b"-");
let _ = self.inner.write(b"doc-ids");
self
}
pub fn fst_map(mut self) -> Self {
let _ = self.inner.write(b"-");
let _ = self.inner.write(b"fst");
self
}
pub fn build(self) -> Vec<u8> {
self.inner
}
}
pub struct Document {
inner: Vec<u8>,
}

View File

@ -2,76 +2,163 @@ pub mod identifier;
pub mod schema;
pub mod update;
use std::io;
use std::rc::Rc;
use std::error::Error;
use std::fs::{self, File};
use std::fmt::{self, Write};
use std::ops::{Deref, BitOr};
use std::path::{Path, PathBuf};
use std::collections::{BTreeSet, BTreeMap};
use std::path::Path;
use fs2::FileExt;
use fst::map::{Map, MapBuilder, OpBuilder};
use fst::{IntoStreamer, Streamer};
use sdset::duo::Union as SdUnion;
use sdset::duo::DifferenceByKey;
use sdset::{Set, SetOperation};
use ::rocksdb::rocksdb::Writable;
use ::rocksdb::{rocksdb, rocksdb_options};
use ::rocksdb::merge_operator::MergeOperands;
use crate::DocIndex;
use crate::automaton;
use crate::rank::Document;
use crate::data::DocIdsBuilder;
use crate::{DocIndex, DocumentId};
use crate::index::schema::Schema;
use crate::index::update::Update;
use crate::tokenizer::TokenizerBuilder;
use crate::index::identifier::Identifier;
use crate::blob::{PositiveBlobBuilder, PositiveBlob, BlobInfo, Sign, Blob, blobs_from_blob_infos};
use crate::tokenizer::{TokenizerBuilder, DefaultBuilder, Tokenizer};
use crate::rank::{criterion, Config, RankedStream};
use crate::automaton;
use crate::data::{DocIds, DocIndexes, RawDocIndexesBuilder};
use crate::blob::{PositiveBlob, NegativeBlob, Blob};
fn simple_vec_append(key: &[u8], value: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
let mut output = Vec::new();
for bytes in operands.chain(value) {
output.extend_from_slice(bytes);
fn union_positives(a: &PositiveBlob, b: &PositiveBlob) -> Result<PositiveBlob, Box<Error>> {
let (a_map, a_indexes) = (a.as_map(), a.as_indexes());
let (b_map, b_indexes) = (b.as_map(), b.as_indexes());
let mut map_builder = MapBuilder::memory();
let mut indexes_builder = RawDocIndexesBuilder::memory();
let op_builder = OpBuilder::new().add(a_map).add(b_map);
let mut stream = op_builder.union();
let mut i = 0;
while let Some((key, indexed)) = stream.next() {
let doc_idx: Vec<DocIndex> = match indexed {
[a, b] => {
let a_doc_idx = a_indexes.get(a.value).expect("BUG: could not find document indexes");
let b_doc_idx = b_indexes.get(b.value).expect("BUG: could not find document indexes");
let a_doc_idx = Set::new_unchecked(a_doc_idx);
let b_doc_idx = Set::new_unchecked(b_doc_idx);
let sd_union = SdUnion::new(a_doc_idx, b_doc_idx);
sd_union.into_set_buf().into_vec()
},
[a] => {
let indexes = if a.index == 0 { a_indexes } else { b_indexes };
let doc_idx = indexes.get(a.value).expect("BUG: could not find document indexes");
doc_idx.to_vec()
},
_ => unreachable!(),
};
if !doc_idx.is_empty() {
map_builder.insert(key, i)?;
indexes_builder.insert(&doc_idx)?;
i += 1;
}
}
output
let inner = map_builder.into_inner()?;
let map = Map::from_bytes(inner)?;
let inner = indexes_builder.into_inner()?;
let indexes = DocIndexes::from_bytes(inner)?;
Ok(PositiveBlob::from_raw(map, indexes))
}
pub struct MergeBuilder {
blobs: Vec<Blob>,
fn union_negatives(a: &NegativeBlob, b: &NegativeBlob) -> NegativeBlob {
let a_doc_ids = a.as_ids().doc_ids();
let b_doc_ids = b.as_ids().doc_ids();
let a_doc_ids = Set::new_unchecked(a_doc_ids);
let b_doc_ids = Set::new_unchecked(b_doc_ids);
let sd_union = SdUnion::new(a_doc_ids, b_doc_ids);
let doc_ids = sd_union.into_set_buf().into_vec();
let doc_ids = DocIds::from_document_ids(doc_ids);
NegativeBlob::from_raw(doc_ids)
}
impl MergeBuilder {
pub fn new() -> MergeBuilder {
MergeBuilder { blobs: Vec::new() }
fn merge_positive_negative(pos: &PositiveBlob, neg: &NegativeBlob) -> Result<PositiveBlob, Box<Error>> {
let (map, indexes) = (pos.as_map(), pos.as_indexes());
let doc_ids = neg.as_ids().doc_ids();
let doc_ids = Set::new_unchecked(doc_ids);
let mut map_builder = MapBuilder::memory();
let mut indexes_builder = RawDocIndexesBuilder::memory();
let mut stream = map.into_stream();
let mut i = 0;
while let Some((key, index)) = stream.next() {
let doc_idx = indexes.get(index).expect("BUG: could not find document indexes");
let doc_idx = Set::new_unchecked(doc_idx);
let diff = DifferenceByKey::new(doc_idx, doc_ids, |&d| d.document_id, |id| *id);
let doc_idx: Vec<DocIndex> = diff.into_set_buf().into_vec();
map_builder.insert(key, i)?;
indexes_builder.insert(&doc_idx)?;
i += 1;
}
pub fn push(&mut self, blob: Blob) {
if blob.sign() == Sign::Negative && self.blobs.is_empty() { return }
self.blobs.push(blob);
let inner = map_builder.into_inner()?;
let map = Map::from_bytes(inner)?;
let inner = indexes_builder.into_inner()?;
let indexes = DocIndexes::from_bytes(inner)?;
Ok(PositiveBlob::from_raw(map, indexes))
}
#[derive(Default)]
struct Merge {
blob: PositiveBlob,
}
impl Merge {
fn new(blob: PositiveBlob) -> Merge {
Merge { blob }
}
pub fn merge(self) -> PositiveBlob {
unimplemented!()
fn merge(&mut self, blob: Blob) {
self.blob = match blob {
Blob::Positive(blob) => union_positives(&self.blob, &blob).unwrap(),
Blob::Negative(blob) => merge_positive_negative(&self.blob, &blob).unwrap(),
};
}
fn build(self) -> PositiveBlob {
self.blob
}
}
fn merge_indexes(key: &[u8], existing_value: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
if key != b"data-index" { panic!("The merge operator only allow \"data-index\" merging") }
if key != b"data-index" { panic!("The merge operator only supports \"data-index\" merging") }
let mut merge_builder = MergeBuilder::new();
if let Some(existing_value) = existing_value {
let base: PositiveBlob = bincode::deserialize(existing_value).unwrap(); // FIXME what do we do here ?
merge_builder.push(Blob::Positive(base));
}
let mut merge = match existing_value {
Some(existing_value) => {
let blob = bincode::deserialize(existing_value).expect("BUG: could not deserialize data-index");
Merge::new(blob)
},
None => Merge::default(),
};
for bytes in operands {
let blob: Blob = bincode::deserialize(bytes).unwrap();
merge_builder.push(blob);
let blob = bincode::deserialize(bytes).expect("BUG: could not deserialize blobs");
merge.merge(blob);
}
let blob = merge_builder.merge();
// blob.to_vec()
unimplemented!()
let blob = merge.build();
bincode::serialize(&blob).expect("BUG: could not serialize merged blob")
}
pub struct Index {
@ -95,7 +182,7 @@ impl Index {
opts.create_if_missing(true);
let mut cf_opts = rocksdb_options::ColumnFamilyOptions::new();
cf_opts.add_merge_operator("blobs order operator", simple_vec_append);
cf_opts.add_merge_operator("data-index merge operator", merge_indexes);
let database = rocksdb::DB::open_cf(opts, &path, vec![("default", cf_opts)])?;
@ -114,7 +201,7 @@ impl Index {
opts.create_if_missing(false);
let mut cf_opts = rocksdb_options::ColumnFamilyOptions::new();
cf_opts.add_merge_operator("blobs order operator", simple_vec_append);
cf_opts.add_merge_operator("data-index merge operator", merge_indexes);
let database = rocksdb::DB::open_cf(opts, &path, vec![("default", cf_opts)])?;
@ -150,12 +237,9 @@ impl Index {
// this snapshot will allow consistent reads for the whole search operation
let snapshot = self.database.snapshot();
let data_key = Identifier::data().blobs_order().build();
let blobs = match snapshot.get(&data_key)? {
Some(value) => {
let blob_infos = BlobInfo::read_from_slice(&value)?;
blobs_from_blob_infos(&blob_infos, &snapshot)?
},
let index_key = Identifier::data().index().build();
let map = match snapshot.get(&index_key)? {
Some(value) => bincode::deserialize(&value)?,
None => Vec::new(),
};
@ -166,7 +250,7 @@ impl Index {
}
let config = Config {
blobs: &blobs,
map: map,
automatons: automatons,
criteria: criterion::default(),
distinct: ((), 1),

View File

@ -1,8 +1,6 @@
use std::path::PathBuf;
use std::error::Error;
use ::rocksdb::rocksdb_options;
use crate::blob::{BlobName, Sign};
mod negative_update;

View File

@ -1,7 +1,6 @@
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::error::Error;
use std::fmt::Write;
use ::rocksdb::rocksdb_options;