feat: Introduce the new Index structure

replacing the old ugly Blob system
This commit is contained in:
Clément Renault 2018-12-30 16:17:18 +01:00
parent 6bd779f9ae
commit 0080bf486f
No known key found for this signature in database
GPG Key ID: 0151CDAB43460DAE
28 changed files with 481 additions and 1712 deletions

View File

@ -5,7 +5,7 @@ use serde_derive::{Serialize, Deserialize};
use structopt::StructOpt;
use meilidb::database::schema::{Schema, SchemaBuilder, STORED, INDEXED};
use meilidb::database::PositiveUpdateBuilder;
use meilidb::database::UpdateBuilder;
use meilidb::tokenizer::DefaultBuilder;
use meilidb::database::Database;
@ -44,7 +44,7 @@ fn index(schema: Schema, database_path: &Path, csv_data_path: &Path) -> Result<D
let tokenizer_builder = DefaultBuilder::new();
let update_path = tempfile::NamedTempFile::new()?;
let mut update = PositiveUpdateBuilder::new(update_path.path(), schema, tokenizer_builder);
let mut update = UpdateBuilder::new(update_path.path().to_path_buf(), schema);
let mut rdr = csv::Reader::from_path(csv_data_path)?;
let mut raw_record = csv::StringRecord::new();
@ -59,12 +59,10 @@ fn index(schema: Schema, database_path: &Path, csv_data_path: &Path) -> Result<D
}
};
update.update(&document).unwrap();
update.update_document(&document).unwrap();
}
let mut update = update.build()?;
update.set_move(true);
database.ingest_update_file(update)?;
Ok(database)

View File

@ -14,6 +14,10 @@ pub struct DocIds {
}
impl DocIds {
pub fn empty() -> Self {
DocIds { data: SharedData::empty() }
}
pub fn from_bytes(vec: Vec<u8>) -> io::Result<Self> {
let len = vec.len();
DocIds::from_shared_bytes(Arc::new(vec), 0, len)
@ -24,6 +28,10 @@ impl DocIds {
DocIds::from_data(data)
}
pub fn as_bytes(&self) -> &[u8] {
&self.data
}
fn from_data(data: SharedData) -> io::Result<Self> {
let len = data.as_ref().read_u64::<LittleEndian>()?;
let data = data.range(mem::size_of::<u64>(), len as usize);

View File

@ -24,17 +24,17 @@ pub struct DocIndexes {
}
impl DocIndexes {
pub fn from_bytes(vec: Vec<u8>) -> io::Result<Self> {
pub fn from_bytes(vec: Vec<u8>) -> io::Result<DocIndexes> {
let len = vec.len();
DocIndexes::from_shared_bytes(Arc::new(vec), 0, len)
}
pub fn from_shared_bytes(bytes: Arc<Vec<u8>>, offset: usize, len: usize) -> io::Result<Self> {
pub fn from_shared_bytes(bytes: Arc<Vec<u8>>, offset: usize, len: usize) -> io::Result<DocIndexes> {
let data = SharedData { bytes, offset, len };
DocIndexes::from_data(data)
}
fn from_data(data: SharedData) -> io::Result<Self> {
fn from_data(data: SharedData) -> io::Result<DocIndexes> {
let ranges_len_offset = data.len() - size_of::<u64>();
let ranges_len = (&data[ranges_len_offset..]).read_u64::<LittleEndian>()?;
let ranges_len = ranges_len as usize;
@ -47,19 +47,21 @@ impl DocIndexes {
Ok(DocIndexes { ranges, indexes })
}
pub fn to_vec(&self) -> Vec<u8> {
let capacity = self.indexes.len() + self.ranges.len() + size_of::<u64>();
let mut bytes = Vec::with_capacity(capacity);
pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
let ranges_len = self.ranges.len() as u64;
let indexes_len = self.indexes.len() as u64;
let u64_size = size_of::<u64>() as u64;
let len = indexes_len + ranges_len + u64_size;
let _ = bytes.write_u64::<LittleEndian>(len);
bytes.extend_from_slice(&self.indexes);
bytes.extend_from_slice(&self.ranges);
bytes.write_u64::<LittleEndian>(self.ranges.len() as u64).unwrap();
bytes
let _ = bytes.write_u64::<LittleEndian>(ranges_len);
}
pub fn get(&self, index: usize) -> Option<&Set<DocIndex>> {
self.ranges().get(index as usize).map(|Range { start, end }| {
self.ranges().get(index).map(|Range { start, end }| {
let start = *start as usize;
let end = *end as usize;
let slice = &self.indexes()[start..end];
@ -216,9 +218,12 @@ mod tests {
let builder_bytes = builder.into_inner()?;
let docs = DocIndexes::from_bytes(builder_bytes.clone())?;
let bytes = docs.to_vec();
assert_eq!(builder_bytes, bytes);
let mut bytes = Vec::new();
docs.write_to_bytes(&mut bytes);
let len = size_of::<u64>();
assert_eq!(builder_bytes, &bytes[len..]);
Ok(())
}

View File

@ -15,6 +15,14 @@ struct SharedData {
}
impl SharedData {
pub fn empty() -> SharedData {
SharedData {
bytes: Arc::default(),
offset: 0,
len: 0,
}
}
pub fn range(&self, offset: usize, len: usize) -> SharedData {
assert!(offset + len <= self.len);
SharedData {
@ -27,11 +35,7 @@ impl SharedData {
impl Default for SharedData {
fn default() -> SharedData {
SharedData {
bytes: Arc::default(),
offset: 0,
len: 0,
}
SharedData::empty()
}
}

View File

@ -1,98 +0,0 @@
mod ops;
pub mod positive;
pub mod negative;
pub use self::positive::{PositiveBlob, PositiveBlobBuilder};
pub use self::negative::NegativeBlob;
pub use self::ops::OpBuilder;
use std::io::{Cursor, BufRead};
use std::error::Error;
use std::sync::Arc;
use byteorder::{ReadBytesExt, WriteBytesExt};
#[derive(Debug)]
pub enum Blob {
Positive(PositiveBlob),
Negative(NegativeBlob),
}
impl Blob {
pub fn is_negative(&self) -> bool {
self.sign() == Sign::Negative
}
pub fn is_positive(&self) -> bool {
self.sign() == Sign::Positive
}
pub fn sign(&self) -> Sign {
match self {
Blob::Positive(_) => Sign::Positive,
Blob::Negative(_) => Sign::Negative,
}
}
pub fn from_shared_bytes(bytes: Arc<Vec<u8>>, offset: usize, len: usize) -> Result<Blob, Box<Error>> {
let mut cursor = Cursor::new(&bytes.as_slice()[..len]);
cursor.consume(offset);
let byte = cursor.read_u8()?;
let blob = match Sign::from_byte(byte)? {
Sign::Positive => {
let offset = cursor.position() as usize;
let len = len - offset;
let blob = PositiveBlob::from_shared_bytes(bytes, offset, len)?;
Blob::Positive(blob)
},
Sign::Negative => {
let offset = cursor.position() as usize;
let len = len - offset;
let blob = NegativeBlob::from_shared_bytes(bytes, offset, len)?;
Blob::Negative(blob)
},
};
Ok(blob)
}
pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
let sign = self.sign();
sign.write_to_bytes(bytes);
match self {
Blob::Positive(b) => b.write_to_bytes(bytes),
Blob::Negative(b) => b.write_to_bytes(bytes),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Sign {
Positive,
Negative,
}
impl Sign {
pub fn invert(self) -> Sign {
match self {
Sign::Positive => Sign::Negative,
Sign::Negative => Sign::Positive,
}
}
pub fn from_byte(byte: u8) -> Result<Sign, Box<Error>> {
match byte {
0 => Ok(Sign::Positive),
1 => Ok(Sign::Negative),
b => Err(format!("Invalid sign byte {:?}", b).into()),
}
}
pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
match self {
Sign::Positive => bytes.write_u8(0).unwrap(),
Sign::Negative => bytes.write_u8(1).unwrap(),
}
}
}

View File

@ -1,63 +0,0 @@
use std::io::{Cursor, BufRead};
use std::error::Error;
use std::sync::Arc;
use std::fmt;
use sdset::Set;
use byteorder::{LittleEndian, ReadBytesExt};
use crate::data::DocIds;
use crate::DocumentId;
#[derive(Default)]
pub struct NegativeBlob {
doc_ids: DocIds,
}
impl NegativeBlob {
pub fn from_bytes(doc_ids: Vec<u8>) -> Result<Self, Box<Error>> {
let doc_ids = DocIds::from_bytes(doc_ids)?;
Ok(NegativeBlob { doc_ids })
}
pub fn from_shared_bytes(bytes: Arc<Vec<u8>>, offset: usize, len: usize) -> Result<Self, Box<Error>> {
let mut cursor = Cursor::new(&bytes.as_slice()[..len]);
cursor.consume(offset);
let len = cursor.read_u64::<LittleEndian>()? as usize;
let offset = cursor.position() as usize;
let doc_ids = DocIds::from_shared_bytes(bytes, offset, len)?;
Ok(NegativeBlob::from_raw(doc_ids))
}
pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
self.doc_ids.write_to_bytes(bytes)
}
pub fn from_raw(doc_ids: DocIds) -> Self {
NegativeBlob { doc_ids }
}
pub fn as_ids(&self) -> &DocIds {
&self.doc_ids
}
pub fn into_doc_ids(self) -> DocIds {
self.doc_ids
}
}
impl AsRef<Set<DocumentId>> for NegativeBlob {
fn as_ref(&self) -> &Set<DocumentId> {
self.as_ids().doc_ids()
}
}
impl fmt::Debug for NegativeBlob {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "NegativeBlob(")?;
f.debug_list().entries(self.as_ref().as_slice()).finish()?;
write!(f, ")")
}
}

View File

@ -1,5 +0,0 @@
mod blob;
mod ops;
pub use self::blob::NegativeBlob;
pub use self::ops::OpBuilder;

View File

@ -1,73 +0,0 @@
use sdset::multi::OpBuilder as SdOpBuilder;
use sdset::Set;
use crate::database::blob::NegativeBlob;
use crate::data::DocIds;
use crate::DocumentId;
pub struct OpBuilder<'a> {
inner: SdOpBuilder<'a, DocumentId>,
}
/// Do a set operation on multiple negative blobs.
impl<'a> OpBuilder<'a> {
pub fn new() -> Self {
Self { inner: SdOpBuilder::new() }
}
pub fn with_capacity(cap: usize) -> Self {
Self { inner: SdOpBuilder::with_capacity(cap) }
}
pub fn add(mut self, blob: &'a NegativeBlob) -> Self {
self.push(blob);
self
}
pub fn push(&mut self, blob: &'a NegativeBlob) {
let set = Set::new_unchecked(blob.as_ref());
self.inner.push(set);
}
pub fn union(self) -> Union<'a> {
Union::new(self.inner.union())
}
pub fn intersection(self) -> Intersection<'a> {
Intersection::new(self.inner.intersection())
}
pub fn difference(self) -> Difference<'a> {
Difference::new(self.inner.difference())
}
pub fn symmetric_difference(self) -> SymmetricDifference<'a> {
SymmetricDifference::new(self.inner.symmetric_difference())
}
}
macro_rules! logical_operation {
(struct $name:ident, $operation:ident) => {
pub struct $name<'a> {
op: sdset::multi::$name<'a, DocumentId>,
}
impl<'a> $name<'a> {
fn new(op: sdset::multi::$name<'a, DocumentId>) -> Self {
$name { op }
}
pub fn into_negative_blob(self) -> NegativeBlob {
let document_ids = sdset::SetOperation::into_set_buf(self.op);
let doc_ids = DocIds::from_raw(document_ids.into_vec());
NegativeBlob::from_raw(doc_ids)
}
}
}}
logical_operation!(struct Union, union);
logical_operation!(struct Intersection, intersection);
logical_operation!(struct Difference, difference);
logical_operation!(struct SymmetricDifference, symmetric_difference);

View File

@ -1,109 +0,0 @@
use std::error::Error;
use fst::{IntoStreamer, Streamer};
use sdset::duo::DifferenceByKey;
use sdset::{Set, SetOperation};
use group_by::GroupBy;
use crate::database::blob::{Blob, Sign, PositiveBlob, PositiveBlobBuilder, NegativeBlob};
use crate::database::blob::{positive, negative};
fn blob_same_sign(a: &Blob, b: &Blob) -> bool {
a.sign() == b.sign()
}
fn unwrap_positive(blob: &Blob) -> &PositiveBlob {
match blob {
Blob::Positive(blob) => blob,
Blob::Negative(_) => panic!("called `unwrap_positive()` on a `Negative` value"),
}
}
fn unwrap_negative(blob: &Blob) -> &NegativeBlob {
match blob {
Blob::Negative(blob) => blob,
Blob::Positive(_) => panic!("called `unwrap_negative()` on a `Positive` value"),
}
}
pub struct OpBuilder {
blobs: Vec<Blob>,
}
impl OpBuilder {
pub fn new() -> OpBuilder {
OpBuilder { blobs: Vec::new() }
}
pub fn with_capacity(cap: usize) -> OpBuilder {
OpBuilder { blobs: Vec::with_capacity(cap) }
}
pub fn push(&mut self, blob: Blob) {
if self.blobs.is_empty() && blob.is_negative() { return }
self.blobs.push(blob);
}
pub fn merge(self) -> Result<PositiveBlob, Box<Error>> {
let groups = GroupBy::new(&self.blobs, blob_same_sign);
let mut aggregated = Vec::new();
for blobs in groups {
match blobs[0].sign() {
Sign::Positive => {
let mut op_builder = positive::OpBuilder::with_capacity(blobs.len());
for blob in blobs {
op_builder.push(unwrap_positive(blob));
}
let mut stream = op_builder.union().into_stream();
let mut builder = PositiveBlobBuilder::memory();
while let Some((input, doc_indexes)) = stream.next() {
// FIXME empty doc_indexes must be handled by OpBuilder
if !doc_indexes.is_empty() {
builder.insert(input, doc_indexes).unwrap();
}
}
let (map, doc_indexes) = builder.into_inner().unwrap();
let blob = PositiveBlob::from_bytes(map, doc_indexes).unwrap();
aggregated.push(Blob::Positive(blob));
},
Sign::Negative => {
let mut op_builder = negative::OpBuilder::with_capacity(blobs.len());
for blob in blobs {
op_builder.push(unwrap_negative(blob));
}
let blob = op_builder.union().into_negative_blob();
aggregated.push(Blob::Negative(blob));
},
}
}
let mut buffer = Vec::new();
aggregated.chunks(2).try_fold(PositiveBlob::default(), |base, slice| {
let negative = NegativeBlob::default();
let (positive, negative) = match slice {
[a, b] => (unwrap_positive(a), unwrap_negative(b)),
[a] => (unwrap_positive(a), &negative),
_ => unreachable!(),
};
let mut builder = PositiveBlobBuilder::memory();
let op_builder = positive::OpBuilder::new().add(&base).add(&positive);
let mut stream = op_builder.union().into_stream();
while let Some((input, doc_indexes)) = stream.next() {
let op = DifferenceByKey::new(doc_indexes, negative.as_ref(), |x| x.document_id, |x| *x);
buffer.clear();
op.extend_vec(&mut buffer);
if !buffer.is_empty() {
builder.insert(input, Set::new_unchecked(&buffer))?;
}
}
let (map, doc_indexes) = builder.into_inner()?;
PositiveBlob::from_bytes(map, doc_indexes)
})
}
}

View File

@ -1,275 +0,0 @@
use std::io::{Write, Cursor, BufRead};
use std::convert::From;
use std::error::Error;
use std::sync::Arc;
use std::fmt;
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use fst::{map, Map, Streamer, IntoStreamer};
use fst::raw::Fst;
use sdset::Set;
use crate::DocIndex;
use crate::data::{DocIndexes, DocIndexesBuilder};
#[derive(Default)]
pub struct PositiveBlob {
map: Map,
indexes: DocIndexes,
}
impl PositiveBlob {
pub fn from_bytes(map: Vec<u8>, indexes: Vec<u8>) -> Result<Self, Box<Error>> {
let map = Map::from_bytes(map)?;
let indexes = DocIndexes::from_bytes(indexes)?;
Ok(PositiveBlob { map, indexes })
}
pub fn from_raw(map: Map, indexes: DocIndexes) -> Self {
PositiveBlob { map, indexes }
}
pub fn from_shared_bytes(bytes: Arc<Vec<u8>>, offset: usize, len: usize) -> Result<Self, Box<Error>> {
let mut cursor = Cursor::new(&bytes.as_slice()[..len]);
cursor.consume(offset);
let map_len = cursor.read_u64::<LittleEndian>()? as usize;
let offset = cursor.position() as usize;
let map = Map::from(Fst::from_shared_bytes(bytes.clone(), offset, map_len)?);
cursor.consume(map_len);
let doc_len = cursor.read_u64::<LittleEndian>()? as usize;
let offset = cursor.position() as usize;
let doc_indexes = DocIndexes::from_shared_bytes(bytes, offset, doc_len)?;
Ok(PositiveBlob::from_raw(map, doc_indexes))
}
pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
let map_bytes = self.map.as_fst().as_bytes();
bytes.write_u64::<LittleEndian>(map_bytes.len() as u64).unwrap();
bytes.extend_from_slice(&map_bytes);
let doc_indexes_vec = self.indexes.to_vec(); // FIXME patch to have a as_slice() function
bytes.write_u64::<LittleEndian>(doc_indexes_vec.len() as u64).unwrap();
bytes.extend_from_slice(&doc_indexes_vec);
}
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Option<&[DocIndex]> {
self.map.get(key).map(|index| &self.indexes[index as usize])
}
pub fn as_map(&self) -> &Map {
&self.map
}
pub fn as_indexes(&self) -> &DocIndexes {
&self.indexes
}
pub fn explode(self) -> (Map, DocIndexes) {
(self.map, self.indexes)
}
}
impl fmt::Debug for PositiveBlob {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "PositiveBlob([")?;
let mut stream = self.into_stream();
let mut first = true;
while let Some((k, v)) = stream.next() {
if !first {
write!(f, ", ")?;
}
first = false;
write!(f, "({}, {:?})", String::from_utf8_lossy(k), v)?;
}
write!(f, "])")
}
}
impl<'m, 'a> IntoStreamer<'a> for &'m PositiveBlob {
type Item = (&'a [u8], &'a [DocIndex]);
/// The type of the stream to be constructed.
type Into = PositiveBlobStream<'m>;
/// Construct a stream from `Self`.
fn into_stream(self) -> Self::Into {
PositiveBlobStream {
map_stream: self.map.into_stream(),
doc_indexes: &self.indexes,
}
}
}
pub struct PositiveBlobStream<'m> {
map_stream: map::Stream<'m>,
doc_indexes: &'m DocIndexes,
}
impl<'m, 'a> Streamer<'a> for PositiveBlobStream<'m> {
type Item = (&'a [u8], &'a [DocIndex]);
fn next(&'a mut self) -> Option<Self::Item> {
match self.map_stream.next() {
Some((input, index)) => {
let doc_indexes = &self.doc_indexes[index as usize];
Some((input, doc_indexes))
},
None => None,
}
}
}
pub struct PositiveBlobBuilder<W, X> {
map: fst::MapBuilder<W>,
indexes: DocIndexesBuilder<X>,
value: u64,
}
impl PositiveBlobBuilder<Vec<u8>, Vec<u8>> {
pub fn memory() -> Self {
PositiveBlobBuilder {
map: fst::MapBuilder::memory(),
indexes: DocIndexesBuilder::memory(),
value: 0,
}
}
}
impl<W: Write, X: Write> PositiveBlobBuilder<W, X> {
pub fn new(map: W, indexes: X) -> Result<Self, Box<Error>> {
Ok(PositiveBlobBuilder {
map: fst::MapBuilder::new(map)?,
indexes: DocIndexesBuilder::new(indexes),
value: 0,
})
}
/// If a key is inserted that is less than or equal to any previous key added,
/// then an error is returned. Similarly, if there was a problem writing
/// to the underlying writer, an error is returned.
// FIXME what if one write doesn't work but the other do ?
pub fn insert<K>(&mut self, key: K, doc_indexes: &Set<DocIndex>) -> Result<(), Box<Error>>
where K: AsRef<[u8]>,
{
self.map.insert(key, self.value)?;
self.indexes.insert(doc_indexes)?;
self.value += 1;
Ok(())
}
pub fn finish(self) -> Result<(), Box<Error>> {
self.into_inner().map(drop)
}
pub fn into_inner(self) -> Result<(W, X), Box<Error>> {
let map = self.map.into_inner()?;
let indexes = self.indexes.into_inner()?;
Ok((map, indexes))
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::error::Error;
use crate::{Attribute, WordArea};
use crate::DocumentId;
#[test]
fn create_query() -> Result<(), Box<Error>> {
let a = DocIndex { document_id: DocumentId(0), attribute: Attribute::new(3, 11), word_area: WordArea::new(30, 4) };
let b = DocIndex { document_id: DocumentId(1), attribute: Attribute::new(4, 21), word_area: WordArea::new(35, 6) };
let c = DocIndex { document_id: DocumentId(2), attribute: Attribute::new(8, 2), word_area: WordArea::new(89, 6) };
let mut builder = PositiveBlobBuilder::memory();
builder.insert("aaa", Set::new(&[a])?)?;
builder.insert("aab", Set::new(&[a, b, c])?)?;
builder.insert("aac", Set::new(&[a, c])?)?;
let (map_bytes, indexes_bytes) = builder.into_inner()?;
let positive_blob = PositiveBlob::from_bytes(map_bytes, indexes_bytes)?;
assert_eq!(positive_blob.get("aaa"), Some(&[a][..]));
assert_eq!(positive_blob.get("aab"), Some(&[a, b, c][..]));
assert_eq!(positive_blob.get("aac"), Some(&[a, c][..]));
assert_eq!(positive_blob.get("aad"), None);
Ok(())
}
#[test]
fn serialize_deserialize() -> Result<(), Box<Error>> {
let a = DocIndex {
document_id: DocumentId(0),
attribute: Attribute::new_faillible(3, 11),
word_area: WordArea::new_faillible(30, 4)
};
let b = DocIndex {
document_id: DocumentId(1),
attribute: Attribute::new_faillible(4, 21),
word_area: WordArea::new_faillible(35, 6)
};
let c = DocIndex {
document_id: DocumentId(2),
attribute: Attribute::new_faillible(8, 2),
word_area: WordArea::new_faillible(89, 6)
};
let mut builder = PositiveBlobBuilder::memory();
builder.insert("aaa", Set::new(&[a])?)?;
builder.insert("aab", Set::new(&[a, b, c])?)?;
builder.insert("aac", Set::new(&[a, c])?)?;
let (map_bytes, indexes_bytes) = builder.into_inner()?;
let positive_blob = PositiveBlob::from_bytes(map_bytes, indexes_bytes)?;
assert_eq!(positive_blob.get("aaa"), Some(&[a][..]));
assert_eq!(positive_blob.get("aab"), Some(&[a, b, c][..]));
assert_eq!(positive_blob.get("aac"), Some(&[a, c][..]));
assert_eq!(positive_blob.get("aad"), None);
Ok(())
}
#[test]
fn serde_serialize_deserialize() -> Result<(), Box<Error>> {
let a = DocIndex {
document_id: DocumentId(0),
attribute: Attribute::new_faillible(3, 11),
word_area: WordArea::new_faillible(30, 4)
};
let b = DocIndex {
document_id: DocumentId(1),
attribute: Attribute::new_faillible(4, 21),
word_area: WordArea::new_faillible(35, 6)
};
let c = DocIndex {
document_id: DocumentId(2),
attribute: Attribute::new_faillible(8, 2),
word_area: WordArea::new_faillible(89, 6)
};
let mut builder = PositiveBlobBuilder::memory();
builder.insert("aaa", Set::new(&[a])?)?;
builder.insert("aab", Set::new(&[a, b, c])?)?;
builder.insert("aac", Set::new(&[a, c])?)?;
let (map_bytes, indexes_bytes) = builder.into_inner()?;
let positive_blob = PositiveBlob::from_bytes(map_bytes, indexes_bytes)?;
assert_eq!(positive_blob.get("aaa"), Some(&[a][..]));
assert_eq!(positive_blob.get("aab"), Some(&[a, b, c][..]));
assert_eq!(positive_blob.get("aac"), Some(&[a, c][..]));
assert_eq!(positive_blob.get("aad"), None);
Ok(())
}
}

View File

@ -1,5 +0,0 @@
mod blob;
mod ops;
pub use self::blob::{PositiveBlob, PositiveBlobBuilder};
pub use self::ops::OpBuilder;

View File

@ -1,128 +0,0 @@
use sdset::multi::OpBuilder as SdOpBuilder;
use sdset::{SetOperation, Set};
use crate::database::blob::PositiveBlob;
use crate::data::DocIndexes;
use crate::DocIndex;
pub struct OpBuilder<'m> {
// the operation on the maps is always an union.
map_op: fst::map::OpBuilder<'m>,
indexes: Vec<&'m DocIndexes>,
}
/// Do a set operation on multiple positive blobs.
impl<'m> OpBuilder<'m> {
pub fn new() -> Self {
Self {
map_op: fst::map::OpBuilder::new(),
indexes: Vec::new(),
}
}
pub fn with_capacity(cap: usize) -> Self {
Self {
map_op: fst::map::OpBuilder::new(), // TODO patch fst to add with_capacity
indexes: Vec::with_capacity(cap),
}
}
pub fn add(mut self, blob: &'m PositiveBlob) -> Self {
self.push(blob);
self
}
pub fn push(&mut self, blob: &'m PositiveBlob) {
self.map_op.push(blob.as_map());
self.indexes.push(blob.as_indexes());
}
pub fn union(self) -> Union<'m> {
Union::new(self.map_op.union(), self.indexes)
}
pub fn intersection(self) -> Intersection<'m> {
Intersection::new(self.map_op.union(), self.indexes)
}
pub fn difference(self) -> Difference<'m> {
Difference::new(self.map_op.union(), self.indexes)
}
pub fn symmetric_difference(self) -> SymmetricDifference<'m> {
SymmetricDifference::new(self.map_op.union(), self.indexes)
}
}
macro_rules! logical_operation {
(struct $name:ident, $operation:ident) => {
pub struct $name<'m> {
stream: fst::map::Union<'m>,
indexes: Vec<&'m DocIndexes>,
outs: Vec<DocIndex>,
}
impl<'m> $name<'m> {
fn new(stream: fst::map::Union<'m>, indexes: Vec<&'m DocIndexes>) -> Self {
$name {
stream: stream,
indexes: indexes,
outs: Vec::new(),
}
}
}
impl<'m, 'a> fst::Streamer<'a> for $name<'m> {
type Item = (&'a [u8], &'a Set<DocIndex>);
fn next(&'a mut self) -> Option<Self::Item> {
// loop {
// let (input, ivalues) = match self.stream.next() {
// Some(value) => value,
// None => return None,
// };
// self.outs.clear();
// let mut builder = SdOpBuilder::with_capacity(ivalues.len());
// for ivalue in ivalues {
// let indexes = self.indexes[ivalue.index];
// let indexes = indexes.get(ivalue.value).expect("BUG: could not find document indexes");
// let set = Set::new_unchecked(indexes);
// builder.push(set);
// }
// builder.$operation().extend_vec(&mut self.outs);
// if self.outs.is_empty() { continue }
// return Some((input, &self.outs))
// }
// FIXME make the above code compile
match self.stream.next() {
Some((input, ivalues)) => {
self.outs.clear();
let mut builder = SdOpBuilder::with_capacity(ivalues.len());
for ivalue in ivalues {
let doc_indexes = &self.indexes[ivalue.index][ivalue.value as usize];
let set = Set::new_unchecked(doc_indexes);
builder.push(set);
}
builder.$operation().extend_vec(&mut self.outs);
if self.outs.is_empty() { return None }
return Some((input, Set::new_unchecked(&self.outs)))
},
None => None
}
}
}
}}
logical_operation!(struct Union, union);
logical_operation!(struct Intersection, intersection);
logical_operation!(struct Difference, difference);
logical_operation!(struct SymmetricDifference, symmetric_difference);

View File

@ -7,7 +7,7 @@ use rocksdb::rocksdb::{Writable, Snapshot};
use rocksdb::{DB, DBVector, MergeOperands};
use crossbeam::atomic::ArcCell;
use crate::database::blob::{self, Blob, PositiveBlob};
use crate::database::index::{self, Index, Positive};
use crate::database::{DatabaseView, Update, Schema};
use crate::database::{DATA_INDEX, DATA_SCHEMA};
@ -85,12 +85,9 @@ impl Database {
Err(e) => return Err(e.to_string().into()),
};
let move_update = update.can_be_moved();
let path = update.into_path_buf();
let path = path.to_string_lossy();
let path = update.path().to_string_lossy();
let mut options = IngestExternalFileOptions::new();
options.move_files(move_update);
// options.move_files(move_update);
let cf_handle = db.cf_handle("default").expect("\"default\" column family not found");
db.ingest_external_file_optimized(&cf_handle, &options, &[&path])?;
@ -124,42 +121,28 @@ impl Database {
}
}
fn merge_indexes(key: &[u8], existing_value: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
if key != DATA_INDEX {
panic!("The merge operator only supports \"data-index\" merging")
}
fn merge_indexes(key: &[u8], existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
assert_eq!(key, DATA_INDEX, "The merge operator only supports \"data-index\" merging");
let capacity = {
let remaining = operands.size_hint().0;
let already_exist = usize::from(existing_value.is_some());
remaining + already_exist
};
let mut index: Option<Index> = None;
let mut op = blob::OpBuilder::with_capacity(capacity);
if let Some(bytes) = existing_value {
for bytes in existing.into_iter().chain(operands) {
let bytes_len = bytes.len();
let bytes = Arc::new(bytes.to_vec());
let blob = match PositiveBlob::from_shared_bytes(bytes, 0, bytes_len) {
Ok(blob) => blob,
Err(e) => panic!("BUG: could not deserialize data-index due to {}", e),
let operand = Index::from_shared_bytes(bytes, 0, bytes_len);
let operand = operand.expect("BUG: could not deserialize index");
let merged = match index {
Some(ref index) => index.merge(&operand).expect("BUG: could not merge index"),
None => operand,
};
op.push(Blob::Positive(blob));
index.replace(merged);
}
for bytes in operands {
let bytes_len = bytes.len();
let bytes = Arc::new(bytes.to_vec());
let blob = match Blob::from_shared_bytes(bytes, 0, bytes_len) {
Ok(blob) => blob,
Err(e) => panic!("BUG: could not deserialize blob due to {}", e),
};
op.push(blob);
}
let blob = op.merge().expect("BUG: could not merge blobs");
let index = index.unwrap_or_default();
let mut bytes = Vec::new();
blob.write_to_bytes(&mut bytes);
index.write_to_bytes(&mut bytes);
bytes
}
@ -172,7 +155,7 @@ mod tests {
use tempfile::tempdir;
use crate::database::schema::{SchemaBuilder, STORED, INDEXED};
use crate::database::update::PositiveUpdateBuilder;
use crate::database::update::UpdateBuilder;
use crate::tokenizer::DefaultBuilder;
#[test]
@ -219,15 +202,14 @@ mod tests {
let docid0;
let docid1;
let mut update = {
let mut builder = PositiveUpdateBuilder::new(update_path, schema, tokenizer_builder);
let mut builder = UpdateBuilder::new(update_path, schema);
docid0 = builder.update(&doc0).unwrap();
docid1 = builder.update(&doc1).unwrap();
docid0 = builder.update_document(&doc0).unwrap();
docid1 = builder.update_document(&doc1).unwrap();
builder.build()?
};
update.set_move(true);
database.ingest_update_file(update)?;
let view = database.view();

View File

@ -9,9 +9,9 @@ use serde::de::DeserializeOwned;
use crate::database::{DocumentKey, DocumentKeyAttr};
use crate::database::{retrieve_data_schema, retrieve_data_index};
use crate::database::blob::positive::PositiveBlob;
use crate::database::deserializer::Deserializer;
use crate::database::schema::Schema;
use crate::database::index::Index;
use crate::rank::{QueryBuilder, FilterFunc};
use crate::DocumentId;
@ -19,7 +19,7 @@ pub struct DatabaseView<D>
where D: Deref<Target=DB>
{
snapshot: Snapshot<D>,
blob: PositiveBlob,
index: Index,
schema: Schema,
}
@ -28,16 +28,16 @@ where D: Deref<Target=DB>
{
pub fn new(snapshot: Snapshot<D>) -> Result<DatabaseView<D>, Box<Error>> {
let schema = retrieve_data_schema(&snapshot)?;
let blob = retrieve_data_index(&snapshot)?;
Ok(DatabaseView { snapshot, blob, schema })
let index = retrieve_data_index(&snapshot)?;
Ok(DatabaseView { snapshot, index, schema })
}
pub fn schema(&self) -> &Schema {
&self.schema
}
pub fn blob(&self) -> &PositiveBlob {
&self.blob
pub fn index(&self) -> &Index {
&self.index
}
pub fn into_snapshot(self) -> Snapshot<D> {

81
src/database/index/mod.rs Normal file
View File

@ -0,0 +1,81 @@
mod negative;
mod positive;
pub(crate) use self::negative::Negative;
pub(crate) use self::positive::{Positive, PositiveBuilder};
use std::sync::Arc;
use std::error::Error;
use std::io::{Cursor, BufRead};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use fst::{IntoStreamer, Streamer};
use sdset::duo::DifferenceByKey;
use sdset::{Set, SetOperation};
use fst::raw::Fst;
use fst::Map;
use crate::data::{DocIds, DocIndexes};
#[derive(Default)]
pub struct Index {
pub(crate) negative: Negative,
pub(crate) positive: Positive,
}
impl Index {
pub fn from_bytes(bytes: Vec<u8>) -> Result<Index, Box<Error>> {
let len = bytes.len();
Index::from_shared_bytes(Arc::new(bytes), 0, len)
}
pub fn from_shared_bytes(
bytes: Arc<Vec<u8>>,
offset: usize,
len: usize,
) -> Result<Index, Box<Error>>
{
let (negative, neg_offset) = Negative::from_shared_bytes(bytes.clone(), offset, len)?;
let (positive, _) = Positive::from_shared_bytes(bytes, offset + neg_offset, len)?;
Ok(Index { negative, positive })
}
pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
self.negative.write_to_bytes(bytes);
self.positive.write_to_bytes(bytes);
}
pub fn merge(&self, other: &Index) -> Result<Index, Box<Error>> {
if other.negative.is_empty() {
let negative = Negative::default();
let positive = self.positive.union(&other.positive)?;
return Ok(Index { negative, positive })
}
let mut buffer = Vec::new();
let mut builder = PositiveBuilder::memory();
let mut stream = self.positive.into_stream();
while let Some((key, indexes)) = stream.next() {
let op = DifferenceByKey::new(indexes, &other.negative, |x| x.document_id, |x| *x);
buffer.clear();
op.extend_vec(&mut buffer);
if !buffer.is_empty() {
let indexes = Set::new_unchecked(&buffer);
builder.insert(key, indexes)?;
}
}
let positive = {
let (map, indexes) = builder.into_inner()?;
let map = Map::from_bytes(map)?;
let indexes = DocIndexes::from_bytes(indexes)?;
Positive { map, indexes }
};
let negative = Negative::default();
let positive = positive.union(&other.positive)?;
Ok(Index { negative, positive })
}
}

View File

@ -0,0 +1,53 @@
use std::io::{Cursor, BufRead};
use std::error::Error;
use std::mem::size_of;
use std::ops::Deref;
use std::sync::Arc;
use sdset::Set;
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use crate::data::DocIds;
use crate::DocumentId;
#[derive(Default)]
pub struct Negative {
pub doc_ids: DocIds,
}
impl Negative {
pub fn from_shared_bytes(
bytes: Arc<Vec<u8>>,
offset: usize,
len: usize,
) -> Result<(Negative, usize), Box<Error>>
{
let mut cursor = Cursor::new(&bytes[..len]);
cursor.consume(offset);
let len = cursor.read_u64::<LittleEndian>()? as usize;
let offset = cursor.position() as usize;
let doc_ids = DocIds::from_shared_bytes(bytes, offset, len)?;
Ok((Negative { doc_ids }, offset + len))
}
pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
let slice = self.doc_ids.as_bytes();
let len = slice.len() as u64;
let _ = bytes.write_u64::<LittleEndian>(len);
bytes.extend_from_slice(slice);
}
pub fn is_empty(&self) -> bool {
self.doc_ids.doc_ids().is_empty()
}
}
impl Deref for Negative {
type Target = Set<DocumentId>;
fn deref(&self) -> &Self::Target {
self.doc_ids.doc_ids()
}
}

View File

@ -0,0 +1,172 @@
use std::io::{Write, BufRead, Cursor};
use std::mem::size_of;
use std::error::Error;
use std::sync::Arc;
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use fst::{map, Map, Streamer, IntoStreamer};
use sdset::{Set, SetOperation};
use sdset::duo::Union;
use fst::raw::Fst;
use crate::data::{DocIndexes, DocIndexesBuilder};
use crate::DocIndex;
#[derive(Default)]
pub struct Positive {
pub map: Map,
pub indexes: DocIndexes,
}
impl Positive {
pub fn from_shared_bytes(
bytes: Arc<Vec<u8>>,
offset: usize,
len: usize,
) -> Result<(Positive, usize), Box<Error>>
{
let mut cursor = Cursor::new(&bytes[..len]);
cursor.consume(offset);
let map_len = cursor.read_u64::<LittleEndian>()? as usize;
let map_offset = cursor.position() as usize;
let fst = Fst::from_shared_bytes(bytes.clone(), map_offset, map_len)?;
let map = Map::from(fst);
cursor.consume(map_len);
let indexes_len = cursor.read_u64::<LittleEndian>()? as usize;
let indexes_offset = cursor.position() as usize;
let indexes = DocIndexes::from_shared_bytes(bytes, indexes_offset, indexes_len)?;
let positive = Positive { map, indexes };
let len = indexes_offset + indexes_len;
Ok((positive, len))
}
pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
// indexes
let slice = self.map.as_fst().as_bytes();
let len = slice.len() as u64;
let _ = bytes.write_u64::<LittleEndian>(len);
bytes.extend_from_slice(slice);
// map
self.indexes.write_to_bytes(bytes);
}
pub fn union(&self, other: &Positive) -> Result<Positive, Box<Error>> {
let mut builder = PositiveBuilder::memory();
let mut stream = map::OpBuilder::new().add(&self.map).add(&other.map).union();
let mut buffer = Vec::new();
while let Some((key, ivalues)) = stream.next() {
buffer.clear();
match ivalues {
[a, b] => {
let indexes = if a.index == 0 { &self.indexes } else { &other.indexes };
let indexes = indexes.get(a.value as usize).ok_or(format!("index not found"))?;
let a = Set::new_unchecked(indexes);
let indexes = if b.index == 0 { &self.indexes } else { &other.indexes };
let indexes = indexes.get(b.value as usize).ok_or(format!("index not found"))?;
let b = Set::new_unchecked(indexes);
let op = Union::new(a, b);
op.extend_vec(&mut buffer);
},
[a] => {
let indexes = if a.index == 0 { &self.indexes } else { &other.indexes };
let indexes = indexes.get(a.value as usize).ok_or(format!("index not found"))?;
buffer.extend_from_slice(indexes)
},
_ => continue,
}
if !buffer.is_empty() {
let indexes = Set::new_unchecked(&buffer);
builder.insert(key, indexes)?;
}
}
let (map, indexes) = builder.into_inner()?;
let map = Map::from_bytes(map)?;
let indexes = DocIndexes::from_bytes(indexes)?;
Ok(Positive { map, indexes })
}
}
impl<'m, 'a> IntoStreamer<'a> for &'m Positive {
type Item = (&'a [u8], &'a Set<DocIndex>);
/// The type of the stream to be constructed.
type Into = Stream<'m>;
/// Construct a stream from `Self`.
fn into_stream(self) -> Self::Into {
Stream {
map_stream: self.map.into_stream(),
indexes: &self.indexes,
}
}
}
pub struct Stream<'m> {
map_stream: map::Stream<'m>,
indexes: &'m DocIndexes,
}
impl<'m, 'a> Streamer<'a> for Stream<'m> {
type Item = (&'a [u8], &'a Set<DocIndex>);
fn next(&'a mut self) -> Option<Self::Item> {
match self.map_stream.next() {
Some((input, index)) => {
let indexes = &self.indexes[index as usize];
let indexes = Set::new_unchecked(indexes);
Some((input, indexes))
},
None => None,
}
}
}
pub struct PositiveBuilder<W, X> {
map: fst::MapBuilder<W>,
indexes: DocIndexesBuilder<X>,
value: u64,
}
impl PositiveBuilder<Vec<u8>, Vec<u8>> {
pub fn memory() -> Self {
PositiveBuilder {
map: fst::MapBuilder::memory(),
indexes: DocIndexesBuilder::memory(),
value: 0,
}
}
}
impl<W: Write, X: Write> PositiveBuilder<W, X> {
/// If a key is inserted that is less than or equal to any previous key added,
/// then an error is returned. Similarly, if there was a problem writing
/// to the underlying writer, an error is returned.
// FIXME what if one write doesn't work but the other do ?
pub fn insert<K>(&mut self, key: K, indexes: &Set<DocIndex>) -> Result<(), Box<Error>>
where K: AsRef<[u8]>,
{
self.map.insert(key, self.value)?;
self.indexes.insert(indexes)?;
self.value += 1;
Ok(())
}
pub fn finish(self) -> Result<(), Box<Error>> {
self.into_inner().map(drop)
}
pub fn into_inner(self) -> Result<(W, X), Box<Error>> {
let map = self.map.into_inner()?;
let indexes = self.indexes.into_inner()?;
Ok((map, indexes))
}
}

View File

@ -6,15 +6,12 @@ use std::sync::Arc;
use rocksdb::rocksdb::{DB, Snapshot};
pub use self::update::{
Update, PositiveUpdateBuilder, NewState,
SerializerError, NegativeUpdateBuilder
};
pub use self::index::Index;
pub use self::update::{Update, UpdateBuilder};
pub use self::document_key::{DocumentKey, DocumentKeyAttr};
pub use self::database_view::{DatabaseView, DocumentIter};
pub use self::database::Database;
pub use self::schema::Schema;
use self::blob::positive::PositiveBlob;
const DATA_INDEX: &[u8] = b"data-index";
const DATA_SCHEMA: &[u8] = b"data-schema";
@ -29,8 +26,8 @@ macro_rules! forward_to_unserializable_type {
}
}
pub mod blob;
pub mod schema;
pub(crate) mod index;
mod update;
mod database;
mod document_key;
@ -52,15 +49,15 @@ where D: Deref<Target=DB>
}
}
fn retrieve_data_index<D>(snapshot: &Snapshot<D>) -> Result<PositiveBlob, Box<Error>>
fn retrieve_data_index<D>(snapshot: &Snapshot<D>) -> Result<Index, Box<Error>>
where D: Deref<Target=DB>
{
match snapshot.get(DATA_INDEX)? {
Some(vector) => {
let bytes_len = vector.as_ref().len();
let bytes = Arc::new(vector.as_ref().to_vec());
Ok(PositiveBlob::from_shared_bytes(bytes, 0, bytes_len)?)
Ok(Index::from_shared_bytes(bytes, 0, bytes_len)?)
},
None => Ok(PositiveBlob::default()),
None => Ok(Index::default()),
}
}

View File

@ -1,4 +1,3 @@
use crate::database::update::SerializerError;
use std::collections::{HashMap, BTreeMap};
use crate::database::calculate_hash;
use std::io::{Read, Write};
@ -141,13 +140,10 @@ impl Schema {
attributes
}
pub fn document_id<T>(&self, document: &T) -> Result<DocumentId, SerializerError>
pub fn document_id<T>(&self, document: &T) -> Result<DocumentId, Box<Error>>
where T: Serialize,
{
let find_document_id = FindDocumentIdSerializer {
id_attribute_name: self.identifier_name(),
};
document.serialize(find_document_id)
unimplemented!()
}
pub fn props(&self, attr: SchemaAttr) -> SchemaProps {
@ -188,192 +184,6 @@ impl fmt::Display for SchemaAttr {
}
}
struct FindDocumentIdSerializer<'a> {
id_attribute_name: &'a str,
}
impl<'a> ser::Serializer for FindDocumentIdSerializer<'a> {
type Ok = DocumentId;
type Error = SerializerError;
type SerializeSeq = ser::Impossible<Self::Ok, Self::Error>;
type SerializeTuple = ser::Impossible<Self::Ok, Self::Error>;
type SerializeTupleStruct = ser::Impossible<Self::Ok, Self::Error>;
type SerializeTupleVariant = ser::Impossible<Self::Ok, Self::Error>;
type SerializeMap = ser::Impossible<Self::Ok, Self::Error>;
type SerializeStruct = FindDocumentIdStructSerializer<'a>;
type SerializeStructVariant = ser::Impossible<Self::Ok, Self::Error>;
forward_to_unserializable_type! {
bool => serialize_bool,
char => serialize_char,
i8 => serialize_i8,
i16 => serialize_i16,
i32 => serialize_i32,
i64 => serialize_i64,
u8 => serialize_u8,
u16 => serialize_u16,
u32 => serialize_u32,
u64 => serialize_u64,
f32 => serialize_f32,
f64 => serialize_f64,
}
fn serialize_str(self, _v: &str) -> Result<Self::Ok, Self::Error> {
Err(SerializerError::UnserializableType { name: "str" })
}
fn serialize_bytes(self, _v: &[u8]) -> Result<Self::Ok, Self::Error> {
Err(SerializerError::UnserializableType { name: "&[u8]" })
}
fn serialize_none(self) -> Result<Self::Ok, Self::Error> {
Err(SerializerError::UnserializableType { name: "Option" })
}
fn serialize_some<T: ?Sized>(self, _value: &T) -> Result<Self::Ok, Self::Error>
where T: Serialize,
{
Err(SerializerError::UnserializableType { name: "Option" })
}
fn serialize_unit(self) -> Result<Self::Ok, Self::Error> {
Err(SerializerError::UnserializableType { name: "()" })
}
fn serialize_unit_struct(self, _name: &'static str) -> Result<Self::Ok, Self::Error> {
Err(SerializerError::UnserializableType { name: "unit struct" })
}
fn serialize_unit_variant(
self,
_name: &'static str,
_variant_index: u32,
_variant: &'static str
) -> Result<Self::Ok, Self::Error>
{
Err(SerializerError::UnserializableType { name: "unit variant" })
}
fn serialize_newtype_struct<T: ?Sized>(
self,
_name: &'static str,
value: &T
) -> Result<Self::Ok, Self::Error>
where T: Serialize,
{
value.serialize(self)
}
fn serialize_newtype_variant<T: ?Sized>(
self,
_name: &'static str,
_variant_index: u32,
_variant: &'static str,
_value: &T
) -> Result<Self::Ok, Self::Error>
where T: Serialize,
{
Err(SerializerError::UnserializableType { name: "newtype variant" })
}
fn serialize_seq(self, _len: Option<usize>) -> Result<Self::SerializeSeq, Self::Error> {
Err(SerializerError::UnserializableType { name: "sequence" })
}
fn serialize_tuple(self, _len: usize) -> Result<Self::SerializeTuple, Self::Error> {
Err(SerializerError::UnserializableType { name: "tuple" })
}
fn serialize_tuple_struct(
self,
_name: &'static str,
_len: usize
) -> Result<Self::SerializeTupleStruct, Self::Error>
{
Err(SerializerError::UnserializableType { name: "tuple struct" })
}
fn serialize_tuple_variant(
self,
_name: &'static str,
_variant_index: u32,
_variant: &'static str,
_len: usize
) -> Result<Self::SerializeTupleVariant, Self::Error>
{
Err(SerializerError::UnserializableType { name: "tuple variant" })
}
fn serialize_map(self, _len: Option<usize>) -> Result<Self::SerializeMap, Self::Error> {
// Ok(MapSerializer {
// schema: self.schema,
// document_id: self.document_id,
// new_states: self.new_states,
// })
Err(SerializerError::UnserializableType { name: "map" })
}
fn serialize_struct(
self,
_name: &'static str,
_len: usize
) -> Result<Self::SerializeStruct, Self::Error>
{
Ok(FindDocumentIdStructSerializer {
id_attribute_name: self.id_attribute_name,
document_id: None,
})
}
fn serialize_struct_variant(
self,
_name: &'static str,
_variant_index: u32,
_variant: &'static str,
_len: usize
) -> Result<Self::SerializeStructVariant, Self::Error>
{
Err(SerializerError::UnserializableType { name: "struct variant" })
}
}
struct FindDocumentIdStructSerializer<'a> {
id_attribute_name: &'a str,
document_id: Option<DocumentId>,
}
impl<'a> ser::SerializeStruct for FindDocumentIdStructSerializer<'a> {
type Ok = DocumentId;
type Error = SerializerError;
fn serialize_field<T: ?Sized>(
&mut self,
key: &'static str,
value: &T
) -> Result<(), Self::Error>
where T: Serialize,
{
if self.id_attribute_name == key {
// TODO can it be possible to have multiple ids?
let id = bincode::serialize(value).unwrap();
let hash = calculate_hash(&id);
self.document_id = Some(DocumentId(hash));
}
Ok(())
}
fn end(self) -> Result<Self::Ok, Self::Error> {
match self.document_id {
Some(document_id) => Ok(document_id),
None => Err(SerializerError::DocumentIdNotFound)
}
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -0,0 +1,95 @@
use std::collections::{BTreeMap, BTreeSet};
use std::path::PathBuf;
use std::error::Error;
use fst::map::{Map, MapBuilder};
use rocksdb::rocksdb_options;
use serde::Serialize;
use sdset::Set;
use crate::database::index::{Index, Positive, PositiveBuilder, Negative};
use crate::database::{DATA_INDEX, Schema, DocumentKeyAttr};
use crate::data::{DocIds, DocIndexes};
use crate::{DocumentId, DocIndex};
use super::Update;
type Token = Vec<u8>; // TODO could be replaced by a SmallVec
type Value = Vec<u8>;
pub struct UpdateBuilder {
sst_file: PathBuf,
schema: Schema,
removed_documents: BTreeSet<DocumentId>,
words_indexes: BTreeMap<Token, Vec<DocIndex>>,
keys_values: BTreeMap<DocumentKeyAttr, Value>,
}
impl UpdateBuilder {
pub fn new(path: PathBuf, schema: Schema) -> UpdateBuilder {
UpdateBuilder {
sst_file: path,
schema: schema,
removed_documents: BTreeSet::new(),
words_indexes: BTreeMap::new(),
keys_values: BTreeMap::new(),
}
}
pub fn update_document<T>(&mut self, document: T) -> Result<DocumentId, Box<Error>>
where T: Serialize,
{
unimplemented!()
}
pub fn remove_document<T>(&mut self, document: T) -> Result<DocumentId, Box<Error>>
where T: Serialize,
{
unimplemented!()
}
pub fn build(self) -> Result<Update, Box<Error>> {
let tree = {
let negative = {
let documents_ids = self.removed_documents.into_iter().collect();
let doc_ids = DocIds::from_raw(documents_ids);
Negative { doc_ids }
};
let positive = {
let mut builder = PositiveBuilder::memory();
for (key, mut indexes) in self.words_indexes {
indexes.sort_unstable();
let indexes = Set::new_unchecked(&indexes);
builder.insert(key, indexes);
}
let (map, indexes) = builder.into_inner()?;
let map = Map::from_bytes(map)?;
let indexes = DocIndexes::from_bytes(indexes)?;
Positive { map, indexes }
};
Index { negative, positive }
};
let env_options = rocksdb_options::EnvOptions::new();
let column_family_options = rocksdb_options::ColumnFamilyOptions::new();
let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options);
file_writer.open(&self.sst_file.to_string_lossy())?;
// write the data-index
let mut bytes = Vec::new();
tree.write_to_bytes(&mut bytes);
file_writer.merge(DATA_INDEX, &bytes)?;
// write all the documents attributes updates
for (key, value) in self.keys_values {
file_writer.put(key.as_ref(), &value)?;
}
file_writer.finish()?;
Ok(Update { sst_file: self.sst_file })
}
}

View File

@ -1,35 +1,15 @@
use std::path::PathBuf;
use std::error::Error;
use std::path::{Path, PathBuf};
mod negative;
mod positive;
mod builder;
pub use self::positive::{PositiveUpdateBuilder, NewState, SerializerError};
pub use self::negative::NegativeUpdateBuilder;
pub use self::builder::UpdateBuilder;
pub struct Update {
path: PathBuf,
can_be_moved: bool,
sst_file: PathBuf,
}
impl Update {
pub fn open<P: Into<PathBuf>>(path: P) -> Result<Update, Box<Error>> {
Ok(Update { path: path.into(), can_be_moved: false })
}
pub fn open_and_move<P: Into<PathBuf>>(path: P) -> Result<Update, Box<Error>> {
Ok(Update { path: path.into(), can_be_moved: true })
}
pub fn set_move(&mut self, can_be_moved: bool) {
self.can_be_moved = can_be_moved
}
pub fn can_be_moved(&self) -> bool {
self.can_be_moved
}
pub fn into_path_buf(self) -> PathBuf {
self.path
pub fn path(&self) -> &Path {
&self.sst_file
}
}

View File

@ -1,4 +0,0 @@
mod update;
mod unordered_builder;
pub use self::update::NegativeUpdateBuilder;

View File

@ -1,37 +0,0 @@
use std::collections::BTreeSet;
use std::io;
use byteorder::{NativeEndian, WriteBytesExt};
use crate::DocumentId;
pub struct UnorderedNegativeBlobBuilder<W> {
doc_ids: BTreeSet<DocumentId>, // TODO: prefer a linked-list
wrt: W,
}
impl UnorderedNegativeBlobBuilder<Vec<u8>> {
pub fn memory() -> Self {
UnorderedNegativeBlobBuilder::new(Vec::new())
}
}
impl<W: io::Write> UnorderedNegativeBlobBuilder<W> {
pub fn new(wrt: W) -> Self {
Self {
doc_ids: BTreeSet::new(),
wrt: wrt,
}
}
pub fn insert(&mut self, doc: DocumentId) -> bool {
self.doc_ids.insert(doc)
}
pub fn into_inner(mut self) -> io::Result<W> {
for id in self.doc_ids {
self.wrt.write_u64::<NativeEndian>(id.0)?;
}
Ok(self.wrt)
}
}

View File

@ -1,61 +0,0 @@
use std::path::PathBuf;
use std::error::Error;
use ::rocksdb::rocksdb_options;
use crate::database::update::negative::unordered_builder::UnorderedNegativeBlobBuilder;
use crate::database::blob::{Blob, NegativeBlob};
use crate::database::update::Update;
use crate::database::DocumentKey;
use crate::database::DATA_INDEX;
use crate::DocumentId;
pub struct NegativeUpdateBuilder {
path: PathBuf,
doc_ids: UnorderedNegativeBlobBuilder<Vec<u8>>,
}
impl NegativeUpdateBuilder {
pub fn new<P: Into<PathBuf>>(path: P) -> NegativeUpdateBuilder {
NegativeUpdateBuilder {
path: path.into(),
doc_ids: UnorderedNegativeBlobBuilder::memory(),
}
}
pub fn remove(&mut self, id: DocumentId) -> bool {
self.doc_ids.insert(id)
}
pub fn build(self) -> Result<Update, Box<Error>> {
let env_options = rocksdb_options::EnvOptions::new();
let column_family_options = rocksdb_options::ColumnFamilyOptions::new();
let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options);
file_writer.open(&self.path.to_string_lossy())?;
let bytes = self.doc_ids.into_inner()?;
let negative_blob = NegativeBlob::from_bytes(bytes)?;
let blob = Blob::Negative(negative_blob);
// write the data-index aka negative blob
let mut bytes = Vec::new();
blob.write_to_bytes(&mut bytes);
file_writer.merge(DATA_INDEX, &bytes)?;
// FIXME remove this ugly thing !
// let Blob::Negative(negative_blob) = blob;
let negative_blob = match blob {
Blob::Negative(blob) => blob,
Blob::Positive(_) => unreachable!(),
};
for &document_id in negative_blob.as_ref().as_slice() {
let start = DocumentKey::new(document_id);
let end = start.with_attribute_max();
file_writer.delete_range(start.as_ref(), end.as_ref())?;
}
file_writer.finish()?;
Update::open(self.path)
}
}

View File

@ -1,4 +0,0 @@
mod update;
mod unordered_builder;
pub use self::update::{PositiveUpdateBuilder, NewState, SerializerError};

View File

@ -1,49 +0,0 @@
#![allow(unused)]
use std::collections::BTreeMap;
use std::error::Error;
use std::io::Write;
use sdset::Set;
use crate::database::blob::positive::PositiveBlobBuilder;
use crate::DocIndex;
pub struct UnorderedPositiveBlobBuilder<W, X> {
builder: PositiveBlobBuilder<W, X>,
map: BTreeMap<Vec<u8>, Vec<DocIndex>>,
}
impl UnorderedPositiveBlobBuilder<Vec<u8>, Vec<u8>> {
pub fn memory() -> Self {
Self {
builder: PositiveBlobBuilder::memory(),
map: BTreeMap::new(),
}
}
}
impl<W: Write, X: Write> UnorderedPositiveBlobBuilder<W, X> {
pub fn new(map_wtr: W, doc_wtr: X) -> Result<Self, Box<Error>> {
Ok(UnorderedPositiveBlobBuilder {
builder: PositiveBlobBuilder::new(map_wtr, doc_wtr)?,
map: BTreeMap::new(),
})
}
pub fn insert<K: Into<Vec<u8>>>(&mut self, input: K, doc_index: DocIndex) {
self.map.entry(input.into()).or_insert_with(Vec::new).push(doc_index);
}
pub fn finish(self) -> Result<(), Box<Error>> {
self.into_inner().map(drop)
}
pub fn into_inner(mut self) -> Result<(W, X), Box<Error>> {
for (key, mut doc_indexes) in self.map {
doc_indexes.sort_unstable();
self.builder.insert(&key, Set::new_unchecked(&doc_indexes))?;
}
self.builder.into_inner()
}
}

View File

@ -1,505 +0,0 @@
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::error::Error;
use std::fmt;
use ::rocksdb::rocksdb_options;
use serde::ser::{self, Serialize};
use crate::database::update::positive::unordered_builder::UnorderedPositiveBlobBuilder;
use crate::database::blob::positive::PositiveBlob;
use crate::database::schema::{Schema, SchemaAttr};
use crate::tokenizer::{TokenizerBuilder, Token};
use crate::database::DocumentKeyAttr;
use crate::database::update::Update;
use crate::database::DATA_INDEX;
use crate::database::blob::Blob;
use crate::{DocumentId, DocIndex, Attribute, WordArea};
pub enum NewState {
Updated { value: Vec<u8> },
Removed,
}
pub struct PositiveUpdateBuilder<B> {
path: PathBuf,
schema: Schema,
tokenizer_builder: B,
builder: UnorderedPositiveBlobBuilder<Vec<u8>, Vec<u8>>,
new_states: BTreeMap<DocumentKeyAttr, NewState>,
}
impl<B> PositiveUpdateBuilder<B> {
pub fn new<P: Into<PathBuf>>(path: P, schema: Schema, tokenizer_builder: B) -> PositiveUpdateBuilder<B> {
PositiveUpdateBuilder {
path: path.into(),
schema: schema,
tokenizer_builder: tokenizer_builder,
builder: UnorderedPositiveBlobBuilder::memory(),
new_states: BTreeMap::new(),
}
}
pub fn update<T: Serialize>(&mut self, document: &T) -> Result<DocumentId, SerializerError>
where B: TokenizerBuilder
{
let document_id = self.schema.document_id(document)?;
let serializer = Serializer {
schema: &self.schema,
tokenizer_builder: &self.tokenizer_builder,
document_id: document_id,
builder: &mut self.builder,
new_states: &mut self.new_states
};
document.serialize(serializer)?;
Ok(document_id)
}
// TODO value must be a field that can be indexed
pub fn update_field(&mut self, id: DocumentId, attr: SchemaAttr, value: String) {
let value = bincode::serialize(&value).unwrap();
self.new_states.insert(DocumentKeyAttr::new(id, attr), NewState::Updated { value });
}
pub fn remove_field(&mut self, id: DocumentId, attr: SchemaAttr) {
self.new_states.insert(DocumentKeyAttr::new(id, attr), NewState::Removed);
}
}
#[derive(Debug)]
pub enum SerializerError {
DocumentIdNotFound,
UnserializableType { name: &'static str },
Custom(String),
}
impl ser::Error for SerializerError {
fn custom<T: fmt::Display>(msg: T) -> Self {
SerializerError::Custom(msg.to_string())
}
}
impl fmt::Display for SerializerError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
SerializerError::DocumentIdNotFound => {
write!(f, "serialized document does not have an id according to the schema")
}
SerializerError::UnserializableType { name } => {
write!(f, "Only struct and map types are considered valid documents and
can be serialized, not {} types directly.", name)
},
SerializerError::Custom(s) => f.write_str(&s),
}
}
}
impl Error for SerializerError {}
struct Serializer<'a, B> {
schema: &'a Schema,
tokenizer_builder: &'a B,
document_id: DocumentId,
builder: &'a mut UnorderedPositiveBlobBuilder<Vec<u8>, Vec<u8>>,
new_states: &'a mut BTreeMap<DocumentKeyAttr, NewState>,
}
impl<'a, B> ser::Serializer for Serializer<'a, B>
where B: TokenizerBuilder
{
type Ok = ();
type Error = SerializerError;
type SerializeSeq = ser::Impossible<Self::Ok, Self::Error>;
type SerializeTuple = ser::Impossible<Self::Ok, Self::Error>;
type SerializeTupleStruct = ser::Impossible<Self::Ok, Self::Error>;
type SerializeTupleVariant = ser::Impossible<Self::Ok, Self::Error>;
type SerializeMap = ser::Impossible<Self::Ok, Self::Error>;
type SerializeStruct = StructSerializer<'a, B>;
type SerializeStructVariant = ser::Impossible<Self::Ok, Self::Error>;
forward_to_unserializable_type! {
bool => serialize_bool,
char => serialize_char,
i8 => serialize_i8,
i16 => serialize_i16,
i32 => serialize_i32,
i64 => serialize_i64,
u8 => serialize_u8,
u16 => serialize_u16,
u32 => serialize_u32,
u64 => serialize_u64,
f32 => serialize_f32,
f64 => serialize_f64,
}
fn serialize_str(self, _v: &str) -> Result<Self::Ok, Self::Error> {
Err(SerializerError::UnserializableType { name: "str" })
}
fn serialize_bytes(self, _v: &[u8]) -> Result<Self::Ok, Self::Error> {
Err(SerializerError::UnserializableType { name: "&[u8]" })
}
fn serialize_none(self) -> Result<Self::Ok, Self::Error> {
Err(SerializerError::UnserializableType { name: "Option" })
}
fn serialize_some<T: ?Sized>(self, _value: &T) -> Result<Self::Ok, Self::Error>
where T: Serialize,
{
Err(SerializerError::UnserializableType { name: "Option" })
}
fn serialize_unit(self) -> Result<Self::Ok, Self::Error> {
Err(SerializerError::UnserializableType { name: "()" })
}
fn serialize_unit_struct(self, _name: &'static str) -> Result<Self::Ok, Self::Error> {
Err(SerializerError::UnserializableType { name: "unit struct" })
}
fn serialize_unit_variant(
self,
_name: &'static str,
_variant_index: u32,
_variant: &'static str
) -> Result<Self::Ok, Self::Error>
{
Err(SerializerError::UnserializableType { name: "unit variant" })
}
fn serialize_newtype_struct<T: ?Sized>(
self,
_name: &'static str,
value: &T
) -> Result<Self::Ok, Self::Error>
where T: Serialize,
{
value.serialize(self)
}
fn serialize_newtype_variant<T: ?Sized>(
self,
_name: &'static str,
_variant_index: u32,
_variant: &'static str,
_value: &T
) -> Result<Self::Ok, Self::Error>
where T: Serialize,
{
Err(SerializerError::UnserializableType { name: "newtype variant" })
}
fn serialize_seq(self, _len: Option<usize>) -> Result<Self::SerializeSeq, Self::Error> {
Err(SerializerError::UnserializableType { name: "sequence" })
}
fn serialize_tuple(self, _len: usize) -> Result<Self::SerializeTuple, Self::Error> {
Err(SerializerError::UnserializableType { name: "tuple" })
}
fn serialize_tuple_struct(
self,
_name: &'static str,
_len: usize
) -> Result<Self::SerializeTupleStruct, Self::Error>
{
Err(SerializerError::UnserializableType { name: "tuple struct" })
}
fn serialize_tuple_variant(
self,
_name: &'static str,
_variant_index: u32,
_variant: &'static str,
_len: usize
) -> Result<Self::SerializeTupleVariant, Self::Error>
{
Err(SerializerError::UnserializableType { name: "tuple variant" })
}
fn serialize_map(self, _len: Option<usize>) -> Result<Self::SerializeMap, Self::Error> {
// Ok(MapSerializer {
// schema: self.schema,
// document_id: self.document_id,
// new_states: self.new_states,
// })
Err(SerializerError::UnserializableType { name: "map" })
}
fn serialize_struct(
self,
_name: &'static str,
_len: usize
) -> Result<Self::SerializeStruct, Self::Error>
{
Ok(StructSerializer {
schema: self.schema,
tokenizer_builder: self.tokenizer_builder,
document_id: self.document_id,
builder: self.builder,
new_states: self.new_states,
})
}
fn serialize_struct_variant(
self,
_name: &'static str,
_variant_index: u32,
_variant: &'static str,
_len: usize
) -> Result<Self::SerializeStructVariant, Self::Error>
{
Err(SerializerError::UnserializableType { name: "struct variant" })
}
}
struct StructSerializer<'a, B> {
schema: &'a Schema,
tokenizer_builder: &'a B,
document_id: DocumentId,
builder: &'a mut UnorderedPositiveBlobBuilder<Vec<u8>, Vec<u8>>,
new_states: &'a mut BTreeMap<DocumentKeyAttr, NewState>,
}
impl<'a, B> ser::SerializeStruct for StructSerializer<'a, B>
where B: TokenizerBuilder
{
type Ok = ();
type Error = SerializerError;
fn serialize_field<T: ?Sized>(
&mut self,
key: &'static str,
value: &T
) -> Result<(), Self::Error>
where T: Serialize,
{
if let Some(attr) = self.schema.attribute(key) {
let props = self.schema.props(attr);
if props.is_stored() {
let value = bincode::serialize(value).unwrap();
let key = DocumentKeyAttr::new(self.document_id, attr);
self.new_states.insert(key, NewState::Updated { value });
}
if props.is_indexed() {
let serializer = IndexerSerializer {
builder: self.builder,
tokenizer_builder: self.tokenizer_builder,
document_id: self.document_id,
attribute: attr,
};
value.serialize(serializer)?;
}
}
Ok(())
}
fn end(self) -> Result<Self::Ok, Self::Error> {
Ok(())
}
}
struct IndexerSerializer<'a, B> {
tokenizer_builder: &'a B,
builder: &'a mut UnorderedPositiveBlobBuilder<Vec<u8>, Vec<u8>>,
document_id: DocumentId,
attribute: SchemaAttr,
}
impl<'a, B> ser::Serializer for IndexerSerializer<'a, B>
where B: TokenizerBuilder
{
type Ok = ();
type Error = SerializerError;
type SerializeSeq = ser::Impossible<Self::Ok, Self::Error>;
type SerializeTuple = ser::Impossible<Self::Ok, Self::Error>;
type SerializeTupleStruct = ser::Impossible<Self::Ok, Self::Error>;
type SerializeTupleVariant = ser::Impossible<Self::Ok, Self::Error>;
type SerializeMap = ser::Impossible<Self::Ok, Self::Error>;
type SerializeStruct = ser::Impossible<Self::Ok, Self::Error>;
type SerializeStructVariant = ser::Impossible<Self::Ok, Self::Error>;
forward_to_unserializable_type! {
bool => serialize_bool,
char => serialize_char,
i8 => serialize_i8,
i16 => serialize_i16,
i32 => serialize_i32,
i64 => serialize_i64,
u8 => serialize_u8,
u16 => serialize_u16,
u32 => serialize_u32,
u64 => serialize_u64,
f32 => serialize_f32,
f64 => serialize_f64,
}
fn serialize_str(self, v: &str) -> Result<Self::Ok, Self::Error> {
for Token { word, word_index, char_index } in self.tokenizer_builder.build(v) {
let doc_index = DocIndex {
document_id: self.document_id,
attribute: Attribute::new_faillible(self.attribute.0, word_index as u32),
word_area: WordArea::new_faillible(char_index as u32, word.len() as u16),
};
// insert the exact representation
let word_lower = word.to_lowercase();
// and the unidecoded lowercased version
let word_unidecoded = unidecode::unidecode(word).to_lowercase();
if word_lower != word_unidecoded {
self.builder.insert(word_unidecoded, doc_index);
}
self.builder.insert(word_lower, doc_index);
}
Ok(())
}
fn serialize_bytes(self, _v: &[u8]) -> Result<Self::Ok, Self::Error> {
Err(SerializerError::UnserializableType { name: "&[u8]" })
}
fn serialize_none(self) -> Result<Self::Ok, Self::Error> {
Err(SerializerError::UnserializableType { name: "Option" })
}
fn serialize_some<T: ?Sized>(self, _value: &T) -> Result<Self::Ok, Self::Error>
where T: Serialize,
{
Err(SerializerError::UnserializableType { name: "Option" })
}
fn serialize_unit(self) -> Result<Self::Ok, Self::Error> {
Err(SerializerError::UnserializableType { name: "()" })
}
fn serialize_unit_struct(self, _name: &'static str) -> Result<Self::Ok, Self::Error> {
Err(SerializerError::UnserializableType { name: "unit struct" })
}
fn serialize_unit_variant(
self,
_name: &'static str,
_variant_index: u32,
_variant: &'static str
) -> Result<Self::Ok, Self::Error>
{
Err(SerializerError::UnserializableType { name: "unit variant" })
}
fn serialize_newtype_struct<T: ?Sized>(
self,
_name: &'static str,
value: &T
) -> Result<Self::Ok, Self::Error>
where T: Serialize,
{
value.serialize(self)
}
fn serialize_newtype_variant<T: ?Sized>(
self,
_name: &'static str,
_variant_index: u32,
_variant: &'static str,
_value: &T
) -> Result<Self::Ok, Self::Error>
where T: Serialize,
{
Err(SerializerError::UnserializableType { name: "newtype variant" })
}
fn serialize_seq(self, _len: Option<usize>) -> Result<Self::SerializeSeq, Self::Error> {
Err(SerializerError::UnserializableType { name: "seq" })
}
fn serialize_tuple(self, _len: usize) -> Result<Self::SerializeTuple, Self::Error> {
Err(SerializerError::UnserializableType { name: "tuple" })
}
fn serialize_tuple_struct(
self,
_name: &'static str,
_len: usize
) -> Result<Self::SerializeTupleStruct, Self::Error>
{
Err(SerializerError::UnserializableType { name: "tuple struct" })
}
fn serialize_tuple_variant(
self,
_name: &'static str,
_variant_index: u32,
_variant: &'static str,
_len: usize
) -> Result<Self::SerializeTupleVariant, Self::Error>
{
Err(SerializerError::UnserializableType { name: "tuple variant" })
}
fn serialize_map(self, _len: Option<usize>) -> Result<Self::SerializeMap, Self::Error> {
Err(SerializerError::UnserializableType { name: "map" })
}
fn serialize_struct(
self,
_name: &'static str,
_len: usize
) -> Result<Self::SerializeStruct, Self::Error>
{
Err(SerializerError::UnserializableType { name: "struct" })
}
fn serialize_struct_variant(
self,
_name: &'static str,
_variant_index: u32,
_variant: &'static str,
_len: usize
) -> Result<Self::SerializeStructVariant, Self::Error>
{
Err(SerializerError::UnserializableType { name: "struct variant" })
}
}
impl<B> PositiveUpdateBuilder<B> {
pub fn build(self) -> Result<Update, Box<Error>> {
let env_options = rocksdb_options::EnvOptions::new();
let column_family_options = rocksdb_options::ColumnFamilyOptions::new();
let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options);
file_writer.open(&self.path.to_string_lossy())?;
let (blob_fst_map, blob_doc_idx) = self.builder.into_inner()?;
let positive_blob = PositiveBlob::from_bytes(blob_fst_map, blob_doc_idx)?;
let blob = Blob::Positive(positive_blob);
// write the data-index aka positive blob
let mut bytes = Vec::new();
blob.write_to_bytes(&mut bytes);
file_writer.merge(DATA_INDEX, &bytes)?;
// write all the documents fields updates
for (key, state) in self.new_states {
match state {
NewState::Updated { value } => {
file_writer.put(key.as_ref(), &value)?
},
NewState::Removed => file_writer.delete(key.as_ref())?,
}
}
file_writer.finish()?;
Update::open(self.path)
}
}

View File

@ -86,7 +86,7 @@ where D: Deref<Target=DB>,
let mut stream = {
let mut op_builder = fst::map::OpBuilder::new();
for automaton in &automatons {
let stream = self.view.blob().as_map().search(automaton);
let stream = self.view.index().positive.map.search(automaton);
op_builder.push(stream);
}
op_builder.union()
@ -100,7 +100,7 @@ where D: Deref<Target=DB>,
let distance = automaton.eval(input).to_u8();
let is_exact = distance == 0 && input.len() == automaton.query_len();
let doc_indexes = self.view.blob().as_indexes();
let doc_indexes = &self.view.index().positive.indexes;
let doc_indexes = &doc_indexes[iv.value as usize];
for doc_index in doc_indexes {