From 956012da95c78d83a8832e8e2e4dc27d506ee58c Mon Sep 17 00:00:00 2001 From: Marin Postma Date: Wed, 5 May 2021 19:06:07 +0200 Subject: [PATCH] fix dump lock --- .../index_controller/update_actor/actor.rs | 24 +++-------------- .../update_actor/update_store.rs | 26 ++++++++++++++++--- 2 files changed, 27 insertions(+), 23 deletions(-) diff --git a/meilisearch-http/src/index_controller/update_actor/actor.rs b/meilisearch-http/src/index_controller/update_actor/actor.rs index fe4458acd..54d068f14 100644 --- a/meilisearch-http/src/index_controller/update_actor/actor.rs +++ b/meilisearch-http/src/index_controller/update_actor/actor.rs @@ -239,28 +239,12 @@ where let index_handle = self.index_handle.clone(); let update_store = self.store.clone(); tokio::task::spawn_blocking(move || -> anyhow::Result<()> { - update_store.dump(&uuids, path.to_path_buf())?; - - // Perform the dump of each index concurently. Only a third of the capabilities of - // the index actor at a time not to put too much pressure on the index actor - let path = &path; - let handle = &index_handle; - - let mut stream = futures::stream::iter(uuids.iter()) - .map(|(uid, uuid)| handle.dump(uid.clone(), *uuid, path.clone())) - .buffer_unordered(CONCURRENT_INDEX_MSG / 3); - - Handle::current().block_on(async { - while let Some(res) = stream.next().await { - res?; - } - Ok(()) - }) + update_store.dump(&uuids, path.to_path_buf(), index_handle)?; + Ok(()) }) .await - .map_err(|e| UpdateError::Error(e.into()))? - .map_err(|e| UpdateError::Error(e.into()))?; - + .map_err(|e| UpdateError::Error(e.into()))? + .map_err(|e| UpdateError::Error(e.into()))?; Ok(()) } diff --git a/meilisearch-http/src/index_controller/update_actor/update_store.rs b/meilisearch-http/src/index_controller/update_actor/update_store.rs index f3d7dfd0a..524fefe84 100644 --- a/meilisearch-http/src/index_controller/update_actor/update_store.rs +++ b/meilisearch-http/src/index_controller/update_actor/update_store.rs @@ -16,10 +16,11 @@ use parking_lot::{Mutex, MutexGuard}; use tokio::runtime::Handle; use tokio::sync::mpsc; use uuid::Uuid; +use futures::StreamExt; use super::UpdateMeta; use crate::helpers::EnvSizer; -use crate::index_controller::{IndexActorHandle, updates::*}; +use crate::index_controller::{IndexActorHandle, updates::*, index_actor::CONCURRENT_INDEX_MSG}; #[allow(clippy::upper_case_acronyms)] type BEU64 = U64; @@ -519,7 +520,12 @@ impl UpdateStore { Ok(()) } - pub fn dump(&self, uuids: &HashSet<(String, Uuid)>, path: PathBuf) -> anyhow::Result<()> { + pub fn dump( + &self, + uuids: &HashSet<(String, Uuid)>, + path: PathBuf, + handle: impl IndexActorHandle + ) -> anyhow::Result<()> { use std::io::prelude::*; let state_lock = self.state.write(); state_lock.swap(State::Dumping); @@ -555,7 +561,21 @@ impl UpdateStore { } } - Ok(()) + + // Perform the dump of each index concurently. Only a third of the capabilities of + // the index actor at a time not to put too much pressure on the index actor + let path = &path; + + let mut stream = futures::stream::iter(uuids.iter()) + .map(|(uid, uuid)| handle.dump(uid.clone(), *uuid, path.clone())) + .buffer_unordered(CONCURRENT_INDEX_MSG / 3); + + Handle::current().block_on(async { + while let Some(res) = stream.next().await { + res?; + } + Ok(()) + }) } pub fn get_info(&self) -> anyhow::Result {