Support a basic update callback system

This commit is contained in:
Clément Renault 2019-10-09 11:45:19 +02:00
parent 2a4707d51e
commit 5f3072e67e
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
7 changed files with 66 additions and 27 deletions

View file

@ -8,14 +8,23 @@ use log::{debug, error};
use crate::{store, update, Index, MResult};
pub type BoxUpdateFn = Box<dyn Fn(update::UpdateResult) + Send + Sync + 'static>;
type ArcSwapFn = arc_swap::ArcSwapOption<BoxUpdateFn>;
pub struct Database {
pub rkv: Arc<RwLock<rkv::Rkv>>,
main_store: rkv::SingleStore,
indexes_store: rkv::SingleStore,
indexes: RwLock<HashMap<String, (Index, thread::JoinHandle<()>)>>,
indexes: RwLock<HashMap<String, (Index, Arc<ArcSwapFn>, thread::JoinHandle<()>)>>,
}
fn update_awaiter(receiver: Receiver<()>, rkv: Arc<RwLock<rkv::Rkv>>, index: Index) {
fn update_awaiter(
receiver: Receiver<()>,
rkv: Arc<RwLock<rkv::Rkv>>,
update_fn: Arc<ArcSwapFn>,
index: Index,
)
{
for () in receiver {
// consume all updates in order (oldest first)
loop {
@ -29,7 +38,13 @@ fn update_awaiter(receiver: Receiver<()>, rkv: Arc<RwLock<rkv::Rkv>>, index: Ind
Err(e) => { error!("LMDB writer transaction begin failed: {}", e); break }
};
match update::update_task(&mut writer, index.clone(), None as Option::<fn(_)>) {
let update_fn = update_fn.load();
let update_fn: Option<&dyn Fn(update::UpdateResult)> = match *update_fn {
Some(ref f) => Some(f.as_ref()),
None => None,
};
match update::update_task(&mut writer, index.clone(), update_fn) {
Ok(true) => if let Err(e) = writer.commit() { error!("update transaction failed: {}", e) },
// no more updates to handle for now
Ok(false) => { debug!("no more updates"); writer.abort(); break },
@ -78,15 +93,21 @@ impl Database {
let (sender, receiver) = crossbeam_channel::bounded(100);
let index = store::open(&rkv_read, &index_name, sender.clone())?;
let update_fn = Arc::new(ArcSwapFn::empty());
let rkv_clone = rkv.clone();
let index_clone = index.clone();
let handle = thread::spawn(move || update_awaiter(receiver, rkv_clone, index_clone));
let update_fn_clone = update_fn.clone();
let handle = thread::spawn(move || {
update_awaiter(receiver, rkv_clone, update_fn_clone, index_clone)
});
// send an update notification to make sure that
// possible previous boot updates are consumed
sender.send(()).unwrap();
let result = indexes.insert(index_name, (index, handle));
let result = indexes.insert(index_name, (index, update_fn, handle));
assert!(result.is_none(), "The index should not have been already open");
}
@ -95,12 +116,20 @@ impl Database {
Ok(Database { rkv, main_store, indexes_store, indexes: RwLock::new(indexes) })
}
pub fn open_index(&self, name: impl Into<String>) -> MResult<Index> {
pub fn open_index(
&self,
name: impl Into<String>,
update_fn: Option<BoxUpdateFn>,
) -> MResult<Index>
{
let indexes_lock = self.indexes.read().unwrap();
let name = name.into();
match indexes_lock.get(&name) {
Some((index, _)) => Ok(index.clone()),
Some((index, old_update_fn, _)) => {
old_update_fn.swap(update_fn.map(Arc::new));
Ok(index.clone())
},
None => {
drop(indexes_lock);
@ -117,8 +146,16 @@ impl Database {
indexes_write.entry(name).or_insert_with(|| {
let rkv_clone = self.rkv.clone();
let index_clone = index.clone();
let handle = thread::spawn(move || update_awaiter(receiver, rkv_clone, index_clone));
(index.clone(), handle)
let update_fn = update_fn.map(Arc::new);
let update_fn = Arc::new(ArcSwapFn::new(update_fn));
let update_fn_clone = update_fn.clone();
let handle = thread::spawn(move || {
update_awaiter(receiver, rkv_clone, update_fn_clone, index_clone)
});
(index.clone(), update_fn, handle)
});
}

View file

@ -15,13 +15,13 @@ pub mod raw_indexer;
pub mod serde;
pub mod store;
pub use self::database::Database;
pub use self::database::{Database, BoxUpdateFn};
pub use self::error::{Error, MResult};
pub use self::number::{Number, ParseNumberError};
pub use self::ranked_map::RankedMap;
pub use self::raw_document::RawDocument;
pub use self::store::Index;
pub use self::update::UpdateStatus;
pub use self::update::{UpdateStatus, UpdateResult};
use zerocopy::{AsBytes, FromBytes};
use ::serde::{Serialize, Deserialize};

View file

@ -296,10 +296,10 @@ mod tests {
use sdset::SetBuf;
use tempfile::TempDir;
use crate::automaton::normalize_str;
use crate::database::{Database, BoxUpdateFn};
use crate::DocIndex;
use crate::store::Index;
use crate::database::Database;
use crate::automaton::normalize_str;
fn set_from_stream<'f, I, S>(stream: I) -> Set
where
@ -392,7 +392,8 @@ mod tests {
fn from_iter<I: IntoIterator<Item=(&'a str, &'a [DocIndex])>>(iter: I) -> Self {
let tempdir = TempDir::new().unwrap();
let database = Database::open_or_create(&tempdir).unwrap();
let index = database.open_index("default").unwrap();
let update_fn = None as Option::<BoxUpdateFn>;
let index = database.open_index("default", update_fn).unwrap();
let rkv = database.rkv.read().unwrap();
let mut writer = rkv.write().unwrap();

View file

@ -98,7 +98,7 @@ pub fn next_update_id(
pub fn update_task(
writer: &mut rkv::Writer,
index: store::Index,
mut callback: Option<impl FnOnce(UpdateResult)>,
mut callback: Option<impl Fn(UpdateResult)>,
) -> MResult<bool>
{
let (update_id, update) = match index.updates.pop_front(writer)? {
@ -111,6 +111,7 @@ pub fn update_task(
let (update_type, result, duration) = match update {
Update::SchemaUpdate(schema) => {
let start = Instant::now();
let update_type = UpdateType::SchemaUpdate { schema: schema.clone() };
let result = apply_schema_update(writer, index.main, &schema);