mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-10 21:44:34 +01:00
Merge #1429
1429: Do not block when sending update notifications r=curquiza a=irevoire transplant this [PR](https://github.com/meilisearch/transplant/pull/260) from @Kerollmops 🎉 Co-authored-by: Tamo <tamo@meilisearch.com>
This commit is contained in:
commit
cb58a8c776
@ -8,6 +8,7 @@ use std::sync::Arc;
|
|||||||
use std::{
|
use std::{
|
||||||
collections::{BTreeMap, HashSet},
|
collections::{BTreeMap, HashSet},
|
||||||
path::PathBuf,
|
path::PathBuf,
|
||||||
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use arc_swap::ArcSwap;
|
use arc_swap::ArcSwap;
|
||||||
@ -19,6 +20,8 @@ use log::error;
|
|||||||
use parking_lot::{Mutex, MutexGuard};
|
use parking_lot::{Mutex, MutexGuard};
|
||||||
use tokio::runtime::Handle;
|
use tokio::runtime::Handle;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
use tokio::sync::mpsc::error::TrySendError;
|
||||||
|
use tokio::time::timeout;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use codec::*;
|
use codec::*;
|
||||||
@ -120,7 +123,7 @@ impl UpdateStore {
|
|||||||
|
|
||||||
let state = Arc::new(StateLock::from_state(State::Idle));
|
let state = Arc::new(StateLock::from_state(State::Idle));
|
||||||
|
|
||||||
let (notification_sender, notification_receiver) = mpsc::channel(10);
|
let (notification_sender, notification_receiver) = mpsc::channel(1);
|
||||||
|
|
||||||
Ok((
|
Ok((
|
||||||
Self {
|
Self {
|
||||||
@ -146,22 +149,22 @@ impl UpdateStore {
|
|||||||
let update_store = Arc::new(update_store);
|
let update_store = Arc::new(update_store);
|
||||||
|
|
||||||
// Send a first notification to trigger the process.
|
// Send a first notification to trigger the process.
|
||||||
let _ = update_store.notification_sender.send(());
|
if let Err(TrySendError::Closed(())) = update_store.notification_sender.try_send(()) {
|
||||||
|
panic!("Failed to init update store");
|
||||||
// Init update loop to perform any pending updates at launch.
|
}
|
||||||
// Since we just launched the update store, and we still own the receiving end of the
|
|
||||||
// channel, this call is guaranteed to succeed.
|
|
||||||
update_store
|
|
||||||
.notification_sender
|
|
||||||
.try_send(())
|
|
||||||
.expect("Failed to init update store");
|
|
||||||
|
|
||||||
// We need a weak reference so we can take ownership on the arc later when we
|
// We need a weak reference so we can take ownership on the arc later when we
|
||||||
// want to close the index.
|
// want to close the index.
|
||||||
|
let duration = Duration::from_secs(10 * 60); // 10 minutes
|
||||||
let update_store_weak = Arc::downgrade(&update_store);
|
let update_store_weak = Arc::downgrade(&update_store);
|
||||||
tokio::task::spawn(async move {
|
tokio::task::spawn(async move {
|
||||||
// Block and wait for something to process.
|
// Block and wait for something to process with a timeout. The timeout
|
||||||
'outer: while notification_receiver.recv().await.is_some() {
|
// function returns a Result and we must just unlock the loop on Result.
|
||||||
|
'outer: while timeout(duration, notification_receiver.recv())
|
||||||
|
.await
|
||||||
|
.transpose()
|
||||||
|
.map_or(false, |r| r.is_ok())
|
||||||
|
{
|
||||||
loop {
|
loop {
|
||||||
match update_store_weak.upgrade() {
|
match update_store_weak.upgrade() {
|
||||||
Some(update_store) => {
|
Some(update_store) => {
|
||||||
@ -245,9 +248,10 @@ impl UpdateStore {
|
|||||||
|
|
||||||
txn.commit()?;
|
txn.commit()?;
|
||||||
|
|
||||||
self.notification_sender
|
if let Err(TrySendError::Closed(())) = self.notification_sender.try_send(()) {
|
||||||
.blocking_send(())
|
panic!("Update store loop exited");
|
||||||
.expect("Update store loop exited.");
|
}
|
||||||
|
|
||||||
Ok(meta)
|
Ok(meta)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user