diff --git a/Cargo.toml b/Cargo.toml index 79d404b8c..eff4dac16 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ sdset = "0.2" serde = "1.0" serde_derive = "1.0" unidecode = "0.3" +uuid = { version = "0.7", features = ["serde", "v4"] } [dependencies.fst] git = "https://github.com/Kerollmops/fst.git" diff --git a/examples/create-index.rs b/examples/create-index.rs index 84a8b73b6..1fc8e7682 100644 --- a/examples/create-index.rs +++ b/examples/create-index.rs @@ -1,13 +1,15 @@ +use std::fs; use std::path::Path; use std::error::Error; use std::path::PathBuf; -use std::io::{self, Write}; use elapsed::measure_time; use moby_name_gen::random_name; use structopt::StructOpt; -use pentium::index::update::Update; +use pentium::index::schema::{Schema, SchemaBuilder, STORED, INDEXED}; +use pentium::index::update::{Update, PositiveUpdateBuilder}; +use pentium::tokenizer::DefaultBuilder; use pentium::index::Index; #[derive(Debug, StructOpt)] @@ -17,8 +19,47 @@ pub struct Cmd { pub csv_file: PathBuf, } -fn generate_update_from_csv(path: &Path) -> Result> { - unimplemented!() +fn generate_update_from_csv(path: &Path) -> Result<(Schema, Update), Box> { + let mut csv = csv::Reader::from_path(path)?; + + let mut attributes = Vec::new(); + let (schema, id_attr_index) = { + let mut id_attr_index = None; + let mut builder = SchemaBuilder::new(); + + for (i, header_name) in csv.headers()?.iter().enumerate() { + // FIXME this does not disallow multiple "id" fields + if header_name == "id" { id_attr_index = Some(i) }; + + let field = builder.new_attribute(header_name, STORED | INDEXED); + attributes.push(field); + } + + let id = match id_attr_index { + Some(index) => index, + None => return Err(String::from("No \"id\" field found which is mandatory").into()), + }; + + (builder.build(), id) + }; + + let update_path = PathBuf::from("./positive-update-xxx.sst"); + let tokenizer_builder = DefaultBuilder::new(); + let mut builder = PositiveUpdateBuilder::new(&update_path, 
schema.clone(), tokenizer_builder); + + for record in csv.records() { + let record = match record { + Ok(x) => x, + Err(e) => { eprintln!("{:?}", e); continue } + }; + + let id = record.into_iter().nth(id_attr_index).unwrap().parse()?; + for (value, attr) in record.into_iter().zip(&attributes) { + builder.update_field(id, *attr, value.to_string()); + } + } + + builder.build().map(|update| (schema, update)) } fn main() -> Result<(), Box> { @@ -27,14 +68,19 @@ fn main() -> Result<(), Box> { let path = random_name(); println!("generating the update..."); - let update = generate_update_from_csv(&command.csv_file)?; + let (schema, update) = generate_update_from_csv(&command.csv_file)?; println!("creating the index"); - let index = Index::open(&path)?; + let index = Index::create(&path, schema)?; println!("ingesting the changes in the index"); index.ingest_update(update)?; + // FIXME this is really ugly !!!! + // the index does not support moving update files + // so we must remove it by hand + fs::remove_file("./positive-update-xxx.sst")?; + println!("the index {:?} has been created!", path); Ok(()) diff --git a/examples/index-search.rs b/examples/search-index.rs similarity index 95% rename from examples/index-search.rs rename to examples/search-index.rs index 87b4c4195..c8d55c74f 100644 --- a/examples/index-search.rs +++ b/examples/search-index.rs @@ -29,7 +29,7 @@ fn main() -> Result<(), Box> { let (elapsed, result) = measure_time(|| index.search(&query)); match result { Ok(documents) => { - // display documents here ! 
+ println!("{:?}", documents); println!("Finished in {}", elapsed) }, Err(e) => panic!("{}", e), diff --git a/src/blob/mod.rs b/src/blob/mod.rs index b8cb5d9e0..d00514a93 100644 --- a/src/blob/mod.rs +++ b/src/blob/mod.rs @@ -9,7 +9,13 @@ pub use self::positive_blob::{PositiveBlob, PositiveBlobBuilder}; pub use self::negative_blob::{NegativeBlob, NegativeBlobBuilder}; use std::error::Error; +use std::io::{Write, Read}; +use std::{io, fmt, mem}; + use fst::Map; +use uuid::Uuid; +use rocksdb::rocksdb::{DB, Snapshot}; + use crate::data::DocIndexes; pub enum Blob { @@ -26,14 +32,14 @@ impl Blob { } } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum Sign { Positive, Negative, } impl Sign { - pub fn alternate(self) -> Sign { + pub fn invert(self) -> Sign { match self { Sign::Positive => Sign::Negative, Sign::Negative => Sign::Positive, @@ -41,6 +47,95 @@ impl Sign { } } -pub fn ordered_blobs_from_slice(slice: &[u8]) -> Result, Box> { - unimplemented!() +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +pub struct BlobName(Uuid); + +impl BlobName { + pub fn new() -> BlobName { + BlobName(Uuid::new_v4()) + } +} + +impl fmt::Display for BlobName { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_tuple("BlobName") + .field(&self.0.to_hyphenated().to_string()) + .finish() + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub struct BlobInfo { + pub sign: Sign, + pub name: BlobName, +} + +impl BlobInfo { + pub fn new_positive() -> BlobInfo { + BlobInfo { + sign: Sign::Positive, + name: BlobName::new(), + } + } + + pub fn new_negative() -> BlobInfo { + BlobInfo { + sign: Sign::Negative, + name: BlobName::new(), + } + } + + pub fn read_from(reader: R) -> bincode::Result { + bincode::deserialize_from(reader) + } + + pub fn read_from_slice(slice: &[u8]) -> bincode::Result> { + let len = slice.len() / 
mem::size_of::(); + let mut blob_infos = Vec::with_capacity(len); + + let mut cursor = io::Cursor::new(slice); + while (cursor.position() as usize) < slice.len() { + let blob_info = BlobInfo::read_from(&mut cursor)?; + blob_infos.push(blob_info); + } + + Ok(blob_infos) + } + + pub fn write_into(&self, writer: W) -> bincode::Result<()> { + bincode::serialize_into(writer, self) + } +} + +pub fn blobs_from_blob_infos(infos: &[BlobInfo], snapshot: &Snapshot<&DB>) -> Result, Box> { + let mut blobs = Vec::with_capacity(infos.len()); + + for info in infos { + let blob = match info.sign { + Sign::Positive => { + let key_map = format!("blob-{}-fst", info.name); + let map = match snapshot.get(key_map.as_bytes())? { + Some(value) => value.to_vec(), + None => return Err(format!("No fst entry found for blob {}", info.name).into()), + }; + let key_doc_idx = format!("blob-{}-doc-idx", info.name); + let doc_idx = match snapshot.get(key_doc_idx.as_bytes())? { + Some(value) => value.to_vec(), + None => return Err(format!("No doc-idx entry found for blob {}", info.name).into()), + }; + PositiveBlob::from_bytes(map, doc_idx).map(Blob::Positive)? + }, + Sign::Negative => { + let key_doc_ids = format!("blob-{}-doc-ids", info.name); + let doc_ids = match snapshot.get(key_doc_ids.as_bytes())? { + Some(value) => value.to_vec(), + None => return Err(format!("No doc-ids entry found for blob {}", info.name).into()), + }; + NegativeBlob::from_bytes(doc_ids).map(Blob::Negative)? 
+ }, + }; + blobs.push(blob); + } + + Ok(blobs) } diff --git a/src/index/blob_name.rs b/src/index/blob_name.rs deleted file mode 100644 index 50bebcaa8..000000000 --- a/src/index/blob_name.rs +++ /dev/null @@ -1,16 +0,0 @@ -use std::fmt; - -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct BlobName; - -impl BlobName { - pub fn new() -> BlobName { - unimplemented!() - } -} - -impl fmt::Display for BlobName { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - unimplemented!() - } -} diff --git a/src/index/mod.rs b/src/index/mod.rs index 49cd24076..d50e0706d 100644 --- a/src/index/mod.rs +++ b/src/index/mod.rs @@ -1,4 +1,3 @@ -pub mod blob_name; pub mod schema; pub mod update; @@ -21,8 +20,7 @@ use crate::data::DocIdsBuilder; use crate::{DocIndex, DocumentId}; use crate::index::schema::Schema; use crate::index::update::Update; -use crate::blob::{PositiveBlobBuilder, Blob, Sign}; -use crate::blob::ordered_blobs_from_slice; +use crate::blob::{PositiveBlobBuilder, BlobInfo, Sign, Blob, blobs_from_blob_infos}; use crate::tokenizer::{TokenizerBuilder, DefaultBuilder, Tokenizer}; use crate::rank::{criterion, Config, RankedStream}; use crate::automaton; @@ -112,12 +110,14 @@ impl Index { } pub fn search(&self, query: &str) -> Result, Box> { - // this snapshot will allow consistent operations on documents + // this snapshot will allow consistent reads for the whole search operation let snapshot = self.database.snapshot(); - // FIXME create a SNAPSHOT for the search ! let blobs = match snapshot.get(DATA_BLOBS_ORDER.as_bytes())? { - Some(value) => ordered_blobs_from_slice(&value)?, + Some(value) => { + let blob_infos = BlobInfo::read_from_slice(&value)?; + blobs_from_blob_infos(&blob_infos, &snapshot)? 
+ }, None => Vec::new(), }; @@ -143,7 +143,7 @@ mod tests { use tempfile::NamedTempFile; use super::*; - use crate::index::schema::Schema; + use crate::index::schema::{Schema, SchemaBuilder, STORED, INDEXED}; use crate::index::update::{PositiveUpdateBuilder, NegativeUpdateBuilder}; #[test] @@ -151,7 +151,8 @@ mod tests { let path = NamedTempFile::new()?.into_temp_path(); let mut builder = NegativeUpdateBuilder::new(&path); - // you can insert documents in any order, it is sorted internally + // you can insert documents in any order, + // it is sorted internally builder.remove(1); builder.remove(5); builder.remove(2); @@ -165,19 +166,26 @@ mod tests { #[test] fn generate_positive_update() -> Result<(), Box> { + let title; + let description; + let schema = { + let mut builder = SchemaBuilder::new(); + title = builder.new_attribute("title", STORED | INDEXED); + description = builder.new_attribute("description", STORED | INDEXED); + builder.build() + }; - let schema = Schema::open("/meili/default.sch")?; + let sst_path = NamedTempFile::new()?.into_temp_path(); let tokenizer_builder = DefaultBuilder::new(); - let mut builder = PositiveUpdateBuilder::new("update-positive-0001.sst", schema.clone(), tokenizer_builder); + let mut builder = PositiveUpdateBuilder::new(&sst_path, schema.clone(), tokenizer_builder); - // you can insert documents in any order, it is sorted internally - let title_field = schema.attribute("title").unwrap(); - builder.update_field(1, title_field, "hallo!".to_owned()); - builder.update_field(5, title_field, "hello!".to_owned()); - builder.update_field(2, title_field, "hi!".to_owned()); + // you can insert documents in any order, + // it is sorted internally + builder.update_field(1, title, "hallo!".to_owned()); + builder.update_field(5, title, "hello!".to_owned()); + builder.update_field(2, title, "hi!".to_owned()); - let name_field = schema.attribute("name").unwrap(); - builder.remove_field(4, name_field); + builder.remove_field(4, description); 
let update = builder.build()?; diff --git a/src/index/schema.rs b/src/index/schema.rs index f1df50b07..426b4df8d 100644 --- a/src/index/schema.rs +++ b/src/index/schema.rs @@ -46,9 +46,11 @@ impl SchemaBuilder { SchemaBuilder { attrs: LinkedHashMap::new() } } - pub fn new_field>(&mut self, name: S, props: SchemaProps) -> SchemaAttr { + pub fn new_attribute>(&mut self, name: S, props: SchemaProps) -> SchemaAttr { let len = self.attrs.len(); - self.attrs.insert(name.into(), props); + if self.attrs.insert(name.into(), props).is_some() { + panic!("Field already inserted.") + } SchemaAttr(len as u32) } @@ -119,7 +121,7 @@ impl SchemaAttr { impl fmt::Display for SchemaAttr { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", self.0) + self.0.fmt(f) } } @@ -130,9 +132,9 @@ mod tests { #[test] fn serialize_deserialize() -> bincode::Result<()> { let mut builder = SchemaBuilder::new(); - builder.new_field("alphabet", STORED); - builder.new_field("beta", STORED | INDEXED); - builder.new_field("gamma", INDEXED); + builder.new_attribute("alphabet", STORED); + builder.new_attribute("beta", STORED | INDEXED); + builder.new_attribute("gamma", INDEXED); let schema = builder.build(); let mut buffer = Vec::new(); diff --git a/src/index/update/mod.rs b/src/index/update/mod.rs index 8f6f22329..c6befe9f6 100644 --- a/src/index/update/mod.rs +++ b/src/index/update/mod.rs @@ -3,8 +3,7 @@ use std::error::Error; use ::rocksdb::rocksdb_options; -use crate::index::blob_name::BlobName; -use crate::blob::Sign; +use crate::blob::{BlobName, Sign}; mod negative_update; mod positive_update; @@ -18,17 +17,7 @@ pub struct Update { impl Update { pub fn open>(path: P) -> Result> { - let path = path.into(); - - let env_options = rocksdb_options::EnvOptions::new(); - let column_family_options = rocksdb_options::ColumnFamilyOptions::new(); - let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options); - file_writer.open(&path.to_string_lossy())?; - let infos 
= file_writer.finish()?; - - // FIXME check if the update contains a blobs-order entry - - Ok(Update { path }) + Ok(Update { path: path.into() }) } pub fn into_path_buf(self) -> PathBuf { diff --git a/src/index/update/negative_update.rs b/src/index/update/negative_update.rs index 1dfbea9b0..20356b5d1 100644 --- a/src/index/update/negative_update.rs +++ b/src/index/update/negative_update.rs @@ -3,9 +3,9 @@ use std::error::Error; use ::rocksdb::rocksdb_options; +use crate::blob::BlobInfo; use crate::index::DATA_BLOBS_ORDER; use crate::index::update::Update; -use crate::index::blob_name::BlobName; use crate::data::DocIdsBuilder; use crate::DocumentId; @@ -27,30 +27,28 @@ impl NegativeUpdateBuilder { } pub fn build(self) -> Result> { - let blob_name = BlobName::new(); + let blob_info = BlobInfo::new_negative(); let env_options = rocksdb_options::EnvOptions::new(); let column_family_options = rocksdb_options::ColumnFamilyOptions::new(); let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options); - file_writer.open(&self.path.to_string_lossy())?; - // TODO the blob-name must be written in bytes (16 bytes) - // along with the sign - unimplemented!("write the blob sign and name"); - - // write the blob name to be merged - let blob_name = blob_name.to_string(); - file_writer.merge(DATA_BLOBS_ORDER.as_bytes(), blob_name.as_bytes())?; - // write the doc ids - let blob_key = format!("BLOB-{}-doc-ids", blob_name); + let blob_key = format!("blob-{}-doc-ids", blob_info.name); let blob_doc_ids = self.doc_ids.into_inner()?; file_writer.put(blob_key.as_bytes(), &blob_doc_ids)?; + { + // write the blob name to be merged + let mut buffer = Vec::new(); + blob_info.write_into(&mut buffer)?; + file_writer.merge(DATA_BLOBS_ORDER.as_bytes(), &buffer)?; + } + for id in blob_doc_ids { - let start = format!("DOCU-{}", id); - let end = format!("DOCU-{}", id + 1); + let start = format!("docu-{}", id); + let end = format!("docu-{}", id + 1); 
file_writer.delete_range(start.as_bytes(), end.as_bytes())?; } diff --git a/src/index/update/positive_update.rs b/src/index/update/positive_update.rs index 57b7a0ee8..9378951ce 100644 --- a/src/index/update/positive_update.rs +++ b/src/index/update/positive_update.rs @@ -7,10 +7,9 @@ use ::rocksdb::rocksdb_options; use crate::index::DATA_BLOBS_ORDER; use crate::index::update::Update; -use crate::index::blob_name::BlobName; use crate::index::schema::{SchemaProps, Schema, SchemaAttr}; use crate::tokenizer::TokenizerBuilder; -use crate::blob::PositiveBlobBuilder; +use crate::blob::{BlobInfo, PositiveBlobBuilder}; use crate::{DocIndex, DocumentId}; pub enum NewState { @@ -53,7 +52,7 @@ impl PositiveUpdateBuilder where B: TokenizerBuilder { pub fn build(self) -> Result> { - let blob_name = BlobName::new(); + let blob_info = BlobInfo::new_positive(); let env_options = rocksdb_options::EnvOptions::new(); let column_family_options = rocksdb_options::ColumnFamilyOptions::new(); @@ -61,14 +60,6 @@ where B: TokenizerBuilder file_writer.open(&self.path.to_string_lossy())?; - // TODO the blob-name must be written in bytes (16 bytes) - // along with the sign - unimplemented!("write the blob sign and name"); - - // write the blob name to be merged - let blob_name = blob_name.to_string(); - file_writer.put(DATA_BLOBS_ORDER.as_bytes(), blob_name.as_bytes())?; - let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new()); for ((document_id, field), state) in &self.new_states { let value = match state { @@ -96,18 +87,27 @@ where B: TokenizerBuilder } let (blob_fst_map, blob_doc_idx) = builder.into_inner()?; - // write the fst - let blob_key = format!("BLOB-{}-fst", blob_name); - file_writer.put(blob_key.as_bytes(), &blob_fst_map)?; - // write the doc-idx - let blob_key = format!("BLOB-{}-doc-idx", blob_name); + let blob_key = format!("blob-{}-doc-idx", blob_info.name); file_writer.put(blob_key.as_bytes(), &blob_doc_idx)?; + // write the fst + let blob_key = 
format!("blob-{}-fst", blob_info.name); + file_writer.put(blob_key.as_bytes(), &blob_fst_map)?; + + { + // write the blob name to be merged + let mut buffer = Vec::new(); + blob_info.write_into(&mut buffer)?; + file_writer.merge(DATA_BLOBS_ORDER.as_bytes(), &buffer)?; + } + // write all the documents fields updates - let mut key = String::from("DOCU-"); + let mut key = String::from("docu-"); let prefix_len = key.len(); + // FIXME write numbers in bytes not decimal representation + for ((id, field), state) in self.new_states { key.truncate(prefix_len); write!(&mut key, "{}-{}", id, field)?;