Merge pull request #61 from meilisearch/update-handler

create update handler trait
This commit is contained in:
Clément Renault 2020-12-22 13:42:00 +01:00 committed by GitHub
commit cd158d4cde
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 23 additions and 10 deletions

View File

@ -306,7 +306,8 @@ async fn main() -> anyhow::Result<()> {
let update_store = UpdateStore::open( let update_store = UpdateStore::open(
update_store_options, update_store_options,
update_store_path, update_store_path,
move |update_id, meta, content| { // the type hint is necessary: https://github.com/rust-lang/rust/issues/32600
move |update_id, meta, content:&_| {
// We prepare the update by using the update builder. // We prepare the update by using the update builder.
let mut update_builder = UpdateBuilder::new(); let mut update_builder = UpdateBuilder::new();
if let Some(max_nb_chunks) = indexer_opt_cloned.max_nb_chunks { if let Some(max_nb_chunks) = indexer_opt_cloned.max_nb_chunks {

View File

@ -61,6 +61,7 @@ impl FacetStringOperator {
FacetStringOperator::Equal(s.to_lowercase()) FacetStringOperator::Equal(s.to_lowercase())
} }
#[allow(dead_code)]
fn not_equal(s: &str) -> Self { fn not_equal(s: &str) -> Self {
FacetStringOperator::equal(s).negate() FacetStringOperator::equal(s).negate()
} }

View File

@ -18,15 +18,26 @@ pub struct UpdateStore<M, N> {
notification_sender: Sender<()>, notification_sender: Sender<()>,
} }
pub trait UpdateHandler<M, N> {
fn handle_update(&mut self, update_id: u64, meta: M, content: &[u8]) -> heed::Result<N>;
}
impl<M, N, F> UpdateHandler<M, N> for F
where F: FnMut(u64, M, &[u8]) -> heed::Result<N> + Send + 'static {
fn handle_update(&mut self, update_id: u64, meta: M, content: &[u8]) -> heed::Result<N> {
self(update_id, meta, content)
}
}
impl<M: 'static, N: 'static> UpdateStore<M, N> { impl<M: 'static, N: 'static> UpdateStore<M, N> {
pub fn open<P, F>( pub fn open<P, U>(
mut options: EnvOpenOptions, mut options: EnvOpenOptions,
path: P, path: P,
mut update_function: F, mut update_handler: U,
) -> heed::Result<Arc<UpdateStore<M, N>>> ) -> heed::Result<Arc<UpdateStore<M, N>>>
where where
P: AsRef<Path>, P: AsRef<Path>,
F: FnMut(u64, M, &[u8]) -> heed::Result<N> + Send + 'static, U: UpdateHandler<M, N> + Send + 'static,
M: for<'a> Deserialize<'a>, M: for<'a> Deserialize<'a>,
N: Serialize, N: Serialize,
{ {
@ -55,7 +66,7 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
// Block and wait for something to process. // Block and wait for something to process.
for () in notification_receiver { for () in notification_receiver {
loop { loop {
match update_store_cloned.process_pending_update(&mut update_function) { match update_store_cloned.process_pending_update(&mut update_handler) {
Ok(Some(_)) => (), Ok(Some(_)) => (),
Ok(None) => break, Ok(None) => break,
Err(e) => eprintln!("error while processing update: {}", e), Err(e) => eprintln!("error while processing update: {}", e),
@ -125,9 +136,9 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
/// Executes the user provided function on the next pending update (the one with the lowest id). /// Executes the user provided function on the next pending update (the one with the lowest id).
/// This is asynchronous as it let the user process the update with a read-only txn and /// This is asynchronous as it let the user process the update with a read-only txn and
/// only writing the result meta to the processed-meta store *after* it has been processed. /// only writing the result meta to the processed-meta store *after* it has been processed.
fn process_pending_update<F>(&self, mut f: F) -> heed::Result<Option<(u64, N)>> fn process_pending_update<U>(&self, handler: &mut U) -> heed::Result<Option<(u64, N)>>
where where
F: FnMut(u64, M, &[u8]) -> heed::Result<N>, U: UpdateHandler<M, N>,
M: for<'a> Deserialize<'a>, M: for<'a> Deserialize<'a>,
N: Serialize, N: Serialize,
{ {
@ -144,7 +155,7 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
.expect("associated update content"); .expect("associated update content");
// Process the pending update using the provided user function. // Process the pending update using the provided user function.
let new_meta = (f)(first_id.get(), first_meta, first_content)?; let new_meta = handler.handle_update(first_id.get(), first_meta, first_content)?;
drop(rtxn); drop(rtxn);
// Once the pending update have been successfully processed // Once the pending update have been successfully processed
@ -298,7 +309,7 @@ mod tests {
fn simple() { fn simple() {
let dir = tempfile::tempdir().unwrap(); let dir = tempfile::tempdir().unwrap();
let options = EnvOpenOptions::new(); let options = EnvOpenOptions::new();
let update_store = UpdateStore::open(options, dir, |_id, meta: String, _content| { let update_store = UpdateStore::open(options, dir, |_id, meta: String, _content:&_| {
Ok(meta + " processed") Ok(meta + " processed")
}).unwrap(); }).unwrap();
@ -316,7 +327,7 @@ mod tests {
fn long_running_update() { fn long_running_update() {
let dir = tempfile::tempdir().unwrap(); let dir = tempfile::tempdir().unwrap();
let options = EnvOpenOptions::new(); let options = EnvOpenOptions::new();
let update_store = UpdateStore::open(options, dir, |_id, meta: String, _content| { let update_store = UpdateStore::open(options, dir, |_id, meta: String, _content:&_| {
thread::sleep(Duration::from_millis(400)); thread::sleep(Duration::from_millis(400));
Ok(meta + " processed") Ok(meta + " processed")
}).unwrap(); }).unwrap();