Merge pull request #100 from qdequele/master

Allow users to manage multiple database indexes
This commit is contained in:
Clément Renault 2019-02-07 14:49:52 +01:00 committed by GitHub
commit 4c0279729b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 220 additions and 43 deletions

View File

@ -22,6 +22,7 @@ serde_json = { version = "1.0", features = ["preserve_order"] }
slice-group-by = "0.2"
unidecode = "0.3"
rayon = "1.0"
lockfree = "0.5.1"
[dependencies.toml]
git = "https://github.com/Kerollmops/toml-rs.git"

View File

@ -50,7 +50,9 @@ fn index(
stop_words: &HashSet<String>,
) -> Result<Database, Box<Error>>
{
let database = Database::create(database_path, &schema)?;
let database = Database::create(database_path)?;
database.create_index("default", &schema)?;
let mut rdr = csv::Reader::from_path(csv_data_path)?;
let mut raw_record = csv::StringRecord::new();
@ -61,7 +63,7 @@ fn index(
while !end_of_file {
let tokenizer_builder = DefaultBuilder::new();
let mut update = database.start_update()?;
let mut update = database.start_update("default")?;
loop {
end_of_file = !rdr.read_record(&mut raw_record)?;

View File

@ -116,7 +116,7 @@ fn main() -> Result<(), Box<Error>> {
if input.read_line(&mut buffer)? == 0 { break }
let query = buffer.trim_end_matches('\n');
let view = database.view();
let view = database.view("default")?;
let schema = view.schema();
let (elapsed, documents) = elapsed::measure_time(|| {

View File

@ -1,13 +1,17 @@
use std::error::Error;
use std::path::Path;
use std::ops::Deref;
use std::sync::Arc;
use std::error::Error;
use std::ffi::OsStr;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::ops::{Deref, DerefMut};
use rocksdb::rocksdb_options::{DBOptions, ColumnFamilyOptions};
use rocksdb::rocksdb::{Writable, Snapshot};
use rocksdb::{DB, MergeOperands};
use crossbeam::atomic::ArcCell;
use log::info;
use log::{info, error, warn};
use rocksdb::rocksdb::{Writable, Snapshot};
use rocksdb::rocksdb_options::{DBOptions, ColumnFamilyOptions};
use rocksdb::{DB, MergeOperands};
use lockfree::map::Map;
pub use self::document_key::{DocumentKey, DocumentKeyAttr};
pub use self::view::{DatabaseView, DocumentIter};
@ -77,21 +81,48 @@ fn merge_indexes(key: &[u8], existing: Option<&[u8]>, operands: &mut MergeOperan
bytes
}
pub struct Database {
db: Arc<DB>,
// This view is updated each time the DB ingests an update
view: ArcCell<DatabaseView<Arc<DB>>>,
pub struct IndexUpdate {
index: String,
update: Update,
}
impl Database {
pub fn create<P: AsRef<Path>>(path: P, schema: &Schema) -> Result<Database, Box<Error>> {
impl Deref for IndexUpdate {
type Target = Update;
fn deref(&self) -> &Update {
&self.update
}
}
impl DerefMut for IndexUpdate {
fn deref_mut(&mut self) -> &mut Update {
&mut self.update
}
}
struct DatabaseIndex {
db: Arc<DB>,
// This view is updated each time the DB ingests an update
view: ArcCell<DatabaseView<Arc<DB>>>,
// This path is the path to the mdb folder stored on disk
path: PathBuf,
// must_die false by default, must be set as true when the Index is dropped.
// It's used to erase the folder saved on disk when the user request to delete an index
must_die: AtomicBool,
}
impl DatabaseIndex {
fn create<P: AsRef<Path>>(path: P, schema: &Schema) -> Result<DatabaseIndex, Box<Error>> {
let path = path.as_ref();
if path.exists() {
return Err(format!("File already exists at path: {}, cannot create database.",
path.display()).into())
}
let path = path.to_string_lossy();
let path_lossy = path.to_string_lossy();
let mut opts = DBOptions::new();
opts.create_if_missing(true);
// opts.error_if_exists(true); // FIXME pull request that
@ -99,7 +130,7 @@ impl Database {
let mut cf_opts = ColumnFamilyOptions::new();
cf_opts.add_merge_operator("data-index merge operator", merge_indexes);
let db = DB::open_cf(opts, &path, vec![("default", cf_opts)])?;
let db = DB::open_cf(opts, &path_lossy, vec![("default", cf_opts)])?;
let mut schema_bytes = Vec::new();
schema.write_to_bin(&mut schema_bytes)?;
@ -109,11 +140,17 @@ impl Database {
let snapshot = Snapshot::new(db.clone());
let view = ArcCell::new(Arc::new(DatabaseView::new(snapshot)?));
Ok(Database { db, view })
Ok(DatabaseIndex {
db: db,
view: view,
path: path.to_path_buf(),
must_die: AtomicBool::new(false)
})
}
pub fn open<P: AsRef<Path>>(path: P) -> Result<Database, Box<Error>> {
let path = path.as_ref().to_string_lossy();
fn open<P: AsRef<Path>>(path: P) -> Result<DatabaseIndex, Box<Error>> {
let path_lossy = path.as_ref().to_string_lossy();
let mut opts = DBOptions::new();
opts.create_if_missing(false);
@ -121,7 +158,7 @@ impl Database {
let mut cf_opts = ColumnFamilyOptions::new();
cf_opts.add_merge_operator("data-index merge operator", merge_indexes);
let db = DB::open_cf(opts, &path, vec![("default", cf_opts)])?;
let db = DB::open_cf(opts, &path_lossy, vec![("default", cf_opts)])?;
// FIXME create a generic function to do that !
let _schema = match db.get(DATA_SCHEMA)? {
@ -133,10 +170,19 @@ impl Database {
let snapshot = Snapshot::new(db.clone());
let view = ArcCell::new(Arc::new(DatabaseView::new(snapshot)?));
Ok(Database { db, view })
Ok(DatabaseIndex {
db: db,
view: view,
path: path.as_ref().to_path_buf(),
must_die: AtomicBool::new(false)
})
}
pub fn start_update(&self) -> Result<Update, Box<Error>> {
fn must_die(&self) {
self.must_die.store(true, Ordering::Relaxed)
}
fn start_update(&self) -> Result<Update, Box<Error>> {
let schema = match self.db.get(DATA_SCHEMA)? {
Some(value) => Schema::read_from_bin(&*value)?,
None => panic!("Database does not contain a schema"),
@ -145,7 +191,7 @@ impl Database {
Ok(Update::new(schema))
}
pub fn commit_update(&self, update: Update) -> Result<Arc<DatabaseView<Arc<DB>>>, Box<Error>> {
fn commit_update(&self, update: Update) -> Result<Arc<DatabaseView<Arc<DB>>>, Box<Error>> {
let batch = update.build()?;
self.db.write(batch)?;
@ -156,11 +202,115 @@ impl Database {
Ok(view)
}
pub fn view(&self) -> Arc<DatabaseView<Arc<DB>>> {
fn view(&self) -> Arc<DatabaseView<Arc<DB>>> {
self.view.get()
}
}
impl Drop for DatabaseIndex {
fn drop(&mut self) {
if self.must_die.load(Ordering::Relaxed) {
if let Err(err) = fs::remove_dir_all(&self.path) {
error!("Impossible to remove mdb when Database id dropped; {}", err);
}
}
}
}
pub struct Database {
indexes: Map<String, Arc<DatabaseIndex>>,
path: PathBuf,
}
impl Database {
pub fn create<P: AsRef<Path>>(path: P) -> Result<Database, Box<Error>> {
Ok(Database {
indexes: Map::new(),
path: path.as_ref().to_path_buf(),
})
}
pub fn open<P: AsRef<Path>>(path: P) -> Result<Database, Box<Error>> {
let entries = fs::read_dir(&path)?;
let indexes = Map::new();
for entry in entries {
let path = match entry {
Ok(p) => p.path(),
Err(err) => {
warn!("Impossible to retrieve the path from an entry; {}", err);
continue
}
};
let name = match path.file_stem().and_then(OsStr::to_str) {
Some(name) => name.to_owned(),
None => continue
};
let db = match DatabaseIndex::open(path.clone()) {
Ok(db) => db,
Err(err) => {
warn!("Impossible to open the database; {}", err);
continue
}
};
info!("Load database {}", name);
indexes.insert(name, Arc::new(db));
}
Ok(Database {
indexes: indexes,
path: path.as_ref().to_path_buf(),
})
}
pub fn create_index(&self, name: &str, schema: &Schema) -> Result<(), Box<Error>> {
let index_path = self.path.join(name);
if index_path.exists() {
return Err("Index already exists".into());
}
let index = DatabaseIndex::create(index_path, schema)?;
self.indexes.insert(name.to_owned(), Arc::new(index));
Ok(())
}
pub fn delete_index(&self, name: &str) -> Result<(), Box<Error>> {
let index_guard = self.indexes.remove(name).ok_or("Index not found")?;
index_guard.val().must_die();
Ok(())
}
pub fn list_indexes(&self) -> Vec<String> {
self.indexes.iter().map(|g| g.key().clone()).collect()
}
pub fn start_update(&self, index: &str) -> Result<IndexUpdate, Box<Error>> {
let index_guard = self.indexes.get(index).ok_or("Index not found")?;
let update = index_guard.val().start_update()?;
Ok(IndexUpdate { index: index.to_owned(), update })
}
pub fn commit_update(&self, update: IndexUpdate)-> Result<Arc<DatabaseView<Arc<DB>>>, Box<Error>> {
let index_guard = self.indexes.get(&update.index).ok_or("Index not found")?;
index_guard.val().commit_update(update.update)
}
pub fn view(&self, index: &str) -> Result<Arc<DatabaseView<Arc<DB>>>, Box<Error>> {
let index_guard = self.indexes.get(index).ok_or("Index not found")?;
Ok(index_guard.val().view())
}
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
@ -179,6 +329,7 @@ mod tests {
let stop_words = HashSet::new();
let meilidb_path = dir.path().join("meilidb.mdb");
let meilidb_index_name = "default";
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
struct SimpleDoc {
@ -197,7 +348,9 @@ mod tests {
builder.build()
};
let database = Database::create(&meilidb_path, &schema)?;
let database = Database::create(&meilidb_path)?;
database.create_index(meilidb_index_name, &schema)?;
let doc0 = SimpleDoc {
id: 0,
@ -213,7 +366,7 @@ mod tests {
};
let tokenizer_builder = DefaultBuilder::new();
let mut builder = database.start_update()?;
let mut builder = database.start_update(meilidb_index_name)?;
let docid0 = builder.update_document(&doc0, &tokenizer_builder, &stop_words)?;
let docid1 = builder.update_document(&doc1, &tokenizer_builder, &stop_words)?;
@ -235,6 +388,7 @@ mod tests {
let stop_words = HashSet::new();
let meilidb_path = dir.path().join("meilidb.mdb");
let meilidb_index_name = "default";
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
struct SimpleDoc {
@ -253,7 +407,9 @@ mod tests {
builder.build()
};
let database = Database::create(&meilidb_path, &schema)?;
let database = Database::create(&meilidb_path)?;
database.create_index(meilidb_index_name, &schema)?;
let doc0 = SimpleDoc {
id: 0,
@ -282,12 +438,12 @@ mod tests {
let tokenizer_builder = DefaultBuilder::new();
let mut builder = database.start_update()?;
let mut builder = database.start_update(meilidb_index_name)?;
let docid0 = builder.update_document(&doc0, &tokenizer_builder, &stop_words)?;
let docid1 = builder.update_document(&doc1, &tokenizer_builder, &stop_words)?;
database.commit_update(builder)?;
let mut builder = database.start_update()?;
let mut builder = database.start_update(meilidb_index_name)?;
let docid2 = builder.update_document(&doc2, &tokenizer_builder, &stop_words)?;
let docid3 = builder.update_document(&doc3, &tokenizer_builder, &stop_words)?;
let view = database.commit_update(builder)?;
@ -359,7 +515,10 @@ mod bench {
let schema = builder.build();
let db_path = dir.path().join("bench.mdb");
let database = Database::create(db_path.clone(), &schema)?;
let index_name = "default";
let database = Database::create(&db_path)?;
database.create_index(index_name, &schema)?;
#[derive(Serialize)]
struct Document {
@ -369,7 +528,7 @@ mod bench {
}
let tokenizer_builder = DefaultBuilder;
let mut builder = database.start_update()?;
let mut builder = database.start_update(index_name)?;
let mut rng = XorShiftRng::seed_from_u64(42);
for i in 0..300 {
@ -404,7 +563,10 @@ mod bench {
let schema = builder.build();
let db_path = dir.path().join("bench.mdb");
let database = Database::create(db_path.clone(), &schema)?;
let index_name = "default";
let database = Database::create(&db_path)?;
database.create_index(index_name, &schema)?;
#[derive(Serialize)]
struct Document {
@ -414,7 +576,7 @@ mod bench {
}
let tokenizer_builder = DefaultBuilder;
let mut builder = database.start_update()?;
let mut builder = database.start_update(index_name)?;
let mut rng = XorShiftRng::seed_from_u64(42);
for i in 0..3000 {
@ -450,7 +612,10 @@ mod bench {
let schema = builder.build();
let db_path = dir.path().join("bench.mdb");
let database = Database::create(db_path.clone(), &schema)?;
let index_name = "default";
let database = Database::create(&db_path)?;
database.create_index(index_name, &schema)?;
#[derive(Serialize)]
struct Document {
@ -460,7 +625,7 @@ mod bench {
}
let tokenizer_builder = DefaultBuilder;
let mut builder = database.start_update()?;
let mut builder = database.start_update(index_name)?;
let mut rng = XorShiftRng::seed_from_u64(42);
for i in 0..30_000 {
@ -495,7 +660,10 @@ mod bench {
let schema = builder.build();
let db_path = dir.path().join("bench.mdb");
let database = Database::create(db_path.clone(), &schema)?;
let index_name = "default";
let database = Database::create(&db_path)?;
database.create_index(index_name, &schema)?;
#[derive(Serialize)]
struct Document {
@ -505,7 +673,7 @@ mod bench {
}
let tokenizer_builder = DefaultBuilder;
let mut builder = database.start_update()?;
let mut builder = database.start_update(index_name)?;
let mut rng = XorShiftRng::seed_from_u64(42);
for i in 0..300 {
@ -540,7 +708,10 @@ mod bench {
let schema = builder.build();
let db_path = dir.path().join("bench.mdb");
let database = Database::create(db_path.clone(), &schema)?;
let index_name = "default";
let database = Database::create(&db_path)?;
database.create_index(index_name, &schema)?;
#[derive(Serialize)]
struct Document {
@ -550,7 +721,7 @@ mod bench {
}
let tokenizer_builder = DefaultBuilder;
let mut builder = database.start_update()?;
let mut builder = database.start_update(index_name)?;
let mut rng = XorShiftRng::seed_from_u64(42);
for i in 0..3000 {
@ -586,7 +757,10 @@ mod bench {
let schema = builder.build();
let db_path = dir.path().join("bench.mdb");
let database = Database::create(db_path.clone(), &schema)?;
let index_name = "default";
let database = Database::create(&db_path)?;
database.create_index(index_name, &schema)?;
#[derive(Serialize)]
struct Document {
@ -596,7 +770,7 @@ mod bench {
}
let tokenizer_builder = DefaultBuilder;
let mut builder = database.start_update()?;
let mut builder = database.start_update(index_name)?;
let mut rng = XorShiftRng::seed_from_u64(42);
for i in 0..30_000 {