fix dump lock

This commit is contained in:
Marin Postma 2021-05-05 19:06:07 +02:00 committed by tamo
parent 24192fc550
commit 956012da95
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69
2 changed files with 27 additions and 23 deletions

View File

@ -239,28 +239,12 @@ where
let index_handle = self.index_handle.clone(); let index_handle = self.index_handle.clone();
let update_store = self.store.clone(); let update_store = self.store.clone();
tokio::task::spawn_blocking(move || -> anyhow::Result<()> { tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
update_store.dump(&uuids, path.to_path_buf())?; update_store.dump(&uuids, path.to_path_buf(), index_handle)?;
Ok(())
// Perform the dump of each index concurently. Only a third of the capabilities of
// the index actor at a time not to put too much pressure on the index actor
let path = &path;
let handle = &index_handle;
let mut stream = futures::stream::iter(uuids.iter())
.map(|(uid, uuid)| handle.dump(uid.clone(), *uuid, path.clone()))
.buffer_unordered(CONCURRENT_INDEX_MSG / 3);
Handle::current().block_on(async {
while let Some(res) = stream.next().await {
res?;
}
Ok(())
})
}) })
.await .await
.map_err(|e| UpdateError::Error(e.into()))? .map_err(|e| UpdateError::Error(e.into()))?
.map_err(|e| UpdateError::Error(e.into()))?; .map_err(|e| UpdateError::Error(e.into()))?;
Ok(()) Ok(())
} }

View File

@ -16,10 +16,11 @@ use parking_lot::{Mutex, MutexGuard};
use tokio::runtime::Handle; use tokio::runtime::Handle;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use uuid::Uuid; use uuid::Uuid;
use futures::StreamExt;
use super::UpdateMeta; use super::UpdateMeta;
use crate::helpers::EnvSizer; use crate::helpers::EnvSizer;
use crate::index_controller::{IndexActorHandle, updates::*}; use crate::index_controller::{IndexActorHandle, updates::*, index_actor::CONCURRENT_INDEX_MSG};
#[allow(clippy::upper_case_acronyms)] #[allow(clippy::upper_case_acronyms)]
type BEU64 = U64<heed::byteorder::BE>; type BEU64 = U64<heed::byteorder::BE>;
@ -519,7 +520,12 @@ impl UpdateStore {
Ok(()) Ok(())
} }
pub fn dump(&self, uuids: &HashSet<(String, Uuid)>, path: PathBuf) -> anyhow::Result<()> { pub fn dump(
&self,
uuids: &HashSet<(String, Uuid)>,
path: PathBuf,
handle: impl IndexActorHandle
) -> anyhow::Result<()> {
use std::io::prelude::*; use std::io::prelude::*;
let state_lock = self.state.write(); let state_lock = self.state.write();
state_lock.swap(State::Dumping); state_lock.swap(State::Dumping);
@ -555,7 +561,21 @@ impl UpdateStore {
} }
} }
Ok(())
// Perform the dump of each index concurently. Only a third of the capabilities of
// the index actor at a time not to put too much pressure on the index actor
let path = &path;
let mut stream = futures::stream::iter(uuids.iter())
.map(|(uid, uuid)| handle.dump(uid.clone(), *uuid, path.clone()))
.buffer_unordered(CONCURRENT_INDEX_MSG / 3);
Handle::current().block_on(async {
while let Some(res) = stream.next().await {
res?;
}
Ok(())
})
} }
pub fn get_info(&self) -> anyhow::Result<UpdateStoreInfo> { pub fn get_info(&self) -> anyhow::Result<UpdateStoreInfo> {