feat: Introduce a working key-value based database

This commit is contained in:
Clément Renault 2018-11-22 15:44:51 +01:00
parent 86f23d2695
commit 66dac923bf
No known key found for this signature in database
GPG Key ID: 0151CDAB43460DAE
10 changed files with 217 additions and 94 deletions

View File

@ -15,6 +15,7 @@ sdset = "0.2"
serde = "1.0"
serde_derive = "1.0"
unidecode = "0.3"
uuid = { version = "0.7", features = ["serde", "v4"] }
[dependencies.fst]
git = "https://github.com/Kerollmops/fst.git"

View File

@ -1,13 +1,15 @@
use std::fs;
use std::path::Path;
use std::error::Error;
use std::path::PathBuf;
use std::io::{self, Write};
use elapsed::measure_time;
use moby_name_gen::random_name;
use structopt::StructOpt;
use pentium::index::update::Update;
use pentium::index::schema::{Schema, SchemaBuilder, STORED, INDEXED};
use pentium::index::update::{Update, PositiveUpdateBuilder};
use pentium::tokenizer::DefaultBuilder;
use pentium::index::Index;
#[derive(Debug, StructOpt)]
@ -17,8 +19,47 @@ pub struct Cmd {
pub csv_file: PathBuf,
}
fn generate_update_from_csv(path: &Path) -> Result<Update, Box<Error>> {
unimplemented!()
fn generate_update_from_csv(path: &Path) -> Result<(Schema, Update), Box<Error>> {
let mut csv = csv::Reader::from_path(path)?;
let mut attributes = Vec::new();
let (schema, id_attr_index) = {
let mut id_attr_index = None;
let mut builder = SchemaBuilder::new();
for (i, header_name) in csv.headers()?.iter().enumerate() {
// FIXME this does not disallow multiple "id" fields
if header_name == "id" { id_attr_index = Some(i) };
let field = builder.new_attribute(header_name, STORED | INDEXED);
attributes.push(field);
}
let id = match id_attr_index {
Some(index) => index,
None => return Err(String::from("No \"id\" field found which is mandatory").into()),
};
(builder.build(), id)
};
let update_path = PathBuf::from("./positive-update-xxx.sst");
let tokenizer_builder = DefaultBuilder::new();
let mut builder = PositiveUpdateBuilder::new(&update_path, schema.clone(), tokenizer_builder);
for record in csv.records() {
let record = match record {
Ok(x) => x,
Err(e) => { eprintln!("{:?}", e); continue }
};
let id = record.into_iter().nth(id_attr_index).unwrap().parse()?;
for (value, attr) in record.into_iter().zip(&attributes) {
builder.update_field(id, *attr, value.to_string());
}
}
builder.build().map(|update| (schema, update))
}
fn main() -> Result<(), Box<Error>> {
@ -27,14 +68,19 @@ fn main() -> Result<(), Box<Error>> {
let path = random_name();
println!("generating the update...");
let update = generate_update_from_csv(&command.csv_file)?;
let (schema, update) = generate_update_from_csv(&command.csv_file)?;
println!("creating the index");
let index = Index::open(&path)?;
let index = Index::create(&path, schema)?;
println!("ingesting the changes in the index");
index.ingest_update(update)?;
// FIXME this is really ugly !!!!
// the index does not support moving update files
// so we must remove it by hand
fs::remove_file("./positive-update-xxx.sst")?;
println!("the index {:?} has been created!", path);
Ok(())

View File

@ -29,7 +29,7 @@ fn main() -> Result<(), Box<Error>> {
let (elapsed, result) = measure_time(|| index.search(&query));
match result {
Ok(documents) => {
// display documents here !
println!("{:?}", documents);
println!("Finished in {}", elapsed)
},
Err(e) => panic!("{}", e),

View File

@ -9,7 +9,13 @@ pub use self::positive_blob::{PositiveBlob, PositiveBlobBuilder};
pub use self::negative_blob::{NegativeBlob, NegativeBlobBuilder};
use std::error::Error;
use std::io::{Write, Read};
use std::{io, fmt, mem};
use fst::Map;
use uuid::Uuid;
use rocksdb::rocksdb::{DB, Snapshot};
use crate::data::DocIndexes;
pub enum Blob {
@ -26,14 +32,14 @@ impl Blob {
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Sign {
Positive,
Negative,
}
impl Sign {
pub fn alternate(self) -> Sign {
pub fn invert(self) -> Sign {
match self {
Sign::Positive => Sign::Negative,
Sign::Negative => Sign::Positive,
@ -41,6 +47,95 @@ impl Sign {
}
}
pub fn ordered_blobs_from_slice(slice: &[u8]) -> Result<Vec<Blob>, Box<Error>> {
unimplemented!()
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct BlobName(Uuid);
impl BlobName {
pub fn new() -> BlobName {
BlobName(Uuid::new_v4())
}
}
impl fmt::Display for BlobName {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_tuple("BlobName")
.field(&self.0.to_hyphenated().to_string())
.finish()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct BlobInfo {
pub sign: Sign,
pub name: BlobName,
}
impl BlobInfo {
pub fn new_positive() -> BlobInfo {
BlobInfo {
sign: Sign::Positive,
name: BlobName::new(),
}
}
pub fn new_negative() -> BlobInfo {
BlobInfo {
sign: Sign::Negative,
name: BlobName::new(),
}
}
pub fn read_from<R: Read>(reader: R) -> bincode::Result<BlobInfo> {
bincode::deserialize_from(reader)
}
pub fn read_from_slice(slice: &[u8]) -> bincode::Result<Vec<BlobInfo>> {
let len = slice.len() / mem::size_of::<BlobInfo>();
let mut blob_infos = Vec::with_capacity(len);
let mut cursor = io::Cursor::new(slice);
while blob_infos.len() != len {
let blob_info = BlobInfo::read_from(&mut cursor)?;
blob_infos.push(blob_info);
}
Ok(blob_infos)
}
pub fn write_into<W: Write>(&self, writer: W) -> bincode::Result<()> {
bincode::serialize_into(writer, self)
}
}
pub fn blobs_from_blob_infos(infos: &[BlobInfo], snapshot: &Snapshot<&DB>) -> Result<Vec<Blob>, Box<Error>> {
let mut blobs = Vec::with_capacity(infos.len());
for info in infos {
let blob = match info.sign {
Sign::Positive => {
let key_map = format!("blob-{}-fst", info.name);
let map = match snapshot.get(key_map.as_bytes())? {
Some(value) => value.to_vec(),
None => return Err(format!("No fst entry found for blob {}", info.name).into()),
};
let key_doc_idx = format!("blob-{}-doc-idx", info.name);
let doc_idx = match snapshot.get(key_doc_idx.as_bytes())? {
Some(value) => value.to_vec(),
None => return Err(format!("No doc-idx entry found for blob {}", info.name).into()),
};
PositiveBlob::from_bytes(map, doc_idx).map(Blob::Positive)?
},
Sign::Negative => {
let key_doc_ids = format!("blob-{}-doc-ids", info.name);
let doc_ids = match snapshot.get(key_doc_ids.as_bytes())? {
Some(value) => value.to_vec(),
None => return Err(format!("No doc-ids entry found for blob {}", info.name).into()),
};
NegativeBlob::from_bytes(doc_ids).map(Blob::Negative)?
},
};
blobs.push(blob);
}
Ok(blobs)
}

View File

@ -1,16 +0,0 @@
use std::fmt;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct BlobName;
impl BlobName {
pub fn new() -> BlobName {
unimplemented!()
}
}
impl fmt::Display for BlobName {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
unimplemented!()
}
}

View File

@ -1,4 +1,3 @@
pub mod blob_name;
pub mod schema;
pub mod update;
@ -21,8 +20,7 @@ use crate::data::DocIdsBuilder;
use crate::{DocIndex, DocumentId};
use crate::index::schema::Schema;
use crate::index::update::Update;
use crate::blob::{PositiveBlobBuilder, Blob, Sign};
use crate::blob::ordered_blobs_from_slice;
use crate::blob::{PositiveBlobBuilder, BlobInfo, Sign, Blob, blobs_from_blob_infos};
use crate::tokenizer::{TokenizerBuilder, DefaultBuilder, Tokenizer};
use crate::rank::{criterion, Config, RankedStream};
use crate::automaton;
@ -112,12 +110,14 @@ impl Index {
}
pub fn search(&self, query: &str) -> Result<Vec<Document>, Box<Error>> {
// this snapshot will allow consistent operations on documents
// this snapshot will allow consistent reads for the whole search operation
let snapshot = self.database.snapshot();
// FIXME create a SNAPSHOT for the search !
let blobs = match snapshot.get(DATA_BLOBS_ORDER.as_bytes())? {
Some(value) => ordered_blobs_from_slice(&value)?,
Some(value) => {
let blob_infos = BlobInfo::read_from_slice(&value)?;
blobs_from_blob_infos(&blob_infos, &snapshot)?
},
None => Vec::new(),
};
@ -143,7 +143,7 @@ mod tests {
use tempfile::NamedTempFile;
use super::*;
use crate::index::schema::Schema;
use crate::index::schema::{Schema, SchemaBuilder, STORED, INDEXED};
use crate::index::update::{PositiveUpdateBuilder, NegativeUpdateBuilder};
#[test]
@ -151,7 +151,8 @@ mod tests {
let path = NamedTempFile::new()?.into_temp_path();
let mut builder = NegativeUpdateBuilder::new(&path);
// you can insert documents in any order, it is sorted internally
// you can insert documents in any order,
// it is sorted internally
builder.remove(1);
builder.remove(5);
builder.remove(2);
@ -165,19 +166,26 @@ mod tests {
#[test]
fn generate_positive_update() -> Result<(), Box<Error>> {
let title;
let description;
let schema = {
let mut builder = SchemaBuilder::new();
title = builder.new_attribute("title", STORED | INDEXED);
description = builder.new_attribute("description", STORED | INDEXED);
builder.build()
};
let schema = Schema::open("/meili/default.sch")?;
let sst_path = NamedTempFile::new()?.into_temp_path();
let tokenizer_builder = DefaultBuilder::new();
let mut builder = PositiveUpdateBuilder::new("update-positive-0001.sst", schema.clone(), tokenizer_builder);
let mut builder = PositiveUpdateBuilder::new(&sst_path, schema.clone(), tokenizer_builder);
// you can insert documents in any order, it is sorted internally
let title_field = schema.attribute("title").unwrap();
builder.update_field(1, title_field, "hallo!".to_owned());
builder.update_field(5, title_field, "hello!".to_owned());
builder.update_field(2, title_field, "hi!".to_owned());
// you can insert documents in any order,
// it is sorted internally
builder.update_field(1, title, "hallo!".to_owned());
builder.update_field(5, title, "hello!".to_owned());
builder.update_field(2, title, "hi!".to_owned());
let name_field = schema.attribute("name").unwrap();
builder.remove_field(4, name_field);
builder.remove_field(4, description);
let update = builder.build()?;

View File

@ -46,9 +46,11 @@ impl SchemaBuilder {
SchemaBuilder { attrs: LinkedHashMap::new() }
}
pub fn new_field<S: Into<String>>(&mut self, name: S, props: SchemaProps) -> SchemaAttr {
pub fn new_attribute<S: Into<String>>(&mut self, name: S, props: SchemaProps) -> SchemaAttr {
let len = self.attrs.len();
self.attrs.insert(name.into(), props);
if self.attrs.insert(name.into(), props).is_some() {
panic!("Field already inserted.")
}
SchemaAttr(len as u32)
}
@ -119,7 +121,7 @@ impl SchemaAttr {
impl fmt::Display for SchemaAttr {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.0)
self.0.fmt(f)
}
}
@ -130,9 +132,9 @@ mod tests {
#[test]
fn serialize_deserialize() -> bincode::Result<()> {
let mut builder = SchemaBuilder::new();
builder.new_field("alphabet", STORED);
builder.new_field("beta", STORED | INDEXED);
builder.new_field("gamma", INDEXED);
builder.new_attribute("alphabet", STORED);
builder.new_attribute("beta", STORED | INDEXED);
builder.new_attribute("gamma", INDEXED);
let schema = builder.build();
let mut buffer = Vec::new();

View File

@ -3,8 +3,7 @@ use std::error::Error;
use ::rocksdb::rocksdb_options;
use crate::index::blob_name::BlobName;
use crate::blob::Sign;
use crate::blob::{BlobName, Sign};
mod negative_update;
mod positive_update;
@ -18,17 +17,7 @@ pub struct Update {
impl Update {
pub fn open<P: Into<PathBuf>>(path: P) -> Result<Update, Box<Error>> {
let path = path.into();
let env_options = rocksdb_options::EnvOptions::new();
let column_family_options = rocksdb_options::ColumnFamilyOptions::new();
let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options);
file_writer.open(&path.to_string_lossy())?;
let infos = file_writer.finish()?;
// FIXME check if the update contains a blobs-order entry
Ok(Update { path })
Ok(Update { path: path.into() })
}
pub fn into_path_buf(self) -> PathBuf {

View File

@ -3,9 +3,9 @@ use std::error::Error;
use ::rocksdb::rocksdb_options;
use crate::blob::BlobInfo;
use crate::index::DATA_BLOBS_ORDER;
use crate::index::update::Update;
use crate::index::blob_name::BlobName;
use crate::data::DocIdsBuilder;
use crate::DocumentId;
@ -27,30 +27,28 @@ impl NegativeUpdateBuilder {
}
pub fn build(self) -> Result<Update, Box<Error>> {
let blob_name = BlobName::new();
let blob_info = BlobInfo::new_negative();
let env_options = rocksdb_options::EnvOptions::new();
let column_family_options = rocksdb_options::ColumnFamilyOptions::new();
let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options);
file_writer.open(&self.path.to_string_lossy())?;
// TODO the blob-name must be written in bytes (16 bytes)
// along with the sign
unimplemented!("write the blob sign and name");
// write the blob name to be merged
let blob_name = blob_name.to_string();
file_writer.merge(DATA_BLOBS_ORDER.as_bytes(), blob_name.as_bytes())?;
// write the doc ids
let blob_key = format!("BLOB-{}-doc-ids", blob_name);
let blob_key = format!("blob-{}-doc-ids", blob_info.name);
let blob_doc_ids = self.doc_ids.into_inner()?;
file_writer.put(blob_key.as_bytes(), &blob_doc_ids)?;
{
// write the blob name to be merged
let mut buffer = Vec::new();
blob_info.write_into(&mut buffer);
file_writer.merge(DATA_BLOBS_ORDER.as_bytes(), &buffer)?;
}
for id in blob_doc_ids {
let start = format!("DOCU-{}", id);
let end = format!("DOCU-{}", id + 1);
let start = format!("docu-{}", id);
let end = format!("docu-{}", id + 1);
file_writer.delete_range(start.as_bytes(), end.as_bytes())?;
}

View File

@ -7,10 +7,9 @@ use ::rocksdb::rocksdb_options;
use crate::index::DATA_BLOBS_ORDER;
use crate::index::update::Update;
use crate::index::blob_name::BlobName;
use crate::index::schema::{SchemaProps, Schema, SchemaAttr};
use crate::tokenizer::TokenizerBuilder;
use crate::blob::PositiveBlobBuilder;
use crate::blob::{BlobInfo, PositiveBlobBuilder};
use crate::{DocIndex, DocumentId};
pub enum NewState {
@ -53,7 +52,7 @@ impl<B> PositiveUpdateBuilder<B>
where B: TokenizerBuilder
{
pub fn build(self) -> Result<Update, Box<Error>> {
let blob_name = BlobName::new();
let blob_info = BlobInfo::new_positive();
let env_options = rocksdb_options::EnvOptions::new();
let column_family_options = rocksdb_options::ColumnFamilyOptions::new();
@ -61,14 +60,6 @@ where B: TokenizerBuilder
file_writer.open(&self.path.to_string_lossy())?;
// TODO the blob-name must be written in bytes (16 bytes)
// along with the sign
unimplemented!("write the blob sign and name");
// write the blob name to be merged
let blob_name = blob_name.to_string();
file_writer.put(DATA_BLOBS_ORDER.as_bytes(), blob_name.as_bytes())?;
let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new());
for ((document_id, field), state) in &self.new_states {
let value = match state {
@ -96,18 +87,27 @@ where B: TokenizerBuilder
}
let (blob_fst_map, blob_doc_idx) = builder.into_inner()?;
// write the fst
let blob_key = format!("BLOB-{}-fst", blob_name);
file_writer.put(blob_key.as_bytes(), &blob_fst_map)?;
// write the doc-idx
let blob_key = format!("BLOB-{}-doc-idx", blob_name);
let blob_key = format!("blob-{}-doc-idx", blob_info.name);
file_writer.put(blob_key.as_bytes(), &blob_doc_idx)?;
// write the fst
let blob_key = format!("blob-{}-fst", blob_info.name);
file_writer.put(blob_key.as_bytes(), &blob_fst_map)?;
{
// write the blob name to be merged
let mut buffer = Vec::new();
blob_info.write_into(&mut buffer);
file_writer.merge(DATA_BLOBS_ORDER.as_bytes(), &buffer)?;
}
// write all the documents fields updates
let mut key = String::from("DOCU-");
let mut key = String::from("docu-");
let prefix_len = key.len();
// FIXME write numbers in bytes not decimal representation
for ((id, field), state) in self.new_states {
key.truncate(prefix_len);
write!(&mut key, "{}-{}", id, field)?;