mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-10 21:44:34 +01:00
Merge pull request #108 from Kerollmops/refactor-index
Refactor the Index and Updates
This commit is contained in:
commit
4bc14aa261
@ -5,24 +5,24 @@ version = "0.3.0"
|
|||||||
authors = ["Kerollmops <renault.cle@gmail.com>"]
|
authors = ["Kerollmops <renault.cle@gmail.com>"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
arc-swap = "0.3"
|
||||||
bincode = "1.0"
|
bincode = "1.0"
|
||||||
byteorder = "1.2"
|
byteorder = "1.2"
|
||||||
arc-swap = "0.3"
|
|
||||||
elapsed = "0.1"
|
|
||||||
fst = "0.3"
|
fst = "0.3"
|
||||||
hashbrown = { version = "0.1", features = ["serde"] }
|
hashbrown = { version = "0.1", features = ["serde"] }
|
||||||
lazy_static = "1.1"
|
lazy_static = "1.1"
|
||||||
levenshtein_automata = { version = "0.1", features = ["fst_automaton"] }
|
levenshtein_automata = { version = "0.1", features = ["fst_automaton"] }
|
||||||
linked-hash-map = { version = "0.5", features = ["serde_impl"] }
|
linked-hash-map = { version = "0.5", features = ["serde_impl"] }
|
||||||
|
lockfree = "0.5"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
|
rayon = "1.0"
|
||||||
sdset = "0.3"
|
sdset = "0.3"
|
||||||
serde = "1.0"
|
serde = "1.0"
|
||||||
serde_derive = "1.0"
|
serde_derive = "1.0"
|
||||||
serde_json = { version = "1.0", features = ["preserve_order"] }
|
serde_json = { version = "1.0", features = ["preserve_order"] }
|
||||||
|
size_format = "1.0"
|
||||||
slice-group-by = "0.2"
|
slice-group-by = "0.2"
|
||||||
unidecode = "0.3"
|
unidecode = "0.3"
|
||||||
rayon = "1.0"
|
|
||||||
lockfree = "0.5.1"
|
|
||||||
|
|
||||||
[dependencies.toml]
|
[dependencies.toml]
|
||||||
git = "https://github.com/Kerollmops/toml-rs.git"
|
git = "https://github.com/Kerollmops/toml-rs.git"
|
||||||
|
@ -4,6 +4,7 @@ static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
|
|||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::io::{self, BufRead, BufReader};
|
use std::io::{self, BufRead, BufReader};
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
|
use std::time::Instant;
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use std::borrow::Cow;
|
use std::borrow::Cow;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
@ -124,14 +125,13 @@ fn main() -> Result<(), Box<Error>> {
|
|||||||
None => HashSet::new(),
|
None => HashSet::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let (elapsed, result) = elapsed::measure_time(|| {
|
let start = Instant::now();
|
||||||
index(schema, &opt.database_path, &opt.csv_data_path, opt.update_group_size, &stop_words)
|
let result = index(schema, &opt.database_path, &opt.csv_data_path, opt.update_group_size, &stop_words);
|
||||||
});
|
|
||||||
|
|
||||||
if let Err(e) = result {
|
if let Err(e) = result {
|
||||||
return Err(e.into())
|
return Err(e.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
println!("database created in {} at: {:?}", elapsed, opt.database_path);
|
println!("database created in {:.2?} at: {:?}", start.elapsed(), opt.database_path);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,7 @@ static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
|
|||||||
use std::collections::btree_map::{BTreeMap, Entry};
|
use std::collections::btree_map::{BTreeMap, Entry};
|
||||||
use std::iter::FromIterator;
|
use std::iter::FromIterator;
|
||||||
use std::io::{self, Write};
|
use std::io::{self, Write};
|
||||||
|
use std::time::Instant;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
|
|
||||||
@ -102,9 +103,9 @@ fn main() -> Result<(), Box<Error>> {
|
|||||||
let _ = env_logger::init();
|
let _ = env_logger::init();
|
||||||
let opt = Opt::from_args();
|
let opt = Opt::from_args();
|
||||||
|
|
||||||
let (elapsed, result) = elapsed::measure_time(|| Database::open(&opt.database_path));
|
let start = Instant::now();
|
||||||
let database = result?;
|
let database = Database::open(&opt.database_path)?;
|
||||||
println!("database prepared for you in {}", elapsed);
|
println!("database prepared for you in {:.2?}", start.elapsed());
|
||||||
|
|
||||||
let mut buffer = String::new();
|
let mut buffer = String::new();
|
||||||
let input = io::stdin();
|
let input = io::stdin();
|
||||||
@ -119,10 +120,10 @@ fn main() -> Result<(), Box<Error>> {
|
|||||||
let view = database.view("default")?;
|
let view = database.view("default")?;
|
||||||
let schema = view.schema();
|
let schema = view.schema();
|
||||||
|
|
||||||
let (elapsed, documents) = elapsed::measure_time(|| {
|
let start = Instant::now();
|
||||||
|
|
||||||
let builder = view.query_builder().unwrap();
|
let builder = view.query_builder().unwrap();
|
||||||
builder.query(query, 0..opt.number_results)
|
let documents = builder.query(query, 0..opt.number_results);
|
||||||
});
|
|
||||||
|
|
||||||
let number_of_documents = documents.len();
|
let number_of_documents = documents.len();
|
||||||
for doc in documents {
|
for doc in documents {
|
||||||
@ -160,7 +161,7 @@ fn main() -> Result<(), Box<Error>> {
|
|||||||
println!();
|
println!();
|
||||||
}
|
}
|
||||||
|
|
||||||
eprintln!("===== Found {} results in {} =====", number_of_documents, elapsed);
|
eprintln!("===== Found {} results in {:.2?} =====", number_of_documents, start.elapsed());
|
||||||
buffer.clear();
|
buffer.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,12 +1,15 @@
|
|||||||
use std::io::{self, Cursor, BufRead};
|
|
||||||
use std::slice::from_raw_parts;
|
use std::slice::from_raw_parts;
|
||||||
use std::mem::size_of;
|
use std::mem::size_of;
|
||||||
|
use std::error::Error;
|
||||||
|
|
||||||
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
|
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
|
||||||
use sdset::Set;
|
use sdset::Set;
|
||||||
|
|
||||||
use crate::DocumentId;
|
use crate::shared_data_cursor::{SharedDataCursor, FromSharedDataCursor};
|
||||||
|
use crate::write_to_bytes::WriteToBytes;
|
||||||
use crate::data::SharedData;
|
use crate::data::SharedData;
|
||||||
|
use crate::DocumentId;
|
||||||
|
|
||||||
use super::into_u8_slice;
|
use super::into_u8_slice;
|
||||||
|
|
||||||
#[derive(Default, Clone)]
|
#[derive(Default, Clone)]
|
||||||
@ -19,21 +22,6 @@ impl DocIds {
|
|||||||
DocIds(data)
|
DocIds(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn from_cursor(cursor: &mut Cursor<SharedData>) -> io::Result<DocIds> {
|
|
||||||
let len = cursor.read_u64::<LittleEndian>()? as usize;
|
|
||||||
let offset = cursor.position() as usize;
|
|
||||||
let doc_ids = cursor.get_ref().range(offset, len);
|
|
||||||
cursor.consume(len);
|
|
||||||
|
|
||||||
Ok(DocIds(doc_ids))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
|
|
||||||
let len = self.0.len() as u64;
|
|
||||||
bytes.write_u64::<LittleEndian>(len).unwrap();
|
|
||||||
bytes.extend_from_slice(&self.0);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn is_empty(&self) -> bool {
|
pub fn is_empty(&self) -> bool {
|
||||||
self.0.is_empty()
|
self.0.is_empty()
|
||||||
}
|
}
|
||||||
@ -52,3 +40,22 @@ impl AsRef<Set<DocumentId>> for DocIds {
|
|||||||
Set::new_unchecked(slice)
|
Set::new_unchecked(slice)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl FromSharedDataCursor for DocIds {
|
||||||
|
type Error = Box<Error>;
|
||||||
|
|
||||||
|
fn from_shared_data_cursor(cursor: &mut SharedDataCursor) -> Result<DocIds, Self::Error> {
|
||||||
|
let len = cursor.read_u64::<LittleEndian>()? as usize;
|
||||||
|
let data = cursor.extract(len);
|
||||||
|
|
||||||
|
Ok(DocIds(data))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WriteToBytes for DocIds {
|
||||||
|
fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
|
||||||
|
let len = self.0.len() as u64;
|
||||||
|
bytes.write_u64::<LittleEndian>(len).unwrap();
|
||||||
|
bytes.extend_from_slice(&self.0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -1,14 +1,16 @@
|
|||||||
use std::io::{self, Write, Cursor, BufRead};
|
use std::io::{self, Write};
|
||||||
use std::slice::from_raw_parts;
|
use std::slice::from_raw_parts;
|
||||||
use std::mem::size_of;
|
use std::mem::size_of;
|
||||||
use std::ops::Index;
|
use std::ops::Index;
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
|
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
|
||||||
use sdset::Set;
|
use sdset::Set;
|
||||||
|
|
||||||
use crate::DocIndex;
|
use crate::shared_data_cursor::{SharedDataCursor, FromSharedDataCursor};
|
||||||
|
use crate::write_to_bytes::WriteToBytes;
|
||||||
use crate::data::SharedData;
|
use crate::data::SharedData;
|
||||||
|
use crate::DocIndex;
|
||||||
|
|
||||||
use super::into_u8_slice;
|
use super::into_u8_slice;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@ -25,38 +27,6 @@ pub struct DocIndexes {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl DocIndexes {
|
impl DocIndexes {
|
||||||
pub fn from_bytes(bytes: Vec<u8>) -> io::Result<DocIndexes> {
|
|
||||||
let bytes = Arc::new(bytes);
|
|
||||||
let len = bytes.len();
|
|
||||||
let data = SharedData::new(bytes, 0, len);
|
|
||||||
let mut cursor = Cursor::new(data);
|
|
||||||
DocIndexes::from_cursor(&mut cursor)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn from_cursor(cursor: &mut Cursor<SharedData>) -> io::Result<DocIndexes> {
|
|
||||||
let len = cursor.read_u64::<LittleEndian>()? as usize;
|
|
||||||
let offset = cursor.position() as usize;
|
|
||||||
let ranges = cursor.get_ref().range(offset, len);
|
|
||||||
cursor.consume(len);
|
|
||||||
|
|
||||||
let len = cursor.read_u64::<LittleEndian>()? as usize;
|
|
||||||
let offset = cursor.position() as usize;
|
|
||||||
let indexes = cursor.get_ref().range(offset, len);
|
|
||||||
cursor.consume(len);
|
|
||||||
|
|
||||||
Ok(DocIndexes { ranges, indexes })
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
|
|
||||||
let ranges_len = self.ranges.len() as u64;
|
|
||||||
let _ = bytes.write_u64::<LittleEndian>(ranges_len);
|
|
||||||
bytes.extend_from_slice(&self.ranges);
|
|
||||||
|
|
||||||
let indexes_len = self.indexes.len() as u64;
|
|
||||||
let _ = bytes.write_u64::<LittleEndian>(indexes_len);
|
|
||||||
bytes.extend_from_slice(&self.indexes);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get(&self, index: usize) -> Option<&Set<DocIndex>> {
|
pub fn get(&self, index: usize) -> Option<&Set<DocIndex>> {
|
||||||
self.ranges().get(index).map(|Range { start, end }| {
|
self.ranges().get(index).map(|Range { start, end }| {
|
||||||
let start = *start as usize;
|
let start = *start as usize;
|
||||||
@ -92,6 +62,32 @@ impl Index<usize> for DocIndexes {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl FromSharedDataCursor for DocIndexes {
|
||||||
|
type Error = io::Error;
|
||||||
|
|
||||||
|
fn from_shared_data_cursor(cursor: &mut SharedDataCursor) -> Result<DocIndexes, Self::Error> {
|
||||||
|
let len = cursor.read_u64::<LittleEndian>()? as usize;
|
||||||
|
let ranges = cursor.extract(len);
|
||||||
|
|
||||||
|
let len = cursor.read_u64::<LittleEndian>()? as usize;
|
||||||
|
let indexes = cursor.extract(len);
|
||||||
|
|
||||||
|
Ok(DocIndexes { ranges, indexes })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WriteToBytes for DocIndexes {
|
||||||
|
fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
|
||||||
|
let ranges_len = self.ranges.len() as u64;
|
||||||
|
let _ = bytes.write_u64::<LittleEndian>(ranges_len);
|
||||||
|
bytes.extend_from_slice(&self.ranges);
|
||||||
|
|
||||||
|
let indexes_len = self.indexes.len() as u64;
|
||||||
|
let _ = bytes.write_u64::<LittleEndian>(indexes_len);
|
||||||
|
bytes.extend_from_slice(&self.indexes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct DocIndexesBuilder<W> {
|
pub struct DocIndexesBuilder<W> {
|
||||||
ranges: Vec<Range>,
|
ranges: Vec<Range>,
|
||||||
indexes: Vec<DocIndex>,
|
indexes: Vec<DocIndex>,
|
||||||
|
@ -1,55 +1,13 @@
|
|||||||
mod doc_ids;
|
mod doc_ids;
|
||||||
mod doc_indexes;
|
mod doc_indexes;
|
||||||
|
mod shared_data;
|
||||||
|
|
||||||
use std::slice::from_raw_parts;
|
use std::slice::from_raw_parts;
|
||||||
use std::mem::size_of;
|
use std::mem::size_of;
|
||||||
use std::ops::Deref;
|
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
pub use self::doc_ids::DocIds;
|
pub use self::doc_ids::DocIds;
|
||||||
pub use self::doc_indexes::{DocIndexes, DocIndexesBuilder};
|
pub use self::doc_indexes::{DocIndexes, DocIndexesBuilder};
|
||||||
|
pub use self::shared_data::SharedData;
|
||||||
#[derive(Default, Clone)]
|
|
||||||
pub struct SharedData {
|
|
||||||
pub bytes: Arc<Vec<u8>>,
|
|
||||||
pub offset: usize,
|
|
||||||
pub len: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SharedData {
|
|
||||||
pub fn from_bytes(vec: Vec<u8>) -> SharedData {
|
|
||||||
let len = vec.len();
|
|
||||||
let bytes = Arc::new(vec);
|
|
||||||
SharedData::new(bytes, 0, len)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn new(bytes: Arc<Vec<u8>>, offset: usize, len: usize) -> SharedData {
|
|
||||||
SharedData { bytes, offset, len }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn range(&self, offset: usize, len: usize) -> SharedData {
|
|
||||||
assert!(offset + len <= self.len);
|
|
||||||
SharedData {
|
|
||||||
bytes: self.bytes.clone(),
|
|
||||||
offset: self.offset + offset,
|
|
||||||
len: len,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Deref for SharedData {
|
|
||||||
type Target = [u8];
|
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target {
|
|
||||||
self.as_ref()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AsRef<[u8]> for SharedData {
|
|
||||||
fn as_ref(&self) -> &[u8] {
|
|
||||||
&self.bytes[self.offset..self.offset + self.len]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
unsafe fn into_u8_slice<T: Sized>(slice: &[T]) -> &[u8] {
|
unsafe fn into_u8_slice<T: Sized>(slice: &[T]) -> &[u8] {
|
||||||
let ptr = slice.as_ptr() as *const u8;
|
let ptr = slice.as_ptr() as *const u8;
|
||||||
|
48
src/data/shared_data.rs
Normal file
48
src/data/shared_data.rs
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use std::ops::Deref;
|
||||||
|
|
||||||
|
#[derive(Default, Clone)]
|
||||||
|
pub struct SharedData {
|
||||||
|
pub bytes: Arc<Vec<u8>>,
|
||||||
|
pub offset: usize,
|
||||||
|
pub len: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SharedData {
|
||||||
|
pub fn from_bytes(vec: Vec<u8>) -> SharedData {
|
||||||
|
let len = vec.len();
|
||||||
|
let bytes = Arc::from(vec);
|
||||||
|
SharedData::new(bytes, 0, len)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn new(bytes: Arc<Vec<u8>>, offset: usize, len: usize) -> SharedData {
|
||||||
|
SharedData { bytes, offset, len }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn as_slice(&self) -> &[u8] {
|
||||||
|
&self.bytes[self.offset..self.offset + self.len]
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn range(&self, offset: usize, len: usize) -> SharedData {
|
||||||
|
assert!(offset + len <= self.len);
|
||||||
|
SharedData {
|
||||||
|
bytes: self.bytes.clone(),
|
||||||
|
offset: self.offset + offset,
|
||||||
|
len: len,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Deref for SharedData {
|
||||||
|
type Target = [u8];
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
self.as_slice()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsRef<[u8]> for SharedData {
|
||||||
|
fn as_ref(&self) -> &[u8] {
|
||||||
|
self.as_slice()
|
||||||
|
}
|
||||||
|
}
|
@ -1,60 +1,45 @@
|
|||||||
use std::io::{Write, BufRead, Cursor};
|
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
|
|
||||||
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
|
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
|
||||||
use fst::{map, Map, Streamer, IntoStreamer};
|
use fst::{map, Map, IntoStreamer, Streamer};
|
||||||
use sdset::{Set, SetOperation};
|
|
||||||
use sdset::duo::Union;
|
|
||||||
use fst::raw::Fst;
|
use fst::raw::Fst;
|
||||||
|
use sdset::duo::{Union, DifferenceByKey};
|
||||||
|
use sdset::{Set, SetOperation};
|
||||||
|
|
||||||
|
use crate::shared_data_cursor::{SharedDataCursor, FromSharedDataCursor};
|
||||||
|
use crate::write_to_bytes::WriteToBytes;
|
||||||
use crate::data::{DocIndexes, DocIndexesBuilder};
|
use crate::data::{DocIndexes, DocIndexesBuilder};
|
||||||
use crate::data::SharedData;
|
use crate::{DocumentId, DocIndex};
|
||||||
use crate::DocIndex;
|
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct Positive {
|
pub struct Index {
|
||||||
map: Map,
|
pub map: Map,
|
||||||
indexes: DocIndexes,
|
pub indexes: DocIndexes,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Positive {
|
impl Index {
|
||||||
pub fn new(map: Map, indexes: DocIndexes) -> Positive {
|
pub fn remove_documents(&self, documents: &Set<DocumentId>) -> Index {
|
||||||
Positive { map, indexes }
|
let mut buffer = Vec::new();
|
||||||
|
let mut builder = IndexBuilder::new();
|
||||||
|
let mut stream = self.into_stream();
|
||||||
|
|
||||||
|
while let Some((key, indexes)) = stream.next() {
|
||||||
|
buffer.clear();
|
||||||
|
|
||||||
|
let op = DifferenceByKey::new(indexes, documents, |x| x.document_id, |x| *x);
|
||||||
|
op.extend_vec(&mut buffer);
|
||||||
|
|
||||||
|
if !buffer.is_empty() {
|
||||||
|
let indexes = Set::new_unchecked(&buffer);
|
||||||
|
builder.insert(key, indexes).unwrap();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn from_cursor(cursor: &mut Cursor<SharedData>) -> Result<Positive, Box<Error>> {
|
builder.build()
|
||||||
let len = cursor.read_u64::<LittleEndian>()? as usize;
|
|
||||||
let offset = cursor.position() as usize;
|
|
||||||
let data = cursor.get_ref().range(offset, len);
|
|
||||||
|
|
||||||
let fst = Fst::from_shared_bytes(data.bytes, data.offset, data.len)?;
|
|
||||||
let map = Map::from(fst);
|
|
||||||
cursor.consume(len);
|
|
||||||
|
|
||||||
let indexes = DocIndexes::from_cursor(cursor)?;
|
|
||||||
|
|
||||||
Ok(Positive { map, indexes})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
|
pub fn union(&self, other: &Index) -> Index {
|
||||||
let slice = self.map.as_fst().as_bytes();
|
let mut builder = IndexBuilder::new();
|
||||||
let len = slice.len() as u64;
|
|
||||||
let _ = bytes.write_u64::<LittleEndian>(len);
|
|
||||||
bytes.extend_from_slice(slice);
|
|
||||||
|
|
||||||
self.indexes.write_to_bytes(bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn map(&self) -> &Map {
|
|
||||||
&self.map
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn indexes(&self) -> &DocIndexes {
|
|
||||||
&self.indexes
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn union(&self, other: &Positive) -> Result<Positive, Box<Error>> {
|
|
||||||
let mut builder = PositiveBuilder::memory();
|
|
||||||
let mut stream = map::OpBuilder::new().add(&self.map).add(&other.map).union();
|
let mut stream = map::OpBuilder::new().add(&self.map).add(&other.map).union();
|
||||||
|
|
||||||
let mut buffer = Vec::new();
|
let mut buffer = Vec::new();
|
||||||
@ -63,19 +48,19 @@ impl Positive {
|
|||||||
match ivalues {
|
match ivalues {
|
||||||
[a, b] => {
|
[a, b] => {
|
||||||
let indexes = if a.index == 0 { &self.indexes } else { &other.indexes };
|
let indexes = if a.index == 0 { &self.indexes } else { &other.indexes };
|
||||||
let indexes = indexes.get(a.value as usize).ok_or(format!("index not found"))?;
|
let indexes = &indexes[a.value as usize];
|
||||||
let a = Set::new_unchecked(indexes);
|
let a = Set::new_unchecked(indexes);
|
||||||
|
|
||||||
let indexes = if b.index == 0 { &self.indexes } else { &other.indexes };
|
let indexes = if b.index == 0 { &self.indexes } else { &other.indexes };
|
||||||
let indexes = indexes.get(b.value as usize).ok_or(format!("index not found"))?;
|
let indexes = &indexes[b.value as usize];
|
||||||
let b = Set::new_unchecked(indexes);
|
let b = Set::new_unchecked(indexes);
|
||||||
|
|
||||||
let op = Union::new(a, b);
|
let op = Union::new(a, b);
|
||||||
op.extend_vec(&mut buffer);
|
op.extend_vec(&mut buffer);
|
||||||
},
|
},
|
||||||
[a] => {
|
[x] => {
|
||||||
let indexes = if a.index == 0 { &self.indexes } else { &other.indexes };
|
let indexes = if x.index == 0 { &self.indexes } else { &other.indexes };
|
||||||
let indexes = indexes.get(a.value as usize).ok_or(format!("index not found"))?;
|
let indexes = &indexes[x.value as usize];
|
||||||
buffer.extend_from_slice(indexes)
|
buffer.extend_from_slice(indexes)
|
||||||
},
|
},
|
||||||
_ => continue,
|
_ => continue,
|
||||||
@ -83,23 +68,45 @@ impl Positive {
|
|||||||
|
|
||||||
if !buffer.is_empty() {
|
if !buffer.is_empty() {
|
||||||
let indexes = Set::new_unchecked(&buffer);
|
let indexes = Set::new_unchecked(&buffer);
|
||||||
builder.insert(key, indexes)?;
|
builder.insert(key, indexes).unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let (map, indexes) = builder.into_inner()?;
|
builder.build()
|
||||||
let map = Map::from_bytes(map)?;
|
|
||||||
let indexes = DocIndexes::from_bytes(indexes)?;
|
|
||||||
Ok(Positive { map, indexes })
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'m, 'a> IntoStreamer<'a> for &'m Positive {
|
impl FromSharedDataCursor for Index {
|
||||||
|
type Error = Box<Error>;
|
||||||
|
|
||||||
|
fn from_shared_data_cursor(cursor: &mut SharedDataCursor) -> Result<Index, Self::Error> {
|
||||||
|
let len = cursor.read_u64::<LittleEndian>()? as usize;
|
||||||
|
let data = cursor.extract(len);
|
||||||
|
|
||||||
|
let fst = Fst::from_shared_bytes(data.bytes, data.offset, data.len)?;
|
||||||
|
let map = Map::from(fst);
|
||||||
|
|
||||||
|
let indexes = DocIndexes::from_shared_data_cursor(cursor)?;
|
||||||
|
|
||||||
|
Ok(Index { map, indexes})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WriteToBytes for Index {
|
||||||
|
fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
|
||||||
|
let slice = self.map.as_fst().as_bytes();
|
||||||
|
let len = slice.len() as u64;
|
||||||
|
let _ = bytes.write_u64::<LittleEndian>(len);
|
||||||
|
bytes.extend_from_slice(slice);
|
||||||
|
|
||||||
|
self.indexes.write_to_bytes(bytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'m, 'a> IntoStreamer<'a> for &'m Index {
|
||||||
type Item = (&'a [u8], &'a Set<DocIndex>);
|
type Item = (&'a [u8], &'a Set<DocIndex>);
|
||||||
/// The type of the stream to be constructed.
|
|
||||||
type Into = Stream<'m>;
|
type Into = Stream<'m>;
|
||||||
|
|
||||||
/// Construct a stream from `Self`.
|
|
||||||
fn into_stream(self) -> Self::Into {
|
fn into_stream(self) -> Self::Into {
|
||||||
Stream {
|
Stream {
|
||||||
map_stream: self.map.into_stream(),
|
map_stream: self.map.into_stream(),
|
||||||
@ -128,28 +135,26 @@ impl<'m, 'a> Streamer<'a> for Stream<'m> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct PositiveBuilder<W, X> {
|
pub struct IndexBuilder {
|
||||||
map: fst::MapBuilder<W>,
|
map: fst::MapBuilder<Vec<u8>>,
|
||||||
indexes: DocIndexesBuilder<X>,
|
indexes: DocIndexesBuilder<Vec<u8>>,
|
||||||
value: u64,
|
value: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PositiveBuilder<Vec<u8>, Vec<u8>> {
|
impl IndexBuilder {
|
||||||
pub fn memory() -> Self {
|
pub fn new() -> Self {
|
||||||
PositiveBuilder {
|
IndexBuilder {
|
||||||
map: fst::MapBuilder::memory(),
|
map: fst::MapBuilder::memory(),
|
||||||
indexes: DocIndexesBuilder::memory(),
|
indexes: DocIndexesBuilder::memory(),
|
||||||
value: 0,
|
value: 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl<W: Write, X: Write> PositiveBuilder<W, X> {
|
|
||||||
/// If a key is inserted that is less than or equal to any previous key added,
|
/// If a key is inserted that is less than or equal to any previous key added,
|
||||||
/// then an error is returned. Similarly, if there was a problem writing
|
/// then an error is returned. Similarly, if there was a problem writing
|
||||||
/// to the underlying writer, an error is returned.
|
/// to the underlying writer, an error is returned.
|
||||||
// FIXME what if one write doesn't work but the other do ?
|
// FIXME what if one write doesn't work but the other do ?
|
||||||
pub fn insert<K>(&mut self, key: K, indexes: &Set<DocIndex>) -> Result<(), Box<Error>>
|
pub fn insert<K>(&mut self, key: K, indexes: &Set<DocIndex>) -> fst::Result<()>
|
||||||
where K: AsRef<[u8]>,
|
where K: AsRef<[u8]>,
|
||||||
{
|
{
|
||||||
self.map.insert(key, self.value)?;
|
self.map.insert(key, self.value)?;
|
||||||
@ -158,9 +163,13 @@ impl<W: Write, X: Write> PositiveBuilder<W, X> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn into_inner(self) -> Result<(W, X), Box<Error>> {
|
pub fn build(self) -> Index {
|
||||||
let map = self.map.into_inner()?;
|
let map = self.map.into_inner().unwrap();
|
||||||
let indexes = self.indexes.into_inner()?;
|
let indexes = self.indexes.into_inner().unwrap();
|
||||||
Ok((map, indexes))
|
|
||||||
|
let map = Map::from_bytes(map).unwrap();
|
||||||
|
let indexes = DocIndexes::from_bytes(indexes).unwrap();
|
||||||
|
|
||||||
|
Index { map, indexes }
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -1,82 +0,0 @@
|
|||||||
mod negative;
|
|
||||||
mod positive;
|
|
||||||
|
|
||||||
pub(crate) use self::negative::Negative;
|
|
||||||
pub(crate) use self::positive::{Positive, PositiveBuilder};
|
|
||||||
|
|
||||||
use std::error::Error;
|
|
||||||
use std::io::Cursor;
|
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use fst::{IntoStreamer, Streamer};
|
|
||||||
use sdset::duo::DifferenceByKey;
|
|
||||||
use sdset::{Set, SetOperation};
|
|
||||||
use fst::Map;
|
|
||||||
|
|
||||||
use crate::data::{SharedData, DocIndexes};
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
pub struct Index {
|
|
||||||
pub(crate) negative: Negative,
|
|
||||||
pub(crate) positive: Positive,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Index {
|
|
||||||
pub fn from_bytes(bytes: Vec<u8>) -> Result<Index, Box<Error>> {
|
|
||||||
let len = bytes.len();
|
|
||||||
Index::from_shared_bytes(Arc::new(bytes), 0, len)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn from_shared_bytes(
|
|
||||||
bytes: Arc<Vec<u8>>,
|
|
||||||
offset: usize,
|
|
||||||
len: usize,
|
|
||||||
) -> Result<Index, Box<Error>>
|
|
||||||
{
|
|
||||||
let data = SharedData::new(bytes, offset, len);
|
|
||||||
let mut cursor = Cursor::new(data);
|
|
||||||
|
|
||||||
let negative = Negative::from_cursor(&mut cursor)?;
|
|
||||||
let positive = Positive::from_cursor(&mut cursor)?;
|
|
||||||
Ok(Index { negative, positive })
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
|
|
||||||
self.negative.write_to_bytes(bytes);
|
|
||||||
self.positive.write_to_bytes(bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn merge(&self, other: &Index) -> Result<Index, Box<Error>> {
|
|
||||||
if other.negative.is_empty() {
|
|
||||||
let negative = Negative::default();
|
|
||||||
let positive = self.positive.union(&other.positive)?;
|
|
||||||
return Ok(Index { negative, positive })
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut buffer = Vec::new();
|
|
||||||
let mut builder = PositiveBuilder::memory();
|
|
||||||
let mut stream = self.positive.into_stream();
|
|
||||||
while let Some((key, indexes)) = stream.next() {
|
|
||||||
let op = DifferenceByKey::new(indexes, &other.negative, |x| x.document_id, |x| *x);
|
|
||||||
|
|
||||||
buffer.clear();
|
|
||||||
op.extend_vec(&mut buffer);
|
|
||||||
|
|
||||||
if !buffer.is_empty() {
|
|
||||||
let indexes = Set::new_unchecked(&buffer);
|
|
||||||
builder.insert(key, indexes)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let positive = {
|
|
||||||
let (map, indexes) = builder.into_inner()?;
|
|
||||||
let map = Map::from_bytes(map)?;
|
|
||||||
let indexes = DocIndexes::from_bytes(indexes)?;
|
|
||||||
Positive::new(map, indexes)
|
|
||||||
};
|
|
||||||
|
|
||||||
let negative = Negative::default();
|
|
||||||
let positive = positive.union(&other.positive)?;
|
|
||||||
Ok(Index { negative, positive })
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,43 +0,0 @@
|
|||||||
use std::error::Error;
|
|
||||||
use std::io::Cursor;
|
|
||||||
use std::ops::Deref;
|
|
||||||
|
|
||||||
use sdset::Set;
|
|
||||||
use byteorder::{LittleEndian, WriteBytesExt};
|
|
||||||
|
|
||||||
use crate::data::SharedData;
|
|
||||||
use crate::data::DocIds;
|
|
||||||
use crate::DocumentId;
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
pub struct Negative(DocIds);
|
|
||||||
|
|
||||||
impl Negative {
|
|
||||||
pub fn new(doc_ids: DocIds) -> Negative {
|
|
||||||
Negative(doc_ids)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn from_cursor(cursor: &mut Cursor<SharedData>) -> Result<Negative, Box<Error>> {
|
|
||||||
let doc_ids = DocIds::from_cursor(cursor)?;
|
|
||||||
Ok(Negative(doc_ids))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
|
|
||||||
let slice = self.0.as_bytes();
|
|
||||||
let len = slice.len() as u64;
|
|
||||||
let _ = bytes.write_u64::<LittleEndian>(len);
|
|
||||||
bytes.extend_from_slice(slice);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn is_empty(&self) -> bool {
|
|
||||||
self.0.is_empty()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Deref for Negative {
|
|
||||||
type Target = Set<DocumentId>;
|
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target {
|
|
||||||
self.0.as_ref()
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,8 +1,7 @@
|
|||||||
use crate::DocumentId;
|
use std::time::Instant;
|
||||||
use crate::database::schema::SchemaAttr;
|
|
||||||
use std::sync::Arc;
|
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use std::ffi::OsStr;
|
use std::ffi::OsStr;
|
||||||
|
use std::sync::Arc;
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
@ -11,11 +10,19 @@ use std::ops::{Deref, DerefMut};
|
|||||||
use rocksdb::rocksdb_options::{DBOptions, ColumnFamilyOptions};
|
use rocksdb::rocksdb_options::{DBOptions, ColumnFamilyOptions};
|
||||||
use rocksdb::rocksdb::{Writable, Snapshot};
|
use rocksdb::rocksdb::{Writable, Snapshot};
|
||||||
use rocksdb::{DB, MergeOperands};
|
use rocksdb::{DB, MergeOperands};
|
||||||
|
use size_format::SizeFormatterBinary;
|
||||||
use arc_swap::ArcSwap;
|
use arc_swap::ArcSwap;
|
||||||
use lockfree::map::Map;
|
use lockfree::map::Map;
|
||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
use log::{info, error, warn};
|
use log::{info, error, warn};
|
||||||
|
|
||||||
|
use crate::database::schema::SchemaAttr;
|
||||||
|
use crate::shared_data_cursor::FromSharedDataCursor;
|
||||||
|
use crate::write_to_bytes::WriteToBytes;
|
||||||
|
use crate::DocumentId;
|
||||||
|
|
||||||
|
use self::update::{ReadIndexEvent, ReadRankedMapEvent};
|
||||||
|
|
||||||
pub use self::document_key::{DocumentKey, DocumentKeyAttr};
|
pub use self::document_key::{DocumentKey, DocumentKeyAttr};
|
||||||
pub use self::view::{DatabaseView, DocumentIter};
|
pub use self::view::{DatabaseView, DocumentIter};
|
||||||
pub use self::update::Update;
|
pub use self::update::Update;
|
||||||
@ -50,64 +57,81 @@ where D: Deref<Target=DB>
|
|||||||
fn retrieve_data_index<D>(snapshot: &Snapshot<D>) -> Result<Index, Box<Error>>
|
fn retrieve_data_index<D>(snapshot: &Snapshot<D>) -> Result<Index, Box<Error>>
|
||||||
where D: Deref<Target=DB>
|
where D: Deref<Target=DB>
|
||||||
{
|
{
|
||||||
let (elapsed, vector) = elapsed::measure_time(|| snapshot.get(DATA_INDEX));
|
let start = Instant::now();
|
||||||
info!("loading index from kv-store took {}", elapsed);
|
let vector = snapshot.get(DATA_INDEX)?;
|
||||||
|
info!("loading index from kv-store took {:.2?}", start.elapsed());
|
||||||
|
|
||||||
let index = match vector? {
|
match vector {
|
||||||
Some(vector) => {
|
Some(vector) => {
|
||||||
|
let start = Instant::now();
|
||||||
|
|
||||||
let bytes = vector.as_ref().to_vec();
|
let bytes = vector.as_ref().to_vec();
|
||||||
info!("index size if {} MiB", bytes.len() / 1024 / 1024);
|
info!("index size is {}B", SizeFormatterBinary::new(bytes.len() as u64));
|
||||||
|
|
||||||
let (elapsed, index) = elapsed::measure_time(|| Index::from_bytes(bytes));
|
let event = ReadIndexEvent::from_bytes(bytes)?;
|
||||||
info!("loading index from bytes took {}", elapsed);
|
let index = event.updated_documents().expect("BUG: invalid event deserialized");
|
||||||
index?
|
|
||||||
|
|
||||||
},
|
info!("loading index from bytes took {:.2?}", start.elapsed());
|
||||||
None => Index::default(),
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(index)
|
Ok(index)
|
||||||
|
},
|
||||||
|
None => Ok(Index::default()),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn retrieve_data_ranked_map<D>(snapshot: &Snapshot<D>) -> Result<RankedMap, Box<Error>>
|
fn retrieve_data_ranked_map<D>(snapshot: &Snapshot<D>) -> Result<RankedMap, Box<Error>>
|
||||||
where D: Deref<Target=DB>,
|
where D: Deref<Target=DB>,
|
||||||
{
|
{
|
||||||
match snapshot.get(DATA_RANKED_MAP)? {
|
let start = Instant::now();
|
||||||
Some(vector) => Ok(bincode::deserialize(&*vector)?),
|
let vector = snapshot.get(DATA_RANKED_MAP)?;
|
||||||
None => Ok(HashMap::new()),
|
info!("loading ranked map from kv-store took {:.2?}", start.elapsed());
|
||||||
|
|
||||||
|
match vector {
|
||||||
|
Some(vector) => {
|
||||||
|
let start = Instant::now();
|
||||||
|
|
||||||
|
let bytes = vector.as_ref().to_vec();
|
||||||
|
info!("ranked map size is {}B", SizeFormatterBinary::new(bytes.len() as u64));
|
||||||
|
|
||||||
|
let event = ReadRankedMapEvent::from_bytes(bytes)?;
|
||||||
|
let ranked_map = event.updated_documents().expect("BUG: invalid event deserialized");
|
||||||
|
|
||||||
|
info!("loading ranked map from bytes took {:.2?}", start.elapsed());
|
||||||
|
|
||||||
|
Ok(ranked_map)
|
||||||
|
},
|
||||||
|
None => Ok(RankedMap::new()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn merge_indexes(existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
|
fn merge_indexes(existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
|
||||||
let mut index: Option<Index> = None;
|
use self::update::ReadIndexEvent::{self, *};
|
||||||
for bytes in existing.into_iter().chain(operands) {
|
use self::update::WriteIndexEvent;
|
||||||
let operand = Index::from_bytes(bytes.to_vec()).unwrap();
|
|
||||||
let merged = match index {
|
|
||||||
Some(ref index) => index.merge(&operand).unwrap(),
|
|
||||||
None => operand,
|
|
||||||
};
|
|
||||||
|
|
||||||
index.replace(merged);
|
let mut index = Index::default();
|
||||||
|
for bytes in existing.into_iter().chain(operands) {
|
||||||
|
match ReadIndexEvent::from_bytes(bytes.to_vec()).unwrap() {
|
||||||
|
RemovedDocuments(d) => index = index.remove_documents(d.as_ref()),
|
||||||
|
UpdatedDocuments(i) => index = index.union(&i),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let index = index.unwrap_or_default();
|
WriteIndexEvent::UpdatedDocuments(&index).into_bytes()
|
||||||
let mut bytes = Vec::new();
|
|
||||||
index.write_to_bytes(&mut bytes);
|
|
||||||
bytes
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn merge_ranked_maps(existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
|
fn merge_ranked_maps(existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
|
||||||
let mut ranked_map: Option<RankedMap> = None;
|
use self::update::ReadRankedMapEvent::{self, *};
|
||||||
|
use self::update::WriteRankedMapEvent;
|
||||||
|
|
||||||
|
let mut ranked_map = RankedMap::default();
|
||||||
for bytes in existing.into_iter().chain(operands) {
|
for bytes in existing.into_iter().chain(operands) {
|
||||||
let operand: RankedMap = bincode::deserialize(bytes).unwrap();
|
match ReadRankedMapEvent::from_bytes(bytes.to_vec()).unwrap() {
|
||||||
match ranked_map {
|
RemovedDocuments(d) => ranked_map.retain(|(k, _), _| !d.as_ref().binary_search(k).is_ok()),
|
||||||
Some(ref mut ranked_map) => ranked_map.extend(operand),
|
UpdatedDocuments(i) => ranked_map.extend(i),
|
||||||
None => { ranked_map.replace(operand); },
|
}
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let ranked_map = ranked_map.unwrap_or_default();
|
WriteRankedMapEvent::UpdatedDocuments(&ranked_map).into_bytes()
|
||||||
bincode::serialize(&ranked_map).unwrap()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn merge_operator(key: &[u8], existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
|
fn merge_operator(key: &[u8], existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
|
||||||
@ -247,7 +271,7 @@ impl Drop for DatabaseIndex {
|
|||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if self.must_die.load(Ordering::Relaxed) {
|
if self.must_die.load(Ordering::Relaxed) {
|
||||||
if let Err(err) = fs::remove_dir_all(&self.path) {
|
if let Err(err) = fs::remove_dir_all(&self.path) {
|
||||||
error!("Impossible to remove mdb when Database id dropped; {}", err);
|
error!("Impossible to remove mdb when Database is dropped; {}", err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -7,7 +7,6 @@ use std::sync::Arc;
|
|||||||
|
|
||||||
use serde_derive::{Serialize, Deserialize};
|
use serde_derive::{Serialize, Deserialize};
|
||||||
use linked_hash_map::LinkedHashMap;
|
use linked_hash_map::LinkedHashMap;
|
||||||
use serde::Serialize;
|
|
||||||
|
|
||||||
use crate::database::serde::find_id::FindDocumentIdSerializer;
|
use crate::database::serde::find_id::FindDocumentIdSerializer;
|
||||||
use crate::database::serde::SerializerError;
|
use crate::database::serde::SerializerError;
|
||||||
@ -168,7 +167,7 @@ impl Schema {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn document_id<T>(&self, document: T) -> Result<DocumentId, SerializerError>
|
pub fn document_id<T>(&self, document: T) -> Result<DocumentId, SerializerError>
|
||||||
where T: Serialize,
|
where T: serde::Serialize,
|
||||||
{
|
{
|
||||||
let id_attribute_name = &self.inner.identifier;
|
let id_attribute_name = &self.inner.identifier;
|
||||||
let serializer = FindDocumentIdSerializer { id_attribute_name };
|
let serializer = FindDocumentIdSerializer { id_attribute_name };
|
||||||
|
55
src/database/update/index_event.rs
Normal file
55
src/database/update/index_event.rs
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
use std::error::Error;
|
||||||
|
|
||||||
|
use byteorder::{ReadBytesExt, WriteBytesExt};
|
||||||
|
|
||||||
|
use crate::shared_data_cursor::{SharedDataCursor, FromSharedDataCursor};
|
||||||
|
use crate::write_to_bytes::WriteToBytes;
|
||||||
|
use crate::database::Index;
|
||||||
|
use crate::data::DocIds;
|
||||||
|
|
||||||
|
pub enum WriteIndexEvent<'a> {
|
||||||
|
RemovedDocuments(&'a DocIds),
|
||||||
|
UpdatedDocuments(&'a Index),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> WriteToBytes for WriteIndexEvent<'a> {
|
||||||
|
fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
|
||||||
|
match self {
|
||||||
|
WriteIndexEvent::RemovedDocuments(doc_ids) => {
|
||||||
|
let _ = bytes.write_u8(0);
|
||||||
|
doc_ids.write_to_bytes(bytes);
|
||||||
|
},
|
||||||
|
WriteIndexEvent::UpdatedDocuments(index) => {
|
||||||
|
let _ = bytes.write_u8(1);
|
||||||
|
index.write_to_bytes(bytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum ReadIndexEvent {
|
||||||
|
RemovedDocuments(DocIds),
|
||||||
|
UpdatedDocuments(Index),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ReadIndexEvent {
|
||||||
|
pub fn updated_documents(self) -> Option<Index> {
|
||||||
|
use ReadIndexEvent::*;
|
||||||
|
match self {
|
||||||
|
RemovedDocuments(_) => None,
|
||||||
|
UpdatedDocuments(index) => Some(index),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FromSharedDataCursor for ReadIndexEvent {
|
||||||
|
type Error = Box<Error>;
|
||||||
|
|
||||||
|
fn from_shared_data_cursor(cursor: &mut SharedDataCursor) -> Result<Self, Self::Error> {
|
||||||
|
match cursor.read_u8()? {
|
||||||
|
0 => DocIds::from_shared_data_cursor(cursor).map(ReadIndexEvent::RemovedDocuments),
|
||||||
|
1 => Index::from_shared_data_cursor(cursor).map(ReadIndexEvent::UpdatedDocuments),
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -3,23 +3,28 @@ use std::error::Error;
|
|||||||
|
|
||||||
use rocksdb::rocksdb::{Writable, WriteBatch};
|
use rocksdb::rocksdb::{Writable, WriteBatch};
|
||||||
use hashbrown::hash_map::HashMap;
|
use hashbrown::hash_map::HashMap;
|
||||||
|
use sdset::{Set, SetBuf};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use fst::map::Map;
|
|
||||||
use sdset::Set;
|
|
||||||
|
|
||||||
use crate::database::index::{Positive, PositiveBuilder, Negative};
|
|
||||||
use crate::database::document_key::{DocumentKey, DocumentKeyAttr};
|
use crate::database::document_key::{DocumentKey, DocumentKeyAttr};
|
||||||
use crate::database::serde::serializer::Serializer;
|
use crate::database::serde::serializer::Serializer;
|
||||||
use crate::database::serde::SerializerError;
|
use crate::database::serde::SerializerError;
|
||||||
use crate::database::schema::SchemaAttr;
|
use crate::database::schema::SchemaAttr;
|
||||||
use crate::tokenizer::TokenizerBuilder;
|
|
||||||
use crate::data::{DocIds, DocIndexes};
|
|
||||||
use crate::database::schema::Schema;
|
use crate::database::schema::Schema;
|
||||||
use crate::database::index::Index;
|
use crate::database::index::IndexBuilder;
|
||||||
use crate::database::{DATA_INDEX, DATA_RANKED_MAP};
|
use crate::database::{DATA_INDEX, DATA_RANKED_MAP};
|
||||||
use crate::database::{RankedMap, Number};
|
use crate::database::{RankedMap, Number};
|
||||||
|
use crate::tokenizer::TokenizerBuilder;
|
||||||
|
use crate::write_to_bytes::WriteToBytes;
|
||||||
|
use crate::data::DocIds;
|
||||||
use crate::{DocumentId, DocIndex};
|
use crate::{DocumentId, DocIndex};
|
||||||
|
|
||||||
|
pub use self::index_event::{ReadIndexEvent, WriteIndexEvent};
|
||||||
|
pub use self::ranked_map_event::{ReadRankedMapEvent, WriteRankedMapEvent};
|
||||||
|
|
||||||
|
mod index_event;
|
||||||
|
mod ranked_map_event;
|
||||||
|
|
||||||
pub type Token = Vec<u8>; // TODO could be replaced by a SmallVec
|
pub type Token = Vec<u8>; // TODO could be replaced by a SmallVec
|
||||||
|
|
||||||
pub struct Update {
|
pub struct Update {
|
||||||
@ -106,46 +111,57 @@ impl RawUpdateBuilder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn build(self) -> Result<WriteBatch, Box<Error>> {
|
pub fn build(self) -> Result<WriteBatch, Box<Error>> {
|
||||||
let negative = {
|
// create the list of all the removed documents
|
||||||
let mut removed_document_ids = Vec::new();
|
let removed_documents = {
|
||||||
|
let mut document_ids = Vec::new();
|
||||||
for (id, update_type) in self.documents_update {
|
for (id, update_type) in self.documents_update {
|
||||||
if update_type == Deleted {
|
if update_type == Deleted {
|
||||||
removed_document_ids.push(id);
|
document_ids.push(id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
removed_document_ids.sort_unstable();
|
document_ids.sort_unstable();
|
||||||
let removed_document_ids = Set::new_unchecked(&removed_document_ids);
|
let setbuf = SetBuf::new_unchecked(document_ids);
|
||||||
let doc_ids = DocIds::new(removed_document_ids);
|
DocIds::new(&setbuf)
|
||||||
|
|
||||||
Negative::new(doc_ids)
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let positive = {
|
// create the Index of all the document updates
|
||||||
let mut positive_builder = PositiveBuilder::memory();
|
let index = {
|
||||||
|
let mut builder = IndexBuilder::new();
|
||||||
for (key, mut indexes) in self.indexed_words {
|
for (key, mut indexes) in self.indexed_words {
|
||||||
indexes.sort_unstable();
|
indexes.sort_unstable();
|
||||||
let indexes = Set::new_unchecked(&indexes);
|
let indexes = Set::new_unchecked(&indexes);
|
||||||
positive_builder.insert(key, indexes)?;
|
builder.insert(key, indexes).unwrap();
|
||||||
}
|
}
|
||||||
|
builder.build()
|
||||||
let (map, indexes) = positive_builder.into_inner()?;
|
|
||||||
let map = Map::from_bytes(map)?;
|
|
||||||
let indexes = DocIndexes::from_bytes(indexes)?;
|
|
||||||
|
|
||||||
Positive::new(map, indexes)
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let index = Index { negative, positive };
|
// WARN: removed documents must absolutely
|
||||||
|
// be merged *before* document updates
|
||||||
|
|
||||||
// write the data-index
|
// === index ===
|
||||||
let mut bytes_index = Vec::new();
|
|
||||||
index.write_to_bytes(&mut bytes_index);
|
|
||||||
self.batch.merge(DATA_INDEX, &bytes_index)?;
|
|
||||||
|
|
||||||
let bytes_ranked_map = bincode::serialize(&self.documents_ranked_fields).unwrap();
|
if !removed_documents.is_empty() {
|
||||||
self.batch.merge(DATA_RANKED_MAP, &bytes_ranked_map)?;
|
// remove the documents using the appropriate IndexEvent
|
||||||
|
let event_bytes = WriteIndexEvent::RemovedDocuments(&removed_documents).into_bytes();
|
||||||
|
self.batch.merge(DATA_INDEX, &event_bytes)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// update the documents using the appropriate IndexEvent
|
||||||
|
let event_bytes = WriteIndexEvent::UpdatedDocuments(&index).into_bytes();
|
||||||
|
self.batch.merge(DATA_INDEX, &event_bytes)?;
|
||||||
|
|
||||||
|
// === ranked map ===
|
||||||
|
|
||||||
|
if !removed_documents.is_empty() {
|
||||||
|
// update the ranked map using the appropriate RankedMapEvent
|
||||||
|
let event_bytes = WriteRankedMapEvent::RemovedDocuments(&removed_documents).into_bytes();
|
||||||
|
self.batch.merge(DATA_RANKED_MAP, &event_bytes)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// update the documents using the appropriate IndexEvent
|
||||||
|
let event_bytes = WriteRankedMapEvent::UpdatedDocuments(&self.documents_ranked_fields).into_bytes();
|
||||||
|
self.batch.merge(DATA_RANKED_MAP, &event_bytes)?;
|
||||||
|
|
||||||
Ok(self.batch)
|
Ok(self.batch)
|
||||||
}
|
}
|
58
src/database/update/ranked_map_event.rs
Normal file
58
src/database/update/ranked_map_event.rs
Normal file
@ -0,0 +1,58 @@
|
|||||||
|
use std::error::Error;
|
||||||
|
|
||||||
|
use byteorder::{ReadBytesExt, WriteBytesExt};
|
||||||
|
|
||||||
|
use crate::shared_data_cursor::{SharedDataCursor, FromSharedDataCursor};
|
||||||
|
use crate::write_to_bytes::WriteToBytes;
|
||||||
|
use crate::database::RankedMap;
|
||||||
|
use crate::data::DocIds;
|
||||||
|
|
||||||
|
pub enum WriteRankedMapEvent<'a> {
|
||||||
|
RemovedDocuments(&'a DocIds),
|
||||||
|
UpdatedDocuments(&'a RankedMap),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> WriteToBytes for WriteRankedMapEvent<'a> {
|
||||||
|
fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
|
||||||
|
match self {
|
||||||
|
WriteRankedMapEvent::RemovedDocuments(doc_ids) => {
|
||||||
|
let _ = bytes.write_u8(0);
|
||||||
|
doc_ids.write_to_bytes(bytes);
|
||||||
|
},
|
||||||
|
WriteRankedMapEvent::UpdatedDocuments(ranked_map) => {
|
||||||
|
let _ = bytes.write_u8(1);
|
||||||
|
bincode::serialize_into(bytes, ranked_map).unwrap()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum ReadRankedMapEvent {
|
||||||
|
RemovedDocuments(DocIds),
|
||||||
|
UpdatedDocuments(RankedMap),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ReadRankedMapEvent {
|
||||||
|
pub fn updated_documents(self) -> Option<RankedMap> {
|
||||||
|
use ReadRankedMapEvent::*;
|
||||||
|
match self {
|
||||||
|
RemovedDocuments(_) => None,
|
||||||
|
UpdatedDocuments(ranked_map) => Some(ranked_map),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FromSharedDataCursor for ReadRankedMapEvent {
|
||||||
|
type Error = Box<Error>;
|
||||||
|
|
||||||
|
fn from_shared_data_cursor(cursor: &mut SharedDataCursor) -> Result<Self, Self::Error> {
|
||||||
|
match cursor.read_u8()? {
|
||||||
|
0 => DocIds::from_shared_data_cursor(cursor).map(ReadRankedMapEvent::RemovedDocuments),
|
||||||
|
1 => {
|
||||||
|
let ranked_map = bincode::deserialize_from(cursor)?;
|
||||||
|
Ok(ReadRankedMapEvent::UpdatedDocuments(ranked_map))
|
||||||
|
},
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -6,6 +6,8 @@ pub mod data;
|
|||||||
pub mod rank;
|
pub mod rank;
|
||||||
pub mod tokenizer;
|
pub mod tokenizer;
|
||||||
mod common_words;
|
mod common_words;
|
||||||
|
mod shared_data_cursor;
|
||||||
|
mod write_to_bytes;
|
||||||
|
|
||||||
use serde_derive::{Serialize, Deserialize};
|
use serde_derive::{Serialize, Deserialize};
|
||||||
|
|
||||||
|
@ -1,12 +1,12 @@
|
|||||||
use std::{cmp, mem, vec, str, char};
|
use std::{cmp, mem, vec, str, char};
|
||||||
use std::ops::{Deref, Range};
|
use std::ops::{Deref, Range};
|
||||||
|
use std::time::Instant;
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use std::hash::Hash;
|
use std::hash::Hash;
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
|
|
||||||
use rayon::slice::ParallelSliceMut;
|
use rayon::slice::ParallelSliceMut;
|
||||||
use slice_group_by::GroupByMut;
|
use slice_group_by::GroupByMut;
|
||||||
use elapsed::measure_time;
|
|
||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
use fst::Streamer;
|
use fst::Streamer;
|
||||||
use rocksdb::DB;
|
use rocksdb::DB;
|
||||||
@ -89,7 +89,7 @@ where D: Deref<Target=DB>,
|
|||||||
let mut stream = {
|
let mut stream = {
|
||||||
let mut op_builder = fst::map::OpBuilder::new();
|
let mut op_builder = fst::map::OpBuilder::new();
|
||||||
for automaton in &automatons {
|
for automaton in &automatons {
|
||||||
let stream = self.view.index().positive.map().search(automaton);
|
let stream = self.view.index().map.search(automaton);
|
||||||
op_builder.push(stream);
|
op_builder.push(stream);
|
||||||
}
|
}
|
||||||
op_builder.union()
|
op_builder.union()
|
||||||
@ -103,7 +103,7 @@ where D: Deref<Target=DB>,
|
|||||||
let distance = automaton.eval(input).to_u8();
|
let distance = automaton.eval(input).to_u8();
|
||||||
let is_exact = distance == 0 && input.len() == automaton.query_len();
|
let is_exact = distance == 0 && input.len() == automaton.query_len();
|
||||||
|
|
||||||
let doc_indexes = &self.view.index().positive.indexes();
|
let doc_indexes = &self.view.index().indexes;
|
||||||
let doc_indexes = &doc_indexes[iv.value as usize];
|
let doc_indexes = &doc_indexes[iv.value as usize];
|
||||||
|
|
||||||
for doc_index in doc_indexes {
|
for doc_index in doc_indexes {
|
||||||
@ -143,8 +143,9 @@ where D: Deref<Target=DB>,
|
|||||||
return builder.query(query, range);
|
return builder.query(query, range);
|
||||||
}
|
}
|
||||||
|
|
||||||
let (elapsed, mut documents) = measure_time(|| self.query_all(query));
|
let start = Instant::now();
|
||||||
info!("query_all took {}", elapsed);
|
let mut documents = self.query_all(query);
|
||||||
|
info!("query_all took {:.2?}", start.elapsed());
|
||||||
|
|
||||||
let mut groups = vec![documents.as_mut_slice()];
|
let mut groups = vec![documents.as_mut_slice()];
|
||||||
|
|
||||||
@ -163,10 +164,9 @@ where D: Deref<Target=DB>,
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let (elapsed, _) = measure_time(|| {
|
let start = Instant::now();
|
||||||
group.par_sort_unstable_by(|a, b| criterion.evaluate(a, b));
|
group.par_sort_unstable_by(|a, b| criterion.evaluate(a, b));
|
||||||
});
|
info!("criterion {} sort took {:.2?}", ci, start.elapsed());
|
||||||
info!("criterion {} sort took {}", ci, elapsed);
|
|
||||||
|
|
||||||
for group in group.binary_group_by_mut(|a, b| criterion.eq(a, b)) {
|
for group in group.binary_group_by_mut(|a, b| criterion.eq(a, b)) {
|
||||||
documents_seen += group.len();
|
documents_seen += group.len();
|
||||||
@ -214,8 +214,9 @@ where D: Deref<Target=DB>,
|
|||||||
K: Hash + Eq,
|
K: Hash + Eq,
|
||||||
{
|
{
|
||||||
pub fn query(self, query: &str, range: Range<usize>) -> Vec<Document> {
|
pub fn query(self, query: &str, range: Range<usize>) -> Vec<Document> {
|
||||||
let (elapsed, mut documents) = measure_time(|| self.inner.query_all(query));
|
let start = Instant::now();
|
||||||
info!("query_all took {}", elapsed);
|
let mut documents = self.inner.query_all(query);
|
||||||
|
info!("query_all took {:.2?}", start.elapsed());
|
||||||
|
|
||||||
let mut groups = vec![documents.as_mut_slice()];
|
let mut groups = vec![documents.as_mut_slice()];
|
||||||
let mut key_cache = HashMap::new();
|
let mut key_cache = HashMap::new();
|
||||||
@ -244,10 +245,9 @@ where D: Deref<Target=DB>,
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let (elapsed, _) = measure_time(|| {
|
let start = Instant::now();
|
||||||
group.par_sort_unstable_by(|a, b| criterion.evaluate(a, b));
|
group.par_sort_unstable_by(|a, b| criterion.evaluate(a, b));
|
||||||
});
|
info!("criterion {} sort took {:.2?}", ci, start.elapsed());
|
||||||
info!("criterion {} sort took {}", ci, elapsed);
|
|
||||||
|
|
||||||
for group in group.binary_group_by_mut(|a, b| criterion.eq(a, b)) {
|
for group in group.binary_group_by_mut(|a, b| criterion.eq(a, b)) {
|
||||||
// we must compute the real distinguished len of this sub-group
|
// we must compute the real distinguished len of this sub-group
|
||||||
|
56
src/shared_data_cursor.rs
Normal file
56
src/shared_data_cursor.rs
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
use std::io::{self, Read, Cursor, BufRead};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use crate::data::SharedData;
|
||||||
|
|
||||||
|
pub struct SharedDataCursor(Cursor<SharedData>);
|
||||||
|
|
||||||
|
impl SharedDataCursor {
|
||||||
|
pub fn from_bytes(bytes: Vec<u8>) -> SharedDataCursor {
|
||||||
|
let len = bytes.len();
|
||||||
|
let bytes = Arc::new(bytes);
|
||||||
|
|
||||||
|
SharedDataCursor::from_shared_bytes(bytes, 0, len)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn from_shared_bytes(bytes: Arc<Vec<u8>>, offset: usize, len: usize) -> SharedDataCursor {
|
||||||
|
let data = SharedData::new(bytes, offset, len);
|
||||||
|
let cursor = Cursor::new(data);
|
||||||
|
|
||||||
|
SharedDataCursor(cursor)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn extract(&mut self, amt: usize) -> SharedData {
|
||||||
|
let offset = self.0.position() as usize;
|
||||||
|
let extracted = self.0.get_ref().range(offset, amt);
|
||||||
|
self.0.consume(amt);
|
||||||
|
|
||||||
|
extracted
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Read for SharedDataCursor {
|
||||||
|
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||||
|
self.0.read(buf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BufRead for SharedDataCursor {
|
||||||
|
fn fill_buf(&mut self) -> io::Result<&[u8]> {
|
||||||
|
self.0.fill_buf()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn consume(&mut self, amt: usize) {
|
||||||
|
self.0.consume(amt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait FromSharedDataCursor: Sized {
|
||||||
|
type Error;
|
||||||
|
|
||||||
|
fn from_shared_data_cursor(cursor: &mut SharedDataCursor) -> Result<Self, Self::Error>;
|
||||||
|
|
||||||
|
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Self::Error> {
|
||||||
|
let mut cursor = SharedDataCursor::from_bytes(bytes);
|
||||||
|
Self::from_shared_data_cursor(&mut cursor)
|
||||||
|
}
|
||||||
|
}
|
9
src/write_to_bytes.rs
Normal file
9
src/write_to_bytes.rs
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
pub trait WriteToBytes {
|
||||||
|
fn write_to_bytes(&self, bytes: &mut Vec<u8>);
|
||||||
|
|
||||||
|
fn into_bytes(&self) -> Vec<u8> {
|
||||||
|
let mut bytes = Vec::new();
|
||||||
|
self.write_to_bytes(&mut bytes);
|
||||||
|
bytes
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user