diff --git a/Cargo.toml b/Cargo.toml index 9be65a7b8..deb12e4bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ serde_json = { version = "1.0", features = ["preserve_order"] } slice-group-by = "0.2" unidecode = "0.3" rayon = "1.0" +lockfree = "0.5.1" [dependencies.toml] git = "https://github.com/Kerollmops/toml-rs.git" diff --git a/examples/create-database.rs b/examples/create-database.rs index 043b297ef..ee5324919 100644 --- a/examples/create-database.rs +++ b/examples/create-database.rs @@ -50,7 +50,9 @@ fn index( stop_words: &HashSet, ) -> Result> { - let database = Database::create(database_path, &schema)?; + let database = Database::create(database_path)?; + + database.create_index("default", &schema)?; let mut rdr = csv::Reader::from_path(csv_data_path)?; let mut raw_record = csv::StringRecord::new(); @@ -61,7 +63,7 @@ fn index( while !end_of_file { let tokenizer_builder = DefaultBuilder::new(); - let mut update = database.start_update()?; + let mut update = database.start_update("default")?; loop { end_of_file = !rdr.read_record(&mut raw_record)?; diff --git a/examples/query-database.rs b/examples/query-database.rs index d1e6a0e17..72e990960 100644 --- a/examples/query-database.rs +++ b/examples/query-database.rs @@ -116,7 +116,7 @@ fn main() -> Result<(), Box> { if input.read_line(&mut buffer)? == 0 { break } let query = buffer.trim_end_matches('\n'); - let view = database.view(); + let view = database.view("default")?; let schema = view.schema(); let (elapsed, documents) = elapsed::measure_time(|| { diff --git a/src/database/mod.rs b/src/database/mod.rs index ce15de752..701a7e23b 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -1,13 +1,17 @@ -use std::error::Error; -use std::path::Path; -use std::ops::Deref; use std::sync::Arc; +use std::error::Error; +use std::ffi::OsStr; +use std::fs; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::ops::{Deref, DerefMut}; -use rocksdb::rocksdb_options::{DBOptions, ColumnFamilyOptions}; -use rocksdb::rocksdb::{Writable, Snapshot}; -use rocksdb::{DB, MergeOperands}; use crossbeam::atomic::ArcCell; -use log::info; +use log::{info, error, warn}; +use rocksdb::rocksdb::{Writable, Snapshot}; +use rocksdb::rocksdb_options::{DBOptions, ColumnFamilyOptions}; +use rocksdb::{DB, MergeOperands}; +use lockfree::map::Map; pub use self::document_key::{DocumentKey, DocumentKeyAttr}; pub use self::view::{DatabaseView, DocumentIter}; @@ -77,21 +81,48 @@ fn merge_indexes(key: &[u8], existing: Option<&[u8]>, operands: &mut MergeOperan bytes } -pub struct Database { - db: Arc, - // This view is updated each time the DB ingests an update - view: ArcCell>>, +pub struct IndexUpdate { + index: String, + update: Update, } -impl Database { - pub fn create>(path: P, schema: &Schema) -> Result> { +impl Deref for IndexUpdate { + type Target = Update; + + fn deref(&self) -> &Update { + &self.update + } +} + +impl DerefMut for IndexUpdate { + fn deref_mut(&mut self) -> &mut Update { + &mut self.update + } +} + +struct DatabaseIndex { + db: Arc, + + // This view is updated each time the DB ingests an update + view: ArcCell>>, + + // This path is the path to the mdb folder stored on disk + path: PathBuf, + + // must_die false by default, must be set as true when the Index is dropped. + // It's used to erase the folder saved on disk when the user request to delete an index + must_die: AtomicBool, +} + +impl DatabaseIndex { + fn create>(path: P, schema: &Schema) -> Result> { let path = path.as_ref(); if path.exists() { return Err(format!("File already exists at path: {}, cannot create database.", path.display()).into()) } - let path = path.to_string_lossy(); + let path_lossy = path.to_string_lossy(); let mut opts = DBOptions::new(); opts.create_if_missing(true); // opts.error_if_exists(true); // FIXME pull request that @@ -99,7 +130,7 @@ impl Database { let mut cf_opts = ColumnFamilyOptions::new(); cf_opts.add_merge_operator("data-index merge operator", merge_indexes); - let db = DB::open_cf(opts, &path, vec![("default", cf_opts)])?; + let db = DB::open_cf(opts, &path_lossy, vec![("default", cf_opts)])?; let mut schema_bytes = Vec::new(); schema.write_to_bin(&mut schema_bytes)?; @@ -109,11 +140,17 @@ impl Database { let snapshot = Snapshot::new(db.clone()); let view = ArcCell::new(Arc::new(DatabaseView::new(snapshot)?)); - Ok(Database { db, view }) + + Ok(DatabaseIndex { + db: db, + view: view, + path: path.to_path_buf(), + must_die: AtomicBool::new(false) + }) } - pub fn open>(path: P) -> Result> { - let path = path.as_ref().to_string_lossy(); + fn open>(path: P) -> Result> { + let path_lossy = path.as_ref().to_string_lossy(); let mut opts = DBOptions::new(); opts.create_if_missing(false); @@ -121,7 +158,7 @@ impl Database { let mut cf_opts = ColumnFamilyOptions::new(); cf_opts.add_merge_operator("data-index merge operator", merge_indexes); - let db = DB::open_cf(opts, &path, vec![("default", cf_opts)])?; + let db = DB::open_cf(opts, &path_lossy, vec![("default", cf_opts)])?; // FIXME create a generic function to do that ! let _schema = match db.get(DATA_SCHEMA)? { @@ -133,10 +170,19 @@ impl Database { let snapshot = Snapshot::new(db.clone()); let view = ArcCell::new(Arc::new(DatabaseView::new(snapshot)?)); - Ok(Database { db, view }) + Ok(DatabaseIndex { + db: db, + view: view, + path: path.as_ref().to_path_buf(), + must_die: AtomicBool::new(false) + }) } - pub fn start_update(&self) -> Result> { + fn must_die(&self) { + self.must_die.store(true, Ordering::Relaxed) + } + + fn start_update(&self) -> Result> { let schema = match self.db.get(DATA_SCHEMA)? { Some(value) => Schema::read_from_bin(&*value)?, None => panic!("Database does not contain a schema"), @@ -145,7 +191,7 @@ impl Database { Ok(Update::new(schema)) } - pub fn commit_update(&self, update: Update) -> Result>>, Box> { + fn commit_update(&self, update: Update) -> Result>>, Box> { let batch = update.build()?; self.db.write(batch)?; @@ -156,11 +202,115 @@ impl Database { Ok(view) } - pub fn view(&self) -> Arc>> { + fn view(&self) -> Arc>> { self.view.get() } } +impl Drop for DatabaseIndex { + fn drop(&mut self) { + if self.must_die.load(Ordering::Relaxed) { + if let Err(err) = fs::remove_dir_all(&self.path) { + error!("Impossible to remove mdb when Database id dropped; {}", err); + } + } + } +} + +pub struct Database { + indexes: Map>, + path: PathBuf, +} + +impl Database { + pub fn create>(path: P) -> Result> { + Ok(Database { + indexes: Map::new(), + path: path.as_ref().to_path_buf(), + }) + } + + pub fn open>(path: P) -> Result> { + let entries = fs::read_dir(&path)?; + + let indexes = Map::new(); + for entry in entries { + let path = match entry { + Ok(p) => p.path(), + Err(err) => { + warn!("Impossible to retrieve the path from an entry; {}", err); + continue + } + }; + + let name = match path.file_stem().and_then(OsStr::to_str) { + Some(name) => name.to_owned(), + None => continue + }; + + let db = match DatabaseIndex::open(path.clone()) { + Ok(db) => db, + Err(err) => { + warn!("Impossible to open the database; {}", err); + continue + } + }; + + info!("Load database {}", name); + indexes.insert(name, Arc::new(db)); + } + + Ok(Database { + indexes: indexes, + path: path.as_ref().to_path_buf(), + }) + } + + pub fn create_index(&self, name: &str, schema: &Schema) -> Result<(), Box> { + let index_path = self.path.join(name); + + if index_path.exists() { + return Err("Index already exists".into()); + } + + let index = DatabaseIndex::create(index_path, schema)?; + self.indexes.insert(name.to_owned(), Arc::new(index)); + + Ok(()) + } + + pub fn delete_index(&self, name: &str) -> Result<(), Box> { + let index_guard = self.indexes.remove(name).ok_or("Index not found")?; + index_guard.val().must_die(); + + Ok(()) + } + + pub fn list_indexes(&self) -> Vec { + self.indexes.iter().map(|g| g.key().clone()).collect() + } + + pub fn start_update(&self, index: &str) -> Result> { + let index_guard = self.indexes.get(index).ok_or("Index not found")?; + let update = index_guard.val().start_update()?; + + Ok(IndexUpdate { index: index.to_owned(), update }) + } + + pub fn commit_update(&self, update: IndexUpdate)-> Result>>, Box> { + let index_guard = self.indexes.get(&update.index).ok_or("Index not found")?; + + index_guard.val().commit_update(update.update) + } + + pub fn view(&self, index: &str) -> Result>>, Box> { + let index_guard = self.indexes.get(index).ok_or("Index not found")?; + + Ok(index_guard.val().view()) + } + +} + #[cfg(test)] mod tests { use std::collections::HashSet; @@ -179,6 +329,7 @@ mod tests { let stop_words = HashSet::new(); let meilidb_path = dir.path().join("meilidb.mdb"); + let meilidb_index_name = "default"; #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] struct SimpleDoc { @@ -197,7 +348,9 @@ mod tests { builder.build() }; - let database = Database::create(&meilidb_path, &schema)?; + let database = Database::create(&meilidb_path)?; + + database.create_index(meilidb_index_name, &schema)?; let doc0 = SimpleDoc { id: 0, @@ -213,7 +366,7 @@ mod tests { }; let tokenizer_builder = DefaultBuilder::new(); - let mut builder = database.start_update()?; + let mut builder = database.start_update(meilidb_index_name)?; let docid0 = builder.update_document(&doc0, &tokenizer_builder, &stop_words)?; let docid1 = builder.update_document(&doc1, &tokenizer_builder, &stop_words)?; @@ -235,6 +388,7 @@ mod tests { let stop_words = HashSet::new(); let meilidb_path = dir.path().join("meilidb.mdb"); + let meilidb_index_name = "default"; #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] struct SimpleDoc { @@ -253,7 +407,9 @@ mod tests { builder.build() }; - let database = Database::create(&meilidb_path, &schema)?; + let database = Database::create(&meilidb_path)?; + + database.create_index(meilidb_index_name, &schema)?; let doc0 = SimpleDoc { id: 0, @@ -282,12 +438,12 @@ mod tests { let tokenizer_builder = DefaultBuilder::new(); - let mut builder = database.start_update()?; + let mut builder = database.start_update(meilidb_index_name)?; let docid0 = builder.update_document(&doc0, &tokenizer_builder, &stop_words)?; let docid1 = builder.update_document(&doc1, &tokenizer_builder, &stop_words)?; database.commit_update(builder)?; - let mut builder = database.start_update()?; + let mut builder = database.start_update(meilidb_index_name)?; let docid2 = builder.update_document(&doc2, &tokenizer_builder, &stop_words)?; let docid3 = builder.update_document(&doc3, &tokenizer_builder, &stop_words)?; let view = database.commit_update(builder)?; @@ -359,7 +515,10 @@ mod bench { let schema = builder.build(); let db_path = dir.path().join("bench.mdb"); - let database = Database::create(db_path.clone(), &schema)?; + let index_name = "default"; + + let database = Database::create(&db_path)?; + database.create_index(index_name, &schema)?; #[derive(Serialize)] struct Document { @@ -369,7 +528,7 @@ mod bench { } let tokenizer_builder = DefaultBuilder; - let mut builder = database.start_update()?; + let mut builder = database.start_update(index_name)?; let mut rng = XorShiftRng::seed_from_u64(42); for i in 0..300 { @@ -404,7 +563,10 @@ mod bench { let schema = builder.build(); let db_path = dir.path().join("bench.mdb"); - let database = Database::create(db_path.clone(), &schema)?; + let index_name = "default"; + + let database = Database::create(&db_path)?; + database.create_index(index_name, &schema)?; #[derive(Serialize)] struct Document { @@ -414,7 +576,7 @@ mod bench { } let tokenizer_builder = DefaultBuilder; - let mut builder = database.start_update()?; + let mut builder = database.start_update(index_name)?; let mut rng = XorShiftRng::seed_from_u64(42); for i in 0..3000 { @@ -450,7 +612,10 @@ mod bench { let schema = builder.build(); let db_path = dir.path().join("bench.mdb"); - let database = Database::create(db_path.clone(), &schema)?; + let index_name = "default"; + + let database = Database::create(&db_path)?; + database.create_index(index_name, &schema)?; #[derive(Serialize)] struct Document { @@ -460,7 +625,7 @@ mod bench { } let tokenizer_builder = DefaultBuilder; - let mut builder = database.start_update()?; + let mut builder = database.start_update(index_name)?; let mut rng = XorShiftRng::seed_from_u64(42); for i in 0..30_000 { @@ -495,7 +660,10 @@ mod bench { let schema = builder.build(); let db_path = dir.path().join("bench.mdb"); - let database = Database::create(db_path.clone(), &schema)?; + let index_name = "default"; + + let database = Database::create(&db_path)?; + database.create_index(index_name, &schema)?; #[derive(Serialize)] struct Document { @@ -505,7 +673,7 @@ mod bench { } let tokenizer_builder = DefaultBuilder; - let mut builder = database.start_update()?; + let mut builder = database.start_update(index_name)?; let mut rng = XorShiftRng::seed_from_u64(42); for i in 0..300 { @@ -540,7 +708,10 @@ mod bench { let schema = builder.build(); let db_path = dir.path().join("bench.mdb"); - let database = Database::create(db_path.clone(), &schema)?; + let index_name = "default"; + + let database = Database::create(&db_path)?; + database.create_index(index_name, &schema)?; #[derive(Serialize)] struct Document { @@ -550,7 +721,7 @@ mod bench { } let tokenizer_builder = DefaultBuilder; - let mut builder = database.start_update()?; + let mut builder = database.start_update(index_name)?; let mut rng = XorShiftRng::seed_from_u64(42); for i in 0..3000 { @@ -586,7 +757,10 @@ mod bench { let schema = builder.build(); let db_path = dir.path().join("bench.mdb"); - let database = Database::create(db_path.clone(), &schema)?; + let index_name = "default"; + + let database = Database::create(&db_path)?; + database.create_index(index_name, &schema)?; #[derive(Serialize)] struct Document { @@ -596,7 +770,7 @@ mod bench { } let tokenizer_builder = DefaultBuilder; - let mut builder = database.start_update()?; + let mut builder = database.start_update(index_name)?; let mut rng = XorShiftRng::seed_from_u64(42); for i in 0..30_000 {