diff --git a/examples/create-database.rs b/examples/create-database.rs
index 9a2784586..4f17d54a9 100644
--- a/examples/create-database.rs
+++ b/examples/create-database.rs
@@ -32,7 +32,7 @@ struct Document<'a> (
 );
 
 fn index(schema: Schema, database_path: &Path, csv_data_path: &Path) -> Result<Database, Box<Error>> {
-    let database = Database::create(database_path, schema.clone())?;
+    let database = Database::create(database_path, &schema)?;
 
     println!("start indexing...");
 
diff --git a/src/automaton.rs b/src/automaton.rs
index d3eb81961..972b2ce51 100644
--- a/src/automaton.rs
+++ b/src/automaton.rs
@@ -50,6 +50,7 @@ impl AutomatonExt for DfaExt {
     }
 }
 
+#[derive(Copy, Clone)]
 enum PrefixSetting {
     Prefix,
     NoPrefix,
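
The two hunks above are small API cleanups: `Database::create` now borrows the schema instead of taking it by value, so the example drops its `schema.clone()`, and the fieldless `PrefixSetting` enum becomes `Copy`. A standalone sketch of what the borrowing signature buys callers; `Schema` and `Database` here are simplified stand-ins, not the crate's real definitions:

    use std::path::Path;

    #[derive(Clone)]
    struct Schema;
    struct Database;

    impl Database {
        // Borrowing means create() serializes what it needs while the caller
        // keeps ownership, e.g. for the UpdateBuilders constructed later.
        fn create(_path: &Path, _schema: &Schema) -> Database {
            Database
        }
    }

    fn main() {
        let schema = Schema;
        let _db = Database::create(Path::new("example.mdb"), &schema);
        let _still_owned = schema; // no clone required anymore
    }
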
diff --git a/src/database/database.rs b/src/database/database.rs
deleted file mode 100644
index ec77a62dc..000000000
--- a/src/database/database.rs
+++ /dev/null
@@ -1,638 +0,0 @@
-use std::sync::{Arc, Mutex};
-use std::error::Error;
-use std::path::Path;
-
-use rocksdb::rocksdb_options::{DBOptions, IngestExternalFileOptions, ColumnFamilyOptions};
-use rocksdb::rocksdb::{Writable, Snapshot};
-use rocksdb::{DB, DBVector, MergeOperands};
-use crossbeam::atomic::ArcCell;
-
-use crate::database::index::Index;
-use crate::database::{DatabaseView, Update, Schema};
-use crate::database::{DATA_INDEX, DATA_SCHEMA};
-
-pub struct Database {
-    // DB is under a Mutex to sync update ingestions and separate DB update locking
-    // and DatabaseView acquiring locking in other words:
-    // "Block readers the minimum possible amount of time"
-    db: Mutex<Arc<DB>>,
-
-    // This view is updated each time the DB ingests an update
-    view: ArcCell<DatabaseView<Arc<DB>>>,
-}
-
-impl Database {
-    pub fn create<P: AsRef<Path>>(path: P, schema: Schema) -> Result<Database, Box<Error>> {
-        let path = path.as_ref();
-        if path.exists() {
-            return Err(format!("File already exists at path: {}, cannot create database.",
-                                path.display()).into())
-        }
-
-        let path = path.to_string_lossy();
-        let mut opts = DBOptions::new();
-        opts.create_if_missing(true);
-        // opts.error_if_exists(true); // FIXME pull request that
-
-        let mut cf_opts = ColumnFamilyOptions::new();
-        cf_opts.add_merge_operator("data-index merge operator", merge_indexes);
-
-        let db = DB::open_cf(opts, &path, vec![("default", cf_opts)])?;
-
-        let mut schema_bytes = Vec::new();
-        schema.write_to_bin(&mut schema_bytes)?;
-        db.put(DATA_SCHEMA, &schema_bytes)?;
-
-        let db = Arc::new(db);
-        let snapshot = Snapshot::new(db.clone());
-        let view = ArcCell::new(Arc::new(DatabaseView::new(snapshot)?));
-
-        Ok(Database { db: Mutex::new(db), view })
-    }
-
-    pub fn open<P: AsRef<Path>>(path: P) -> Result<Database, Box<Error>> {
-        let path = path.as_ref().to_string_lossy();
-
-        let mut opts = DBOptions::new();
-        opts.create_if_missing(false);
-
-        let mut cf_opts = ColumnFamilyOptions::new();
-        cf_opts.add_merge_operator("data-index merge operator", merge_indexes);
-
-        let db = DB::open_cf(opts, &path, vec![("default", cf_opts)])?;
-
-        // FIXME create a generic function to do that !
-        let _schema = match db.get(DATA_SCHEMA)? {
-            Some(value) => Schema::read_from_bin(&*value)?,
-            None => return Err(String::from("Database does not contain a schema").into()),
-        };
-
-        let db = Arc::new(db);
-        let snapshot = Snapshot::new(db.clone());
-        let view = ArcCell::new(Arc::new(DatabaseView::new(snapshot)?));
-
-        Ok(Database { db: Mutex::new(db), view })
-    }
-
-    pub fn ingest_update_file(&self, update: Update) -> Result<Arc<DatabaseView<Arc<DB>>>, Box<Error>> {
-        let snapshot = {
-            // We must have a mutex here to ensure that update ingestions and compactions
-            // are done atomatically and in the right order.
-            // This way update ingestions will block other update ingestions without blocking view
-            // creations while doing the "data-index" compaction
-            let db = match self.db.lock() {
-                Ok(db) => db,
-                Err(e) => return Err(e.to_string().into()),
-            };
-
-            let path = update.path().to_string_lossy();
-            let options = IngestExternalFileOptions::new();
-            // options.move_files(move_update);
-
-            let cf_handle = db.cf_handle("default").expect("\"default\" column family not found");
-            db.ingest_external_file_optimized(&cf_handle, &options, &[&path])?;
-
-            // Compacting to trigger the merge operator only one time
-            // while ingesting the update and not each time searching
-            db.compact_range(Some(DATA_INDEX), Some(DATA_INDEX));
-
-            Snapshot::new(db.clone())
-        };
-
-        let view = Arc::new(DatabaseView::new(snapshot)?);
-        self.view.set(view.clone());
-
-        Ok(view)
-    }
-
-    pub fn get(&self, key: &[u8]) -> Result<Option<DBVector>, Box<Error>> {
-        self.view().get(key)
-    }
-
-    pub fn flush(&self) -> Result<(), Box<Error>> {
-        match self.db.lock() {
-            Ok(db) => Ok(db.flush(true)?),
-            Err(e) => Err(e.to_string().into()),
-        }
-    }
-
-    pub fn view(&self) -> Arc<DatabaseView<Arc<DB>>> {
-        self.view.get()
-    }
-}
-
-fn merge_indexes(key: &[u8], existing: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
-    assert_eq!(key, DATA_INDEX, "The merge operator only supports \"data-index\" merging");
-
-    let mut index: Option<Index> = None;
-
-    for bytes in existing.into_iter().chain(operands) {
-        let bytes_len = bytes.len();
-        let bytes = Arc::new(bytes.to_vec());
-        let operand = Index::from_shared_bytes(bytes, 0, bytes_len);
-        let operand = operand.expect("BUG: could not deserialize index");
-
-        let merged = match index {
-            Some(ref index) => index.merge(&operand).expect("BUG: could not merge index"),
-            None => operand,
-        };
-
-        index.replace(merged);
-    }
-
-    let index = index.unwrap_or_default();
-    let mut bytes = Vec::new();
-    index.write_to_bytes(&mut bytes);
-    bytes
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use std::error::Error;
-
-    use serde_derive::{Serialize, Deserialize};
-    use tempfile::tempdir;
-
-    use crate::database::schema::{SchemaBuilder, STORED, INDEXED};
-    use crate::database::update::UpdateBuilder;
-    use crate::tokenizer::DefaultBuilder;
-
-    #[test]
-    fn ingest_one_update_file() -> Result<(), Box<Error>> {
-        let dir = tempdir()?;
-
-        let rocksdb_path = dir.path().join("rocksdb.rdb");
-
-        #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
-        struct SimpleDoc {
-            id: u64,
-            title: String,
-            description: String,
-            timestamp: u64,
-        }
-
-        let schema = {
-            let mut builder = SchemaBuilder::with_identifier("id");
-            builder.new_attribute("id", STORED);
-            builder.new_attribute("title", STORED | INDEXED);
-            builder.new_attribute("description", STORED | INDEXED);
-            builder.new_attribute("timestamp", STORED);
-            builder.build()
-        };
-
-        let database = Database::create(&rocksdb_path, schema.clone())?;
-
-        let update_path = dir.path().join("update.sst");
-
-        let doc0 = SimpleDoc {
-            id: 0,
-            title: String::from("I am a title"),
-            description: String::from("I am a description"),
-            timestamp: 1234567,
-        };
-        let doc1 = SimpleDoc {
-            id: 1,
-            title: String::from("I am the second title"),
-            description: String::from("I am the second description"),
-            timestamp: 7654321,
-        };
-
-        let docid0;
-        let docid1;
-        let update = {
-            let tokenizer_builder = DefaultBuilder::new();
-            let mut builder = UpdateBuilder::new(update_path, schema);
-
-            docid0 = builder.update_document(&doc0, &tokenizer_builder)?;
-            docid1 = builder.update_document(&doc1, &tokenizer_builder)?;
-
-            builder.build()?
-        };
-
-        database.ingest_update_file(update)?;
-        let view = database.view();
-
-        let de_doc0: SimpleDoc = view.document_by_id(docid0)?;
-        let de_doc1: SimpleDoc = view.document_by_id(docid1)?;
-
-        assert_eq!(doc0, de_doc0);
-        assert_eq!(doc1, de_doc1);
-
-        Ok(dir.close()?)
-    }
-
-    #[test]
-    fn ingest_two_update_files() -> Result<(), Box<Error>> {
-        let dir = tempdir()?;
-
-        let rocksdb_path = dir.path().join("rocksdb.rdb");
-
-        #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
-        struct SimpleDoc {
-            id: u64,
-            title: String,
-            description: String,
-            timestamp: u64,
-        }
-
-        let schema = {
-            let mut builder = SchemaBuilder::with_identifier("id");
-            builder.new_attribute("id", STORED);
-            builder.new_attribute("title", STORED | INDEXED);
-            builder.new_attribute("description", STORED | INDEXED);
-            builder.new_attribute("timestamp", STORED);
-            builder.build()
-        };
-
-        let database = Database::create(&rocksdb_path, schema.clone())?;
-
-        let doc0 = SimpleDoc {
-            id: 0,
-            title: String::from("I am a title"),
-            description: String::from("I am a description"),
-            timestamp: 1234567,
-        };
-        let doc1 = SimpleDoc {
-            id: 1,
-            title: String::from("I am the second title"),
-            description: String::from("I am the second description"),
-            timestamp: 7654321,
-        };
-        let doc2 = SimpleDoc {
-            id: 2,
-            title: String::from("I am the third title"),
-            description: String::from("I am the third description"),
-            timestamp: 7654321,
-        };
-        let doc3 = SimpleDoc {
-            id: 3,
-            title: String::from("I am the fourth title"),
-            description: String::from("I am the fourth description"),
-            timestamp: 7654321,
-        };
-
-        let docid0;
-        let docid1;
-        let update1 = {
-            let tokenizer_builder = DefaultBuilder::new();
-            let update_path = dir.path().join("update-000.sst");
-            let mut builder = UpdateBuilder::new(update_path, schema.clone());
-
-            docid0 = builder.update_document(&doc0, &tokenizer_builder)?;
-            docid1 = builder.update_document(&doc1, &tokenizer_builder)?;
-
-            builder.build()?
-        };
-
-        let docid2;
-        let docid3;
-        let update2 = {
-            let tokenizer_builder = DefaultBuilder::new();
-            let update_path = dir.path().join("update-001.sst");
-            let mut builder = UpdateBuilder::new(update_path, schema);
-
-            docid2 = builder.update_document(&doc2, &tokenizer_builder)?;
-            docid3 = builder.update_document(&doc3, &tokenizer_builder)?;
-
-            builder.build()?
-        };
-
-        database.ingest_update_file(update1)?;
-        database.ingest_update_file(update2)?;
-
-        let view = database.view();
-
-        let de_doc0: SimpleDoc = view.document_by_id(docid0)?;
-        let de_doc1: SimpleDoc = view.document_by_id(docid1)?;
-
-        assert_eq!(doc0, de_doc0);
-        assert_eq!(doc1, de_doc1);
-
-        let de_doc2: SimpleDoc = view.document_by_id(docid2)?;
-        let de_doc3: SimpleDoc = view.document_by_id(docid3)?;
-
-        assert_eq!(doc2, de_doc2);
-        assert_eq!(doc3, de_doc3);
-
-        Ok(dir.close()?)
-    }
-}
-
-#[cfg(all(feature = "nightly", test))]
-mod bench {
-    extern crate test;
-
-    use super::*;
-    use std::error::Error;
-    use std::iter::repeat_with;
-    use self::test::Bencher;
-
-    use rand::distributions::Alphanumeric;
-    use rand_xorshift::XorShiftRng;
-    use rand::{Rng, SeedableRng};
-    use rand::seq::SliceRandom;
-    use serde_derive::Serialize;
-
-    use crate::tokenizer::DefaultBuilder;
-    use crate::database::update::UpdateBuilder;
-    use crate::database::schema::*;
-
-    fn random_sentences<R: Rng>(number: usize, rng: &mut R) -> String {
-        let mut words = String::new();
-
-        for i in 0..number {
-            let word_len = rng.gen_range(1, 12);
-            let iter = repeat_with(|| rng.sample(Alphanumeric)).take(word_len);
-            words.extend(iter);
-
-            if i == number - 1 { // last word
-                let final_ = [".", "?", "!", "..."].choose(rng).cloned();
-                words.extend(final_);
-            } else {
-                let middle = [",", ", "].choose(rng).cloned();
-                words.extend(middle);
-            }
-        }
-
-        words
-    }
-
-    #[bench]
-    fn open_little_database(bench: &mut Bencher) -> Result<(), Box<Error>> {
-        let dir = tempfile::tempdir()?;
-
-        let mut builder = SchemaBuilder::with_identifier("id");
-        builder.new_attribute("title", STORED | INDEXED);
-        builder.new_attribute("description", STORED | INDEXED);
-        let schema = builder.build();
-
-        let db_path = dir.path().join("bench.mdb");
-        let database = Database::create(db_path.clone(), schema.clone())?;
-
-        #[derive(Serialize)]
-        struct Document {
-            id: u64,
-            title: String,
-            description: String,
-        }
-
-        let path = dir.path().join("update-000.sst");
-        let tokenizer_builder = DefaultBuilder;
-        let mut builder = UpdateBuilder::new(path, schema.clone());
-        let mut rng = XorShiftRng::seed_from_u64(42);
-
-        for i in 0..300 {
-            let document = Document {
-                id: i,
-                title: random_sentences(rng.gen_range(1, 8), &mut rng),
-                description: random_sentences(rng.gen_range(20, 200), &mut rng),
-            };
-            builder.update_document(&document, &tokenizer_builder)?;
-        }
-
-        let update = builder.build()?;
-        database.ingest_update_file(update)?;
-
-        drop(database);
-
-        bench.iter(|| {
-            let database = Database::open(db_path.clone()).unwrap();
-            test::black_box(|| database);
-        });
-
-        Ok(())
-    }
-
-    #[bench]
-    fn open_medium_database(bench: &mut Bencher) -> Result<(), Box<Error>> {
-        let dir = tempfile::tempdir()?;
-
-        let mut builder = SchemaBuilder::with_identifier("id");
-        builder.new_attribute("title", STORED | INDEXED);
-        builder.new_attribute("description", STORED | INDEXED);
-        let schema = builder.build();
-
-        let db_path = dir.path().join("bench.mdb");
-        let database = Database::create(db_path.clone(), schema.clone())?;
-
-        #[derive(Serialize)]
-        struct Document {
-            id: u64,
-            title: String,
-            description: String,
-        }
-
-        let path = dir.path().join("update-000.sst");
-        let tokenizer_builder = DefaultBuilder;
-        let mut builder = UpdateBuilder::new(path, schema.clone());
-        let mut rng = XorShiftRng::seed_from_u64(42);
-
-        for i in 0..3000 {
-            let document = Document {
-                id: i,
-                title: random_sentences(rng.gen_range(1, 8), &mut rng),
-                description: random_sentences(rng.gen_range(20, 200), &mut rng),
-            };
-            builder.update_document(&document, &tokenizer_builder)?;
-        }
-
-        let update = builder.build()?;
-        database.ingest_update_file(update)?;
-
-        drop(database);
-
-        bench.iter(|| {
-            let database = Database::open(db_path.clone()).unwrap();
-            test::black_box(|| database);
-        });
-
-        Ok(())
-    }
-
-    #[bench]
-    #[ignore]
-    fn open_big_database(bench: &mut Bencher) -> Result<(), Box<Error>> {
-        let dir = tempfile::tempdir()?;
-
-        let mut builder = SchemaBuilder::with_identifier("id");
-        builder.new_attribute("title", STORED | INDEXED);
-        builder.new_attribute("description", STORED | INDEXED);
-        let schema = builder.build();
-
-        let db_path = dir.path().join("bench.mdb");
-        let database = Database::create(db_path.clone(), schema.clone())?;
-
-        #[derive(Serialize)]
-        struct Document {
-            id: u64,
-            title: String,
-            description: String,
-        }
-
-        let path = dir.path().join("update-000.sst");
-        let tokenizer_builder = DefaultBuilder;
-        let mut builder = UpdateBuilder::new(path, schema.clone());
-        let mut rng = XorShiftRng::seed_from_u64(42);
-
-        for i in 0..30_000 {
-            let document = Document {
-                id: i,
-                title: random_sentences(rng.gen_range(1, 8), &mut rng),
-                description: random_sentences(rng.gen_range(20, 200), &mut rng),
-            };
-            builder.update_document(&document, &tokenizer_builder)?;
-        }
-
-        let update = builder.build()?;
-        database.ingest_update_file(update)?;
-
-        drop(database);
-
-        bench.iter(|| {
-            let database = Database::open(db_path.clone()).unwrap();
-            test::black_box(|| database);
-        });
-
-        Ok(())
-    }
-
-    #[bench]
-    fn search_oneletter_little_database(bench: &mut Bencher) -> Result<(), Box<Error>> {
-        let dir = tempfile::tempdir()?;
-
-        let mut builder = SchemaBuilder::with_identifier("id");
-        builder.new_attribute("title", STORED | INDEXED);
-        builder.new_attribute("description", STORED | INDEXED);
-        let schema = builder.build();
-
-        let db_path = dir.path().join("bench.mdb");
-        let database = Database::create(db_path.clone(), schema.clone())?;
-
-        #[derive(Serialize)]
-        struct Document {
-            id: u64,
-            title: String,
-            description: String,
-        }
-
-        let path = dir.path().join("update-000.sst");
-        let tokenizer_builder = DefaultBuilder;
-        let mut builder = UpdateBuilder::new(path, schema.clone());
-        let mut rng = XorShiftRng::seed_from_u64(42);
-
-        for i in 0..300 {
-            let document = Document {
-                id: i,
-                title: random_sentences(rng.gen_range(1, 8), &mut rng),
-                description: random_sentences(rng.gen_range(20, 200), &mut rng),
-            };
-            builder.update_document(&document, &tokenizer_builder)?;
-        }
-
-        let update = builder.build()?;
-        let view = database.ingest_update_file(update)?;
-
-        bench.iter(|| {
-            for q in &["a", "b", "c", "d", "e"] {
-                let documents = view.query_builder().unwrap().query(q, 0..20);
-                test::black_box(|| documents);
-            }
-        });
-
-        Ok(())
-    }
-
-    #[bench]
-    fn search_oneletter_medium_database(bench: &mut Bencher) -> Result<(), Box<Error>> {
-        let dir = tempfile::tempdir()?;
-
-        let mut builder = SchemaBuilder::with_identifier("id");
-        builder.new_attribute("title", STORED | INDEXED);
-        builder.new_attribute("description", STORED | INDEXED);
-        let schema = builder.build();
-
-        let db_path = dir.path().join("bench.mdb");
-        let database = Database::create(db_path.clone(), schema.clone())?;
-
-        #[derive(Serialize)]
-        struct Document {
-            id: u64,
-            title: String,
-            description: String,
-        }
-
-        let path = dir.path().join("update-000.sst");
-        let tokenizer_builder = DefaultBuilder;
-        let mut builder = UpdateBuilder::new(path, schema.clone());
-        let mut rng = XorShiftRng::seed_from_u64(42);
-
-        for i in 0..3000 {
-            let document = Document {
-                id: i,
-                title: random_sentences(rng.gen_range(1, 8), &mut rng),
-                description: random_sentences(rng.gen_range(20, 200), &mut rng),
-            };
-            builder.update_document(&document, &tokenizer_builder)?;
-        }
-
-        let update = builder.build()?;
-        let view = database.ingest_update_file(update)?;
-
-        bench.iter(|| {
-            for q in &["a", "b", "c", "d", "e"] {
-                let documents = view.query_builder().unwrap().query(q, 0..20);
-                test::black_box(|| documents);
-            }
-        });
-
-        Ok(())
-    }
-
-    #[bench]
-    #[ignore]
-    fn search_oneletter_big_database(bench: &mut Bencher) -> Result<(), Box<Error>> {
-        let dir = tempfile::tempdir()?;
-
-        let mut builder = SchemaBuilder::with_identifier("id");
-        builder.new_attribute("title", STORED | INDEXED);
-        builder.new_attribute("description", STORED | INDEXED);
-        let schema = builder.build();
-
-        let db_path = dir.path().join("bench.mdb");
-        let database = Database::create(db_path.clone(), schema.clone())?;
-
-        #[derive(Serialize)]
-        struct Document {
-            id: u64,
-            title: String,
-            description: String,
-        }
-
-        let path = dir.path().join("update-000.sst");
-        let tokenizer_builder = DefaultBuilder;
-        let mut builder = UpdateBuilder::new(path, schema.clone());
-        let mut rng = XorShiftRng::seed_from_u64(42);
-
-        for i in 0..30_000 {
-            let document = Document {
-                id: i,
-                title: random_sentences(rng.gen_range(1, 8), &mut rng),
-                description: random_sentences(rng.gen_range(20, 200), &mut rng),
-            };
-            builder.update_document(&document, &tokenizer_builder)?;
-        }
-
-        let update = builder.build()?;
-        let view = database.ingest_update_file(update)?;
-
-        bench.iter(|| {
-            for q in &["a", "b", "c", "d", "e"] {
-                let documents = view.query_builder().unwrap().query(q, 0..20);
-                test::black_box(|| documents);
-            }
-        });
-
-        Ok(())
-    }
-}
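
The deletion above is a move: everything in database.rs reappears below in src/database/mod.rs. The merge operator it carries is the subtle part: RocksDB may apply a merge operator to any grouping of pending operands, at compaction time or at read time, so the operation has to behave associatively, which is why `merge_indexes` folds every operand into one `Index`. A dependency-free sketch of that contract, using a little-endian `u64` counter instead of an `Index` (`merge_counters` is a hypothetical stand-in, not part of this crate):

    use std::convert::TryInto;

    // Fold an optional existing value plus pending operands into one value,
    // the same shape as the merge callback registered above.
    fn merge_counters(existing: Option<&[u8]>, operands: &[&[u8]]) -> Vec<u8> {
        let decode = |b: &[u8]| u64::from_le_bytes(b.try_into().unwrap());
        let initial = existing.map(decode).unwrap_or(0);
        let total = operands.iter().fold(initial, |acc, b| acc + decode(b));
        total.to_le_bytes().to_vec()
    }

    fn main() {
        let one = 1u64.to_le_bytes();
        let two = 2u64.to_le_bytes();
        let three = 3u64.to_le_bytes();

        // Associativity: merging (1, 2) then 3 equals merging 1 with (2, 3).
        let partial = merge_counters(Some(&one), &[&two]);
        let left = merge_counters(Some(&partial), &[&three]);
        let inner = merge_counters(Some(&two), &[&three]);
        let right = merge_counters(Some(&one), &[&inner]);
        assert_eq!(left, right);
    }
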
"Block readers the minimum possible amount of time" + db: Mutex>, + + // This view is updated each time the DB ingests an update + view: ArcCell>>, +} + +impl Database { + pub fn create>(path: P, schema: &Schema) -> Result> { + let path = path.as_ref(); + if path.exists() { + return Err(format!("File already exists at path: {}, cannot create database.", + path.display()).into()) + } + + let path = path.to_string_lossy(); + let mut opts = DBOptions::new(); + opts.create_if_missing(true); + // opts.error_if_exists(true); // FIXME pull request that + + let mut cf_opts = ColumnFamilyOptions::new(); + cf_opts.add_merge_operator("data-index merge operator", merge_indexes); + + let db = DB::open_cf(opts, &path, vec![("default", cf_opts)])?; + + let mut schema_bytes = Vec::new(); + schema.write_to_bin(&mut schema_bytes)?; + db.put(DATA_SCHEMA, &schema_bytes)?; + + let db = Arc::new(db); + let snapshot = Snapshot::new(db.clone()); + let view = ArcCell::new(Arc::new(DatabaseView::new(snapshot)?)); + + Ok(Database { db: Mutex::new(db), view }) + } + + pub fn open>(path: P) -> Result> { + let path = path.as_ref().to_string_lossy(); + + let mut opts = DBOptions::new(); + opts.create_if_missing(false); + + let mut cf_opts = ColumnFamilyOptions::new(); + cf_opts.add_merge_operator("data-index merge operator", merge_indexes); + + let db = DB::open_cf(opts, &path, vec![("default", cf_opts)])?; + + // FIXME create a generic function to do that ! + let _schema = match db.get(DATA_SCHEMA)? { + Some(value) => Schema::read_from_bin(&*value)?, + None => return Err(String::from("Database does not contain a schema").into()), + }; + + let db = Arc::new(db); + let snapshot = Snapshot::new(db.clone()); + let view = ArcCell::new(Arc::new(DatabaseView::new(snapshot)?)); + + Ok(Database { db: Mutex::new(db), view }) + } + + pub fn ingest_update_file(&self, update: Update) -> Result>>, Box> { + let snapshot = { + // We must have a mutex here to ensure that update ingestions and compactions + // are done atomatically and in the right order. 
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::error::Error;
+
+    use serde_derive::{Serialize, Deserialize};
+    use tempfile::tempdir;
+
+    use crate::database::schema::{SchemaBuilder, STORED, INDEXED};
+    use crate::database::update::UpdateBuilder;
+    use crate::tokenizer::DefaultBuilder;
+
+    #[test]
+    fn ingest_one_update_file() -> Result<(), Box<Error>> {
+        let dir = tempdir()?;
+
+        let rocksdb_path = dir.path().join("rocksdb.rdb");
+
+        #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
+        struct SimpleDoc {
+            id: u64,
+            title: String,
+            description: String,
+            timestamp: u64,
+        }
+
+        let schema = {
+            let mut builder = SchemaBuilder::with_identifier("id");
+            builder.new_attribute("id", STORED);
+            builder.new_attribute("title", STORED | INDEXED);
+            builder.new_attribute("description", STORED | INDEXED);
+            builder.new_attribute("timestamp", STORED);
+            builder.build()
+        };
+
+        let database = Database::create(&rocksdb_path, &schema)?;
+
+        let update_path = dir.path().join("update.sst");
+
+        let doc0 = SimpleDoc {
+            id: 0,
+            title: String::from("I am a title"),
+            description: String::from("I am a description"),
+            timestamp: 1234567,
+        };
+        let doc1 = SimpleDoc {
+            id: 1,
+            title: String::from("I am the second title"),
+            description: String::from("I am the second description"),
+            timestamp: 7654321,
+        };
+
+        let docid0;
+        let docid1;
+        let update = {
+            let tokenizer_builder = DefaultBuilder::new();
+            let mut builder = UpdateBuilder::new(update_path, schema);
+
+            docid0 = builder.update_document(&doc0, &tokenizer_builder)?;
+            docid1 = builder.update_document(&doc1, &tokenizer_builder)?;
+
+            builder.build()?
+        };
+
+        database.ingest_update_file(update)?;
+        let view = database.view();
+
+        let de_doc0: SimpleDoc = view.document_by_id(docid0)?;
+        let de_doc1: SimpleDoc = view.document_by_id(docid1)?;
+
+        assert_eq!(doc0, de_doc0);
+        assert_eq!(doc1, de_doc1);
+
+        Ok(dir.close()?)
+    }
+
+    #[test]
+    fn ingest_two_update_files() -> Result<(), Box<Error>> {
+        let dir = tempdir()?;
+
+        let rocksdb_path = dir.path().join("rocksdb.rdb");
+
+        #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
+        struct SimpleDoc {
+            id: u64,
+            title: String,
+            description: String,
+            timestamp: u64,
+        }
+
+        let schema = {
+            let mut builder = SchemaBuilder::with_identifier("id");
+            builder.new_attribute("id", STORED);
+            builder.new_attribute("title", STORED | INDEXED);
+            builder.new_attribute("description", STORED | INDEXED);
+            builder.new_attribute("timestamp", STORED);
+            builder.build()
+        };
+
+        let database = Database::create(&rocksdb_path, &schema)?;
+
+        let doc0 = SimpleDoc {
+            id: 0,
+            title: String::from("I am a title"),
+            description: String::from("I am a description"),
+            timestamp: 1234567,
+        };
+        let doc1 = SimpleDoc {
+            id: 1,
+            title: String::from("I am the second title"),
+            description: String::from("I am the second description"),
+            timestamp: 7654321,
+        };
+        let doc2 = SimpleDoc {
+            id: 2,
+            title: String::from("I am the third title"),
+            description: String::from("I am the third description"),
+            timestamp: 7654321,
+        };
+        let doc3 = SimpleDoc {
+            id: 3,
+            title: String::from("I am the fourth title"),
+            description: String::from("I am the fourth description"),
+            timestamp: 7654321,
+        };
+
+        let docid0;
+        let docid1;
+        let update1 = {
+            let tokenizer_builder = DefaultBuilder::new();
+            let update_path = dir.path().join("update-000.sst");
+            let mut builder = UpdateBuilder::new(update_path, schema.clone());
+
+            docid0 = builder.update_document(&doc0, &tokenizer_builder)?;
+            docid1 = builder.update_document(&doc1, &tokenizer_builder)?;
+
+            builder.build()?
+        };
+
+        let docid2;
+        let docid3;
+        let update2 = {
+            let tokenizer_builder = DefaultBuilder::new();
+            let update_path = dir.path().join("update-001.sst");
+            let mut builder = UpdateBuilder::new(update_path, schema);
+
+            docid2 = builder.update_document(&doc2, &tokenizer_builder)?;
+            docid3 = builder.update_document(&doc3, &tokenizer_builder)?;
+
+            builder.build()?
+        };
+
+        database.ingest_update_file(update1)?;
+        database.ingest_update_file(update2)?;
+
+        let view = database.view();
+
+        let de_doc0: SimpleDoc = view.document_by_id(docid0)?;
+        let de_doc1: SimpleDoc = view.document_by_id(docid1)?;
+
+        assert_eq!(doc0, de_doc0);
+        assert_eq!(doc1, de_doc1);
+
+        let de_doc2: SimpleDoc = view.document_by_id(docid2)?;
+        let de_doc3: SimpleDoc = view.document_by_id(docid3)?;
+
+        assert_eq!(doc2, de_doc2);
+        assert_eq!(doc3, de_doc3);
+
+        Ok(dir.close()?)
+    }
+}
+
+#[cfg(all(feature = "nightly", test))]
+mod bench {
+    extern crate test;
+
+    use super::*;
+    use std::error::Error;
+    use std::iter::repeat_with;
+    use self::test::Bencher;
+
+    use rand::distributions::Alphanumeric;
+    use rand_xorshift::XorShiftRng;
+    use rand::{Rng, SeedableRng};
+    use rand::seq::SliceRandom;
+    use serde_derive::Serialize;
+
+    use crate::tokenizer::DefaultBuilder;
+    use crate::database::update::UpdateBuilder;
+    use crate::database::schema::*;
+
+    fn random_sentences<R: Rng>(number: usize, rng: &mut R) -> String {
+        let mut words = String::new();
+
+        for i in 0..number {
+            let word_len = rng.gen_range(1, 12);
+            let iter = repeat_with(|| rng.sample(Alphanumeric)).take(word_len);
+            words.extend(iter);
+
+            if i == number - 1 { // last word
+                let final_ = [".", "?", "!", "..."].choose(rng).cloned();
+                words.extend(final_);
+            } else {
+                let middle = [",", ", "].choose(rng).cloned();
+                words.extend(middle);
+            }
+        }
+
+        words
+    }
+
+    #[bench]
+    fn open_little_database(bench: &mut Bencher) -> Result<(), Box<Error>> {
+        let dir = tempfile::tempdir()?;
+
+        let mut builder = SchemaBuilder::with_identifier("id");
+        builder.new_attribute("title", STORED | INDEXED);
+        builder.new_attribute("description", STORED | INDEXED);
+        let schema = builder.build();
+
+        let db_path = dir.path().join("bench.mdb");
+        let database = Database::create(db_path.clone(), &schema)?;
+
+        #[derive(Serialize)]
+        struct Document {
+            id: u64,
+            title: String,
+            description: String,
+        }
+
+        let path = dir.path().join("update-000.sst");
+        let tokenizer_builder = DefaultBuilder;
+        let mut builder = UpdateBuilder::new(path, schema);
+        let mut rng = XorShiftRng::seed_from_u64(42);
+
+        for i in 0..300 {
+            let document = Document {
+                id: i,
+                title: random_sentences(rng.gen_range(1, 8), &mut rng),
+                description: random_sentences(rng.gen_range(20, 200), &mut rng),
+            };
+            builder.update_document(&document, &tokenizer_builder)?;
+        }
+
+        let update = builder.build()?;
+        database.ingest_update_file(update)?;
+
+        drop(database);
+
+        bench.iter(|| {
+            let database = Database::open(db_path.clone()).unwrap();
+            test::black_box(|| database);
+        });
+
+        Ok(())
+    }
+
+    #[bench]
+    fn open_medium_database(bench: &mut Bencher) -> Result<(), Box<Error>> {
+        let dir = tempfile::tempdir()?;
+
+        let mut builder = SchemaBuilder::with_identifier("id");
+        builder.new_attribute("title", STORED | INDEXED);
+        builder.new_attribute("description", STORED | INDEXED);
+        let schema = builder.build();
+
+        let db_path = dir.path().join("bench.mdb");
+        let database = Database::create(db_path.clone(), &schema)?;
+
+        #[derive(Serialize)]
+        struct Document {
+            id: u64,
+            title: String,
+            description: String,
+        }
+
+        let path = dir.path().join("update-000.sst");
+        let tokenizer_builder = DefaultBuilder;
+        let mut builder = UpdateBuilder::new(path, schema);
+        let mut rng = XorShiftRng::seed_from_u64(42);
+
+        for i in 0..3000 {
+            let document = Document {
+                id: i,
+                title: random_sentences(rng.gen_range(1, 8), &mut rng),
+                description: random_sentences(rng.gen_range(20, 200), &mut rng),
+            };
+            builder.update_document(&document, &tokenizer_builder)?;
+        }
+
+        let update = builder.build()?;
+        database.ingest_update_file(update)?;
+
+        drop(database);
+
+        bench.iter(|| {
+            let database = Database::open(db_path.clone()).unwrap();
+            test::black_box(|| database);
+        });
+
+        Ok(())
+    }
+
+    #[bench]
+    #[ignore]
+    fn open_big_database(bench: &mut Bencher) -> Result<(), Box<Error>> {
+        let dir = tempfile::tempdir()?;
+
+        let mut builder = SchemaBuilder::with_identifier("id");
+        builder.new_attribute("title", STORED | INDEXED);
+        builder.new_attribute("description", STORED | INDEXED);
+        let schema = builder.build();
+
+        let db_path = dir.path().join("bench.mdb");
+        let database = Database::create(db_path.clone(), &schema)?;
+
+        #[derive(Serialize)]
+        struct Document {
+            id: u64,
+            title: String,
+            description: String,
+        }
+
+        let path = dir.path().join("update-000.sst");
+        let tokenizer_builder = DefaultBuilder;
+        let mut builder = UpdateBuilder::new(path, schema);
+        let mut rng = XorShiftRng::seed_from_u64(42);
+
+        for i in 0..30_000 {
+            let document = Document {
+                id: i,
+                title: random_sentences(rng.gen_range(1, 8), &mut rng),
+                description: random_sentences(rng.gen_range(20, 200), &mut rng),
+            };
+            builder.update_document(&document, &tokenizer_builder)?;
+        }
+
+        let update = builder.build()?;
+        database.ingest_update_file(update)?;
+
+        drop(database);
+
+        bench.iter(|| {
+            let database = Database::open(db_path.clone()).unwrap();
+            test::black_box(|| database);
+        });
+
+        Ok(())
+    }
+
+    #[bench]
+    fn search_oneletter_little_database(bench: &mut Bencher) -> Result<(), Box<Error>> {
+        let dir = tempfile::tempdir()?;
+
+        let mut builder = SchemaBuilder::with_identifier("id");
+        builder.new_attribute("title", STORED | INDEXED);
+        builder.new_attribute("description", STORED | INDEXED);
+        let schema = builder.build();
+
+        let db_path = dir.path().join("bench.mdb");
+        let database = Database::create(db_path.clone(), &schema)?;
+
+        #[derive(Serialize)]
+        struct Document {
+            id: u64,
+            title: String,
+            description: String,
+        }
+
+        let path = dir.path().join("update-000.sst");
+        let tokenizer_builder = DefaultBuilder;
+        let mut builder = UpdateBuilder::new(path, schema);
+        let mut rng = XorShiftRng::seed_from_u64(42);
+
+        for i in 0..300 {
+            let document = Document {
+                id: i,
+                title: random_sentences(rng.gen_range(1, 8), &mut rng),
+                description: random_sentences(rng.gen_range(20, 200), &mut rng),
+            };
+            builder.update_document(&document, &tokenizer_builder)?;
+        }
+
+        let update = builder.build()?;
+        let view = database.ingest_update_file(update)?;
+
+        bench.iter(|| {
+            for q in &["a", "b", "c", "d", "e"] {
+                let documents = view.query_builder().unwrap().query(q, 0..20);
+                test::black_box(|| documents);
+            }
+        });
+
+        Ok(())
+    }
+
+    #[bench]
+    fn search_oneletter_medium_database(bench: &mut Bencher) -> Result<(), Box<Error>> {
+        let dir = tempfile::tempdir()?;
+
+        let mut builder = SchemaBuilder::with_identifier("id");
+        builder.new_attribute("title", STORED | INDEXED);
+        builder.new_attribute("description", STORED | INDEXED);
+        let schema = builder.build();
+
+        let db_path = dir.path().join("bench.mdb");
+        let database = Database::create(db_path.clone(), &schema)?;
+
+        #[derive(Serialize)]
+        struct Document {
+            id: u64,
+            title: String,
+            description: String,
+        }
+
+        let path = dir.path().join("update-000.sst");
+        let tokenizer_builder = DefaultBuilder;
+        let mut builder = UpdateBuilder::new(path, schema);
+        let mut rng = XorShiftRng::seed_from_u64(42);
+
+        for i in 0..3000 {
+            let document = Document {
+                id: i,
+                title: random_sentences(rng.gen_range(1, 8), &mut rng),
+                description: random_sentences(rng.gen_range(20, 200), &mut rng),
+            };
+            builder.update_document(&document, &tokenizer_builder)?;
+        }
+
+        let update = builder.build()?;
+        let view = database.ingest_update_file(update)?;
+
+        bench.iter(|| {
+            for q in &["a", "b", "c", "d", "e"] {
+                let documents = view.query_builder().unwrap().query(q, 0..20);
+                test::black_box(|| documents);
+            }
+        });
+
+        Ok(())
+    }
+
+    #[bench]
+    #[ignore]
+    fn search_oneletter_big_database(bench: &mut Bencher) -> Result<(), Box<Error>> {
+        let dir = tempfile::tempdir()?;
+
+        let mut builder = SchemaBuilder::with_identifier("id");
+        builder.new_attribute("title", STORED | INDEXED);
+        builder.new_attribute("description", STORED | INDEXED);
+        let schema = builder.build();
+
+        let db_path = dir.path().join("bench.mdb");
+        let database = Database::create(db_path.clone(), &schema)?;
+
+        #[derive(Serialize)]
+        struct Document {
+            id: u64,
+            title: String,
+            description: String,
+        }
+
+        let path = dir.path().join("update-000.sst");
+        let tokenizer_builder = DefaultBuilder;
+        let mut builder = UpdateBuilder::new(path, schema);
+        let mut rng = XorShiftRng::seed_from_u64(42);
+
+        for i in 0..30_000 {
+            let document = Document {
+                id: i,
+                title: random_sentences(rng.gen_range(1, 8), &mut rng),
+                description: random_sentences(rng.gen_range(20, 200), &mut rng),
+            };
+            builder.update_document(&document, &tokenizer_builder)?;
+        }
+
+        let update = builder.build()?;
+        let view = database.ingest_update_file(update)?;
+
+        bench.iter(|| {
+            for q in &["a", "b", "c", "d", "e"] {
+                let documents = view.query_builder().unwrap().query(q, 0..20);
+                test::black_box(|| documents);
+            }
+        });
+
+        Ok(())
+    }
+}
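
The `Database` that moves in above keeps two pieces of shared state: ingestions serialize on the `Mutex<Arc<DB>>`, while readers grab the current `DatabaseView` from the `ArcCell` and never wait on writers. A std-only sketch of that swap-a-snapshot pattern, with a plain `Mutex<Arc<_>>` standing in for crossbeam's `ArcCell` and a `Vec` standing in for the view:

    use std::sync::{Arc, Mutex};

    struct Store {
        writer: Mutex<()>,          // serializes ingestions, like db: Mutex<Arc<DB>>
        view: Mutex<Arc<Vec<u32>>>, // stand-in for view: ArcCell<DatabaseView<...>>
    }

    impl Store {
        fn ingest(&self, value: u32) {
            let _guard = self.writer.lock().unwrap(); // other ingestions block here
            let current = self.view.lock().unwrap().clone();
            let mut next = (*current).clone();
            next.push(value);
            // readers switch to the new snapshot atomically; held ones stay valid
            *self.view.lock().unwrap() = Arc::new(next);
        }

        fn view(&self) -> Arc<Vec<u32>> {
            self.view.lock().unwrap().clone() // a refcount bump, never blocked by ingest
        }
    }

    fn main() {
        let store = Store {
            writer: Mutex::new(()),
            view: Mutex::new(Arc::new(Vec::new())),
        };
        store.ingest(1);
        let old = store.view();
        store.ingest(2);
        assert_eq!(*old, vec![1]);             // a held view is an immutable snapshot
        assert_eq!(*store.view(), vec![1, 2]); // new readers see the latest one
    }
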
diff --git a/src/database/schema.rs b/src/database/schema.rs
index 60a258824..6e3deabf5 100644
--- a/src/database/schema.rs
+++ b/src/database/schema.rs
@@ -26,11 +26,11 @@ pub struct SchemaProps {
 }
 
 impl SchemaProps {
-    pub fn is_stored(&self) -> bool {
+    pub fn is_stored(self) -> bool {
         self.stored
     }
 
-    pub fn is_indexed(&self) -> bool {
+    pub fn is_indexed(self) -> bool {
         self.indexed
     }
 }
diff --git a/src/database/database_view.rs b/src/database/view.rs
similarity index 100%
rename from src/database/database_view.rs
rename to src/database/view.rs
diff --git a/src/lib.rs b/src/lib.rs
index 01b9cb85d..fdf19edea 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -5,7 +5,6 @@ pub mod database;
 pub mod data;
 pub mod rank;
 pub mod tokenizer;
-pub mod vec_read_only;
 mod common_words;
 
 use std::fmt;
@@ -42,7 +41,7 @@ impl Attribute {
             return Err(AttributeError::IndexTooBig)
         }
 
-        let attribute = (attribute as u32) << 22;
+        let attribute = u32::from(attribute) << 22;
         Ok(Attribute(attribute | index))
     }
 
@@ -66,12 +65,12 @@ impl Attribute {
     }
 
     #[inline]
-    pub fn attribute(&self) -> u16 {
+    pub fn attribute(self) -> u16 {
         (self.0 >> 22) as u16
     }
 
     #[inline]
-    pub fn word_index(&self) -> u32 {
+    pub fn word_index(self) -> u32 {
         self.0 & 0b0000_0000_0011_1111_1111_1111_1111
     }
 }
@@ -118,7 +117,7 @@ impl WordArea {
         }
 
         let byte_index = byte_index << 10;
-        Ok(WordArea(byte_index | (length as u32)))
+        Ok(WordArea(byte_index | u32::from(length)))
     }
 
     fn new_faillible(byte_index: u32, length: u16) -> WordArea {
@@ -134,12 +133,12 @@ impl WordArea {
     }
 
     #[inline]
-    pub fn byte_index(&self) -> u32 {
+    pub fn byte_index(self) -> u32 {
         self.0 >> 10
     }
 
     #[inline]
-    pub fn length(&self) -> u16 {
+    pub fn length(self) -> u16 {
         (self.0 & 0b0000_0000_0000_0000_0011_1111_1111) as u16
     }
 }
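
The `u32::from` changes above touch the two bit-packed types in lib.rs: `Attribute` stores the attribute number in the 10 high bits and the word index in the 22 low bits, and `WordArea` packs a byte index over a 10-bit length the same way. A worked example of the round trip (note the shift implies a 22-bit mask, i.e. `0x3F_FFFF`; the context line's binary literal spells out only 18 set bits, which looks like a latent bug rather than intent):

    fn main() {
        let attribute: u16 = 3;
        let word_index: u32 = 5;

        // pack: same expression as Attribute::new
        let packed = u32::from(attribute) << 22 | word_index;

        // unpack: same expressions as attribute() and word_index()
        assert_eq!((packed >> 22) as u16, attribute);
        assert_eq!(packed & 0x3F_FFFF, word_index);
    }
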
diff --git a/src/rank/criterion/mod.rs b/src/rank/criterion/mod.rs
index 0252176c9..a5dc7ab26 100644
--- a/src/rank/criterion/mod.rs
+++ b/src/rank/criterion/mod.rs
@@ -29,7 +29,6 @@ pub use self::{
 pub trait Criterion<D>
 where D: Deref<Target=DB>
 {
-    #[inline]
     fn evaluate(&self, lhs: &Document, rhs: &Document, view: &DatabaseView<D>) -> Ordering;
 
     #[inline]
@@ -62,6 +61,7 @@ where D: Deref<Target=DB>
     }
 }
 
+#[derive(Default)]
 pub struct CriteriaBuilder<D>
 where D: Deref<Target=DB>
 {
diff --git a/src/rank/criterion/sort_by.rs b/src/rank/criterion/sort_by.rs
index bce8d0d90..8f1fef11c 100644
--- a/src/rank/criterion/sort_by.rs
+++ b/src/rank/criterion/sort_by.rs
@@ -46,13 +46,18 @@ use crate::rank::Document;
 /// let criterion = builder.build();
 ///
 /// ```
-#[derive(Default)]
 pub struct SortBy<T> {
     _phantom: marker::PhantomData<T>,
 }
 
 impl<T> SortBy<T> {
     pub fn new() -> Self {
+        SortBy::default()
+    }
+}
+
+impl<T> Default for SortBy<T> {
+    fn default() -> SortBy<T> {
         SortBy { _phantom: marker::PhantomData }
     }
 }
diff --git a/src/vec_read_only.rs b/src/vec_read_only.rs
deleted file mode 100644
index efb3317ee..000000000
--- a/src/vec_read_only.rs
+++ /dev/null
@@ -1,51 +0,0 @@
-use std::ops::Deref;
-use std::sync::Arc;
-use std::fmt;
-
-#[derive(Clone, PartialOrd, Ord, PartialEq, Eq, Hash)]
-pub struct VecReadOnly<T> {
-    inner: Arc<Vec<T>>,
-    offset: usize,
-    len: usize,
-}
-
-impl<T> VecReadOnly<T> {
-    pub fn new(vec: Vec<T>) -> Self {
-        let len = vec.len();
-        Self {
-            inner: Arc::new(vec),
-            offset: 0,
-            len: len,
-        }
-    }
-
-    pub fn len(&self) -> usize {
-        self.len
-    }
-
-    pub fn range(&self, offset: usize, len: usize) -> Self {
-        Self {
-            inner: self.inner.clone(),
-            offset: self.offset + offset,
-            len: len,
-        }
-    }
-
-    pub fn as_slice(&self) -> &[T] {
-        &self.inner[self.offset..self.offset + self.len]
-    }
-}
-
-impl<T> Deref for VecReadOnly<T> {
-    type Target = [T];
-
-    fn deref(&self) -> &Self::Target {
-        self.as_slice()
-    }
-}
-
-impl<T: fmt::Debug> fmt::Debug for VecReadOnly<T> {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        self.inner.fmt(f)
-    }
-}
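
End to end, the public surface after this change is exercised the way the new tests do it: build a `Schema`, create the `Database` from a borrowed schema, stage documents into an update file, ingest it, and query through the returned view. A condensed sketch using the same calls as the tests and benches above; the crate root `meilidb`, the `Doc` type, the paths, and the query string are illustrative, and this is not guaranteed to compile against any particular revision:

    use std::error::Error;
    use std::path::PathBuf;

    use serde_derive::Serialize;

    use meilidb::database::schema::{SchemaBuilder, STORED, INDEXED};
    use meilidb::database::{Database, UpdateBuilder};
    use meilidb::tokenizer::DefaultBuilder;

    #[derive(Serialize)]
    struct Doc {
        id: u64,
        title: String,
    }

    fn main() -> Result<(), Box<Error>> {
        let schema = {
            let mut builder = SchemaBuilder::with_identifier("id");
            builder.new_attribute("id", STORED);
            builder.new_attribute("title", STORED | INDEXED);
            builder.build()
        };

        // create() now borrows the schema; we still own it for the UpdateBuilder
        let database = Database::create(PathBuf::from("example.mdb"), &schema)?;

        let tokenizer_builder = DefaultBuilder::new();
        let mut builder = UpdateBuilder::new(PathBuf::from("update-000.sst"), schema);
        let doc = Doc { id: 0, title: String::from("hello world") };
        builder.update_document(&doc, &tokenizer_builder)?;
        let update = builder.build()?;

        // ingest_update_file returns the refreshed view directly
        let view = database.ingest_update_file(update)?;
        let documents = view.query_builder().unwrap().query("hello", 0..20);
        let _ = documents;

        Ok(())
    }
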