diff --git a/.gitignore b/.gitignore index 6e03cb642..7d6c8de60 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ **/*.rs.bk Cargo.lock /*.rkv +/query-history.txt diff --git a/meilidb-core/Cargo.toml b/meilidb-core/Cargo.toml index 3c64717d4..cec7b2b6e 100644 --- a/meilidb-core/Cargo.toml +++ b/meilidb-core/Cargo.toml @@ -5,6 +5,7 @@ authors = ["Clément Renault "] edition = "2018" [dependencies] +arc-swap = "0.4.3" bincode = "1.1.4" byteorder = "1.3.2" crossbeam-channel = "0.3.9" diff --git a/meilidb-core/examples/from_file.rs b/meilidb-core/examples/from_file.rs index ad8382f93..e98cfcb78 100644 --- a/meilidb-core/examples/from_file.rs +++ b/meilidb-core/examples/from_file.rs @@ -5,14 +5,14 @@ use std::io::Write; use std::iter::FromIterator; use std::path::{Path, PathBuf}; use std::time::{Instant, Duration}; -use std::{fs, io}; +use std::{fs, io, sync::mpsc}; use rustyline::{Editor, Config}; use serde::{Serialize, Deserialize}; use structopt::StructOpt; use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor}; -use meilidb_core::{Highlight, Database}; +use meilidb_core::{Highlight, Database, UpdateResult, BoxUpdateFn}; use meilidb_schema::SchemaAttr; const INDEX_NAME: &str = "default"; @@ -79,8 +79,10 @@ struct Document(HashMap); fn index_command(command: IndexCommand, database: Database) -> Result<(), Box> { let start = Instant::now(); + let (sender, receiver) = mpsc::sync_channel(0); + let update_fn = move |update: UpdateResult| sender.send(update.update_id).unwrap(); + let index = database.open_index(INDEX_NAME, Some(Box::new(update_fn)))?; let rkv = database.rkv.read().unwrap(); - let index = database.open_index(INDEX_NAME)?; let schema = { let string = fs::read_to_string(&command.schema)?; @@ -139,14 +141,9 @@ fn index_command(command: IndexCommand, database: Database) -> Result<(), Box Result<(), Box> { let rkv = database.rkv.read().unwrap(); - let index = database.open_index(INDEX_NAME)?; + let update_fn = None as Option::; + let index = database.open_index(INDEX_NAME, update_fn)?; let reader = rkv.read().unwrap(); let schema = index.main.schema(&reader)?; diff --git a/meilidb-core/src/database.rs b/meilidb-core/src/database.rs index 154e6040e..d6b6c10d0 100644 --- a/meilidb-core/src/database.rs +++ b/meilidb-core/src/database.rs @@ -8,14 +8,23 @@ use log::{debug, error}; use crate::{store, update, Index, MResult}; +pub type BoxUpdateFn = Box; +type ArcSwapFn = arc_swap::ArcSwapOption; + pub struct Database { pub rkv: Arc>, main_store: rkv::SingleStore, indexes_store: rkv::SingleStore, - indexes: RwLock)>>, + indexes: RwLock, thread::JoinHandle<()>)>>, } -fn update_awaiter(receiver: Receiver<()>, rkv: Arc>, index: Index) { +fn update_awaiter( + receiver: Receiver<()>, + rkv: Arc>, + update_fn: Arc, + index: Index, +) +{ for () in receiver { // consume all updates in order (oldest first) loop { @@ -29,7 +38,13 @@ fn update_awaiter(receiver: Receiver<()>, rkv: Arc>, index: Ind Err(e) => { error!("LMDB writer transaction begin failed: {}", e); break } }; - match update::update_task(&mut writer, index.clone(), None as Option::) { + let update_fn = update_fn.load(); + let update_fn: Option<&dyn Fn(update::UpdateResult)> = match *update_fn { + Some(ref f) => Some(f.as_ref()), + None => None, + }; + + match update::update_task(&mut writer, index.clone(), update_fn) { Ok(true) => if let Err(e) = writer.commit() { error!("update transaction failed: {}", e) }, // no more updates to handle for now Ok(false) => { debug!("no more updates"); writer.abort(); break }, @@ -78,15 +93,21 @@ impl Database { let (sender, receiver) = crossbeam_channel::bounded(100); let index = store::open(&rkv_read, &index_name, sender.clone())?; + let update_fn = Arc::new(ArcSwapFn::empty()); + let rkv_clone = rkv.clone(); let index_clone = index.clone(); - let handle = thread::spawn(move || update_awaiter(receiver, rkv_clone, index_clone)); + let update_fn_clone = update_fn.clone(); + + let handle = thread::spawn(move || { + update_awaiter(receiver, rkv_clone, update_fn_clone, index_clone) + }); // send an update notification to make sure that // possible previous boot updates are consumed sender.send(()).unwrap(); - let result = indexes.insert(index_name, (index, handle)); + let result = indexes.insert(index_name, (index, update_fn, handle)); assert!(result.is_none(), "The index should not have been already open"); } @@ -95,12 +116,20 @@ impl Database { Ok(Database { rkv, main_store, indexes_store, indexes: RwLock::new(indexes) }) } - pub fn open_index(&self, name: impl Into) -> MResult { + pub fn open_index( + &self, + name: impl Into, + update_fn: Option, + ) -> MResult + { let indexes_lock = self.indexes.read().unwrap(); let name = name.into(); match indexes_lock.get(&name) { - Some((index, _)) => Ok(index.clone()), + Some((index, old_update_fn, _)) => { + old_update_fn.swap(update_fn.map(Arc::new)); + Ok(index.clone()) + }, None => { drop(indexes_lock); @@ -117,8 +146,16 @@ impl Database { indexes_write.entry(name).or_insert_with(|| { let rkv_clone = self.rkv.clone(); let index_clone = index.clone(); - let handle = thread::spawn(move || update_awaiter(receiver, rkv_clone, index_clone)); - (index.clone(), handle) + + let update_fn = update_fn.map(Arc::new); + let update_fn = Arc::new(ArcSwapFn::new(update_fn)); + let update_fn_clone = update_fn.clone(); + + let handle = thread::spawn(move || { + update_awaiter(receiver, rkv_clone, update_fn_clone, index_clone) + }); + + (index.clone(), update_fn, handle) }); } diff --git a/meilidb-core/src/lib.rs b/meilidb-core/src/lib.rs index 034bf4f1a..9802ff2d5 100644 --- a/meilidb-core/src/lib.rs +++ b/meilidb-core/src/lib.rs @@ -15,13 +15,13 @@ pub mod raw_indexer; pub mod serde; pub mod store; -pub use self::database::Database; +pub use self::database::{Database, BoxUpdateFn}; pub use self::error::{Error, MResult}; pub use self::number::{Number, ParseNumberError}; pub use self::ranked_map::RankedMap; pub use self::raw_document::RawDocument; pub use self::store::Index; -pub use self::update::UpdateStatus; +pub use self::update::{UpdateStatus, UpdateResult}; use zerocopy::{AsBytes, FromBytes}; use ::serde::{Serialize, Deserialize}; diff --git a/meilidb-core/src/query_builder.rs b/meilidb-core/src/query_builder.rs index 4f7fe5766..efdb07ea1 100644 --- a/meilidb-core/src/query_builder.rs +++ b/meilidb-core/src/query_builder.rs @@ -296,10 +296,10 @@ mod tests { use sdset::SetBuf; use tempfile::TempDir; + use crate::automaton::normalize_str; + use crate::database::{Database, BoxUpdateFn}; use crate::DocIndex; use crate::store::Index; - use crate::database::Database; - use crate::automaton::normalize_str; fn set_from_stream<'f, I, S>(stream: I) -> Set where @@ -392,7 +392,8 @@ mod tests { fn from_iter>(iter: I) -> Self { let tempdir = TempDir::new().unwrap(); let database = Database::open_or_create(&tempdir).unwrap(); - let index = database.open_index("default").unwrap(); + let update_fn = None as Option::; + let index = database.open_index("default", update_fn).unwrap(); let rkv = database.rkv.read().unwrap(); let mut writer = rkv.write().unwrap(); diff --git a/meilidb-core/src/update/mod.rs b/meilidb-core/src/update/mod.rs index 0386ecc02..53b92680c 100644 --- a/meilidb-core/src/update/mod.rs +++ b/meilidb-core/src/update/mod.rs @@ -98,7 +98,7 @@ pub fn next_update_id( pub fn update_task( writer: &mut rkv::Writer, index: store::Index, - mut callback: Option, + mut callback: Option, ) -> MResult { let (update_id, update) = match index.updates.pop_front(writer)? { @@ -111,6 +111,7 @@ pub fn update_task( let (update_type, result, duration) = match update { Update::SchemaUpdate(schema) => { let start = Instant::now(); + let update_type = UpdateType::SchemaUpdate { schema: schema.clone() }; let result = apply_schema_update(writer, index.main, &schema);