feat: Introduce the Index structure along with the Events types

This commit is contained in:
Clément Renault 2019-02-14 20:22:25 +01:00
parent ce50e74491
commit cbb0aaa217
No known key found for this signature in database
GPG Key ID: 0151CDAB43460DAE
12 changed files with 221 additions and 245 deletions

View File

@ -5,24 +5,25 @@ version = "0.3.0"
authors = ["Kerollmops <renault.cle@gmail.com>"]
[dependencies]
arc-swap = "0.3"
bincode = "1.0"
byteorder = "1.2"
arc-swap = "0.3"
elapsed = "0.1"
fst = "0.3"
hashbrown = { version = "0.1", features = ["serde"] }
lazy_static = "1.1"
levenshtein_automata = { version = "0.1", features = ["fst_automaton"] }
linked-hash-map = { version = "0.5", features = ["serde_impl"] }
lockfree = "0.5"
log = "0.4"
rayon = "1.0"
sdset = "0.3"
serde = "1.0"
serde_derive = "1.0"
serde_json = { version = "1.0", features = ["preserve_order"] }
size_format = "1.0"
slice-group-by = "0.2"
unidecode = "0.3"
rayon = "1.0"
lockfree = "0.5.1"
[dependencies.toml]
git = "https://github.com/Kerollmops/toml-rs.git"

View File

@ -26,8 +26,8 @@ pub struct DocIndexes {
impl DocIndexes {
pub fn from_bytes(bytes: Vec<u8>) -> io::Result<DocIndexes> {
let bytes = Arc::new(bytes);
let len = bytes.len();
let bytes = Arc::from(bytes);
let data = SharedData::new(bytes, 0, len);
let mut cursor = Cursor::new(data);
DocIndexes::from_cursor(&mut cursor)

View File

@ -9,7 +9,7 @@ use std::sync::Arc;
pub use self::doc_ids::DocIds;
pub use self::doc_indexes::{DocIndexes, DocIndexesBuilder};
#[derive(Default, Clone)]
#[derive(Clone, Default)]
pub struct SharedData {
pub bytes: Arc<Vec<u8>>,
pub offset: usize,
@ -19,7 +19,7 @@ pub struct SharedData {
impl SharedData {
pub fn from_bytes(vec: Vec<u8>) -> SharedData {
let len = vec.len();
let bytes = Arc::new(vec);
let bytes = Arc::from(vec);
SharedData::new(bytes, 0, len)
}

View File

@ -1,28 +1,41 @@
use std::io::{Write, BufRead, Cursor};
use std::io::{Cursor, BufRead};
use std::error::Error;
use std::sync::Arc;
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use fst::{map, Map, Streamer, IntoStreamer};
use sdset::{Set, SetOperation};
use sdset::duo::Union;
use fst::{map, Map, IntoStreamer, Streamer};
use fst::raw::Fst;
use sdset::duo::{Union, DifferenceByKey};
use sdset::{Set, SetOperation};
use crate::data::{DocIndexes, DocIndexesBuilder};
use crate::data::SharedData;
use crate::DocIndex;
use crate::data::{SharedData, DocIndexes, DocIndexesBuilder};
use crate::{DocumentId, DocIndex};
#[derive(Default)]
pub struct Positive {
map: Map,
indexes: DocIndexes,
pub struct Index {
pub map: Map,
pub indexes: DocIndexes,
}
impl Positive {
pub fn new(map: Map, indexes: DocIndexes) -> Positive {
Positive { map, indexes }
impl Index {
pub fn from_bytes(bytes: Vec<u8>) -> Result<Index, Box<Error>> {
let len = bytes.len();
Index::from_shared_bytes(Arc::from(bytes), 0, len)
}
pub fn from_cursor(cursor: &mut Cursor<SharedData>) -> Result<Positive, Box<Error>> {
pub fn from_shared_bytes(
bytes: Arc<Vec<u8>>,
offset: usize,
len: usize,
) -> Result<Index, Box<Error>>
{
let data = SharedData::new(bytes, offset, len);
let mut cursor = Cursor::new(data);
Index::from_cursor(&mut cursor)
}
pub fn from_cursor(cursor: &mut Cursor<SharedData>) -> Result<Index, Box<Error>> {
let len = cursor.read_u64::<LittleEndian>()? as usize;
let offset = cursor.position() as usize;
let data = cursor.get_ref().range(offset, len);
@ -33,7 +46,7 @@ impl Positive {
let indexes = DocIndexes::from_cursor(cursor)?;
Ok(Positive { map, indexes})
Ok(Index { map, indexes})
}
pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
@ -45,16 +58,28 @@ impl Positive {
self.indexes.write_to_bytes(bytes);
}
pub fn map(&self) -> &Map {
&self.map
pub fn remove_documents(&self, documents: &Set<DocumentId>) -> Index {
let mut buffer = Vec::new();
let mut builder = IndexBuilder::new();
let mut stream = self.into_stream();
while let Some((key, indexes)) = stream.next() {
buffer.clear();
let op = DifferenceByKey::new(indexes, documents, |x| x.document_id, |x| *x);
op.extend_vec(&mut buffer);
if !buffer.is_empty() {
let indexes = Set::new_unchecked(&buffer);
builder.insert(key, indexes).unwrap();
}
}
builder.build()
}
pub fn indexes(&self) -> &DocIndexes {
&self.indexes
}
pub fn union(&self, other: &Positive) -> Result<Positive, Box<Error>> {
let mut builder = PositiveBuilder::memory();
pub fn union(&self, other: &Index) -> Index {
let mut builder = IndexBuilder::new();
let mut stream = map::OpBuilder::new().add(&self.map).add(&other.map).union();
let mut buffer = Vec::new();
@ -63,19 +88,19 @@ impl Positive {
match ivalues {
[a, b] => {
let indexes = if a.index == 0 { &self.indexes } else { &other.indexes };
let indexes = indexes.get(a.value as usize).ok_or(format!("index not found"))?;
let indexes = &indexes[a.value as usize];
let a = Set::new_unchecked(indexes);
let indexes = if b.index == 0 { &self.indexes } else { &other.indexes };
let indexes = indexes.get(b.value as usize).ok_or(format!("index not found"))?;
let indexes = &indexes[b.value as usize];
let b = Set::new_unchecked(indexes);
let op = Union::new(a, b);
op.extend_vec(&mut buffer);
},
[a] => {
let indexes = if a.index == 0 { &self.indexes } else { &other.indexes };
let indexes = indexes.get(a.value as usize).ok_or(format!("index not found"))?;
[x] => {
let indexes = if x.index == 0 { &self.indexes } else { &other.indexes };
let indexes = &indexes[x.value as usize];
buffer.extend_from_slice(indexes)
},
_ => continue,
@ -83,23 +108,18 @@ impl Positive {
if !buffer.is_empty() {
let indexes = Set::new_unchecked(&buffer);
builder.insert(key, indexes)?;
builder.insert(key, indexes).unwrap();
}
}
let (map, indexes) = builder.into_inner()?;
let map = Map::from_bytes(map)?;
let indexes = DocIndexes::from_bytes(indexes)?;
Ok(Positive { map, indexes })
builder.build()
}
}
impl<'m, 'a> IntoStreamer<'a> for &'m Positive {
impl<'m, 'a> IntoStreamer<'a> for &'m Index {
type Item = (&'a [u8], &'a Set<DocIndex>);
/// The type of the stream to be constructed.
type Into = Stream<'m>;
/// Construct a stream from `Self`.
fn into_stream(self) -> Self::Into {
Stream {
map_stream: self.map.into_stream(),
@ -128,28 +148,26 @@ impl<'m, 'a> Streamer<'a> for Stream<'m> {
}
}
pub struct PositiveBuilder<W, X> {
map: fst::MapBuilder<W>,
indexes: DocIndexesBuilder<X>,
pub struct IndexBuilder {
map: fst::MapBuilder<Vec<u8>>,
indexes: DocIndexesBuilder<Vec<u8>>,
value: u64,
}
impl PositiveBuilder<Vec<u8>, Vec<u8>> {
pub fn memory() -> Self {
PositiveBuilder {
impl IndexBuilder {
pub fn new() -> Self {
IndexBuilder {
map: fst::MapBuilder::memory(),
indexes: DocIndexesBuilder::memory(),
value: 0,
}
}
}
impl<W: Write, X: Write> PositiveBuilder<W, X> {
/// If a key is inserted that is less than or equal to any previous key added,
/// then an error is returned. Similarly, if there was a problem writing
/// to the underlying writer, an error is returned.
// FIXME what if one write doesn't work but the other do ?
pub fn insert<K>(&mut self, key: K, indexes: &Set<DocIndex>) -> Result<(), Box<Error>>
pub fn insert<K>(&mut self, key: K, indexes: &Set<DocIndex>) -> fst::Result<()>
where K: AsRef<[u8]>,
{
self.map.insert(key, self.value)?;
@ -158,9 +176,13 @@ impl<W: Write, X: Write> PositiveBuilder<W, X> {
Ok(())
}
pub fn into_inner(self) -> Result<(W, X), Box<Error>> {
let map = self.map.into_inner()?;
let indexes = self.indexes.into_inner()?;
Ok((map, indexes))
pub fn build(self) -> Index {
let map = self.map.into_inner().unwrap();
let indexes = self.indexes.into_inner().unwrap();
let map = Map::from_bytes(map).unwrap();
let indexes = DocIndexes::from_bytes(indexes).unwrap();
Index { map, indexes }
}
}

View File

@ -1,82 +0,0 @@
mod negative;
mod positive;
pub(crate) use self::negative::Negative;
pub(crate) use self::positive::{Positive, PositiveBuilder};
use std::error::Error;
use std::io::Cursor;
use std::sync::Arc;
use fst::{IntoStreamer, Streamer};
use sdset::duo::DifferenceByKey;
use sdset::{Set, SetOperation};
use fst::Map;
use crate::data::{SharedData, DocIndexes};
#[derive(Default)]
pub struct Index {
pub(crate) negative: Negative,
pub(crate) positive: Positive,
}
impl Index {
pub fn from_bytes(bytes: Vec<u8>) -> Result<Index, Box<Error>> {
let len = bytes.len();
Index::from_shared_bytes(Arc::new(bytes), 0, len)
}
pub fn from_shared_bytes(
bytes: Arc<Vec<u8>>,
offset: usize,
len: usize,
) -> Result<Index, Box<Error>>
{
let data = SharedData::new(bytes, offset, len);
let mut cursor = Cursor::new(data);
let negative = Negative::from_cursor(&mut cursor)?;
let positive = Positive::from_cursor(&mut cursor)?;
Ok(Index { negative, positive })
}
pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
self.negative.write_to_bytes(bytes);
self.positive.write_to_bytes(bytes);
}
pub fn merge(&self, other: &Index) -> Result<Index, Box<Error>> {
if other.negative.is_empty() {
let negative = Negative::default();
let positive = self.positive.union(&other.positive)?;
return Ok(Index { negative, positive })
}
let mut buffer = Vec::new();
let mut builder = PositiveBuilder::memory();
let mut stream = self.positive.into_stream();
while let Some((key, indexes)) = stream.next() {
let op = DifferenceByKey::new(indexes, &other.negative, |x| x.document_id, |x| *x);
buffer.clear();
op.extend_vec(&mut buffer);
if !buffer.is_empty() {
let indexes = Set::new_unchecked(&buffer);
builder.insert(key, indexes)?;
}
}
let positive = {
let (map, indexes) = builder.into_inner()?;
let map = Map::from_bytes(map)?;
let indexes = DocIndexes::from_bytes(indexes)?;
Positive::new(map, indexes)
};
let negative = Negative::default();
let positive = positive.union(&other.positive)?;
Ok(Index { negative, positive })
}
}

View File

@ -1,43 +0,0 @@
use std::error::Error;
use std::io::Cursor;
use std::ops::Deref;
use sdset::Set;
use byteorder::{LittleEndian, WriteBytesExt};
use crate::data::SharedData;
use crate::data::DocIds;
use crate::DocumentId;
#[derive(Default)]
pub struct Negative(DocIds);
impl Negative {
pub fn new(doc_ids: DocIds) -> Negative {
Negative(doc_ids)
}
pub fn from_cursor(cursor: &mut Cursor<SharedData>) -> Result<Negative, Box<Error>> {
let doc_ids = DocIds::from_cursor(cursor)?;
Ok(Negative(doc_ids))
}
pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
let slice = self.0.as_bytes();
let len = slice.len() as u64;
let _ = bytes.write_u64::<LittleEndian>(len);
bytes.extend_from_slice(slice);
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
}
impl Deref for Negative {
type Target = Set<DocumentId>;
fn deref(&self) -> &Self::Target {
self.0.as_ref()
}
}

View File

@ -11,6 +11,7 @@ use std::ops::{Deref, DerefMut};
use rocksdb::rocksdb_options::{DBOptions, ColumnFamilyOptions};
use rocksdb::rocksdb::{Writable, Snapshot};
use rocksdb::{DB, MergeOperands};
use size_format::SizeFormatterBinary;
use arc_swap::ArcSwap;
use lockfree::map::Map;
use hashbrown::HashMap;
@ -50,64 +51,91 @@ where D: Deref<Target=DB>
fn retrieve_data_index<D>(snapshot: &Snapshot<D>) -> Result<Index, Box<Error>>
where D: Deref<Target=DB>
{
use self::update::ReadIndexEvent::*;
let (elapsed, vector) = elapsed::measure_time(|| snapshot.get(DATA_INDEX));
info!("loading index from kv-store took {}", elapsed);
let index = match vector? {
match vector? {
Some(vector) => {
let bytes = vector.as_ref().to_vec();
info!("index size if {} MiB", bytes.len() / 1024 / 1024);
let size = SizeFormatterBinary::new(bytes.len() as u64);
info!("index size is {}B", size);
let (elapsed, result) = elapsed::measure_time(|| {
match bincode::deserialize(&bytes)? {
RemovedDocuments(_) => unreachable!("BUG: Must not extract a RemovedDocuments"),
UpdatedDocuments(index) => Ok(index),
}
});
let (elapsed, index) = elapsed::measure_time(|| Index::from_bytes(bytes));
info!("loading index from bytes took {}", elapsed);
index?
result
},
None => Index::default(),
};
Ok(index)
None => Ok(Index::default()),
}
}
fn retrieve_data_ranked_map<D>(snapshot: &Snapshot<D>) -> Result<RankedMap, Box<Error>>
where D: Deref<Target=DB>,
{
match snapshot.get(DATA_RANKED_MAP)? {
Some(vector) => Ok(bincode::deserialize(&*vector)?),
use self::update::ReadRankedMapEvent::*;
let (elapsed, vector) = elapsed::measure_time(|| snapshot.get(DATA_RANKED_MAP));
info!("loading ranked map from kv-store took {}", elapsed);
match vector? {
Some(vector) => {
let bytes = vector.as_ref().to_vec();
let size = SizeFormatterBinary::new(bytes.len() as u64);
info!("ranked map size is {}B", size);
let (elapsed, result) = elapsed::measure_time(|| {
match bincode::deserialize(&bytes)? {
RemovedDocuments(_) => unreachable!("BUG: Must not extract a RemovedDocuments"),
UpdatedDocuments(ranked_map) => Ok(ranked_map),
}
});
info!("loading ranked map from bytes took {}", elapsed);
result
},
None => Ok(HashMap::new()),
}
}
fn merge_indexes(existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
let mut index: Option<Index> = None;
for bytes in existing.into_iter().chain(operands) {
let operand = Index::from_bytes(bytes.to_vec()).unwrap();
let merged = match index {
Some(ref index) => index.merge(&operand).unwrap(),
None => operand,
};
use self::update::ReadIndexEvent::*;
use self::update::WriteIndexEvent;
index.replace(merged);
let mut index = Index::default();
for bytes in existing.into_iter().chain(operands) {
match bincode::deserialize(bytes).unwrap() {
RemovedDocuments(d) => index = index.remove_documents(&d),
UpdatedDocuments(i) => index = index.union(&i),
}
}
let index = index.unwrap_or_default();
let mut bytes = Vec::new();
index.write_to_bytes(&mut bytes);
bytes
let event = WriteIndexEvent::UpdatedDocuments(&index);
bincode::serialize(&event).unwrap()
}
fn merge_ranked_maps(existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
let mut ranked_map: Option<RankedMap> = None;
use self::update::ReadRankedMapEvent::*;
use self::update::WriteRankedMapEvent;
let mut ranked_map = RankedMap::default();
for bytes in existing.into_iter().chain(operands) {
let operand: RankedMap = bincode::deserialize(bytes).unwrap();
match ranked_map {
Some(ref mut ranked_map) => ranked_map.extend(operand),
None => { ranked_map.replace(operand); },
};
match bincode::deserialize(bytes).unwrap() {
RemovedDocuments(d) => ranked_map.retain(|(k, _), _| !d.contains(k)),
UpdatedDocuments(i) => ranked_map.extend(i),
}
}
let ranked_map = ranked_map.unwrap_or_default();
bincode::serialize(&ranked_map).unwrap()
let event = WriteRankedMapEvent::UpdatedDocuments(&ranked_map);
bincode::serialize(&event).unwrap()
}
fn merge_operator(key: &[u8], existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
@ -247,7 +275,7 @@ impl Drop for DatabaseIndex {
fn drop(&mut self) {
if self.must_die.load(Ordering::Relaxed) {
if let Err(err) = fs::remove_dir_all(&self.path) {
error!("Impossible to remove mdb when Database id dropped; {}", err);
error!("Impossible to remove mdb when Database is dropped; {}", err);
}
}
}

View File

@ -7,7 +7,6 @@ use std::sync::Arc;
use serde_derive::{Serialize, Deserialize};
use linked_hash_map::LinkedHashMap;
use serde::Serialize;
use crate::database::serde::find_id::FindDocumentIdSerializer;
use crate::database::serde::SerializerError;
@ -168,7 +167,7 @@ impl Schema {
}
pub fn document_id<T>(&self, document: T) -> Result<DocumentId, SerializerError>
where T: Serialize,
where T: serde::Serialize,
{
let id_attribute_name = &self.inner.identifier;
let serializer = FindDocumentIdSerializer { id_attribute_name };

View File

@ -0,0 +1,17 @@
use sdset::{Set, SetBuf};
use serde_derive::{Serialize, Deserialize};
use crate::database::Index;
use crate::DocumentId;
#[derive(Serialize)]
pub enum WriteIndexEvent<'a> {
RemovedDocuments(&'a Set<DocumentId>),
UpdatedDocuments(&'a Index),
}
#[derive(Deserialize)]
pub enum ReadIndexEvent {
RemovedDocuments(SetBuf<DocumentId>),
UpdatedDocuments(Index),
}

View File

@ -3,23 +3,26 @@ use std::error::Error;
use rocksdb::rocksdb::{Writable, WriteBatch};
use hashbrown::hash_map::HashMap;
use sdset::{Set, SetBuf};
use serde::Serialize;
use fst::map::Map;
use sdset::Set;
use crate::database::index::{Positive, PositiveBuilder, Negative};
use crate::database::document_key::{DocumentKey, DocumentKeyAttr};
use crate::database::serde::serializer::Serializer;
use crate::database::serde::SerializerError;
use crate::database::schema::SchemaAttr;
use crate::tokenizer::TokenizerBuilder;
use crate::data::{DocIds, DocIndexes};
use crate::database::schema::Schema;
use crate::database::index::Index;
use crate::database::index::IndexBuilder;
use crate::database::{DATA_INDEX, DATA_RANKED_MAP};
use crate::database::{RankedMap, Number};
use crate::{DocumentId, DocIndex};
pub use self::index_event::{ReadIndexEvent, WriteIndexEvent};
pub use self::ranked_map_event::{ReadRankedMapEvent, WriteRankedMapEvent};
mod index_event;
mod ranked_map_event;
pub type Token = Vec<u8>; // TODO could be replaced by a SmallVec
pub struct Update {
@ -106,46 +109,60 @@ impl RawUpdateBuilder {
}
pub fn build(self) -> Result<WriteBatch, Box<Error>> {
let negative = {
let mut removed_document_ids = Vec::new();
// create the list of all the removed documents
let removed_documents = {
let mut document_ids = Vec::new();
for (id, update_type) in self.documents_update {
if update_type == Deleted {
removed_document_ids.push(id);
document_ids.push(id);
}
}
removed_document_ids.sort_unstable();
let removed_document_ids = Set::new_unchecked(&removed_document_ids);
let doc_ids = DocIds::new(removed_document_ids);
Negative::new(doc_ids)
document_ids.sort_unstable();
SetBuf::new_unchecked(document_ids)
};
let positive = {
let mut positive_builder = PositiveBuilder::memory();
// create the Index of all the document updates
let index = {
let mut builder = IndexBuilder::new();
for (key, mut indexes) in self.indexed_words {
indexes.sort_unstable();
let indexes = Set::new_unchecked(&indexes);
positive_builder.insert(key, indexes)?;
builder.insert(key, indexes).unwrap();
}
let (map, indexes) = positive_builder.into_inner()?;
let map = Map::from_bytes(map)?;
let indexes = DocIndexes::from_bytes(indexes)?;
Positive::new(map, indexes)
builder.build()
};
let index = Index { negative, positive };
// WARN: removed documents must absolutely
// be merged *before* document updates
// write the data-index
let mut bytes_index = Vec::new();
index.write_to_bytes(&mut bytes_index);
self.batch.merge(DATA_INDEX, &bytes_index)?;
// === index ===
let bytes_ranked_map = bincode::serialize(&self.documents_ranked_fields).unwrap();
self.batch.merge(DATA_RANKED_MAP, &bytes_ranked_map)?;
if !removed_documents.is_empty() {
// remove the documents using the appropriate IndexEvent
let event = WriteIndexEvent::RemovedDocuments(&removed_documents);
let event_bytes = bincode::serialize(&event).unwrap();
self.batch.merge(DATA_INDEX, &event_bytes)?;
}
// update the documents using the appropriate IndexEvent
let event = WriteIndexEvent::UpdatedDocuments(&index);
let event_bytes = bincode::serialize(&event).unwrap();
self.batch.merge(DATA_INDEX, &event_bytes)?;
// === ranked map ===
if !removed_documents.is_empty() {
// update the ranked map using the appropriate RankedMapEvent
let event = WriteRankedMapEvent::RemovedDocuments(&removed_documents);
let event_bytes = bincode::serialize(&event).unwrap();
self.batch.merge(DATA_RANKED_MAP, &event_bytes)?;
}
// update the documents using the appropriate IndexEvent
let event = WriteRankedMapEvent::UpdatedDocuments(&self.documents_ranked_fields);
let event_bytes = bincode::serialize(&event).unwrap();
self.batch.merge(DATA_RANKED_MAP, &event_bytes)?;
Ok(self.batch)
}

View File

@ -0,0 +1,17 @@
use sdset::{Set, SetBuf};
use serde_derive::{Serialize, Deserialize};
use crate::database::RankedMap;
use crate::DocumentId;
#[derive(Serialize)]
pub enum WriteRankedMapEvent<'a> {
RemovedDocuments(&'a Set<DocumentId>),
UpdatedDocuments(&'a RankedMap),
}
#[derive(Deserialize)]
pub enum ReadRankedMapEvent {
RemovedDocuments(SetBuf<DocumentId>),
UpdatedDocuments(RankedMap),
}

View File

@ -89,7 +89,7 @@ where D: Deref<Target=DB>,
let mut stream = {
let mut op_builder = fst::map::OpBuilder::new();
for automaton in &automatons {
let stream = self.view.index().positive.map().search(automaton);
let stream = self.view.index().map.search(automaton);
op_builder.push(stream);
}
op_builder.union()
@ -103,7 +103,7 @@ where D: Deref<Target=DB>,
let distance = automaton.eval(input).to_u8();
let is_exact = distance == 0 && input.len() == automaton.query_len();
let doc_indexes = &self.view.index().positive.indexes();
let doc_indexes = &self.view.index().indexes;
let doc_indexes = &doc_indexes[iv.value as usize];
for doc_index in doc_indexes {